コルーチン間でメッセージ(データ)の送受信を行うことが出来ます。
ここで紹介する「メッセージの送受信」を使えば、非同期処理の間で確実にデータを受け渡し出来ます。
それにより、非同期処理の連携が容易になります。
今回は、メッセージの送受信についての基礎と、Channelを使った最も基本的な送受信の動作をまとめます。
※環境:Android Studio Flamingo | 2022.2.1
:org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.4
目次
メッセージの送受信とは
「メッセージ」はデータのことです。ですから、「メッセージの送受信」とは、一方のコルーチンからデータを送信し、片方のコルーチンで受信することを意味します。

コルーチンの戻り値も同じと言えますが、「戻り値」は単一データを返すのに対し、「メッセージの送受信」は複数の連続したデータを返せます。
この「連続したデータ」のことを「ストリームデータ」といいます。
コールド/ホットストリーム
メッセージの送受信は、データを送信する「送信機」とデータを受信する「受信機」の間に、ストリームデータが流れる「通信経路」が存在します。
通信経路は、その特徴により「コールドストリーム」と「ホットストリーム」という2つのタイプに分類できます。
| ストリームタイプ | 通信経路の特徴(違い) | 身近な例 |
|---|---|---|
| Cold | 送信機は、受信機の受信要求を受けて、送信処理を開始 ・受信:送信 ⇒ 1:1 ・受信機を追加すると、経路は個別に作られる | トランシーバー 電話 |
| Hot | 送信機は、受信機の有無に関係なく、送信処理を開始 ・受信:送信 ⇒ 1:1 or 多:1 ・受信機を追加すると、経路は分岐される | テレビ ラジオ |
| ※送信機を「Observable」と呼ぶ、受信機を「Subscribe」と呼ぶ | ||
通信経路の特徴は送受信の開始時に明確に表れます。
コールドストリーム
- (1)受信機は受信処理を開始、送信機は何もしない
- (2)送信機は受信要求を受けて、送信処理を開始
- (3)通信経路が確立、データが流れ始める
- (4) (1)(2)を繰り返す、新たな通信経路が確立、データが流れ始める
- (1)受信機に関係なく、送信機は送信処理を開始
- (2)受信機は受信処理を開始
- (3)通信経路が確立、データが流れ始める
- (4)受信機は受信処理を開始
- (5)通信経路が分岐、データが流れ始める
送受信の基本
Channelを使った最も基本的な送受信を見ていきます。
Channelはホットストリーム
Channelはコルーチン間で複数の連続したメッセージを送受信する仕組みです。
紹介するサンプルは、子コルーチンで送信処理を開始した後に、1000ms待って受信処理を開始します。
ですので、ホットストリーム(Hot Stream)の通信経路を提供します。
受信処理も同様です。処理が始まって、直ちに受信が始められるとは限りません。
Channelで単一データ受信
ワーカースレッド(Defaultプールから取得)からメインスレッドへ単一データを送受信する例です。
Channelインスタンを使い、Channel#sendで送信し、Channel#receiveで受信します。
Channelクラスの型パラメータへIntを指定しているので、転送されるデータはInt型です。
lifecycle.coroutineScope.launch {
val _channel = Channel<Int>()
launch(Dispatchers.Default) {
val _data = (1..100).random() // 実際は演算により得られたデータ
_channel.send(_data)
Log.i(TAG, "Send data = ${_data} [${getThreadName()}]")
}
delay(1000)
val _data = _channel.receive()
Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]")
} // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
10:01:07.936 Receive Data = 46 [main] 10:01:07.937 Send data = 46 [DefaultDispatcher-worker-1]
Channelでストリーム受信
ワーカースレッドからメインスレッドへストリームデータ(サンプルはInt型の配列)を送受信する例です。
注意点は、送信側でデータの終わりを明確にする必要があることです。全ての送信が終わったら必ずChannel#closeを実行して、データの終わりを宣言します。
データの終わりが不明ですと、受信側は永遠にデータを待ち続けることになります。
1つずつ連続で受信
データを一つずつ受信しています。
lifecycle.coroutineScope.launch {
Log.i(TAG, "Start communication !")
val _channel = Channel<Int>()
launch(Dispatchers.Default) {
arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ
_channel.send(it)
Log.i(TAG, "Send data = ${it} [${getThreadName()}]")
}
_channel.close() // データの終わりを知らせる
}
delay(1000)
while (! _channel.isClosedForReceive) {
// <-- このタイミングでcloseされるとreceiveがエラーになる
val _data = _channel.receive()
Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]")
}
Log.i(TAG, "End communication !")
} // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:27:46.956 Start communication ! 09:28:46.961 Receive Data = 8 [main] <--- 1秒後 09:28:46.962 Send data = 8 [DefaultDispatcher-worker-1] 09:28:46.962 Send data = 4 [DefaultDispatcher-worker-1] 09:28:46.962 Receive Data = 4 [main] 09:28:46.963 Receive Data = 3 [main] 09:28:46.963 Send data = 3 [DefaultDispatcher-worker-1] 09:28:46.964 Send data = 9 [DefaultDispatcher-worker-1] 09:28:46.965 Receive Data = 9 [main] 09:28:46.966 Receive Data = 1 [main] 09:28:46.968 Send data = 1 [DefaultDispatcher-worker-1] ----- 09:28:46.982 FATAL EXCEPTION: main kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
ただし、このプログラムはNGです。エラーが発生します。なぜなら、Channel#isClosedForReceiveと#closeがレーシングするからです。
これを回避するには、エラーのハンドリングを行います。この方法は2つあります。
lifecycle.coroutineScope.launch {
Log.i(TAG, "Start communication !")
val _channel = Channel<Int>()
launch(Dispatchers.Default) {
arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ
_channel.send(it)
Log.i(TAG, "Send data = ${it} [${getThreadName()}]")
}
_channel.close() // データの終わりを知らせる
}
delay(1000)
while (! _channel.isClosedForReceive) {
try {
val _data = _channel.receive()
Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]")
}
catch (e: ClosedReceiveChannelException) {
// エラー時の処理
Log.i(TAG, "Channel was closed !")
}
}
Log.i(TAG, "End communication !")
} // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
lifecycle.coroutineScope.launch {
Log.i(TAG, "Start communication !")
val _channel = Channel<Int>()
launch(Dispatchers.Default) {
arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ
_channel.send(it)
Log.i(TAG, "Send data = ${it} [${getThreadName()}]")
}
_channel.close() // データの終わりを知らせる
}
delay(1000)
while (! _channel.isClosedForReceive) {
val _result = _channel.receiveCatching()
if(_result.isSuccess) {
val _data = _result.getOrNull()
Log.i(TAG, "Receive Data = ${_data} [${getThreadName()}]")
}
}
Log.i(TAG, "End communication !")
} // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:54:30.891 Start communication !
09:55:30.891 Receive Data = 8 [main] <--- 1秒後
09:55:30.893 Send data = 8 [DefaultDispatcher-worker-1]
09:55:30.895 Send data = 4 [DefaultDispatcher-worker-1]
09:55:30.896 Receive Data = 4 [main]
09:55:30.897 Receive Data = 3 [main]
09:55:30.897 Send data = 3 [DefaultDispatcher-worker-1]
09:55:30.898 Send data = 9 [DefaultDispatcher-worker-1]
09:55:30.898 Receive Data = 9 [main]
09:55:30.899 Receive Data = 1 [main]
09:55:30.899 Send data = 1 [DefaultDispatcher-worker-1]
09:55:30.899 End communication !
イテレーターで受信
Channelインスタンスはイテレータ―(Iterator)を有しているので、for文によりループ処理が可能です。
エラーのハンドリング等が省略できるので、受信側がスッキリします。
lifecycle.coroutineScope.launch {
Log.i(TAG, "Start communication !")
val _channel = Channel<Int>()
launch(Dispatchers.Default) {
arrayListOf(8, 4, 3, 9, 1).forEach { // 実際は演算により得られたデータ
_channel.send(it)
Log.i(TAG, "Send data = ${it} [${getThreadName()}]")
}
_channel.close() // データの終わりを宣言
}
delay(1000)
for(data in _channel) // closeまでのデータを取得
Log.i(TAG, "Receive Data = ${data} [${getThreadName()}]")
Log.i(TAG, "End communication !")
} // lifecycle.coroutineScope => LifecycleScope
10:15:34.329 Start communication !
10:15:35.329 Receive Data = 8 [main] <--- 1秒後
10:15:35.330 Send data = 8 [DefaultDispatcher-worker-2]
10:15:35.331 Send data = 4 [DefaultDispatcher-worker-2]
10:15:35.332 Receive Data = 4 [main]
10:15:35.332 Receive Data = 3 [main]
10:15:35.333 Send data = 3 [DefaultDispatcher-worker-2]
10:15:35.333 Send data = 9 [DefaultDispatcher-worker-2]
10:15:35.333 Receive Data = 9 [main]
10:15:35.334 Receive Data = 1 [main]
10:15:35.334 Send data = 1 [DefaultDispatcher-worker-2]
10:15:35.334 End communication !
送受信の様子
図は送受信の様子です。
Channelはコルーチン間(AとBの間)に位置し、データの中継を担っています。つまり、通信経路です。
そして、データを一時的に保持するためのバッファを持つことが出来ます。

また、ChannelクラスはChannelSendとChannelReceiveを継承しています。それぞれ、送信側と受信側の処理を担当していて、バッファを挟んでお互いに独立した動作をします。
送信側(ChannelSend)の処理Channel#send( )を実行すると、次の処理を行います。
●Bufferサイズ > 0
・Bufferに空きがあれば、データをBufferに格納
・Bufferに空きがなければ、sendを休止
●Bufferサイズ = 0
・受信の要求があれば、受信側へ直にデータを渡す
・受信の要求がなければ、sendを休止
受信側(ChannelReceive)の処理
Channel#receive( )を実行すると、次の処理を行います。
●Bufferサイズ > 0
・Bufferにデータがあれば、データをBufferから取得
・Bufferにデータがなければ、receiveを休止
●Bufferサイズ = 0
・送信データがあれば、送信側から直に受け取る
・送信データがなければ、receiveを休止
Channel#send( )ならびに#receive( )はSuspend関数であり、non-blocking動作を行うことに注意してください。
Produce
Produceはコルーチン間で複数の連続したメッセージを送受信する仕組みです。
ホットストリーム(Hot Stream)の通信経路を提供します。
Produceの通信経路を構築するproduce関数は、Channelのファクトリー関数です。ReceiveChannelのインスタンスを返します。
ですので、「Produce ≒ Channel」です。送受信の動作はChannelと変わりません。
コルーチンビルダーはproduceに含まれます。また、送信側のコルーチンが終了するときに、produce内でChannel#close( )が実行されます。
ですので、Chennelに比べて記述がシンプルになります。
lifecycle.coroutineScope.launch {
Log.i(TAG, "Start communication !")
val _produce = produce(Dispatchers.Default, capacity = 3) {
arrayListOf(8, 4, 3, 9, 1).forEach {
send(it)
Log.i(TAG, "Send data = ${it} [${getThreadName()}]")
}
}
delay(1000)
_produce.consumeEach {
Log.i(TAG, "Receive Data = ${it} [${getThreadName()}]")
}
Log.i(TAG, "End communication !")
} // lifecycle.coroutineScope => LifecycleScope、Mainプールを使用
09:28:01.878 Start communication !
09:28:01.880 Send data = 8 [DefaultDispatcher-worker-1]
09:28:01.880 Send data = 4 [DefaultDispatcher-worker-1]
09:28:01.880 Send data = 3 [DefaultDispatcher-worker-1]
09:28:02.881 Receive Data = 8 [main] <--- 1秒後
09:28:02.881 Receive Data = 4 [main]
09:28:02.881 Receive Data = 3 [main]
09:28:02.882 Receive Data = 9 [main]
09:28:02.882 Send data = 9 [DefaultDispatcher-worker-1]
09:28:02.882 Send data = 1 [DefaultDispatcher-worker-1]
09:28:02.882 Receive Data = 1 [main]
09:28:02.885 End communication !
サンプルは基本的な使い方です。
※Produce/Channelの詳細は「Coroutine:Produce」を参照
メッセージを送受信する仕組み
Produce以外にも「メッセージを送受信する仕組み」が存在します。
| ストリームタイプ | 通信経路 | 状態の監視※ | |
|---|---|---|---|
| Produce / Channel | Hot | データ分岐 | |
| Flow(SafeFlow) | Cold | 1対1 | |
| SharedFlow | Hot | ブロードキャスト | |
| StateFlow | |||
| ※状態の監視 :再Composeのスケジューリングが可能かどうか | |||
※仕組みの詳細は各記事を参照
「Coroutine:Produce」
「Coroutine:Flow(SafeFlow)」
「Coroutine:SharedFlow」
「Coroutine:StateFlow」
関連記事:
