コルーチン間でメッセージ(データのこと)の送受信を行うことが出来ます。
これにより、処理の投げっぱなしになりがちな非同期処理と、連携を強化できます。
しかも、ProduceやFlowを使うと記述が簡素になり、プログラミングの容易さと読みやすさが向上して便利です。
今回は、この「メッセージの送受信」について、使い方をまとめました。
※環境:Android Studio Flamingo | 2022.2.1
:org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.4
目次
メッセージの送受信とは
「メッセージ」はデータのことです。ですから、「メッセージの送受信」とは、一方のコルーチンからデータを送信し、片方のコルーチンで受信することを意味します。
コルーチンの戻り値も同じと言えますが、戻り値は単一データを返すのに対し、「メッセージの送受信」は複数の連続データを返せます。
この「連続データ」のことを「ストリームデータ」といいます。
送受信の基本
基本的なメッセージの送受信を見ていきます。
Channelで単一データ受信
ワーカースレッド(Defaultプールから取得)からメインスレッドへ単一データを送受信する例です。
Channelインスタンを使い、Channel#sendで送信し、Channel#receiveで受信します。
Channelクラスの型パラメータへIntを指定しているので、転送されるデータはInt型です。
lifecycle.coroutineScope.launch { val _channel = Channel<Int>() launch(Dispatchers.Default) { val _data = (1..100).random() // 実際は演算により得られたデータ _channel.send(_data) Log.i(TAG, "Send data = ${_data} [${getThreadName()}]") } delay(1000) val _data = _channel.receive() Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]") } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
10:01:07.936 Receive Data = 46 [main] 10:01:07.937 Send data = 46 [DefaultDispatcher-worker-1]
Channelでストリーム受信
ワーカースレッドからメインスレッドへストリームデータ(サンプルはInt型の配列)を送受信する例です。
注意点は、送信側でデータの終わりを明確にする必要があることです。全ての送信が終わったら必ずChannel#closeを実行して、データの終わりを宣言します。
データの終わりが不明ですと、受信側は永遠にデータを待ち続けることになります。
1つずつ連続で受信
データを一つずつ転送しています。
lifecycle.coroutineScope.launch { Log.i(TAG, "Start communication !") val _channel = Channel<Int>() launch(Dispatchers.Default) { arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ _channel.send(it) Log.i(TAG, "Send data = ${it} [${getThreadName()}]") } _channel.close() // データの終わりを知らせる } delay(1000) while (! _channel.isClosedForReceive) { // <-- このタイミングでcloseされるとreceiveがエラーになる val _data = _channel.receive() Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]") } Log.i(TAG, "End communication !") } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:27:46.956 Start communication ! 09:28:46.961 Receive Data = 8 [main] <--- 1秒後 09:28:46.962 Send data = 8 [DefaultDispatcher-worker-1] 09:28:46.962 Send data = 4 [DefaultDispatcher-worker-1] 09:28:46.962 Receive Data = 4 [main] 09:28:46.963 Receive Data = 3 [main] 09:28:46.963 Send data = 3 [DefaultDispatcher-worker-1] 09:28:46.964 Send data = 9 [DefaultDispatcher-worker-1] 09:28:46.965 Receive Data = 9 [main] 09:28:46.966 Receive Data = 1 [main] 09:28:46.968 Send data = 1 [DefaultDispatcher-worker-1] ----- 09:28:46.982 FATAL EXCEPTION: main kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
ただし、このプログラムはNGです。エラーが発生します。なぜなら、Channel#isClosedForReceiveと#closeがレーシングするからです。
これを回避するには、エラーのハンドリングを行います。この方法は2つあります。
lifecycle.coroutineScope.launch { Log.i(TAG, "Start communication !") val _channel = Channel<Int>() launch(Dispatchers.Default) { arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ _channel.send(it) Log.i(TAG, "Send data = ${it} [${getThreadName()}]") } _channel.close() // データの終わりを知らせる } delay(1000) while (! _channel.isClosedForReceive) { try { val _data = _channel.receive() Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]") } catch (e: ClosedReceiveChannelException) { // エラー時の処理 Log.i(TAG, "Channel was closed !") } } Log.i(TAG, "End communication !") } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
lifecycle.coroutineScope.launch { Log.i(TAG, "Start communication !") val _channel = Channel<Int>() launch(Dispatchers.Default) { arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ _channel.send(it) Log.i(TAG, "Send data = ${it} [${getThreadName()}]") } _channel.close() // データの終わりを知らせる } delay(1000) while (! _channel.isClosedForReceive) { val _result = _channel.receiveCatching() if(_result.isSuccess) { val _data = _result.getOrNull() Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]") } } Log.i(TAG, "End communication !") } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:54:30.891 Start communication !
09:55:30.891 Receive Data = 8 [main] <--- 1秒後
09:55:30.893 Send data = 8 [DefaultDispatcher-worker-1]
09:55:30.895 Send data = 4 [DefaultDispatcher-worker-1]
09:55:30.896 Receive Data = 4 [main]
09:55:30.897 Receive Data = 3 [main]
09:55:30.897 Send data = 3 [DefaultDispatcher-worker-1]
09:55:30.898 Send data = 9 [DefaultDispatcher-worker-1]
09:55:30.898 Receive Data = 9 [main]
09:55:30.899 Receive Data = 1 [main]
09:55:30.899 Send data = 1 [DefaultDispatcher-worker-1]
09:55:30.899 End communication !
イテレーターで受信
Channelインスタンスはイテレータ―(Iterator)機能を有しているので、for文によりループ処理が可能です。
エラーのハンドリング等が省略できるので、受信側がスッキリします。
lifecycle.coroutineScope.launch { Log.i(TAG, "Start communication !") val _channel = Channel<Int>() launch(Dispatchers.Default) { arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ _channel.send(it) Log.i(TAG, "Send data = ${it} [${getThreadName()}]") } _channel.close() // データの終わりを宣言 } delay(1000) for(data in _channel) // closeまでのデータを取得 Log.i(TAG, "Receive Data = ${data} [${getThreadName()}]") Log.i(TAG, "End communication !") } // lifecycle.coroutineScope => LifecycleScope
10:15:34.329 Start communication !
10:15:35.329 Receive Data = 8 [main] <--- 1秒後
10:15:35.330 Send data = 8 [DefaultDispatcher-worker-2]
10:15:35.331 Send data = 4 [DefaultDispatcher-worker-2]
10:15:35.332 Receive Data = 4 [main]
10:15:35.332 Receive Data = 3 [main]
10:15:35.333 Send data = 3 [DefaultDispatcher-worker-2]
10:15:35.333 Send data = 9 [DefaultDispatcher-worker-2]
10:15:35.333 Receive Data = 9 [main]
10:15:35.334 Receive Data = 1 [main]
10:15:35.334 Send data = 1 [DefaultDispatcher-worker-2]
10:15:35.334 End communication !
送受信の様子
図は送受信の様子を描いたものです。
Channelはコルーチン間(AとBの間)に位置し、データの中継を担っています。そして、データを一時保管するためのバッファを持つことが出来ます。
また、ChannelクラスはChannelSendとChannelReceiveを継承しています。それぞれ、送信側と受信側の処理を担当していて、バッファに対して独立した動作をします。
送信側(ChannelSend)の処理Channel#send( )を実行すると、次の処理を行います。
- Bufferに空きがある場合、データをBufferに格納
- Bufferに空きがない場合、sendを一時停止(Suspend)
- Bufferのサイズが0の場合、要求に従い受信側へ直にデータを渡す
Channel#receive( )を実行すると、次の処理を行います。
- Bufferにデータがある場合、データをBufferから取得
- Bufferにデータがない場合、receiveを一時停止(Suspend)
- Bufferのサイズが0の場合、送信側へ直にデータを要求
Channel#send( )ならびに#receive( )はSuspend関数であり、non-blocking動作を行うことに注意してください。
送受信のタイプ
Channelが持つバッファのサイズにより、送受信のタイプが分かれます。
タイプ | バッファサイズ ( )内はパラメータ名 | ストリームタイプ Hot / Cold |
---|---|---|
Rendezvous | 0(RENDEZVOUS) | Cold |
Buffered | 64(BUFFERED)または、指定された数 | Hot |
Conflated | 1(CONFLATED) | |
Unlimited | Int.MAX_VALUE(UNLIMITED) |
Rendezvous
バッファサイズは0(なし)です。
受信側は送信側へ直にデータを要求し、送信側は受信側へ直にデータを渡します。
データの要求を受けてから送信を開始するストリームタイプです。これをコールドストリーム(Cold Stream)といいます。
lifecycle.coroutineScope.launch { Log.i(TAG, "Start communication !") val _channel = Channel<Int>(capacity = RENDEZVOUS) launch(Dispatchers.Default) { arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ _channel.send(it) Log.i(TAG, "Send data = ${it} [${getThreadName()}]") } _channel.close() // データの終わりを知らせる } delay(1000) for(data in _channel) // closeまでのデータを取得 Log.i(TAG, "Receive Data = ${data} [${getThreadName()}]") Log.i(TAG, "End communication !") } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
15:12:53.494 Start communication !
15:12:54.494 Receive Data = 8 [main] <--- 1秒後
15:12:54.495 Send data = 8 [DefaultDispatcher-worker-1]
15:12:54.495 Send data = 4 [DefaultDispatcher-worker-1]
15:12:54.496 Receive Data = 4 [main]
15:12:54.496 Receive Data = 3 [main]
15:12:54.496 Send data = 3 [DefaultDispatcher-worker-1]
15:12:54.497 Receive Data = 9 [main]
15:12:54.497 Send data = 9 [DefaultDispatcher-worker-1]
15:12:54.498 Receive Data = 1 [main]
15:12:54.498 Send data = 1 [DefaultDispatcher-worker-1]
15:12:54.498 End communication !
※ReceiveとSendの順番が逆になるのは、Logの非同期な処理タイミングが原因
Channelの引数capacityを省略した場合、デフォルトは「capacity = RENDEZVOUS」です。ですので、「送受信の基本」で示した例は全てRendezvousになります。
Buffered
バッファサイズは指定された数(例では3)です。
送信側はバッファが空いている限りデータをバッファへ格納します。バッファが埋まれば一時停止(Suspend)します。受信側はバッファーにデータがあればバッファーから受信します。バッファが空けば一時停止(Suspend)します。この繰り返しです。
例はバッファー数:3なので、Channel#send( )が3回連続で処理されています。
データの要求が無くても前もって送信を開始するストリームタイプです。これをホットストリーム(Hot Stream)といいます。
lifecycle.coroutineScope.launch { Log.i(TAG, "Start communication !") val _channel = Channel<Int>(capacity = 3) launch(Dispatchers.Default) { arrayListOf(8, 4, 3, 9, 1).forEach { _channel.send(it) Log.i(TAG, "Send data = ${it} [${getThreadName()}]") } _channel.close() // データの終わりを知らせる } delay(1000) for(data in _channel) // closeまでのデータを取得 Log.i(TAG, "Receive Data = ${data} [${getThreadName()}]") Log.i(TAG, "End communication !") } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
15:16:18.924 Start communication !
15:16:18.924 Send data = 8 [DefaultDispatcher-worker-1]
15:16:18.924 Send data = 4 [DefaultDispatcher-worker-1]
15:16:18.924 Send data = 3 [DefaultDispatcher-worker-1]
15:16:19.926 Receive Data = 8 [main] <--- 1秒後
15:16:19.928 Send data = 9 [DefaultDispatcher-worker-1]
15:16:19.929 Send data = 1 [DefaultDispatcher-worker-1]
15:16:19.929 Receive Data = 4 [main]
15:16:19.930 Receive Data = 3 [main]
15:16:19.931 Receive Data = 9 [main]
15:16:19.932 Receive Data = 1 [main]
15:16:19.932 End communication !
「capacity = BUFFERED」を指定すると、バッファー数:64になります。
Conflated
バッファサイズは1です。特殊な動作をします。
送信側からバッファーへ送信されたデータはバッファー上で上書きされます。受信側はバッファーから上書き後の最終データを受信します。
lifecycle.coroutineScope.launch { Log.i(TAG, "Start communication !") val _channel = Channel<Int>(capacity = CONFLATED) launch(Dispatchers.Default) { arrayListOf(8, 4, 3, 9, 1).forEach { _channel.send(it) Log.i(TAG, "Send data = ${it} [${getThreadName()}]") } _channel.close() // データの終わりを知らせる } delay(1000) for(data in _channel) // closeまでのデータを取得 Log.i(TAG, "Receive Data = ${data} [${getThreadName()}]") Log.i(TAG, "End communication !") } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
15:17:29.138 Start communication !
15:17:29.138 Send data = 8 [DefaultDispatcher-worker-1]
15:17:29.138 Send data = 4 [DefaultDispatcher-worker-1]
15:17:29.138 Send data = 3 [DefaultDispatcher-worker-1]
15:17:29.139 Send data = 9 [DefaultDispatcher-worker-1]
15:17:29.139 Send data = 1 [DefaultDispatcher-worker-1]
15:17:30.138 Receive Data = 1 [main] <--- 1秒後
15:17:30.138 End communication !
Unlimited
バッファサイズはInt.MAX_VALUEです。「制限なし≒最大」となります。
動作はBufferedと同じです。ただし、バッファに格納されるデータ数が制限されないので、メモリーのオーバーフローに注意してください。
lifecycle.coroutineScope.launch { Log.i(TAG, "Start communication !") val _channel = Channel<Int>(capacity = UNLIMITED) launch(Dispatchers.Default) { arrayListOf(8, 4, 3, 9, 1).forEach { _channel.send(it) Log.i(TAG, "Send data = ${it} [${getThreadName()}]") } _channel.close() // データの終わりを知らせる } delay(1000) for(data in _channel) // closeまでのデータを取得 Log.i(TAG, "Receive Data = ${data} [${getThreadName()}]") Log.i(TAG, "End communication !") } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
15:18:25.412 Start communication !
15:18:25.412 Send data = 8 [DefaultDispatcher-worker-1]
15:18:25.412 Send data = 4 [DefaultDispatcher-worker-1]
15:18:25.412 Send data = 3 [DefaultDispatcher-worker-1]
15:18:25.412 Send data = 9 [DefaultDispatcher-worker-1]
15:18:25.413 Send data = 1 [DefaultDispatcher-worker-1]
15:18:26.412 Receive Data = 8 [main] <--- 1秒後
15:18:26.413 Receive Data = 4 [main]
15:18:26.413 Receive Data = 3 [main]
15:18:26.413 Receive Data = 9 [main]
15:18:26.414 Receive Data = 1 [main]
15:18:26.414 End communication !
Produce(ホットストリーム)
Produceを使うと、データをホットストリームで送受信できます。
動作は「送受信タイプ=Buffered」と全く同じです。つまり、Channelを使った記述をproduceに置き換えたものになります。
コルーチンビルダーはproduceに含まれます。
また、送信側のコルーチンが終了するときに、produce内でChannel#close( )は実行されます。記述する必要はありません。
ですので、記述がシンプルになります。
lifecycle.coroutineScope.launch { Log.i(TAG, "Start communication !") val _produce = produce(Dispatchers.Default, capacity = 3) { arrayListOf(8, 4, 3, 9, 1).forEach { send(it) Log.i(TAG, "Send data = ${it} [${getThreadName()}]") } } delay(1000) _produce.consumeEach { Log.i(TAG, "Receive Data = ${it} [${getThreadName()}]") } Log.i(TAG, "End communication !") } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:28:01.878 Start communication !
09:28:01.880 Send data = 8 [DefaultDispatcher-worker-1]
09:28:01.880 Send data = 4 [DefaultDispatcher-worker-1]
09:28:01.880 Send data = 3 [DefaultDispatcher-worker-1]
09:28:02.881 Receive Data = 8 [main] <--- 1秒後
09:28:02.881 Receive Data = 4 [main]
09:28:02.881 Receive Data = 3 [main]
09:28:02.882 Receive Data = 9 [main]
09:28:02.882 Send data = 9 [DefaultDispatcher-worker-1]
09:28:02.882 Send data = 1 [DefaultDispatcher-worker-1]
09:28:02.882 Receive Data = 1 [main]
09:28:02.885 End communication !
Flow(コールドストリーム)
Flowを使うと、データをコールドストリームで送受信できます。
動作は「送受信タイプ=Rendezvous」と全く同じです。つまり、Channelを使った記述をflowに置き換えたものになります。
コルーチンビルダーはflowに含まれます。
また、送信側のコルーチンが終了するときに、flow内でChannel#close( )は実行されます。記述する必要はありません。
ですので、記述がシンプルになります。
lifecycle.coroutineScope.launch { Log.i(TAG, "Start communication !") val _flow = flow { arrayListOf(8, 4, 3, 9, 1).forEach { emit(it) Log.i(TAG, "Send data = ${it} [${getThreadName()}]") } }.flowOn(Dispatchers.Default) delay(1000) _flow.collect { Log.i(TAG, "Receive Data = ${it} [${getThreadName()}]") } Log.i(TAG, "End communication !") } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:32:18.661 Start communication !
09:32:19.663 Send data = 8 [DefaultDispatcher-worker-1] <--- 1秒後
09:32:19.663 Receive Data = 8 [main]
09:32:19.663 Send data = 4 [DefaultDispatcher-worker-1]
09:32:19.663 Receive Data = 4 [main]
09:32:19.663 Receive Data = 3 [main]
09:32:19.664 Send data = 3 [DefaultDispatcher-worker-1]
09:32:19.664 Send data = 9 [DefaultDispatcher-worker-1]
09:32:19.664 Send data = 1 [DefaultDispatcher-worker-1]
09:32:19.664 Receive Data = 9 [main]
09:32:19.664 Receive Data = 1 [main]
09:32:19.664 End communication !
関連記事: