コルーチン(Coroutine)は「非同期処理の手法」の1つです。
Kotlinが提供します。
コルーチンを開始するlaunchビルダーの仕組みについて、まとめます。
※仕組みの解析は次のバージョンを対象に行っています。
Kotlin:Ver 1.6.10
org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9
目次
launchビルダーのバイトコード
launchビルダーはCoroutineScopeに定義された拡張関数です。
※launchビルダーの動作は「Coroutine:launchビルダーでコルーチン開始」を参照
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }
ソースコードは上記のように記述されていますが、バイトコードではKotlinコンパイラにより次のように変換されます。
public static final Job launch( CoroutineScope coroutineScope, CoroutineContext context, CoroutineStart start, Function2 function2 // (1)タスクブロックを格納したクラス ) { LazyStandaloneCoroutine coroutine; CoroutineContext newContext = CoroutineContextKt.newCoroutineContext(coroutineScope, context); if (start.isLazy()) { coroutine = new LazyStandaloneCoroutine(newContext, function2); } else { coroutine = new StandaloneCoroutine(newContext, true); } coroutine.start(start, coroutine, function2); // (3)コルーチンを開始する return coroutine; // (2)コールバックを受け取るクラス }
変換されたバイトコードのポイントは次の3つです。
(1)タスクブロックを格納したクラス(Coroutineクラス ※後述)
(2)コールバックを受け取るクラス(StandaloneCoroutineクラス)
(3)コルーチンを開始するAbstractCoroutine#start( )
タスクブロックを格納したクラス…(1)
Kotlinコンパイラはバイトコードへ変換する時に、タスクブロック(launchの引数に指定したラムダ式)を格納したクラスを作成します。
Coroutineクラス
タスクブロックを格納したクラス(MainActivity$onCreate$1$1)の例です。
以下のようなlaunchでコルーチンを開始した場合の動作を考えます。
findViewById<Button>(R.id.btnStart).setOnClickListener { scope = SampleScope() scope?.launch { println("Start") Thread.sleep(2000) // 重い処理の代わり println("End") } }
次のようなクラスが作成されます。クラス名はコンパイラが重複しない名前を付けます。
final class MainActivity$onCreate$1$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> { int label; public MainActivity$onCreate$1$1( Continuation<? super MainActivity$onCreate$1$1> continuation ) { super(2, continuation); } @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl public final Continuation<Unit> create( Object obj, Continuation<?> continuation ) { return new MainActivity$onCreate$1$1(continuation); } public final Object invoke( CoroutineScope coroutineScope, Continuation<? super Unit> continuation ) { return ((MainActivity$onCreate$1$1) create(coroutineScope, continuation)) .invokeSuspend(Unit.INSTANCE); } @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl public final Object invokeSuspend(Object obj) { IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch (this.label) { case 0: ResultKt.throwOnFailure(obj); System.out.println((Object) "Start"); Thread.sleep(2000); System.out.println((Object) "End"); return Unit.INSTANCE; default: throw new IllegalStateException("..."); } // ↑↑ タスクブロック(launchの引数に指定したラムダ式)を含む ↑↑ } }
以後、タスクブロックを格納したクラスを「Coroutineクラス」と呼ぶことにします。
Function2インターフェース
Function2はCoroutineクラスに実装されたインターフェースです。
public interface Function2<P1, P2, R> extends Function<R> { R invoke(P1 p1, P2 p2); }
バイトコードでは、Coroutineクラスを引数で受け渡す際の型として、Function2が用いられます。
SuspendLambdaの継承
SuspendLambdaはCoroutineクラスに継承されたクラスです。Continuationインターフェースを実装しています。
SuspendLambdaはタスクブロックの実行と終了を管理する役割があります。そのための仕組みをBaseContinuationImplが実装しています。
コールバックを受け取るクラス…(2)
StandaloneCoroutineはContinuationインターフェースを実装したクラスです。
タスクブロックの処理を終えたスレッドからコールバックを受け取る役割があります。Continuation#resumeWith( )がコールバックされる関数です。
SuspendLambdaと同様に、Countinuationインターフェースを実装していながら、BaseContinuationImplの継承を持たない点が異なります。
コルーチンを開始するstart( )…(3)
StandaloneCoroutineはAbstractCoroutine抽象クラスを継承しています。
そのAbstractCoroutine#start( )でコルーチンは開始されます。
public abstract class AbstractCoroutine<in T>( ... public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { initParentJob() start(block, receiver, this) // CoroutineStart#invoke( )関数を実行 } ... }
オプションで開始方法の切り替え
CoroutineStartはEnumクラスです。CoroutineStartのインスタンスstartをstart(…)のように実行することは、invoke(…)関数を実行することです。
public enum class CoroutineStart { DEFAULT, LAZY, ATOMIC, UNDISPATCHED; ... public operator fun <R, T> invoke( block: suspend R.() -> T, receiver: R, completion: Continuation<T> ): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(receiver, completion) ATOMIC -> block.startCoroutine(receiver, completion) UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion) LAZY -> Unit // will start lazily } ... }
実行するinvoke( )内でCoroutineStart.XXX(スタートオプション)毎にコルーチンの開始方法を切り替えています。
※スタートオプションの詳細は「Coroutine:launchビルダーでコルーチン開始」を参照
コルーチン準備&スレッド起動
CoroutineStart.DEFAULTの場合を例にとれば、コルーチンの開始で最後に実行されるのは次の構文になります。
... internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable( receiver: R, completion: Continuation<T> ) = runSafely(completion) { // 最後の引数がラムダ式⇒( )の外へ出せる createCoroutineUnintercepted(receiver, completion) .intercepted() .resumeCancellableWith(Result.success(Unit)) } ... private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) { try { block() // runSafelyは引数のラムダ式を実行する } catch (e: Throwable) { completion.resumeWith(Result.failure(e)) } }
前半の2つ(青背景)はコール―チン開始の準備です。
コールバックを受け取るクラスとDispatchers(スレッドプール)をCoroutineクラスへ登録しています。
後半の1つ(赤背景)はスレッドの起動を行っています。
Dispatchersが持つdispatch( )関数(図ではDefaultScheduler#dispatch( ))が実行されると、スレッドが起動されます。
そのスレッドでCoroutineクラスのresumeWith( )が呼ばれ、タスクブロックの処理が始まります。
コルーチンの内部動作
次のようなlaunchでコルーチンを開始した場合の動作を考えます。
findViewById<Button>(R.id.btnStart).setOnClickListener { // launchの仕組み(入れ子) scope = SampleScope() scope?.launch(Dispatchers.Default) { // Coroutine A // --- State:0 --- println("Start top ${getThread()}") val _job = launch(Dispatchers.Main) { // Coroutine B println("Start sub ${getThread()}") Thread.sleep(2000) // 重い処理の代わり println("End sub") } _job.join() // --- State:1 --- println("End top") } }
コルーチン開始後の内部動作はSuspendLambdaが継承しているBaseContinuationImpl#resumeWith( )が制御しています。
resumeWith( )の役割は次の2つです。
(1)タスクブロックの実行
(2)コールバックの発行
internal abstract class BaseContinuationImpl( public val completion: Continuation<Any?>? ) : Continuation<Any?>, CoroutineStackFrame, Serializable { public final override fun resumeWith(result: Result<Any?>) { var current = this var param = result while (true) { probeCoroutineResumed(current) with(current) { val completion = completion!! val outcome: Result<Any?> = try { val outcome = invokeSuspend(param) // タスクブロック処理 if (outcome === COROUTINE_SUSPENDED) return // 同一? Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() if (completion is BaseContinuationImpl) { // 上位のSuspend関数へ current = completion param = outcome } else { // コールバックを発行 completion.resumeWith(outcome) return } } } } protected abstract fun invokeSuspend(result: Result<Any?>): Any? // ↑↑ Coroutineクラスで実装されている ↑↑ // ... }
タスクブロックの実行…(1)
launchのタスクブロックはSuspend関数です。よって、内部にSuspend関数を持つ(階層構造)時、ステートマシンの構成を取ります。
「=== COROUTINE_SUSPENDED」の間はresumeWith( )とinvokeSuspend( )の間をループして、ステートマシンの処理を進めます。
※詳細は「Coroutine:Suspend関数とその仕組み」を参照
コールバックの発行…(2)
「!== COROUTINE_SUSPENDED」はタスクブロックの実行が終わって、結果が得られたことを表します。
completionプロパティはコールバックを受け取るクラス(StandaloneCoroutine)が入っています。このクラスはBaseContinuationImplを継承していないため、completion.resumeWith( )を実行します。
つまり、StandaloneCoroutine#resumeWith( )を実行することになります。
上位へコールバックの発行
Job#join( )が実行されている場合は、コールバックを受け取るクラス(StandaloneCoroutine)から上位のコルーチンへ、更にコールバックを返します。
このとき、スレッドを跨ぎます。
上位CoroutineクラスのDispatchersが持つdispatch( )関数により、スレッドが起動します。
スレッドが実行するのは上位CoroutineクラスのresumeWith( )関数です。
関連記事: