本文将从以下3个方面介绍 Kotlin Coroutines

  • 概念
  • 使用方法
  • 内部实现原理

本文的实例代码基于Kotlin 1.3.0 和 kotlinx.coroutines 1.0.0。

众所周知,在高负荷下,阻塞和轮询是不好的。而且这个世界正在变得越来越依赖于push和异步。许多语言已经(starting with C# in 2012) 通过 async/await 关键字支持异步。 而在Kotlin, 我们抽象了这个概念,从而 一些库就可以实现各自的异步支持,所以 async 在kotlin中不再是关键字,而是一个function。

这个设计是为了集成各种异步API: futures/promises, callback-passing, 等等。这个设计足够抽象,甚至可以表达 lazy generators (yield) 和其他用例。

Kotlin 团队推出 coroutines 来提供一种简单的方式来编写 concurrent 编程。 也许我们中的大多数已经在用各种基于线程的并发工具,比如 Java’s concurrency API,这已经是一套很成熟的API了。

Java Concurrency vs. Kotlin Coroutines

如果你仍然对Java中的 threading 和 concurrency 感到困惑,我建议你阅读这本书《Java Concurrency in Practice》

尽管 Java’s 并发工具已经工程化的很好了, 但还是很难用。另一个问题是Java 并不鼓励非阻塞编程,所以你总是发现自己在start一个线程后才发现引入了过度开销和阻塞计算 (due to locks, sleeps, waits, etc.)。 应用 non-blocking patterns 很难而且极易犯错。

Kotlin 协程则相反, 背后帮程序员处理好了一堆复杂的事情,调用则显得简单,看起来像命令式编程,他们提供了不用阻塞线程就可以执行异步代码的一种方式, 对应用来说这又提供了一种新的可能。不是阻塞线程,而是suspended计算。

许多别的文章将coroutines视为“light-weight threads”; 但是coroutines不是我们Java传统意义上的线程。与线程相比, 新建一个coroutines 的代价非常廉价。一个原因是coroutines没有被映射到native的线程。正如我们将会看到的, coroutines 实际上是是被库管理着在一个线程池里面执行的。
另外一个很重要的不同点在于“limitation”: 线程受限于可用的native threads, coroutines 则几乎没有限制, 即使是上千个协程也可以一下子开启。

Concurrent Programming Style

在不同的语言中asynchronous/concurrent的实现风格往往不同:

  • Callback-based (JavaScript)
  • Future/Promise-based (Java, JavaScript)
  • Async/Await-based (C#) and more

所有的这些概念都可以用coroutines实现,kotlin没有直接依赖任何实现风格,而且,额外的一个好处是,coroutines可以像命令式编程一样序列化编写代码,尽管运行时是并发的。

The Concept of Kotlin Coroutines

“Coroutine” 的概念并不是新鲜事物。根据wiki,早在1958就有人提出这个概念. 许多现代编程语言也提供了native支持: C#, Go, Python, Ruby, etc. 包括Kotlin coroutines的实现在内, 都是基于“Continuations”, Continuations 是计算机程序的状态控制的一层抽象。

Getting Started with Coroutines

reference
GitHub Kotlin_Examples

Kotlin Coroutines Ingredients

正如我们提到的,kotlin提供了一套易于理解的高阶api,首先我们了解下一个新修饰符suspend,suspend表示一个方法是“suspending”的。

Suspending Functions

在Coroutines中我们说一个方法是“suspending”的是指这个方法可能会在任何一行命令中挂起,被suspend修饰的方法只能在 coroutines 或者其他suspending functions中调用。

suspend fun myMethod(p: String): Boolean {
//...
}

我们可以把coroutine看做一个序列的常规方法调用,只不过这个序列执行完成后会额外提供一个运行结果。

Hands-On

现在来看一个实际的例子

fun main(args: Array<String>) = runBlocking { //(1)
val job = GlobalScope.launch { //(2)
val result = suspendingFunction() //(3)
print("$result")
}
print("The result: ")
job.join() //(4)
}
// prints "The result: 5"

在这个例子中,有两个新方法**(1)** runBlocking 、 (2) launch,它们都是 coroutine builders。我们可以利用各种各样的builder来实现不同的业务需求:

  • launch (fire and forget, can also be canceled)
  • async (返回 promise)
  • runBlocking (阻塞线程)
  • etc.

我们可以在各种scopes中开启一个coroutines,在这个例子中,GlobalScope 被用来 launch一个和App拥有同样生命周期的 coroutine。这种方式仅仅用在文中的例子里,在实际应用中还是要看具体业务场合。 按照 “structured concurrency”的概念, 我们需要将coroutines限制在不同的scopes下面,从而更好的维护和管理.  CoroutineScope.

接下来看这个代码做了什么:
由**(2)** launch开启的内部的coroutine是实际上干活的,我们调用**(3)** suspending function,然后coroutine打印出结果。在启动coroutine 后,主线程在coroutine结束前打印 The result:。 (2) launch返回的是一个Job,通过它可以取消或者**(4)** join()等待 coroutines 完成。由于 join() 可能会suspend, 所以我们需要将它包在另外一个coroutine——runBlocking中。
runBlocking coroutine builder 被设计用来将常规的阻塞代码桥接到suspending function。这个功能常常被用在main function和 test中。API。如果不调用join(),那么这个程序会在coroutine打印出结果前结束掉。

runBlocking coroutine scope外 launch一个coroutine也是可以的。我们只需将GlobalScope.launch 改为 launch。同时,我们也可以去掉join因为runBlocking在所有的child coroutines完成前不会结束。这个例子也是 structured concurrency的一个例子, 接下来会详细介绍这个概念。

Structured Concurrency

正如之前提到的,我们可以按照某种层次结构组织管理coroutines。假设在UI界面的某一个特定事件上我们需要中断。如果我们在这个UI上开启一个 coroutines来处理某个任务, 而且当主界面中断时,这个任务也应该被中断。值得注意的是,每个coroutine都可以运行在不同的scope内。 我们可以将多个coroutines在同一个scope内管理,这样就可以同时取消。下面的例子复用了上一节的例子,只不过 launch coroutines 在 runBlocking 的scope 下。

fun main(args: Array<String>) {
runBlocking {
launch {
delay(500)
println("Hello from launch")
}
println("Hello from runBlocking after launch")
}
println("finished runBlocking")
}

输出如下

Hello from runBlocking after launch
Hello from launch
finished runBlocking

从上面的输出可以看出runBlocking 会等待子coroutinelaunch完成。因此,我们可以利用这个结构来将coroutine的取消操作代理给子coroutines:

fun main(args: Array<String>) {
runBlocking {
val outerLaunch = launch {
launch {
while (true) {
delay(300)
println("Hello from first inner launch")
}
}
launch {
while (true) {
delay(300)
println("Hello from second inner launch")
}
}
}

println("Hello from runBlocking after outer launch")
delay(800)
outerLaunch.cancel()
}
println("finished runBlocking")
}

输出如下

Hello from runBlocking after outer launch
Hello from first inner launch
Hello from second inner launch
Hello from first inner launch
Hello from second inner launch
finished runBlocking

在这个例子中,可以看到在runBlocking这个scope下,launch先创建了一个outer coroutine,接着又创建了两个内部coroutines,当我们取消 outer coroutine时,会同时委托到inner coroutines进行cancel操作。这个机制同样适用于错误控制,如果有异常从inner coroutines内部抛出,那么同一个scope内的所有coroutines都会停止。

Custom Scope

这一节,我们将创建自己的CoroutineScope。在上个例子中我们为了简单起见,使用runBlockingscope,而在实际应用中创建自己管理的scope是有必要的。创建也很简单,使用 coroutineScope builder。文档如下:

Creates new [CoroutineScope] and calls the specified suspend block with this scope. The provided scope inherits its [coroutineContext][CoroutineScope.coroutineContext] from the outer scope, but overrides context’s [Job]. This function is designed for a parallel decomposition of work. When any child coroutine in this scope fails, this scope fails, and all the rest of the children are canceled (for a different behavior see [supervisorScope]).

创建一个新的[CoroutineScope]并且执行这个scope内指定的suspend block。 新scope会继承外部scope的[CoroutineScope.coroutineContext],但是会覆盖context的[Job]对象。这个方法可以用来parallel decomposition业务。任何child coroutines 失败,整个scope失败,并且其他的子coroutines都会被取消(如果要自定义这个行为可以使用[supervisorScope])。

fun main(args: Array<String>) = runBlocking {
coroutineScope {
val outerLaunch = launch {
launch {
while (true) {
delay(300)
println("Hello from first inner launch")
}
}
launch {
while (true) {
delay(300)
println("Hello from second inner launch")
}
}
}

println("Hello from runBlocking after outer launch")
delay(800)
outerLaunch.cancel()
}
println("finished coroutineScope")
}

这个例子和我们之前看到的非常相似。只不过最外层的scope变成了我们自定义的scope。了解更多可以参考this post on structured concurrency with coroutines.

Going deeper

下面举一个更具体的例子,比如在应用中发送一封邮件。
请求接收者的地址和渲染消息内容是两个耗时的任务,而且互相独立。使用kotlin coroutines你可以同时执行这两个task:

suspend fun sendEmail(r: String, msg: String): Boolean { //(6)
delay(2000)
println("Sent '$msg' to $r")
return true
}

suspend fun getReceiverAddressFromDatabase(): String { //(4)
delay(1000)
return "coroutine@kotlin.org"
}

suspend fun sendEmailSuspending(): Boolean {
val msg = GlobalScope.async { //(3)
delay(500)
"The message content"
}
val recipient = GlobalScope.async {
getReceiverAddressFromDatabase() //(5)
}
println("Waiting for email data")
val sendStatus = GlobalScope.async {
sendEmail(recipient.await(), msg.await()) //(7)
}
return sendStatus.await() //(8)
}

fun main(args: Array<String>) = runBlocking { //(1)
val job = GlobalScope.launch {
sendEmailSuspending() //(2)
println("Email sent successfully.")
}
job.join() //(9)
println("Finished")
}

首先,和之前一样,我们在runBlocking builder 里面起了一个**(1)** launch builder,这样在**(9)** 处就可以等待coroutines结束了。和这个结构一样,(2)sendEmailSuspending suspending function也不是什么新的语法。这个方法内调用了一个**(3)** inner coroutine来获取message,同时调用另一个suspend方法getReceiverAddressFromDatabase获取发送地址。我们在**(5)** async built的两个单独的coroutines里同时执行这两个task。
注意,这里的delay表示的coroutines中的一种非阻塞挂起, 和Thread.sleep类似,这里用来模拟耗时操作。

The async Coroutine Builder

async builder 在概念上很容易理解。在其他语言里async会返回一个promise, 而在kotlin,则会返回Deferred。顺便说一下,这里的 promise, future, deferred 或者 delay 通常描述的都是同一个概念: 异步方法 promises 许诺会返回一个值我们可以wait或者在之后再去获取。

在**(7)sendEmail(recipient.await(), msg.await())处返回了sendStatus这个Deferred对象,(6)** sendEmail调用时传入的则是之前的Deferred对象。调用Deferred.await()会挂起当前函数,直到返回结果可用为止。最终我们在 (8) sendStatus.await()处返回发送结果。

Shared Mutable State

虽然前面没有提及,但是读者可能也想到了coroutines之间同步的问题。并发的coroutines之间会共享状态,显然和其他编程语言一样,比如Java,意识到这一点很重要。 我们可以用一些常规的策略来解决同步问题,比如 thread-safe data structuresconfining execution to a single thread 或者 使用 locks.
除了这些常规模式, Kotlin coroutines 鼓励使用 “share by communication” (see QA).

实际上,我们可以使用 “actor” 来表示被多个coroutines共享的状态。Coroutines 之间可以利用actors 来发送和接收消息。

Actors

sealed class CounterMsg {
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: SendChannel<Int>) : CounterMsg() // a request with channel for reply.
}

fun counterActor() = GlobalScope.actor<CounterMsg> { //(1)
var counter = 0 //(9) </b>actor state, not shared
for (msg in channel) { // handle incoming messages
when (msg) {
is CounterMsg.IncCounter -> counter++ //(4)
is CounterMsg.GetCounter -> msg.response.send(counter) //(3)
}
}
}

suspend fun getCurrentCount(counter: SendChannel<CounterMsg>): Int { //(8)
val response = Channel<Int>() //(2)
counter.send(CounterMsg.GetCounter(response))
val receive = response.receive()
println("Counter = $receive")
return receive
}

fun main(args: Array<String>) = runBlocking<Unit> {
val counter = counterActor()

GlobalScope.launch { //(5)
while(getCurrentCount(counter) < 100){
delay(100)
println("sending IncCounter message")
counter.send(CounterMsg.IncCounter) //(7)
}
}

GlobalScope.launch { //(6)
while ( getCurrentCount(counter) < 100) {
delay(200)
}
}.join()
counter.close() // shutdown the actor
}

这个例子展示了Actor的使用, 实际上Actor本身就是一个coroutine。本例中的 actor 持有了 (9) 一个状态量 counter。接下来要介绍下 (2) Channel的概念。

Channels

Channels 提供了一种传递数据流的方法。和Java中的BlockingQueue类似 (消费者生产者pattern) ,不过Channels没有任何阻塞方法,而是提供了 send 和 receive 这两个 suspending functions 来按照FIFO的策略来生产和消费。

总是会有一个默认的actor和channels连接,通过这个actor可以与别的coroutines **(7)**交互。这上面的例子中,actor遍历了channel中的message,根据message的类型来决定是增加counter还是通过 GetCounter‘s SendChannel发送counter message。

main中的第一个 coroutine 启动了一个一直向actor发送**(7)** IncCounter messages 的任务,直到counter大于100。第二个 (6) coroutine则挂起等待,直到counter到达100。每个coroutines都调用了suspending function (8)getCurrentCounter, 其内部向actor发送了GetCounter message 并且挂起等待receive返回。

正如我们看到的,可变状态被限制在一个特定的actor coroutine内,并遵循了 share by communication 的原则,解决了共享可变状态的问题。

More Features and Examples

更多文档和例子these.

How it works – Implementation of Kotlin Coroutines

Coroutines的内部实现并不依赖于操作系统或者Java虚拟机。 与之相反,coroutines的实现机制在于编译器。编译器会将coroutines和suspend function转化为一个内部状态机,这个虚拟机可以维护和切换挂起的coroutines,并负责保持coroutines的内部状态。其原理来自于Continuations这个概念。 Continuations 会被编译器作为额外的一个参数传递给每个suspending function。这种技术实现也被称为“Continuation-passing style”

下面我们来看下添加了continuation之后的function长什么样子。

suspend fun sampleSuspendFun(x: Int): Int {
delay(2000)
return x * x
}

经过编译器转化后新方法:

public static final Object sampleSuspendFun(int x, @NotNull Continuation var1)

注意到上面的函数多了一个新的参数Continuation。当我们在coroutines里面调用这个方法时,编译器会将sampleSuspendFun之后的代码作为continuation参数传递。当sampleSuspendFun完成后,continuation 会被回调。这就是我们平时熟知的 callback-based 编程模式,只不过被编译器隐藏了实现细节。当然,这只是简化的一种描述,更多细节可以参考implementation-details.

结论

相比Java,Kotlin鼓励另一种完全不同的——非阻塞式、并且不会绑定到native thread上的并发编程模式。

编写 Java 并发编程常常伴随着过多的线程,或者忘记了线程池管理,这些小粗心往往导致代码执行效率问题。Coroutines, 却相反, 正如 “light-weight threads” 这个别称所表明的一样, 它不会映射到物理线程上,而且正因如此,协程也不会遇到并发编程中常常遇到的deadlocks、starvation问。正如我们上面看到的,在协程中,通常不用担心线程阻塞。而且,同步也更加简单直接,甚至在遵循**“share by communication”** 原则后,同步操作都是不必要的,

协程可以和其他并发编程方式一起工作。其中,许多适配工作已经可以使用,而且其他的并发编程也可以轻松适配。
对于Java开发者来说,async/await 这种范式是最容易接受的,因为很容易和 future 联系起来。但是,async/await并不是简单的代替future而是功能上的升级。

在Java中写并发代码总是会写一堆模板代码来处理 checked exceptionsdefensive locking 等一堆问题。得益于序列式编程、可管理性和可读性,这一切在coroutines中都得到了改善。

Perspective

Kotlin 1.3版本中,Coroutines终于移除了experimental包名,API也变得稳定。