コルーチン(Coroutine)は「非同期処理の手法」の1つです。
Kotlinが提供します。
コルーチンを開始するasyncビルダーの仕組みについて、まとめます。
※仕組みの解析は次のバージョンを対象に行っています。
Kotlin:Ver 1.6.10
org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9
目次
asyncビルダーのバイトコード
asyncビルダーはCoroutineScopeに定義された拡張関数です。
※asyncビルダーの動作は「Coroutine:asyncビルダーでコルーチン開始」を参照
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
ソースコードは上記のように記述されていますが、バイトコードではKotlinコンパイラにより次のように変換されます。
public static final <T> Deferred<T> async(
CoroutineScope coroutineScope,
CoroutineContext context,
CoroutineStart start,
Function2 function2 // (1)タスクブロックを格納したクラス
) {
LazyDeferredCoroutine coroutine;
CoroutineContext newContext = CoroutineContextKt.newCoroutineContext(coroutineScope, context);
if (start.isLazy()) {
coroutine = new LazyDeferredCoroutine(newContext, function2);
} else {
coroutine = new DeferredCoroutine(newContext, true);
}
coroutine.start(start, coroutine, function2); // (3)コルーチンを開始する
return coroutine; // (2)コールバックを受け取るクラス
}
変換されたバイトコードのポイントは次の3つです。
(1)タスクブロックを格納したクラス(Coroutineクラス ※後述)
(2)コールバックを受け取るクラス(DeferredCoroutineクラス)
(3)コルーチンを開始するAbstractCoroutine#start( )
タスクブロックを格納したクラス…(1)
Kotlinコンパイラはバイトコードへ変換する時に、タスクブロック(asyncの引数に指定したラムダ式)を格納したクラスを作成します。
Coroutineクラス
タスクブロックを格納したクラス(MainActivity$onCreate$1$1)の例です。
以下のようなasyncでコルーチンを開始した場合の動作を考えます。
findViewById<Button>(R.id.btnStart).setOnClickListener {
// asyncの仕組み(シンプルなタスク)
scope = SampleScope()
scope?.async(Dispatchers.Default) {
println("Start")
Thread.sleep(2000) // 重い処理の代わり
println("End")
}
}
次のようなクラスが作成されます。クラス名はコンパイラが重複しない名前を付けます。
final class MainActivity$onCreate$1$1
extends SuspendLambda
implements Function2<CoroutineScope, Continuation<? super Unit>, Object>
{
int label;
public MainActivity$onCreate$1$1(
Continuation<? super MainActivity$onCreate$1$1> continuation
) {
super(2, continuation);
}
@Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
public final Continuation<Unit> create(
Object obj, Continuation<?> continuation
) {
return new MainActivity$onCreate$1$1(continuation);
}
public final Object invoke(
CoroutineScope coroutineScope,
Continuation<? super Unit> continuation
) {
return ((MainActivity$onCreate$1$1) create(coroutineScope, continuation))
.invokeSuspend(Unit.INSTANCE);
}
@Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
public final Object invokeSuspend(Object obj) {
IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure(obj);
System.out.println((Object) "Start");
Thread.sleep(2000);
System.out.println((Object) "End");
return Unit.INSTANCE;
default:
throw new IllegalStateException("...");
}
// ↑↑ タスクブロック(asyncの引数に指定したラムダ式)を含む ↑↑
}
}
以後、タスクブロックを格納したクラスを「Coroutineクラス」と呼ぶことにします。
Function2インターフェース
Function2はCoroutineクラスに実装されたインターフェースです。
public interface Function2<P1, P2, R> extends Function<R> {
R invoke(P1 p1, P2 p2);
}
バイトコードでは、Coroutineクラスを引数で受け渡す際の型として、Function2が用いられます。
SuspendLambdaの継承
SuspendLambdaはCoroutineクラスに継承されたクラスです。Continuationインターフェースを実装しています。

SuspendLambdaはタスクブロックの実行と終了を管理する役割があります。そのための仕組みをBaseContinuationImplが実装しています。
コールバックを受け取るクラス…(2)
DeferredCoroutineはContinuationインターフェースを実装したクラスです。
タスクブロックの処理を終えたスレッドからコールバックを受け取る役割があります。Continuation#resumeWith( )がコールバックされる関数です。
SuspendLambdaと同様に、Countinuationインターフェースを実装していながら、BaseContinuationImplの実装を持たない点が異なります。
コルーチンを開始するstart( )…(3)
DeferredCoroutineはAbstractCoroutine抽象クラスを継承しています。
そのAbstractCoroutine#start( )でコルーチンは開始されます。
public abstract class AbstractCoroutine<in T>(
...
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this) // CoroutineStart#invoke( )関数を実行
}
...
}
オプションで開始方法の切り替え
CoroutineStartはEnumクラスです。CoroutineStartのインスタンスstartをstart(…)のように実行することは、invoke(…)関数を実行することです。
public enum class CoroutineStart {
DEFAULT,
LAZY,
ATOMIC,
UNDISPATCHED;
...
public operator fun <R, T> invoke(
block: suspend R.() -> T, receiver: R, completion: Continuation<T>
): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
...
}
実行するinvoke( )内でCoroutineStart.XXX(スタートオプション)毎にコルーチンの開始方法を切り替えています。
※スタートオプションの詳細は「Coroutine:asyncビルダーでコルーチン開始」を参照
コルーチン準備&スレッド起動
CoroutineStart.DEFAULTの場合を例にとれば、コルーチンの開始で最後に実行されるのは次の構文になります。
...
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>
) =
runSafely(completion) { // 最後の引数がラムダ式⇒( )の外へ出せる
createCoroutineUnintercepted(receiver, completion)
.intercepted()
.resumeCancellableWith(Result.success(Unit))
}
...
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
try {
block() // runSafelyは引数のラムダ式を実行する
} catch (e: Throwable) {
completion.resumeWith(Result.failure(e))
}
}
前半の2つ(青背景)はコール―チン開始の準備です。
コールバックを受け取るクラスとDispatchers(スレッドプール)をCoroutineクラスへ登録しています。

