Coroutine:StateFlow

投稿日:  更新日:

KotlinのコルーチンAPIは、複数の「コルーチン間でメッセージを送受信する仕組み」を提供しています。

Channel、Produce、Flow、SharedFlow、StateFlowなどです。

これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。

プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。

ですので、各々を比較しつつ、まとめました。

この記事は「StateFlow」について、まとめたものです。

※環境:Android Studio Koala Feature Drop | 2024.1.2
    Kotlin 1.9.0
    Compose Compiler 1.5.1
    org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3
    org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3

スポンサーリンク

StateFlowとは

StateFlowはコルーチン間で複数の連続したメッセージを送受信する仕組みです。

ホットストリーム(Hot Stream)の通信経路を提供します。

さらに、「状態の監視」を行う機能が追加されています。この点が最大の特徴です。

StateFlowの構成

※メッセージの送受信については「Coroutine:コルーチン間でメッセージの送受信」を参照

StateFlowは、表のような特徴を持ちます。

ストリームタイプ通信経路状態の監視※
Produce / ChannelHotデータ分岐
×
Flow(SafeFlow)Cold1対1
SharedFlowHotブロードキャスト
StateFlow
※状態の監視 :再Composeのスケジューリングが可能かどうか

「状態の監視」を使うと、「通信経路を通してデータ(状態)を受信し、変更されていたら再Composeをスケジュールする」ことが可能になります。

つまり、時々刻々と変化する画面の表示を、通信経路を流れるストリームデータで表現できます。

スポンサーリンク

内部の構成

内部の構成は「State(状態の監視)+Flow(通信経路:SharedFlow)」です。
※SharedFlowについては「Coroutine:SharedFlow」を参照

通信経路

通信経路の実態はSharedFlowです。ただし、次に上げる「バッファーの動作」に特化した実装になっています。

public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
    public override var value: T
    public fun compareAndSet(expect: T, update: T): Boolean
}
  • extraBufferCapacity ⇐ 0
  • replay ⇐ 1
  • onBufferOverflow ⇐ DROP_OLDEST

単体で用いるSharedFlowと実装が異なる点に注意してください。

状態の監視

「ストリームデータを監視データへ変換する処理」が内部的に行われています。

@Composable
fun <T : R, R> Flow<T>.collectAsState(
    initial: R,
    context: CoroutineContext = EmptyCoroutineContext
): State<R> = produceState(initial, this, context) {
    if (context == EmptyCoroutineContext) {
        collect { value = it }		// collectで受信、Stateのvalueへ代入
    } else withContext(context) {
        collect { value = it }		// collectで受信、Stateのvalueへ代入
    }
}
スポンサーリンク

基本的な動作

以下はStateFlowの基本的な例です。

サンプルの動作は単純で、5つの整数値(配列:Data)をワーカーからメインスレッドへ送ります。

送信機(Sender)は処理の開始(コルーチンの起動)から5000ms後に、データの送信(valueへ代入)を開始します。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createStateFlow(scope: CoroutineScope, initValue: Int = 0) =
    MutableStateFlow<Int>(initValue).apply {
        scope.launch(Dispatchers.Default) {
            Log.i(TAG, "${getMilliTime5()} Start send !")
            delay(5000)
            Data.forEach {
                this@apply.value = it
                val _thName = getThreadName()
                val _jbCode = currentCoroutineContext().job.hashCode()
                Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
                delay(1000)
            }
        }
    }.asStateFlow()

受信機(Monitor)は受信データの表示を行います。受信データは、Stateのインスタンスを取り出して、valueを参照することで得ます。

@Preview
@Composable
fun StateFlow_Monitor(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: StateFlow<Int> = remember { createStateFlow(scope) }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    val _data = datCh.collectAsState().value
    Text(
        text = "${getMilliTime5()} Receive Data = ${_data}",
        fontSize = 20.sp
    )
}
SharedFlowの場合(参考)
private val Data = arrayOf(8, 4, 3, 9, 1)

fun createSharedFlow(scope: CoroutineScope) =
    MutableSharedFlow<Int>().apply {
        scope.launch(Dispatchers.Default) {
            Log.i(TAG, "${getMilliTime5()} Start sender !")
            delay(5000)
            Data.forEach {
                this@apply.emit(it)
                val _thName = getThreadName()
                val _jbCode = currentCoroutineContext().job.hashCode()
                Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
                delay(1000)
            }
        }
    }.asSharedFlow()
@Preview
@Composable
fun SharedFlow_Monitor(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: SharedFlow<Int> = remember { createSharedFlow(scope) }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    // ストリームデータ -> 監視データ
    val _data = remember { mutableStateOf(0) }
    LaunchedEffect(Unit) {
        Log.i(TAG, "${getMilliTime5()} Start Receiver !")
        datCh.collect {         // 要求・受信
            _data.value = it    // 監視データへ変換
            val _thName = getThreadName()
            val _jbCode = currentCoroutineContext().job.hashCode()
            Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
        }
    }

    Text(
        text = "${getMilliTime5()} Receive Data = ${_data.value}",
        fontSize = 20.sp
    )
}
補助関数
fun getThreadName(): String = Thread.currentThread().name
fun getMilliTime5(): String = "%05d".format(System.currentTimeMillis() % 100000)

同様のことをSharedFlowを用いて行うと、ストリームデータを監視データへ変換する処理で数行(ハイライト部分)が必要です。

しかし、StateFlowは1行(ハイライト部分)で済みます。とても、簡素に記述できます。

10035 Start send !
10035 Compose !
15047 Send    Data = 8 [DefaultDispatcher-worker-1, 198267682]
15054 Compose !
16050 Send    Data = 4 [DefaultDispatcher-worker-1, 198267682]
16055 Compose !
17051 Send    Data = 3 [DefaultDispatcher-worker-1, 198267682]
17056 Compose !
18053 Send    Data = 9 [DefaultDispatcher-worker-1, 198267682]
18071 Compose !
19055 Send    Data = 1 [DefaultDispatcher-worker-1, 198267682]
19071 Compose !
スポンサーリンク

状態の初期値

状態の初期値はMutableStateFlowの引数valueで指定できます。

    :
@Suppress("FunctionName")
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
    :
    :
private class StateFlowImpl<T>(
    initialState: Any // T | NULL
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialState) // T | NULL
    private var sequence = 0 // serializes updates, value update is in process when sequence is odd
	...
}
    :

指定された状態の初期値(引数value、図は「value ⇐ 0」を指定)は、送信機の起動時に送信されて、リプレイキャッシュに保持されます。

ホットストリームなので、送信機は単独で送信処理を始められます。

StateFlowの初期状態(保持)

初回のComposeが行われると、リプレイキャッシュから状態の初期値(value)を受信します。

StateFlowの初期状態(読み出し)

リプレイ動作をするので、複数のComposable関数が同じ状態(表示)になります。

スポンサーリンク

マルチレシーバー

次のように記述すると、マルチレシーバー構成になります。

@Preview
@Composable
fun StateFlow_Multi(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: StateFlow<Int> = remember { createStateFlow(scope) }
) {
    Column {
        StateFlow_Monitor(scope, datCh)     // (1)
        StateFlow_Monitor(scope, datCh)     // (2)
    }
}

ストリームデータは全てのComposable関数へブロードキャストされます。ですので、同じ状態になります。

StateFlowのマルチレシーバ構成

29140 Compose !
29140 Start send !
29150 Compose !
34147 Send    Data = 8 [DefaultDispatcher-worker-1, 102658184]
34163 Compose !
34167 Compose !
35149 Send    Data = 4 [DefaultDispatcher-worker-1, 102658184]
35162 Compose !
35164 Compose !
36152 Send    Data = 3 [DefaultDispatcher-worker-1, 102658184]
36162 Compose !
36165 Compose !
37155 Send    Data = 9 [DefaultDispatcher-worker-1, 102658184]
37163 Compose !
37165 Compose !
38160 Send    Data = 1 [DefaultDispatcher-worker-1, 102658184]
38164 Compose !
38165 Compose !
スポンサーリンク

関連記事:

近頃の携帯端末はクワッドコア(プロセッサが4つ)やオクタコア(プロセッサが8つ)が当たり前になりました。 サクサク動作するアプリを作るために、この恩恵を使わなければ損です。 となると、必然的に非同期処理(マルチスレッド)を使うことになります。 JavaのThreadクラス、Android APIのAsyncTaskクラスが代表的な手法です。 Kotlinは上記に加えて「コルーチン(Coroutine)」が使えるようになっています。 今回は、このコルーチンについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 特徴としてnon-blocking動作をサポートします。 このnon-blocking動作についてまとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンの構成要素であるSuspend関数について、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがlaunchです。 このlaunchビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するlaunchビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがasyncです。 このasyncビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するasyncビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがrunBlockingです。 このrunBlockingビルダーについて、まとめます。 ...
CoroutineContextはコルーチンで起動されるスレッドの属性を格納しています。 その中にコルーチンの名前を表現するName属性があります。 Name属性を出力する方法を紹介します。 ...
コルーチン(Coroutine)は「非同期処理プログラミングの手法」の1つです。 Kotlinが提供します。 withContextはCoroutineContextを切り替えてスレッドを起動するSuspend関数です。 このwithContextについて、まとめます。 ...
コルーチン間でメッセージ(データ)の送受信を行うことが出来ます。 ここで紹介する「メッセージの送受信」を使えば、非同期処理の間で確実にデータを受け渡し出来ます。 それにより、非同期処理の連携が容易になります。 今回は、メッセージの送受信についての基礎と、Channelを使った最も基本的な送受信の動作をまとめます。 ※環境:Android Studio Flamingo | 2022.2.1    :org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.4 ...
KotlinのコルーチンAPIは、複数の「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「Produce」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは、複数の「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「Flow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは、複数の「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「SharedFlow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
スポンサーリンク