Flowはメンバー関数や拡張関数で様々な機能を提供しています。
これらの関数は大きく分けて、中間演算(Intermediate operators)と終端演算(Terminal operators)に分けられます。
中間演算とは、withIndex、map、filter、drop、take、zip、merge、combineなどです。通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。
終端演算とは、collect、single、reduce、toListなどです。通信経路の末端に位置して、ストリームデータを収集します。
今回は、この「Flowの中間演算」のzip、merge、combineを取り上げて、まとめます。
※環境:Android Studio Koala Feature Drop | 2024.1.2 Patch 1
Kotlin 1.9.0
Compose Compiler 1.5.1
org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3
org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3
中間演算とは
中間演算は通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。
中間演算 | 概要 | ダウンストリームFlow |
---|---|---|
withIndex | データにインデックを付与 | Flow(IndexValue(T)) |
map | データを割り振り直す | Flow(R) |
filter | 条件に合うデータを通す | Flow(T) |
drop | 先頭から指定個数のデータを除外 | |
dropWhile | 先頭から条件を満たす間のデータを除外 | |
take | 先頭から指定個数のデータを取得 | |
takeWhile | 先頭から条件を満たす間のデータを取得 | |
zip | Flowを統合(全部のデータが切り替わる毎に融合) | Flow(R) |
merge | Flowを統合(データをそのまま重ね合わせ) | Flow(T) |
combine | Flowを統合(一部のデータが切り替わる毎に融合) | Flow(R) |
※アップストリームFlowはFlow(T) ※正式なmergeは「kotlinx-coroutines-core 1.9.0」で同様な関数が登場予定 |
中間演算の構造
中間演算はアップストリームFlowの受信出力(collect)を受けて、何かの変換処理を伴ったダウンストリームFlowを返す構造になっています。
図にすると、ひとつなぎにFlowを追加した形です。
簡単な中間演算の例を示します。「何もしない」という変換処理を行うrelay関数です。
レシーバーがアップストリームFlowで、returnで返されるインスタンスがダウンストリームFlowになります。
fun <T> Flow<T>.relay(): Flow<T> { return flow { collect { value -> // レシーバーのcollect val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Relay Data = ${value} [${_thName}, ${_jbCode}]") /* ** ここで何もしない */ return@collect emit(value) // returnで返すFlowのemit } } }
中間演算は「コメント:ここで何もしない」の部分に変換処理が入ります。変換処理はラムダ式を引数で挿入する方法が採られます。
サンプルは「何もしない」ので、変換処理はありません。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6) fun createFlowRelay() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }.flowOn(Dispatchers.Default).relay()
@Preview @Composable fun FlowRelay_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlowRelay() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Flow test (Relay) !") } }
48737 Compose ! 50651 Start Receiver ! 50659 Start Sender ! 55665 Send Data = 8 [DefaultDispatcher-worker-1, 188663320] 55666 Relay Data = 8 [main, 145086577] ⇐ mainは受信機のコルーチンコンテキスト 55667 Receive Data = 8 [main, 145086577] 56670 Send Data = 4 [DefaultDispatcher-worker-1, 188663320] 56671 Relay Data = 4 [main, 145086577] 56672 Receive Data = 4 [main, 145086577] 57675 Send Data = 3 [DefaultDispatcher-worker-1, 188663320] 57675 Relay Data = 3 [main, 145086577] 57676 Receive Data = 3 [main, 145086577] 58678 Send Data = 9 [DefaultDispatcher-worker-1, 188663320] 58679 Relay Data = 9 [main, 145086577] 58680 Receive Data = 9 [main, 145086577] 59683 Send Data = 1 [DefaultDispatcher-worker-1, 188663320] 59684 Relay Data = 1 [main, 145086577] 59685 Receive Data = 1 [main, 145086577]
中間演算は受信機のコルーチンコンテキストで実行される点に注意してください。
zip
zipは「複数のFlowの受信出力(collect)を、全部が切り替わる毎に融合するFlow」を返します。
public fun <T1, T2, R> Flow<T1>.zip( other: Flow<T2>, transform: suspend (T1, T2) -> R ): Flow<R> = zipImpl(this, other, transform)
以下はzipのサンプルです。
2つあるアップストリームFlowの出力は、Pairへ融合されてダウンストリームFlowから出力されます。
private val Data1 = arrayOf( 8, 4, 3, 9, 1) private val Data2 = arrayOf(22,54,90,12,63,39,71,35,58,47) val zFlow1 = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data1.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Dat1 = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }.flowOn(Dispatchers.Default).buffer(capacity = 0) val zFlow2= flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5200) Data2.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Dat2 = ${it} [${_thName}, ${_jbCode}]") delay(500) } }.flowOn(Dispatchers.Default).buffer(capacity = 0) fun createFlowZip() = zFlow1.zip(zFlow2) { dat1, dat2 -> Pair(dat1, dat2) }
@Preview @Composable fun FlowZip_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Pair<Int, Int>> = remember { createFlowZip() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Flow test (zip) !") } }
アップストリームFlowの内の一つが送受信を完了すると、その他のFlowは全て閉じられる(キャンセルされる)点に注意して下さい。
53552 Compose ! 57282 Start Receiver ! 57294 Start Sender ! 57297 Start Sender ! 62298 Send Dat1 = 8 [DefaultDispatcher-worker-1, 188663320] 62499 Send Dat2 = 22 [DefaultDispatcher-worker-1, 145086577] 62500 Receive Data = (8, 22) [main, 250988118] 63002 Send Dat2 = 54 [DefaultDispatcher-worker-1, 145086577] 63300 Send Dat1 = 4 [DefaultDispatcher-worker-1, 188663320] 63301 Receive Data = (4, 54) [main, 250988118] 63505 Send Dat2 = 90 [DefaultDispatcher-worker-1, 145086577] 64302 Send Dat1 = 3 [DefaultDispatcher-worker-1, 188663320] 64303 Receive Data = (3, 90) [main, 250988118] 64304 Send Dat2 = 12 [DefaultDispatcher-worker-1, 145086577] 65305 Send Dat1 = 9 [DefaultDispatcher-worker-1, 188663320] 65306 Receive Data = (9, 12) [main, 250988118] 65309 Send Dat2 = 63 [DefaultDispatcher-worker-1, 145086577] 66310 Send Dat1 = 1 [DefaultDispatcher-worker-1, 188663320] 66312 Receive Data = (1, 63) [main, 250988118] 66314 Send Dat2 = 39 [DefaultDispatcher-worker-1, 145086577]
merge
mergeは「複数のFlowの受信出力(collect)をそのまま重ね合わせたFlow」を返します。
正式なmergeは「kotlinx-coroutines-core Ver 1.9.0」で登場予定です。
fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow { launch { collect { send(it) } } other.collect { send(it) } }
以下はmergeのサンプルです。
2つあるアップストリームFlowの出力は、混在してダウンストリームFlowから出力されます。しかし、mergeの前後で値は同じです。
private val Data1 = arrayOf( 8, 4, 3) private val Data2 = arrayOf(22,54,90,12,63,39) val mFlow1 = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data1.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Dat1 = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }.flowOn(Dispatchers.Default).buffer(capacity = 0) val mFlow2= flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5200) Data2.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Dat2 = ${it} [${_thName}, ${_jbCode}]") delay(500) } }.flowOn(Dispatchers.Default).buffer(capacity = 0) fun createFlowMerge() = mFlow1.merge(mFlow2)
@Preview @Composable fun FlowMerge_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlowMerge() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Flow test (merge) !") } }
69524 Compose ! 71513 Start Receiver ! 71525 Start Sender ! 71527 Start Sender ! 76532 Send Dat1 = 8 [DefaultDispatcher-worker-1, 188663320] 76533 Receive Data = 8 [main, 145086577] 76730 Send Dat2 = 22 [DefaultDispatcher-worker-1, 250988118] 76731 Receive Data = 22 [main, 145086577] 77233 Send Dat2 = 54 [DefaultDispatcher-worker-1, 250988118] 77233 Receive Data = 54 [main, 145086577] 77536 Send Dat1 = 4 [DefaultDispatcher-worker-1, 188663320] 77537 Receive Data = 4 [main, 145086577] 77737 Send Dat2 = 90 [DefaultDispatcher-worker-1, 250988118] 77739 Receive Data = 90 [main, 145086577] 78242 Send Dat2 = 12 [DefaultDispatcher-worker-1, 250988118] 78243 Receive Data = 12 [main, 145086577] 78541 Send Dat1 = 3 [DefaultDispatcher-worker-1, 188663320] 78542 Receive Data = 3 [main, 145086577] 78746 Send Dat2 = 63 [DefaultDispatcher-worker-1, 250988118] 78746 Receive Data = 63 [main, 145086577] 79250 Send Dat2 = 39 [DefaultDispatcher-worker-1, 250988118] 79250 Receive Data = 39 [main, 145086577]
combine
combineは「複数のFlowの受信出力(collect)を、一部が切り替わる毎に融合するFlow」を返します。
@JvmName("flowCombine") public fun <T1, T2, R> Flow<T1>.combine( flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R ): Flow<R> = flow { combineInternal( arrayOf(this@combine, flow), nullArrayFactory(), { emit(transform(it[0] as T1, it[1] as T2)) } ) }
以下はcombineのサンプルです。
2つあるアップストリームFlowの出力は、Pairへ融合されてダウンストリームFlowから出力されます。
private val Data1 = arrayOf( 8, 4, 3) private val Data2 = arrayOf(22,54,90,12,63,39) val cFlow1 = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data1.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Dat1 = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }.flowOn(Dispatchers.Default).buffer(capacity = 0) val cFlow2= flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5200) Data2.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Dat2 = ${it} [${_thName}, ${_jbCode}]") delay(500) } }.flowOn(Dispatchers.Default).buffer(capacity = 0) fun createFlowCombine() = cFlow1.combine(cFlow2) { dat1, dat2 -> Pair(dat1, dat2) }
@Preview @Composable fun FlowCombine_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Pair<Int, Int>> = remember { createFlowCombine() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Flow test (combine) !") } }
50012 Compose ! 52754 Start Receiver ! 52769 Start Sender ! 52770 Start Sender ! 57777 Send Dat1 = 8 [DefaultDispatcher-worker-2, 188663320] 57977 Send Dat2 = 22 [DefaultDispatcher-worker-2, 145086577] 57978 Receive Data = (8, 22) [main, 250988118] 58481 Send Dat2 = 54 [DefaultDispatcher-worker-2, 145086577] 58482 Receive Data = (8, 54) [main, 250988118] 58780 Send Dat1 = 4 [DefaultDispatcher-worker-2, 188663320] 58781 Receive Data = (4, 54) [main, 250988118] 58983 Send Dat2 = 90 [DefaultDispatcher-worker-2, 145086577] 58984 Receive Data = (4, 90) [main, 250988118] 59488 Send Dat2 = 12 [DefaultDispatcher-worker-2, 145086577] 59489 Receive Data = (4, 12) [main, 250988118] 59784 Send Dat1 = 3 [DefaultDispatcher-worker-2, 188663320] 59785 Receive Data = (3, 12) [main, 250988118] 59992 Send Dat2 = 63 [DefaultDispatcher-worker-2, 145086577] 59993 Receive Data = (3, 63) [main, 250988118] 60495 Send Dat2 = 39 [DefaultDispatcher-worker-2, 145086577] 60496 Receive Data = (3, 39) [main, 250988118]
関連記事: