Coroutine:コルーチン間でメッセージの送受信

投稿日:  更新日:

コルーチン間でメッセージ(データ)の送受信を行うことが出来ます。

ここで紹介する「メッセージの送受信」を使えば、非同期処理の間で確実にデータを受け渡し出来ます。

それにより、非同期処理の連携が容易になります。

今回は、メッセージの送受信についての基礎と、Channelを使った最も基本的な送受信の動作をまとめます。

※環境:Android Studio Flamingo | 2022.2.1
   :org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.4

スポンサーリンク

メッセージの送受信とは

「メッセージ」はデータのことです。ですから、「メッセージの送受信」とは、一方のコルーチンからデータを送信し、片方のコルーチンで受信することを意味します。

メッセージの送受信

コルーチンの戻り値も同じと言えますが、「戻り値」は単一データを返すのに対し、「メッセージの送受信」は複数の連続したデータを返せます。

この「連続したデータ」のことを「ストリームデータ」といいます。

スポンサーリンク

コールド/ホットストリーム

メッセージの送受信は、データを送信する「送信機」とデータを受信する「受信機」の間に、ストリームデータが流れる「通信経路」が存在します。

通信経路は、その特徴により「コールドストリーム」と「ホットストリーム」という2つのタイプに分類できます。

ストリームタイプ通信経路の特徴(違い)身近な例
Cold送信機は、受信機の受信要求を受けて、送信処理を開始
・受信:送信 ⇒ 1:1
・受信機を追加すると、経路は個別に作られる
トランシーバー
電話
Hot送信機は、受信機の有無に関係なく、送信処理を開始
・受信:送信 ⇒ 1:1 or 多:1
・受信機を追加すると、経路は分岐される
テレビ
ラジオ
※送信機を「Observable」と呼ぶ、受信機を「Subscribe」と呼ぶ

通信経路の特徴は送受信の開始時に明確に表れます。

 コールドストリーム 

【 送受信の開始 】
コールドストリームの特徴

  • (1)受信機は受信処理を開始、送信機は何もしない
  • (2)送信機は受信要求を受けて、送信処理を開始
  • (3)通信経路が確立、データが流れ始める

【 受信機の追加 】
コールドストリームの特徴(受信機の追加)

  • (4) (1)(2)を繰り返す、新たな通信経路が確立、データが流れ始める
 ホットストリーム 

【 送受信の開始 】
ホットストリームの特徴

  • (1)受信機に関係なく、送信機は送信処理を開始
  • (2)受信機は受信処理を開始
  • (3)通信経路が確立、データが流れ始める

【 受信機の追加 】
ホットストリームの特徴(受信機の追加)

  • (4)受信機は受信処理を開始
  • (5)通信経路が分岐、データが流れ始める
スポンサーリンク

送受信の基本

Channelを使った最も基本的な送受信を見ていきます。

Channelはホットストリーム

Channelはコルーチン間で複数の連続したメッセージを送受信する仕組みです。

紹介するサンプルは、子コルーチンで送信処理を開始した後に、1000ms待って受信処理を開始します。

ですので、ホットストリーム(Hot Stream)の通信経路を提供します。

送信処理・受信処理
送信処理は「送信するための準備」や「送信できるまでの待機」を含みます。処理が始まって、直ちに送信が始められるとは限りません。

受信処理も同様です。処理が始まって、直ちに受信が始められるとは限りません。

Channelで単一データ受信

ワーカースレッド(Defaultプールから取得)からメインスレッドへ単一データを送受信する例です。

Channelインスタンを使い、Channel#sendで送信し、Channel#receiveで受信します。

Channelクラスの型パラメータへIntを指定しているので、転送されるデータはInt型です。

        lifecycle.coroutineScope.launch {
            val _channel = Channel<Int>()
            launch(Dispatchers.Default) {
                val _data = (1..100).random() // 実際は演算により得られたデータ
                _channel.send(_data)
                Log.i(TAG, "Send    data = ${_data} [${getThreadName()}]")
            }
            delay(1000)
            val _data = _channel.receive()
            Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]")
        } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
10:01:07.936  Receive Data = 46 [main]
10:01:07.937  Send    data = 46 [DefaultDispatcher-worker-1]
参考:getThreadName()
fun getThreadName(): String = Thread.currentThread().name

Channelでストリーム受信

ワーカースレッドからメインスレッドへストリームデータ(サンプルはInt型の配列)を送受信する例です。

注意点は、送信側でデータの終わりを明確にする必要があることです。全ての送信が終わったら必ずChannel#closeを実行して、データの終わりを宣言します。

データの終わりが不明ですと、受信側は永遠にデータを待ち続けることになります。

1つずつ連続で受信

データを一つずつ受信しています。

        lifecycle.coroutineScope.launch {
		    Log.i(TAG, "Start communication !")
            val _channel = Channel<Int>()
            launch(Dispatchers.Default) {
                arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ
                    _channel.send(it)
                    Log.i(TAG, "Send    data = ${it} [${getThreadName()}]")
                }
                _channel.close()    // データの終わりを知らせる
            }
            delay(1000)
            while (! _channel.isClosedForReceive) {
                // <-- このタイミングでcloseされるとreceiveがエラーになる
                val _data = _channel.receive()
                Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]")
            }
            Log.i(TAG, "End communication   !")
        } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:27:46.956  Start communication !
09:28:46.961  Receive Data = 8 [main] <--- 1秒後
09:28:46.962  Send    data = 8 [DefaultDispatcher-worker-1]
09:28:46.962  Send    data = 4 [DefaultDispatcher-worker-1]
09:28:46.962  Receive Data = 4 [main]
09:28:46.963  Receive Data = 3 [main]
09:28:46.963  Send    data = 3 [DefaultDispatcher-worker-1]
09:28:46.964  Send    data = 9 [DefaultDispatcher-worker-1]
09:28:46.965  Receive Data = 9 [main]
09:28:46.966  Receive Data = 1 [main]
09:28:46.968  Send    data = 1 [DefaultDispatcher-worker-1]
-----
09:28:46.982  FATAL EXCEPTION: main
kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed

ただし、このプログラムはNGです。エラーが発生します。なぜなら、Channel#isClosedForReceiveと#closeがレーシングするからです。

これを回避するには、エラーのハンドリングを行います。この方法は2つあります。

手動でハンドリングAPIでハンドリング
        lifecycle.coroutineScope.launch {
            Log.i(TAG, "Start communication !")
            val _channel = Channel<Int>()
            launch(Dispatchers.Default) {
                arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ
                    _channel.send(it)
                    Log.i(TAG, "Send    data = ${it} [${getThreadName()}]")
                }
                _channel.close()    // データの終わりを知らせる
            }
            delay(1000)
            while (! _channel.isClosedForReceive) {
                try {
                    val _data = _channel.receive()
                    Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]")
                }
                catch (e: ClosedReceiveChannelException) {
					// エラー時の処理
                    Log.i(TAG, "Channel was closed !")
                }
            }
            Log.i(TAG, "End communication   !")
        } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
        lifecycle.coroutineScope.launch {
            Log.i(TAG, "Start communication !")
            val _channel = Channel<Int>()
            launch(Dispatchers.Default) {
                arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ
                    _channel.send(it)
                    Log.i(TAG, "Send    data = ${it} [${getThreadName()}]")
                }
                _channel.close()    // データの終わりを知らせる
            }
            delay(1000)
            while (! _channel.isClosedForReceive) {
                val _result = _channel.receiveCatching()
                if(_result.isSuccess) {
                    val _data = _result.getOrNull()
                    Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]")
                }
            }
            Log.i(TAG, "End communication   !")
        } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:54:30.891  Start communication !
09:55:30.891  Receive Data = 8 [main] <--- 1秒後
09:55:30.893  Send    data = 8 [DefaultDispatcher-worker-1]
09:55:30.895  Send    data = 4 [DefaultDispatcher-worker-1]
09:55:30.896  Receive Data = 4 [main]
09:55:30.897  Receive Data = 3 [main]
09:55:30.897  Send    data = 3 [DefaultDispatcher-worker-1]
09:55:30.898  Send    data = 9 [DefaultDispatcher-worker-1]
09:55:30.898  Receive Data = 9 [main]
09:55:30.899  Receive Data = 1 [main]
09:55:30.899  Send    data = 1 [DefaultDispatcher-worker-1]
09:55:30.899  End communication   !

イテレーターで受信

Channelインスタンスはイテレータ―(Iterator)を有しているので、for文によりループ処理が可能です。

エラーのハンドリング等が省略できるので、受信側がスッキリします。

        lifecycle.coroutineScope.launch {
            Log.i(TAG, "Start communication !")
            val _channel = Channel<Int>()
            launch(Dispatchers.Default) {
                arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ
                    _channel.send(it)
                    Log.i(TAG, "Send    data = ${it} [${getThreadName()}]")
                }
                _channel.close()    // データの終わりを宣言
            }
            delay(1000)
            for(data in _channel)   // closeまでのデータを取得
                Log.i(TAG, "Receive Data = ${data} [${getThreadName()}]")
            Log.i(TAG, "End communication   !")
        } // lifecycle.coroutineScope => LifecycleScope
10:15:34.329  Start communication !
10:15:35.329  Receive Data = 8 [main] <--- 1秒後
10:15:35.330  Send    data = 8 [DefaultDispatcher-worker-2]
10:15:35.331  Send    data = 4 [DefaultDispatcher-worker-2]
10:15:35.332  Receive Data = 4 [main]
10:15:35.332  Receive Data = 3 [main]
10:15:35.333  Send    data = 3 [DefaultDispatcher-worker-2]
10:15:35.333  Send    data = 9 [DefaultDispatcher-worker-2]
10:15:35.333  Receive Data = 9 [main]
10:15:35.334  Receive Data = 1 [main]
10:15:35.334  Send    data = 1 [DefaultDispatcher-worker-2]
10:15:35.334  End communication   !
スポンサーリンク

送受信の様子

図は送受信の様子です。

Channelはコルーチン間(AとBの間)に位置し、データの中継を担っています。つまり、通信経路です。

そして、データを一時的に保持するためのバッファを持つことが出来ます。

送受信の様子

また、ChannelクラスはChannelSendとChannelReceiveを継承しています。それぞれ、送信側と受信側の処理を担当していて、バッファを挟んでお互いに独立した動作をします。

 送信側(ChannelSend)の処理 

Channel#send( )を実行すると、次の処理を行います。

●Bufferサイズ > 0
    ・Bufferに空きがあれば、データをBufferに格納
    ・Bufferに空きがなければ、sendを休止

●Bufferサイズ = 0
    ・受信の要求があれば、受信側へ直にデータを渡す
    ・受信の要求がなければ、sendを休止
 受信側(ChannelReceive)の処理 

Channel#receive( )を実行すると、次の処理を行います。

●Bufferサイズ > 0
    ・Bufferにデータがあれば、データをBufferから取得
    ・Bufferにデータがなければ、receiveを休止

●Bufferサイズ = 0
    ・送信データがあれば、送信側から直に受け取る
    ・送信データがなければ、receiveを休止

Channel#send( )ならびに#receive( )はSuspend関数であり、non-blocking動作を行うことに注意してください。

Produce

Produceはコルーチン間で複数の連続したメッセージを送受信する仕組みです。

ホットストリーム(Hot Stream)の通信経路を提供します。

Produceの通信経路を構築するproduce関数は、Channelのファクトリー関数です。ReceiveChannelのインスタンスを返します。

ですので、「Produce ≒ Channel」です。送受信の動作はChannelと変わりません。

コルーチンビルダーはproduceに含まれます。また、送信側のコルーチンが終了するときに、produce内でChannel#close( )が実行されます。

ですので、Chennelに比べて記述がシンプルになります。

        lifecycle.coroutineScope.launch {
            Log.i(TAG, "Start communication !")
            val _produce = produce(Dispatchers.Default, capacity = 3) {
                arrayListOf(8, 4, 3, 9, 1).forEach {
                    send(it)
                    Log.i(TAG, "Send    data = ${it} [${getThreadName()}]")
                }
            }
            delay(1000)
            _produce.consumeEach {
                Log.i(TAG, "Receive Data = ${it} [${getThreadName()}]")
            }
            Log.i(TAG, "End communication   !")
        } // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:28:01.878  Start communication !
09:28:01.880  Send    data = 8 [DefaultDispatcher-worker-1]
09:28:01.880  Send    data = 4 [DefaultDispatcher-worker-1]
09:28:01.880  Send    data = 3 [DefaultDispatcher-worker-1]
09:28:02.881  Receive Data = 8 [main] <--- 1秒後
09:28:02.881  Receive Data = 4 [main]
09:28:02.881  Receive Data = 3 [main]
09:28:02.882  Receive Data = 9 [main]
09:28:02.882  Send    data = 9 [DefaultDispatcher-worker-1]
09:28:02.882  Send    data = 1 [DefaultDispatcher-worker-1]
09:28:02.882  Receive Data = 1 [main]
09:28:02.885  End communication   !

サンプルは基本的な使い方です。
※Produce/Channelの詳細は「Coroutine:Produce」を参照

スポンサーリンク

メッセージを送受信する仕組み

Produce以外にも「メッセージを送受信する仕組み」が存在します。

ストリームタイプ通信経路状態の監視※
Produce / ChannelHotデータ分岐
×
Flow(SafeFlow)Cold1対1
SharedFlowHotブロードキャスト
StateFlow
※状態の監視 :再Composeのスケジューリングが可能かどうか

※仕組みの詳細は各記事を参照
 「Coroutine:Produce
 「Coroutine:Flow(SafeFlow)
 「Coroutine:SharedFlow
 「Coroutine:StateFlow

スポンサーリンク

関連記事:

近頃の携帯端末はクワッドコア(プロセッサが4つ)やオクタコア(プロセッサが8つ)が当たり前になりました。 サクサク動作するアプリを作るために、この恩恵を使わなければ損です。 となると、必然的に非同期処理(マルチスレッド)を使うことになります。 JavaのThreadクラス、Android APIのAsyncTaskクラスが代表的な手法です。 Kotlinは上記に加えて「コルーチン(Coroutine)」が使えるようになっています。 今回は、このコルーチンについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 特徴としてnon-blocking動作をサポートします。 このnon-blocking動作についてまとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンの構成要素であるSuspend関数について、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがlaunchです。 このlaunchビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するlaunchビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがasyncです。 このasyncビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するasyncビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがrunBlockingです。 このrunBlockingビルダーについて、まとめます。 ...
CoroutineContextはコルーチンで起動されるスレッドの属性を格納しています。 その中にコルーチンの名前を表現するName属性があります。 Name属性を出力する方法を紹介します。 ...
コルーチン(Coroutine)は「非同期処理プログラミングの手法」の1つです。 Kotlinが提供します。 withContextはCoroutineContextを切り替えてスレッドを起動するSuspend関数です。 このwithContextについて、まとめます。 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「Produce」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「Flow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「SharedFlow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「StateFlow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
Flowはメンバー関数や拡張関数で様々な機能を提供しています。 これらの関数は大きく分けて、中間演算(Intermediate operators)と終端演算(Terminal operators)に分けられます。 中間演算とは、withIndex、map、filter、drop、take、zip、merge、combineなどです。通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。 終端演算とは、collect、single、reduce、toListなどです。通信経路の末端に位置して、ストリームデータを収集します。 今回は、この「Flowの中間演算」のwithIndex、map、filter、drop、takeを取り上げて、まとめます。 ※環境:Android Studio Koala Feature Drop | 2024.1.2 Patch 1     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-androi ...
Flowはメンバー関数や拡張関数で様々な機能を提供しています。 これらの関数は大きく分けて、中間演算(Intermediate operators)と終端演算(Terminal operators)に分けられます。 中間演算とは、withIndex、map、filter、drop、take、zip、merge、combineなどです。通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。 終端演算とは、collect、single、reduce、toListなどです。通信経路の末端に位置して、ストリームデータを収集します。 今回は、この「Flowの中間演算」のzip、merge、combineを取り上げて、まとめます。 ※環境:Android Studio Koala Feature Drop | 2024.1.2 Patch 1     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     o ...
スポンサーリンク