Coroutine:コルーチン間でメッセージの送受信

投稿日:  更新日:

コルーチン間でメッセージ(データのこと)の送受信を行うことが出来ます。

これにより、処理の投げっぱなしになりがちな非同期処理と、連携を強化できます。

しかも、ProduceやFlowを使うと記述が簡素になり、プログラミングの容易さと読みやすさが向上して便利です。

今回は、この「メッセージの送受信」について、使い方をまとめました。

※環境:Android Studio Flamingo | 2022.2.1
   :org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.4

スポンサーリンク

メッセージの送受信とは

「メッセージ」はデータのことです。ですから、「メッセージの送受信」とは、一方のコルーチンからデータを送信し、片方のコルーチンで受信することを意味します。

メッセージの送受信

コルーチンの戻り値も同じと言えますが、戻り値は単一データを返すのに対し、「メッセージの送受信」は複数の連続データを返せます。

この「連続データ」のことを「ストリームデータ」といいます。

スポンサーリンク

送受信の基本

基本的なメッセージの送受信を見ていきます。

Channelで単一データ受信

ワーカースレッド(Defaultプールから取得)からメインスレッドへ単一データを送受信する例です。

Channelインスタンを使い、Channel#sendで送信し、Channel#receiveで受信します。

Channelクラスの型パラメータへIntを指定しているので、転送されるデータはInt型です。

        lifecycle.coroutineScope.launch {
            val _channel = Channel<Int>()
            launch(Dispatchers.Default) {
                val _data = (1..100).random() // 実際は演算により得られたデータ
                _channel.send(_data)
                Log.i(TAG, "Send    data = ${_data} [${getThreadName()}]")
            }
            delay(1000)
            val _data = _channel.receive()
            Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]")
        } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
10:01:07.936  Receive Data = 46 [main]
10:01:07.937  Send    data = 46 [DefaultDispatcher-worker-1]
参考:getThreadName()
fun getThreadName(): String = Thread.currentThread().name

Channelでストリーム受信

ワーカースレッドからメインスレッドへストリームデータ(サンプルはInt型の配列)を送受信する例です。

注意点は、送信側でデータの終わりを明確にする必要があることです。全ての送信が終わったら必ずChannel#closeを実行して、データの終わりを宣言します。

データの終わりが不明ですと、受信側は永遠にデータを待ち続けることになります。

1つずつ連続で受信

データを一つずつ転送しています。

        lifecycle.coroutineScope.launch {
		    Log.i(TAG, "Start communication !")
            val _channel = Channel<Int>()
            launch(Dispatchers.Default) {
                arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ
                    _channel.send(it)
                    Log.i(TAG, "Send    data = ${it} [${getThreadName()}]")
                }
                _channel.close()    // データの終わりを知らせる
            }
            delay(1000)
            while (! _channel.isClosedForReceive) {
                // <-- このタイミングでcloseされるとreceiveがエラーになる
                val _data = _channel.receive()
                Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]")
            }
            Log.i(TAG, "End communication   !")
        } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:27:46.956  Start communication !
09:28:46.961  Receive Data = 8 [main] <--- 1秒後
09:28:46.962  Send    data = 8 [DefaultDispatcher-worker-1]
09:28:46.962  Send    data = 4 [DefaultDispatcher-worker-1]
09:28:46.962  Receive Data = 4 [main]
09:28:46.963  Receive Data = 3 [main]
09:28:46.963  Send    data = 3 [DefaultDispatcher-worker-1]
09:28:46.964  Send    data = 9 [DefaultDispatcher-worker-1]
09:28:46.965  Receive Data = 9 [main]
09:28:46.966  Receive Data = 1 [main]
09:28:46.968  Send    data = 1 [DefaultDispatcher-worker-1]
-----
09:28:46.982  FATAL EXCEPTION: main
kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed

ただし、このプログラムはNGです。エラーが発生します。なぜなら、Channel#isClosedForReceiveと#closeがレーシングするからです。

これを回避するには、エラーのハンドリングを行います。この方法は2つあります。

手動でハンドリングAPIでハンドリング
        lifecycle.coroutineScope.launch {
            Log.i(TAG, "Start communication !")
            val _channel = Channel<Int>()
            launch(Dispatchers.Default) {
                arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ
                    _channel.send(it)
                    Log.i(TAG, "Send    data = ${it} [${getThreadName()}]")
                }
                _channel.close()    // データの終わりを知らせる
            }
            delay(1000)
            while (! _channel.isClosedForReceive) {
                try {
                    val _data = _channel.receive()
                    Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]")
                }
                catch (e: ClosedReceiveChannelException) {
					// エラー時の処理
                    Log.i(TAG, "Channel was closed !")
                }
            }
            Log.i(TAG, "End communication   !")
        } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
        lifecycle.coroutineScope.launch {
            Log.i(TAG, "Start communication !")
            val _channel = Channel<Int>()
            launch(Dispatchers.Default) {
                arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ
                    _channel.send(it)
                    Log.i(TAG, "Send    data = ${it} [${getThreadName()}]")
                }
                _channel.close()    // データの終わりを知らせる
            }
            delay(1000)
            while (! _channel.isClosedForReceive) {
                val _result = _channel.receiveCatching()
                if(_result.isSuccess) {
                    val _data = _result.getOrNull()
                    Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]")
                }
            }
            Log.i(TAG, "End communication   !")
        } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:54:30.891  Start communication !
09:55:30.891  Receive Data = 8 [main] <--- 1秒後
09:55:30.893  Send    data = 8 [DefaultDispatcher-worker-1]
09:55:30.895  Send    data = 4 [DefaultDispatcher-worker-1]
09:55:30.896  Receive Data = 4 [main]
09:55:30.897  Receive Data = 3 [main]
09:55:30.897  Send    data = 3 [DefaultDispatcher-worker-1]
09:55:30.898  Send    data = 9 [DefaultDispatcher-worker-1]
09:55:30.898  Receive Data = 9 [main]
09:55:30.899  Receive Data = 1 [main]
09:55:30.899  Send    data = 1 [DefaultDispatcher-worker-1]
09:55:30.899  End communication   !

イテレーターで受信

Channelインスタンスはイテレータ―(Iterator)機能を有しているので、for文によりループ処理が可能です。

エラーのハンドリング等が省略できるので、受信側がスッキリします。

        lifecycle.coroutineScope.launch {
            Log.i(TAG, "Start communication !")
            val _channel = Channel<Int>()
            launch(Dispatchers.Default) {
                arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ
                    _channel.send(it)
                    Log.i(TAG, "Send    data = ${it} [${getThreadName()}]")
                }
                _channel.close()    // データの終わりを宣言
            }
            delay(1000)
            for(data in _channel)   // closeまでのデータを取得
                Log.i(TAG, "Receive Data = ${data} [${getThreadName()}]")
            Log.i(TAG, "End communication   !")
        } // lifecycle.coroutineScope => LifecycleScope
10:15:34.329  Start communication !
10:15:35.329  Receive Data = 8 [main] <--- 1秒後
10:15:35.330  Send    data = 8 [DefaultDispatcher-worker-2]
10:15:35.331  Send    data = 4 [DefaultDispatcher-worker-2]
10:15:35.332  Receive Data = 4 [main]
10:15:35.332  Receive Data = 3 [main]
10:15:35.333  Send    data = 3 [DefaultDispatcher-worker-2]
10:15:35.333  Send    data = 9 [DefaultDispatcher-worker-2]
10:15:35.333  Receive Data = 9 [main]
10:15:35.334  Receive Data = 1 [main]
10:15:35.334  Send    data = 1 [DefaultDispatcher-worker-2]
10:15:35.334  End communication   !
スポンサーリンク

送受信の様子

図は送受信の様子を描いたものです。

Channelはコルーチン間(AとBの間)に位置し、データの中継を担っています。そして、データを一時保管するためのバッファを持つことが出来ます。

送受信の様子

また、ChannelクラスはChannelSendとChannelReceiveを継承しています。それぞれ、送信側と受信側の処理を担当していて、バッファに対して独立した動作をします。

 送信側(ChannelSend)の処理 

Channel#send( )を実行すると、次の処理を行います。

  • Bufferに空きがある場合、データをBufferに格納
  • Bufferに空きがない場合、sendを一時停止(Suspend)
  • Bufferのサイズが0の場合、要求に従い受信側へ直にデータを渡す
 受信側(ChannelReceive)の処理 

