译-深入Kotlin协程

20210909【译】深入Kotlin协程

原文链接:https://kt.academy/article/cc-under-the-hood

内容来自于Kotlin Coroutines书中的一个章节,可以在LeanPub中找到它

就有那么一类人,他们不会仅仅只是接受汽车是能开动的。他们需要去打开它的引擎盖,尝试理解一下在引擎盖之下它是如何运作的。而我就是这一类人,所以我需要搞清楚协程是如何运作的。如果你也和我一样,那么你会喜欢这一章节的内容,如果你不是的话,那么可以跳过它了。

本章不会想你介绍新的工具,仅仅只是一个解释。将要试图去达到让人满意的程度来解释协程是如何工作的。关键的课程是:

  • 挂起方法就像是状态机,在方法执行伊始以及每个挂起函数之后都带着一个可能的状态数据
  • 用来表征状态的数字和本地数据都会被保持在Continuation所代表的的后续执行过程对象中
  • 代表一个方法的Continuation又被另一个的所修饰起来,结果就是所有的Continuation代表着一个调用栈,会在恢复的时候被用到

如果你对一些内部原理感兴趣(简化过的),随我来。

后续传递风格

Continuation-passing style

挂起函数是有好几种声明的方式的,但是Kotlin团队决定了使用了:后续传递风格。这代表了后续过程会被作为参数在方法之间进行传递

按照惯例,代表后续过程的参数会是方法声明中的最后一个

1
2
3
4
5
6
7
8
9
10
11
suspend fun getUser(): User?
suspend fun setUser(user: User)
suspend fun checkAvailability(flight: Flight): Boolean

// 实际上会是下边的样子:
fun getUser(continuation: Continuation<*>): Any?
fun setUser(user: User, continuation: Continuation<*>): Any
fun checkAvailability(
flight: Flight,
continuation: Continuation<*>
): Any

你应该已经注意到了,实际上的方法的返回类型和原始声明并不一样。所有类型都变成了Any类型,那些可空的变成了Any?。为什么会这样呢?原因是挂起函数是可能被挂起的,所以它们有可能不会返回一个生命类型。在这种情况下,它所返回的实际上是一个特殊的COROUTINE_SUSPENDED标记。稍后在联系中会看到。

现在只需要关注,因为getUser可能会返回User?COROUTINE_SUSPENDEDAny类型),所以他的结果就必须是User?Any的超类型,即Any?

也许有一天,Kotlin会引入一个集合类型,那么这里我们就可以写成User?|COROUTINE_SUSPENDED

简单的方法

挖的更深一点,看看下边这个例子,一个延迟前后有简单打印:

1
2
3
4
5
suspend fun myFunction() {
println("Before")
delay(1000) //挂起中
println("After")
}

你已经可以推断出来,真实的方法签名会是:

1
fun myFunction(continuation: Continuation<*>): Any

接下来此方法需要他自己的“后续”,来记得它的状态数据。让我们将它命名为:MyFunctionContinuation(实际上的“后续”是一个对象展开式,并没有名字,但是通过现在的方式方便后边进行解释)。在此方法的一开始,myFunction会用MyFunctionContinuation来包装参数上的continuation

1
val continuation = MyFunctionContinuation(continuation)

只有在continuation尚未被包装过的时候才可以这么做。一旦包装完毕,它就是恢复进程中的一员了,我们应该保持continuation不变(当前可能听起来很困惑,但是接下来你就会明白为什么了):

1
2
3
val continuation =
if (continuation is MyFunctionContinuation) continuation
else MyFunctionContinuation(continuation)

可以简化为:

1
2
val continuation = continuation as? MyFunctionContinuation
?: MyFunctionContinuation(continuation)

最终,让我们谈谈方法的内容:

1
2
3
4
5
suspend fun myFunction() {
println("Before")
delay(1000) //挂起中
println("After")
}

这个方法可能通过两种方式开始:

  • 它本身是第一个调用
  • 从某讴歌挂起点后恢复

为了标记当前的状态,我们声明一个变量叫做label,在开始时它是0,然后方法在一开始就会执行。在每一个挂起点之前,它都会被赋值为下一个状态,所以在恢复后我们就可以刚好在挂起点后启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 用来描述myFunction背后是如何运行的简单的演示
fun myFunction(continuation: Continuation<Unit>): Any {
val continuation = continuation as? MyFunctionContinuation
?: MyFunctionContinuation(continuation)

if (continuation.label == 0) {
println("Before")
continuation.label = 1
if (delay(1000, continuation) == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
if (continuation.label == 1) {
println("After")
return Unit
}
error("Impossible")
}

最后一个重要的点在上边的代码片段中也有展示,就是当delay方法被挂起时,此函数直接返回了COROUTINE_SUSPENDED,同样的事情也会发生在调用此函数的上级函数中,层层向上直到调用栈的顶部。这就是一个挂起节点是如何终结掉所有函数的,然后此线程就可以去做其他事情了。

在我们继续之前,来分析一下上边的代码。如果delay的调用返回了COROUTINE_SUSPENDED会发生什么?而如果返回了Unit呢(虽然不可能,但是假设一下)?

如果delay返回了Unit(即什么都没返回),那么会像其他的函数一样正常之后,而后将运转到下一个状态(label被赋值)

MyFunctionContinuation的声明实际上是很短暂的,它会继承与ContinuationImpl

1
2
3
4
5
6
7
8
9
class MyFunctionContinuation(continuation: Continuation<*>) : ContinuationImpl(continuation) {
var result: Any? = null // 在此场景中这个返回值是无用的,就是个Unit,后边会阐述有什么用处
var label = 0

override fun invokeSuspend($result$: Any?): Any? {
this.result = $result;
return myFunction(this);
}
};

你可能已经注意到了,我们的continuation继承自ContinuationImpl。这个类以及它的父类们,是负责了整个恢复过程的。它们的全景是相当复杂的,随着时间的推移它会越来越趋于简化,但是现在我们只需要为我们的简单方法做一个最小化的continuation就可以了(所以后边我们会继承于Continuation而不是ContinuationImpl)。在这个简化版中,我们假设:

  • continuation仅仅需要一个状态来表示我们在哪里挂起了,通过label
  • 传入resume的值并不重要(是个Unit
  • 当发生异常时continuation将不会被恢复

下边的代码片段你可以拿来运行和分析,它是可以正常工作的。需要记住下边有非常多的简化操作,真实的continuation有更多的逻辑需要考量(大多数我们后边会解释到),delay会用它自己的类行包装continuation,我们启动的方式类似,都是用协程构造器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
fun myFunction(continuation: Continuation<Unit>): Any {
val continuation = continuation as? MyFunctionContinuation
?: MyFunctionContinuation(continuation)

if (continuation.label == 0) {
println("Before")
continuation.label = 1
if (delay(1000, continuation) == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
if (continuation.label == 1) {
println("After")
return Unit
}
error("Impossible")
}

class MyFunctionContinuation(val completion: Continuation<Unit>) : Continuation<Unit> {
override val context: CoroutineContext
get() = completion.context

var label = 0

override fun resumeWith(result: Result<Unit>) {
if (result.isSuccess) {
val res = myFunction(this)
completion.resume(res as Unit)
}
// ... 余下的部分后边再说
}
}

上述的代码是下边这段代码会发生什么的简单描述版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//sampleStart
suspend fun myFunction() {
println("Before")
delay(1000)
println("After")
}

fun main() {
val EMPTY_CONTINUATION = object : Continuation<Unit> {
override val context: CoroutineContext = EmptyCoroutineContext

override fun resumeWith(result: Result<Unit>) {
// no-op
}
}
suspend { myFunction() }.startCoroutine(EMPTY_CONTINUATION)
Thread.sleep(2000)
}

将上述代码赋值到IIntellJ IDEA中,使用 Tools > Kotlin > Show Kotlin bytecode 的Decompile(反编译)按钮。你会看到它们被反编译为了Java代码(如果用Java写的话看看这代码会差不多长成什么样子)

带有状态的方法

如果一个方法有一些状态(比如本地变量)是需要在挂起之后恢复的,那么这些状态需要在它的continuation中保持住。比如下边的方法:

1
2
3
4
5
6
7
8
suspend fun myFunction() {
println("Before")
val counter = 0
delay(1000) //挂起中
counter++
println("Counter: $counter")
println("After")
}

在挂起前,需要将状态存储在continuation中,挂起后才可以进行恢复。接下来这个方法是怎么做的就像是这样(简化版):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
fun myFunction(continuation: Continuation<Unit>): Any {
val continuation = continuation as? MyFunctionContinuation
?: MyFunctionContinuation(continuation)

var counter = continuation.counter //初始化

if (continuation.label == 0) {
println("Before")
counter = 0
continuation.counter = counter //存储
continuation.label = 1
if (delay(1000, continuation) == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
if (continuation.label == 1) {
counter = (counter as Int) + 1 //消费
println("Counter: $counter")
println("After")
return Unit
}
error("Impossible")
}

class MyFunctionContinuation(
val completion: Continuation<Unit>
) : Continuation<Unit> {
override val context: CoroutineContext // 现在先别管这个变量
get() = completion.context

var label = 0
var counter: Int? = null //用来存储原有方法的状态数据counter

override fun resumeWith(result: Result<Unit>) {
if (result.isSuccess) {
val res = myFunction(this)
completion.resume(res as Unit)
}
// ... 稍后再讨论这部分
}
}

带值恢复的方法

这是个稍稍不同的场景:如果在挂起中我们的确需要一些数据。让我们分析下下边的方法:

1
2
3
4
5
6
7
8
suspend fun printUser(token: String) {
println("Before")
val userId = getUserId(token) // suspending
println("Got userId: $userId")
val userName = getUserName(userId) // suspending
println(User(userId, userName))
println("After")
}

上边有两个挂起函数:

  • getUserId
  • getUserName

我们添加了一个入参,同时我们的挂起函数也返回了一些值。这些值都需要被存储在continuation中:

  • userId,因为它在恢复后被其他状态需要
  • result,函数中并没有这个变量,但其代表了挂起中的函数的返回值(初始状态的user id,以及第二状态时的 user name)
  • token,是在continuation中被需要的,因为在printUser方法被调用时候需要它

下边是它的模样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
fun printUser(
token: String,
continuation: Continuation<Nothing>
): Any {
val continuation = continuation as? MyFunctionContinuation
?: MyFunctionContinuation(continuation as Continuation<Unit>, token)

var result: Any? = continuation.result
var userId: String? = continuation.userId
val userName: String

if (continuation.label == 0) {
println("Before")
continuation.label = 1
result = getUserId(token, continuation)
if (result == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
if (continuation.label == 1) {
userId = result as String
println("Got userId: $userId")
continuation.label = 2
continuation.userId = userId
result = getUserName(userId, continuation)
if (result == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
if (continuation.label == 2) {
userName = result as String
println(User(userId as String, userName))
println("After")
return Unit
}
error("Impossible")
}

class MyFunctionContinuation(val completion: Continuation<Unit>, val token: String) : Continuation<String> {
override val context: CoroutineContext // 先别管这个变量
get() = completion.context

var label = 0
var result: Any? = null
var userId: String? = null

override fun resumeWith(result: Result<String>) {
if (result.isSuccess) {
this.result = result.getOrNull()
val res = printUser(token, this)
completion.resume(res as Unit)
}
// ... 后边将要讨论
}
}

异常

一个continuation能被正常的恢复,也能携带异常恢复。第二种情况将会在挂起点中有异常被抛出时发生。为了模拟这个场景,异常将被设置到结果中,然后每个挂起点都会调用result.throwOnFailure()方法。这个方法的内容就将之前设置的异常抛出(如有),多亏如此,异常才可以被捕获,然后开发者可以看到有意义的堆栈信息。为了让上边说到的这些发生,我们的结果需要同时能保存成功和失败,所以result的实际类型是个泛型Result<T>

在下边的展示中,因为我不能使用来自Kotlin标准库中的Result<T>,所以我用自己声明的来代替:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
fun printUser(token: String, continuation: Continuation<Nothing>): Any {
val continuation =
if (continuation is MyFunctionContinuation) continuation
else MyFunctionContinuation(continuation as Continuation<Unit>, token)

var result: Result<Any>? = continuation.result
var userId: String? = continuation.userId
val userName: String

if (continuation.label == 0) {
result?.throwOnFailure()
println("Before")
continuation.label = 1
val res = getUserId(token, continuation)
if (res == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
result = Result.success(res)
}
if (continuation.label == 1) {
result!!.throwOnFailure()
userId = result.getOrNull() as String
println("Got userId: $userId")
continuation.label = 2
continuation.userId = userId
val res = getUserName(userId, continuation)
if (res == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
result = Result.success(res)
}
if (continuation.label == 2) {
result!!.throwOnFailure()
userName = result.getOrNull() as String
println(User(userId as String, userName))
println("After")
return Unit
}
error("Impossible")
}

class MyFunctionContinuation(val completion: Continuation<Unit>, val token: String) : Continuation<String> {
override val context: CoroutineContext // Don't think about it now
get() = completion.context

var label = 0
var result: Result<Any>? = null
var userId: String? = null

override fun resumeWith(result: Result<String>) {
this.result = result
val res = try {
val r = printUser(token, this)
if (r == COROUTINE_SUSPENDED) return
Result.success(r as Unit)
} catch (e: Throwable) {
Result.failure(e)
}
completion.resumeWith(res)
}
}

fun main() {
toStart()
}

调用栈

当方法a调用方法b,虚拟机需要存储a的状态以及当b执行完成后要继续执行的地址是多少。以上信息所存储的格式叫做调用栈。问题是当我们挂起时,我们释放了线程,也就是说我们的调用栈被自己清除了。所以结果就是,当我们恢复时没有用到它。取而代之的是continuation们可以是作为一个调用栈。每一个continuation保持着我们在哪里挂起的(用label)、方法本地的变量以及参数,还有调用我们的那个方法所对应的continuation的引用。一个continuation引用着另外一个,另外一个又引用其他的,如此如此。结果就是,我们的continuation就像是一个巨型洋葱,在调用栈上保存着所有的东西,看起来就像是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
suspend fun a() {
val user = getUser()
b()
b()
b()
return user
}

suspend fun b() {
for (i in 1..10) {
c(i)
}
}

suspend fun c(i: Int) {
delay(i * 100)
println("Tick")
}

一个简单的continuation表示就像是下边这样:

1
2
3
4
5
6
7
8
9
10
11
CContinuation {
label -> 1
completion -> BContinuation {
i -> 4
label -> 1
completion -> AContinuation {
label -> 2
user -> User@1234
}
}
}

当一个continuation恢复了,每个continuation首先调用它自己的方法,当调用完成时,它再去恢复调用它的方法对应的continuation。这样调用方法的过程会重复直到栈顶:

1
2
3
4
5
6
7
8
9
10
11
override fun resumeWith(result: Result<String>) {
this.result = result
val res = try {
val r = requestUser(token, this)
if (r == COROUTINE_SUSPENDED) return
Success(r)
} catch (e: Throwable) {
Failure(e)
}
completion.resumeWith(res)
}

用图表示,整个过程看起来就是下边这样:

像是异常一样,它们被一个个的从方法中被抛出来,除非他们在哪里被捕获了。

译者注:所以这里可以理解为completion就是调用当前方法的方法对应的continuation。当当前方法结束时,会带着结果再次调用父方法,父方法进入下一状态(或抛出此时result中已有的异常)

真实的代码

真实的continuation以及挂起方法会比这复杂得多,因为它包含了一些优化和额外的算法:

  • 构建一个更好的异常堆栈
  • 添加协程挂起拦截器(后边会讨论这个特性)

下边是基于Kotlin 1.5.30版本的一部分BaseContinuationImpl的实现,其展示了真正的resumeWith实现(其他方法和注释被跳过了):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
internal abstract class BaseContinuationImpl(
val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// 使用循环代替递归是内部决定好的,不能让外部有机会复写,所以final
final override fun resumeWith(result: Result<Any?>) {
// 使用循环代替递归让恢复时堆栈能更加短
var current = this
var param = result
while (true) {
// 用于填充debug信息
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast
// 执行自己的内容,产生结果
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
// 如果需要挂起,那么直接返回,不要担心,下游方法有当前continuation的引用(completion),会在完成时自己找回来的
if (outcome === COROUTINE_SUSPENDED)
return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted()
// 状态机实例(意义上)要终结了……
if (completion is BaseContinuationImpl) {
// 使用循环而非递归进行调用链回溯,completion代表的是调用自己的方法
current = completion
param = outcome
} else {
// 顶层调用将不是BaseContinuationImpl类型
// 会带着最终的结果退出
completion.resumeWith(outcome)
return
}
}
}
}

// ...
}

如你所见,其使用了循环来代替递归调用。这个变化会让代码有一定优化,更加简单。

讨论

真实的实现原理其实是更加复杂的,但是我希望通过这篇文章你能对协程的内部有一些概念了,关键点是:

  • 挂起方法有些像是状态机,在方法的一开始以及每个挂起方法调用后都有一个状态
  • label所代表的状态以及本地数据都被保存在continuation对象中
  • 方法所对应的continuation是被另一个continuation所修饰的,所有的这些continuation表示了一个调用栈,这个调用栈会在恢复时起到作用
  1. 实际的算法会更加复杂,label的第一位会额外的产生变化,在另外的地方这一位会进行检查。这个算法用来让挂起方法支持再现。为了追求简单的理解,这一点被跳过了
  2. 调用栈是有限空间,如果它都被用完了,那么我们就需要处理StackOverflowError栈溢出错误了