Coroutine:Produce

投稿日:  更新日:

KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。

Channel、Produce、Flow、SharedFlow、StateFlowなどです。

これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。

プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。

ですので、各々を比較しつつ、まとめました。

この記事は「Produce」について、まとめたものです。

※環境:Android Studio Koala Feature Drop | 2024.1.2
    Kotlin 1.9.0
    Compose Compiler 1.5.1
    org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3
    org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3

スポンサーリンク

Produceとは

Produceはコルーチン間で複数の連続したメッセージを送受信する仕組みです。

ホットストリーム(Hot Stream)の通信経路を提供します。

Produceの構成

※メッセージの送受信については「Coroutine:コルーチン間でメッセージの送受信」を参照

Produceは、表のような特徴を持ちます。

ストリームタイプ通信経路状態の監視※
Produce / ChannelHotデータ分岐
×
Flow(SafeFlow)Cold1対1
SharedFlowHotブロードキャスト
StateFlow
※状態の監視 :再Composeのスケジューリングが可能かどうか

なお、Produceの通信経路を構築するproduce関数は、Channelのビルダー関数です。ReceiveChannelのインスタンスを返します。ですので、「Produce ≒ Channel」です。

この記事で述べている「Produce」は、「仕組みの名称」であって、クラスやオブジェクトではありません。

また、produce関数はコルーチンビルダー関数でもあります。launchと同様にコルーチンを起動します。

スポンサーリンク

基本的な例とストリームタイプ

以下はProduceの基本的な例です。

サンプルの動作は単純で、5つの整数値(配列:Data)をワーカーからメインスレッドへ送ります。

送信機(Sender)は処理の開始(コルーチンの起動)から5000ms後に、データの送信(send)を開始します。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createProduce(scope: CoroutineScope) =
    scope.produce(Dispatchers.Default) {
        Log.i(TAG, "${getMilliTime5()} Start Send !")
        delay(5000)
        Data.forEach {
            send(it)
            val _thName = getThreadName()
            val _jbCode = currentCoroutineContext().job.hashCode()
            Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
//            delay(1000)
        }
    }

受信機(Receiver)はボタンの押下で、データの要求・受信(receive/consumeEach)を開始します。

@Preview
@Composable
fun Produce_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: ReceiveChannel<Int> = remember { createProduce(scope) }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receive !")
                datCh.consumeEach {
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Produce test !") }
}
補助関数
fun getThreadName(): String = Thread.currentThread().name
fun getMilliTime5(): String = "%05d".format(System.currentTimeMillis() % 100000)

送信機は受信機からのデータの要求を待たずに処理を開始しています。これをホットストリーム(Hot Stream)といいます。

データの受信(要求)よりも前に送信が行われた場合、送信は休止して受信を待ちます。

Produceの基本的な例

79231 Start Send !     <--- createProduce()
79231 Compose !        <--- 送信処理の開始
90129 Start Receive !  <--- ボタン押下、データの要求(receive/consumeEach)
90129 Receive Data = 8 [main, 74532243]
90130 Send    Data = 8 [DefaultDispatcher-worker-1, 169843152]
90131 Send    Data = 4 [DefaultDispatcher-worker-1, 169843152]
90133 Receive Data = 4 [main, 74532243]
90134 Receive Data = 3 [main, 74532243]
90134 Send    Data = 3 [DefaultDispatcher-worker-1, 169843152]
90136 Receive Data = 9 [main, 74532243]
90136 Send    Data = 9 [DefaultDispatcher-worker-1, 169843152]
90139 Receive Data = 1 [main, 74532243]
90139 Send    Data = 1 [DefaultDispatcher-worker-1, 169843152]
※左端の数値はミリ秒

データの送信よりも前に受信(要求)が行われた場合、受信は休止して送信を待ちます。

Produceの基本的な例

86180 Start Send !     <--- createProduce()
86181 Compose !        <--- 送信処理の開始
88601 Start Receive !  <--- ボタン押下、データの要求(receive/consumeEach)
91184 Send    Data = 8 [DefaultDispatcher-worker-1, 169843152]
91185 Receive Data = 8 [main, 120720841]
91187 Receive Data = 4 [main, 120720841]
91187 Send    Data = 4 [DefaultDispatcher-worker-1, 169843152]
91188 Send    Data = 3 [DefaultDispatcher-worker-1, 169843152]
91188 Receive Data = 3 [main, 120720841]
91189 Receive Data = 9 [main, 120720841]
91189 Send    Data = 9 [DefaultDispatcher-worker-1, 169843152]
91190 Send    Data = 1 [DefaultDispatcher-worker-1, 169843152]
91192 Receive Data = 1 [main, 120720841]
※左端の数値はミリ秒
スポンサーリンク

送信間隔>要求・受信間隔

「送信間隔>要求・受信間隔」(送信が遅い)のような通信経路の場合、受信は送信間隔に合わせて休止します。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createProduce(scope: CoroutineScope) =
    scope.produce(Dispatchers.Default) {
        Log.i(TAG, "${getMilliTime5()} Start Send !")
        delay(5000)
        Data.forEach {
            send(it)
            val _thName = getThreadName()
            val _jbCode = currentCoroutineContext().job.hashCode()
            Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
            delay(1000)
        }
    }
@Preview
@Composable
fun Produce_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: ReceiveChannel<Int> = remember { createProduce(scope) }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receive !")
                datCh.consumeEach {
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Produce test !") }
}

送信間隔>要求・受信間隔(Produce)

13779 Compose !        <--- 送信処理の開始
13779 Start Send !     <--- createProduce()
16402 Start Receive !  <--- ボタン押下、データの要求(receive/consumeEach)
18787 Send    Data = 8 [DefaultDispatcher-worker-1, 169843152]
18788 Receive Data = 8 [main, 120720841]
19789 Send    Data = 4 [DefaultDispatcher-worker-1, 169843152]
19790 Receive Data = 4 [main, 120720841]
20792 Send    Data = 3 [DefaultDispatcher-worker-1, 169843152]
20793 Receive Data = 3 [main, 120720841]
21796 Send    Data = 9 [DefaultDispatcher-worker-1, 169843152]
21797 Receive Data = 9 [main, 120720841]
22800 Send    Data = 1 [DefaultDispatcher-worker-1, 169843152]
22800 Receive Data = 1 [main, 120720841]
※左端の数値はミリ秒

Channel#receiveはSuspend関数です。ノンブロッキング動作をします。ですので、要求したデータが取得できない場合は、データが取得できる状態になる(sendされる)まで休止します。

スポンサーリンク

送信間隔<要求・受信間隔

「送信間隔<受信間隔」(受信が遅い)のような通信経路の場合、送信は受信間隔に合わせて休止します。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createProduce(scope: CoroutineScope) =
    scope.produce(Dispatchers.Default) {
        Log.i(TAG, "${getMilliTime5()} Start Send !")
        delay(5000)
        Data.forEach {
            send(it)
            val _thName = getThreadName()
            val _jbCode = currentCoroutineContext().job.hashCode()
            Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
//            delay(1000)
        }
    }
@Preview
@Composable
fun Produce_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: ReceiveChannel<Int> = remember { createProduce(scope) }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receive !")
                datCh.consumeEach {
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
                    delay(1000)
                }
            }
        }
    ) { Text(text = "Produce test !") }
}

送信間隔<要求・受信間隔(Produce)

20157 Start Send !     <--- createProduce()
20157 Compose !        <--- 送信処理の開始
22699 Start Receive !  <--- ボタン押下、データの要求(receive/consumeEach)
25162 Send    Data = 8 [DefaultDispatcher-worker-1, 169843152]
25162 Receive Data = 8 [main, 120720841]
26169 Receive Data = 4 [main, 120720841]
26169 Send    Data = 4 [DefaultDispatcher-worker-1, 169843152]
27172 Receive Data = 3 [main, 120720841]
27172 Send    Data = 3 [DefaultDispatcher-worker-1, 169843152]
28175 Receive Data = 9 [main, 120720841]
28175 Send    Data = 9 [DefaultDispatcher-worker-1, 169843152]
29178 Receive Data = 1 [main, 120720841]
29179 Send    Data = 1 [DefaultDispatcher-worker-1, 169843152]
※左端の数値はミリ秒

Channel#sendはSuspend関数です。ノンブロッキング動作をします。ですので、データが要求されていない場合は、要求される(receiveされる)まで休止します。

スポンサーリンク

通信経路

通信経路の最小構成は「送信機:受信機=1:1」です。

ここに、受信機を追加して、「送信機:受信機=1:多」のマルチレシーバー構成に出来ます。

Produceの通信経路

この場合のストリームデータは、複数の受信機へ分配(スイッチ)されます。

分配は「受信機がデータを要求した順」になります。つまり、「早い者勝ち」です。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createProduce(scope: CoroutineScope) =
    scope.produce(Dispatchers.Default) {
        Log.i(TAG, "${getMilliTime5()} Start Send !")
        delay(5000)
        Data.forEach {
            send(it)
            val _thName = getThreadName()
            val _jbCode = currentCoroutineContext().job.hashCode()
            Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
            delay(1000)
        }
    }
@Preview
@Composable
fun Produce_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: ReceiveChannel<Int> = remember { createProduce(scope) }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receive !")
                datCh.consumeEach {
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Produce test !") }
}
68493 Compose !
68493 Start Send !
69581 Start Receive !
69793 Start Receive !
73499 Send    Data = 8 [DefaultDispatcher-worker-1, 169843152]
73500 Receive Data = 8 [main, 120720841]
74503 Send    Data = 4 [DefaultDispatcher-worker-1, 169843152]
74503 Receive Data = 4 [main, 179077884]
75507 Send    Data = 3 [DefaultDispatcher-worker-1, 169843152]
75507 Receive Data = 3 [main, 120720841]
76511 Send    Data = 9 [DefaultDispatcher-worker-1, 169843152]
76512 Receive Data = 9 [main, 179077884]
77516 Send    Data = 1 [DefaultDispatcher-worker-1, 169843152]
77516 Receive Data = 1 [main, 120720841]

Peoduceのマルチレシーバー

他の受信機よりデータの要求を増やせば、より多く受信できる点に注意して下さい。

スポンサーリンク

バッファー

Produceはバッファーを持つことができます。

バッファーの役割

バッファーは送信機と受信機の中間(Produce内)にある一時的な記憶領域です。送信機から受信機へ流れるデータの渋滞を緩和します。

送信機は、データの要求が無くても、自由なタイミングでバッファーへデータを送出できるので、送信が渋滞しません。ただし、バッファーに空きがある場合です。

受信機は、データの送信が無くても、自由なタイミングでバッファーからデータを取得できるので、受信が渋滞しません。ただし、バッファーにデータがある場合です。

バッファーの構成

バッファーは図のような構成になっています。

バッファの構成(Prodece)

capacityがバッファーサイズを示しています。デフォルトは「capacity = 0」です。ですので、何も指定しなければ「バッファー無し」になっています。

また、オーバーフロー時の扱いはSUSPENDに固定で、「capacity ≧ 0」の時のみ有効です。

@ExperimentalCoroutinesApi
public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> =
    produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion = null, block = block)

バッファーの制御

バッファーはCorutineScope.produce関数のcapacity引数で制御できます。

@ExperimentalCoroutinesApi
public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> =
    produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion = null, block = block)
引数概要
contextCoroutineContextコンテキスト
capacityIntバッファーサイズ(指定された値)
送受信のタイプ
・Rendezvous(0)
・Buffered(64)
・Conflated(-1)
・Unlimited(Int.MAX_VALUE)
blocksuspend ProducerScope.() -> Unit実行するラムダ式(タスクブロック)

capacityの値は、「バッファーのサイズ」と「送受信のタイプ(特別な動作)」の2つの役割が割り振られています。※「送受信のタイプ」については後述

バッファーの動作

バッファーの動作をサンプルで示します。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createProduce(scope: CoroutineScope) =
    scope.produce(
        Dispatchers.Default,
        capacity = 3            // 3(Buffered)
    ) {
        Log.i(TAG, "${getMilliTime5()} Start Send !")
        delay(5000)
        Data.forEach {
            send(it)
            val _thName = getThreadName()
            val _jbCode = currentCoroutineContext().job.hashCode()
            Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
            delay(1000)
        }
    }
@Preview
@Composable
fun Produce_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: ReceiveChannel<Int> = remember { createProduce(scope) }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receive !")
                datCh.consumeEach {
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
//                    delay(1000)
                }
            }
        }
    ) { Text(text = "Produce test !") }
}
67657 Compose !
67658 Start Send !
72660 Send    Data = 8 [DefaultDispatcher-worker-1, 95538892]  送信機⇒Buffer
73664 Send    Data = 4 [DefaultDispatcher-worker-1, 95538892]     〃
74668 Send    Data = 3 [DefaultDispatcher-worker-1, 95538892]     〃
82606 Start Receive !
82606 Receive Data = 8 [main, 169843152]                       Buffer⇒受信機
82607 Receive Data = 4 [main, 169843152]                             〃
82608 Receive Data = 3 [main, 169843152]                             〃
82609 Receive Data = 9 [main, 169843152]
82607 Send    Data = 9 [DefaultDispatcher-worker-1, 95538892]
83615 Send    Data = 1 [DefaultDispatcher-worker-1, 95538892]
83615 Receive Data = 1 [main, 169843152]

送信機⇒Buffer(送信側)

送信されたデータはバッファーに入り、新しいデータが送信される毎に「積み上げる方向」へシフトされて行きます。

オーバーフローになると送信は休止します。

なお、indexは受信時の取り出し位置です。

ステップ1⇒2⇒3⇒4
送信機⇒Buffer Step1
送信機⇒Buffer Step2
送信機⇒Buffer Step3
送信機⇒Buffer Step4

Buffer⇒受信機(受信側)

受信されるデータはindexの位置から取り出されます。

データの取り出されたバッファは無効になり、新しいデータが送信されることにより再び埋められます。

ステップ1⇒2⇒3⇒4
Buffer⇒受信機 Step1
Buffer⇒受信機 Step2
Buffer⇒受信機 Step3
Buffer⇒受信機 Step4
スポンサーリンク

送受信のタイプ

capacityはバッファーのサイズを指定する引数ですが、値の一部にパラメータ名が定義されており、「送受信のタイプ」の役割を合わせ持ちます。

タイプcapacityの値
( )内はパラメータ名
振舞い
Rendezvous0(RENDEZVOUS)デフォルトバッファー無し
Buffered64(BUFFERED)バッファーサイズ
Conflated-1(CONFLATED)特別な動作
UnlimitedInt.MAX_VALUE(UNLIMITED)バッファーサイズ

この中で、「Conflated」は特別な動作をします。その他は、ただのサイズです。

private val Data = arrayOf(8, 4, 3, 9, 1)
//private val Data = Array(100) { it }

fun createProduce(scope: CoroutineScope) =
    scope.produce(
        Dispatchers.Default,
        capacity = RENDEZVOUS   // 0(デフォルト)
//        capacity = 3            // 3
//        capacity = BUFFERED     // 64
//        capacity = CONFLATED    // -1
//        capacity = UNLIMITED    // Int.MAX_VALUE
    ) {
        Log.i(TAG, "${getMilliTime5()} Start Send !")
        delay(5000)
        Data.forEach {
            send(it)
            val _thName = getThreadName()
            val _jbCode = currentCoroutineContext().job.hashCode()
            Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
//            delay(1000)
        }
    }
@Preview
@Composable
fun Produce_Receiver(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: ReceiveChannel<Int> = remember { createProduce(scope) }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    Button(
        modifier = Modifier.fillMaxWidth(),
        onClick = {
            scope.launch {
                Log.i(TAG, "${getMilliTime5()} Start Receive !")
                datCh.consumeEach {
                    val _thName = getThreadName()
                    val _jbCode = currentCoroutineContext().job.hashCode()
                    Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
                    delay(1000)
                }
            }
        }
    ) { Text(text = "Produce test !") }
}

Rendezvous

バッファサイズは「0(無し)」です。

受信側は送信側へ直にデータを要求し、送信側は受信側へ直にデータを渡します。

送信データが無い時、受信側は休止して送信を待ちます。

受信の要求が無い時、送信側は休止して要求を待ちます。

25400 Start Send !
25400 Compose !
29660 Start Receive !
30405 Receive Data = 8 [main, 120720841]
30405 Send    Data = 8 [DefaultDispatcher-worker-1, 169843152]
31408 Receive Data = 4 [main, 120720841]
31408 Send    Data = 4 [DefaultDispatcher-worker-1, 169843152]
32413 Receive Data = 3 [main, 120720841]
32413 Send    Data = 3 [DefaultDispatcher-worker-1, 169843152]
33416 Receive Data = 9 [main, 120720841]
33416 Send    Data = 9 [DefaultDispatcher-worker-1, 169843152]
34418 Receive Data = 1 [main, 120720841]
34418 Send    Data = 1 [DefaultDispatcher-worker-1, 169843152]

デフォルトは「capacity = RENDEZVOUS」です。ですので、「基本的な例」で示した例は全てRendezvousになります。

Buffered

バッファサイズは「64」です。

送信側はバッファが空いている限りデータをバッファへ格納します。バッファが埋まれば休止します。

受信側はバッファーにデータがあればバッファーから受信します。バッファが空になれば休止します。

58719 Start Send !
58719 Compose !
63725 Send    Data = 0 [DefaultDispatcher-worker-1, 232751359]
63726 Send    Data = 1 [DefaultDispatcher-worker-1, 232751359]
    :
    :
63768 Send    Data = 62 [DefaultDispatcher-worker-1, 232751359]
63768 Send    Data = 63 [DefaultDispatcher-worker-1, 232751359]  ⇐ 64個まで
67561 Start Receive !
67561 Receive Data = 0 [main, 169843152]
67561 Send    Data = 64 [DefaultDispatcher-worker-1, 232751359]
68563 Receive Data = 1 [main, 169843152]
68564 Send    Data = 65 [DefaultDispatcher-worker-1, 232751359]
69567 Receive Data = 2 [main, 169843152]
69567 Send    Data = 66 [DefaultDispatcher-worker-1, 232751359]
70572 Receive Data = 3 [main, 169843152]
70572 Send    Data = 67 [DefaultDispatcher-worker-1, 232751359]
71577 Receive Data = 4 [main, 169843152]
71577 Send    Data = 68 [DefaultDispatcher-worker-1, 232751359]
    :
    :

Conflated

バッファサイズは「1」です。特殊な動作をします。

送信側からバッファーへ送信されたデータはバッファー上で上書きされます。受信側はバッファーから上書き後の最終データを受信します。

33453 Start Send !
33453 Compose !
38457 Send    Data = 8 [DefaultDispatcher-worker-1, 125789461]
38457 Send    Data = 4 [DefaultDispatcher-worker-1, 125789461]
38458 Send    Data = 3 [DefaultDispatcher-worker-1, 125789461]
38458 Send    Data = 9 [DefaultDispatcher-worker-1, 125789461]
38459 Send    Data = 1 [DefaultDispatcher-worker-1, 125789461] ⇐ 最終データ
40697 Start Receive !
40697 Receive Data = 1 [main, 169843152]

Unlimited

バッファサイズは「Int.MAX_VALUE(2147483647)」です。「サイズは無制限」になります。

動作はBufferedと同じです。

ただし、バッファーのデータ数が無制限なので、メモリーのオーバーフローに注意してください。

07254 Compose !
07254 Start Send !
12257 Send    Data = 0 [DefaultDispatcher-worker-1, 125789461]
12257 Send    Data = 1 [DefaultDispatcher-worker-1, 125789461]
12258 Send    Data = 2 [DefaultDispatcher-worker-1, 125789461]
    :
    :
12322 Send    Data = 98 [DefaultDispatcher-worker-1, 125789461]
12323 Send    Data = 99 [DefaultDispatcher-worker-1, 125789461]
    :
    :    ⇐ 2147483647個まで可能(サンプルは100で制限)
スポンサーリンク

状態の監視

Produceによって送受信されるストリームデータは、値の監視が行われません。ですので、再Composeはスケジューリングされません。

再Composeのスケジューリングを可能にするためには、ストリームデータを監視データへ変換します。

private val Data = arrayOf(8, 4, 3, 9, 1)

fun createProduce(scope: CoroutineScope) =
    scope.produce(Dispatchers.Default) {
        Log.i(TAG, "${getMilliTime5()} Start Send !")
        delay(5000)
        Data.forEach {
            send(it)
            val _thName = getThreadName()
            val _jbCode = currentCoroutineContext().job.hashCode()
            Log.i(TAG, "${getMilliTime5()} Send    Data = ${it} [${_thName}, ${_jbCode}]")
            delay(1000)
        }
    }
@Preview
@Composable
fun Produce_Monitor(
    scope: CoroutineScope = rememberCoroutineScope(),
    datCh: ReceiveChannel<Int> = remember { createProduce(scope) }
) {
    Log.i(TAG, "${getMilliTime5()} Compose !")

    // ストリームデータ -> 監視可データ
    val _data = remember { mutableStateOf(0) }
    LaunchedEffect(Unit) {
        Log.i(TAG, "${getMilliTime5()} Start Receiver !")
        datCh.consumeEach {     // 要求・受信
            _data.value = it    // 監視データへ変換
            val _thName = getThreadName()
            val _jbCode = currentCoroutineContext().job.hashCode()
            Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
        }
    }

    Text(
        text = "${getMilliTime5()} Receive Data = ${_data.value}",
        fontSize = 20.sp
    )
}
26472 Start Send !
26472 Compose !
26672 Start Receiver !
31476 Send    Data = 8 [DefaultDispatcher-worker-1, 138257264]
31476 Receive Data = 8 [main, 44754665]
31486 Compose !
32477 Send    Data = 4 [DefaultDispatcher-worker-1, 138257264]
32479 Receive Data = 4 [main, 44754665]
32485 Compose !
33479 Send    Data = 3 [DefaultDispatcher-worker-1, 138257264]
33480 Receive Data = 3 [main, 44754665]
33484 Compose !
34481 Send    Data = 9 [DefaultDispatcher-worker-1, 138257264]
34482 Receive Data = 9 [main, 44754665]
34487 Compose !
35483 Send    Data = 1 [DefaultDispatcher-worker-1, 138257264]
35483 Receive Data = 1 [main, 44754665]
35502 Compose !

ちなみに、StateFlowは「State(状態の監視)+Flow(通信経路:SharedFlow)」です。StateFlowを用いれば、ここで紹介した「ストリームデータを監視データへ変換」する作業は必要ありません。

再Composeのスケジューリングを可能にする方法として、StateFlowの利用が定石になています。※詳細は「Coroutine:StateFlow」を参照

スポンサーリンク

関連記事:

近頃の携帯端末はクワッドコア(プロセッサが4つ)やオクタコア(プロセッサが8つ)が当たり前になりました。 サクサク動作するアプリを作るために、この恩恵を使わなければ損です。 となると、必然的に非同期処理(マルチスレッド)を使うことになります。 JavaのThreadクラス、Android APIのAsyncTaskクラスが代表的な手法です。 Kotlinは上記に加えて「コルーチン(Coroutine)」が使えるようになっています。 今回は、このコルーチンについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 特徴としてnon-blocking動作をサポートします。 このnon-blocking動作についてまとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンの構成要素であるSuspend関数について、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがlaunchです。 このlaunchビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するlaunchビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがasyncです。 このasyncビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するasyncビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがrunBlockingです。 このrunBlockingビルダーについて、まとめます。 ...
CoroutineContextはコルーチンで起動されるスレッドの属性を格納しています。 その中にコルーチンの名前を表現するName属性があります。 Name属性を出力する方法を紹介します。 ...
コルーチン(Coroutine)は「非同期処理プログラミングの手法」の1つです。 Kotlinが提供します。 withContextはCoroutineContextを切り替えてスレッドを起動するSuspend関数です。 このwithContextについて、まとめます。 ...
コルーチン間でメッセージ(データ)の送受信を行うことが出来ます。 ここで紹介する「メッセージの送受信」を使えば、非同期処理の間で確実にデータを受け渡し出来ます。 それにより、非同期処理の連携が容易になります。 今回は、メッセージの送受信についての基礎と、Channelを使った最も基本的な送受信の動作をまとめます。 ※環境:Android Studio Flamingo | 2022.2.1    :org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.4 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「Flow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「SharedFlow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
KotlinのコルーチンAPIは「コルーチン間でメッセージを送受信する仕組み」を提供しています。 Channel、Produce、Flow、SharedFlow、StateFlowなどです。 これらは、「メッセージを送受信する」という本命の動作は変わりませんが、特徴や違いを持ちます。 プログラミングで利用する際は、特徴や違いを理解して、使い分けが必要になります。 ですので、各々を比較しつつ、まとめました。 この記事は「StateFlow」について、まとめたものです。 ※環境:Android Studio Koala Feature Drop | 2024.1.2     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3 ...
Flowはメンバー関数や拡張関数で様々な機能を提供しています。 これらの関数は大きく分けて、中間演算(Intermediate operators)と終端演算(Terminal operators)に分けられます。 中間演算とは、withIndex、map、filter、drop、take、zip、merge、combineなどです。通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。 終端演算とは、collect、single、reduce、toListなどです。通信経路の末端に位置して、ストリームデータを収集します。 今回は、この「Flowの中間演算」のwithIndex、map、filter、drop、takeを取り上げて、まとめます。 ※環境:Android Studio Koala Feature Drop | 2024.1.2 Patch 1     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-androi ...
Flowはメンバー関数や拡張関数で様々な機能を提供しています。 これらの関数は大きく分けて、中間演算(Intermediate operators)と終端演算(Terminal operators)に分けられます。 中間演算とは、withIndex、map、filter、drop、take、zip、merge、combineなどです。通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。 終端演算とは、collect、single、reduce、toListなどです。通信経路の末端に位置して、ストリームデータを収集します。 今回は、この「Flowの中間演算」のzip、merge、combineを取り上げて、まとめます。 ※環境:Android Studio Koala Feature Drop | 2024.1.2 Patch 1     Kotlin 1.9.0     Compose Compiler 1.5.1     org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3     o ...
スポンサーリンク