Coroutine:SharedFlow

投稿日:  更新日:

KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。

Channel、Produce、Flow、SharedFlow、StateFlowなどです。

これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。

プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。

ですので、各々を比較しつつ、まとめました。

この記事は「SharedFlow」について、まとめたものです。

※環境:Android Studio Koala Feature Drop | 2024.1.2
    Kotlin 1.9.0
    Compose Compiler 1.5.1
    org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3
    org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3

スポンサーリンク

SharedFlowとは

SharedFlowはコルーチン間で複数の連続したメッセージを送受信する仕組みです。

ホットストリーム(Hot Stream)の通信経路を提供します。

SharedFlowの構成

※メッセージの送受信については「Coroutine:コルーチン間でメッセージの送受信」を参照

SharedFlowは、表のような特徴を持ちます。

ストリームタイプ通信経路状態の監視※
Produce / ChannelHotデータ分岐
×
Flow(SafeFlow)Cold1対1
SharedFlowHotブロードキャスト
StateFlow
※状態の監視 :再Composeのスケジューリングが可能かどうか

なお、SharedFlowとStateFlowはFlowを継承しています。よって、Flowは両者の基底クラスです。

スポンサーリンク

基本的な例とストリームタイプ

以下はSharedFlowの基本的な例です。

サンプルの動作は単純で、5つの整数値(配列:Data)をワーカーからメインスレッドへ送ります。

送信機(Sender)は処理の開始(コルーチンの起動)から5000ms後に、データの送信(emit)を開始します。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createSharedFlow(scope: CoroutineScope) =
    MutableSharedFlow<Int>().apply {
        scope.launch(Dispatchers.Default) {
            Log.i(TAG, "${getMilliTime5()} Start sender !")
            delay(5000)
            Data.forEach {
                this@apply.emit(it)
                val _thName = getThreadName()
                val _jbCode = currentCoroutineContext().job.hashCode()
                Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
//                delay(1000)
            }
        }
    }.asSharedFlow()

受信機(Receiver)はボタンの押下でデータの要求・受信(collect)を開始します。

@Preview
@Composable
fun SharedFlow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: SharedFlow<Int> = remember { createSharedFlow(scope) }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "SharedFlow test !") }
}
補助関数
fun getThreadName(): String = Thread.currentThread().name
fun getMilliTime5(): String = "%05d".format(System.currentTimeMillis() % 100000)

送信機は受信機からのデータの要求を待たずに処理を開始しています。これをホットストリーム(Hot Stream)といいます。

ただし、要求(collect)よりも前に発行された送信(emit)は破棄されてしまうので、注意してください。

SharedFlowの基本的な例(NGなケース)

19674 Compose !        <--- createSharedFlow()
19674 Start sender !   <--- 送信処理の開始
24681 Send    Data = 8 [DefaultDispatcher-worker-1, 46343680]
24682 Send    Data = 4 [DefaultDispatcher-worker-1, 46343680]
24683 Send    Data = 3 [DefaultDispatcher-worker-1, 46343680]
24687 Send    Data = 9 [DefaultDispatcher-worker-1, 46343680]
24688 Send    Data = 1 [DefaultDispatcher-worker-1, 46343680]
29042 Start Receiver ! <--- ボタン押下、データの要求(collect)

データの取得(正常なデータの受け渡し)を行いたければ、要求(collect)を送信(emit)の前に行う必要があります。これにより通信経路が確立されます。

SharedFlowの基本的な例(OKなケース)

17238 Compose !        <--- createSharedFlow()
17238 Start sender !   <--- 送信処理の開始
19995 Start Receiver ! <--- ボタン押下、データの要求(collect)
22246 Receive Data = 8 [main, 73716183]
22246 Send    Data = 8 [DefaultDispatcher-worker-1, 170330820]
22248 Receive Data = 4 [main, 73716183]
22248 Send    Data = 4 [DefaultDispatcher-worker-1, 170330820]
22250 Receive Data = 3 [main, 73716183]
22250 Send    Data = 3 [DefaultDispatcher-worker-1, 170330820]
22252 Receive Data = 9 [main, 73716183]
22253 Send    Data = 9 [DefaultDispatcher-worker-1, 170330820]
22254 Receive Data = 1 [main, 73716183]
22254 Send    Data = 1 [DefaultDispatcher-worker-1, 170330820]
スポンサーリンク

送信間隔>要求・受信間隔

「送信間隔>要求・受信間隔」(送信が遅い)のような通信経路の場合、受信は送信間隔に合わせて休止します。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createSharedFlow(scope: CoroutineScope) =
    MutableSharedFlow<Int>().apply {
        scope.launch(Dispatchers.Default) {
            Log.i(TAG, "${getMilliTime5()} Start sender !")
            delay(5000)
            Data.forEach {
                this@apply.emit(it)
                val _thName = getThreadName()
                val _jbCode = currentCoroutineContext().job.hashCode()
                Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
                delay(1000)
            }
        }
    }.asSharedFlow()
@Preview
@Composable
fun SharedFlow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: SharedFlow<Int> = remember { createSharedFlow(scope) }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "SharedFlow test !") }
}

送信間隔>要求・受信間隔(SharedFloe)

07999 Start sender !    <--- 送信処理の開始
08001 Compose !         <--- createSharedFlow()
10826 Start Receiver !	<--- ボタン押下、データの要求(collect)
13005 Receive Data = 8 [main, 243112817]
13006 Send    Data = 8 [DefaultDispatcher-worker-2, 188599638]
14009 Receive Data = 4 [main, 243112817]
14009 Send    Data = 4 [DefaultDispatcher-worker-2, 188599638]
15012 Receive Data = 3 [main, 243112817]
15012 Send    Data = 3 [DefaultDispatcher-worker-2, 188599638]
16015 Receive Data = 9 [main, 243112817]
16015 Send    Data = 9 [DefaultDispatcher-worker-2, 188599638]
17018 Receive Data = 1 [main, 243112817]
17018 Send    Data = 1 [DefaultDispatcher-worker-2, 188599638]

SharedFlow#collectはSuspend関数です。ノンブロッキング動作をします。ですので、要求したデータが取得できない場合は、データが取得できる状態になる(emitされる)まで休止します。

スポンサーリンク

送信間隔<要求・受信間隔

「送信間隔<受信間隔」(受信が遅い)のような通信経路の場合、送信は受信間隔に合わせて休止します。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createSharedFlow(scope: CoroutineScope) =
    MutableSharedFlow<Int>().apply {
        scope.launch(Dispatchers.Default) {
            Log.i(TAG, "${getMilliTime5()} Start sender !")
            delay(5000)
            Data.forEach {
                this@apply.emit(it)
                val _thName = getThreadName()
                val _jbCode = currentCoroutineContext().job.hashCode()
                Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
//                delay(1000)
            }
        }
    }.asSharedFlow()
@Preview
@Composable
fun SharedFlow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: SharedFlow<Int> = remember { createSharedFlow(scope) }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
                    delay(1000)
                }
            }
        }
    ) { Text(text = "SharedFlow test !") }
}

送信間隔<要求・受信間隔(SharedFlow)

10862 Compose !         <--- createSharedFlow()
10863 Start sender !    <--- 送信処理の開始
13299 Start Receiver !	<--- ボタン押下、データの要求(collect)
15869 Receive Data = 8 [main, 73716183]
15869 Send    Data = 8 [DefaultDispatcher-worker-2, 170330820]
16872 Receive Data = 4 [main, 73716183]
16872 Send    Data = 4 [DefaultDispatcher-worker-2, 170330820]
17874 Receive Data = 3 [main, 73716183]
17874 Send    Data = 3 [DefaultDispatcher-worker-2, 170330820]
18876 Receive Data = 9 [main, 73716183]
18877 Send    Data = 9 [DefaultDispatcher-worker-2, 170330820]
19879 Receive Data = 1 [main, 73716183]
19879 Send    Data = 1 [DefaultDispatcher-worker-2, 170330820]

Flow#emitはSuspend関数です。ノンブロッキング動作をします。ですので、データが要求されていない場合は、要求される(collectされる)まで休止します。

通信経路

通信経路の最小構成は「送信機:受信機=1:1」です。

ここに、受信機を追加して、「送信機:受信機=1:多」のマルチレシーバー構成に出来ます。

SharedFlowの通信経路

この場合、送信機は複数の受信機で共有され(Shared)、ストリームデータは全ての受信機へブロードキャスト(一斉送信)されます。

ブロードキャストは「全ての受信機がデータを受信して、送信は完了する動作(送受信サイクル)」で成り立っています。これは、送信機と受信機が休止を使って、歩調を合わせることで実現されます。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createSharedFlow(scope: CoroutineScope) =
    MutableSharedFlow<Int>().apply {
        scope.launch(Dispatchers.Default) {
            Log.i(TAG, "${getMilliTime5()} Start sender !")
            delay(5000)
            Data.forEach {
                this@apply.emit(it)
                val _thName = getThreadName()
                val _jbCode = currentCoroutineContext().job.hashCode()
                Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
//                delay(1000)
            }
        }
    }.asSharedFlow()
@Preview
@Composable
fun SharedFlow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: SharedFlow<Int> = remember { createSharedFlow(scope) },
    interval: Long = 1000
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
                    delay(interval)
                }
            }
        }
    ) { Text(text = "SharedFlow test ! Interval=${interval}") }
}
24202 Start sender !
24202 Compose !
24255 Compose !
25414 Start Receiver !
27462 Start Receiver !
29207 Receive Data = 8 [main, 159137840]
29207 Receive Data = 8 [main, 177611945]
29208 Send    Data = 8 [DefaultDispatcher-worker-1, 80693806]
30212 Receive Data = 4 [main, 159137840]
32212 Receive Data = 4 [main, 177611945]
32213 Send    Data = 4 [DefaultDispatcher-worker-1, 80693806]
33216 Receive Data = 3 [main, 159137840]
35215 Receive Data = 3 [main, 177611945]
35215 Send    Data = 3 [DefaultDispatcher-worker-1, 80693806]
36217 Receive Data = 9 [main, 159137840]    ⇐ 送受信サイクル
38218 Receive Data = 9 [main, 177611945]
38219 Send    Data = 9 [DefaultDispatcher-worker-1, 80693806]
39222 Receive Data = 1 [main, 159137840]
41221 Receive Data = 1 [main, 177611945]
41222 Send    Data = 1 [DefaultDispatcher-worker-1, 80693806]

SharedFlowのマルチレシーバーの送受信サイクル

なお、送受信サイクルは「送信間隔と受信間隔の最大値(サンプルの場合は3000ms)」になります。

スポンサーリンク

バッファー

SharedFlowはバッファーを持つことができます。

バッファーの役割

バッファーは送信機と受信機の中間(ShareFlow内)にある一時的な記憶領域です。送信機から受信機へ流れるデータの渋滞を緩和します。

送信機は、データの要求が無くても、自由なタイミングでバッファーへデータを送出できるので、送信が渋滞しません。ただし、バッファーに空きがある場合です。

受信機は、データの送信が無くても、自由なタイミングでバッファーからデータを取得できるので、受信が渋滞しません。ただし、バッファーにデータがある場合です。

バッファーの構成

バッファーは図のような構成になっています。

バッファの構成(SharedFlow)

「エキストラバッファー + リプレイキャッシュ」がバッファーです。

replayとextraBufferCapacityが各々のサイズを示しています。ですので、バッファーのサイズは両者を足したものになります。

 リプレイキャッシュ 

最新のデータを保持し続ける特殊なバッファーです。取得が行われても消去されません。

 エキストラバッファー 

一般的なバッファーです。取得が行われると消去されます。

※「リプレイ」ならびに「オーバーフロー」については後述

バッファーの制御

バッファーはMutableShareFlow関数の引数で制御できます。

@Suppress("FunctionName", "UNCHECKED_CAST")
public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> { ...  }
引数概要
replayIntリプレイキャッシュのサイズ
extraBufferCapacityIntエキストラバッファーのサイズ
onBufferOverflowBufferOverflow オーバーフロー時の扱い

バッファーの動作

バッファーの動作をサンプルで示します。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createSharedFlow(scope: CoroutineScope) =
    MutableSharedFlow<Int>(
        replay = 2,
        extraBufferCapacity = 2,
        onBufferOverflow = BufferOverflow.SUSPEND  // デフォルト
    ).apply {
        scope.launch(Dispatchers.Default) {
            Log.i(TAG, "${getMilliTime5()} Start sender !")
            delay(5000)
            Data.forEach {
                this@apply.emit(it)
                val _thName = getThreadName()
                val _jbCode = currentCoroutineContext().job.hashCode()
                Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
//                delay(1000)
            }
        }
    }.asSharedFlow()
@Preview
@Composable
fun SharedFlow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: SharedFlow<Int> = remember { createSharedFlow(scope) }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
                    delay(1000)
                }
            }
        }
    ) { Text(text = "SharedFlow test !") }
}

1番目のデータ「8」はバッファーを介さずに送受信されます。バッファーを利用するのは2番目以降のデータです。

03489 Compose !
03489 Start sender !
05107 Start Receiver !
08496 Send    Data = 8 [DefaultDispatcher-worker-1, 53555502]
08496 Receive Data = 8 [main, 8529103]
08497 Send    Data = 4 [DefaultDispatcher-worker-1, 53555502]  送信機⇒Buffer
08500 Send    Data = 3 [DefaultDispatcher-worker-1, 53555502]     〃
08501 Send    Data = 9 [DefaultDispatcher-worker-1, 53555502]     〃
08501 Send    Data = 1 [DefaultDispatcher-worker-1, 53555502]     〃
09502 Receive Data = 4 [main, 8529103]                         Buffer⇒受信機
10507 Receive Data = 3 [main, 8529103]                               〃
11513 Receive Data = 9 [main, 8529103]                               〃
12518 Receive Data = 1 [main, 8529103]                               〃

送信機⇒Buffer(送信側)

送信されたデータはリプレイキャッシュに入り、新しいデータが送信される毎に「積み上げる方向」へシフトされて行きます。

なお、indexは受信時の取り出し位置です。

ステップ1⇒2⇒3⇒4⇒5
送信機⇒Buffer Step1
送信機⇒Buffer Step2
送信機⇒Buffer Step3
送信機⇒Buffer Step4
送信機⇒Buffer Step5

Buffer⇒受信機(受信側)

受信されるデータはindexの位置から取り出されます。

データの取り出されたバッファは無効になり、新しいデータが送信されることにより再び埋められます。

ただし、リプレイキャッシュ―はデータが残り、無効になりません。

ステップ1⇒2⇒3⇒4⇒5
Buffer⇒受信機 Step1
Buffer⇒受信機 Step2
Buffer⇒受信機 Step3
Buffer⇒受信機 Step4
Buffer⇒受信機 Step5
スポンサーリンク

オーバーフロー

オーバーフローは、バッファーに空きがない状態(全て埋まっている、バッファーサイズ=0)で、新たなデータが送信(入力)された場合に発生します。

オーバーフローになった時、「新たなデータの扱い」が問題になります。

引数onBufferOverflowは、この「新たなデータの扱い」を指定します。

パレメータ
(BufferOverflow.***)
動作一時休止
SUSPENDバッファーに空きができるまで休止して待つする
DROP_OLDESTバッファーの最も古い値を削除し、入力された値を追加しない
DROP_LATESTバッファーの値を保持し、入力された値を破棄

バッファーサイズ > 0

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createSharedFlow(scope: CoroutineScope) =
    MutableSharedFlow<Int>(
        replay = 1,
        extraBufferCapacity = 2,
        onBufferOverflow = BufferOverflow.SUSPEND  // デフォルト
//        onBufferOverflow = BufferOverflow.DROP_OLDEST
//        onBufferOverflow = BufferOverflow.DROP_LATEST
    ).apply {
        scope.launch(Dispatchers.Default) {
            Log.i(TAG, "${getMilliTime5()} Start sender !")
            delay(5000)
            Data.forEach {
                this@apply.emit(it)
                val _thName = getThreadName()
                val _jbCode = currentCoroutineContext().job.hashCode()
                Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
//                delay(1000)
            }
        }
    }.asSharedFlow()
@Preview
@Composable
fun SharedFlow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: SharedFlow<Int> = remember { createSharedFlow(scope) }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
                    delay(1000)
                }
            }
        }
    ) { Text(text = "SharedFlow test !") }
}
SUSPENDDROP_OLDESTDROP_LATEST
BufferOverFlow時のSUSPEND動作

20377 Compose !
20376 Start sender !
21733 Start Receiver !
25384 Send    Data = 8 [DefaultDispatcher-worker-1, 53555502]
25385 Receive Data = 8 [main, 8529103]
25385 Send    Data = 4 [DefaultDispatcher-worker-1, 53555502]
25385 Send    Data = 3 [DefaultDispatcher-worker-1, 53555502]
25386 Send    Data = 9 [DefaultDispatcher-worker-1, 53555502]
                                                              ⇐ 空き無し、emitは休止
26390 Receive Data = 4 [main, 8529103]                        ⇐ 取り出し
26390 Send    Data = 1 [DefaultDispatcher-worker-1, 53555502] ⇐ 空き有り、emitは復帰・1を追加
27393 Receive Data = 3 [main, 8529103]
28400 Receive Data = 9 [main, 8529103]
29404 Receive Data = 1 [main, 8529103]
BufferOverFlow時のDROP_OLDEST動作

66242 Compose !
66242 Start sender !
68610 Start Receiver !
71248 Send    Data = 8 [DefaultDispatcher-worker-1, 166465328]
71249 Receive Data = 8 [main, 88295337]
71249 Send    Data = 4 [DefaultDispatcher-worker-1, 166465328]
71250 Send    Data = 3 [DefaultDispatcher-worker-1, 166465328]
71252 Send    Data = 9 [DefaultDispatcher-worker-1, 166465328]
                                                               ⇐ 空き無し
71253 Send    Data = 1 [DefaultDispatcher-worker-1, 166465328] ⇐ 4を削除、1を追加
72254 Receive Data = 3 [main, 88295337]
73257 Receive Data = 9 [main, 88295337]
74259 Receive Data = 1 [main, 88295337]
BufferOverFlow時のDROP_LATEST動作

03116 Compose !
03115 Start sender !
05754 Start Receiver !
08123 Send    Data = 8 [DefaultDispatcher-worker-1, 166465328]
08123 Receive Data = 8 [main, 88295337]
08124 Send    Data = 4 [DefaultDispatcher-worker-1, 166465328]
08124 Send    Data = 3 [DefaultDispatcher-worker-1, 166465328]
08125 Send    Data = 9 [DefaultDispatcher-worker-1, 166465328]
                                                               ⇐ 空き無し
08126 Send    Data = 1 [DefaultDispatcher-worker-1, 166465328] ⇐ 1を破棄
09126 Receive Data = 4 [main, 88295337]
10128 Receive Data = 3 [main, 88295337]
11132 Receive Data = 9 [main, 88295337]

バッファーサイズ = 0

「バッファーサイズ(replay + extraBufferCapacity)= 0」の時は、SUSUPEND以外を指定できません。

これは、引数の要件を満たせないためです。

@Suppress("FunctionName", "UNCHECKED_CAST")
public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
    require(replay >= 0) { "replay cannot be negative, but was $replay" }
    require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
    require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
        "replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
    }
    ...
}

SUSUPEND以外を指定すると例外になり、プログラムは停止します。

java.lang.IllegalArgumentException: 
    replay or extraBufferCapacity must be positive 
    with non-default onBufferOverflow strategy DROP_LATEST
スポンサーリンク

リプレイ

マルチレシーバー構成にした時、追加された2番目以降のレシーバーは、最初にリプレイキャッシュからデータを受信します。

リプレイキャッシュは最新のデータを保持し続ける特殊なバッファーです。取得が行われても、保持した値は無効になりません。

ですので、追加された全てのレシーバで、同じデータを繰り返し受信(リプレイ)することになります。

ステップ1⇒2⇒3⇒4⇒5⇒6
リプレイ Step1
リプレイ Step2
リプレイ Step3
リプレイ Step4
リプレイ Step5
リプレイ Step6
private val Data = arrayOf(8, 4, 3, 9, 1)

fun createSharedFlow(scope: CoroutineScope) =
//    MutableSharedFlow<Int>().apply {
    MutableSharedFlow<Int>(
        replay = 2,
        extraBufferCapacity = 2
    ).apply {
        scope.launch(Dispatchers.Default) {
            Log.i(TAG, "${getMilliTime5()} Start sender !")
            delay(5000)
            Data.forEach {
                this@apply.emit(it)
                val _thName = getThreadName()
                val _jbCode = currentCoroutineContext().job.hashCode()
                Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
//                delay(1000)
            }
        }
    }.asSharedFlow()
@Preview
@Composable
fun SharedFlow_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: SharedFlow<Int> = remember { createSharedFlow(scope) }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receiver !")
                datCh.collect {
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
                    delay(1000)
                }
            }
        }
    ) { Text(text = "SharedFlow test !") }
}
44567 Start sender !
44567 Compose !
46522 Start Receiver ! ⇐ 1st Receiver
49573 Send    Data = 8 [DefaultDispatcher-worker-1, 155257474]
49574 Receive Data = 8 [main, 74532243]
49574 Send    Data = 4 [DefaultDispatcher-worker-1, 155257474]
49576 Send    Data = 3 [DefaultDispatcher-worker-1, 155257474]
49578 Send    Data = 9 [DefaultDispatcher-worker-1, 155257474]
49581 Send    Data = 1 [DefaultDispatcher-worker-1, 155257474]
50578 Receive Data = 4 [main, 74532243]
51582 Receive Data = 3 [main, 74532243]
52587 Receive Data = 9 [main, 74532243]
53591 Receive Data = 1 [main, 74532243]
57363 Start Receiver ! ⇐ 2nd Receiver
57363 Receive Data = 9 [main, 120720841]
58367 Receive Data = 1 [main, 120720841]
60337 Start Receiver ! ⇐ 3rd Receiver
60338 Receive Data = 9 [main, 179077884]
61341 Receive Data = 1 [main, 179077884]
スポンサーリンク

状態の監視

SharedFlowによって送受信されるストリームデータは、値の監視が行われません。ですので、再Composeはスケジューリングされません。

再Composeのスケジューリングを可能にするためには、ストリームデータを監視データへ変換します。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createSharedFlow(scope: CoroutineScope) =
    MutableSharedFlow<Int>().apply {
        scope.launch(Dispatchers.Default) {
            Log.i(TAG, "${getMilliTime5()} Start sender !")
            delay(5000)
            Data.forEach {
                this@apply.emit(it)
                val _thName = getThreadName()
                val _jbCode = currentCoroutineContext().job.hashCode()
                Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
                delay(1000)
            }
        }
    }.asSharedFlow()
@Preview
@Composable
fun SharedFlow_Monitor(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: SharedFlow<Int> = remember { createSharedFlow(scope) }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    // ストリームデータ -> 監視可データ
    val _data = remember { mutableStateOf(0) }
    LaunchedEffect(Unit) {
        Log.i(TAG, "${getMilliTime5()} Start Receiver !")
        datCh.collect {         // 要求・受信
            _data.value = it    // 監視データへ変換
            val _thName = getThreadName()
            val _jbCode = currentCoroutineContext().job.hashCode()
            Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
        }
    }

    Text(
        text = "${getMilliTime5()} Receive Data = ${_data.value}",
        fontSize = 20.sp
    )
}
21666 Compose !
21666 Start sender !
21890 Start Receiver !
26672 Receive Data = 8 [main, 138257264]
26672 Send    Data = 8 [DefaultDispatcher-worker-1, 44754665]
26678 Compose !
27674 Receive Data = 4 [main, 138257264]
27674 Send    Data = 4 [DefaultDispatcher-worker-1, 44754665]
27679 Compose !
28677 Send    Data = 3 [DefaultDispatcher-worker-1, 44754665]
28677 Receive Data = 3 [main, 138257264]
28694 Compose !
29680 Receive Data = 9 [main, 138257264]
29680 Send    Data = 9 [DefaultDispatcher-worker-1, 44754665]
29693 Compose !
30684 Receive Data = 1 [main, 138257264]
30684 Send    Data = 1 [DefaultDispatcher-worker-1, 44754665]
30693 Compose !

ちなみに、StateFlowは「State(状態の監視)+Flow(通信経路:SharedFlow)」です。StateFlowを用いれば、ここで紹介した「ストリームデータを監視データへ変換」する作業は必要ありません。

再Composeのスケジューリングを可能にする方法として、StateFlowの利用が定石になています。※詳細は「Coroutine:StateFlow」を参照

スポンサーリンク

関連記事:

近頃の携帯端末はクワッドコア(プロセッサが4つ)やオクタコア(プロセッサが8つ)が当たり前になりました。 サクサク動作するアプリを作るために、この恩恵を使わなければ損です。 となると、必然的に非同期処理(マルチスレッド)を使うことになります。 JavaのThreadクラス、Android APIのAsyncTaskクラスが代表的な手法です。 Kotlinは上記に加えて「コルーチン(Coroutine)」が使えるようになっています。 今回は、このコルーチンについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 特徴としてnon-blocking動作をサポートします。 このnon-blocking動作についてまとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンの構成要素であるSuspend関数について、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがlaunchです。 このlaunchビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するlaunchビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがasyncです。 このasyncビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するasyncビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがrunBlockingです。 このrunBlockingビルダーについて、まとめます。 ...
CoroutineContextはコルーチンで起動されるスレッドの属性を格納しています。 その中にコルーチンの名前を表現するName属性があります。 Name属性を出力する方法を紹介します。 ...
コルーチン(Coroutine)は「非同期処理プログラミングの手法」の1つです。 Kotlinが提供します。 withContextはCoroutineContextを切り替えてスレッドを起動するSuspend関数です。 このwithContextについて、まとめます。 ...
コルーチン間でメッセージ(データ)の送受信を行うことが出来ます。 ここで紹介する「メッセージの送受信」を使えば、非同期処理の間で確実にデータを受け渡し出来ます。 それにより、非同期処理の連携が容易になります。 今回は、メッセージの送受信についての基礎と、Channelを使った最も基本的な送受信の動作をまとめます。 ※環境:Android Studio Flamingo | 2022.2.1    :org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.4 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「Produce」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「Flow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「StateFlow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
Flowはメンバー関数や拡張関数で様々な機能を提供しています。 これらの関数は大きく分けて、中間演算(Intermediate operators)と終端演算(Terminal operators)に分けられます。 中間演算とは、withIndex、map、filter、drop、take、zip、merge、combineなどです。通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。 終端演算とは、collect、single、reduce、toListなどです。通信経路の末端に位置して、ストリームデータを収集します。 今回は、この「Flowの中間演算」のwithIndex、map、filter、drop、takeを取り上げて、まとめます。 ※環境:Android Studio Koala Feature Drop | 2024.1.2 Patch 1     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-androi ...
Flowはメンバー関数や拡張関数で様々な機能を提供しています。 これらの関数は大きく分けて、中間演算(Intermediate operators)と終端演算(Terminal operators)に分けられます。 中間演算とは、withIndex、map、filter、drop、take、zip、merge、combineなどです。通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。 終端演算とは、collect、single、reduce、toListなどです。通信経路の末端に位置して、ストリームデータを収集します。 今回は、この「Flowの中間演算」のzip、merge、combineを取り上げて、まとめます。 ※環境:Android Studio Koala Feature Drop | 2024.1.2 Patch 1     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     o ...
スポンサーリンク