Flowはメンバー関数や拡張関数で様々な機能を提供しています。
これらの関数は大きく分けて、中間演算(Intermediate operators)と終端演算(Terminal operators)に分けられます。
中間演算とは、withIndex、map、filter、drop、take、zip、merge、combineなどです。通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。
終端演算とは、collect、single、reduce、toListなどです。通信経路の末端に位置して、ストリームデータを収集します。
今回は、この「Flowの中間演算」のzip、merge、combineを取り上げて、まとめます。
※環境:Android Studio Koala Feature Drop | 2024.1.2 Patch 1
Kotlin 1.9.0
Compose Compiler 1.5.1
org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3
org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3
中間演算とは
中間演算は通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。
| 中間演算 | 概要 | ダウンストリームFlow |
|---|---|---|
| withIndex | データにインデックを付与 | Flow(IndexValue(T)) |
| map | データを割り振り直す | Flow(R) |
| filter | 条件に合うデータを通す | Flow(T) |
| drop | 先頭から指定個数のデータを除外 | |
| dropWhile | 先頭から条件を満たす間のデータを除外 | |
| take | 先頭から指定個数のデータを取得 | |
| takeWhile | 先頭から条件を満たす間のデータを取得 | |
| zip | Flowを統合(全部のデータが切り替わる毎に融合) | Flow(R) |
| merge | Flowを統合(データをそのまま重ね合わせ) | Flow(T) |
| combine | Flowを統合(一部のデータが切り替わる毎に融合) | Flow(R) |
| ※アップストリームFlowはFlow(T) ※正式なmergeは「kotlinx-coroutines-core 1.9.0」で同様な関数が登場予定 |
||
中間演算の構造
中間演算はアップストリームFlowの受信出力(collect)を受けて、何かの変換処理を伴ったダウンストリームFlowを返す構造になっています。
図にすると、ひとつなぎにFlowを追加した形です。

簡単な中間演算の例を示します。「何もしない」という変換処理を行うrelay関数です。
レシーバーがアップストリームFlowで、returnで返されるインスタンスがダウンストリームFlowになります。
fun <T> Flow<T>.relay(): Flow<T> {
return flow {
collect { value -> // レシーバーのcollect
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Relay Data = ${value} [${_thName}, ${_jbCode}]")
/*
** ここで何もしない
*/
return@collect emit(value) // returnで返すFlowのemit
}
}
}
中間演算は「コメント:ここで何もしない」の部分に変換処理が入ります。変換処理はラムダ式を引数で挿入する方法が採られます。
サンプルは「何もしない」ので、変換処理はありません。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)
fun createFlowRelay() = flow {
Log.i(TAG, "${getMilliTime5()} Start Sender !")
delay(5000)
Data.forEach {
emit(it) // 送信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]")
delay(1000)
}
}.flowOn(Dispatchers.Default).relay()
@Preview
@Composable
fun FlowRelay_Receiver(
scope: CoroutineScope = rememberCoroutineScope(),
datCh: Flow<Int> = remember { createFlowRelay() }
) {
Log.i(TAG, "${getMilliTime5()} Compose !")
Button(
modifier = Modifier.fillMaxWidth(),
onClick = {
scope.launch {
Log.i(TAG, "${getMilliTime5()} Start Receiver !")
datCh.collect { // 要求・受信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
// delay(1000)
}
}
}
) { Text(text = "Flow test (Relay) !") }
}
48737 Compose ! 50651 Start Receiver ! 50659 Start Sender ! 55665 Send Data = 8 [DefaultDispatcher-worker-1, 188663320] 55666 Relay Data = 8 [main, 145086577] ⇐ mainは受信機のコルーチンコンテキスト 55667 Receive Data = 8 [main, 145086577] 56670 Send Data = 4 [DefaultDispatcher-worker-1, 188663320] 56671 Relay Data = 4 [main, 145086577] 56672 Receive Data = 4 [main, 145086577] 57675 Send Data = 3 [DefaultDispatcher-worker-1, 188663320] 57675 Relay Data = 3 [main, 145086577] 57676 Receive Data = 3 [main, 145086577] 58678 Send Data = 9 [DefaultDispatcher-worker-1, 188663320] 58679 Relay Data = 9 [main, 145086577] 58680 Receive Data = 9 [main, 145086577] 59683 Send Data = 1 [DefaultDispatcher-worker-1, 188663320] 59684 Relay Data = 1 [main, 145086577] 59685 Receive Data = 1 [main, 145086577]
中間演算は受信機のコルーチンコンテキストで実行される点に注意してください。
zip
zipは「複数のFlowの受信出力(collect)を、全部が切り替わる毎に融合するFlow」を返します。
public fun <T1, T2, R> Flow<T1>.zip(
other: Flow<T2>,
transform: suspend (T1, T2) -> R
): Flow<R> = zipImpl(this, other, transform)


以下はzipのサンプルです。
2つあるアップストリームFlowの出力は、Pairへ融合されてダウンストリームFlowから出力されます。
private val Data1 = arrayOf( 8, 4, 3, 9, 1)
private val Data2 = arrayOf(22,54,90,12,63,39,71,35,58,47)
val zFlow1 = flow {
Log.i(TAG, "${getMilliTime5()} Start Sender !")
delay(5000)
Data1.forEach {
emit(it) // 送信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Send Dat1 = ${it} [${_thName}, ${_jbCode}]")
delay(1000)
}
}.flowOn(Dispatchers.Default).buffer(capacity = 0)
val zFlow2= flow {
Log.i(TAG, "${getMilliTime5()} Start Sender !")
delay(5200)
Data2.forEach {
emit(it) // 送信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Send Dat2 = ${it} [${_thName}, ${_jbCode}]")
delay(500)
}
}.flowOn(Dispatchers.Default).buffer(capacity = 0)
fun createFlowZip() = zFlow1.zip(zFlow2) { dat1, dat2 -> Pair(dat1, dat2) }
@Preview
@Composable
fun FlowZip_Receiver(
scope: CoroutineScope = rememberCoroutineScope(),
datCh: Flow<Pair<Int, Int>> = remember { createFlowZip() }
) {
Log.i(TAG, "${getMilliTime5()} Compose !")
Button(
modifier = Modifier.fillMaxWidth(),
onClick = {
scope.launch {
Log.i(TAG, "${getMilliTime5()} Start Receiver !")
datCh.collect { // 要求・受信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
// delay(1000)
}
}
}
) { Text(text = "Flow test (zip) !") }
}
アップストリームFlowの内の一つが送受信を完了すると、その他のFlowは全て閉じられる(キャンセルされる)点に注意して下さい。
53552 Compose ! 57282 Start Receiver ! 57294 Start Sender ! 57297 Start Sender ! 62298 Send Dat1 = 8 [DefaultDispatcher-worker-1, 188663320] 62499 Send Dat2 = 22 [DefaultDispatcher-worker-1, 145086577] 62500 Receive Data = (8, 22) [main, 250988118] 63002 Send Dat2 = 54 [DefaultDispatcher-worker-1, 145086577] 63300 Send Dat1 = 4 [DefaultDispatcher-worker-1, 188663320] 63301 Receive Data = (4, 54) [main, 250988118] 63505 Send Dat2 = 90 [DefaultDispatcher-worker-1, 145086577] 64302 Send Dat1 = 3 [DefaultDispatcher-worker-1, 188663320] 64303 Receive Data = (3, 90) [main, 250988118] 64304 Send Dat2 = 12 [DefaultDispatcher-worker-1, 145086577] 65305 Send Dat1 = 9 [DefaultDispatcher-worker-1, 188663320] 65306 Receive Data = (9, 12) [main, 250988118] 65309 Send Dat2 = 63 [DefaultDispatcher-worker-1, 145086577] 66310 Send Dat1 = 1 [DefaultDispatcher-worker-1, 188663320] 66312 Receive Data = (1, 63) [main, 250988118] 66314 Send Dat2 = 39 [DefaultDispatcher-worker-1, 145086577]
merge
mergeは「複数のFlowの受信出力(collect)をそのまま重ね合わせたFlow」を返します。
正式なmergeは「kotlinx-coroutines-core Ver 1.9.0」で登場予定です。
fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow {
launch {
collect { send(it) }
}
other.collect { send(it) }
}


以下はmergeのサンプルです。
2つあるアップストリームFlowの出力は、混在してダウンストリームFlowから出力されます。しかし、mergeの前後で値は同じです。
private val Data1 = arrayOf( 8, 4, 3)
private val Data2 = arrayOf(22,54,90,12,63,39)
val mFlow1 = flow {
Log.i(TAG, "${getMilliTime5()} Start Sender !")
delay(5000)
Data1.forEach {
emit(it) // 送信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Send Dat1 = ${it} [${_thName}, ${_jbCode}]")
delay(1000)
}
}.flowOn(Dispatchers.Default).buffer(capacity = 0)
val mFlow2= flow {
Log.i(TAG, "${getMilliTime5()} Start Sender !")
delay(5200)
Data2.forEach {
emit(it) // 送信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Send Dat2 = ${it} [${_thName}, ${_jbCode}]")
delay(500)
}
}.flowOn(Dispatchers.Default).buffer(capacity = 0)
fun createFlowMerge() = mFlow1.merge(mFlow2)
@Preview
@Composable
fun FlowMerge_Receiver(
scope: CoroutineScope = rememberCoroutineScope(),
datCh: Flow<Int> = remember { createFlowMerge() }
) {
Log.i(TAG, "${getMilliTime5()} Compose !")
Button(
modifier = Modifier.fillMaxWidth(),
onClick = {
scope.launch {
Log.i(TAG, "${getMilliTime5()} Start Receiver !")
datCh.collect { // 要求・受信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
// delay(1000)
}
}
}
) { Text(text = "Flow test (merge) !") }
}
69524 Compose ! 71513 Start Receiver ! 71525 Start Sender ! 71527 Start Sender ! 76532 Send Dat1 = 8 [DefaultDispatcher-worker-1, 188663320] 76533 Receive Data = 8 [main, 145086577] 76730 Send Dat2 = 22 [DefaultDispatcher-worker-1, 250988118] 76731 Receive Data = 22 [main, 145086577] 77233 Send Dat2 = 54 [DefaultDispatcher-worker-1, 250988118] 77233 Receive Data = 54 [main, 145086577] 77536 Send Dat1 = 4 [DefaultDispatcher-worker-1, 188663320] 77537 Receive Data = 4 [main, 145086577] 77737 Send Dat2 = 90 [DefaultDispatcher-worker-1, 250988118] 77739 Receive Data = 90 [main, 145086577] 78242 Send Dat2 = 12 [DefaultDispatcher-worker-1, 250988118] 78243 Receive Data = 12 [main, 145086577] 78541 Send Dat1 = 3 [DefaultDispatcher-worker-1, 188663320] 78542 Receive Data = 3 [main, 145086577] 78746 Send Dat2 = 63 [DefaultDispatcher-worker-1, 250988118] 78746 Receive Data = 63 [main, 145086577] 79250 Send Dat2 = 39 [DefaultDispatcher-worker-1, 250988118] 79250 Receive Data = 39 [main, 145086577]
combine
combineは「複数のFlowの受信出力(collect)を、一部が切り替わる毎に融合するFlow」を返します。
@JvmName("flowCombine")
public fun <T1, T2, R> Flow<T1>.combine(
flow: Flow<T2>,
transform: suspend (a: T1, b: T2) -> R
): Flow<R> = flow {
combineInternal(
arrayOf(this@combine, flow),
nullArrayFactory(),
{ emit(transform(it[0] as T1, it[1] as T2)) }
)
}


以下はcombineのサンプルです。
2つあるアップストリームFlowの出力は、Pairへ融合されてダウンストリームFlowから出力されます。
private val Data1 = arrayOf( 8, 4, 3)
private val Data2 = arrayOf(22,54,90,12,63,39)
val cFlow1 = flow {
Log.i(TAG, "${getMilliTime5()} Start Sender !")
delay(5000)
Data1.forEach {
emit(it) // 送信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Send Dat1 = ${it} [${_thName}, ${_jbCode}]")
delay(1000)
}
}.flowOn(Dispatchers.Default).buffer(capacity = 0)
val cFlow2= flow {
Log.i(TAG, "${getMilliTime5()} Start Sender !")
delay(5200)
Data2.forEach {
emit(it) // 送信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Send Dat2 = ${it} [${_thName}, ${_jbCode}]")
delay(500)
}
}.flowOn(Dispatchers.Default).buffer(capacity = 0)
fun createFlowCombine() = cFlow1.combine(cFlow2) {
dat1, dat2 -> Pair(dat1, dat2)
}
@Preview
@Composable
fun FlowCombine_Receiver(
scope: CoroutineScope = rememberCoroutineScope(),
datCh: Flow<Pair<Int, Int>> = remember { createFlowCombine() }
) {
Log.i(TAG, "${getMilliTime5()} Compose !")
Button(
modifier = Modifier.fillMaxWidth(),
onClick = {
scope.launch {
Log.i(TAG, "${getMilliTime5()} Start Receiver !")
datCh.collect { // 要求・受信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
// delay(1000)
}
}
}
) { Text(text = "Flow test (combine) !") }
}
50012 Compose ! 52754 Start Receiver ! 52769 Start Sender ! 52770 Start Sender ! 57777 Send Dat1 = 8 [DefaultDispatcher-worker-2, 188663320] 57977 Send Dat2 = 22 [DefaultDispatcher-worker-2, 145086577] 57978 Receive Data = (8, 22) [main, 250988118] 58481 Send Dat2 = 54 [DefaultDispatcher-worker-2, 145086577] 58482 Receive Data = (8, 54) [main, 250988118] 58780 Send Dat1 = 4 [DefaultDispatcher-worker-2, 188663320] 58781 Receive Data = (4, 54) [main, 250988118] 58983 Send Dat2 = 90 [DefaultDispatcher-worker-2, 145086577] 58984 Receive Data = (4, 90) [main, 250988118] 59488 Send Dat2 = 12 [DefaultDispatcher-worker-2, 145086577] 59489 Receive Data = (4, 12) [main, 250988118] 59784 Send Dat1 = 3 [DefaultDispatcher-worker-2, 188663320] 59785 Receive Data = (3, 12) [main, 250988118] 59992 Send Dat2 = 63 [DefaultDispatcher-worker-2, 145086577] 59993 Receive Data = (3, 63) [main, 250988118] 60495 Send Dat2 = 39 [DefaultDispatcher-worker-2, 145086577] 60496 Receive Data = (3, 39) [main, 250988118]
関連記事:
