Flowはメンバー関数や拡張関数で様々な機能を提供しています。
これらの関数は大きく分けて、中間演算(Intermediate operators)と終端演算(Terminal operators)に分けられます。
中間演算とは、withIndex、map、filter、drop、take、zip、merge、combineなどです。通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。
終端演算とは、collect、single、reduce、toListなどです。通信経路の末端に位置して、ストリームデータを収集します。
今回は、この「Flowの中間演算」のwithIndex、map、filter、drop、takeを取り上げて、まとめます。
※環境:Android Studio Koala Feature Drop | 2024.1.2 Patch 1
Kotlin 1.9.0
Compose Compiler 1.5.1
org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3
org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3
中間演算とは
中間演算は通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。
中間演算 | 概要 | ダウンストリームFlow |
---|---|---|
withIndex | データにインデックを付与 | Flow(IndexValue(T)) |
map | データを割り振り直す | Flow(R) |
filter | 条件に合うデータを通す | Flow(T) |
drop | 先頭から指定個数のデータを除外 | |
dropWhile | 先頭から条件を満たす間のデータを除外 | |
take | 先頭から指定個数のデータを取得 | |
takeWhile | 先頭から条件を満たす間のデータを取得 | |
zip | Flowを統合(全部のデータが切り替わる毎に融合) | Flow(R) |
merge | Flowを統合(データをそのまま重ね合わせ) | Flow(T) |
combine | Flowを統合(一部のデータが切り替わる毎に融合) | Flow(R) |
※アップストリームFlowはFlow(T) ※正式なmergeは「kotlinx-coroutines-core 1.9.0」で同様な関数が登場予定 |
中間演算の構造
中間演算はアップストリームFlowの受信出力(collect)を受けて、何かの変換処理を伴ったダウンストリームFlowを返す構造になっています。
図にすると、ひとつなぎにFlowを追加した形です。
簡単な中間演算の例を示します。「何もしない」という変換処理を行うrelay関数です。
レシーバーがアップストリームFlowで、returnで返されるインスタンスがダウンストリームFlowになります。
fun <T> Flow<T>.relay(): Flow<T> { return flow { collect { value -> // レシーバーのcollect val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Relay Data = ${value} [${_thName}, ${_jbCode}]") /* ** ここで何もしない */ return@collect emit(value) // returnで返すFlowのemit } } }
中間演算は「コメント:ここで何もしない」の部分に変換処理が入ります。変換処理はラムダ式を引数で挿入する方法が採られます。
サンプルは「何もしない」ので、変換処理はありません。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6) fun createFlowRelay() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }.flowOn(Dispatchers.Default).relay()
@Preview @Composable fun FlowRelay_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlowRelay() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Flow test (Relay) !") } }
48737 Compose ! 50651 Start Receiver ! 50659 Start Sender ! 55665 Send Data = 8 [DefaultDispatcher-worker-1, 188663320] 55666 Relay Data = 8 [main, 145086577] ⇐ mainは受信機のコルーチンコンテキスト 55667 Receive Data = 8 [main, 145086577] 56670 Send Data = 4 [DefaultDispatcher-worker-1, 188663320] 56671 Relay Data = 4 [main, 145086577] 56672 Receive Data = 4 [main, 145086577] 57675 Send Data = 3 [DefaultDispatcher-worker-1, 188663320] 57675 Relay Data = 3 [main, 145086577] 57676 Receive Data = 3 [main, 145086577] 58678 Send Data = 9 [DefaultDispatcher-worker-1, 188663320] 58679 Relay Data = 9 [main, 145086577] 58680 Receive Data = 9 [main, 145086577] 59683 Send Data = 1 [DefaultDispatcher-worker-1, 188663320] 59684 Relay Data = 1 [main, 145086577] 59685 Receive Data = 1 [main, 145086577]
中間演算は受信機のコルーチンコンテキストで実行される点に注意してください。
withIndex
withIndexはデータにインデックを付与します。
public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = flow { var index = 0 collect { value -> emit(IndexedValue(checkIndexOverflow(index++), value)) } }
以下はwithIndexのサンプルです。
ストリームデータはインデックス番号が付与されて、IndexedValue<T>へ置き換えられます。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6) fun createFlowWithIndex() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }.flowOn(Dispatchers.Default).withIndex()
@Preview @Composable fun FlowWithIndex_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<IndexedValue<Int>> = remember { createFlowWithIndex() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data[${it.index}] = ${it.value} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Flow test (withIndex) !") } }
66938 Compose ! 69711 Start Receiver ! 69722 Start Sender ! 74730 Send Data = 8 [DefaultDispatcher-worker-1, 188663320] 74731 Receive Data[0] = 8 [main, 145086577] 75734 Send Data = 4 [DefaultDispatcher-worker-1, 188663320] 75734 Receive Data[1] = 4 [main, 145086577] 76737 Send Data = 3 [DefaultDispatcher-worker-1, 188663320] 76737 Receive Data[2] = 3 [main, 145086577] : : 83764 Send Data = 6 [DefaultDispatcher-worker-1, 188663320] 83764 Receive Data[9] = 6 [main, 145086577]
map
mapはデータを割り振り直します。
public inline fun <T, R> Flow<T>.map( crossinline transform: suspend (value: T) -> R ): Flow<R> = transform { value -> return@transform emit(transform(value)) }
以下はmapのサンプルです。
ストリームデータは10%の大きさ(1/10)へ縮小され、Floatで表現されます。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6) fun createFlowMap() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }.flowOn(Dispatchers.Default).map { it.toFloat() / 10.0f }
@Preview @Composable fun FlowMap_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Float> = remember { createFlowMap() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Flow test (map) !") } }
78103 Compose ! 80766 Start Receiver ! 80777 Start Sender ! 85785 Send Data = 8 [DefaultDispatcher-worker-2, 188663320] 85786 Receive Data = 0.8 [main, 145086577] 86789 Send Data = 4 [DefaultDispatcher-worker-2, 188663320] 86790 Receive Data = 0.4 [main, 145086577] 87792 Send Data = 3 [DefaultDispatcher-worker-2, 188663320] 87792 Receive Data = 0.3 [main, 145086577] : : 94830 Receive Data = 0.6 [main, 145086577] 94829 Send Data = 6 [DefaultDispatcher-worker-2, 188663320]
filter
filterは 条件に合うデータを通します。
public inline fun <T> Flow<T>.filter( crossinline predicate: suspend (T) -> Boolean ): Flow<T> = transform { value -> if (predicate(value)) return@transform emit(value) }
以下はfilterのサンプルです。
ストリームデータは「条件:データ < 3」のみを通し、それ以外は除外されます。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6) fun createFlowFilter() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }.flowOn(Dispatchers.Default).filter { it < 3 }
@Preview @Composable fun FlowFilter_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlowFilter() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Flow test (filer) !") } }
74969 Compose !
19594 Start Receiver !
19606 Start Sender !
24612 Send Data = 8 [DefaultDispatcher-worker-1, 188663320]
25618 Send Data = 4 [DefaultDispatcher-worker-1, 188663320]
26623 Send Data = 3 [DefaultDispatcher-worker-1, 188663320]
27626 Send Data = 9 [DefaultDispatcher-worker-1, 188663320]
28629 Send Data = 1 [DefaultDispatcher-worker-1, 188663320]
28630 Receive Data = 1 [main, 145086577]
29632 Send Data = -1 [DefaultDispatcher-worker-1, 188663320]
29633 Receive Data = -1 [main, 145086577]
30637 Send Data = 2 [DefaultDispatcher-worker-1, 188663320]
30637 Receive Data = 2 [main, 145086577]
31639 Send Data = 5 [DefaultDispatcher-worker-1, 188663320]
32641 Send Data = 7 [DefaultDispatcher-worker-1, 188663320]
33646 Send Data = 6 [DefaultDispatcher-worker-1, 188663320]
drop
dropは先頭から指定個数のデータを除外します。
public fun <T> Flow<T>.drop(count: Int): Flow<T> { require(count >= 0) { "Drop count should be non-negative, but had $count" } return flow { var skipped = 0 collect { value -> if (skipped >= count) emit(value) else ++skipped } } }
以下はdropのサンプルです。
先頭の3つのデータは、中間演算により途中で除外されて、受信機で受信できません。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6) fun createFlowDrop() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }.flowOn(Dispatchers.Default).drop(3)
@Preview @Composable fun FlowDrop_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlowDrop() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Flow test (drop) !") } }
26789 Compose !
30099 Start Receiver !
30111 Start Sender !
35116 Send Data = 8 [DefaultDispatcher-worker-1, 265517962]
36119 Send Data = 4 [DefaultDispatcher-worker-1, 265517962]
37121 Send Data = 3 [DefaultDispatcher-worker-1, 265517962]
38124 Send Data = 9 [DefaultDispatcher-worker-1, 265517962]
38124 Receive Data = 9 [main, 57398779]
39126 Send Data = 1 [DefaultDispatcher-worker-1, 265517962]
39126 Receive Data = 1 [main, 57398779]
40128 Send Data = -1 [DefaultDispatcher-worker-1, 265517962]
40128 Receive Data = -1 [main, 57398779]
41130 Send Data = 2 [DefaultDispatcher-worker-1, 265517962]
41130 Receive Data = 2 [main, 57398779]
42132 Send Data = 5 [DefaultDispatcher-worker-1, 265517962]
42132 Receive Data = 5 [main, 57398779]
43135 Send Data = 7 [DefaultDispatcher-worker-1, 265517962]
43136 Receive Data = 7 [main, 57398779]
44138 Send Data = 6 [DefaultDispatcher-worker-1, 265517962]
44138 Receive Data = 6 [main, 57398779]
dropWhile
dropWhileは先頭から条件を満たす間のデータを除外します。
public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow { var matched = false collect { value -> if (matched) { emit(value) } else if (!predicate(value)) { matched = true emit(value) } } }
以下はdropWhileのサンプルです。
ストリームデータは、先頭から「条件:データ > 0」を満たす間はデータを除外し、満たさなくなった時点で通るようになります。
「除外」からスタートする点に注意してください。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6) fun createFlowDrop() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }.flowOn(Dispatchers.Default).dropWhile{ it > 0 }
@Preview @Composable fun FlowDrop_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlowDrop() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Flow test (drop) !") } }
93556 Compose !
95880 Start Receiver !
95887 Start Sender !
00894 Send Data = 8 [DefaultDispatcher-worker-1, 188663320]
01898 Send Data = 4 [DefaultDispatcher-worker-1, 188663320]
02901 Send Data = 3 [DefaultDispatcher-worker-1, 188663320]
03905 Send Data = 9 [DefaultDispatcher-worker-1, 188663320]
04909 Send Data = 1 [DefaultDispatcher-worker-1, 188663320]
05914 Send Data = -1 [DefaultDispatcher-worker-1, 188663320] ⇐ 条件に合わない
05914 Receive Data = -1 [main, 145086577]
06917 Receive Data = 2 [main, 145086577]
06917 Send Data = 2 [DefaultDispatcher-worker-1, 188663320]
07920 Send Data = 5 [DefaultDispatcher-worker-1, 188663320]
07920 Receive Data = 5 [main, 145086577]
08922 Send Data = 7 [DefaultDispatcher-worker-1, 188663320]
08923 Receive Data = 7 [main, 145086577]
09926 Send Data = 6 [DefaultDispatcher-worker-1, 188663320]
09927 Receive Data = 6 [main, 145086577]
take
takeは先頭から指定個数のデータを取得します。
public fun <T> Flow<T>.take(count: Int): Flow<T> { require(count > 0) { "Requested element count $count should be positive" } return flow { var consumed = 0 try { collect { value -> // Note: this for take is not written via collectWhile on purpose. // It checks condition first and then makes a tail-call to either emit or emitAbort. // This way normal execution does not require a state machine, only a termination (emitAbort). // See "TakeBenchmark" for comparision of different approaches. if (++consumed < count) { return@collect emit(value) } else { return@collect emitAbort(value) } } } catch (e: AbortFlowException) { e.checkOwnership(owner = this) } } }
以下はtakeのサンプルです。
先頭の3つのデータは取得できますが、以降は中間演算により途中で除外されて、受信機で受信できません。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6) fun createFlowTake() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }.flowOn(Dispatchers.Default).take(3)
@Preview @Composable fun FlowTake_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlowTake() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Flow test (take) !") } }
27664 Compose !
30022 Start Receiver !
30029 Start Sender !
35035 Send Data = 8 [DefaultDispatcher-worker-1, 188663320]
35035 Receive Data = 8 [main, 145086577]
36039 Send Data = 4 [DefaultDispatcher-worker-1, 188663320]
36040 Receive Data = 4 [main, 145086577]
37041 Send Data = 3 [DefaultDispatcher-worker-1, 188663320]
37042 Receive Data = 3 [main, 145086577]
// ダウンストリームFlowはAbort ⇒ アップストリームFlowも停止する
takeWhile
takeWhileは先頭から条件を満たす間のデータを取得します。
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow { // This return is needed to work around a bug in JS BE: KT-39227 return@flow collectWhile { value -> if (predicate(value)) { emit(value) true } else { false } } } internal suspend inline fun <T> Flow<T>.collectWhile(crossinline predicate: suspend (value: T) -> Boolean) { val collector = object : FlowCollector<T> { override suspend fun emit(value: T) { // Note: we are checking predicate first, then throw. If the predicate does suspend (calls emit, for example) // the the resulting code is never tail-suspending and produces a state-machine if (!predicate(value)) { throw AbortFlowException(this) } } } try { collect(collector) } catch (e: AbortFlowException) { e.checkOwnership(collector) } }
以下はtakeWhileのサンプルです。
ストリームデータは、先頭から「条件:データ > 0」を満たす間はデータを取得し、満たさなくなった時点で除外するようになります。
「取得」からスタートする点に注意してください。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6) fun createFlowTake() = flow { Log.i(TAG, "${getMilliTime5()} Start Sender !") delay(5000) Data.forEach { emit(it) // 送信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } }.flowOn(Dispatchers.Default).takeWhile { it > 0 }
@Preview @Composable fun FlowTake_Receiver( scope: CoroutineScope = rememberCoroutineScope(), datCh: Flow<Int> = remember { createFlowTake() } ) { Log.i(TAG, "${getMilliTime5()} Compose !") Button( modifier = Modifier.fillMaxWidth(), onClick = { scope.launch { Log.i(TAG, "${getMilliTime5()} Start Receiver !") datCh.collect { // 要求・受信 val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]") // delay(1000) } } } ) { Text(text = "Flow test (take) !") } }
99771 Compose !
02527 Start Receiver !
02537 Start Sender !
07541 Send Data = 8 [DefaultDispatcher-worker-1, 188663320]
07541 Receive Data = 8 [main, 145086577]
08544 Send Data = 4 [DefaultDispatcher-worker-1, 188663320]
08545 Receive Data = 4 [main, 145086577]
09549 Send Data = 3 [DefaultDispatcher-worker-1, 188663320]
09549 Receive Data = 3 [main, 145086577]
10553 Send Data = 9 [DefaultDispatcher-worker-1, 188663320]
10554 Receive Data = 9 [main, 145086577]
11558 Send Data = 1 [DefaultDispatcher-worker-1, 188663320]
11559 Receive Data = 1 [main, 145086577]
12563 Send Data = -1 [DefaultDispatcher-worker-1, 188663320]
// ダウンストリームFlowはAbort ⇒ アップストリームFlowも停止する
関連記事: