20210909【译】深入Kotlin协程 原文链接:https://kt.academy/article/cc-under-the-hood
内容来自于Kotlin Coroutines 书中的一个章节,可以在LeanPub 中找到它
就有那么一类人,他们不会仅仅只是接受汽车是能开动的。他们需要去打开它的引擎盖,尝试理解一下在引擎盖之下它是如何运作的。而我就是这一类人,所以我需要搞清楚协程是如何运作的。如果你也和我一样,那么你会喜欢这一章节的内容,如果你不是的话,那么可以跳过它了。
本章不会想你介绍新的工具,仅仅只是一个解释。将要试图去达到让人满意的程度来解释协程是如何工作的。关键的课程是:
挂起方法就像是状态机,在方法执行伊始以及每个挂起函数之后都带着一个可能的状态数据
用来表征状态的数字和本地数据都会被保持在Continuation
所代表的的后续执行过程对象中
代表一个方法的Continuation
又被另一个的所修饰起来,结果就是所有的Continuation
代表着一个调用栈,会在恢复的时候被用到
如果你对一些内部原理感兴趣(简化过的),随我来。
后续传递风格
Continuation-passing style
挂起函数是有好几种声明的方式的,但是Kotlin团队决定了使用了:后续传递风格。这代表了后续过程会被作为参数在方法之间进行传递
按照惯例,代表后续过程的参数会是方法声明中的最后一个
1 2 3 4 5 6 7 8 9 10 11 suspend fun getUser () : User?suspend fun setUser (user: User ) suspend fun checkAvailability (flight: Flight ) : Boolean fun getUser (continuation: Continuation <*>) : Any?fun setUser (user: User , continuation: Continuation <*>) : Anyfun checkAvailability ( flight: Flight , continuation: Continuation <*> ) : Any
你应该已经注意到了,实际上的方法的返回类型和原始声明并不一样。所有类型都变成了Any
类型,那些可空的变成了Any?
。为什么会这样呢?原因是挂起函数是可能被挂起的,所以它们有可能不会返回一个生命类型。在这种情况下,它所返回的实际上是一个特殊的COROUTINE_SUSPENDED
标记。稍后在联系中会看到。
现在只需要关注,因为getUser
可能会返回User?
或COROUTINE_SUSPENDED
(Any
类型),所以他的结果就必须是User?
和Any
的超类型,即Any?
。
也许有一天,Kotlin会引入一个集合类型,那么这里我们就可以写成User?|COROUTINE_SUSPENDED
了
简单的方法 挖的更深一点,看看下边这个例子,一个延迟前后有简单打印:
1 2 3 4 5 suspend fun myFunction () { println("Before" ) delay(1000 ) println("After" ) }
你已经可以推断出来,真实的方法签名会是:
1 fun myFunction (continuation: Continuation <*>) : Any
接下来此方法需要他自己的“后续”,来记得它的状态数据。让我们将它命名为:MyFunctionContinuation
(实际上的“后续”是一个对象展开式,并没有名字,但是通过现在的方式方便后边进行解释)。在此方法的一开始,myFunction
会用MyFunctionContinuation
来包装参数上的continuation
:
1 val continuation = MyFunctionContinuation(continuation)
只有在continuation
尚未被包装过的时候才可以这么做。一旦包装完毕,它就是恢复进程中的一员了,我们应该保持continuation
不变(当前可能听起来很困惑,但是接下来你就会明白为什么了):
1 2 3 val continuation = if (continuation is MyFunctionContinuation) continuation else MyFunctionContinuation(continuation)
可以简化为:
1 2 val continuation = continuation as ? MyFunctionContinuation ?: MyFunctionContinuation(continuation)
最终,让我们谈谈方法的内容:
1 2 3 4 5 suspend fun myFunction () { println("Before" ) delay(1000 ) println("After" ) }
这个方法可能通过两种方式开始:
为了标记当前的状态,我们声明一个变量叫做label
,在开始时它是0,然后方法在一开始就会执行。在每一个挂起点之前,它都会被赋值为下一个状态,所以在恢复后我们就可以刚好在挂起点后启动:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 fun myFunction (continuation: Continuation <Unit >) : Any { val continuation = continuation as ? MyFunctionContinuation ?: MyFunctionContinuation(continuation) if (continuation.label == 0 ) { println("Before" ) continuation.label = 1 if (delay(1000 , continuation) == COROUTINE_SUSPENDED) { return COROUTINE_SUSPENDED } } if (continuation.label == 1 ) { println("After" ) return Unit } error("Impossible" ) }
最后一个重要的点在上边的代码片段中也有展示,就是当delay
方法被挂起时,此函数直接返回了COROUTINE_SUSPENDED
,同样的事情也会发生在调用此函数的上级函数中,层层向上直到调用栈的顶部。这就是一个挂起节点是如何终结掉所有函数的,然后此线程就可以去做其他事情了。
在我们继续之前,来分析一下上边的代码。如果delay
的调用返回了COROUTINE_SUSPENDED
会发生什么?而如果返回了Unit
呢(虽然不可能,但是假设一下)?
如果delay
返回了Unit
(即什么都没返回),那么会像其他的函数一样正常之后,而后将运转到下一个状态(label
被赋值)
MyFunctionContinuation
的声明实际上是很短暂的,它会继承与ContinuationImpl
:
1 2 3 4 5 6 7 8 9 class MyFunctionContinuation (continuation: Continuation<*>) : ContinuationImpl(continuation) { var result: Any? = null var label = 0 override fun invokeSuspend ($result$: Any ?) : Any? { this .result = $result; return myFunction(this ); } };
你可能已经注意到了,我们的continuation
继承自ContinuationImpl
。这个类以及它的父类们,是负责了整个恢复过程的。它们的全景是相当复杂的,随着时间的推移它会越来越趋于简化,但是现在我们只需要为我们的简单方法做一个最小化的continuation
就可以了(所以后边我们会继承于Continuation
而不是ContinuationImpl
)。在这个简化版中,我们假设:
continuation
仅仅需要一个状态来表示我们在哪里挂起了,通过label
传入resume
的值并不重要(是个Unit
)
当发生异常时continuation
将不会被恢复
下边的代码片段你可以拿来运行和分析,它是可以正常工作的。需要记住下边有非常多的简化操作,真实的continuation
有更多的逻辑需要考量(大多数我们后边会解释到),delay
会用它自己的类行包装continuation
,我们启动的方式类似,都是用协程构造器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 fun myFunction (continuation: Continuation <Unit >) : Any { val continuation = continuation as ? MyFunctionContinuation ?: MyFunctionContinuation(continuation) if (continuation.label == 0 ) { println("Before" ) continuation.label = 1 if (delay(1000 , continuation) == COROUTINE_SUSPENDED) { return COROUTINE_SUSPENDED } } if (continuation.label == 1 ) { println("After" ) return Unit } error("Impossible" ) } class MyFunctionContinuation (val completion: Continuation<Unit >) : Continuation<Unit > { override val context: CoroutineContext get () = completion.context var label = 0 override fun resumeWith (result: Result <Unit >) { if (result.isSuccess) { val res = myFunction(this ) completion.resume(res as Unit ) } } }
上述的代码是下边这段代码会发生什么的简单描述版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 suspend fun myFunction () { println("Before" ) delay(1000 ) println("After" ) } fun main () { val EMPTY_CONTINUATION = object : Continuation<Unit > { override val context: CoroutineContext = EmptyCoroutineContext override fun resumeWith (result: Result <Unit >) { } } suspend { myFunction() }.startCoroutine(EMPTY_CONTINUATION) Thread.sleep(2000 ) }
将上述代码赋值到IIntellJ IDEA 中,使用 Tools > Kotlin > Show Kotlin bytecode 的Decompile
(反编译)按钮。你会看到它们被反编译为了Java代码(如果用Java写的话看看这代码会差不多长成什么样子)
带有状态的方法 如果一个方法有一些状态(比如本地变量)是需要在挂起之后恢复的,那么这些状态需要在它的continuation
中保持住。比如下边的方法:
1 2 3 4 5 6 7 8 suspend fun myFunction () { println("Before" ) val counter = 0 delay(1000 ) counter++ println("Counter: $counter " ) println("After" ) }
在挂起前,需要将状态存储在continuation
中,挂起后才可以进行恢复。接下来这个方法是怎么做的就像是这样(简化版):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 fun myFunction (continuation: Continuation <Unit >) : Any { val continuation = continuation as ? MyFunctionContinuation ?: MyFunctionContinuation(continuation) var counter = continuation.counter if (continuation.label == 0 ) { println("Before" ) counter = 0 continuation.counter = counter continuation.label = 1 if (delay(1000 , continuation) == COROUTINE_SUSPENDED) { return COROUTINE_SUSPENDED } } if (continuation.label == 1 ) { counter = (counter as Int ) + 1 println("Counter: $counter " ) println("After" ) return Unit } error("Impossible" ) } class MyFunctionContinuation ( val completion: Continuation<Unit > ) : Continuation<Unit > { override val context: CoroutineContext get () = completion.context var label = 0 var counter: Int ? = null override fun resumeWith (result: Result <Unit >) { if (result.isSuccess) { val res = myFunction(this ) completion.resume(res as Unit ) } } }
带值恢复的方法 这是个稍稍不同的场景:如果在挂起中我们的确需要一些数据。让我们分析下下边的方法:
1 2 3 4 5 6 7 8 suspend fun printUser (token: String ) { println("Before" ) val userId = getUserId(token) println("Got userId: $userId " ) val userName = getUserName(userId) println(User(userId, userName)) println("After" ) }
上边有两个挂起函数:
我们添加了一个入参,同时我们的挂起函数也返回了一些值。这些值都需要被存储在continuation
中:
userId
,因为它在恢复后被其他状态需要
result
,函数中并没有这个变量,但其代表了挂起中的函数的返回值(初始状态的user id,以及第二状态时的 user name)
token
,是在continuation
中被需要的,因为在printUser
方法被调用时候需要它
下边是它的模样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 fun printUser ( token: String , continuation: Continuation <Nothing > ) : Any { val continuation = continuation as ? MyFunctionContinuation ?: MyFunctionContinuation(continuation as Continuation<Unit >, token) var result: Any? = continuation.result var userId: String? = continuation.userId val userName: String if (continuation.label == 0 ) { println("Before" ) continuation.label = 1 result = getUserId(token, continuation) if (result == COROUTINE_SUSPENDED) { return COROUTINE_SUSPENDED } } if (continuation.label == 1 ) { userId = result as String println("Got userId: $userId " ) continuation.label = 2 continuation.userId = userId result = getUserName(userId, continuation) if (result == COROUTINE_SUSPENDED) { return COROUTINE_SUSPENDED } } if (continuation.label == 2 ) { userName = result as String println(User(userId as String, userName)) println("After" ) return Unit } error("Impossible" ) } class MyFunctionContinuation (val completion: Continuation<Unit >, val token: String) : Continuation<String> { override val context: CoroutineContext get () = completion.context var label = 0 var result: Any? = null var userId: String? = null override fun resumeWith (result: Result <String >) { if (result.isSuccess) { this .result = result.getOrNull() val res = printUser(token, this ) completion.resume(res as Unit ) } } }
异常 一个continuation
能被正常的恢复,也能携带异常恢复。第二种情况将会在挂起点中有异常被抛出时发生。为了模拟这个场景,异常将被设置到结果中,然后每个挂起点都会调用result.throwOnFailure()
方法。这个方法的内容就将之前设置的异常抛出(如有),多亏如此,异常才可以被捕获,然后开发者可以看到有意义的堆栈信息。为了让上边说到的这些发生,我们的结果需要同时能保存成功和失败,所以result
的实际类型是个泛型Result<T>
。
在下边的展示中,因为我不能使用来自Kotlin标准库中的Result<T>
,所以我用自己声明的来代替:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 fun printUser (token: String , continuation: Continuation <Nothing >) : Any { val continuation = if (continuation is MyFunctionContinuation) continuation else MyFunctionContinuation(continuation as Continuation<Unit >, token) var result: Result<Any>? = continuation.result var userId: String? = continuation.userId val userName: String if (continuation.label == 0 ) { result?.throwOnFailure() println("Before" ) continuation.label = 1 val res = getUserId(token, continuation) if (res == COROUTINE_SUSPENDED) { return COROUTINE_SUSPENDED } result = Result.success(res) } if (continuation.label == 1 ) { result!!.throwOnFailure() userId = result.getOrNull() as String println("Got userId: $userId " ) continuation.label = 2 continuation.userId = userId val res = getUserName(userId, continuation) if (res == COROUTINE_SUSPENDED) { return COROUTINE_SUSPENDED } result = Result.success(res) } if (continuation.label == 2 ) { result!!.throwOnFailure() userName = result.getOrNull() as String println(User(userId as String, userName)) println("After" ) return Unit } error("Impossible" ) } class MyFunctionContinuation (val completion: Continuation<Unit >, val token: String) : Continuation<String> { override val context: CoroutineContext get () = completion.context var label = 0 var result: Result<Any>? = null var userId: String? = null override fun resumeWith (result: Result <String >) { this .result = result val res = try { val r = printUser(token, this ) if (r == COROUTINE_SUSPENDED) return Result.success(r as Unit ) } catch (e: Throwable) { Result.failure(e) } completion.resumeWith(res) } } fun main () { toStart() }
调用栈 当方法a
调用方法b
,虚拟机需要存储a
的状态以及当b
执行完成后要继续执行的地址是多少。以上信息所存储的格式叫做调用栈。问题是当我们挂起时,我们释放了线程,也就是说我们的调用栈被自己清除了。所以结果就是,当我们恢复时没有用到它。取而代之的是continuation
们可以是作为一个调用栈。每一个continuation
保持着我们在哪里挂起的(用label
)、方法本地的变量以及参数,还有调用我们的那个方法所对应的continuation
的引用。一个continuation
引用着另外一个,另外一个又引用其他的,如此如此。结果就是,我们的continuation
就像是一个巨型洋葱,在调用栈上保存着所有的东西,看起来就像是这样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 suspend fun a () { val user = getUser() b() b() b() return user } suspend fun b () { for (i in 1 ..10 ) { c(i) } } suspend fun c (i: Int ) { delay(i * 100 ) println("Tick" ) }
一个简单的continuation
表示就像是下边这样:
1 2 3 4 5 6 7 8 9 10 11 CContinuation { label -> 1 completion -> BContinuation { i -> 4 label -> 1 completion -> AContinuation { label -> 2 user -> User@1234 } } }
当一个continuation
恢复了,每个continuation
首先调用它自己的方法,当调用完成时,它再去恢复调用它的方法对应的continuation
。这样调用方法的过程会重复直到栈顶:
1 2 3 4 5 6 7 8 9 10 11 override fun resumeWith (result: Result <String >) { this .result = result val res = try { val r = requestUser(token, this ) if (r == COROUTINE_SUSPENDED) return Success(r) } catch (e: Throwable) { Failure(e) } completion.resumeWith(res) }
用图表示,整个过程看起来就是下边这样:
像是异常一样,它们被一个个的从方法中被抛出来,除非他们在哪里被捕获了。
译者注:所以这里可以理解为completion
就是调用当前方法的方法对应的continuation
。当当前方法结束时,会带着结果再次调用父方法,父方法进入下一状态(或抛出此时result中已有的异常)
真实的代码 真实的continuation
以及挂起方法会比这复杂得多,因为它包含了一些优化和额外的算法:
构建一个更好的异常堆栈
添加协程挂起拦截器(后边会讨论这个特性)
下边是基于Kotlin 1.5.30版本的一部分BaseContinuationImpl
的实现,其展示了真正的resumeWith
实现(其他方法和注释被跳过了):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 internal abstract class BaseContinuationImpl ( val completion: Continuation<Any?>? ) : Continuation<Any?>, CoroutineStackFrame, Serializable { final override fun resumeWith (result: Result <Any ?>) { var current = this var param = result while (true ) { probeCoroutineResumed(current) with(current) { val completion = completion!! val outcome: Result<Any?> = try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() if (completion is BaseContinuationImpl) { current = completion param = outcome } else { completion.resumeWith(outcome) return } } } } }
如你所见,其使用了循环来代替递归调用。这个变化会让代码有一定优化,更加简单。
讨论 真实的实现原理其实是更加复杂的,但是我希望通过这篇文章你能对协程的内部有一些概念了,关键点是:
挂起方法有些像是状态机,在方法的一开始以及每个挂起方法调用后都有一个状态
label
所代表的状态以及本地数据都被保存在continuation
对象中
方法所对应的continuation
是被另一个continuation
所修饰的,所有的这些continuation
表示了一个调用栈,这个调用栈会在恢复时起到作用
实际的算法会更加复杂,label
的第一位会额外的产生变化,在另外的地方这一位会进行检查。这个算法用来让挂起方法支持再现。为了追求简单的理解,这一点被跳过了
调用栈是有限空间,如果它都被用完了,那么我们就需要处理StackOverflowError
栈溢出错误了