Coroutine:Flowのストリームデータ変更(中間演算)

投稿日:  更新日:

Flowはメンバー関数や拡張関数で様々な機能を提供しています。

これらの関数は大きく分けて、中間演算(Intermediate operators)と終端演算(Terminal operators)に分けられます。

中間演算とは、withIndex、map、filter、drop、take、zip、merge、combineなどです。通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。

終端演算とは、collect、single、reduce、toListなどです。通信経路の末端に位置して、ストリームデータを収集します。

今回は、この「Flowの中間演算」のwithIndex、map、filter、drop、takeを取り上げて、まとめます。

※環境:Android Studio Koala Feature Drop | 2024.1.2 Patch 1
    Kotlin 1.9.0
    Compose Compiler 1.5.1
    org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3
    org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3

スポンサーリンク

中間演算とは

中間演算は通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。

中間演算概要ダウンストリームFlow
withIndexデータにインデックを付与Flow(IndexValue(T))
mapデータを割り振り直すFlow(R)
filter条件に合うデータを通すFlow(T)
drop先頭から指定個数のデータを除外
dropWhile先頭から条件を満たす間のデータを除外
take先頭から指定個数のデータを取得
takeWhile先頭から条件を満たす間のデータを取得
zipFlowを統合(全部のデータが切り替わる毎に融合)Flow(R)
mergeFlowを統合(データをそのまま重ね合わせ)Flow(T)
combineFlowを統合(一部のデータが切り替わる毎に融合)Flow(R)
※アップストリームFlowはFlow(T)
※正式なmergeは「kotlinx-coroutines-core 1.9.0」で同様な関数が登場予定
スポンサーリンク

中間演算の構造

中間演算はアップストリームFlowの受信出力(collect)を受けて、何かの変換処理を伴ったダウンストリームFlowを返す構造になっています。

図にすると、ひとつなぎにFlowを追加した形です。

中間演算の構成

簡単な中間演算の例を示します。「何もしない」という変換処理を行うrelay関数です。

レシーバーがアップストリームFlowで、returnで返されるインスタンスがダウンストリームFlowになります。

fun <T> Flow<T>.relay(): Flow<T> {
    return flow {
        collect { value ->                  // レシーバーのcollect
            val _thName = getThreadName()
            val _jbCode = currentCoroutineContext().job.hashCode()
            Log.i(TAG, "${getMilliTime5()} Relay   Data = ${value} [${_thName}, ${_jbCode}]")
            /*
			** ここで何もしない
			*/
            return@collect emit(value)      // returnで返すFlowのemit
        }
    }
}

中間演算は「コメント:ここで何もしない」の部分に変換処理が入ります。変換処理はラムダ式を引数で挿入する方法が採られます。

サンプルは「何もしない」ので、変換処理はありません。

//                          0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)

fun createFlowRelay() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
        delay(1000)
    }
}.flowOn(Dispatchers.Default).relay()
@Preview
@Composable
fun FlowRelay_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlowRelay() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test (Relay) !") }
}
48737 Compose !
50651 Start Receiver !
50659 Start Sender !
55665 Send    Data = 8 [DefaultDispatcher-worker-1, 188663320]
55666 Relay   Data = 8 [main, 145086577]  ⇐ mainは受信機のコルーチンコンテキスト
55667 Receive Data = 8 [main, 145086577]
56670 Send    Data = 4 [DefaultDispatcher-worker-1, 188663320]
56671 Relay   Data = 4 [main, 145086577]
56672 Receive Data = 4 [main, 145086577]
57675 Send    Data = 3 [DefaultDispatcher-worker-1, 188663320]
57675 Relay   Data = 3 [main, 145086577]
57676 Receive Data = 3 [main, 145086577]
58678 Send    Data = 9 [DefaultDispatcher-worker-1, 188663320]
58679 Relay   Data = 9 [main, 145086577]
58680 Receive Data = 9 [main, 145086577]
59683 Send    Data = 1 [DefaultDispatcher-worker-1, 188663320]
59684 Relay   Data = 1 [main, 145086577]
59685 Receive Data = 1 [main, 145086577]

中間演算は受信機のコルーチンコンテキストで実行される点に注意してください。

スポンサーリンク

withIndex

withIndexはデータにインデックを付与します。

public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = flow {
    var index = 0
    collect { value ->
        emit(IndexedValue(checkIndexOverflow(index++), value))
    }
}

以下はwithIndexのサンプルです。

ストリームデータはインデックス番号が付与されて、IndexedValue<T>へ置き換えられます。

//                          0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)

fun createFlowWithIndex() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
        delay(1000)
    }
}.flowOn(Dispatchers.Default).withIndex()
@Preview
@Composable
fun FlowWithIndex_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<IndexedValue<Int>> = remember { createFlowWithIndex() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data[${it.index}] = ${it.value} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test (withIndex) !") }
}
66938 Compose !
69711 Start Receiver !
69722 Start Sender !
74730 Send    Data = 8 [DefaultDispatcher-worker-1, 188663320]
74731 Receive Data[0] = 8 [main, 145086577]
75734 Send    Data = 4 [DefaultDispatcher-worker-1, 188663320]
75734 Receive Data[1] = 4 [main, 145086577]
76737 Send    Data = 3 [DefaultDispatcher-worker-1, 188663320]
76737 Receive Data[2] = 3 [main, 145086577]
    :
    :
83764 Send    Data = 6 [DefaultDispatcher-worker-1, 188663320]
83764 Receive Data[9] = 6 [main, 145086577]
スポンサーリンク

map

mapはデータを割り振り直します。

public inline fun <T, R> Flow<T>.map(
    crossinline transform: suspend (value: T) -> R
): Flow<R> = transform { value ->
    return@transform emit(transform(value))
}

以下はmapのサンプルです。

ストリームデータは10%の大きさ(1/10)へ縮小され、Floatで表現されます。

//                          0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)

fun createFlowMap() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
        delay(1000)
    }
}.flowOn(Dispatchers.Default).map { it.toFloat() / 10.0f }
@Preview
@Composable
fun FlowMap_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Float> = remember { createFlowMap() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test (map) !") }
}
78103 Compose !
80766 Start Receiver !
80777 Start Sender !
85785 Send    Data = 8 [DefaultDispatcher-worker-2, 188663320]
85786 Receive Data = 0.8 [main, 145086577]
86789 Send    Data = 4 [DefaultDispatcher-worker-2, 188663320]
86790 Receive Data = 0.4 [main, 145086577]
87792 Send    Data = 3 [DefaultDispatcher-worker-2, 188663320]
87792 Receive Data = 0.3 [main, 145086577]
    :
    :
94830 Receive Data = 0.6 [main, 145086577]
94829 Send    Data = 6 [DefaultDispatcher-worker-2, 188663320]

filter

filterは 条件に合うデータを通します。

public inline fun <T> Flow<T>.filter(
    crossinline predicate: suspend (T) -> Boolean
): Flow<T> = transform { value ->
    if (predicate(value)) return@transform emit(value)
}

以下はfilterのサンプルです。

ストリームデータは「条件:データ < 3」のみを通し、それ以外は除外されます。

//                          0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)

fun createFlowFilter() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
        delay(1000)
    }
}.flowOn(Dispatchers.Default).filter { it < 3 }
@Preview
@Composable
fun FlowFilter_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlowFilter() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test (filer) !") }
}
74969 Compose !
19594 Start Receiver !
19606 Start Sender !
24612 Send    Data = 8 [DefaultDispatcher-worker-1, 188663320]
25618 Send    Data = 4 [DefaultDispatcher-worker-1, 188663320]
26623 Send    Data = 3 [DefaultDispatcher-worker-1, 188663320]
27626 Send    Data = 9 [DefaultDispatcher-worker-1, 188663320]
28629 Send    Data = 1 [DefaultDispatcher-worker-1, 188663320]
28630 Receive Data = 1 [main, 145086577]
29632 Send    Data = -1 [DefaultDispatcher-worker-1, 188663320]
29633 Receive Data = -1 [main, 145086577]
30637 Send    Data = 2 [DefaultDispatcher-worker-1, 188663320]
30637 Receive Data = 2 [main, 145086577]
31639 Send    Data = 5 [DefaultDispatcher-worker-1, 188663320]
32641 Send    Data = 7 [DefaultDispatcher-worker-1, 188663320]
33646 Send    Data = 6 [DefaultDispatcher-worker-1, 188663320]
スポンサーリンク

drop

dropは先頭から指定個数のデータを除外します。

public fun <T> Flow<T>.drop(count: Int): Flow<T> {
    require(count >= 0) { "Drop count should be non-negative, but had $count" }
    return flow {
        var skipped = 0
        collect { value ->
            if (skipped >= count) emit(value) else ++skipped
        }
    }
}

以下はdropのサンプルです。

先頭の3つのデータは、中間演算により途中で除外されて、受信機で受信できません。

//                          0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)

fun createFlowDrop() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
        delay(1000)
    }
}.flowOn(Dispatchers.Default).drop(3)
@Preview
@Composable
fun FlowDrop_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlowDrop() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test (drop) !") }
}
26789 Compose !
30099 Start Receiver !
30111 Start Sender !
35116 Send    Data = 8 [DefaultDispatcher-worker-1, 265517962]
36119 Send    Data = 4 [DefaultDispatcher-worker-1, 265517962]
37121 Send    Data = 3 [DefaultDispatcher-worker-1, 265517962]
38124 Send    Data = 9 [DefaultDispatcher-worker-1, 265517962]
38124 Receive Data = 9 [main, 57398779]
39126 Send    Data = 1 [DefaultDispatcher-worker-1, 265517962]
39126 Receive Data = 1 [main, 57398779]
40128 Send    Data = -1 [DefaultDispatcher-worker-1, 265517962]
40128 Receive Data = -1 [main, 57398779]
41130 Send    Data = 2 [DefaultDispatcher-worker-1, 265517962]
41130 Receive Data = 2 [main, 57398779]
42132 Send    Data = 5 [DefaultDispatcher-worker-1, 265517962]
42132 Receive Data = 5 [main, 57398779]
43135 Send    Data = 7 [DefaultDispatcher-worker-1, 265517962]
43136 Receive Data = 7 [main, 57398779]
44138 Send    Data = 6 [DefaultDispatcher-worker-1, 265517962]
44138 Receive Data = 6 [main, 57398779]
スポンサーリンク

dropWhile

dropWhileは先頭から条件を満たす間のデータを除外します。

public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow {
    var matched = false
    collect { value ->
        if (matched) {
            emit(value)
        } else if (!predicate(value)) {
            matched = true
            emit(value)
        }
    }
}

以下はdropWhileのサンプルです。

ストリームデータは、先頭から「条件:データ > 0」を満たす間はデータを除外し、満たさなくなった時点で通るようになります。

「除外」からスタートする点に注意してください。

//                          0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)

fun createFlowDrop() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
        delay(1000)
    }
}.flowOn(Dispatchers.Default).dropWhile{ it > 0 }
@Preview
@Composable
fun FlowDrop_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlowDrop() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test (drop) !") }
}
93556 Compose !
95880 Start Receiver !
95887 Start Sender !
00894 Send    Data = 8 [DefaultDispatcher-worker-1, 188663320]
01898 Send    Data = 4 [DefaultDispatcher-worker-1, 188663320]
02901 Send    Data = 3 [DefaultDispatcher-worker-1, 188663320]
03905 Send    Data = 9 [DefaultDispatcher-worker-1, 188663320]
04909 Send    Data = 1 [DefaultDispatcher-worker-1, 188663320]
05914 Send    Data = -1 [DefaultDispatcher-worker-1, 188663320] ⇐ 条件に合わない
05914 Receive Data = -1 [main, 145086577]
06917 Receive Data = 2 [main, 145086577]
06917 Send    Data = 2 [DefaultDispatcher-worker-1, 188663320]
07920 Send    Data = 5 [DefaultDispatcher-worker-1, 188663320]
07920 Receive Data = 5 [main, 145086577]
08922 Send    Data = 7 [DefaultDispatcher-worker-1, 188663320]
08923 Receive Data = 7 [main, 145086577]
09926 Send    Data = 6 [DefaultDispatcher-worker-1, 188663320]
09927 Receive Data = 6 [main, 145086577]
スポンサーリンク

take

takeは先頭から指定個数のデータを取得します。

public fun <T> Flow<T>.take(count: Int): Flow<T> {
    require(count > 0) { "Requested element count $count should be positive" }
    return flow {
        var consumed = 0
        try {
            collect { value ->
                // Note: this for take is not written via collectWhile on purpose.
                // It checks condition first and then makes a tail-call to either emit or emitAbort.
                // This way normal execution does not require a state machine, only a termination (emitAbort).
                // See "TakeBenchmark" for comparision of different approaches.
                if (++consumed < count) {
                    return@collect emit(value)
                } else {
                    return@collect emitAbort(value)
                }
            }
        } catch (e: AbortFlowException) {
            e.checkOwnership(owner = this)
        }
    }
}

以下はtakeのサンプルです。

先頭の3つのデータは取得できますが、以降は中間演算により途中で除外されて、受信機で受信できません。

//                          0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)

fun createFlowTake() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
        delay(1000)
    }
}.flowOn(Dispatchers.Default).take(3)
@Preview
@Composable
fun FlowTake_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlowTake() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test (take) !") }
}
27664 Compose !
30022 Start Receiver !
30029 Start Sender !
35035 Send    Data = 8 [DefaultDispatcher-worker-1, 188663320]
35035 Receive Data = 8 [main, 145086577]
36039 Send    Data = 4 [DefaultDispatcher-worker-1, 188663320]
36040 Receive Data = 4 [main, 145086577]
37041 Send    Data = 3 [DefaultDispatcher-worker-1, 188663320]
37042 Receive Data = 3 [main, 145086577]
// ダウンストリームFlowはAbort ⇒ アップストリームFlowも停止する
スポンサーリンク

takeWhile

takeWhileは先頭から条件を満たす間のデータを取得します。

public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow {
    // This return is needed to work around a bug in JS BE: KT-39227
    return@flow collectWhile { value ->
        if (predicate(value)) {
            emit(value)
            true
        } else {
            false
        }
    }
}

internal suspend inline fun <T> Flow<T>.collectWhile(crossinline predicate: suspend (value: T) -> Boolean) {
    val collector = object : FlowCollector<T> {
        override suspend fun emit(value: T) {
            // Note: we are checking predicate first, then throw. If the predicate does suspend (calls emit, for example)
            // the the resulting code is never tail-suspending and produces a state-machine
            if (!predicate(value)) {
                throw AbortFlowException(this)
            }
        }
    }
    try {
        collect(collector)
    } catch (e: AbortFlowException) {
        e.checkOwnership(collector)
    }
}

以下はtakeWhileのサンプルです。

ストリームデータは、先頭から「条件:データ > 0」を満たす間はデータを取得し、満たさなくなった時点で除外するようになります。

「取得」からスタートする点に注意してください。

//                          0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)

fun createFlowTake() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
        delay(1000)
    }
}.flowOn(Dispatchers.Default).takeWhile { it > 0 }
@Preview
@Composable
fun FlowTake_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlowTake() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test (take) !") }
}
99771 Compose !
02527 Start Receiver !
02537 Start Sender !
07541 Send    Data = 8 [DefaultDispatcher-worker-1, 188663320]
07541 Receive Data = 8 [main, 145086577]
08544 Send    Data = 4 [DefaultDispatcher-worker-1, 188663320]
08545 Receive Data = 4 [main, 145086577]
09549 Send    Data = 3 [DefaultDispatcher-worker-1, 188663320]
09549 Receive Data = 3 [main, 145086577]
10553 Send    Data = 9 [DefaultDispatcher-worker-1, 188663320]
10554 Receive Data = 9 [main, 145086577]
11558 Send    Data = 1 [DefaultDispatcher-worker-1, 188663320]
11559 Receive Data = 1 [main, 145086577]
12563 Send    Data = -1 [DefaultDispatcher-worker-1, 188663320]
// ダウンストリームFlowはAbort ⇒ アップストリームFlowも停止する
スポンサーリンク

関連記事:

近頃の携帯端末はクワッドコア(プロセッサが4つ)やオクタコア(プロセッサが8つ)が当たり前になりました。 サクサク動作するアプリを作るために、この恩恵を使わなければ損です。 となると、必然的に非同期処理(マルチスレッド)を使うことになります。 JavaのThreadクラス、Android APIのAsyncTaskクラスが代表的な手法です。 Kotlinは上記に加えて「コルーチン(Coroutine)」が使えるようになっています。 今回は、このコルーチンについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 特徴としてnon-blocking動作をサポートします。 このnon-blocking動作についてまとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンの構成要素であるSuspend関数について、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがlaunchです。 このlaunchビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するlaunchビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがasyncです。 このasyncビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するasyncビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがrunBlockingです。 このrunBlockingビルダーについて、まとめます。 ...
CoroutineContextはコルーチンで起動されるスレッドの属性を格納しています。 その中にコルーチンの名前を表現するName属性があります。 Name属性を出力する方法を紹介します。 ...
コルーチン(Coroutine)は「非同期処理プログラミングの手法」の1つです。 Kotlinが提供します。 withContextはCoroutineContextを切り替えてスレッドを起動するSuspend関数です。 このwithContextについて、まとめます。 ...
コルーチン間でメッセージ(データ)の送受信を行うことが出来ます。 ここで紹介する「メッセージの送受信」を使えば、非同期処理の間で確実にデータを受け渡し出来ます。 それにより、非同期処理の連携が容易になります。 今回は、メッセージの送受信についての基礎と、Channelを使った最も基本的な送受信の動作をまとめます。 ※環境:Android Studio Flamingo | 2022.2.1    :org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.4 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「Produce」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「Flow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「SharedFlow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「StateFlow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
Flowはメンバー関数や拡張関数で様々な機能を提供しています。 これらの関数は大きく分けて、中間演算(Intermediate operators)と終端演算(Terminal operators)に分けられます。 中間演算とは、withIndex、map、filter、drop、take、zip、merge、combineなどです。通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。 終端演算とは、collect、single、reduce、toListなどです。通信経路の末端に位置して、ストリームデータを収集します。 今回は、この「Flowの中間演算」のzip、merge、combineを取り上げて、まとめます。 ※環境:Android Studio Koala Feature Drop | 2024.1.2 Patch 1     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     o ...
スポンサーリンク