後半の1つ(赤背景)はスレッドの起動を行っています。
Dispatchersが持つdispatch( )関数(図ではDefaultScheduler#dispatch( ))が実行されると、スレッドが起動されます。

そのスレッドでCoroutineクラスのresumeWith( )が呼ばれ、タスクブロックの処理が始まります。
コルーチンの内部動作
次のようなasyncでコルーチンを開始した場合の動作を考えます。
findViewById<Button>(R.id.btnStart).setOnClickListener {
scope = SampleScope()
scope?.async(Dispatchers.Default) {
// --- State:0 ---
println("Start top ${getThread()}")
val _deferred = async(Dispatchers.Main) {
println("Start sub ${getThread()}")
Thread.sleep(1000) // 重い処理の代わり
println("End sub")
"AAA" // asyncの結果(ブロック中の最後の評価)
}
val _result = _deferred.await()
// --- State:1 ---
println("End top ${_result}");
}
}
コルーチン開始後の内部動作はSuspendLambdaが継承しているBaseContinuationImpl#resumeWith( )が制御しています。
resumeWith( )の役割は次の2つです。
(1)タスクブロックの実行
(2)コールバックの発行
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param) // タスクブロック処理
if (outcome === COROUTINE_SUSPENDED) return // 同一?
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted()
if (completion is BaseContinuationImpl) { // 上位のSuspend関数へ
current = completion
param = outcome
} else { // コールバックを発行
completion.resumeWith(outcome)
return
}
}
}
}
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
// ↑↑ Coroutineクラスで実装されている ↑↑ //
...
}
タスクブロックの実行…(1)
asyncのタスクブロックはSuspend関数です。よって、内部にSuspend関数を持つ(階層構造)時、ステートマシンの構成を取ります。
「=== COROUTINE_SUSPENDED」の間はresumeWith( )とinvokeSuspend( )の間をループして、ステートマシンの処理を進めます。
※詳細は「Coroutine:Suspend関数とその仕組み」を参照
コールバックの発行…(2)
「!== COROUTINE_SUSPENDED」はタスクブロックの実行が終わって、結果が得られたことを表します。
completionプロパティはコールバックを受け取るクラス(DeferredCoroutine)が入っています。このクラスはBaseContinuationImplを継承していないため、completion.resumeWith( )を実行します。
つまり、DeferredCoroutine#resumeWith( )を実行することになります。
上位へコールバックの発行
Deferred#await( )が実行されている場合は、コールバックを受け取るクラス(DeferredCoroutine)から上位のコルーチンへ、更にコールバックを返します。
このとき、スレッドを跨ぎます。
上位CoroutineクラスのDispatchersが持つdispatch( )関数により、スレッドが起動します。
スレッドが実行するのは上位CoroutineクラスのresumeWith( )関数です。
コルーチンの結果取得
asyncはコルーチンの結果としてタスクブロックの戻り値を取得できます。
Deferred#await( )の実態
結果の取得はDeferred#await( )関数を使います。
asyncは実行時にDeferredCoroutineのインスタンスを返します。そして、await( )はDeferredCoroutineに実装されたDeferredインターフェースが持つ関数です。
private open class DeferredCoroutine<T>(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T
override val onAwait: SelectClause1<T> get() = this
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (T) -> R) =
registerSelectClause1Internal(select, block)
}
public interface Deferred<out T> : Job {
public suspend fun await(): T
public val onAwait: SelectClause1<T>
public fun getCompleted(): T
public fun getCompletionExceptionOrNull(): Throwable?
}
その実態はJobSupport#awaitInternal( )になります。
internal suspend fun awaitInternal(): Any? {
// fast-path -- check state (avoid extra object creation)
while (true) { // lock-free loop on state
val state = this.state
if (state !is Incomplete) {
// already complete -- just return result
if (state is CompletedExceptionally) { // Slow path to recover stacktrace
recoverAndThrow(state.cause)
}
return state.unboxState()
}
if (startInternal(state) >= 0) break // break unless needs to retry
}
return awaitSuspend() // slow-path
}
結果の取得タイミングは2つ
結果の取得タイミングは「await( )の実行」と「タスクの終了」の時間的な前後関係で2つのケースありあます。


「await( )の実行」と「タスクの終了」は別のスレッドで処理されるので非同期です。
よって、両者のタイミングが近接しているとレーシング(競争、どちらが先になるか不明)状態になります。
ですが、await( )は(1)と(2)のどちらのケースでも問題が出ないように、アトミックな処理を行っているようです。
タスクの終了より前にawait( )の実行
asyncによって開始されたコルーチンは開始から終了までのライフサイクルを持ちます。
JobSupport#stateにライフサイクルの状態を保持しています。
このstateを確認し、await( )の実行がタスクの終了よりも前であればawaitSuspend( )を実行(赤背景)します。

awaitSuspend( )はCOROUTINE_SUSPENDEDを返すので、スレッドは一時停止(non-blocking動作)します。
ここでDeferred#await( )は戻り値の取得を行いません。
結果はresumeWith(result)の引数に載せられて、Coroutineクラスへ送られます。

以下の例は、その様子を示したものです。
結果保持の変数は、結果取得数に合わせてKotlinコンパイラが自動生成します。

final class MainActivity$onCreate$1$1
extends SuspendLambda
implements Function2<CoroutineScope, Continuation<? super Unit>, Object>
{
Object L$0; // 結果保持の変数
Object L$1; // 結果保持の変数
Object L$2; // 結果保持の変数
int label;
final MainActivity this$0;
...
public final Object invokeSuspend(Object $result) { // 下位階層からの結果
...
}
...
}
タスクの終了より後にawait( )の実行
asyncによって開始されたコルーチンは開始から終了までのライフサイクルを持ちます。
JobSupport#stateにライフサイクルの状態を保持しています。
このstateを確認し、await( )の実行がタスクの終了よりも後であれば if文内を実行(青背景)します。

stateは内部にコルーチンの結果を格納しています。
取り出してawait( )の戻り値にします。
関連記事:


