コルーチン間でメッセージ(データ)の送受信を行うことが出来ます。
ここで紹介する「メッセージの送受信」を使えば、非同期処理の間で確実にデータを受け渡し出来ます。
それにより、非同期処理の連携が容易になります。
今回は、メッセージの送受信についての基礎と、Channelを使った最も基本的な送受信の動作をまとめます。
※環境:Android Studio Flamingo | 2022.2.1
:org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.4
目次
メッセージの送受信とは
「メッセージ」はデータのことです。ですから、「メッセージの送受信」とは、一方のコルーチンからデータを送信し、片方のコルーチンで受信することを意味します。
コルーチンの戻り値も同じと言えますが、「戻り値」は単一データを返すのに対し、「メッセージの送受信」は複数の連続したデータを返せます。
この「連続したデータ」のことを「ストリームデータ」といいます。
コールド/ホットストリーム
メッセージの送受信は、データを送信する「送信機」とデータを受信する「受信機」の間に、ストリームデータが流れる「通信経路」が存在します。
通信経路は、その特徴により「コールドストリーム」と「ホットストリーム」という2つのタイプに分類できます。
ストリームタイプ | 通信経路の特徴(違い) | 身近な例 |
---|---|---|
Cold | 送信機は、受信機の受信要求を受けて、送信処理を開始 ・受信:送信 ⇒ 1:1 ・受信機を追加すると、経路は個別に作られる | トランシーバー 電話 |
Hot | 送信機は、受信機の有無に関係なく、送信処理を開始 ・受信:送信 ⇒ 1:1 or 多:1 ・受信機を追加すると、経路は分岐される | テレビ ラジオ |
※送信機を「Observable」と呼ぶ、受信機を「Subscribe」と呼ぶ |
通信経路の特徴は送受信の開始時に明確に表れます。
コールドストリーム- (1)受信機は受信処理を開始、送信機は何もしない
- (2)送信機は受信要求を受けて、送信処理を開始
- (3)通信経路が確立、データが流れ始める
- (4) (1)(2)を繰り返す、新たな通信経路が確立、データが流れ始める
- (1)受信機に関係なく、送信機は送信処理を開始
- (2)受信機は受信処理を開始
- (3)通信経路が確立、データが流れ始める
- (4)受信機は受信処理を開始
- (5)通信経路が分岐、データが流れ始める
送受信の基本
Channelを使った最も基本的な送受信を見ていきます。
Channelはホットストリーム
Channelはコルーチン間で複数の連続したメッセージを送受信する仕組みです。
紹介するサンプルは、子コルーチンで送信処理を開始した後に、1000ms待って受信処理を開始します。
ですので、ホットストリーム(Hot Stream)の通信経路を提供します。
受信処理も同様です。処理が始まって、直ちに受信が始められるとは限りません。
Channelで単一データ受信
ワーカースレッド(Defaultプールから取得)からメインスレッドへ単一データを送受信する例です。
Channelインスタンを使い、Channel#sendで送信し、Channel#receiveで受信します。
Channelクラスの型パラメータへIntを指定しているので、転送されるデータはInt型です。
lifecycle.coroutineScope.launch { val _channel = Channel<Int>() launch(Dispatchers.Default) { val _data = (1..100).random() // 実際は演算により得られたデータ _channel.send(_data) Log.i(TAG, "Send data = ${_data} [${getThreadName()}]") } delay(1000) val _data = _channel.receive() Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]") } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
10:01:07.936 Receive Data = 46 [main] 10:01:07.937 Send data = 46 [DefaultDispatcher-worker-1]
Channelでストリーム受信
ワーカースレッドからメインスレッドへストリームデータ(サンプルはInt型の配列)を送受信する例です。
注意点は、送信側でデータの終わりを明確にする必要があることです。全ての送信が終わったら必ずChannel#closeを実行して、データの終わりを宣言します。
データの終わりが不明ですと、受信側は永遠にデータを待ち続けることになります。
1つずつ連続で受信
データを一つずつ受信しています。
lifecycle.coroutineScope.launch { Log.i(TAG, "Start communication !") val _channel = Channel<Int>() launch(Dispatchers.Default) { arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ _channel.send(it) Log.i(TAG, "Send data = ${it} [${getThreadName()}]") } _channel.close() // データの終わりを知らせる } delay(1000) while (! _channel.isClosedForReceive) { // <-- このタイミングでcloseされるとreceiveがエラーになる val _data = _channel.receive() Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]") } Log.i(TAG, "End communication !") } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:27:46.956 Start communication ! 09:28:46.961 Receive Data = 8 [main] <--- 1秒後 09:28:46.962 Send data = 8 [DefaultDispatcher-worker-1] 09:28:46.962 Send data = 4 [DefaultDispatcher-worker-1] 09:28:46.962 Receive Data = 4 [main] 09:28:46.963 Receive Data = 3 [main] 09:28:46.963 Send data = 3 [DefaultDispatcher-worker-1] 09:28:46.964 Send data = 9 [DefaultDispatcher-worker-1] 09:28:46.965 Receive Data = 9 [main] 09:28:46.966 Receive Data = 1 [main] 09:28:46.968 Send data = 1 [DefaultDispatcher-worker-1] ----- 09:28:46.982 FATAL EXCEPTION: main kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
ただし、このプログラムはNGです。エラーが発生します。なぜなら、Channel#isClosedForReceiveと#closeがレーシングするからです。
これを回避するには、エラーのハンドリングを行います。この方法は2つあります。
lifecycle.coroutineScope.launch { Log.i(TAG, "Start communication !") val _channel = Channel<Int>() launch(Dispatchers.Default) { arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ _channel.send(it) Log.i(TAG, "Send data = ${it} [${getThreadName()}]") } _channel.close() // データの終わりを知らせる } delay(1000) while (! _channel.isClosedForReceive) { try { val _data = _channel.receive() Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]") } catch (e: ClosedReceiveChannelException) { // エラー時の処理 Log.i(TAG, "Channel was closed !") } } Log.i(TAG, "End communication !") } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
lifecycle.coroutineScope.launch { Log.i(TAG, "Start communication !") val _channel = Channel<Int>() launch(Dispatchers.Default) { arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ _channel.send(it) Log.i(TAG, "Send data = ${it} [${getThreadName()}]") } _channel.close() // データの終わりを知らせる } delay(1000) while (! _channel.isClosedForReceive) { val _result = _channel.receiveCatching() if(_result.isSuccess) { val _data = _result.getOrNull() Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]") } } Log.i(TAG, "End communication !") } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:54:30.891 Start communication !
09:55:30.891 Receive Data = 8 [main] <--- 1秒後
09:55:30.893 Send data = 8 [DefaultDispatcher-worker-1]
09:55:30.895 Send data = 4 [DefaultDispatcher-worker-1]
09:55:30.896 Receive Data = 4 [main]
09:55:30.897 Receive Data = 3 [main]
09:55:30.897 Send data = 3 [DefaultDispatcher-worker-1]
09:55:30.898 Send data = 9 [DefaultDispatcher-worker-1]
09:55:30.898 Receive Data = 9 [main]
09:55:30.899 Receive Data = 1 [main]
09:55:30.899 Send data = 1 [DefaultDispatcher-worker-1]
09:55:30.899 End communication !
イテレーターで受信
Channelインスタンスはイテレータ―(Iterator)を有しているので、for文によりループ処理が可能です。
エラーのハンドリング等が省略できるので、受信側がスッキリします。
lifecycle.coroutineScope.launch { Log.i(TAG, "Start communication !") val _channel = Channel<Int>() launch(Dispatchers.Default) { arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ _channel.send(it) Log.i(TAG, "Send data = ${it} [${getThreadName()}]") } _channel.close() // データの終わりを宣言 } delay(1000) for(data in _channel) // closeまでのデータを取得 Log.i(TAG, "Receive Data = ${data} [${getThreadName()}]") Log.i(TAG, "End communication !") } // lifecycle.coroutineScope => LifecycleScope
10:15:34.329 Start communication !
10:15:35.329 Receive Data = 8 [main] <--- 1秒後
10:15:35.330 Send data = 8 [DefaultDispatcher-worker-2]
10:15:35.331 Send data = 4 [DefaultDispatcher-worker-2]
10:15:35.332 Receive Data = 4 [main]
10:15:35.332 Receive Data = 3 [main]
10:15:35.333 Send data = 3 [DefaultDispatcher-worker-2]
10:15:35.333 Send data = 9 [DefaultDispatcher-worker-2]
10:15:35.333 Receive Data = 9 [main]
10:15:35.334 Receive Data = 1 [main]
10:15:35.334 Send data = 1 [DefaultDispatcher-worker-2]
10:15:35.334 End communication !
送受信の様子
図は送受信の様子です。
Channelはコルーチン間(AとBの間)に位置し、データの中継を担っています。つまり、通信経路です。
そして、データを一時的に保持するためのバッファを持つことが出来ます。
また、ChannelクラスはChannelSendとChannelReceiveを継承しています。それぞれ、送信側と受信側の処理を担当していて、バッファを挟んでお互いに独立した動作をします。
送信側(ChannelSend)の処理Channel#send( )を実行すると、次の処理を行います。
●Bufferサイズ > 0 ・Bufferに空きがあれば、データをBufferに格納 ・Bufferに空きがなければ、sendを休止 ●Bufferサイズ = 0 ・受信の要求があれば、受信側へ直にデータを渡す ・受信の要求がなければ、sendを休止受信側(ChannelReceive)の処理
Channel#receive( )を実行すると、次の処理を行います。
●Bufferサイズ > 0 ・Bufferにデータがあれば、データをBufferから取得 ・Bufferにデータがなければ、receiveを休止 ●Bufferサイズ = 0 ・送信データがあれば、送信側から直に受け取る ・送信データがなければ、receiveを休止
Channel#send( )ならびに#receive( )はSuspend関数であり、non-blocking動作を行うことに注意してください。
Produce
Produceはコルーチン間で複数の連続したメッセージを送受信する仕組みです。
ホットストリーム(Hot Stream)の通信経路を提供します。
Produceの通信経路を構築するproduce関数は、Channelのファクトリー関数です。ReceiveChannelのインスタンスを返します。
ですので、「Produce ≒ Channel」です。送受信の動作はChannelと変わりません。
コルーチンビルダーはproduceに含まれます。また、送信側のコルーチンが終了するときに、produce内でChannel#close( )が実行されます。
ですので、Chennelに比べて記述がシンプルになります。
lifecycle.coroutineScope.launch { Log.i(TAG, "Start communication !") val _produce = produce(Dispatchers.Default, capacity = 3) { arrayListOf(8, 4, 3, 9, 1).forEach { send(it) Log.i(TAG, "Send data = ${it} [${getThreadName()}]") } } delay(1000) _produce.consumeEach { Log.i(TAG, "Receive Data = ${it} [${getThreadName()}]") } Log.i(TAG, "End communication !") } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:28:01.878 Start communication !
09:28:01.880 Send data = 8 [DefaultDispatcher-worker-1]
09:28:01.880 Send data = 4 [DefaultDispatcher-worker-1]
09:28:01.880 Send data = 3 [DefaultDispatcher-worker-1]
09:28:02.881 Receive Data = 8 [main] <--- 1秒後
09:28:02.881 Receive Data = 4 [main]
09:28:02.881 Receive Data = 3 [main]
09:28:02.882 Receive Data = 9 [main]
09:28:02.882 Send data = 9 [DefaultDispatcher-worker-1]
09:28:02.882 Send data = 1 [DefaultDispatcher-worker-1]
09:28:02.882 Receive Data = 1 [main]
09:28:02.885 End communication !
サンプルは基本的な使い方です。
※Produce/Channelの詳細は「Coroutine:Produce」を参照
メッセージを送受信する仕組み
Produce以外にも「メッセージを送受信する仕組み」が存在します。
ストリームタイプ | 通信経路 | 状態の監視※ | |
---|---|---|---|
Produce / Channel | Hot | データ分岐 | |
Flow(SafeFlow) | Cold | 1対1 | |
SharedFlow | Hot | ブロードキャスト | |
StateFlow | |||
※状態の監視 :再Composeのスケジューリングが可能かどうか |
※仕組みの詳細は各記事を参照
「Coroutine:Produce」
「Coroutine:Flow(SafeFlow)」
「Coroutine:SharedFlow」
「Coroutine:StateFlow」
関連記事: