Coroutine:asyncビルダーの仕組み

投稿日:  更新日:

コルーチン(Coroutine)は「非同期処理の手法」の1つです。

Kotlinが提供します。

コルーチンを開始するasyncビルダーの仕組みについて、まとめます。

※仕組みの解析は次のバージョンを対象に行っています。

   Kotlin:Ver 1.6.10
   org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9

スポンサーリンク

asyncビルダーのバイトコード

asyncビルダーはCoroutineScopeに定義された拡張関数です。
※asyncビルダーの動作は「Coroutine:asyncビルダーでコルーチン開始」を参照

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

ソースコードは上記のように記述されていますが、バイトコードではKotlinコンパイラにより次のように変換されます。

    public static final <T> Deferred<T> async(
	    CoroutineScope coroutineScope,
		CoroutineContext context, 
		CoroutineStart start, 
		Function2 function2					  // (1)タスクブロックを格納したクラス
	) {
        LazyDeferredCoroutine coroutine;
        CoroutineContext newContext = CoroutineContextKt.newCoroutineContext(coroutineScope, context);
        if (start.isLazy()) {
            coroutine = new LazyDeferredCoroutine(newContext, function2);
        } else {
            coroutine = new DeferredCoroutine(newContext, true);
        }
        coroutine.start(start, coroutine, function2);	// (3)コルーチンを開始する
        return coroutine;		                // (2)コールバックを受け取るクラス
    }

変換されたバイトコードのポイントは次の3つです。

(1)タスクブロックを格納したクラス(Coroutineクラス ※後述)
(2)コールバックを受け取るクラス(DeferredCoroutineクラス)
(3)コルーチンを開始するAbstractCoroutine#start( )

スポンサーリンク

タスクブロックを格納したクラス…(1)

Kotlinコンパイラはバイトコードへ変換する時に、タスクブロック(asyncの引数に指定したラムダ式)を格納したクラスを作成します。

Coroutineクラス

タスクブロックを格納したクラス(MainActivity$onCreate$1$1)の例です。

以下のようなasyncでコルーチンを開始した場合の動作を考えます。

SampleScope.kt
class SampleScope : CoroutineScope {
    override val coroutineContext: CoroutineContext =
        Job() + Dispatchers.Default + CoroutineName("Hoge") // Contextを定義
}
        findViewById<Button>(R.id.btnStart).setOnClickListener {
            // asyncの仕組み(シンプルなタスク)
            scope = SampleScope()
            scope?.async(Dispatchers.Default) {
                println("Start")
                Thread.sleep(2000)  // 重い処理の代わり
                println("End")
            }
        }

次のようなクラスが作成されます。クラス名はコンパイラが重複しない名前を付けます。

final class MainActivity$onCreate$1$1 
    extends SuspendLambda
	implements Function2<CoroutineScope, Continuation<? super Unit>, Object> 
{
    int label;

    public MainActivity$onCreate$1$1(
	    Continuation<? super MainActivity$onCreate$1$1> continuation
	) {
        super(2, continuation);
    }

    @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
    public final Continuation<Unit> create(
	    Object obj, Continuation<?> continuation
	) {
        return new MainActivity$onCreate$1$1(continuation);
    }

    public final Object invoke(
	    CoroutineScope coroutineScope, 
		Continuation<? super Unit> continuation
	) {
        return ((MainActivity$onCreate$1$1) create(coroutineScope, continuation))
		    .invokeSuspend(Unit.INSTANCE);
    }

    @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
    public final Object invokeSuspend(Object obj) {
        IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure(obj);
                System.out.println((Object) "Start");
                Thread.sleep(2000);
                System.out.println((Object) "End");
                return Unit.INSTANCE;
            default:
                throw new IllegalStateException("...");
        }
		// ↑↑ タスクブロック(asyncの引数に指定したラムダ式)を含む ↑↑
    }
}

以後、タスクブロックを格納したクラスを「Coroutineクラス」と呼ぶことにします。

Function2インターフェース

Function2はCoroutineクラスに実装されたインターフェースです。

public interface Function2<P1, P2, R> extends Function<R> {
    R invoke(P1 p1, P2 p2);
}

バイトコードでは、Coroutineクラスを引数で受け渡す際の型として、Function2が用いられます。

SuspendLambdaの継承

SuspendLambdaはCoroutineクラスに継承されたクラスです。Continuationインターフェースを実装しています。

SyspendLambdaのクラス図

SuspendLambdaはタスクブロックの実行と終了を管理する役割があります。そのための仕組みをBaseContinuationImplが実装しています。

スポンサーリンク

コールバックを受け取るクラス…(2)

DeferredCoroutineはContinuationインターフェースを実装したクラスです。

XXXCoroutineのクラス図

〔↑↑画像のクリックで拡大↑↑〕

タスクブロックの処理を終えたスレッドからコールバックを受け取る役割があります。Continuation#resumeWith( )がコールバックされる関数です。

SuspendLambdaと同様に、Countinuationインターフェースを実装していながら、BaseContinuationImplの実装を持たない点が異なります。

スポンサーリンク

コルーチンを開始するstart( )…(3)

DeferredCoroutineはAbstractCoroutine抽象クラスを継承しています。

そのAbstractCoroutine#start( )でコルーチンは開始されます。

public abstract class AbstractCoroutine<in T>(
    ...
	public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        initParentJob()
        start(block, receiver, this) // CoroutineStart#invoke( )関数を実行
    }
	...
}

オプションで開始方法の切り替え

CoroutineStartはEnumクラスです。CoroutineStartのインスタンスstartをstart(…)のように実行することは、invoke(…)関数を実行することです。

public enum class CoroutineStart {
    DEFAULT,
    LAZY,
    ATOMIC,
    UNDISPATCHED;

    ...
    public operator fun <R, T> invoke(
	    block: suspend R.() -> T, receiver: R, completion: Continuation<T>
	): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit // will start lazily
        }
    ...
}

実行するinvoke( )内でCoroutineStart.XXX(スタートオプション)毎にコルーチンの開始方法を切り替えています。
※スタートオプションの詳細は「Coroutine:asyncビルダーでコルーチン開始」を参照

コルーチン準備&スレッド起動

CoroutineStart.DEFAULTの場合を例にとれば、コルーチンの開始で最後に実行されるのは次の構文になります。

...
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>
) =
    runSafely(completion) {  // 最後の引数がラムダ式⇒( )の外へ出せる
        createCoroutineUnintercepted(receiver, completion)
		    .intercepted()
			.resumeCancellableWith(Result.success(Unit))
    }
...
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
    try {
        block()              // runSafelyは引数のラムダ式を実行する
    } catch (e: Throwable) {
        completion.resumeWith(Result.failure(e))
    }
}

前半の2つ(青背景)はコール―チン開始の準備です。

コールバックを受け取るクラスとDispatchers(スレッドプール)をCoroutineクラスへ登録しています。

コルーチン準備&スレッド起動

後半の1つ(赤背景)はスレッドの起動を行っています。

Dispatchersが持つdispatch( )関数(図ではDefaultScheduler#dispatch( ))が実行されると、スレッドが起動されます。

スレッド起動

そのスレッドでCoroutineクラスのresumeWith( )が呼ばれ、タスクブロックの処理が始まります。

スポンサーリンク

コルーチンの内部動作

次のようなasyncでコルーチンを開始した場合の動作を考えます。

        findViewById<Button>(R.id.btnStart).setOnClickListener {
            scope = SampleScope()
            scope?.async(Dispatchers.Default) {
                // --- State:0 ---
                println("Start top ${getThread()}")
                val _deferred = async(Dispatchers.Main) {
                    println("Start sub ${getThread()}")
                    Thread.sleep(1000)  // 重い処理の代わり
                    println("End sub")
                    "AAA"               // asyncの結果(ブロック中の最後の評価)
                }
                val _result = _deferred.await()
                // --- State:1 ---
                println("End top ${_result}");
            }
        }

コルーチン開始後の内部動作はSuspendLambdaが継承しているBaseContinuationImpl#resumeWith( )が制御しています。

resumeWith( )の役割は次の2つです。

(1)タスクブロックの実行
(2)コールバックの発行

asyncの仕組み

〔↑↑画像のクリックで拡大↑↑〕

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {

    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!!
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param) // タスクブロック処理
                        if (outcome === COROUTINE_SUSPENDED) return  // 同一?
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted()
                if (completion is BaseContinuationImpl) { // 上位のSuspend関数へ
                    current = completion
                    param = outcome
                } else {                                  // コールバックを発行
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
	
	protected abstract fun invokeSuspend(result: Result<Any?>): Any?
	// ↑↑ Coroutineクラスで実装されている ↑↑ //
    ...
}

タスクブロックの実行…(1)

asyncのタスクブロックはSuspend関数です。よって、内部にSuspend関数を持つ(階層構造)時、ステートマシンの構成を取ります。

「=== COROUTINE_SUSPENDED」の間はresumeWith( )とinvokeSuspend( )の間をループして、ステートマシンの処理を進めます。

※詳細は「Coroutine:Suspend関数とその仕組み」を参照

コールバックの発行…(2)

「!== COROUTINE_SUSPENDED」はタスクブロックの実行が終わって、結果が得られたことを表します。

completionプロパティはコールバックを受け取るクラス(DeferredCoroutine)が入っています。このクラスはBaseContinuationImplを継承していないため、completion.resumeWith( )を実行します。

つまり、DeferredCoroutine#resumeWith( )を実行することになります。

上位へコールバックの発行

Deferred#await( )が実行されている場合は、コールバックを受け取るクラス(DeferredCoroutine)から上位のコルーチンへ、更にコールバックを返します。

このとき、スレッドを跨ぎます。

上位CoroutineクラスのDispatchersが持つdispatch( )関数により、スレッドが起動します。

スレッドが実行するのは上位CoroutineクラスのresumeWith( )関数です。

スポンサーリンク

コルーチンの結果取得

asyncはコルーチンの結果としてタスクブロックの戻り値を取得できます。

Deferred#await( )の実態

結果の取得はDeferred#await( )関数を使います。

asyncは実行時にDeferredCoroutineのインスタンスを返します。そして、await( )はDeferredCoroutineに実装されたDeferredインターフェースが持つ関数です。

private open class DeferredCoroutine<T>(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
    override fun getCompleted(): T = getCompletedInternal() as T
    override suspend fun await(): T = awaitInternal() as T
    override val onAwait: SelectClause1<T> get() = this
    override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (T) -> R) =
        registerSelectClause1Internal(select, block)
}
public interface Deferred<out T> : Job {
    public suspend fun await(): T
    public val onAwait: SelectClause1<T>
    public fun getCompleted(): T
    public fun getCompletionExceptionOrNull(): Throwable?
}

その実態はJobSupport#awaitInternal( )になります。

    internal suspend fun awaitInternal(): Any? {
        // fast-path -- check state (avoid extra object creation)
        while (true) { // lock-free loop on state
            val state = this.state
            if (state !is Incomplete) {
                // already complete -- just return result
                if (state is CompletedExceptionally) { // Slow path to recover stacktrace
                    recoverAndThrow(state.cause)
                }
                return state.unboxState()

            }
            if (startInternal(state) >= 0) break // break unless needs to retry
        }
        return awaitSuspend() // slow-path
    }

結果の取得タイミングは2つ

結果の取得タイミングは「await( )の実行」と「タスクの終了」の時間的な前後関係で2つのケースありあます。

(1)タスクの終了より前にawait( )の実行

(2)タスクの終了より後にawait( )の実行
タスクの終了より後にawait( )の実行

「await( )の実行」と「タスクの終了」は別のスレッドで処理されるので非同期です。

よって、両者のタイミングが近接しているとレーシング(競争、どちらが先になるか不明)状態になります。

ですが、await( )は(1)と(2)のどちらのケースでも問題が出ないように、アトミックな処理を行っているようです。

タスクの終了より前にawait( )の実行

asyncによって開始されたコルーチンは開始から終了までのライフサイクルを持ちます。

JobSupport#stateにライフサイクルの状態を保持しています。

このstateを確認し、await( )の実行がタスクの終了よりも前であればawaitSuspend( )を実行(赤背景)します。

awaitInternal( )はawait( )の実態

awaitSuspend( )はCOROUTINE_SUSPENDEDを返すので、スレッドは一時停止(non-blocking動作)します。

ここでDeferred#await( )は戻り値の取得を行いません。

結果はresumeWith(result)の引数に載せられて、Coroutineクラスへ送られます。

結果取得:結果が送られる経路

以下の例は、その様子を示したものです。

結果保持の変数は、結果取得数に合わせてKotlinコンパイラが自動生成します。

結果取得:結果が送られる様子

final class MainActivity$onCreate$1$1 
    extends SuspendLambda
	implements Function2<CoroutineScope, Continuation<? super Unit>, Object> 
{
    Object L$0;		// 結果保持の変数
    Object L$1;		// 結果保持の変数
    Object L$2;		// 結果保持の変数
    int label;
    final MainActivity this$0;
    ...
	
	public final Object invokeSuspend(Object $result) {  // 下位階層からの結果
        ...
	}
	
	...
}

タスクの終了より後にawait( )の実行

asyncによって開始されたコルーチンは開始から終了までのライフサイクルを持ちます。

JobSupport#stateにライフサイクルの状態を保持しています。

このstateを確認し、await( )の実行がタスクの終了よりも後であれば if文内を実行(青背景)します。

awaitInternal( )はawait( )の実態

stateは内部にコルーチンの結果を格納しています。

取り出してawait( )の戻り値にします。

スポンサーリンク

関連記事:

近頃の携帯端末はクワッドコア(プロセッサが4つ)やオクタコア(プロセッサが8つ)が当たり前になりました。 サクサク動作するアプリを作るために、この恩恵を使わなければ損です。 となると、必然的に非同期処理(マルチスレッド)を使うことになります。 JavaのThreadクラス、Android APIのAsyncTaskクラスが代表的な手法です。 Kotlinは上記に加えて「コルーチン(Coroutine)」が使えるようになっています。 今回は、このコルーチンについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 特徴としてnon-blocking動作をサポートします。 このnon-blocking動作についてまとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンの構成要素であるSuspend関数について、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがlaunchです。 このlaunchビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンを開始するlaunchビルダーの仕組みについて、まとめます。 ※仕組みの解析は次のバージョンを対象に行っています。    Kotlin:Ver 1.6.10    org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがasyncです。 このasyncビルダーについて、まとめます。 ...
コルーチン(Coroutine)は「非同期処理の手法」の1つです。 Kotlinが提供します。 コルーチンはビルダー(Builder)により開始されます。 ビルダーは3つの種類があり、その中の1つがrunBlockingです。 このrunBlockingビルダーについて、まとめます。 ...
CoroutineContextはコルーチンで起動されるスレッドの属性を格納しています。 その中にコルーチンの名前を表現するName属性があります。 Name属性を出力する方法を紹介します。 ...
コルーチン(Coroutine)は「非同期処理プログラミングの手法」の1つです。 Kotlinが提供します。 withContextはCoroutineContextを切り替えてスレッドを起動するSuspend関数です。 このwithContextについて、まとめます。 ...
コルーチン間でメッセージ(データのこと)の送受信を行うことが出来ます。 これにより、処理の投げっぱなしになりがちな非同期処理と、連携を強化できます。 しかも、ProduceやFlowを使うと記述が簡素になり、プログラミングの容易さと読みやすさが向上して便利です。 今回は、この「メッセージの送受信」について、使い方をまとめました。 ※環境:Android Studio Flamingo | 2022.2.1    :org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.4 ...
スポンサーリンク