Coroutine:Flow(SafeFlow)

投稿日:  更新日:

KotlinのコルーチンAPIは、複数の「コルーチン間でメッセージを送受信する仕組み」を提供しています。

Channel、Produce、Flow、SharedFlow、StateFlowなどです。

これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。

プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。

ですので、各々を比較しつつ、まとめました。

この記事は「Flow」について、まとめたものです。

※環境:Android Studio Koala Feature Drop | 2024.1.2
    Kotlin 1.9.0
    Compose Compiler 1.5.1
    org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3
    org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3

スポンサーリンク

Flowとは

Flowはコルーチン間で複数の連続したメッセージを送受信する仕組みです。

コールドストリーム(Cold Stream)の通信経路を提供します。

Flowの構成

※メッセージの送受信については「Coroutine:コルーチン間でメッセージの送受信」を参照

Flowは、表のような特徴を持ちます。

ストリームタイプ通信経路状態の監視※
Produce / ChannelHotデータ分岐
×
Flow(SafeFlow)Cold1対1
SharedFlowHotブロードキャスト
StateFlow
※状態の監視 :再Composeのスケジューリングが可能かどうか

なお、SharedFlowとStateFlowはFlowを継承しています。よって、Flowは両者の基底(親またはスーパー)クラスです。

Flowはインターフェイスなので、実装はファクトリー関数(flow関数など)により行われる点に注意してください。

スポンサーリンク

基本的な例とストリームタイプ

以下はFlowの基本的な例です。flow関数を用いて実装しています。

サンプルの動作は単純で、5つの整数値(配列:Data)をワーカーからメインスレッドへ送ります。

送信機(Sender)は処理の開始(コルーチンの起動)から5000ms後に、データの送信(emit)を開始します。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createFlow() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
//        delay(1000)
    }
}.flowOn(Dispatchers.Default)

受信機(Receiver)はボタンの押下でデータの要求・受信(collect)を開始します。

@Preview
@Composable
fun Flow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlow() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test !") }
}
補助関数
fun getThreadName(): String = Thread.currentThread().name
fun getMilliTime5(): String = "%05d".format(System.currentTimeMillis() % 100000)

送信機は受信機からのデータの要求を受けて処理を開始しています。これをコールドストリーム(Cold Stream)といいます。

Flowの基本的な例

データの送信よりも前に受信(要求)が行われた場合、受信は休止して送信を待ちます。

46656 Compose !        <--- createFlow()
51419 Start Receiver ! <--- ボタン押下、データの要求(collect)
51427 Start Sender !   <--- 送信処理の開始
56433 Send    Data = 8 [DefaultDispatcher-worker-1, 29056681]
56433 Receive Data = 8 [main, 55901230]
56434 Send    Data = 4 [DefaultDispatcher-worker-1, 29056681]
56434 Receive Data = 4 [main, 55901230]
56435 Send    Data = 3 [DefaultDispatcher-worker-1, 29056681]
56435 Receive Data = 3 [main, 55901230]
56436 Send    Data = 9 [DefaultDispatcher-worker-1, 29056681]
56436 Receive Data = 9 [main, 55901230]
56437 Send    Data = 1 [DefaultDispatcher-worker-1, 29056681]
56437 Receive Data = 1 [main, 55901230]
※左端の数値はミリ秒
スポンサーリンク

送信間隔>要求・受信間隔

「送信間隔>要求・受信間隔」(送信が遅い)のような通信経路の場合、受信は送信間隔に合わせて休止します。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createFlow() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)	// 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
        delay(1000)	// 送信間隔:1000ms
    }
}.flowOn(Dispatchers.Default)
@Preview
@Composable
fun Flow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlow() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {		// 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000) // 受信間隔:0ms
                }
            }
        }
    ) { Text(text = "Flow test !") }
}

送信間隔>要求・受信間隔

17653 Compose !         <--- createFlow()
20771 Start Receiver !	<--- ボタン押下、データの要求(collect)
20779 Start Sender !    <--- 送信処理の開始
25786 Send    Data = 8 [DefaultDispatcher-worker-1, 29056681]
25787 Receive Data = 8 [main, 55901230]
26790 Send    Data = 4 [DefaultDispatcher-worker-1, 29056681]
26791 Receive Data = 4 [main, 55901230]
27796 Send    Data = 3 [DefaultDispatcher-worker-1, 29056681]
27796 Receive Data = 3 [main, 55901230]
28798 Receive Data = 9 [main, 55901230]
28798 Send    Data = 9 [DefaultDispatcher-worker-1, 29056681]
29804 Receive Data = 1 [main, 55901230]
29804 Send    Data = 1 [DefaultDispatcher-worker-1, 29056681]
※左端の数値はミリ秒

Flow#collectはSuspend関数です。ノンブロッキング動作をします。ですので、要求したデータが取得できない場合は、データが取得できる状態になる(emitされる)まで休止します。

スポンサーリンク

送出間隔<要求・受信間隔

「送信間隔<要求・受信間隔」(受信が遅い)のような通信経路の場合、送信データはFlow内のバッファーへ保持され、受信間隔に合わせて取り出されます。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createFlow() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)	// 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
//        delay(1000)	// 送信間隔:0ms
    }
}.flowOn(Dispatchers.Default)
@Preview
@Composable
fun Flow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlow() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {		// 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
                    delay(1000) // 受信間隔:1000ms
                }
            }
        }
    ) { Text(text = "Flow test !") }
}

送信間隔<要求・受信間隔

42513 Compose !        <--- createFlow()
44604 Start Receiver ! <--- ボタン押下、データの要求(collect)
44610 Start Sender !   <--- 送信処理の開始
49619 Send    Data = 8 [DefaultDispatcher-worker-1, 29056681]
49620 Send    Data = 4 [DefaultDispatcher-worker-1, 29056681]
49621 Receive Data = 8 [main, 55901230]
49622 Send    Data = 3 [DefaultDispatcher-worker-1, 29056681]
49625 Send    Data = 9 [DefaultDispatcher-worker-1, 29056681]
49629 Send    Data = 1 [DefaultDispatcher-worker-1, 29056681]
50628 Receive Data = 4 [main, 55901230]
51633 Receive Data = 3 [main, 55901230]
52636 Receive Data = 9 [main, 55901230]
53641 Receive Data = 1 [main, 55901230]
※左端の数値はミリ秒

Flowはバッファーを持ちます。デフォルトサイズは64です。サイズはFlow#buffer拡張関数(後述)により変更が可能です。

スポンサーリンク

通信経路

通信経路の最小構成は「送信機:受信機=1:1」です。

ここに、受信機を追加すると、対応する送信機が起動され、「送信機:受信機=1:1」の新たな通信経路が構築されます。

Flowの通信経路

各々の通信経路は独立しています。通信経路間で影響を及ぼす動作(例えば休止)は行われません。

※以下のサンプルは「送信間隔<要求・受信間隔」の場合と同じ

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createFlow() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)	// 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
//        delay(1000)	// 送信間隔:0ms
    }
}.flowOn(Dispatchers.Default)
@Preview
@Composable
fun Flow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlow() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {		// 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
                    delay(1000) // 受信間隔:1000ms
                }
            }
        }
    ) { Text(text = "Flow test !") }
}
27512 Compose !
29146 Start Receiver !
29155 Start Sender !
34160 Send    Data = 8 [DefaultDispatcher-worker-1, 8529103]
34161 Receive Data = 8 [main, 96136796]
34161 Send    Data = 4 [DefaultDispatcher-worker-1, 8529103]
34161 Send    Data = 3 [DefaultDispatcher-worker-1, 8529103]
34162 Send    Data = 9 [DefaultDispatcher-worker-1, 8529103]
34163 Send    Data = 1 [DefaultDispatcher-worker-1, 8529103]
35164 Receive Data = 4 [main, 96136796]
36167 Receive Data = 3 [main, 96136796]
37169 Receive Data = 9 [main, 96136796]
38172 Receive Data = 1 [main, 96136796]
41561 Start Receiver !
41562 Start Sender !
46565 Send    Data = 8 [DefaultDispatcher-worker-1, 237387499]
46565 Send    Data = 4 [DefaultDispatcher-worker-1, 237387499]
46565 Receive Data = 8 [main, 37131336]
46566 Send    Data = 3 [DefaultDispatcher-worker-1, 237387499]
46567 Send    Data = 9 [DefaultDispatcher-worker-1, 237387499]
46568 Send    Data = 1 [DefaultDispatcher-worker-1, 237387499]
47570 Receive Data = 4 [main, 37131336]
48574 Receive Data = 3 [main, 37131336]
49581 Receive Data = 9 [main, 37131336]
50585 Receive Data = 1 [main, 37131336]
※左端の数値はミリ秒

なお、並列(通信経路を同時刻に構築)に動作させるても、経路の独立は保たれます。

スポンサーリンク

バッファー

Flowはバッファーを持っています。

バッファーの役割

バッファーは送信機と受信機の中間(Flow内)にある一時的な記憶領域です。送信機から受信機へ流れるデータの渋滞を緩和します。

送信機は、データの要求が無くても、自由なタイミングでバッファーへデータを送出できるので、送信が渋滞しません。ただし、バッファーに空きがある場合です。

受信機は、データの送信が無くても、自由なタイミングでバッファーからデータを取得できるので、受信が渋滞しません。ただし、バッファーにデータがある場合です。

バッファーの構成

バッファーは図のような構成になっています。

バッファの構成(Flow)

capacityがバッファーサイズを示しています。デフォルトは「capacity = 64(BUFFERED)」です。ですので、何も指定しなければ「バッファー有り」になっています。

※「オーバーフロー」については後述

バッファーの制御

バッファーはFlow#buffer拡張関数の引数で制御できます。

@Suppress("NAME_SHADOWING")
public fun <T> Flow<T>.buffer(
    capacity: Int = BUFFERED, 
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T> { ... }
引数概要
capacityIntバッファーサイズ(指定された値)
送受信のタイプ
・Rendezvous(0)
・Buffered(64)
・Conflated(-1)
・Unlimited(Int.MAX_VALUE)
onBufferOverflowBufferOverflow オーバーフロー時の扱い

capacityの値は、「バッファーのサイズ」と「送受信のタイプ(特別な動作)」の2つの役割が割り振られています。※「送受信のタイプ」については後述

バッファーの動作

バッファーの動作をサンプルで示します。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createFlow() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
//        delay(1000)
    }
}.flowOn(Dispatchers.Default)
    .buffer(capacity = 3)
@Preview
@Composable
fun Flow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlow() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test !") }
}

1番目のデータ「8」はバッファーを介さずに送受信されます。バッファーを利用するのは2番目以降のデータです。

71586 Compose !
74689 Start Receiver !
74696 Start Sender !
79702 Send    Data = 8 [DefaultDispatcher-worker-1, 169843152]
79702 Receive Data = 8 [main, 120720841]
79703 Send    Data = 4 [DefaultDispatcher-worker-1, 169843152] 送信機⇒Buffer
79704 Send    Data = 3 [DefaultDispatcher-worker-1, 169843152]
79705 Send    Data = 9 [DefaultDispatcher-worker-1, 169843152]
80707 Receive Data = 4 [main, 120720841]                       Buffer⇒受信機
80708 Send    Data = 1 [DefaultDispatcher-worker-1, 169843152]
81712 Receive Data = 3 [main, 120720841]
82716 Receive Data = 9 [main, 120720841]
83721 Receive Data = 1 [main, 120720841]

送信機⇒Buffer(送信側)

送信されたデータはバッファーに入り、新しいデータが送信される毎に「積み上げる方向」へシフトされて行きます。

オーバーフローになると送信は休止します。

なお、indexは受信時の取り出し位置です。

ステップ1⇒2⇒3⇒4
送信機⇒Buffer Step1
送信機⇒Buffer Step2
送信機⇒Buffer Step3
送信機⇒Buffer Step4

Buffer⇒受信機(受信側)

受信されるデータはindexの位置から取り出されます。

データの取り出されたバッファは無効になり、新しいデータが送信されることにより再び埋められます。

ステップ1⇒2⇒3⇒4
Buffer⇒受信機 Step1
Buffer⇒受信機 Step2
Buffer⇒受信機 Step3
Buffer⇒受信機 Step4
スポンサーリンク

送受信のタイプ

capacityはバッファーのサイズを指定する引数ですが、値の一部にパラメータ名が定義されており、「送受信のタイプ」の役割を合わせ持ちます。

タイプcapacityの値
( )内はパラメータ名
振舞い
Rendezvous0(RENDEZVOUS)デフォルトバッファー無し
Buffered64(BUFFERED)バッファーサイズ
Conflated-1(CONFLATED)特別な動作
UnlimitedInt.MAX_VALUE(UNLIMITED)バッファーサイズ

この中で、「Conflated」は特別な動作をします。その他は、ただのサイズです。

private val Data = arrayOf(8, 4, 3, 9, 1)
//private val Data2 = Array(100) { it }

fun createFlow() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
//        delay(1000)
    }
}.flowOn(Dispatchers.Default)
    .buffer(capacity = RENDEZVOUS)  // 0
//    .buffer(capacity = 3)           // 3
//    .buffer(capacity = BUFFERED)    // 64(デフォルト)
//    .buffer(capacity = CONFLATED)   // -1
//    .buffer(capacity = UNLIMITED)   // Int.MAX_VALUE
@Preview
@Composable
fun Flow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlow() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test !") }
}

Rendezvous

バッファサイズは「0(無し)」です。

受信側は送信側へ直にデータを要求し、送信側は受信側へ直にデータを渡します。

送信データが無い時、受信側は休止して送信を待ちます。

受信の要求が無い時、送信側は休止して要求を待ちます。

56780 Compose !
59530 Start Receiver !
59540 Start Sender !
64546 Send    Data = 8 [DefaultDispatcher-worker-1, 169843152]
64546 Receive Data = 8 [main, 120720841]
65553 Receive Data = 4 [main, 120720841]
65553 Send    Data = 4 [DefaultDispatcher-worker-1, 169843152]
66557 Receive Data = 3 [main, 120720841]
66557 Send    Data = 3 [DefaultDispatcher-worker-1, 169843152]
67560 Receive Data = 9 [main, 120720841]
67560 Send    Data = 9 [DefaultDispatcher-worker-1, 169843152]
68564 Receive Data = 1 [main, 120720841]
68564 Send    Data = 1 [DefaultDispatcher-worker-1, 169843152]

Buffered

バッファサイズは「64」です。

送信側はバッファが空いている限りデータをバッファへ格納します。バッファが埋まれば休止します。

受信側はバッファーにデータがあればバッファーから受信します。バッファが空になれば休止します。

15754 Compose !
18477 Start Receiver !
18484 Start Sender !
23494 Send    Data = 0 [DefaultDispatcher-worker-1, 169843152]
23494 Receive Data = 0 [main, 120720841]                        ⇐ バッファーを介さない
23494 Send    Data = 1 [DefaultDispatcher-worker-1, 169843152]
23498 Send    Data = 2 [DefaultDispatcher-worker-1, 169843152]
    :
    :
23550 Send    Data = 63 [DefaultDispatcher-worker-1, 169843152]
23551 Send    Data = 64 [DefaultDispatcher-worker-1, 169843152]  ⇐ 64個(Data = 1~64)
24499 Receive Data = 1 [main, 120720841]
24499 Send    Data = 65 [DefaultDispatcher-worker-1, 169843152]
25502 Send    Data = 66 [DefaultDispatcher-worker-1, 169843152]
25502 Receive Data = 2 [main, 120720841]
26506 Send    Data = 67 [DefaultDispatcher-worker-1, 169843152]
26506 Receive Data = 3 [main, 120720841]
27514 Receive Data = 4 [main, 120720841]
27514 Send    Data = 68 [DefaultDispatcher-worker-1, 169843152]
28518 Receive Data = 5 [main, 120720841]
28519 Send    Data = 69 [DefaultDispatcher-worker-1, 169843152]
    :
    :

デフォルトは「capacity = BUFFERED」です。ですので、「基本的な例」で示した例は全てBufferedになります。

Conflated

バッファサイズは「1」です。特殊な動作をします。

送信側からバッファーへ送信されたデータはバッファー上で上書きされます。受信側はバッファーから上書き後の最終データを受信します。

81200 Compose !
83734 Start Receiver !
83744 Start Sender !
88751 Send    Data = 8 [DefaultDispatcher-worker-1, 169843152]
88752 Receive Data = 8 [main, 120720841]                       ⇐ バッファーを介さない
88752 Send    Data = 4 [DefaultDispatcher-worker-1, 169843152]
88755 Send    Data = 3 [DefaultDispatcher-worker-1, 169843152]
88756 Send    Data = 9 [DefaultDispatcher-worker-1, 169843152]
88757 Send    Data = 1 [DefaultDispatcher-worker-1, 169843152] ⇐ 最終データ
89757 Receive Data = 1 [main, 120720841]

Unlimited

バッファサイズは「Int.MAX_VALUE(2147483647)」です。「サイズは無制限」になります。

動作はBufferedと同じです。

ただし、バッファーのデータ数が無制限なので、メモリーのオーバーフローに注意してください。

32660 Compose !
34685 Start Receiver !
34693 Start Sender !
39698 Send    Data = 0 [DefaultDispatcher-worker-1, 169843152]
39698 Receive Data = 0 [main, 120720841]                       ⇐ バッファーを介さない
39698 Send    Data = 1 [DefaultDispatcher-worker-1, 169843152]
39699 Send    Data = 2 [DefaultDispatcher-worker-1, 169843152]
39701 Send    Data = 3 [DefaultDispatcher-worker-1, 169843152]
    :
    :
39784 Send    Data = 98 [DefaultDispatcher-worker-1, 169843152]
39784 Send    Data = 99 [DefaultDispatcher-worker-1, 169843152]
    :
    :    ⇐ 2147483647個まで可能(サンプルは100で制限)
スポンサーリンク

オーバーフロー

オーバーフローは、バッファーに空きがない状態(全て埋まっている、バッファーサイズ=0)で、新たなデータが送信(入力)された場合に発生します。

オーバーフローになった時、「新たなデータの扱い」が問題になります。

引数onBufferOverflowは、この「新たなデータの扱い」を指定します。

パレメータ
(BufferOverflow.***)
動作一時休止
SUSPENDバッファーに空きができるまで休止して待つする
DROP_OLDESTバッファーの最も古い値を削除し、入力された値を追加しない
DROP_LATESTバッファーの値を保持し、入力された値を破棄

バッファーサイズ > 0

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createFlow() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEachIndexed { index, data ->
        emit(data)
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${data} [${_thName}, ${_jbCode}]")
//        delay(1000)
    }
}.flowOn(Dispatchers.Default)
    .buffer(capacity = 3, onBufferOverflow = BufferOverflow.SUSPEND)
//    .buffer(capacity = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST)
//    .buffer(capacity = 3, onBufferOverflow = BufferOverflow.DROP_LATEST)
@Preview
@Composable
fun Flow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlow() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {		// 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
                    delay(1000) // 受信間隔:1000ms
                }
            }
        }
    ) { Text(text = "Flow test !") }
}
SUSPENDDROP_OLDESTDROP_LATEST
BufferOverflow時のSUSPEND動作

18883 Compose !
20889 Start Receiver !
20897 Start Sender !
25905 Send    Data = 8 [DefaultDispatcher-worker-1, 73716183]
25905 Receive Data = 8 [main, 170330820]
25905 Send    Data = 4 [DefaultDispatcher-worker-1, 73716183]
25906 Send    Data = 3 [DefaultDispatcher-worker-1, 73716183]
25908 Send    Data = 9 [DefaultDispatcher-worker-1, 73716183]
                                                              ⇐ 空き無し、emitは休止
26911 Receive Data = 4 [main, 170330820]                      ⇐ 取り出し
26912 Send    Data = 1 [DefaultDispatcher-worker-1, 73716183] ⇐ 空きあり、emitは復帰・1を追加
27915 Receive Data = 3 [main, 170330820]
28920 Receive Data = 9 [main, 170330820]
29923 Receive Data = 1 [main, 170330820]
※左端の数値はミリ秒
BufferOverflow時のDROP_OLDEST動作

45740 Compose !
47574 Start Receiver !
47586 Start Sender !
52593 Send    Data = 8 [DefaultDispatcher-worker-2, 73716183]
52594 Receive Data = 8 [main, 170330820]
52594 Send    Data = 4 [DefaultDispatcher-worker-2, 73716183]
52595 Send    Data = 3 [DefaultDispatcher-worker-2, 73716183]
52595 Send    Data = 9 [DefaultDispatcher-worker-2, 73716183]
                                                              ⇐ 空き無し
52596 Send    Data = 1 [DefaultDispatcher-worker-2, 73716183] ⇐ 4を削除、1を追加
53600 Receive Data = 3 [main, 170330820]
54604 Receive Data = 9 [main, 170330820]
55611 Receive Data = 1 [main, 170330820]
※左端の数値はミリ秒
BufferOverflow時のDROP_LATEST動作

00560 Compose !
02802 Start Receiver !
02809 Start Sender !
07817 Send    Data = 8 [DefaultDispatcher-worker-1, 73716183]
07818 Receive Data = 8 [main, 170330820]
07818 Send    Data = 4 [DefaultDispatcher-worker-1, 73716183]
07819 Send    Data = 3 [DefaultDispatcher-worker-1, 73716183]
07819 Send    Data = 9 [DefaultDispatcher-worker-1, 73716183]
                                                              ⇐ 空き無し
07820 Send    Data = 1 [DefaultDispatcher-worker-1, 73716183] ⇐ 1を破棄
08821 Receive Data = 4 [main, 170330820]
09823 Receive Data = 3 [main, 170330820]
10825 Receive Data = 9 [main, 170330820]
※左端の数値はミリ秒

バッファーサイズ = 0

「バッファーサイズ(capacity)= 0」の時は、SUSUPEND以外を推奨しません。

意図が不明な動作をします。原因はrequireの判定にあると思われます。
※今後、改善されることを望みます!

@Suppress("NAME_SHADOWING")
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
    require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {
        "Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"
    }
    require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) {
        "CONFLATED capacity cannot be used with non-default onBufferOverflow"
    }
    ...
}
 capacity = 0, onBufferOverflow = BufferOverflow.DROP_OLDEST 

「capacity = 1」に強制されます。

62804 Compose !
65080 Start Receiver !
65086 Start Sender !
70091 Send    Data = 8 [DefaultDispatcher-worker-1, 169843152]
70092 Receive Data = 8 [main, 120720841]
70092 Send    Data = 4 [DefaultDispatcher-worker-1, 169843152]
70092 Send    Data = 3 [DefaultDispatcher-worker-1, 169843152]
70093 Send    Data = 9 [DefaultDispatcher-worker-1, 169843152]
70094 Send    Data = 1 [DefaultDispatcher-worker-1, 169843152]
71096 Receive Data = 1 [main, 120720841]
 capacity = 0, onBufferOverflow = BufferOverflow.DROP_LATEST 

「capacity = 1」に強制されます。

91083 Compose !
93011 Start Receiver !
93017 Start Sender !
98024 Send    Data = 8 [DefaultDispatcher-worker-1, 169843152]
98024 Receive Data = 8 [main, 120720841]
98025 Send    Data = 4 [DefaultDispatcher-worker-1, 169843152]
98027 Send    Data = 3 [DefaultDispatcher-worker-1, 169843152]
98027 Send    Data = 9 [DefaultDispatcher-worker-1, 169843152]
98027 Send    Data = 1 [DefaultDispatcher-worker-1, 169843152]
99030 Receive Data = 4 [main, 120720841]
スポンサーリンク

CorutineContextの変更

送信側コルーチンのCoroutineContextは、受信側で指定されたCoroutineContextが渡されます。

flowOnなし

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createFlow() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
        delay(1000)
    }
}
@Preview
@Composable
fun Flow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlow() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {	// 	デフォルトはDispatchers.Main
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test !") }
}
82492 Compose !
92397 Start Receiver !
92397 Start Sender !
97404 Receive Data = 8 [main, 8529103]
97404 Send    Data = 8 [main, 8529103]
98407 Receive Data = 4 [main, 8529103]
98408 Send    Data = 4 [main, 8529103]
99411 Receive Data = 3 [main, 8529103]
99412 Send    Data = 3 [main, 8529103]
00415 Receive Data = 9 [main, 8529103]
00416 Send    Data = 9 [main, 8529103]
01421 Receive Data = 1 [main, 8529103]
01422 Send    Data = 1 [main, 8529103]

Flow#flowOn関数を使用することで、送信側コルーチンのCoroutineContextを変更できます。以下は、スレッドプールを変更した例です。

flowOnあり

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createFlow() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
        delay(1000)
    }
}.flowOn(Dispatchers.Default)
@Preview
@Composable
fun Flow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlow() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {	// 	デフォルトはDispatchers.Main
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test !") }
}
57170 Compose !
58754 Start Receiver !
58763 Start Sender !
63767 Send    Data = 8 [DefaultDispatcher-worker-1, 8529103]
63767 Receive Data = 8 [main, 96136796]
64770 Send    Data = 4 [DefaultDispatcher-worker-1, 8529103]
64770 Receive Data = 4 [main, 96136796]
65773 Send    Data = 3 [DefaultDispatcher-worker-1, 8529103]
65774 Receive Data = 3 [main, 96136796]
66776 Send    Data = 9 [DefaultDispatcher-worker-1, 8529103]
66777 Receive Data = 9 [main, 96136796]
67781 Send    Data = 1 [DefaultDispatcher-worker-1, 8529103]
67782 Receive Data = 1 [main, 96136796]
スポンサーリンク

他の実装方法

Flowの実装方法はflow( )関数以外に複数あります。

以下の例は、同じストリームデータ(Int型)を送受信する通信経路を作成します。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createFlowA(): Flow<Int> = flowOf(8, 4, 3, 9, 1)
fun createFlowB(): Flow<Int> = Data.asFlow()
fun createFlowC(): Flow<Int> = flow<Int> {  // この記事のサンプルで登場
    Data.forEach { emit(it) }
    // 「レジーバー:FlowCollector」のコマンドが利用可能
}
fun createFlowD(): Flow<Int> = channelFlow<Int> {
    Data.forEach { send(it) }
    // 「レシーバー:ProducerScope」のコマンドが利用可能
}
@Preview
@Composable
fun Flow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlowA() }
//    datCh: Flow<Int> = remember { createFlowB() }
//    datCh: Flow<Int> = remember { createFlowC() }
//    datCh: Flow<Int> = remember { createFlowD() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {     // 要求・受信
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Flow test !") }
}
97301 Compose !
99117 Start Receiver !
99121 Receive Data = 8 [main, 166465328]
99122 Receive Data = 4 [main, 166465328]
99122 Receive Data = 3 [main, 166465328]
99123 Receive Data = 9 [main, 166465328]
99124 Receive Data = 1 [main, 166465328]
スポンサーリンク

状態の監視

Flowによって送受信されるストリームデータは、値の監視が行われません。ですので、再Composeはスケジューリングされません。

再Composeのスケジューリングを可能にするためには、ストリームデータを監視データへ変換します。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createFlow() = flow {
    Log.i(TAG, "${getMilliTime5()} Start Sender !")
    delay(5000)
    Data.forEach {
        emit(it)      // 送信
        val _thName = getThreadName()
        val _jbCode = currentCoroutineContext().job.hashCode()
        Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
        delay(1000)
    }
}.flowOn(Dispatchers.Default)
@Preview
@Composable
fun Flow_Monitor(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: Flow<Int> = remember { createFlow() }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    // ストリームデータ ⇒ 監視データ
    val _data = remember { mutableStateOf(0) }
    LaunchedEffect(Unit) {
        datCh.collect {     	// 要求・受信
            _data.value = it	// 監視データへ変換
            val _thName = getThreadName()
            val _jbCode = currentCoroutineContext().job.hashCode()
            Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
        }
    }

    Text(
        text = "${getMilliTime5()} Receive Data = ${_data.value}",
        fontSize = 20.sp
    )
}
03115 Compose !
03340 Start Sender !
08348 Send    Data = 8 [DefaultDispatcher-worker-1, 32015087]
08348 Receive Data = 8 [main, 197050876]
08357 Compose !
09350 Send    Data = 4 [DefaultDispatcher-worker-1, 32015087]
09351 Receive Data = 4 [main, 197050876]
09357 Compose !
10352 Send    Data = 3 [DefaultDispatcher-worker-1, 32015087]
10353 Receive Data = 3 [main, 197050876]
10357 Compose !
11354 Send    Data = 9 [DefaultDispatcher-worker-1, 32015087]
11356 Receive Data = 9 [main, 197050876]
11358 Compose !
12357 Send    Data = 1 [DefaultDispatcher-worker-1, 32015087]
12358 Receive Data = 1 [main, 197050876]
12373 Compose !

ちなみに、StateFlowは「State(状態の監視)+Flow(通信経路:SharedFlow)」です。StateFlowを用いれば、ここで紹介した「ストリームデータを監視データへ変換」する作業は必要ありません。

再Composeのスケジューリングを可能にする方法として、StateFlowの利用が定石になています。※詳細は「Coroutine:StateFlow」を参照

スポンサーリンク

関連記事:

近頃の携帯端末はクワッドコア(プロセッサが4つ)やオクタコア(プロセッサが8つ)が当たり前になりました。 サクサク動作するアプリを作るために、この恩恵を使わなければ損です。 となると、必然的に非同期処理(マルチスレッド)を使うことになります。 JavaのThreadクラス、Android APIのAsyncTaskクラスが代表的な手法です。 Kotlinは上記に加えて「コルーチン(Coroutine)」が使えるようになっています。 今回は、このコルーチンについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 特徴としてnon-blocking動作をサポートします。 このnon-blocking動作についてまとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンの構成要素であるSuspend関数について、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがlaunchです。 このlaunchビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するlaunchビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがasyncです。 このasyncビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するasyncビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがrunBlockingです。 このrunBlockingビルダーについて、まとめます。 ...
CoroutineContextはコルーチンで起動されるスレッドの属性を格納しています。 その中にコルーチンの名前を表現するName属性があります。 Name属性を出力する方法を紹介します。 ...
コルーチン(Coroutine)は「非同期処理プログラミングの手法」の1つです。 Kotlinが提供します。 withContextはCoroutineContextを切り替えてスレッドを起動するSuspend関数です。 このwithContextについて、まとめます。 ...
コルーチン間でメッセージ(データ)の送受信を行うことが出来ます。 ここで紹介する「メッセージの送受信」を使えば、非同期処理の間で確実にデータを受け渡し出来ます。 それにより、非同期処理の連携が容易になります。 今回は、メッセージの送受信についての基礎と、Channelを使った最も基本的な送受信の動作をまとめます。 ※環境:Android Studio Flamingo | 2022.2.1    :org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.4 ...
KotlinのコルーチンAPIは、複数の「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「Produce」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは、複数の「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「SharedFlow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは、複数の「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「StateFlow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
スポンサーリンク