Channel#receive( )を実行すると、次の処理を行います。

  • Bufferにデータがある場合、データをBufferから取得
  • Bufferにデータがない場合、receiveを一時停止(Suspend)
  • Bufferのサイズが0の場合、送信側へ直にデータを要求

Channel#send( )ならびに#receive( )はSuspend関数であり、non-blocking動作を行うことに注意してください。

スポンサーリンク

送受信のタイプ

Channelが持つバッファのサイズにより、送受信のタイプが分かれます。

タイプバッファサイズ
( )内はパラメータ名
ストリームタイプ
Hot / Cold
Rendezvous0(RENDEZVOUS)Cold
Buffered64(BUFFERED)または、指定された数Hot
Conflated1(CONFLATED)
UnlimitedInt.MAX_VALUE(UNLIMITED)

Rendezvous

バッファサイズは0(なし)です。

受信側は送信側へ直にデータを要求し、送信側は受信側へ直にデータを渡します。

データの要求を受けてから送信を開始するストリームタイプです。これをコールドストリーム(Cold Stream)といいます。

        lifecycle.coroutineScope.launch {
            Log.i(TAG, "Start communication !")
            val _channel = Channel<Int>(capacity = RENDEZVOUS)
            launch(Dispatchers.Default) {
                arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ
                    _channel.send(it)
                    Log.i(TAG, "Send    data = ${it} [${getThreadName()}]")
                }
                _channel.close()    // データの終わりを知らせる
            }
            delay(1000)
            for(data in _channel)   // closeまでのデータを取得
                Log.i(TAG, "Receive Data = ${data} [${getThreadName()}]")
            Log.i(TAG, "End communication   !")
        } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
15:12:53.494  Start communication !
15:12:54.494  Receive Data = 8 [main] <--- 1秒後
15:12:54.495  Send    data = 8 [DefaultDispatcher-worker-1]
15:12:54.495  Send    data = 4 [DefaultDispatcher-worker-1]
15:12:54.496  Receive Data = 4 [main]
15:12:54.496  Receive Data = 3 [main]
15:12:54.496  Send    data = 3 [DefaultDispatcher-worker-1]
15:12:54.497  Receive Data = 9 [main]
15:12:54.497  Send    data = 9 [DefaultDispatcher-worker-1]
15:12:54.498  Receive Data = 1 [main]
15:12:54.498  Send    data = 1 [DefaultDispatcher-worker-1]
15:12:54.498  End communication   !

※ReceiveとSendの順番が逆になるのは、Logの非同期な処理タイミングが原因

Channelの引数capacityを省略した場合、デフォルトは「capacity = RENDEZVOUS」です。ですので、「送受信の基本」で示した例は全てRendezvousになります。

Buffered

バッファサイズは指定された数(例では3)です。

送信側はバッファが空いている限りデータをバッファへ格納します。バッファが埋まれば一時停止(Suspend)します。受信側はバッファーにデータがあればバッファーから受信します。バッファが空けば一時停止(Suspend)します。この繰り返しです。

例はバッファー数:3なので、Channel#send( )が3回連続で処理されています。

データの要求が無くても前もって送信を開始するストリームタイプです。これをホットストリーム(Hot Stream)といいます。

        lifecycle.coroutineScope.launch {
            Log.i(TAG, "Start communication !")
            val _channel = Channel<Int>(capacity = 3)
            launch(Dispatchers.Default) {
                arrayListOf(8, 4, 3, 9, 1).forEach {
                    _channel.send(it)
                    Log.i(TAG, "Send    data = ${it} [${getThreadName()}]")
                }
                _channel.close()    // データの終わりを知らせる
            }
            delay(1000)
            for(data in _channel)   // closeまでのデータを取得
                Log.i(TAG, "Receive Data = ${data} [${getThreadName()}]")
            Log.i(TAG, "End communication   !")
        } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
15:16:18.924  Start communication !
15:16:18.924  Send    data = 8 [DefaultDispatcher-worker-1]
15:16:18.924  Send    data = 4 [DefaultDispatcher-worker-1]
15:16:18.924  Send    data = 3 [DefaultDispatcher-worker-1]
15:16:19.926  Receive Data = 8 [main] <--- 1秒後
15:16:19.928  Send    data = 9 [DefaultDispatcher-worker-1]
15:16:19.929  Send    data = 1 [DefaultDispatcher-worker-1]
15:16:19.929  Receive Data = 4 [main]
15:16:19.930  Receive Data = 3 [main]
15:16:19.931  Receive Data = 9 [main]
15:16:19.932  Receive Data = 1 [main]
15:16:19.932  End communication   !

「capacity = BUFFERED」を指定すると、バッファー数:64になります。

Conflated

バッファサイズは1です。特殊な動作をします。

送信側からバッファーへ送信されたデータはバッファー上で上書きされます。受信側はバッファーから上書き後の最終データを受信します。

        lifecycle.coroutineScope.launch {
            Log.i(TAG, "Start communication !")
            val _channel = Channel<Int>(capacity = CONFLATED)
            launch(Dispatchers.Default) {
                arrayListOf(8, 4, 3, 9, 1).forEach {
                    _channel.send(it)
                    Log.i(TAG, "Send    data = ${it} [${getThreadName()}]")
                }
                _channel.close()    // データの終わりを知らせる
            }
            delay(1000)
            for(data in _channel)   // closeまでのデータを取得
                Log.i(TAG, "Receive Data = ${data} [${getThreadName()}]")
            Log.i(TAG, "End communication   !")
        } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
15:17:29.138  Start communication !
15:17:29.138  Send    data = 8 [DefaultDispatcher-worker-1]
15:17:29.138  Send    data = 4 [DefaultDispatcher-worker-1]
15:17:29.138  Send    data = 3 [DefaultDispatcher-worker-1]
15:17:29.139  Send    data = 9 [DefaultDispatcher-worker-1]
15:17:29.139  Send    data = 1 [DefaultDispatcher-worker-1]
15:17:30.138  Receive Data = 1 [main] <--- 1秒後
15:17:30.138  End communication   !

Unlimited

バッファサイズはInt.MAX_VALUEです。「制限なし≒最大」となります。

動作はBufferedと同じです。ただし、バッファに格納されるデータ数が制限されないので、メモリーのオーバーフローに注意してください。

        lifecycle.coroutineScope.launch {
            Log.i(TAG, "Start communication !")
            val _channel = Channel<Int>(capacity = UNLIMITED)
            launch(Dispatchers.Default) {
                arrayListOf(8, 4, 3, 9, 1).forEach {
                    _channel.send(it)
                    Log.i(TAG, "Send    data = ${it} [${getThreadName()}]")
                }
                _channel.close()    // データの終わりを知らせる
            }
            delay(1000)
            for(data in _channel)   // closeまでのデータを取得
                Log.i(TAG, "Receive Data = ${data} [${getThreadName()}]")
            Log.i(TAG, "End communication   !")
        } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
15:18:25.412  Start communication !
15:18:25.412  Send    data = 8 [DefaultDispatcher-worker-1]
15:18:25.412  Send    data = 4 [DefaultDispatcher-worker-1]
15:18:25.412  Send    data = 3 [DefaultDispatcher-worker-1]
15:18:25.412  Send    data = 9 [DefaultDispatcher-worker-1]
15:18:25.413  Send    data = 1 [DefaultDispatcher-worker-1]
15:18:26.412  Receive Data = 8 [main] <--- 1秒後
15:18:26.413  Receive Data = 4 [main]
15:18:26.413  Receive Data = 3 [main]
15:18:26.413  Receive Data = 9 [main]
15:18:26.414  Receive Data = 1 [main]
15:18:26.414  End communication   !
スポンサーリンク

Produce(ホットストリーム)

Produceを使うと、データをホットストリームで送受信できます。

動作は「送受信タイプ=Buffered」と全く同じです。つまり、Channelを使った記述をproduceに置き換えたものになります。

コルーチンビルダーはproduceに含まれます。

また、送信側のコルーチンが終了するときに、produce内でChannel#close( )は実行されます。記述する必要はありません。

ですので、記述がシンプルになります。

        lifecycle.coroutineScope.launch {
            Log.i(TAG, "Start communication !")
            val _produce = produce(Dispatchers.Default, capacity = 3) {
                arrayListOf(8, 4, 3, 9, 1).forEach {
                    send(it)
                    Log.i(TAG, "Send    data = ${it} [${getThreadName()}]")
                }
            }
            delay(1000)
            _produce.consumeEach {
                Log.i(TAG, "Receive Data = ${it} [${getThreadName()}]")
            }
            Log.i(TAG, "End communication   !")
        } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:28:01.878  Start communication !
09:28:01.880  Send    data = 8 [DefaultDispatcher-worker-1]
09:28:01.880  Send    data = 4 [DefaultDispatcher-worker-1]
09:28:01.880  Send    data = 3 [DefaultDispatcher-worker-1]
09:28:02.881  Receive Data = 8 [main] <--- 1秒後
09:28:02.881  Receive Data = 4 [main]
09:28:02.881  Receive Data = 3 [main]
09:28:02.882  Receive Data = 9 [main]
09:28:02.882  Send    data = 9 [DefaultDispatcher-worker-1]
09:28:02.882  Send    data = 1 [DefaultDispatcher-worker-1]
09:28:02.882  Receive Data = 1 [main]
09:28:02.885  End communication   !
スポンサーリンク

Flow(コールドストリーム)

Flowを使うと、データをコールドストリームで送受信できます。

動作は「送受信タイプ=Rendezvous」と全く同じです。つまり、Channelを使った記述をflowに置き換えたものになります。

コルーチンビルダーはflowに含まれます。

また、送信側のコルーチンが終了するときに、flow内でChannel#close( )は実行されます。記述する必要はありません。

ですので、記述がシンプルになります。

        lifecycle.coroutineScope.launch {
            Log.i(TAG, "Start communication !")
            val _flow = flow {
                arrayListOf(8, 4, 3, 9, 1).forEach {
                    emit(it)
                    Log.i(TAG, "Send    data = ${it} [${getThreadName()}]")
                }
            }.flowOn(Dispatchers.Default)
            delay(1000)
            _flow.collect {
                Log.i(TAG, "Receive Data = ${it} [${getThreadName()}]")
            }
            Log.i(TAG, "End communication   !")
        } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:32:18.661  Start communication !
09:32:19.663  Send    data = 8 [DefaultDispatcher-worker-1] <--- 1秒後
09:32:19.663  Receive Data = 8 [main]
09:32:19.663  Send    data = 4 [DefaultDispatcher-worker-1]
09:32:19.663  Receive Data = 4 [main]
09:32:19.663  Receive Data = 3 [main]
09:32:19.664  Send    data = 3 [DefaultDispatcher-worker-1]
09:32:19.664  Send    data = 9 [DefaultDispatcher-worker-1]
09:32:19.664  Send    data = 1 [DefaultDispatcher-worker-1]
09:32:19.664  Receive Data = 9 [main]
09:32:19.664  Receive Data = 1 [main]
09:32:19.664  End communication   !
スポンサーリンク

関連記事:

近頃の携帯端末はクワッドコア(プロセッサが4つ)やオクタコア(プロセッサが8つ)が当たり前になりました。 サクサク動作するアプリを作るために、この恩恵を使わなければ損です。 となると、必然的に非同期処理(マルチスレッド)を使うことになります。 JavaのThreadクラス、Android APIのAsyncTaskクラスが代表的な手法です。 Kotlinは上記に加えて「コルーチン(Coroutine)」が使えるようになっています。 今回は、このコルーチンについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 特徴としてnon-blocking動作をサポートします。 このnon-blocking動作についてまとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンの構成要素であるSuspend関数について、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがlaunchです。 このlaunchビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するlaunchビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがasyncです。 このasyncビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するasyncビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがrunBlockingです。 このrunBlockingビルダーについて、まとめます。 ...
CoroutineContextはコルーチンで起動されるスレッドの属性を格納しています。 その中にコルーチンの名前を表現するName属性があります。 Name属性を出力する方法を紹介します。 ...
コルーチン(Coroutine)は「非同期処理プログラミングの手法」の1つです。 Kotlinが提供します。 withContextはCoroutineContextを切り替えてスレッドを起動するSuspend関数です。 このwithContextについて、まとめます。 ...
スポンサーリンク