Flowはメンバー関数や拡張関数で様々な機能を提供しています。
これらの関数は大きく分けて、中間演算(Intermediate operators)と終端演算(Terminal operators)に分けられます。
中間演算とは、withIndex、map、filter、drop、take、zip、merge、combineなどです。通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。
終端演算とは、collect、single、reduce、toListなどです。通信経路の末端に位置して、ストリームデータを収集します。
今回は、この「Flowの中間演算」のwithIndex、map、filter、drop、takeを取り上げて、まとめます。
※環境:Android Studio Koala Feature Drop | 2024.1.2 Patch 1
Kotlin 1.9.0
Compose Compiler 1.5.1
org.jetbrains.kotlinx:kotlinx-coroutines-android 1.7.3
org.jetbrains.kotlinx:kotlinx-coroutines-core 1.7.3
中間演算とは
中間演算は通信経路(Flow)の途中に位置して、ストリームデータを変更したり、Flowを統合したりします。
| 中間演算 | 概要 | ダウンストリームFlow |
|---|---|---|
| withIndex | データにインデックを付与 | Flow(IndexValue(T)) |
| map | データを割り振り直す | Flow(R) |
| filter | 条件に合うデータを通す | Flow(T) |
| drop | 先頭から指定個数のデータを除外 | |
| dropWhile | 先頭から条件を満たす間のデータを除外 | |
| take | 先頭から指定個数のデータを取得 | |
| takeWhile | 先頭から条件を満たす間のデータを取得 | |
| zip | Flowを統合(全部のデータが切り替わる毎に融合) | Flow(R) |
| merge | Flowを統合(データをそのまま重ね合わせ) | Flow(T) |
| combine | Flowを統合(一部のデータが切り替わる毎に融合) | Flow(R) |
| ※アップストリームFlowはFlow(T) ※正式なmergeは「kotlinx-coroutines-core 1.9.0」で同様な関数が登場予定 |
||
中間演算の構造
中間演算はアップストリームFlowの受信出力(collect)を受けて、何かの変換処理を伴ったダウンストリームFlowを返す構造になっています。
図にすると、ひとつなぎにFlowを追加した形です。

簡単な中間演算の例を示します。「何もしない」という変換処理を行うrelay関数です。
レシーバーがアップストリームFlowで、returnで返されるインスタンスがダウンストリームFlowになります。
fun <T> Flow<T>.relay(): Flow<T> {
return flow {
collect { value -> // レシーバーのcollect
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Relay Data = ${value} [${_thName}, ${_jbCode}]")
/*
** ここで何もしない
*/
return@collect emit(value) // returnで返すFlowのemit
}
}
}
中間演算は「コメント:ここで何もしない」の部分に変換処理が入ります。変換処理はラムダ式を引数で挿入する方法が採られます。
サンプルは「何もしない」ので、変換処理はありません。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)
fun createFlowRelay() = flow {
Log.i(TAG, "${getMilliTime5()} Start Sender !")
delay(5000)
Data.forEach {
emit(it) // 送信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]")
delay(1000)
}
}.flowOn(Dispatchers.Default).relay()
@Preview
@Composable
fun FlowRelay_Receiver(
scope: CoroutineScope = rememberCoroutineScope(),
datCh: Flow<Int> = remember { createFlowRelay() }
) {
Log.i(TAG, "${getMilliTime5()} Compose !")
Button(
modifier = Modifier.fillMaxWidth(),
onClick = {
scope.launch {
Log.i(TAG, "${getMilliTime5()} Start Receiver !")
datCh.collect { // 要求・受信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
// delay(1000)
}
}
}
) { Text(text = "Flow test (Relay) !") }
}
48737 Compose ! 50651 Start Receiver ! 50659 Start Sender ! 55665 Send Data = 8 [DefaultDispatcher-worker-1, 188663320] 55666 Relay Data = 8 [main, 145086577] ⇐ mainは受信機のコルーチンコンテキスト 55667 Receive Data = 8 [main, 145086577] 56670 Send Data = 4 [DefaultDispatcher-worker-1, 188663320] 56671 Relay Data = 4 [main, 145086577] 56672 Receive Data = 4 [main, 145086577] 57675 Send Data = 3 [DefaultDispatcher-worker-1, 188663320] 57675 Relay Data = 3 [main, 145086577] 57676 Receive Data = 3 [main, 145086577] 58678 Send Data = 9 [DefaultDispatcher-worker-1, 188663320] 58679 Relay Data = 9 [main, 145086577] 58680 Receive Data = 9 [main, 145086577] 59683 Send Data = 1 [DefaultDispatcher-worker-1, 188663320] 59684 Relay Data = 1 [main, 145086577] 59685 Receive Data = 1 [main, 145086577]
中間演算は受信機のコルーチンコンテキストで実行される点に注意してください。
withIndex
withIndexはデータにインデックを付与します。
public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = flow {
var index = 0
collect { value ->
emit(IndexedValue(checkIndexOverflow(index++), value))
}
}
以下はwithIndexのサンプルです。
ストリームデータはインデックス番号が付与されて、IndexedValue<T>へ置き換えられます。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)
fun createFlowWithIndex() = flow {
Log.i(TAG, "${getMilliTime5()} Start Sender !")
delay(5000)
Data.forEach {
emit(it) // 送信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]")
delay(1000)
}
}.flowOn(Dispatchers.Default).withIndex()
@Preview
@Composable
fun FlowWithIndex_Receiver(
scope: CoroutineScope = rememberCoroutineScope(),
datCh: Flow<IndexedValue<Int>> = remember { createFlowWithIndex() }
) {
Log.i(TAG, "${getMilliTime5()} Compose !")
Button(
modifier = Modifier.fillMaxWidth(),
onClick = {
scope.launch {
Log.i(TAG, "${getMilliTime5()} Start Receiver !")
datCh.collect { // 要求・受信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Receive Data[${it.index}] = ${it.value} [${_thName}, ${_jbCode}]")
// delay(1000)
}
}
}
) { Text(text = "Flow test (withIndex) !") }
}
66938 Compose !
69711 Start Receiver !
69722 Start Sender !
74730 Send Data = 8 [DefaultDispatcher-worker-1, 188663320]
74731 Receive Data[0] = 8 [main, 145086577]
75734 Send Data = 4 [DefaultDispatcher-worker-1, 188663320]
75734 Receive Data[1] = 4 [main, 145086577]
76737 Send Data = 3 [DefaultDispatcher-worker-1, 188663320]
76737 Receive Data[2] = 3 [main, 145086577]
:
:
83764 Send Data = 6 [DefaultDispatcher-worker-1, 188663320]
83764 Receive Data[9] = 6 [main, 145086577]
map
mapはデータを割り振り直します。
public inline fun <T, R> Flow<T>.map(
crossinline transform: suspend (value: T) -> R
): Flow<R> = transform { value ->
return@transform emit(transform(value))
}
以下はmapのサンプルです。
ストリームデータは10%の大きさ(1/10)へ縮小され、Floatで表現されます。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)
fun createFlowMap() = flow {
Log.i(TAG, "${getMilliTime5()} Start Sender !")
delay(5000)
Data.forEach {
emit(it) // 送信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]")
delay(1000)
}
}.flowOn(Dispatchers.Default).map { it.toFloat() / 10.0f }
@Preview
@Composable
fun FlowMap_Receiver(
scope: CoroutineScope = rememberCoroutineScope(),
datCh: Flow<Float> = remember { createFlowMap() }
) {
Log.i(TAG, "${getMilliTime5()} Compose !")
Button(
modifier = Modifier.fillMaxWidth(),
onClick = {
scope.launch {
Log.i(TAG, "${getMilliTime5()} Start Receiver !")
datCh.collect { // 要求・受信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
// delay(1000)
}
}
}
) { Text(text = "Flow test (map) !") }
}
78103 Compose !
80766 Start Receiver !
80777 Start Sender !
85785 Send Data = 8 [DefaultDispatcher-worker-2, 188663320]
85786 Receive Data = 0.8 [main, 145086577]
86789 Send Data = 4 [DefaultDispatcher-worker-2, 188663320]
86790 Receive Data = 0.4 [main, 145086577]
87792 Send Data = 3 [DefaultDispatcher-worker-2, 188663320]
87792 Receive Data = 0.3 [main, 145086577]
:
:
94830 Receive Data = 0.6 [main, 145086577]
94829 Send Data = 6 [DefaultDispatcher-worker-2, 188663320]
filter
filterは 条件に合うデータを通します。
public inline fun <T> Flow<T>.filter(
crossinline predicate: suspend (T) -> Boolean
): Flow<T> = transform { value ->
if (predicate(value)) return@transform emit(value)
}
以下はfilterのサンプルです。
ストリームデータは「条件:データ < 3」のみを通し、それ以外は除外されます。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)
fun createFlowFilter() = flow {
Log.i(TAG, "${getMilliTime5()} Start Sender !")
delay(5000)
Data.forEach {
emit(it) // 送信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]")
delay(1000)
}
}.flowOn(Dispatchers.Default).filter { it < 3 }
@Preview
@Composable
fun FlowFilter_Receiver(
scope: CoroutineScope = rememberCoroutineScope(),
datCh: Flow<Int> = remember { createFlowFilter() }
) {
Log.i(TAG, "${getMilliTime5()} Compose !")
Button(
modifier = Modifier.fillMaxWidth(),
onClick = {
scope.launch {
Log.i(TAG, "${getMilliTime5()} Start Receiver !")
datCh.collect { // 要求・受信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
// delay(1000)
}
}
}
) { Text(text = "Flow test (filer) !") }
}
74969 Compose !
19594 Start Receiver !
19606 Start Sender !
24612 Send Data = 8 [DefaultDispatcher-worker-1, 188663320]
25618 Send Data = 4 [DefaultDispatcher-worker-1, 188663320]
26623 Send Data = 3 [DefaultDispatcher-worker-1, 188663320]
27626 Send Data = 9 [DefaultDispatcher-worker-1, 188663320]
28629 Send Data = 1 [DefaultDispatcher-worker-1, 188663320]
28630 Receive Data = 1 [main, 145086577]
29632 Send Data = -1 [DefaultDispatcher-worker-1, 188663320]
29633 Receive Data = -1 [main, 145086577]
30637 Send Data = 2 [DefaultDispatcher-worker-1, 188663320]
30637 Receive Data = 2 [main, 145086577]
31639 Send Data = 5 [DefaultDispatcher-worker-1, 188663320]
32641 Send Data = 7 [DefaultDispatcher-worker-1, 188663320]
33646 Send Data = 6 [DefaultDispatcher-worker-1, 188663320]
drop
dropは先頭から指定個数のデータを除外します。
public fun <T> Flow<T>.drop(count: Int): Flow<T> {
require(count >= 0) { "Drop count should be non-negative, but had $count" }
return flow {
var skipped = 0
collect { value ->
if (skipped >= count) emit(value) else ++skipped
}
}
}
以下はdropのサンプルです。
先頭の3つのデータは、中間演算により途中で除外されて、受信機で受信できません。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)
fun createFlowDrop() = flow {
Log.i(TAG, "${getMilliTime5()} Start Sender !")
delay(5000)
Data.forEach {
emit(it) // 送信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]")
delay(1000)
}
}.flowOn(Dispatchers.Default).drop(3)
@Preview
@Composable
fun FlowDrop_Receiver(
scope: CoroutineScope = rememberCoroutineScope(),
datCh: Flow<Int> = remember { createFlowDrop() }
) {
Log.i(TAG, "${getMilliTime5()} Compose !")
Button(
modifier = Modifier.fillMaxWidth(),
onClick = {
scope.launch {
Log.i(TAG, "${getMilliTime5()} Start Receiver !")
datCh.collect { // 要求・受信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
// delay(1000)
}
}
}
) { Text(text = "Flow test (drop) !") }
}
26789 Compose !
30099 Start Receiver !
30111 Start Sender !
35116 Send Data = 8 [DefaultDispatcher-worker-1, 265517962]
36119 Send Data = 4 [DefaultDispatcher-worker-1, 265517962]
37121 Send Data = 3 [DefaultDispatcher-worker-1, 265517962]
38124 Send Data = 9 [DefaultDispatcher-worker-1, 265517962]
38124 Receive Data = 9 [main, 57398779]
39126 Send Data = 1 [DefaultDispatcher-worker-1, 265517962]
39126 Receive Data = 1 [main, 57398779]
40128 Send Data = -1 [DefaultDispatcher-worker-1, 265517962]
40128 Receive Data = -1 [main, 57398779]
41130 Send Data = 2 [DefaultDispatcher-worker-1, 265517962]
41130 Receive Data = 2 [main, 57398779]
42132 Send Data = 5 [DefaultDispatcher-worker-1, 265517962]
42132 Receive Data = 5 [main, 57398779]
43135 Send Data = 7 [DefaultDispatcher-worker-1, 265517962]
43136 Receive Data = 7 [main, 57398779]
44138 Send Data = 6 [DefaultDispatcher-worker-1, 265517962]
44138 Receive Data = 6 [main, 57398779]
dropWhile
dropWhileは先頭から条件を満たす間のデータを除外します。
public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow {
var matched = false
collect { value ->
if (matched) {
emit(value)
} else if (!predicate(value)) {
matched = true
emit(value)
}
}
}
以下はdropWhileのサンプルです。
ストリームデータは、先頭から「条件:データ > 0」を満たす間はデータを除外し、満たさなくなった時点で通るようになります。
「除外」からスタートする点に注意してください。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)
fun createFlowDrop() = flow {
Log.i(TAG, "${getMilliTime5()} Start Sender !")
delay(5000)
Data.forEach {
emit(it) // 送信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]")
delay(1000)
}
}.flowOn(Dispatchers.Default).dropWhile{ it > 0 }
@Preview
@Composable
fun FlowDrop_Receiver(
scope: CoroutineScope = rememberCoroutineScope(),
datCh: Flow<Int> = remember { createFlowDrop() }
) {
Log.i(TAG, "${getMilliTime5()} Compose !")
Button(
modifier = Modifier.fillMaxWidth(),
onClick = {
scope.launch {
Log.i(TAG, "${getMilliTime5()} Start Receiver !")
datCh.collect { // 要求・受信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
// delay(1000)
}
}
}
) { Text(text = "Flow test (drop) !") }
}
93556 Compose !
95880 Start Receiver !
95887 Start Sender !
00894 Send Data = 8 [DefaultDispatcher-worker-1, 188663320]
01898 Send Data = 4 [DefaultDispatcher-worker-1, 188663320]
02901 Send Data = 3 [DefaultDispatcher-worker-1, 188663320]
03905 Send Data = 9 [DefaultDispatcher-worker-1, 188663320]
04909 Send Data = 1 [DefaultDispatcher-worker-1, 188663320]
05914 Send Data = -1 [DefaultDispatcher-worker-1, 188663320] ⇐ 条件に合わない
05914 Receive Data = -1 [main, 145086577]
06917 Receive Data = 2 [main, 145086577]
06917 Send Data = 2 [DefaultDispatcher-worker-1, 188663320]
07920 Send Data = 5 [DefaultDispatcher-worker-1, 188663320]
07920 Receive Data = 5 [main, 145086577]
08922 Send Data = 7 [DefaultDispatcher-worker-1, 188663320]
08923 Receive Data = 7 [main, 145086577]
09926 Send Data = 6 [DefaultDispatcher-worker-1, 188663320]
09927 Receive Data = 6 [main, 145086577]
take
takeは先頭から指定個数のデータを取得します。
public fun <T> Flow<T>.take(count: Int): Flow<T> {
require(count > 0) { "Requested element count $count should be positive" }
return flow {
var consumed = 0
try {
collect { value ->
// Note: this for take is not written via collectWhile on purpose.
// It checks condition first and then makes a tail-call to either emit or emitAbort.
// This way normal execution does not require a state machine, only a termination (emitAbort).
// See "TakeBenchmark" for comparision of different approaches.
if (++consumed < count) {
return@collect emit(value)
} else {
return@collect emitAbort(value)
}
}
} catch (e: AbortFlowException) {
e.checkOwnership(owner = this)
}
}
}
以下はtakeのサンプルです。
先頭の3つのデータは取得できますが、以降は中間演算により途中で除外されて、受信機で受信できません。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)
fun createFlowTake() = flow {
Log.i(TAG, "${getMilliTime5()} Start Sender !")
delay(5000)
Data.forEach {
emit(it) // 送信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]")
delay(1000)
}
}.flowOn(Dispatchers.Default).take(3)
@Preview
@Composable
fun FlowTake_Receiver(
scope: CoroutineScope = rememberCoroutineScope(),
datCh: Flow<Int> = remember { createFlowTake() }
) {
Log.i(TAG, "${getMilliTime5()} Compose !")
Button(
modifier = Modifier.fillMaxWidth(),
onClick = {
scope.launch {
Log.i(TAG, "${getMilliTime5()} Start Receiver !")
datCh.collect { // 要求・受信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
// delay(1000)
}
}
}
) { Text(text = "Flow test (take) !") }
}
27664 Compose !
30022 Start Receiver !
30029 Start Sender !
35035 Send Data = 8 [DefaultDispatcher-worker-1, 188663320]
35035 Receive Data = 8 [main, 145086577]
36039 Send Data = 4 [DefaultDispatcher-worker-1, 188663320]
36040 Receive Data = 4 [main, 145086577]
37041 Send Data = 3 [DefaultDispatcher-worker-1, 188663320]
37042 Receive Data = 3 [main, 145086577]
// ダウンストリームFlowはAbort ⇒ アップストリームFlowも停止する
takeWhile
takeWhileは先頭から条件を満たす間のデータを取得します。
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow {
// This return is needed to work around a bug in JS BE: KT-39227
return@flow collectWhile { value ->
if (predicate(value)) {
emit(value)
true
} else {
false
}
}
}
internal suspend inline fun <T> Flow<T>.collectWhile(crossinline predicate: suspend (value: T) -> Boolean) {
val collector = object : FlowCollector<T> {
override suspend fun emit(value: T) {
// Note: we are checking predicate first, then throw. If the predicate does suspend (calls emit, for example)
// the the resulting code is never tail-suspending and produces a state-machine
if (!predicate(value)) {
throw AbortFlowException(this)
}
}
}
try {
collect(collector)
} catch (e: AbortFlowException) {
e.checkOwnership(collector)
}
}
以下はtakeWhileのサンプルです。
ストリームデータは、先頭から「条件:データ > 0」を満たす間はデータを取得し、満たさなくなった時点で除外するようになります。
「取得」からスタートする点に注意してください。
// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
private val Data = arrayOf( 8, 4, 3, 9, 1,-1, 2, 5, 7, 6)
fun createFlowTake() = flow {
Log.i(TAG, "${getMilliTime5()} Start Sender !")
delay(5000)
Data.forEach {
emit(it) // 送信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Send Data = ${it} [${_thName}, ${_jbCode}]")
delay(1000)
}
}.flowOn(Dispatchers.Default).takeWhile { it > 0 }
@Preview
@Composable
fun FlowTake_Receiver(
scope: CoroutineScope = rememberCoroutineScope(),
datCh: Flow<Int> = remember { createFlowTake() }
) {
Log.i(TAG, "${getMilliTime5()} Compose !")
Button(
modifier = Modifier.fillMaxWidth(),
onClick = {
scope.launch {
Log.i(TAG, "${getMilliTime5()} Start Receiver !")
datCh.collect { // 要求・受信
val _thName = getThreadName()
val _jbCode = currentCoroutineContext().job.hashCode()
Log.i(TAG, "${getMilliTime5()} Receive Data = ${it} [${_thName}, ${_jbCode}]")
// delay(1000)
}
}
}
) { Text(text = "Flow test (take) !") }
}
99771 Compose !
02527 Start Receiver !
02537 Start Sender !
07541 Send Data = 8 [DefaultDispatcher-worker-1, 188663320]
07541 Receive Data = 8 [main, 145086577]
08544 Send Data = 4 [DefaultDispatcher-worker-1, 188663320]
08545 Receive Data = 4 [main, 145086577]
09549 Send Data = 3 [DefaultDispatcher-worker-1, 188663320]
09549 Receive Data = 3 [main, 145086577]
10553 Send Data = 9 [DefaultDispatcher-worker-1, 188663320]
10554 Receive Data = 9 [main, 145086577]
11558 Send Data = 1 [DefaultDispatcher-worker-1, 188663320]
11559 Receive Data = 1 [main, 145086577]
12563 Send Data = -1 [DefaultDispatcher-worker-1, 188663320]
// ダウンストリームFlowはAbort ⇒ アップストリームFlowも停止する
関連記事:
