Coroutine:Flowの統合(中間演算)

投稿日:  更新日:

Flowはメンバー関数や拡張関数で様々な機能を提供しています。

これらの関数は大きく分けて、中間演算(Intermediate operators)と終端演算(Terminal operators)に分けられます。

中間演算とは、withIndex、map、filter、drop、take、zip、merge、combineなどです。通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。

終端演算とは、collect、single、reduce、toListなどです。通信経路の末端に位置して、ストリームデータを収集します。

今回は、この「Flowの中間演算」のzip、merge、combineを取り上げて、まとめます。

※環境:Android Studio Koala Feature Drop | 2024.1.2 Patch 1
    Kotlin 1.9.0
    Compose Compiler 1.5.1
    org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3
    org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3

スポンサーリンク

中間演算とは

中間演算は通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。

中間演算概要ダウンストリームFlow
withIndexデータにインデックを付与Flow(IndexValue(T))
mapデータを割り振り直すFlow(R)
filter条件に合うデータを通すFlow(T)
drop先頭から指定個数のデータを除外
dropWhile先頭から条件を満たす間のデータを除外
take先頭から指定個数のデータを取得
takeWhile先頭から条件を満たす間のデータを取得
zipFlowを統合(全部のデータが切り替わる毎に融合)Flow(R)
mergeFlowを統合(データをそのまま重ね合わせ)Flow(T)
combineFlowを統合(一部のデータが切り替わる毎に融合)Flow(R)
※アップストリームFlowはFlow(T)
※正式なmergeは「kotlinx-coroutines-core 1.9.0」で同様な関数が登場予定
スポンサーリンク

中間演算の構造

中間演算はアップストリームFlowの受信出力(collect)を受けて、何かの変換処理を伴ったダウンストリームFlowを返す構造になっています。

図にすると、ひとつなぎにFlowを追加した形です。

中間演算の構成

簡単な中間演算の例を示します。「何もしない」という変換処理を行うrelay関数です。

レシーバーがアップストリームFlowで、returnで返されるインスタンスがダウンストリームFlowになります。

fun <T> Flow<T>.relay(): Flow<T> {
    return flow {
        collect { value ->                  // レシーバーのcollect
            val _thName = getThreadName()
            val _jbCode = currentCoroutineContext().job.hashCode()
            Log.i(TAG, "${getMilliTime5()} Relay   Data = ${value} [${_thName}, ${_jbCode}]")
            /*
			** ここで何もしない
			*/
            return@collect emit(value)      // returnで返すFlowのemit
        }
    }
}

中間演算は「コメント:ここで何もしない」の部分に変換処理が入ります。変換処理はラムダ式を引数で挿入する方法が採られます。

サンプルは「何もしない」ので、変換処理はありません。

//                          0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)

fun createFlowRelay() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
        delay(1000)
    }
}.flowOn(Dispatchers.Default).relay()
@Preview
@Composable
fun FlowRelay_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlowRelay() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test (Relay) !") }
}
48737 Compose !
50651 Start Receiver !
50659 Start Sender !
55665 Send    Data = 8 [DefaultDispatcher-worker-1, 188663320]
55666 Relay   Data = 8 [main, 145086577]  ⇐ mainは受信機のコルーチンコンテキスト
55667 Receive Data = 8 [main, 145086577]
56670 Send    Data = 4 [DefaultDispatcher-worker-1, 188663320]
56671 Relay   Data = 4 [main, 145086577]
56672 Receive Data = 4 [main, 145086577]
57675 Send    Data = 3 [DefaultDispatcher-worker-1, 188663320]
57675 Relay   Data = 3 [main, 145086577]
57676 Receive Data = 3 [main, 145086577]
58678 Send    Data = 9 [DefaultDispatcher-worker-1, 188663320]
58679 Relay   Data = 9 [main, 145086577]
58680 Receive Data = 9 [main, 145086577]
59683 Send    Data = 1 [DefaultDispatcher-worker-1, 188663320]
59684 Relay   Data = 1 [main, 145086577]
59685 Receive Data = 1 [main, 145086577]

中間演算は受信機のコルーチンコンテキストで実行される点に注意してください。

スポンサーリンク

zip

zipは「複数のFlowの受信出力(collect)を、全部が切り替わる毎に融合するFlow」を返します。

public fun <T1, T2, R> Flow<T1>.zip(
    other: Flow<T2>, 
    transform: suspend (T1, T2) -> R
): Flow<R> = zipImpl(this, other, transform)
zipImpl(Zip.ktより抜粋)
internal fun <T1, T2, R> zipImpl(
    flow: Flow<T1>, 
    flow2: Flow<T2>, 
    transform: suspend (T1, T2) -> R
): Flow<R> =
    unsafeFlow {
        coroutineScope {
            val second = produce<Any> {
                flow2.collect { value ->
                    return@collect channel.send(value ?: NULL)
                }
            }

            val collectJob = Job()
            (second as SendChannel<*>).invokeOnClose {
                // Optimization to avoid AFE allocation when the other flow is done
                if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow))
            }

            try {
                val scopeContext = coroutineContext
                val cnt = threadContextElements(scopeContext)
                withContextUndispatched(coroutineContext + collectJob, Unit) {
                    flow.collect { value ->
                        withContextUndispatched(scopeContext, Unit, cnt) {
                            val otherValue = second.receiveCatching().getOrElse {
                                throw it ?:AbortFlowException(this@unsafeFlow)
                            }
                            emit(transform(value, NULL.unbox(otherValue)))
                        }
                    }
                }
            } catch (e: AbortFlowException) {
                e.checkOwnership(owner = this@unsafeFlow)
            } finally {
                second.cancel()
            }
        }
    }

Flow#zipの構成

Flow#zipのタイミングチャート

以下はzipのサンプルです。

2つあるアップストリームFlowの出力は、Pairへ融合されてダウンストリームFlowから出力されます。

private val Data1 = arrayOf( 8, 4, 3, 9, 1)
private val Data2 = arrayOf(22,54,90,12,63,39,71,35,58,47)

val zFlow1 = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data1.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Dat1 = ${it} [${_thName}, ${_jbCode}]")
        delay(1000)
    }
}.flowOn(Dispatchers.Default).buffer(capacity = 0)

val zFlow2= flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5200)
    Data2.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Dat2 = ${it} [${_thName}, ${_jbCode}]")
        delay(500)
    }
}.flowOn(Dispatchers.Default).buffer(capacity = 0)

fun createFlowZip() = zFlow1.zip(zFlow2) { dat1, dat2 -> Pair(dat1, dat2) }
@Preview
@Composable
fun FlowZip_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Pair<Int, Int>> = remember { createFlowZip() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test (zip) !") }
}

アップストリームFlowの内の一つが送受信を完了すると、その他のFlowは全て閉じられる(キャンセルされる)点に注意して下さい。

53552 Compose !
57282 Start Receiver !
57294 Start Sender !
57297 Start Sender !
62298 Send    Dat1 = 8 [DefaultDispatcher-worker-1, 188663320]
62499 Send    Dat2 = 22 [DefaultDispatcher-worker-1, 145086577]
62500 Receive Data = (8, 22) [main, 250988118]
63002 Send    Dat2 = 54 [DefaultDispatcher-worker-1, 145086577]
63300 Send    Dat1 = 4 [DefaultDispatcher-worker-1, 188663320]
63301 Receive Data = (4, 54) [main, 250988118]
63505 Send    Dat2 = 90 [DefaultDispatcher-worker-1, 145086577]
64302 Send    Dat1 = 3 [DefaultDispatcher-worker-1, 188663320]
64303 Receive Data = (3, 90) [main, 250988118]
64304 Send    Dat2 = 12 [DefaultDispatcher-worker-1, 145086577]
65305 Send    Dat1 = 9 [DefaultDispatcher-worker-1, 188663320]
65306 Receive Data = (9, 12) [main, 250988118]
65309 Send    Dat2 = 63 [DefaultDispatcher-worker-1, 145086577]
66310 Send    Dat1 = 1 [DefaultDispatcher-worker-1, 188663320]
66312 Receive Data = (1, 63) [main, 250988118]
66314 Send    Dat2 = 39 [DefaultDispatcher-worker-1, 145086577]
スポンサーリンク

merge

mergeは「複数のFlowの受信出力(collect)をそのまま重ね合わせたFlow」を返します。

mergeはAPI Ver 1.9.0で登場予定
ここで紹介しているmergeは、channelFlowのドキュメントに使用例として記載されている関数です。

正式なmergeは「kotlinx-coroutines-core Ver 1.9.0」で登場予定です。

fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow {
    launch {
        collect { send(it) }
    }
    other.collect { send(it) }
}

Flow#mergeの構成

Flow#mergeのタイミングチャート

以下はmergeのサンプルです。

2つあるアップストリームFlowの出力は、混在してダウンストリームFlowから出力されます。しかし、mergeの前後で値は同じです。

private val Data1 = arrayOf( 8, 4, 3)
private val Data2 = arrayOf(22,54,90,12,63,39)

val mFlow1 = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data1.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Dat1 = ${it} [${_thName}, ${_jbCode}]")
        delay(1000)
    }
}.flowOn(Dispatchers.Default).buffer(capacity = 0)

val mFlow2= flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5200)
    Data2.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Dat2 = ${it} [${_thName}, ${_jbCode}]")
        delay(500)
    }
}.flowOn(Dispatchers.Default).buffer(capacity = 0)

fun createFlowMerge() = mFlow1.merge(mFlow2)
@Preview
@Composable
fun FlowMerge_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlowMerge() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test (merge) !") }
}
69524 Compose !
71513 Start Receiver !
71525 Start Sender !
71527 Start Sender !
76532 Send    Dat1 = 8 [DefaultDispatcher-worker-1, 188663320]
76533 Receive Data = 8 [main, 145086577]
76730 Send    Dat2 = 22 [DefaultDispatcher-worker-1, 250988118]
76731 Receive Data = 22 [main, 145086577]
77233 Send    Dat2 = 54 [DefaultDispatcher-worker-1, 250988118]
77233 Receive Data = 54 [main, 145086577]
77536 Send    Dat1 = 4 [DefaultDispatcher-worker-1, 188663320]
77537 Receive Data = 4 [main, 145086577]
77737 Send    Dat2 = 90 [DefaultDispatcher-worker-1, 250988118]
77739 Receive Data = 90 [main, 145086577]
78242 Send    Dat2 = 12 [DefaultDispatcher-worker-1, 250988118]
78243 Receive Data = 12 [main, 145086577]
78541 Send    Dat1 = 3 [DefaultDispatcher-worker-1, 188663320]
78542 Receive Data = 3 [main, 145086577]
78746 Send    Dat2 = 63 [DefaultDispatcher-worker-1, 250988118]
78746 Receive Data = 63 [main, 145086577]
79250 Send    Dat2 = 39 [DefaultDispatcher-worker-1, 250988118]
79250 Receive Data = 39 [main, 145086577]
スポンサーリンク

combine

combineは「複数のFlowの受信出力(collect)を、一部が切り替わる毎に融合するFlow」を返します。

@JvmName("flowCombine")
public fun <T1, T2, R> Flow<T1>.combine(
    flow: Flow<T2>, 
    transform: suspend (a: T1, b: T2) -> R
): Flow<R> = flow {
    combineInternal(
	    arrayOf(this@combine, flow), 
		nullArrayFactory(), 
		{ emit(transform(it[0] as T1, it[1] as T2)) }
	)
}
combineInternal(Combine.ktより抜粋)
@PublishedApi
internal suspend fun <R, T> FlowCollector<R>.combineInternal(
    flows: Array<out Flow<T>>,
    arrayFactory: () -> Array<T?>?, // Array factory is required to workaround array typing on JVM
    transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope
    val size = flows.size
    if (size == 0) return@flowScope // bail-out for empty input
    val latestValues = arrayOfNulls<Any?>(size)
    latestValues.fill(UNINITIALIZED) // Smaller bytecode & faster than Array(size) { UNINITIALIZED }
    val resultChannel = Channel<Update>(size)
    val nonClosed = LocalAtomicInt(size)
    var remainingAbsentValues = size
    for (i in 0 until size) {
        // Coroutine per flow that keeps track of its value and sends result to downstream
        launch {
            try {
                flows[i].collect { value ->
                    resultChannel.send(Update(i, value))
                    yield() // Emulate fairness, giving each flow chance to emit
                }
            } finally {
                // Close the channel when there is no more flows
                if (nonClosed.decrementAndGet() == 0) {
                    resultChannel.close()
                }
            }
        }
    }

    /*
     * Batch-receive optimization: read updates in batches, but bail-out
     * as soon as we encountered two values from the same source
     */
    val lastReceivedEpoch = ByteArray(size)
    var currentEpoch: Byte = 0
    while (true) {
        ++currentEpoch
        // Start batch
        // The very first receive in epoch should be suspending
        var element = resultChannel.receiveCatching().getOrNull() ?: break // Channel is closed, nothing to do here
        while (true) {
            val index = element.index
            // Update values
            val previous = latestValues[index]
            latestValues[index] = element.value
            if (previous === UNINITIALIZED) --remainingAbsentValues
            // Check epoch
            // Received the second value from the same flow in the same epoch -- bail out
            if (lastReceivedEpoch[index] == currentEpoch) break
            lastReceivedEpoch[index] = currentEpoch
            element = resultChannel.tryReceive().getOrNull() ?: break
        }

        // Process batch result if there is enough data
        if (remainingAbsentValues == 0) {
            /*
             * If arrayFactory returns null, then we can avoid array copy because
             * it's our own safe transformer that immediately deconstructs the array
             */
            val results = arrayFactory()
            if (results == null) {
                transform(latestValues as Array<T>)
            } else {
                (latestValues as Array<T?>).copyInto(results)
                transform(results as Array<T>)
            }
        }
    }
}

Flow#combineの構成

Flow#combineのタイミングチャート

以下はcombineのサンプルです。

2つあるアップストリームFlowの出力は、Pairへ融合されてダウンストリームFlowから出力されます。

private val Data1 = arrayOf( 8, 4, 3)
private val Data2 = arrayOf(22,54,90,12,63,39)

val cFlow1 = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data1.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Dat1 = ${it} [${_thName}, ${_jbCode}]")
        delay(1000)
    }
}.flowOn(Dispatchers.Default).buffer(capacity = 0)

val cFlow2= flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5200)
    Data2.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Dat2 = ${it} [${_thName}, ${_jbCode}]")
        delay(500)
    }
}.flowOn(Dispatchers.Default).buffer(capacity = 0)

fun createFlowCombine() = cFlow1.combine(cFlow2) {
        dat1, dat2 -> Pair(dat1, dat2)
}
@Preview
@Composable
fun FlowCombine_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Pair<Int, Int>> = remember { createFlowCombine() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test (combine) !") }
}
50012 Compose !
52754 Start Receiver !
52769 Start Sender !
52770 Start Sender !
57777 Send    Dat1 = 8 [DefaultDispatcher-worker-2, 188663320]
57977 Send    Dat2 = 22 [DefaultDispatcher-worker-2, 145086577]
57978 Receive Data = (8, 22) [main, 250988118]
58481 Send    Dat2 = 54 [DefaultDispatcher-worker-2, 145086577]
58482 Receive Data = (8, 54) [main, 250988118]
58780 Send    Dat1 = 4 [DefaultDispatcher-worker-2, 188663320]
58781 Receive Data = (4, 54) [main, 250988118]
58983 Send    Dat2 = 90 [DefaultDispatcher-worker-2, 145086577]
58984 Receive Data = (4, 90) [main, 250988118]
59488 Send    Dat2 = 12 [DefaultDispatcher-worker-2, 145086577]
59489 Receive Data = (4, 12) [main, 250988118]
59784 Send    Dat1 = 3 [DefaultDispatcher-worker-2, 188663320]
59785 Receive Data = (3, 12) [main, 250988118]
59992 Send    Dat2 = 63 [DefaultDispatcher-worker-2, 145086577]
59993 Receive Data = (3, 63) [main, 250988118]
60495 Send    Dat2 = 39 [DefaultDispatcher-worker-2, 145086577]
60496 Receive Data = (3, 39) [main, 250988118]
スポンサーリンク

関連記事:

近頃の携帯端末はクワッドコア(プロセッサが4つ)やオクタコア(プロセッサが8つ)が当たり前になりました。 サクサク動作するアプリを作るために、この恩恵を使わなければ損です。 となると、必然的に非同期処理(マルチスレッド)を使うことになります。 JavaのThreadクラス、Android APIのAsyncTaskクラスが代表的な手法です。 Kotlinは上記に加えて「コルーチン(Coroutine)」が使えるようになっています。 今回は、このコルーチンについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 特徴としてnon-blocking動作をサポートします。 このnon-blocking動作についてまとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンの構成要素であるSuspend関数について、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがlaunchです。 このlaunchビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するlaunchビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがasyncです。 このasyncビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するasyncビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがrunBlockingです。 このrunBlockingビルダーについて、まとめます。 ...
CoroutineContextはコルーチンで起動されるスレッドの属性を格納しています。 その中にコルーチンの名前を表現するName属性があります。 Name属性を出力する方法を紹介します。 ...
コルーチン(Coroutine)は「非同期処理プログラミングの手法」の1つです。 Kotlinが提供します。 withContextはCoroutineContextを切り替えてスレッドを起動するSuspend関数です。 このwithContextについて、まとめます。 ...
コルーチン間でメッセージ(データ)の送受信を行うことが出来ます。 ここで紹介する「メッセージの送受信」を使えば、非同期処理の間で確実にデータを受け渡し出来ます。 それにより、非同期処理の連携が容易になります。 今回は、メッセージの送受信についての基礎と、Channelを使った最も基本的な送受信の動作をまとめます。 ※環境:Android Studio Flamingo | 2022.2.1    :org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.4 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「Produce」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「Flow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「SharedFlow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「StateFlow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
Flowはメンバー関数や拡張関数で様々な機能を提供しています。 これらの関数は大きく分けて、中間演算(Intermediate operators)と終端演算(Terminal operators)に分けられます。 中間演算とは、withIndex、map、filter、drop、take、zip、merge、combineなどです。通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。 終端演算とは、collect、single、reduce、toListなどです。通信経路の末端に位置して、ストリームデータを収集します。 今回は、この「Flowの中間演算」のwithIndex、map、filter、drop、takeを取り上げて、まとめます。 ※環境:Android Studio Koala Feature Drop | 2024.1.2 Patch 1     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-androi ...
スポンサーリンク