Android小记:Kotlin协程suspend的机制解读

lxf2023-05-20 01:18:59

拜读

# Kotlin协程之再次读懂协程工作原理-作者:苍耳叔叔

示例

协程中suspend的挂机机制使得异步wait\notify变成一如同步的丝滑体验,而对于异步我们通常的做法是监听+回调的CPS方案,但如果嵌套得过深,会造成回调地域和栈帧溢出。那协程的的挂起究竟是如何做的呢

Continuation Passing Style(续体传递风格): 约定一种编程规范,函数不直接返回结果值,而是在函数最后一个参数位置传入一个 callback 函数参数,并在函数执行完成时通过 callback 来处理结果。

fun getId(): Int {
    println("getId")
    return 1
}

suspend fun getName(id: Int) {
    withContext(Dispatchers.Default) { println("getName by id $id") }
}

suspend fun getAge(id: Int) {
    withContext(Dispatchers.Default) { println("getAge by id $id") }
}

fun main() {
    runBlocking {
        val id = getId()
        delay(300L)
        getName(id)
        getAge(id)
        /*suspendCancellableCoroutine<Unit> { continuation ->
            // 这是另一个测验,可先忽略
            println("???")
            continuation.resume(Unit)
        }*/
        println("already stop")
    }
}


/* console输出
    getId
    getName by id 1
    getAge by id 1
    already stop
    */

协程的启动

launch方法开始,我们以默认调用链深入,找到了该方法startCoroutineCancellable,其他withContextasync也都大同小异,可自行发散探索

// 示例
GlobalScope.launch {
}

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        // 简单看一下这个类,: JobSupport(active), Job, Continuation<T>, CoroutineScope
        // 实现了Continuation接口,其主要方法resumeWith
        // 前面我们讲了CPS的定义,这里留个心眼,也就是意味着coroutine可能会作为续体,且resumeWith极有可能是唤起的回调
        StandaloneCoroutine(newContext, active = true)
    // 注意这里,它把自己传进去了
    coroutine.start(start, coroutine, block)
    return coroutine
}

// 这是上面 coroutine.start(start, coroutine, block)的调用
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    // 这里调用了CoroutineStart.invoke,需要注意
    // 然后刚才讲了receiver是它coroutine,这回又传了个this,这里参数类型是Continuation
    start(block, receiver, this)
}

public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
    when (this) {
        // 得,将coroutine看作两种类型,继续递传。但这里是协程体调用的方法
        DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        ATOMIC -> block.startCoroutine(receiver, completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        LAZY -> Unit // will start lazily
    }

现在看到了协程准备启动的地方了,先进行了create创建、intercepted线程调度(不展开)、resumeCancellableWith运行。但切记,此时调用方法的是协程体

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }
    
// 注意看返回值,也就是说又有一个续体出来了
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        // 会走这个,至于为什么会走,等后面剖析解码的时候就知道继承关系了
        // 这里传入了续体(协程),需留心
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

// 最终都会走resumeWith,注意是对续体接口的扩展
public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

从上我们得知协程内部的执行依赖于Continuation续体接口,那这段代码中有个疑问create返回的新的续体在哪呢,接着看下去

解码

以下内容源于GlobalScope.launch解码成java文件后的内容,我们发现了create函数,但其应该是BaseContinuationImpl方法,这里明明实例化的是Function2

BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
   int label;

   @Nullable
   public final Object invokeSuspend(@NotNull Object var1) {
      Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch(this.label) {
      case 0:
         ResultKt.throwOnFailure(var1);
         return Unit.INSTANCE;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }
   }

   @NotNull
   public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
      Intrinsics.checkNotNullParameter(completion, "completion");
      Function2 var3 = new <anonymous constructor>(completion);
      return var3;
   }

   public final Object invoke(Object var1, Object var2) {
      return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
   }
}), 3, (Object)null);

通过AndroidStudio自带的Kotlin Bytecode查看字节码得知,这个多出来的东西PersonTestKt$main$2继承于SuspendLambda,实现了Function2接口罢了,而追踪继承我们得到:SuspendLambda>>>ContinuationImpl>>>BaseContinuationImpl

L1
    LINENUMBER 33 L1
    GETSTATIC kotlinx/coroutines/GlobalScope.INSTANCE : Lkotlinx/coroutines/GlobalScope;
    CHECKCAST kotlinx/coroutines/CoroutineScope
    ACONST_NULL
    ACONST_NULL
    NEW com/wjf/self_demo/PersonTestKt$main$2
    DUP
    ACONST_NULL
    INVOKESPECIAL com/wjf/self_demo/PersonTestKt$main$2.<init> (Lkotlin/coroutines/Continuation;)V
    CHECKCAST kotlin/jvm/functions/Function2
    ICONST_3
    ACONST_NULL
    INVOKESTATIC kotlinx/coroutines/BuildersKt.launch$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/Job;
    POP
L2

final class com/wjf/self_demo/PersonTestKt$main$2 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2 {

原来协程体自身实现了续体接口(且继承于BaseContinuationImpl),而协程本身也是个续体(但继承于AbstractCoroutine是协程,但实现了续体接口),而且协程体在create时,是把外部续体传入了的并持有了的,在类BaseContinuationImpl中即成员变量completion。从解码的内容来看,kotlin对于suspend关键词是进行了字节码增强的,均会被转化为SuspendLambda。那么剩下resume启动了,结合上面解码出来的.java看,先调用invokeSuspend完成其内部逻辑,完成后会调用parent也就是外部续体(协程)的resumeWith进行通知

// BaseContinuationImpl
public final override fun resumeWith(result: Result<Any?>) {
    // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
    var current = this
    var param = result
    while (true) {
        // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
        // can precisely track what part of suspended callstack was already resumed
        probeCoroutineResumed(current)
        with(current) {
            val completion = completion!! // fail fast when trying to resume continuation without completion
            val outcome: Result<Any?> =
                try {
                    val outcome = invokeSuspend(param)
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
            releaseIntercepted() // this state machine instance is terminating
            if (completion is BaseContinuationImpl) {
                // 这块需要搞清楚协程体和协程对象的继承关系,仅协程体,也就是Block闭包内的实现是BaseContinuationImpl,而其协程发起本身是个AbstractCoroutine
                // unrolling recursion via loop
                current = completion
                param = outcome
            } else {
                // top-level completion reached -- invoke and return
                completion.resumeWith(outcome)
                return
            }
        }
    }
}

那么到这,其实协程的挂起恢复实现大概有了思路,编译时会进行一定程度的封装,以通过CPS方式进行回调以达到同步效果,那么它是否也会存在CPS的缺陷呢?接着看

状态机

还记得示例代码吗,这边重新贴一下。同样的,还是借助Kotlin Bytecode看看这时都生成了些什么

fun getId(): Int {
    println("getId")
    return 1
}

suspend fun getName(id: Int) {
    withContext(Dispatchers.Default) { println("getName by id $id") }
}

suspend fun getAge(id: Int) {
    withContext(Dispatchers.Default) { println("getAge by id $id") }
}

fun main() {
    runBlocking {
        val id = getId()
        delay(300L)
        getName(id)
        getAge(id)
        /*suspendCancellableCoroutine<Unit> { continuation ->
            // 这是另一个测验,可先忽略
            println("???")
            continuation.resume(Unit)
        }*/
        println("already stop")
    }
}


/* console输出
    getId
    getName by id 1
    getAge by id 1
    already stop
    */

我们看协程内的主体内容,注意看代码注释和label的状态轮转

BuildersKt.runBlocking$default((CoroutineContext)null, (Function2)(new Function2((Continuation)null) {
   int I$0;
   int label;

   @Nullable
   public final Object invokeSuspend(@NotNull Object $result) {
      label26: {
         int id;
         Object var4;
         label25: {
            var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               // 第一个挂起点
               /*
                   val id = getId()
                   delay(300L)
                   */
               id = PersonTestKt.getId();
               this.I$0 = id;
               // label是状态轮转的重点,即下次再进来的话就走下一个分支,会break出去
               this.label = 1;
               // 如果它是个挂起函数的话,那就先return结束,等待其内部结束后再唤醒,这块下面会再贴代码解释
               // 注意,这里将自身协程体作为续体传入了进去,也就是如果调用了this.resumeWith,就会再次调用invokeSuspend,状态就动起来了
               if (DelayKt.delay(300L, this) == var4) {
                  return var4;
               }
               break;
            case 1:
               id = this.I$0;
               ResultKt.throwOnFailure($result);
               break;
            case 2:
               id = this.I$0;
               ResultKt.throwOnFailure($result);
               break label25;
            case 3:
               ResultKt.throwOnFailure($result);
               break label26;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            this.I$0 = id;
            this.label = 2;
            // 这里也是如法炮制,后面一样的就不再解释了
            if (PersonTestKt.getName(id, this) == var4) {
               return var4;
            }
         }

         this.label = 3;
         if (PersonTestKt.getAge(id, this) == var4) {
            return var4;
         }
      }

      String var3 = "already stop";
      System.out.println(var3);
      return Unit.INSTANCE;
   }

   @NotNull
   public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
      Intrinsics.checkNotNullParameter(completion, "completion");
      Function2 var3 = new <anonymous constructor>(completion);
      return var3;
   }
  
   public final Object invoke(Object var1, Object var2) {
      return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
   }
}), 1, (Object)null);

这个状态嵌套轮转来代替回调递归,确实是个好办法。先看它如何区分是否需要挂起并进行return退出

public static final Object getName(final int id, @NotNull Continuation $completion) {
    // 这个$completion是外部的协程体,传给了withContext,但其构造中并没有看到第三个参数
   Object var10000 = BuildersKt.withContext((CoroutineContext)Dispatchers.getDefault(), (Function2)(new Function2((Continuation)null) {
      int label;

      @Nullable
      public final Object invokeSuspend(@NotNull Object var1) {
         Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
         switch(this.label) {
         case 0:
            ResultKt.throwOnFailure(var1);
            String var2 = "getName by id " + id;
            System.out.println(var2);
            return Unit.INSTANCE;
         default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
         }
      }

      @NotNull
      public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
         Intrinsics.checkNotNullParameter(completion, "completion");
         Function2 var3 = new <anonymous constructor>(completion);
         return var3;
      }

      public final Object invoke(Object var1, Object var2) {
         return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
      }
   }), $completion);
   // 这个解释一下,认为自身需要挂起,那就会返回自身,不然外部就继续执行,代表不需要挂起
   // 好像这么解释有点抽象,大概知道啥意思就行了
   return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;
}

最后我们进行验证。首先,withContextinvoke方法,suspendCoroutineUninterceptedOrReturn内的闭包,创建一个DispatchedCoroutine协程,闭包block创建自身续体,执行create>>>resume>>>invokeSuspend>>>complete.resume完成后通知协程DispatchedCoroutine

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        val coroutine = DispatchedCoroutine(newContext, uCont)
        block.startCoroutineCancellable(coroutine, coroutine)
    }
}

// Obtains the current continuation instance inside suspend functions and either suspends currently running coroutine or returns result immediately without suspension.
public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
    contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
    throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
}

DispatchedCoroutine的父类是ScopeCoroutine,当resumeWith响应后会调用到uCont.resumeWith,那这个uCont到底是什么,上面可能看的有点迷糊,咋啥都没干就抛异常了?但想一件事!withContext解码的入参里,是有外层协程体传入的(而且是插桩加进去的额外入参),不可能不用啊,如果这个就是它的话,一切就都变得合情合理了。结合源码中的英文注释,似乎我们的猜测没有问题,那一切就此结束

internal open class ScopeCoroutine<in T>(
    context: CoroutineContext,
    @JvmField val uCont: Continuation<T> // unintercepted continuation
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
    final override val callerFrame: CoroutineStackFrame? get() = uCont as CoroutineStackFrame?
    final override fun getStackTraceElement(): StackTraceElement? = null
    final override val isScopedCoroutine: Boolean get() = true

    internal val parent: Job? get() = parentContext[Job]

    override fun afterCompletion(state: Any?) {
        // Resume in a cancellable way by default when resuming from another context
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }

    override fun afterResume(state: Any?) {
        // Resume direct because scope is already in the correct context
        uCont.resumeWith(recoverResult(state, uCont))
    }
    
    public final override fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }
}

结束!撒花

这块内容非常绕,容易把自己绕进去。如果看完了还没有完全看懂建议再看一遍,这一遍牢记一个概念:协程是续体,协程体也是续体,分清协程协程体协程包裹协程体。如果看懂了,期待点赞+收藏+关注三连

本网站是一个以CSS、JavaScript、Vue、HTML为核心的前端开发技术网站。我们致力于为广大前端开发者提供专业、全面、实用的前端开发知识和技术支持。 在本网站中,您可以学习到最新的前端开发技术,了解前端开发的最新趋势和最佳实践。我们提供丰富的教程和案例,让您可以快速掌握前端开发的核心技术和流程。 本网站还提供一系列实用的工具和插件,帮助您更加高效地进行前端开发工作。我们提供的工具和插件都经过精心设计和优化,可以帮助您节省时间和精力,提升开发效率。 除此之外,本网站还拥有一个活跃的社区,您可以在社区中与其他前端开发者交流技术、分享经验、解决问题。我们相信,社区的力量可以帮助您更好地成长和进步。 在本网站中,您可以找到您需要的一切前端开发资源,让您成为一名更加优秀的前端开发者。欢迎您加入我们的大家庭,一起探索前端开发的无限可能!