KotlinのコルーチンAPIは、複数の「コルーチン間でメッセージを送受信する仕組み」を提供しています。
Channel、Produce、Flow、SharedFlow、StateFlowなどです。
これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。
プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。
ですので、各々を比較しつつ、まとめました。
この記事は「StateFlow」について、まとめたものです。
※環境:Android Studio Koala Feature Drop | 2024.1.2
Kotlin 1.9.0
Compose Compiler 1.5.1
org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3
org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3
StateFlowとは
StateFlowはコルーチン間で複数の連続したメッセージを送受信する仕組みです。
ホットストリーム(Hot Stream)の通信経路を提供します。
さらに、「状態の監視」を行う機能が追加されています。この点が最大の特徴です。
※メッセージの送受信については「Coroutine:コルーチン間でメッセージの送受信」を参照
StateFlowは、表のような特徴を持ちます。
ストリームタイプ | 通信経路 | 状態の監視※ | |
---|---|---|---|
Produce / Channel | Hot | データ分岐 | |
Flow(SafeFlow) | Cold | 1対1 | |
SharedFlow | Hot | ブロードキャスト | |
StateFlow | |||
※状態の監視 :再Composeのスケジューリングが可能かどうか |
「状態の監視」を使うと、「通信経路を通してデータ(状態)を受信し、変更されていたら再Composeをスケジュールする」ことが可能になります。
つまり、時々刻々と変化する画面の表示を、通信経路を流れるストリームデータで表現できます。
内部の構成
内部の構成は「State(状態の監視)+Flow(通信経路:SharedFlow)」です。
※SharedFlowについては「Coroutine:SharedFlow」を参照
通信経路
通信経路の実態はSharedFlowです。ただし、次に上げる「バッファーの動作」に特化した実装になっています。
public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> { public override var value: T public fun compareAndSet(expect: T, update: T): Boolean }
- extraBufferCapacity ⇐ 0
- replay ⇐ 1
- onBufferOverflow ⇐ DROP_OLDEST
単体で用いるSharedFlowと実装が異なる点に注意してください。
状態の監視
「ストリームデータを監視データへ変換する処理」が内部的に行われています。
@Composable fun <T : R, R> Flow<T>.collectAsState( initial: R, context: CoroutineContext = EmptyCoroutineContext ): State<R> = produceState(initial, this, context) { if (context == EmptyCoroutineContext) { collect { value = it } // collectで受信、Stateのvalueへ代入 } else withContext(context) { collect { value = it } // collectで受信、Stateのvalueへ代入 } }
基本的な動作
以下はStateFlowの基本的な例です。
サンプルの動作は単純で、5つの整数値(配列:Data)をワーカーからメインスレッドへ送ります。
送信機(Sender)は処理の開始(コルーチンの起動)から5000ms後に、データの送信(valueへ代入)を開始します。
private val Data = arrayOf(8, 4, 3, 9, 1) fun createStateFlow(scope: CoroutineScope, initValue: Int = 0) = MutableStateFlow<Int>(initValue).apply { scope.launch(Dispatchers.Default) { Log.i(TAG, "${getMilliTime5()} Start send !") delay(5000) Data.forEach { this@apply.value = it val _thName = getThreadName() val _jbCode = currentCoroutineContext().job.hashCode() Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]") delay(1000) } } }.asStateFlow()
受信機(Monitor)は受信データの表示を行います。受信データは、Stateのインスタンスを取り出して、valueを参照することで得ます。
@Preview @Composable fun StateFlow_Monitor( scope: CoroutineScope = rememberCoroutineScope(), datCh: StateFlow<Int> = remember { createStateFlow(scope) } ) { Log.i(TAG, "${getMilliTime5()} Compose !") val _data = datCh.collectAsState().value Text( text = "${getMilliTime5()} Receive Data = ${_data}", fontSize = 20.sp ) }
同様のことをSharedFlowを用いて行うと、ストリームデータを監視データへ変換する処理で数行(ハイライト部分)が必要です。
しかし、StateFlowは1行(ハイライト部分)で済みます。とても、簡素に記述できます。
10035 Start send ! 10035 Compose ! 15047 Send Data = 8 [DefaultDispatcher-worker-1, 198267682] 15054 Compose ! 16050 Send Data = 4 [DefaultDispatcher-worker-1, 198267682] 16055 Compose ! 17051 Send Data = 3 [DefaultDispatcher-worker-1, 198267682] 17056 Compose ! 18053 Send Data = 9 [DefaultDispatcher-worker-1, 198267682] 18071 Compose ! 19055 Send Data = 1 [DefaultDispatcher-worker-1, 198267682] 19071 Compose !
状態の初期値
状態の初期値はMutableStateFlowの引数valueで指定できます。
: @Suppress("FunctionName") public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL) : : private class StateFlowImpl<T>( initialState: Any // T | NULL ) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> { private val _state = atomic(initialState) // T | NULL private var sequence = 0 // serializes updates, value update is in process when sequence is odd ... } :
指定された状態の初期値(引数value、図は「value ⇐ 0」を指定)は、送信機の起動時に送信されて、リプレイキャッシュに保持されます。
ホットストリームなので、送信機は単独で送信処理を始められます。
初回のComposeが行われると、リプレイキャッシュから状態の初期値(value)を受信します。
リプレイ動作をするので、複数のComposable関数が同じ状態(表示)になります。
マルチレシーバー
次のように記述すると、マルチレシーバー構成になります。
@Preview @Composable fun StateFlow_Multi( scope: CoroutineScope = rememberCoroutineScope(), datCh: StateFlow<Int> = remember { createStateFlow(scope) } ) { Column { StateFlow_Monitor(scope, datCh) // (1) StateFlow_Monitor(scope, datCh) // (2) } }
ストリームデータは全てのComposable関数へブロードキャストされます。ですので、同じ状態になります。
29140 Compose ! 29140 Start send ! 29150 Compose ! 34147 Send Data = 8 [DefaultDispatcher-worker-1, 102658184] 34163 Compose ! 34167 Compose ! 35149 Send Data = 4 [DefaultDispatcher-worker-1, 102658184] 35162 Compose ! 35164 Compose ! 36152 Send Data = 3 [DefaultDispatcher-worker-1, 102658184] 36162 Compose ! 36165 Compose ! 37155 Send Data = 9 [DefaultDispatcher-worker-1, 102658184] 37163 Compose ! 37165 Compose ! 38160 Send Data = 1 [DefaultDispatcher-worker-1, 102658184] 38164 Compose ! 38165 Compose !
関連記事: