KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。
Channel、Produce、Flow、SharedFlow、StateFlowなどです。
これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。
プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。
ですので、各々を比較しつつ、まとめました。
この記事は「Produce」について、まとめたものです。
※環境:Android Studio Koala Feature Drop | 2024.1.2
Kotlin 1.9.0
Compose Compiler 1.5.1
org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3
org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3
目次
Produceとは
Produceはコルーチン間で複数の連続したメッセージを送受信する仕組みです。
ホットストリーム(Hot Stream)の通信経路を提供します。
※メッセージの送受信については「Coroutine:コルーチン間でメッセージの送受信」を参照
Produceは、表のような特徴を持ちます。
ストリームタイプ | 通信経路 | 状態の監視※ | |
---|---|---|---|
Produce / Channel | Hot | データ分岐 | |
Flow(SafeFlow) | Cold | 1対1 | |
SharedFlow | Hot | ブロードキャスト | |
StateFlow | |||
※状態の監視 :再Composeのスケジューリングが可能かどうか |
なお、Produceの通信経路を構築するproduce関数は、Channelのビルダー関数です。ReceiveChannelのインスタンスを返します。ですので、「Produce ≒ Channel」です。
この記事で述べている「Produce」は、「仕組みの名称」であって、クラスやオブジェクトではありません。
また、produce関数はコルーチンビルダー関数でもあります。launchと同様にコルーチンを起動します。
基本的な例とストリームタイプ
以下はProduceの基本的な例です。
サンプルの動作は単純で、5つの整数値(配列:Data)をワーカーからメインスレッドへ送ります。
送信機(Sender)は処理の開始(コルーチンの起動)から5000ms後に、データの送信(send)を開始します。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createProduce(scope: CoroutineScope) = scope.produce(Dispatchers.Default) { Log.i(TAG, "${getMilliTime5()} Start Send !") delay(5000) Data.forEach { send(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } }
受信機(Receiver)はボタンの押下で、データの要求・受信(receive/consumeEach)を開始します。
@Preview @Composable fun Produce_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: ReceiveChannel<Int> = remember { createProduce(scope) } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receive !") datCh.consumeEach { val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Produce test !") } }
送信機は受信機からのデータの要求を待たずに処理を開始しています。これをホットストリーム(Hot Stream)といいます。
データの受信(要求)よりも前に送信が行われた場合、送信は休止して受信を待ちます。
79231 Start Send ! <--- createProduce() 79231 Compose ! <--- 送信処理の開始 90129 Start Receive ! <--- ボタン押下、データの要求(receive/consumeEach) 90129 Receive Data = 8 [main, 74532243] 90130 Send Data = 8 [DefaultDispatcher-worker-1, 169843152] 90131 Send Data = 4 [DefaultDispatcher-worker-1, 169843152] 90133 Receive Data = 4 [main, 74532243] 90134 Receive Data = 3 [main, 74532243] 90134 Send Data = 3 [DefaultDispatcher-worker-1, 169843152] 90136 Receive Data = 9 [main, 74532243] 90136 Send Data = 9 [DefaultDispatcher-worker-1, 169843152] 90139 Receive Data = 1 [main, 74532243] 90139 Send Data = 1 [DefaultDispatcher-worker-1, 169843152] ※左端の数値はミリ秒
データの送信よりも前に受信(要求)が行われた場合、受信は休止して送信を待ちます。
86180 Start Send ! <--- createProduce() 86181 Compose ! <--- 送信処理の開始 88601 Start Receive ! <--- ボタン押下、データの要求(receive/consumeEach) 91184 Send Data = 8 [DefaultDispatcher-worker-1, 169843152] 91185 Receive Data = 8 [main, 120720841] 91187 Receive Data = 4 [main, 120720841] 91187 Send Data = 4 [DefaultDispatcher-worker-1, 169843152] 91188 Send Data = 3 [DefaultDispatcher-worker-1, 169843152] 91188 Receive Data = 3 [main, 120720841] 91189 Receive Data = 9 [main, 120720841] 91189 Send Data = 9 [DefaultDispatcher-worker-1, 169843152] 91190 Send Data = 1 [DefaultDispatcher-worker-1, 169843152] 91192 Receive Data = 1 [main, 120720841] ※左端の数値はミリ秒
送信間隔>要求・受信間隔
「送信間隔>要求・受信間隔」(送信が遅い)のような通信経路の場合、受信は送信間隔に合わせて休止します。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createProduce(scope: CoroutineScope) = scope.produce(Dispatchers.Default) { Log.i(TAG, "${getMilliTime5()} Start Send !") delay(5000) Data.forEach { send(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }
@Preview @Composable fun Produce_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: ReceiveChannel<Int> = remember { createProduce(scope) } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receive !") datCh.consumeEach { val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Produce test !") } }
13779 Compose ! <--- 送信処理の開始 13779 Start Send ! <--- createProduce() 16402 Start Receive ! <--- ボタン押下、データの要求(receive/consumeEach) 18787 Send Data = 8 [DefaultDispatcher-worker-1, 169843152] 18788 Receive Data = 8 [main, 120720841] 19789 Send Data = 4 [DefaultDispatcher-worker-1, 169843152] 19790 Receive Data = 4 [main, 120720841] 20792 Send Data = 3 [DefaultDispatcher-worker-1, 169843152] 20793 Receive Data = 3 [main, 120720841] 21796 Send Data = 9 [DefaultDispatcher-worker-1, 169843152] 21797 Receive Data = 9 [main, 120720841] 22800 Send Data = 1 [DefaultDispatcher-worker-1, 169843152] 22800 Receive Data = 1 [main, 120720841] ※左端の数値はミリ秒
Channel#receiveはSuspend関数です。ノンブロッキング動作をします。ですので、要求したデータが取得できない場合は、データが取得できる状態になる(sendされる)まで休止します。
送信間隔<要求・受信間隔
「送信間隔<受信間隔」(受信が遅い)のような通信経路の場合、送信は受信間隔に合わせて休止します。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createProduce(scope: CoroutineScope) = scope.produce(Dispatchers.Default) { Log.i(TAG, "${getMilliTime5()} Start Send !") delay(5000) Data.forEach { send(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } }
@Preview @Composable fun Produce_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: ReceiveChannel<Int> = remember { createProduce(scope) } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receive !") datCh.consumeEach { val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } } } ) { Text(text = "Produce test !") } }
20157 Start Send ! <--- createProduce() 20157 Compose ! <--- 送信処理の開始 22699 Start Receive ! <--- ボタン押下、データの要求(receive/consumeEach) 25162 Send Data = 8 [DefaultDispatcher-worker-1, 169843152] 25162 Receive Data = 8 [main, 120720841] 26169 Receive Data = 4 [main, 120720841] 26169 Send Data = 4 [DefaultDispatcher-worker-1, 169843152] 27172 Receive Data = 3 [main, 120720841] 27172 Send Data = 3 [DefaultDispatcher-worker-1, 169843152] 28175 Receive Data = 9 [main, 120720841] 28175 Send Data = 9 [DefaultDispatcher-worker-1, 169843152] 29178 Receive Data = 1 [main, 120720841] 29179 Send Data = 1 [DefaultDispatcher-worker-1, 169843152] ※左端の数値はミリ秒
Channel#sendはSuspend関数です。ノンブロッキング動作をします。ですので、データが要求されていない場合は、要求される(receiveされる)まで休止します。
通信経路
通信経路の最小構成は「送信機:受信機=1:1」です。
ここに、受信機を追加して、「送信機:受信機=1:多」のマルチレシーバー構成に出来ます。
この場合のストリームデータは、複数の受信機へ分配(スイッチ)されます。
分配は「受信機がデータを要求した順」になります。つまり、「早い者勝ち」です。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createProduce(scope: CoroutineScope) = scope.produce(Dispatchers.Default) { Log.i(TAG, "${getMilliTime5()} Start Send !") delay(5000) Data.forEach { send(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }
@Preview @Composable fun Produce_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: ReceiveChannel<Int> = remember { createProduce(scope) } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receive !") datCh.consumeEach { val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Produce test !") } }
68493 Compose ! 68493 Start Send ! 69581 Start Receive ! 69793 Start Receive ! 73499 Send Data = 8 [DefaultDispatcher-worker-1, 169843152] 73500 Receive Data = 8 [main, 120720841] 74503 Send Data = 4 [DefaultDispatcher-worker-1, 169843152] 74503 Receive Data = 4 [main, 179077884] 75507 Send Data = 3 [DefaultDispatcher-worker-1, 169843152] 75507 Receive Data = 3 [main, 120720841] 76511 Send Data = 9 [DefaultDispatcher-worker-1, 169843152] 76512 Receive Data = 9 [main, 179077884] 77516 Send Data = 1 [DefaultDispatcher-worker-1, 169843152] 77516 Receive Data = 1 [main, 120720841]
他の受信機よりデータの要求を増やせば、より多く受信できる点に注意して下さい。
バッファー
Produceはバッファーを持つことができます。
バッファーの役割
バッファーは送信機と受信機の中間(Produce内)にある一時的な記憶領域です。送信機から受信機へ流れるデータの渋滞を緩和します。
送信機は、データの要求が無くても、自由なタイミングでバッファーへデータを送出できるので、送信が渋滞しません。ただし、バッファーに空きがある場合です。
受信機は、データの送信が無くても、自由なタイミングでバッファーからデータを取得できるので、受信が渋滞しません。ただし、バッファーにデータがある場合です。
バッファーの構成
バッファーは図のような構成になっています。
capacityがバッファーサイズを示しています。デフォルトは「capacity = 0」です。ですので、何も指定しなければ「バッファー無し」になっています。
また、オーバーフロー時の扱いはSUSPENDに固定で、「capacity ≧ 0」の時のみ有効です。
@ExperimentalCoroutinesApi public fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, @BuilderInference block: suspend ProducerScope<E>.() -> Unit ): ReceiveChannel<E> = produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion = null, block = block)
バッファーの制御
バッファーはCorutineScope.produce関数のcapacity引数で制御できます。
@ExperimentalCoroutinesApi public fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, @BuilderInference block: suspend ProducerScope<E>.() -> Unit ): ReceiveChannel<E> = produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion = null, block = block)
引数 | 概要 | |
---|---|---|
context | CoroutineContext | コンテキスト |
capacity | Int | バッファーサイズ(指定された値) 送受信のタイプ ・Rendezvous(0) ・Buffered(64) ・Conflated(-1) ・Unlimited(Int.MAX_VALUE) |
block | suspend ProducerScope | 実行するラムダ式(タスクブロック) |
capacityの値は、「バッファーのサイズ」と「送受信のタイプ(特別な動作)」の2つの役割が割り振られています。※「送受信のタイプ」については後述
バッファーの動作
バッファーの動作をサンプルで示します。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createProduce(scope: CoroutineScope) = scope.produce( Dispatchers.Default, capacity = 3 // 3(Buffered) ) { Log.i(TAG, "${getMilliTime5()} Start Send !") delay(5000) Data.forEach { send(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }
@Preview @Composable fun Produce_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: ReceiveChannel<Int> = remember { createProduce(scope) } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receive !") datCh.consumeEach { val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Produce test !") } }
67657 Compose ! 67658 Start Send ! 72660 Send Data = 8 [DefaultDispatcher-worker-1, 95538892] 送信機⇒Buffer 73664 Send Data = 4 [DefaultDispatcher-worker-1, 95538892] 〃 74668 Send Data = 3 [DefaultDispatcher-worker-1, 95538892] 〃 82606 Start Receive ! 82606 Receive Data = 8 [main, 169843152] Buffer⇒受信機 82607 Receive Data = 4 [main, 169843152] 〃 82608 Receive Data = 3 [main, 169843152] 〃 82609 Receive Data = 9 [main, 169843152] 82607 Send Data = 9 [DefaultDispatcher-worker-1, 95538892] 83615 Send Data = 1 [DefaultDispatcher-worker-1, 95538892] 83615 Receive Data = 1 [main, 169843152]
送信機⇒Buffer(送信側)
送信されたデータはバッファーに入り、新しいデータが送信される毎に「積み上げる方向」へシフトされて行きます。
オーバーフローになると送信は休止します。
なお、indexは受信時の取り出し位置です。
Buffer⇒受信機(受信側)
受信されるデータはindexの位置から取り出されます。
データの取り出されたバッファは無効になり、新しいデータが送信されることにより再び埋められます。
送受信のタイプ
capacityはバッファーのサイズを指定する引数ですが、値の一部にパラメータ名が定義されており、「送受信のタイプ」の役割を合わせ持ちます。
タイプ | capacityの値 ( )内はパラメータ名 | 振舞い |
---|---|---|
Rendezvous | 0(RENDEZVOUS)デフォルト | バッファー無し |
Buffered | 64(BUFFERED) | バッファーサイズ |
Conflated | -1(CONFLATED) | 特別な動作 |
Unlimited | Int.MAX_VALUE(UNLIMITED) | バッファーサイズ |
この中で、「Conflated」は特別な動作をします。その他は、ただのサイズです。
private val Data = arrayOf(8, 4, 3, 9, 1) //private val Data = Array(100) { it } fun createProduce(scope: CoroutineScope) = scope.produce( Dispatchers.Default, capacity = RENDEZVOUS // 0(デフォルト) // capacity = 3 // 3 // capacity = BUFFERED // 64 // capacity = CONFLATED // -1 // capacity = UNLIMITED // Int.MAX_VALUE ) { Log.i(TAG, "${getMilliTime5()} Start Send !") delay(5000) Data.forEach { send(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } }
@Preview @Composable fun Produce_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: ReceiveChannel<Int> = remember { createProduce(scope) } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receive !") datCh.consumeEach { val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } } } ) { Text(text = "Produce test !") } }
Rendezvous
バッファサイズは「0(無し)」です。
受信側は送信側へ直にデータを要求し、送信側は受信側へ直にデータを渡します。
送信データが無い時、受信側は休止して送信を待ちます。
受信の要求が無い時、送信側は休止して要求を待ちます。
25400 Start Send ! 25400 Compose ! 29660 Start Receive ! 30405 Receive Data = 8 [main, 120720841] 30405 Send Data = 8 [DefaultDispatcher-worker-1, 169843152] 31408 Receive Data = 4 [main, 120720841] 31408 Send Data = 4 [DefaultDispatcher-worker-1, 169843152] 32413 Receive Data = 3 [main, 120720841] 32413 Send Data = 3 [DefaultDispatcher-worker-1, 169843152] 33416 Receive Data = 9 [main, 120720841] 33416 Send Data = 9 [DefaultDispatcher-worker-1, 169843152] 34418 Receive Data = 1 [main, 120720841] 34418 Send Data = 1 [DefaultDispatcher-worker-1, 169843152]
デフォルトは「capacity = RENDEZVOUS」です。ですので、「基本的な例」で示した例は全てRendezvousになります。
Buffered
バッファサイズは「64」です。
送信側はバッファが空いている限りデータをバッファへ格納します。バッファが埋まれば休止します。
受信側はバッファーにデータがあればバッファーから受信します。バッファが空になれば休止します。
58719 Start Send ! 58719 Compose ! 63725 Send Data = 0 [DefaultDispatcher-worker-1, 232751359] 63726 Send Data = 1 [DefaultDispatcher-worker-1, 232751359] : : 63768 Send Data = 62 [DefaultDispatcher-worker-1, 232751359] 63768 Send Data = 63 [DefaultDispatcher-worker-1, 232751359] ⇐ 64個まで 67561 Start Receive ! 67561 Receive Data = 0 [main, 169843152] 67561 Send Data = 64 [DefaultDispatcher-worker-1, 232751359] 68563 Receive Data = 1 [main, 169843152] 68564 Send Data = 65 [DefaultDispatcher-worker-1, 232751359] 69567 Receive Data = 2 [main, 169843152] 69567 Send Data = 66 [DefaultDispatcher-worker-1, 232751359] 70572 Receive Data = 3 [main, 169843152] 70572 Send Data = 67 [DefaultDispatcher-worker-1, 232751359] 71577 Receive Data = 4 [main, 169843152] 71577 Send Data = 68 [DefaultDispatcher-worker-1, 232751359] : :
Conflated
バッファサイズは「1」です。特殊な動作をします。
送信側からバッファーへ送信されたデータはバッファー上で上書きされます。受信側はバッファーから上書き後の最終データを受信します。
33453 Start Send ! 33453 Compose ! 38457 Send Data = 8 [DefaultDispatcher-worker-1, 125789461] 38457 Send Data = 4 [DefaultDispatcher-worker-1, 125789461] 38458 Send Data = 3 [DefaultDispatcher-worker-1, 125789461] 38458 Send Data = 9 [DefaultDispatcher-worker-1, 125789461] 38459 Send Data = 1 [DefaultDispatcher-worker-1, 125789461] ⇐ 最終データ 40697 Start Receive ! 40697 Receive Data = 1 [main, 169843152]
Unlimited
バッファサイズは「Int.MAX_VALUE(2147483647)」です。「サイズは無制限」になります。
動作はBufferedと同じです。
ただし、バッファーのデータ数が無制限なので、メモリーのオーバーフローに注意してください。
07254 Compose ! 07254 Start Send ! 12257 Send Data = 0 [DefaultDispatcher-worker-1, 125789461] 12257 Send Data = 1 [DefaultDispatcher-worker-1, 125789461] 12258 Send Data = 2 [DefaultDispatcher-worker-1, 125789461] : : 12322 Send Data = 98 [DefaultDispatcher-worker-1, 125789461] 12323 Send Data = 99 [DefaultDispatcher-worker-1, 125789461] : : ⇐ 2147483647個まで可能(サンプルは100で制限)
状態の監視
Produceによって送受信されるストリームデータは、値の監視が行われません。ですので、再Composeはスケジューリングされません。
再Composeのスケジューリングを可能にするためには、ストリームデータを監視データへ変換します。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createProduce(scope: CoroutineScope) = scope.produce(Dispatchers.Default) { Log.i(TAG, "${getMilliTime5()} Start Send !") delay(5000) Data.forEach { send(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }
@Preview @Composable fun Produce_Monitor( scope: CoroutineScope = rememberCoroutineScope(), datCh: ReceiveChannel<Int> = remember { createProduce(scope) } ) { Log.i(TAG, "${getMilliTime5()} Compose !") // ストリームデータ -> 監視可データ val _data = remember { mutableStateOf(0) } LaunchedEffect(Unit) { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.consumeEach { // 要求・受信 _data.value = it // 監視データへ変換 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") } } Text( text = "${getMilliTime5()} Receive Data = ${_data.value}", fontSize = 20.sp ) }
26472 Start Send ! 26472 Compose ! 26672 Start Receiver ! 31476 Send Data = 8 [DefaultDispatcher-worker-1, 138257264] 31476 Receive Data = 8 [main, 44754665] 31486 Compose ! 32477 Send Data = 4 [DefaultDispatcher-worker-1, 138257264] 32479 Receive Data = 4 [main, 44754665] 32485 Compose ! 33479 Send Data = 3 [DefaultDispatcher-worker-1, 138257264] 33480 Receive Data = 3 [main, 44754665] 33484 Compose ! 34481 Send Data = 9 [DefaultDispatcher-worker-1, 138257264] 34482 Receive Data = 9 [main, 44754665] 34487 Compose ! 35483 Send Data = 1 [DefaultDispatcher-worker-1, 138257264] 35483 Receive Data = 1 [main, 44754665] 35502 Compose !
ちなみに、StateFlowは「State(状態の監視)+Flow(通信経路:SharedFlow)」です。StateFlowを用いれば、ここで紹介した「ストリームデータを監視データへ変換」する作業は必要ありません。
再Composeのスケジューリングを可能にする方法として、StateFlowの利用が定石になています。※詳細は「Coroutine:StateFlow」を参照
関連記事: