拜读
# Kotlin协程之再次读懂协程工作原理-作者:苍耳叔叔
示例
协程中suspend
的挂机机制使得异步wait\notify变成一如同步的丝滑体验,而对于异步我们通常的做法是监听+回调
的CPS方案,但如果嵌套得过深,会造成回调地域和栈帧溢出。那协程的的挂起究竟是如何做的呢
Continuation Passing Style(续体传递风格): 约定一种编程规范,函数不直接返回结果值,而是在函数最后一个参数位置传入一个 callback 函数参数,并在函数执行完成时通过 callback 来处理结果。
fun getId(): Int {
println("getId")
return 1
}
suspend fun getName(id: Int) {
withContext(Dispatchers.Default) { println("getName by id $id") }
}
suspend fun getAge(id: Int) {
withContext(Dispatchers.Default) { println("getAge by id $id") }
}
fun main() {
runBlocking {
val id = getId()
delay(300L)
getName(id)
getAge(id)
/*suspendCancellableCoroutine<Unit> { continuation ->
// 这是另一个测验,可先忽略
println("???")
continuation.resume(Unit)
}*/
println("already stop")
}
}
/* console输出
getId
getName by id 1
getAge by id 1
already stop
*/
协程的启动
从launch
方法开始,我们以默认调用链深入,找到了该方法startCoroutineCancellable
,其他withContext
、async
也都大同小异,可自行发散探索
// 示例
GlobalScope.launch {
}
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
// 简单看一下这个类,: JobSupport(active), Job, Continuation<T>, CoroutineScope
// 实现了Continuation接口,其主要方法resumeWith
// 前面我们讲了CPS的定义,这里留个心眼,也就是意味着coroutine可能会作为续体,且resumeWith极有可能是唤起的回调
StandaloneCoroutine(newContext, active = true)
// 注意这里,它把自己传进去了
coroutine.start(start, coroutine, block)
return coroutine
}
// 这是上面 coroutine.start(start, coroutine, block)的调用
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
// 这里调用了CoroutineStart.invoke,需要注意
// 然后刚才讲了receiver是它coroutine,这回又传了个this,这里参数类型是Continuation
start(block, receiver, this)
}
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
// 得,将coroutine看作两种类型,继续递传。但这里是协程体调用的方法
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
现在看到了协程准备启动的地方了,先进行了create
创建、intercepted
线程调度(不展开)、resumeCancellableWith
运行。但切记,此时调用方法的是协程体
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
// 注意看返回值,也就是说又有一个续体出来了
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
// 会走这个,至于为什么会走,等后面剖析解码的时候就知道继承关系了
// 这里传入了续体(协程),需留心
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
// 最终都会走resumeWith,注意是对续体接口的扩展
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
从上我们得知协程内部的执行依赖于Continuation续体接口,那这段代码中有个疑问create
返回的新的续体在哪呢,接着看下去
解码
以下内容源于GlobalScope.launch
解码成java文件后的内容,我们发现了create
函数,但其应该是BaseContinuationImpl
方法,这里明明实例化的是Function2
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object var1) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure(var1);
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
通过AndroidStudio自带的Kotlin Bytecode
查看字节码得知,这个多出来的东西PersonTestKt$main$2
继承于SuspendLambda
,实现了Function2
接口罢了,而追踪继承我们得到:SuspendLambda>>>ContinuationImpl>>>BaseContinuationImpl
L1
LINENUMBER 33 L1
GETSTATIC kotlinx/coroutines/GlobalScope.INSTANCE : Lkotlinx/coroutines/GlobalScope;
CHECKCAST kotlinx/coroutines/CoroutineScope
ACONST_NULL
ACONST_NULL
NEW com/wjf/self_demo/PersonTestKt$main$2
DUP
ACONST_NULL
INVOKESPECIAL com/wjf/self_demo/PersonTestKt$main$2.<init> (Lkotlin/coroutines/Continuation;)V
CHECKCAST kotlin/jvm/functions/Function2
ICONST_3
ACONST_NULL
INVOKESTATIC kotlinx/coroutines/BuildersKt.launch$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/Job;
POP
L2
final class com/wjf/self_demo/PersonTestKt$main$2 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2 {
原来协程体自身实现了续体接口(且继承于BaseContinuationImpl
),而协程本身也是个续体(但继承于AbstractCoroutine
是协程,但实现了续体接口),而且协程体在create
时,是把外部续体传入了的并持有了的,在类BaseContinuationImpl
中即成员变量completion
。从解码的内容来看,kotlin对于suspend关键词是进行了字节码增强的,均会被转化为SuspendLambda
。那么剩下resume
启动了,结合上面解码出来的.java
看,先调用invokeSuspend
完成其内部逻辑,完成后会调用parent也就是外部续体(协程)的resumeWith
进行通知
// BaseContinuationImpl
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// 这块需要搞清楚协程体和协程对象的继承关系,仅协程体,也就是Block闭包内的实现是BaseContinuationImpl,而其协程发起本身是个AbstractCoroutine
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
那么到这,其实协程的挂起恢复实现大概有了思路,编译时会进行一定程度的封装,以通过CPS方式进行回调
以达到同步效果,那么它是否也会存在CPS的缺陷呢?接着看
状态机
还记得示例代码吗,这边重新贴一下。同样的,还是借助Kotlin Bytecode
看看这时都生成了些什么
fun getId(): Int {
println("getId")
return 1
}
suspend fun getName(id: Int) {
withContext(Dispatchers.Default) { println("getName by id $id") }
}
suspend fun getAge(id: Int) {
withContext(Dispatchers.Default) { println("getAge by id $id") }
}
fun main() {
runBlocking {
val id = getId()
delay(300L)
getName(id)
getAge(id)
/*suspendCancellableCoroutine<Unit> { continuation ->
// 这是另一个测验,可先忽略
println("???")
continuation.resume(Unit)
}*/
println("already stop")
}
}
/* console输出
getId
getName by id 1
getAge by id 1
already stop
*/
我们看协程内的主体内容,注意看代码注释和label
的状态轮转
BuildersKt.runBlocking$default((CoroutineContext)null, (Function2)(new Function2((Continuation)null) {
int I$0;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
label26: {
int id;
Object var4;
label25: {
var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
// 第一个挂起点
/*
val id = getId()
delay(300L)
*/
id = PersonTestKt.getId();
this.I$0 = id;
// label是状态轮转的重点,即下次再进来的话就走下一个分支,会break出去
this.label = 1;
// 如果它是个挂起函数的话,那就先return结束,等待其内部结束后再唤醒,这块下面会再贴代码解释
// 注意,这里将自身协程体作为续体传入了进去,也就是如果调用了this.resumeWith,就会再次调用invokeSuspend,状态就动起来了
if (DelayKt.delay(300L, this) == var4) {
return var4;
}
break;
case 1:
id = this.I$0;
ResultKt.throwOnFailure($result);
break;
case 2:
id = this.I$0;
ResultKt.throwOnFailure($result);
break label25;
case 3:
ResultKt.throwOnFailure($result);
break label26;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
this.I$0 = id;
this.label = 2;
// 这里也是如法炮制,后面一样的就不再解释了
if (PersonTestKt.getName(id, this) == var4) {
return var4;
}
}
this.label = 3;
if (PersonTestKt.getAge(id, this) == var4) {
return var4;
}
}
String var3 = "already stop";
System.out.println(var3);
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 1, (Object)null);
这个状态嵌套轮转来代替回调递归,确实是个好办法。先看它如何区分是否需要挂起并进行return
退出
public static final Object getName(final int id, @NotNull Continuation $completion) {
// 这个$completion是外部的协程体,传给了withContext,但其构造中并没有看到第三个参数
Object var10000 = BuildersKt.withContext((CoroutineContext)Dispatchers.getDefault(), (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object var1) {
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure(var1);
String var2 = "getName by id " + id;
System.out.println(var2);
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), $completion);
// 这个解释一下,认为自身需要挂起,那就会返回自身,不然外部就继续执行,代表不需要挂起
// 好像这么解释有点抽象,大概知道啥意思就行了
return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;
}
最后我们进行验证。首先,withContext
会invoke
方法,suspendCoroutineUninterceptedOrReturn
内的闭包,创建一个DispatchedCoroutine
协程,闭包block
创建自身续体,执行create>>>resume>>>invokeSuspend>>>complete.resume
完成后通知协程DispatchedCoroutine
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
val coroutine = DispatchedCoroutine(newContext, uCont)
block.startCoroutineCancellable(coroutine, coroutine)
}
}
// Obtains the current continuation instance inside suspend functions and either suspends currently running coroutine or returns result immediately without suspension.
public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
}
而DispatchedCoroutine的父类是ScopeCoroutine,当resumeWith响应后会调用到uCont.resumeWith,那这个uCont
到底是什么,上面可能看的有点迷糊,咋啥都没干就抛异常了?但想一件事!withContext
解码的入参里,是有外层协程体传入的(而且是插桩加进去的额外入参),不可能不用啊,如果这个就是它的话,一切就都变得合情合理了。结合源码中的英文注释,似乎我们的猜测没有问题,那一切就此结束
internal open class ScopeCoroutine<in T>(
context: CoroutineContext,
@JvmField val uCont: Continuation<T> // unintercepted continuation
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
final override val callerFrame: CoroutineStackFrame? get() = uCont as CoroutineStackFrame?
final override fun getStackTraceElement(): StackTraceElement? = null
final override val isScopedCoroutine: Boolean get() = true
internal val parent: Job? get() = parentContext[Job]
override fun afterCompletion(state: Any?) {
// Resume in a cancellable way by default when resuming from another context
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
override fun afterResume(state: Any?) {
// Resume direct because scope is already in the correct context
uCont.resumeWith(recoverResult(state, uCont))
}
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
}
结束!撒花
这块内容非常绕,容易把自己绕进去。如果看完了还没有完全看懂建议再看一遍,这一遍牢记一个概念:协程
是续体,协程体
也是续体,分清协程
和协程体
,协程
包裹协程体
。如果看懂了,期待点赞+收藏+关注三连
本网站是一个以CSS、JavaScript、Vue、HTML为核心的前端开发技术网站。我们致力于为广大前端开发者提供专业、全面、实用的前端开发知识和技术支持。
在本网站中,您可以学习到最新的前端开发技术,了解前端开发的最新趋势和最佳实践。我们提供丰富的教程和案例,让您可以快速掌握前端开发的核心技术和流程。
本网站还提供一系列实用的工具和插件,帮助您更加高效地进行前端开发工作。我们提供的工具和插件都经过精心设计和优化,可以帮助您节省时间和精力,提升开发效率。
除此之外,本网站还拥有一个活跃的社区,您可以在社区中与其他前端开发者交流技术、分享经验、解决问题。我们相信,社区的力量可以帮助您更好地成长和进步。
在本网站中,您可以找到您需要的一切前端开发资源,让您成为一名更加优秀的前端开发者。欢迎您加入我们的大家庭,一起探索前端开发的无限可能!
这块内容非常绕,容易把自己绕进去。如果看完了还没有完全看懂建议再看一遍,这一遍牢记一个概念:协程
是续体,协程体
也是续体,分清协程
和协程体
,协程
包裹协程体
。如果看懂了,期待点赞+收藏+关注三连