KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。
Channel、Produce、Flow、SharedFlow、StateFlowなどです。
これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。
プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。
ですので、各々を比較しつつ、まとめました。
この記事は「SharedFlow」について、まとめたものです。
※環境:Android Studio Koala Feature Drop | 2024.1.2
Kotlin 1.9.0
Compose Compiler 1.5.1
org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3
org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3
目次
SharedFlowはコルーチン間で複数の連続したメッセージを送受信する仕組みです。
ホットストリーム(Hot Stream)の通信経路を提供します。
※メッセージの送受信については「Coroutine:コルーチン間でメッセージの送受信」を参照
SharedFlowは、表のような特徴を持ちます。
ストリームタイプ | 通信経路 | 状態の監視※ | |
---|---|---|---|
Produce / Channel | Hot | データ分岐 | |
Flow(SafeFlow) | Cold | 1対1 | |
SharedFlow | Hot | ブロードキャスト | |
StateFlow | |||
※状態の監視 :再Composeのスケジューリングが可能かどうか |
なお、SharedFlowとStateFlowはFlowを継承しています。よって、Flowは両者の基底クラスです。
基本的な例とストリームタイプ
以下はSharedFlowの基本的な例です。
サンプルの動作は単純で、5つの整数値(配列:Data)をワーカーからメインスレッドへ送ります。
送信機(Sender)は処理の開始(コルーチンの起動)から5000ms後に、データの送信(emit)を開始します。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createSharedFlow(scope: CoroutineScope) = MutableSharedFlow<Int>().apply { scope.launch(Dispatchers.Default) { Log.i(TAG, "${getMilliTime5()} Start sender !") delay(5000) Data.forEach { this@apply.emit(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } }.asSharedFlow()
受信機(Receiver)はボタンの押下でデータの要求・受信(collect)を開始します。
@Preview @Composable fun SharedFlow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: SharedFlow<Int> = remember { createSharedFlow(scope) } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "SharedFlow test !") } }
送信機は受信機からのデータの要求を待たずに処理を開始しています。これをホットストリーム(Hot Stream)といいます。
ただし、要求(collect)よりも前に発行された送信(emit)は破棄されてしまうので、注意してください。
19674 Compose ! <--- createSharedFlow() 19674 Start sender ! <--- 送信処理の開始 24681 Send Data = 8 [DefaultDispatcher-worker-1, 46343680] 24682 Send Data = 4 [DefaultDispatcher-worker-1, 46343680] 24683 Send Data = 3 [DefaultDispatcher-worker-1, 46343680] 24687 Send Data = 9 [DefaultDispatcher-worker-1, 46343680] 24688 Send Data = 1 [DefaultDispatcher-worker-1, 46343680] 29042 Start Receiver ! <--- ボタン押下、データの要求(collect)
データの取得(正常なデータの受け渡し)を行いたければ、要求(collect)を送信(emit)の前に行う必要があります。これにより通信経路が確立されます。
17238 Compose ! <--- createSharedFlow() 17238 Start sender ! <--- 送信処理の開始 19995 Start Receiver ! <--- ボタン押下、データの要求(collect) 22246 Receive Data = 8 [main, 73716183] 22246 Send Data = 8 [DefaultDispatcher-worker-1, 170330820] 22248 Receive Data = 4 [main, 73716183] 22248 Send Data = 4 [DefaultDispatcher-worker-1, 170330820] 22250 Receive Data = 3 [main, 73716183] 22250 Send Data = 3 [DefaultDispatcher-worker-1, 170330820] 22252 Receive Data = 9 [main, 73716183] 22253 Send Data = 9 [DefaultDispatcher-worker-1, 170330820] 22254 Receive Data = 1 [main, 73716183] 22254 Send Data = 1 [DefaultDispatcher-worker-1, 170330820]
送信間隔>要求・受信間隔
「送信間隔>要求・受信間隔」(送信が遅い)のような通信経路の場合、受信は送信間隔に合わせて休止します。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createSharedFlow(scope: CoroutineScope) = MutableSharedFlow<Int>().apply { scope.launch(Dispatchers.Default) { Log.i(TAG, "${getMilliTime5()} Start sender !") delay(5000) Data.forEach { this@apply.emit(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } } }.asSharedFlow()
@Preview @Composable fun SharedFlow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: SharedFlow<Int> = remember { createSharedFlow(scope) } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "SharedFlow test !") } }
07999 Start sender ! <--- 送信処理の開始 08001 Compose ! <--- createSharedFlow() 10826 Start Receiver ! <--- ボタン押下、データの要求(collect) 13005 Receive Data = 8 [main, 243112817] 13006 Send Data = 8 [DefaultDispatcher-worker-2, 188599638] 14009 Receive Data = 4 [main, 243112817] 14009 Send Data = 4 [DefaultDispatcher-worker-2, 188599638] 15012 Receive Data = 3 [main, 243112817] 15012 Send Data = 3 [DefaultDispatcher-worker-2, 188599638] 16015 Receive Data = 9 [main, 243112817] 16015 Send Data = 9 [DefaultDispatcher-worker-2, 188599638] 17018 Receive Data = 1 [main, 243112817] 17018 Send Data = 1 [DefaultDispatcher-worker-2, 188599638]
SharedFlow#collectはSuspend関数です。ノンブロッキング動作をします。ですので、要求したデータが取得できない場合は、データが取得できる状態になる(emitされる)まで休止します。
送信間隔<要求・受信間隔
「送信間隔<受信間隔」(受信が遅い)のような通信経路の場合、送信は受信間隔に合わせて休止します。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createSharedFlow(scope: CoroutineScope) = MutableSharedFlow<Int>().apply { scope.launch(Dispatchers.Default) { Log.i(TAG, "${getMilliTime5()} Start sender !") delay(5000) Data.forEach { this@apply.emit(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } }.asSharedFlow()
@Preview @Composable fun SharedFlow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: SharedFlow<Int> = remember { createSharedFlow(scope) } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } } } ) { Text(text = "SharedFlow test !") } }
10862 Compose ! <--- createSharedFlow() 10863 Start sender ! <--- 送信処理の開始 13299 Start Receiver ! <--- ボタン押下、データの要求(collect) 15869 Receive Data = 8 [main, 73716183] 15869 Send Data = 8 [DefaultDispatcher-worker-2, 170330820] 16872 Receive Data = 4 [main, 73716183] 16872 Send Data = 4 [DefaultDispatcher-worker-2, 170330820] 17874 Receive Data = 3 [main, 73716183] 17874 Send Data = 3 [DefaultDispatcher-worker-2, 170330820] 18876 Receive Data = 9 [main, 73716183] 18877 Send Data = 9 [DefaultDispatcher-worker-2, 170330820] 19879 Receive Data = 1 [main, 73716183] 19879 Send Data = 1 [DefaultDispatcher-worker-2, 170330820]
Flow#emitはSuspend関数です。ノンブロッキング動作をします。ですので、データが要求されていない場合は、要求される(collectされる)まで休止します。
通信経路
通信経路の最小構成は「送信機:受信機=1:1」です。
ここに、受信機を追加して、「送信機:受信機=1:多」のマルチレシーバー構成に出来ます。
この場合、送信機は複数の受信機で共有され(Shared)、ストリームデータは全ての受信機へブロードキャスト(一斉送信)されます。
ブロードキャストは「全ての受信機がデータを受信して、送信は完了する動作(送受信サイクル)」で成り立っています。これは、送信機と受信機が休止を使って、歩調を合わせることで実現されます。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createSharedFlow(scope: CoroutineScope) = MutableSharedFlow<Int>().apply { scope.launch(Dispatchers.Default) { Log.i(TAG, "${getMilliTime5()} Start sender !") delay(5000) Data.forEach { this@apply.emit(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } }.asSharedFlow()
@Preview @Composable fun SharedFlow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: SharedFlow<Int> = remember { createSharedFlow(scope) }, interval: Long = 1000 ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") delay(interval) } } } ) { Text(text = "SharedFlow test ! Interval=${interval}") } }
24202 Start sender !
24202 Compose !
24255 Compose !
25414 Start Receiver !
27462 Start Receiver !
29207 Receive Data = 8 [main, 159137840]
29207 Receive Data = 8 [main, 177611945]
29208 Send Data = 8 [DefaultDispatcher-worker-1, 80693806]
30212 Receive Data = 4 [main, 159137840]
32212 Receive Data = 4 [main, 177611945]
32213 Send Data = 4 [DefaultDispatcher-worker-1, 80693806]
33216 Receive Data = 3 [main, 159137840]
35215 Receive Data = 3 [main, 177611945]
35215 Send Data = 3 [DefaultDispatcher-worker-1, 80693806]
36217 Receive Data = 9 [main, 159137840] ⇐ 送受信サイクル
38218 Receive Data = 9 [main, 177611945]
38219 Send Data = 9 [DefaultDispatcher-worker-1, 80693806]
39222 Receive Data = 1 [main, 159137840]
41221 Receive Data = 1 [main, 177611945]
41222 Send Data = 1 [DefaultDispatcher-worker-1, 80693806]
なお、送受信サイクルは「送信間隔と受信間隔の最大値(サンプルの場合は3000ms)」になります。
バッファー
SharedFlowはバッファーを持つことができます。
バッファーの役割
バッファーは送信機と受信機の中間(ShareFlow内)にある一時的な記憶領域です。送信機から受信機へ流れるデータの渋滞を緩和します。
送信機は、データの要求が無くても、自由なタイミングでバッファーへデータを送出できるので、送信が渋滞しません。ただし、バッファーに空きがある場合です。
受信機は、データの送信が無くても、自由なタイミングでバッファーからデータを取得できるので、受信が渋滞しません。ただし、バッファーにデータがある場合です。
バッファーの構成
バッファーは図のような構成になっています。
「エキストラバッファー + リプレイキャッシュ」がバッファーです。
replayとextraBufferCapacityが各々のサイズを示しています。ですので、バッファーのサイズは両者を足したものになります。
リプレイキャッシュ最新のデータを保持し続ける特殊なバッファーです。取得が行われても消去されません。
エキストラバッファー一般的なバッファーです。取得が行われると消去されます。
※「リプレイ」ならびに「オーバーフロー」については後述
バッファーの制御
バッファーはMutableShareFlow関数の引数で制御できます。
@Suppress("FunctionName", "UNCHECKED_CAST") public fun <T> MutableSharedFlow( replay: Int = 0, extraBufferCapacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): MutableSharedFlow<T> { ... }
引数 | 概要 | |
---|---|---|
replay | Int | リプレイキャッシュのサイズ |
extraBufferCapacity | Int | エキストラバッファーのサイズ |
onBufferOverflow | BufferOverflow | オーバーフロー時の扱い |
バッファーの動作
バッファーの動作をサンプルで示します。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createSharedFlow(scope: CoroutineScope) = MutableSharedFlow<Int>( replay = 2, extraBufferCapacity = 2, onBufferOverflow = BufferOverflow.SUSPEND // デフォルト ).apply { scope.launch(Dispatchers.Default) { Log.i(TAG, "${getMilliTime5()} Start sender !") delay(5000) Data.forEach { this@apply.emit(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } }.asSharedFlow()
@Preview @Composable fun SharedFlow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: SharedFlow<Int> = remember { createSharedFlow(scope) } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } } } ) { Text(text = "SharedFlow test !") } }
1番目のデータ「8」はバッファーを介さずに送受信されます。バッファーを利用するのは2番目以降のデータです。
03489 Compose ! 03489 Start sender ! 05107 Start Receiver ! 08496 Send Data = 8 [DefaultDispatcher-worker-1, 53555502] 08496 Receive Data = 8 [main, 8529103] 08497 Send Data = 4 [DefaultDispatcher-worker-1, 53555502] 送信機⇒Buffer 08500 Send Data = 3 [DefaultDispatcher-worker-1, 53555502] 〃 08501 Send Data = 9 [DefaultDispatcher-worker-1, 53555502] 〃 08501 Send Data = 1 [DefaultDispatcher-worker-1, 53555502] 〃 09502 Receive Data = 4 [main, 8529103] Buffer⇒受信機 10507 Receive Data = 3 [main, 8529103] 〃 11513 Receive Data = 9 [main, 8529103] 〃 12518 Receive Data = 1 [main, 8529103] 〃
送信機⇒Buffer(送信側)
送信されたデータはリプレイキャッシュに入り、新しいデータが送信される毎に「積み上げる方向」へシフトされて行きます。
なお、indexは受信時の取り出し位置です。
Buffer⇒受信機(受信側)
受信されるデータはindexの位置から取り出されます。
データの取り出されたバッファは無効になり、新しいデータが送信されることにより再び埋められます。
ただし、リプレイキャッシュ―はデータが残り、無効になりません。
オーバーフロー
オーバーフローは、バッファーに空きがない状態(全て埋まっている、バッファーサイズ=0)で、新たなデータが送信(入力)された場合に発生します。
オーバーフローになった時、「新たなデータの扱い」が問題になります。
引数onBufferOverflowは、この「新たなデータの扱い」を指定します。
パレメータ (BufferOverflow.***) | 動作 | 一時休止 |
---|---|---|
SUSPEND | バッファーに空きができるまで休止して待つ | する |
DROP_OLDEST | バッファーの最も古い値を削除し、入力された値を追加 | しない |
DROP_LATEST | バッファーの値を保持し、入力された値を破棄 |
バッファーサイズ > 0
private val Data = arrayOf(8, 4, 3, 9, 1) fun createSharedFlow(scope: CoroutineScope) = MutableSharedFlow<Int>( replay = 1, extraBufferCapacity = 2, onBufferOverflow = BufferOverflow.SUSPEND // デフォルト // onBufferOverflow = BufferOverflow.DROP_OLDEST // onBufferOverflow = BufferOverflow.DROP_LATEST ).apply { scope.launch(Dispatchers.Default) { Log.i(TAG, "${getMilliTime5()} Start sender !") delay(5000) Data.forEach { this@apply.emit(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } }.asSharedFlow()
@Preview @Composable fun SharedFlow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: SharedFlow<Int> = remember { createSharedFlow(scope) } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } } } ) { Text(text = "SharedFlow test !") } }
20377 Compose ! 20376 Start sender ! 21733 Start Receiver ! 25384 Send Data = 8 [DefaultDispatcher-worker-1, 53555502] 25385 Receive Data = 8 [main, 8529103] 25385 Send Data = 4 [DefaultDispatcher-worker-1, 53555502] 25385 Send Data = 3 [DefaultDispatcher-worker-1, 53555502] 25386 Send Data = 9 [DefaultDispatcher-worker-1, 53555502] ⇐ 空き無し、emitは休止 26390 Receive Data = 4 [main, 8529103] ⇐ 取り出し 26390 Send Data = 1 [DefaultDispatcher-worker-1, 53555502] ⇐ 空き有り、emitは復帰・1を追加 27393 Receive Data = 3 [main, 8529103] 28400 Receive Data = 9 [main, 8529103] 29404 Receive Data = 1 [main, 8529103]
66242 Compose ! 66242 Start sender ! 68610 Start Receiver ! 71248 Send Data = 8 [DefaultDispatcher-worker-1, 166465328] 71249 Receive Data = 8 [main, 88295337] 71249 Send Data = 4 [DefaultDispatcher-worker-1, 166465328] 71250 Send Data = 3 [DefaultDispatcher-worker-1, 166465328] 71252 Send Data = 9 [DefaultDispatcher-worker-1, 166465328] ⇐ 空き無し 71253 Send Data = 1 [DefaultDispatcher-worker-1, 166465328] ⇐ 4を削除、1を追加 72254 Receive Data = 3 [main, 88295337] 73257 Receive Data = 9 [main, 88295337] 74259 Receive Data = 1 [main, 88295337]
03116 Compose ! 03115 Start sender ! 05754 Start Receiver ! 08123 Send Data = 8 [DefaultDispatcher-worker-1, 166465328] 08123 Receive Data = 8 [main, 88295337] 08124 Send Data = 4 [DefaultDispatcher-worker-1, 166465328] 08124 Send Data = 3 [DefaultDispatcher-worker-1, 166465328] 08125 Send Data = 9 [DefaultDispatcher-worker-1, 166465328] ⇐ 空き無し 08126 Send Data = 1 [DefaultDispatcher-worker-1, 166465328] ⇐ 1を破棄 09126 Receive Data = 4 [main, 88295337] 10128 Receive Data = 3 [main, 88295337] 11132 Receive Data = 9 [main, 88295337]
バッファーサイズ = 0
「バッファーサイズ(replay + extraBufferCapacity)= 0」の時は、SUSUPEND以外を指定できません。
これは、引数の要件を満たせないためです。
@Suppress("FunctionName", "UNCHECKED_CAST") public fun <T> MutableSharedFlow( replay: Int = 0, extraBufferCapacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): MutableSharedFlow<T> { require(replay >= 0) { "replay cannot be negative, but was $replay" } require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" } require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) { "replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow" } ... }
SUSUPEND以外を指定すると例外になり、プログラムは停止します。
java.lang.IllegalArgumentException:
replay or extraBufferCapacity must be positive
with non-default onBufferOverflow strategy DROP_LATEST
リプレイ
マルチレシーバー構成にした時、追加された2番目以降のレシーバーは、最初にリプレイキャッシュからデータを受信します。
リプレイキャッシュは最新のデータを保持し続ける特殊なバッファーです。取得が行われても、保持した値は無効になりません。
ですので、追加された全てのレシーバで、同じデータを繰り返し受信(リプレイ)することになります。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createSharedFlow(scope: CoroutineScope) = // MutableSharedFlow<Int>().apply { MutableSharedFlow<Int>( replay = 2, extraBufferCapacity = 2 ).apply { scope.launch(Dispatchers.Default) { Log.i(TAG, "${getMilliTime5()} Start sender !") delay(5000) Data.forEach { this@apply.emit(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } }.asSharedFlow()
@Preview @Composable fun SharedFlow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: SharedFlow<Int> = remember { createSharedFlow(scope) } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } } } ) { Text(text = "SharedFlow test !") } }
44567 Start sender ! 44567 Compose ! 46522 Start Receiver ! ⇐ 1st Receiver 49573 Send Data = 8 [DefaultDispatcher-worker-1, 155257474] 49574 Receive Data = 8 [main, 74532243] 49574 Send Data = 4 [DefaultDispatcher-worker-1, 155257474] 49576 Send Data = 3 [DefaultDispatcher-worker-1, 155257474] 49578 Send Data = 9 [DefaultDispatcher-worker-1, 155257474] 49581 Send Data = 1 [DefaultDispatcher-worker-1, 155257474] 50578 Receive Data = 4 [main, 74532243] 51582 Receive Data = 3 [main, 74532243] 52587 Receive Data = 9 [main, 74532243] 53591 Receive Data = 1 [main, 74532243] 57363 Start Receiver ! ⇐ 2nd Receiver 57363 Receive Data = 9 [main, 120720841] 58367 Receive Data = 1 [main, 120720841] 60337 Start Receiver ! ⇐ 3rd Receiver 60338 Receive Data = 9 [main, 179077884] 61341 Receive Data = 1 [main, 179077884]
状態の監視
SharedFlowによって送受信されるストリームデータは、値の監視が行われません。ですので、再Composeはスケジューリングされません。
再Composeのスケジューリングを可能にするためには、ストリームデータを監視データへ変換します。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createSharedFlow(scope: CoroutineScope) = MutableSharedFlow<Int>().apply { scope.launch(Dispatchers.Default) { Log.i(TAG, "${getMilliTime5()} Start sender !") delay(5000) Data.forEach { this@apply.emit(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } } }.asSharedFlow()
@Preview @Composable fun SharedFlow_Monitor( scope: CoroutineScope = rememberCoroutineScope(), datCh: SharedFlow<Int> = remember { createSharedFlow(scope) } ) { Log.i(TAG, "${getMilliTime5()} Compose !") // ストリームデータ -> 監視可データ val _data = remember { mutableStateOf(0) } LaunchedEffect(Unit) { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 _data.value = it // 監視データへ変換 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") } } Text( text = "${getMilliTime5()} Receive Data = ${_data.value}", fontSize = 20.sp ) }
21666 Compose ! 21666 Start sender ! 21890 Start Receiver ! 26672 Receive Data = 8 [main, 138257264] 26672 Send Data = 8 [DefaultDispatcher-worker-1, 44754665] 26678 Compose ! 27674 Receive Data = 4 [main, 138257264] 27674 Send Data = 4 [DefaultDispatcher-worker-1, 44754665] 27679 Compose ! 28677 Send Data = 3 [DefaultDispatcher-worker-1, 44754665] 28677 Receive Data = 3 [main, 138257264] 28694 Compose ! 29680 Receive Data = 9 [main, 138257264] 29680 Send Data = 9 [DefaultDispatcher-worker-1, 44754665] 29693 Compose ! 30684 Receive Data = 1 [main, 138257264] 30684 Send Data = 1 [DefaultDispatcher-worker-1, 44754665] 30693 Compose !
ちなみに、StateFlowは「State(状態の監視)+Flow(通信経路:SharedFlow)」です。StateFlowを用いれば、ここで紹介した「ストリームデータを監視データへ変換」する作業は必要ありません。
再Composeのスケジューリングを可能にする方法として、StateFlowの利用が定石になています。※詳細は「Coroutine:StateFlow」を参照
関連記事: