コルーチン(Coroutine)は「非同期処理の手法」の1つです。
Kotlinが提供します。
コルーチンの構成要素であるSuspend関数について、まとめます。
目次
Suspend関数とは
「Suspend関数とは何か?」についてです。
non-blocking動作を含む
non-blocking動作を含む関数をSuspend関数と呼びます。
non-blocking動作は「スレッドをブロックせずに処理を一時停止・再開できる機能」です。
例えば、delay( )はnon-blocking動作をします。
delay( )は自身を実行したスレッドを一時停止(suspend)し、プールへ返却します。
プール中のスレッドは要求があれば新たな処理へ割り当て可能(ブロックしない)です。それまでは待機します。
そして、指定された経過時間後にプールからスレッド再取得して、delay( )関数以降の処理を再開します。
※詳細は「Coroutine:blockingとnon-blockingの違い」を参照
Suspend関数の種類
表はコルーチンのライブラリで提供されているSuspend関数です。皆、non-blocking動作を含みます。
分類 | suspend関数 | 概要 | |
---|---|---|---|
Delay | delay( ) | 処理を遅らせる | |
Builder | ――――― | withContext( ) | CoroutineContextを切り替える |
Async | await( ) awaitAll( ) | タスクブロックの戻り値を待つ | |
Launch | join( ) joinAll( ) | タスクブロックのの終了を待つ | |
Channel | Message | send( ) | データを送信 |
receive( ) | データを受信 | ||
Hot Stream | send( ) | データストリームを送信 | |
consumeEach( ) | データストリームを受信 | ||
Flow | Cold Stream | emit( ) | データストリームを送信 |
collect( ) | データストリームを受信 |
Suspend関数の階層化
Suspend関数を持つ関数
Suspend関数を持つ関数もSuspend関数です。
Suspend関数にはsuspend修飾子を付けるという決まりがあります。これは後述する「仕組み」を実現するためです。
suspend fun putTextWithDelay(view: TextView, text: String) { // suspend付き delay(1000) // Suspend関数(non-blocking動作を含む) view.text = text } suspend fun exeCountDown(view: TextView?) { // suspend付き view?.text = "3" putTextWithDelay(view, "2") // Suspend関数(non-blocking動作を含む) putTextWithDelay(view, "1") // Suspend関数(non-blocking動作を含む) putTextWithDelay(view, "0") // Suspend関数(non-blocking動作を含む) }
ちなみに、Suspend関数のdelay( )もsuspend修飾子が付いています。
public suspend fun delay(timeMillis: Long) { if (timeMillis <= 0) return // don`t delay return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> -> cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont) } }
また、逆の表現をすると、Suspend関数はSuspend関数内にのみ配置できます。
トップブロックもSuspend関数
ビルダー(launchなど)によって起動されたスレッドが実行するトップのタスクブロックもSuspend関数です。
launchは拡張関数の定義内でタスクブロックへsuspend修飾子を指定しています。
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }
Suspend関数の強み
suspend関数の強みは非同期処理を同期処理のように記述できることです。
※同期処理:複数の命令を一つずつ順番(上から下へ)に実行していく処理
suspend fun exeCountDown(view: TextView?) { view?.text = "3" delay(1000) // suspend関数 ← 非同期処理、non-blocking動作 view?.text = "2" delay(1000) // suspend関数 ← 非同期処理、non-blocking動作 view?.text = "1" delay(1000) // suspend関数 ← 非同期処理、non-blocking動作 view?.text = "0" }
Suspend関数を用いないのであれば、non-blocking動作を「スレッド+コールバック」で表現するしかありません。
Suspend関数の方が、実行の流れが記述の順番通りでシンプルですし、可読性も高いです。
Suspend関数の仕組み
Suspend関数の「非同期処理を同期処理のように記述できる」という強みは、どのように実現されているのでしょう?!
Kotlinコンパイラが構成を変換
suspend修飾子の付いた関数をkotlinコンパイラがバイトコードへ変換するとき、図のような構成に変換されます。
変換のポイントは次の3つです。
(1)Suspend関数の位置でタスクを分割し、ステートマシンを構成
(2)各ステートのnon-blocking動作を「スレッド+コールバック」へ変換
(3)ステートを管理するクラスを生成(図のX$task$1)
Suspend関数内にSuspend関数が配置されていれば、同じ構成が次々と下へ積まれていきます。
一見すると複雑そうに見えますが、行っていることは「Susupend関数の強み」で紹介した「スレッド+コールバック」のプログラムと大きく変わりません。
スレッドとコールバックが入れ子になるところを、ステートマシンを利用したループに置き換えた構成です。
これが、強みを実現する方法の正体です。
構成の変換例
例えば、以下のようなSuspend関数の場合を見てみましょう。
suspend fun task(): String { // State:0 println("TaskAの前処理") val _resultA = taskA() // Suspend関数 // State:1 println("TaskBの前処理") val _resultB = taskB() // Suspend関数 // State:2 println("TaskCの前処理") val _resultC = taskC() // Suspend関数 // State:3 println("結果の集計") val _result = _resultA + _resultB + _resultC // return "重い処理の結果 ${_result}" }
ステートマシン
タスクブロックは3つのSuspend関数を含むので、4つの子タスクへ分割されます。
子タスクを順番にState:0⇒1⇒2⇒3と実行して行けば、分割前と同じ動作です。よって、ステートマシンは実行する子タスクを切り替えるswitch文になっています。
public final Object task(Continuation<? super String> var1) { MainActivity$task$1 mainActivity$task$1; label37: { if (var1 instanceof MainActivity$task$1) { mainActivity$task$1 = (MainActivity$task$1)var1; if ((mainActivity$task$1.label & Integer.MIN_VALUE) != 0) { mainActivity$task$1.label -= Integer.MIN_VALUE; break label37; } } mainActivity$task$1 = new ContinuationImpl(var1); } Object var10000; String _resultA; String _resultB; String _resultC; label31: { Object var8; label30: { Object $result = mainActivity$task$1.result; var8 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch(mainActivity$task$1.label) { case 0: ResultKt.throwOnFailure($result); _resultA = "TaskAの前処理"; System.out.println(_resultA); mainActivity$task$1.L$0 = this; mainActivity$task$1.label = 1; var10000 = this.taskA((Continuation)mainActivity$task$1); if (var10000 == var8) { return var8; } break; case 1: this = (MainActivity)mainActivity$task$1.L$0; ResultKt.throwOnFailure($result); var10000 = $result; break; case 2: _resultA = (String)mainActivity$task$1.L$1; this = (MainActivity)mainActivity$task$1.L$0; ResultKt.throwOnFailure($result); var10000 = $result; break label30; case 3: _resultB = (String)mainActivity$task$1.L$1; _resultA = (String)mainActivity$task$1.L$0; ResultKt.throwOnFailure($result); var10000 = $result; break label31; default: throw new IllegalStateException( "call to 'resume' before 'invoke' with coroutine"); } _resultA = (String)var10000; _resultB = "TaskBの前処理"; System.out.println(_resultB); mainActivity$task$1.L$0 = this; mainActivity$task$1.L$1 = _resultA; mainActivity$task$1.label = 2; var10000 = this.taskB((Continuation)mainActivity$task$1); if (var10000 == var8) { return var8; } } _resultB = (String)var10000; _resultC = "TaskCの前処理"; System.out.println(_resultC); mainActivity$task$1.L$0 = _resultA; mainActivity$task$1.L$1 = _resultB; mainActivity$task$1.label = 3; var10000 = this.taskC((Continuation)mainActivity$task$1); if (var10000 == var8) { return var8; } } _resultC = (String)var10000; String _result = "結果の集計"; System.out.println(_result); _result = _resultA + _resultB + _resultC; return "重い処理の結果 " + _result; }
分割前のSuspend関数がif文などの制御構文を持っていれば、もっと複雑なステートマシンになると予測されます。
ステートを管理するクラス
ステートを管理するクラスは、Kotlinコンパイラが自動生成します。
※クラス名はコンパイラが決めます。例はMainActivity$task$1(図はX$task$1と表現)となりました。task( )関数をMainActivityクラスに記述したためであると思われます。
public final class MainActivity$task$1 extends ContinuationImpl { Object L$0; // _resultAを格納 Object L$1; // _resultBを格納 int label; final /* synthetic */ MainActivity this$0; MainActivity$task$1( MainActivity mainActivity, Continuation<? super MainActivity$task$1> continuation ) { super(continuation); this.this$0 = mainActivity; } @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl public final Object invokeSuspend(Object obj) { this.result = obj; this.label |= Integer.MIN_VALUE; // このinvokeSuspendから呼ばれたことを示すフラグ return this.this$0.task(this); } }
このクラスはContinuationインターフェースを実装しています。
コールバック
BaseContinuationImpl#resumeWith( )がコールバックで呼ばれる関数です。
resumeWith( )は内部でinvokeSuspend( )を呼び出し、子タスクの実行を次のステートへ進めます。※抽象関数invokeSuspend( )の実装はステートを管理するクラス(MainActivity$task$1)にある
internal abstract class BaseContinuationImpl( public val completion: Continuation<Any?>? // 上位Suspend関数のステートを管理するクラス ) : Continuation<Any?>, CoroutineStackFrame, Serializable { public final override fun resumeWith(result: Result<Any?>) { var current = this // 現Suspend関数のスレートを管理するクラス var param = result while (true) { probeCoroutineResumed(current) with(current) { val completion = completion!! val outcome: Result<Any?> = try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() if (completion is BaseContinuationImpl) { current = completion // 上位Suspend関数へ切り替え param = outcome } else { completion.resumeWith(outcome) return } } } } protected abstract fun invokeSuspend(result: Result<Any?>): Any? ... }
while(true)のループが特徴的で、「outcome !== COROUTINE_SUSPENDED」の場合はcurrenntを上位Suspend関数の「ステートを管理するクラス」へ切り替えて、ループを回します。そして、再度invokeSuspend( )を呼びます。
つまり、最後の子ステートの実行が行われた後に、上位Susupend関数へ処理を返しています。
関連記事: