KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。
Channel、Produce、Flow、SharedFlow、StateFlowなどです。
これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。
プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。
ですので、各々を比較しつつ、まとめました。
この記事は「Flow」について、まとめたものです。
※環境:Android Studio Koala Feature Drop | 2024.1.2
Kotlin 1.9.0
Compose Compiler 1.5.1
org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3
org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3
目次
Flowとは
Flowはコルーチン間で複数の連続したメッセージを送受信する仕組みです。
コールドストリーム(Cold Stream)の通信経路を提供します。
※メッセージの送受信については「Coroutine:コルーチン間でメッセージの送受信」を参照
Flowは、表のような特徴を持ちます。
ストリームタイプ | 通信経路 | 状態の監視※ | |
---|---|---|---|
Produce / Channel | Hot | データ分岐 | |
Flow(SafeFlow) | Cold | 1対1 | |
SharedFlow | Hot | ブロードキャスト | |
StateFlow | |||
※状態の監視 :再Composeのスケジューリングが可能かどうか |
なお、SharedFlowとStateFlowはFlowを継承しています。よって、Flowは両者の基底(親またはスーパー)クラスです。
Flowはインターフェイスなので、実装はビルダー関数(flow関数など)により行われる点に注意してください。
基本的な例とストリームタイプ
以下はFlowの基本的な例です。flow関数を用いて実装しています。
サンプルの動作は単純で、5つの整数値(配列:Data)をワーカーからメインスレッドへ送ります。
送信機(Sender)は処理の開始(コルーチンの起動)から5000ms後に、データの送信(emit)を開始します。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createFlow() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } }.flowOn(Dispatchers.Default)
受信機(Receiver)はボタンの押下でデータの要求・受信(collect)を開始します。
@Preview @Composable fun Flow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlow() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Flow test !") } }
送信機は受信機からのデータの要求を受けて処理を開始しています。これをコールドストリーム(Cold Stream)といいます。
データの送信よりも前に受信(要求)が行われた場合、受信は休止して送信を待ちます。
46656 Compose ! <--- createFlow() 51419 Start Receiver ! <--- ボタン押下、データの要求(collect) 51427 Start Sender ! <--- 送信処理の開始 56433 Send Data = 8 [DefaultDispatcher-worker-1, 29056681] 56433 Receive Data = 8 [main, 55901230] 56434 Send Data = 4 [DefaultDispatcher-worker-1, 29056681] 56434 Receive Data = 4 [main, 55901230] 56435 Send Data = 3 [DefaultDispatcher-worker-1, 29056681] 56435 Receive Data = 3 [main, 55901230] 56436 Send Data = 9 [DefaultDispatcher-worker-1, 29056681] 56436 Receive Data = 9 [main, 55901230] 56437 Send Data = 1 [DefaultDispatcher-worker-1, 29056681] 56437 Receive Data = 1 [main, 55901230] ※左端の数値はミリ秒
送信間隔>要求・受信間隔
「送信間隔>要求・受信間隔」(送信が遅い)のような通信経路の場合、受信は送信間隔に合わせて休止します。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createFlow() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) // 送信間隔:1000ms } }.flowOn(Dispatchers.Default)
@Preview @Composable fun Flow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlow() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) // 受信間隔:0ms } } } ) { Text(text = "Flow test !") } }
17653 Compose ! <--- createFlow() 20771 Start Receiver ! <--- ボタン押下、データの要求(collect) 20779 Start Sender ! <--- 送信処理の開始 25786 Send Data = 8 [DefaultDispatcher-worker-1, 29056681] 25787 Receive Data = 8 [main, 55901230] 26790 Send Data = 4 [DefaultDispatcher-worker-1, 29056681] 26791 Receive Data = 4 [main, 55901230] 27796 Send Data = 3 [DefaultDispatcher-worker-1, 29056681] 27796 Receive Data = 3 [main, 55901230] 28798 Receive Data = 9 [main, 55901230] 28798 Send Data = 9 [DefaultDispatcher-worker-1, 29056681] 29804 Receive Data = 1 [main, 55901230] 29804 Send Data = 1 [DefaultDispatcher-worker-1, 29056681] ※左端の数値はミリ秒
Flow#collectはSuspend関数です。ノンブロッキング動作をします。ですので、要求したデータが取得できない場合は、データが取得できる状態になる(emitされる)まで休止します。
送出間隔<要求・受信間隔
「送信間隔<要求・受信間隔」(受信が遅い)のような通信経路の場合、送信データはFlow内のバッファーへ保持され、受信間隔に合わせて取り出されます。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createFlow() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) // 送信間隔:0ms } }.flowOn(Dispatchers.Default)
@Preview @Composable fun Flow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlow() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) // 受信間隔:1000ms } } } ) { Text(text = "Flow test !") } }
42513 Compose ! <--- createFlow() 44604 Start Receiver ! <--- ボタン押下、データの要求(collect) 44610 Start Sender ! <--- 送信処理の開始 49619 Send Data = 8 [DefaultDispatcher-worker-1, 29056681] 49620 Send Data = 4 [DefaultDispatcher-worker-1, 29056681] 49621 Receive Data = 8 [main, 55901230] 49622 Send Data = 3 [DefaultDispatcher-worker-1, 29056681] 49625 Send Data = 9 [DefaultDispatcher-worker-1, 29056681] 49629 Send Data = 1 [DefaultDispatcher-worker-1, 29056681] 50628 Receive Data = 4 [main, 55901230] 51633 Receive Data = 3 [main, 55901230] 52636 Receive Data = 9 [main, 55901230] 53641 Receive Data = 1 [main, 55901230] ※左端の数値はミリ秒
Flowはバッファーを持つことが出来ます。
単に、flow関数により実装したFlowインスタンスは、バッファー無し(バッファーサイズ ⇐ 0)になります。
しかし、サンプルのようにFlow#flowOn(後述)を用いると、勝手にバッファーが追加されます。デフォルトサイズは64です。
サイズはFlow#buffer拡張関数(後述)により変更が可能です。
通信経路
通信経路の最小構成は「送信機:受信機=1:1」です。
ここに、受信機を追加すると、対応する送信機が起動され、「送信機:受信機=1:1」の新たな通信経路が構築されます。
各々の通信経路は独立しています。通信経路間で影響を及ぼす動作(例えば休止)は行われません。
※以下のサンプルは「送信間隔<要求・受信間隔」の場合と同じ
private val Data = arrayOf(8, 4, 3, 9, 1) fun createFlow() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) // 送信間隔:0ms } }.flowOn(Dispatchers.Default)
@Preview @Composable fun Flow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlow() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) // 受信間隔:1000ms } } } ) { Text(text = "Flow test !") } }
27512 Compose ! 29146 Start Receiver ! 29155 Start Sender ! 34160 Send Data = 8 [DefaultDispatcher-worker-1, 8529103] 34161 Receive Data = 8 [main, 96136796] 34161 Send Data = 4 [DefaultDispatcher-worker-1, 8529103] 34161 Send Data = 3 [DefaultDispatcher-worker-1, 8529103] 34162 Send Data = 9 [DefaultDispatcher-worker-1, 8529103] 34163 Send Data = 1 [DefaultDispatcher-worker-1, 8529103] 35164 Receive Data = 4 [main, 96136796] 36167 Receive Data = 3 [main, 96136796] 37169 Receive Data = 9 [main, 96136796] 38172 Receive Data = 1 [main, 96136796] 41561 Start Receiver ! 41562 Start Sender ! 46565 Send Data = 8 [DefaultDispatcher-worker-1, 237387499] 46565 Send Data = 4 [DefaultDispatcher-worker-1, 237387499] 46565 Receive Data = 8 [main, 37131336] 46566 Send Data = 3 [DefaultDispatcher-worker-1, 237387499] 46567 Send Data = 9 [DefaultDispatcher-worker-1, 237387499] 46568 Send Data = 1 [DefaultDispatcher-worker-1, 237387499] 47570 Receive Data = 4 [main, 37131336] 48574 Receive Data = 3 [main, 37131336] 49581 Receive Data = 9 [main, 37131336] 50585 Receive Data = 1 [main, 37131336] ※左端の数値はミリ秒
なお、並列(通信経路を同時刻に構築)に動作させるても、経路の独立は保たれます。
バッファー
Flowはバッファーを持つことができます。
バッファーの役割
バッファーは送信機と受信機の中間(Flow内)にある一時的な記憶領域です。送信機から受信機へ流れるデータの渋滞を緩和します。
送信機は、データの要求が無くても、自由なタイミングでバッファーへデータを送出できるので、送信が渋滞しません。ただし、バッファーに空きがある場合です。
受信機は、データの送信が無くても、自由なタイミングでバッファーからデータを取得できるので、受信が渋滞しません。ただし、バッファーにデータがある場合です。
バッファーの構成
バッファーは図のような構成になっています。
capacityがバッファーサイズを示しています。デフォルトは「capacity = 64(BUFFERED)」です。ですので、何も指定しなければ「バッファー有り」になっています。
※「オーバーフロー」については後述
バッファーの利用
単に、ファクトリ関数(flow、flowOf、など)で実装されたFlowインスタンスは、バッファー無し(バッファーサイズ ⇐ 0)になります。
このインスタンスでバッファーを利用する方法は2つあります。
(1)Flow#buffer拡張関数
buffer拡張関数の指定でバッファーは利用可能になります。
fun createFlow() = flow { ... } // バッファー無し fun createFlow() = flow { ... }.buffer() // バッファー有り(サイズ ⇐ 64)
@Suppress("NAME_SHADOWING") public fun <T> Flow<T>.buffer( capacity: Int = BUFFERED, // サイズ ⇐ 64 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): Flow<T> { ... }
(2)Flow#flowOn拡張関数
flowOn拡張関数の指定でバッファーは利用可能になります。
fun createFlow() = flow { ... } // バッファー無し fun createFlow() = flow { ... }.flowOn(...) // バッファー有り(サイズ ⇐ 64)
FusibleFlowとChannelFlowOperatorImplの引数capacityのデフォルトはOPTIONAL_CHANNELです。
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> { checkFlowContext(context) return when { context == EmptyCoroutineContext -> this this is FusibleFlow -> fuse(context = context) else -> ChannelFlowOperatorImpl(this, context = context) } }
FusibleFlowまたはChannelFlowOperatorImplと継承関係にあるクラスChannelFlowで、OPTIONAL_CHANNEL⇒BUFFEREDへ置き換えられています。
@InternalCoroutinesApi public abstract class ChannelFlow<T>( // upstream context @JvmField public val context: CoroutineContext, // buffer capacity between upstream and downstream context @JvmField public val capacity: Int, // buffer overflow strategy @JvmField public val onBufferOverflow: BufferOverflow ) : FusibleFlow<T> { ... internal val produceCapacity: Int get() = if (capacity == Channel.OPTIONAL_CHANNEL) Channel.BUFFERED // サイズ ⇐ 64 else capacity ... }
バッファーの制御
バッファーはFlow#buffer拡張関数の引数で制御できます。
@Suppress("NAME_SHADOWING") public fun <T> Flow<T>.buffer( capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): Flow<T> { ... }
引数 | 概要 | |
---|---|---|
capacity | Int | バッファーサイズ(指定された値) 送受信のタイプ ・Rendezvous(0) ・Buffered(64) ・Conflated(-1) ・Unlimited(Int.MAX_VALUE) |
onBufferOverflow | BufferOverflow | オーバーフロー時の扱い |
capacityの値は、「バッファーのサイズ」と「送受信のタイプ(特別な動作)」の2つの役割が割り振られています。※「送受信のタイプ」については後述
バッファーの動作
バッファーの動作をサンプルで示します。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createFlow() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } }.flowOn(Dispatchers.Default) .buffer(capacity = 3)
@Preview @Composable fun Flow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlow() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } } } ) { Text(text = "Flow test !") } }
1番目のデータ「8」はバッファーを介さずに送受信されます。バッファーを利用するのは2番目以降のデータです。
71586 Compose ! 74689 Start Receiver ! 74696 Start Sender ! 79702 Send Data = 8 [DefaultDispatcher-worker-1, 169843152] 79702 Receive Data = 8 [main, 120720841] 79703 Send Data = 4 [DefaultDispatcher-worker-1, 169843152] 送信機⇒Buffer 79704 Send Data = 3 [DefaultDispatcher-worker-1, 169843152] 79705 Send Data = 9 [DefaultDispatcher-worker-1, 169843152] 80707 Receive Data = 4 [main, 120720841] Buffer⇒受信機 80708 Send Data = 1 [DefaultDispatcher-worker-1, 169843152] 81712 Receive Data = 3 [main, 120720841] 82716 Receive Data = 9 [main, 120720841] 83721 Receive Data = 1 [main, 120720841]
送信機⇒Buffer(送信側)
送信されたデータはバッファーに入り、新しいデータが送信される毎に「積み上げる方向」へシフトされて行きます。
オーバーフローになると送信は休止します。
なお、indexは受信時の取り出し位置です。
Buffer⇒受信機(受信側)
受信されるデータはindexの位置から取り出されます。
データの取り出されたバッファは無効になり、新しいデータが送信されることにより再び埋められます。
送受信のタイプ
capacityはバッファーのサイズを指定する引数ですが、値の一部にパラメータ名が定義されており、「送受信のタイプ」の役割を合わせ持ちます。
タイプ | capacityの値 ( )内はパラメータ名 | 振舞い |
---|---|---|
Rendezvous | 0(RENDEZVOUS)デフォルト | バッファー無し |
Buffered | 64(BUFFERED) | バッファーサイズ |
Conflated | -1(CONFLATED) | 特別な動作 |
Unlimited | Int.MAX_VALUE(UNLIMITED) | バッファーサイズ |
この中で、「Conflated」は特別な動作をします。その他は、ただのサイズです。
private val Data = arrayOf(8, 4, 3, 9, 1) //private val Data2 = Array(100) { it } fun createFlow() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } }.flowOn(Dispatchers.Default) .buffer(capacity = RENDEZVOUS) // 0 // .buffer(capacity = 3) // 3 // .buffer(capacity = BUFFERED) // 64(デフォルト) // .buffer(capacity = CONFLATED) // -1 // .buffer(capacity = UNLIMITED) // Int.MAX_VALUE
@Preview @Composable fun Flow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlow() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } } } ) { Text(text = "Flow test !") } }
Rendezvous
バッファサイズは「0(無し)」です。
受信側は送信側へ直にデータを要求し、送信側は受信側へ直にデータを渡します。
送信データが無い時、受信側は休止して送信を待ちます。
受信の要求が無い時、送信側は休止して要求を待ちます。
56780 Compose ! 59530 Start Receiver ! 59540 Start Sender ! 64546 Send Data = 8 [DefaultDispatcher-worker-1, 169843152] 64546 Receive Data = 8 [main, 120720841] 65553 Receive Data = 4 [main, 120720841] 65553 Send Data = 4 [DefaultDispatcher-worker-1, 169843152] 66557 Receive Data = 3 [main, 120720841] 66557 Send Data = 3 [DefaultDispatcher-worker-1, 169843152] 67560 Receive Data = 9 [main, 120720841] 67560 Send Data = 9 [DefaultDispatcher-worker-1, 169843152] 68564 Receive Data = 1 [main, 120720841] 68564 Send Data = 1 [DefaultDispatcher-worker-1, 169843152]
Buffered
バッファサイズは「64」です。
送信側はバッファが空いている限りデータをバッファへ格納します。バッファが埋まれば休止します。
受信側はバッファーにデータがあればバッファーから受信します。バッファが空になれば休止します。
15754 Compose ! 18477 Start Receiver ! 18484 Start Sender ! 23494 Send Data = 0 [DefaultDispatcher-worker-1, 169843152] 23494 Receive Data = 0 [main, 120720841] ⇐ バッファーを介さない 23494 Send Data = 1 [DefaultDispatcher-worker-1, 169843152] 23498 Send Data = 2 [DefaultDispatcher-worker-1, 169843152] : : 23550 Send Data = 63 [DefaultDispatcher-worker-1, 169843152] 23551 Send Data = 64 [DefaultDispatcher-worker-1, 169843152] ⇐ 64個(Data = 1~64) 24499 Receive Data = 1 [main, 120720841] 24499 Send Data = 65 [DefaultDispatcher-worker-1, 169843152] 25502 Send Data = 66 [DefaultDispatcher-worker-1, 169843152] 25502 Receive Data = 2 [main, 120720841] 26506 Send Data = 67 [DefaultDispatcher-worker-1, 169843152] 26506 Receive Data = 3 [main, 120720841] 27514 Receive Data = 4 [main, 120720841] 27514 Send Data = 68 [DefaultDispatcher-worker-1, 169843152] 28518 Receive Data = 5 [main, 120720841] 28519 Send Data = 69 [DefaultDispatcher-worker-1, 169843152] : :
デフォルトは「capacity = BUFFERED」です。ですので、「基本的な例」で示した例は全てBufferedになります。
Conflated
バッファサイズは「1」です。特殊な動作をします。
送信側からバッファーへ送信されたデータはバッファー上で上書きされます。受信側はバッファーから上書き後の最終データを受信します。
81200 Compose ! 83734 Start Receiver ! 83744 Start Sender ! 88751 Send Data = 8 [DefaultDispatcher-worker-1, 169843152] 88752 Receive Data = 8 [main, 120720841] ⇐ バッファーを介さない 88752 Send Data = 4 [DefaultDispatcher-worker-1, 169843152] 88755 Send Data = 3 [DefaultDispatcher-worker-1, 169843152] 88756 Send Data = 9 [DefaultDispatcher-worker-1, 169843152] 88757 Send Data = 1 [DefaultDispatcher-worker-1, 169843152] ⇐ 最終データ 89757 Receive Data = 1 [main, 120720841]
Unlimited
バッファサイズは「Int.MAX_VALUE(2147483647)」です。「サイズは無制限」になります。
動作はBufferedと同じです。
ただし、バッファーのデータ数が無制限なので、メモリーのオーバーフローに注意してください。
32660 Compose ! 34685 Start Receiver ! 34693 Start Sender ! 39698 Send Data = 0 [DefaultDispatcher-worker-1, 169843152] 39698 Receive Data = 0 [main, 120720841] ⇐ バッファーを介さない 39698 Send Data = 1 [DefaultDispatcher-worker-1, 169843152] 39699 Send Data = 2 [DefaultDispatcher-worker-1, 169843152] 39701 Send Data = 3 [DefaultDispatcher-worker-1, 169843152] : : 39784 Send Data = 98 [DefaultDispatcher-worker-1, 169843152] 39784 Send Data = 99 [DefaultDispatcher-worker-1, 169843152] : : ⇐ 2147483647個まで可能(サンプルは100で制限)
オーバーフロー
オーバーフローは、バッファーに空きがない状態(全て埋まっている、バッファーサイズ=0)で、新たなデータが送信(入力)された場合に発生します。
オーバーフローになった時、「新たなデータの扱い」が問題になります。
引数onBufferOverflowは、この「新たなデータの扱い」を指定します。
パレメータ (BufferOverflow.***) | 動作 | 一時休止 |
---|---|---|
SUSPEND | バッファーに空きができるまで休止して待つ | する |
DROP_OLDEST | バッファーの最も古い値を削除し、入力された値を追加 | しない |
DROP_LATEST | バッファーの値を保持し、入力された値を破棄 |
バッファーサイズ > 0
private val Data = arrayOf(8, 4, 3, 9, 1) fun createFlow() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEachIndexed { index, data -> emit(data) val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${data} [${_thName}, ${_jbCode}]") // delay(1000) } }.flowOn(Dispatchers.Default) .buffer(capacity = 3, onBufferOverflow = BufferOverflow.SUSPEND) // .buffer(capacity = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST) // .buffer(capacity = 3, onBufferOverflow = BufferOverflow.DROP_LATEST)
@Preview @Composable fun Flow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlow() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) // 受信間隔:1000ms } } } ) { Text(text = "Flow test !") } }
18883 Compose ! 20889 Start Receiver ! 20897 Start Sender ! 25905 Send Data = 8 [DefaultDispatcher-worker-1, 73716183] 25905 Receive Data = 8 [main, 170330820] 25905 Send Data = 4 [DefaultDispatcher-worker-1, 73716183] 25906 Send Data = 3 [DefaultDispatcher-worker-1, 73716183] 25908 Send Data = 9 [DefaultDispatcher-worker-1, 73716183] ⇐ 空き無し、emitは休止 26911 Receive Data = 4 [main, 170330820] ⇐ 取り出し 26912 Send Data = 1 [DefaultDispatcher-worker-1, 73716183] ⇐ 空きあり、emitは復帰・1を追加 27915 Receive Data = 3 [main, 170330820] 28920 Receive Data = 9 [main, 170330820] 29923 Receive Data = 1 [main, 170330820] ※左端の数値はミリ秒
45740 Compose ! 47574 Start Receiver ! 47586 Start Sender ! 52593 Send Data = 8 [DefaultDispatcher-worker-2, 73716183] 52594 Receive Data = 8 [main, 170330820] 52594 Send Data = 4 [DefaultDispatcher-worker-2, 73716183] 52595 Send Data = 3 [DefaultDispatcher-worker-2, 73716183] 52595 Send Data = 9 [DefaultDispatcher-worker-2, 73716183] ⇐ 空き無し 52596 Send Data = 1 [DefaultDispatcher-worker-2, 73716183] ⇐ 4を削除、1を追加 53600 Receive Data = 3 [main, 170330820] 54604 Receive Data = 9 [main, 170330820] 55611 Receive Data = 1 [main, 170330820] ※左端の数値はミリ秒
00560 Compose ! 02802 Start Receiver ! 02809 Start Sender ! 07817 Send Data = 8 [DefaultDispatcher-worker-1, 73716183] 07818 Receive Data = 8 [main, 170330820] 07818 Send Data = 4 [DefaultDispatcher-worker-1, 73716183] 07819 Send Data = 3 [DefaultDispatcher-worker-1, 73716183] 07819 Send Data = 9 [DefaultDispatcher-worker-1, 73716183] ⇐ 空き無し 07820 Send Data = 1 [DefaultDispatcher-worker-1, 73716183] ⇐ 1を破棄 08821 Receive Data = 4 [main, 170330820] 09823 Receive Data = 3 [main, 170330820] 10825 Receive Data = 9 [main, 170330820] ※左端の数値はミリ秒
バッファーサイズ = 0
「バッファーサイズ(capacity)= 0」の時は、SUSUPEND以外を推奨しません。
意図が不明な動作をします。原因はrequireの判定にあると思われます。
※今後、改善されることを望みます!
@Suppress("NAME_SHADOWING") public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> { require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) { "Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity" } require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) { "CONFLATED capacity cannot be used with non-default onBufferOverflow" } ... }
「capacity = 1」に強制されます。
62804 Compose ! 65080 Start Receiver ! 65086 Start Sender ! 70091 Send Data = 8 [DefaultDispatcher-worker-1, 169843152] 70092 Receive Data = 8 [main, 120720841] 70092 Send Data = 4 [DefaultDispatcher-worker-1, 169843152] 70092 Send Data = 3 [DefaultDispatcher-worker-1, 169843152] 70093 Send Data = 9 [DefaultDispatcher-worker-1, 169843152] 70094 Send Data = 1 [DefaultDispatcher-worker-1, 169843152] 71096 Receive Data = 1 [main, 120720841]capacity = 0, onBufferOverflow = BufferOverflow.DROP_LATEST
「capacity = 1」に強制されます。
91083 Compose ! 93011 Start Receiver ! 93017 Start Sender ! 98024 Send Data = 8 [DefaultDispatcher-worker-1, 169843152] 98024 Receive Data = 8 [main, 120720841] 98025 Send Data = 4 [DefaultDispatcher-worker-1, 169843152] 98027 Send Data = 3 [DefaultDispatcher-worker-1, 169843152] 98027 Send Data = 9 [DefaultDispatcher-worker-1, 169843152] 98027 Send Data = 1 [DefaultDispatcher-worker-1, 169843152] 99030 Receive Data = 4 [main, 120720841]
CorutineContextの変更
送信側コルーチンのCoroutineContextは、受信側で指定されたCoroutineContextが渡されます。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createFlow() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }
@Preview @Composable fun Flow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlow() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { // デフォルトはDispatchers.Main Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Flow test !") } }
82492 Compose ! 92397 Start Receiver ! 92397 Start Sender ! 97404 Receive Data = 8 [main, 8529103] 97404 Send Data = 8 [main, 8529103] 98407 Receive Data = 4 [main, 8529103] 98408 Send Data = 4 [main, 8529103] 99411 Receive Data = 3 [main, 8529103] 99412 Send Data = 3 [main, 8529103] 00415 Receive Data = 9 [main, 8529103] 00416 Send Data = 9 [main, 8529103] 01421 Receive Data = 1 [main, 8529103] 01422 Send Data = 1 [main, 8529103]
Flow#flowOn関数を使用することで、送信側コルーチンのCoroutineContextを変更できます。以下は、スレッドプールを変更した例です。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createFlow() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }.flowOn(Dispatchers.Default)
@Preview @Composable fun Flow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlow() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { // デフォルトはDispatchers.Main Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Flow test !") } }
57170 Compose ! 58754 Start Receiver ! 58763 Start Sender ! 63767 Send Data = 8 [DefaultDispatcher-worker-1, 8529103] 63767 Receive Data = 8 [main, 96136796] 64770 Send Data = 4 [DefaultDispatcher-worker-1, 8529103] 64770 Receive Data = 4 [main, 96136796] 65773 Send Data = 3 [DefaultDispatcher-worker-1, 8529103] 65774 Receive Data = 3 [main, 96136796] 66776 Send Data = 9 [DefaultDispatcher-worker-1, 8529103] 66777 Receive Data = 9 [main, 96136796] 67781 Send Data = 1 [DefaultDispatcher-worker-1, 8529103] 67782 Receive Data = 1 [main, 96136796]
他の実装方法
Flowの実装方法はflow( )関数以外に複数あります。
以下の例は、同じストリームデータ(Int型)を送受信する通信経路を作成します。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createFlowA(): Flow<Int> = flowOf(8, 4, 3, 9, 1) fun createFlowB(): Flow<Int> = Data.asFlow() fun createFlowC(): Flow<Int> = flow<Int> { // この記事のサンプルで登場 Data.forEach { emit(it) } // 「レジーバー:FlowCollector」のコマンドが利用可能 } fun createFlowD(): Flow<Int> = channelFlow<Int> { Data.forEach { send(it) } // 「レシーバー:ProducerScope」のコマンドが利用可能 }
@Preview @Composable fun Flow_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlowA() } // datCh: Flow<Int> = remember { createFlowB() } // datCh: Flow<Int> = remember { createFlowC() } // datCh: Flow<Int> = remember { createFlowD() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Flow test !") } }
97301 Compose ! 99117 Start Receiver ! 99121 Receive Data = 8 [main, 166465328] 99122 Receive Data = 4 [main, 166465328] 99122 Receive Data = 3 [main, 166465328] 99123 Receive Data = 9 [main, 166465328] 99124 Receive Data = 1 [main, 166465328]
状態の監視
Flowによって送受信されるストリームデータは、値の監視が行われません。ですので、再Composeはスケジューリングされません。
再Composeのスケジューリングを可能にするためには、ストリームデータを監視データへ変換します。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createFlow() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }.flowOn(Dispatchers.Default)
@Preview @Composable fun Flow_Monitor( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlow() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") // ストリームデータ ⇒ 監視データ val _data = remember { mutableStateOf(0) } LaunchedEffect(Unit) { datCh.collect { // 要求・受信 _data.value = it // 監視データへ変換 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") } } Text( text = "${getMilliTime5()} Receive Data = ${_data.value}", fontSize = 20.sp ) }
03115 Compose ! 03340 Start Sender ! 08348 Send Data = 8 [DefaultDispatcher-worker-1, 32015087] 08348 Receive Data = 8 [main, 197050876] 08357 Compose ! 09350 Send Data = 4 [DefaultDispatcher-worker-1, 32015087] 09351 Receive Data = 4 [main, 197050876] 09357 Compose ! 10352 Send Data = 3 [DefaultDispatcher-worker-1, 32015087] 10353 Receive Data = 3 [main, 197050876] 10357 Compose ! 11354 Send Data = 9 [DefaultDispatcher-worker-1, 32015087] 11356 Receive Data = 9 [main, 197050876] 11358 Compose ! 12357 Send Data = 1 [DefaultDispatcher-worker-1, 32015087] 12358 Receive Data = 1 [main, 197050876] 12373 Compose !
ちなみに、StateFlowは「State(状態の監視)+Flow(通信経路:SharedFlow)」です。StateFlowを用いれば、ここで紹介した「ストリームデータを監視データへ変換」する作業は必要ありません。
再Composeのスケジューリングを可能にする方法として、StateFlowの利用が定石になています。※詳細は「Coroutine:StateFlow」を参照
関連記事: