Coroutines_Principle

Kotlin协程使用

Dispatchers

协程调度器是用来指定协程体在哪个线程中执行,Kotlin提供了几个调度器:

Default

默认选项,指定协程体在线程池中执行:

1
2
3
4
5
6
7
8
9
10
11
12
GlobalScope.launch(Dispatchers.Default) {
println("1: ${Thread.currentThread().name}")
launch(Dispatchers.Default) {
println("2: ${Thread.currentThread().name}")
}
println("3: ${Thread.currentThread().name}")
}

-->output
1: DefaultDispatcher-worker-1
3: DefaultDispatcher-worker-1
2: DefaultDispatcher-worker-2

Main

指定协程体在主线程中执行。

IO

基于 Default 调度器背后的线程池(designed for offloading blocking IO tasks),因此从 Default 切换到 IO 不会触发线程切换:

1
2
3
4
5
6
7
8
9
10
11
12
GlobalScope.launch(Dispatchers.Default) {
println("1: ${Thread.currentThread().name}")
launch(Dispatchers.IO) {
println("2: ${Thread.currentThread().name}")
}
println("3: ${Thread.currentThread().name}")
}

-->output
1: DefaultDispatcher-worker-1
3: DefaultDispatcher-worker-1
2: DefaultDispatcher-worker-1

Unconfined

协程体运行在父协程所在的线程:

1
2
3
4
5
6
7
8
9
10
11
12
GlobalScope.launch(Dispatchers.Default) {
println("1: ${Thread.currentThread().name}")
launch(Dispatchers.Unconfined) {
println("2: ${Thread.currentThread().name}")
}
println("3: ${Thread.currentThread().name}")
}

-->output
1: DefaultDispatcher-worker-1
2: DefaultDispatcher-worker-1
3: DefaultDispatcher-worker-1

CoroutineScope

1
2
3
4
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
复制代码

GlobeScope

GlobeScope 启动的协程是一个单独的作用域,不会继承上层协程的作用域,其内部的子协程遵守默认的作用域规则。

coroutineScope

coroutineScope 启动的协程 cancel 时会 cancel 所有子协程,也会 cancel 父协程,子协程未捕获的异常也会向上传递给父协程。

supervisorScope

supervisorScope 启动的协程 cancel 和传递异常时,只会由父协程向子协程单向传播。MainScope 是 supervisorScope 作用域。

Android-Kotlin协程使用

MainScope

Android 中一般不建议使用 GlobalScope, 因为它会创建一个顶层协程,需要保持所有对 GlobalScope 启动的协程的引用,然后在 Activity destory 等场景的时候 cancel 掉这些的协程,否则就会造成内存泄露等问题。可以使用 MainScope:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class CoroutineActivity : AppCompatActivity() {
private val mainScope = MainScope()

fun request1() {
mainScope.launch {
// ...
}
}

// request2, 3, ...

override fun onDestroy() {
super.onDestroy()
mainScope.cancel()
}
}

MainScope 的定义:

1
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

Lifecycle协程

关于 Lifecycle 可以参考 Android-Jetpack组件之Lifecycle

添加依赖:

1
implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0"

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
get() = lifecycle.coroutineScope

val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
if (existing != null) {
return existing
}
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
if (mInternalScopeRef.compareAndSet(null, newScope)) {
newScope.register()
return newScope
}
}
}

abstract class LifecycleCoroutineScope internal constructor() : CoroutineScope {
internal abstract val lifecycle: Lifecycle

// 当 activity created 的时候执行协程体
fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenCreated(block)
}

// // 当 activity started 的时候执行协程体
fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenStarted(block)
}

// // 当 activity resumed 的时候执行协程体
fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenResumed(block)
}
}
复制代码

使用:

1
2
3
4
5
6
7
8
9
10
// AppCompatActivity 实现了 LifecycleOwner 接口
class MainActivity : AppCompatActivity() {

fun test() {
lifecycleScope.launchWhenCreated {
// ...
}
}
}
复制代码

LiveData协程

关于 LiveData 可以参考 Android-Jetpack组件之LiveData-ViewModel

添加依赖:

1
implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0"

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
fun <T> liveData(
context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT,
@BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)

internal class CoroutineLiveData<T>(
context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT,
block: Block<T>
) : MediatorLiveData<T>() {
private var blockRunner: BlockRunner<T>?
private var emittedSource: EmittedSource? = null

init {
val supervisorJob = SupervisorJob(context[Job])
val scope = CoroutineScope(Dispatchers.Main.immediate + context + supervisorJob)
blockRunner = BlockRunner(
liveData = this,
block = block,
timeoutInMs = timeoutInMs,
scope = scope
) {
blockRunner = null
}
}

internal suspend fun emitSource(source: LiveData<T>): DisposableHandle {
clearSource()
val newSource = addDisposableSource(source)
emittedSource = newSource
return newSource
}

internal suspend fun clearSource() {
emittedSource?.disposeNow()
emittedSource = null
}

// 启动协程
override fun onActive() {
super.onActive()
blockRunner?.maybeRun()
}

// 取消协程
override fun onInactive() {
super.onInactive()
blockRunner?.cancel()
}
}

使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// AppCompatActivity 实现了 LifecycleOwner 接口
class MainActivity : AppCompatActivity() {

fun test() {
liveData {
try {
// ...
emit("success")
} catch(e: Exception) {
emit("error")
}
}.observe(this, Observer {
Log.d("LLL", it)
})
}
}

ViewModel协程

关于 ViewModel 可以参考 Android-Jetpack组件之LiveData-ViewModel

添加依赖:

1
implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0"

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
val ViewModel.viewModelScope: CoroutineScope
get() {
val scope: CoroutineScope? = this.getTag(JOB_KEY)
if (scope != null) {
return scope
}
return setTagIfAbsent(JOB_KEY,
CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate))
}

internal class CloseableCoroutineScope(context: CoroutineContext) : Closeable, CoroutineScope {
override val coroutineContext: CoroutineContext = context

override fun close() {
coroutineContext.cancel()
}
}

协程原理

suspend 的原理

简述:suspend其实也是回调,实现suspend的方法会被编译器替换为 多一个Continuation参数 的方法,Continuation实现了一个状态机,其中每个挂起点(suspend方法)都是一个状态,保存了下一步执行的CoroutineContext

具体而言, 比如main中调用了suspend a(), suspend b(),那么实际上编译器会构造这么一个状态机:invokeSuspend() 中根据每个方法的执行结果,如a()如果耗时【即withContext(Dispatchers)/runBlocking之类的】则会return COROUTINE_SUSPENDED (即后续操作先不执行,也就是挂起了),等耗时执行完成后invokeSuspend()再被调用,此时状态改变,break走下一步b();如果a()不耗时则立即break,走到下一步b(); 如此往复,直到所有suspend方法执行完成,后正常执行其他代码

suspend 是回调(Callback)

理解 suspend 其实不需要纠结神奇的「挂起」是什么意思或者拘泥于线程是怎么切换的。实际上 suspend 的背后是大家非常熟悉的回调。

假设 postItem 由三个有依赖关系的异步子任务组成: requestTokencreatePostprocessPost ,这三个函数都是基于回调的 API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 三个基于回调的 API
fun requestToken(block: (String) -> Unit)
fun createPost(
token: String,
item: Item,
block: (Post) -> Unit)
)
fun processPost(post: Post)

fun postItem(item: Item) {
requestToken { token ->
createPost(token, item) { post ->
processPost(post)
}
}
}

可以看到基于回调的 API 很容易造成大量缩进。如果代码中再加上一些条件、循环的逻辑,那么代码可读性会大大降低。Promise (Future) 等 API 以及 Android 社区很流行的 RxJava 通过链式调用在一定程度上消除了嵌套的问题。比如上面这个例子用 RxJava 实现的话:

1
2
3
4
5
6
7
fun requestToken(): Observable<String>
fun createPost(token: String, item: Item): Observable<Post>
fun processPost(post: Post)

fun postItem(item: Item) = requestToken()
.flatMap { createPost(it, item) }
.flatMap { processPost(it) }

但是 RxJava 这样的方案需要使用者掌握大量操作符,写复杂逻辑也很麻烦,会有一种被「困在」这个调用链里面的感觉。

kotlin 的 suspend 关键字可以帮助我们消除回调,用同步的写法写异步:

🏹代表挂起点(suspension point)

1
2
3
4
5
6
7
8
9
suspend fun requestToken(): String
suspend fun createPost(token: String, item: Item): Post
suspend fun processPost(post)

suspend fun postItem(item: Item) {
val token = 🏹 requestToken()
val post = 🏹 createPost(token, item)
🏹 processPost(post)
}

由于 createPost 这些方法实际上是耗时的 IO 异步操作,需要等到拿到返回值才能执行后面的逻辑,但我们又不希望阻塞当前线程(通常是主线程),因此最终必须实现某种消息传递的机制,让后台线程做完耗时操作以后把结果传给主线程。

假设我们有了前面提到的三个基于回调的 API,实现 suspend 可以在编译的时候把每个挂起点 🏹 后面的逻辑包在一个 lambda 里面,然后去调用回调 API,最终生成类似嵌套的代码。但这样每一个挂起点在运行时都需要开销一个 lambda 对象。Kotlin 和许多其他语言都采用生成状态机的方式,性能更好。

具体来说,编译器看到 suspend 关键字会去掉 suspend ,给函数添加一个额外的 Continuation 参数。这个 Continuation 就代表了一个回调:

1
2
3
4
5
public interface Continuation<in T> {
public val context: CoroutineContext

// 用来回调的方法
public fun resumeWith(result: Result<T>)}

Kotlin 编译器会给每个 suspend 的块生成一个 Continuation 的实现类,这个实现类是一个状态机,其中的状态对应于每个挂起点,保存了需要下一步继续执行所需要的上下文(即依赖的局部变量),类似下面的伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
suspend fun postItem(item: Item) {
val token = 🏹 requestToken()
val post = 🏹 createPost(token, item)
🏹 processPost(post)
}

// 编译器变换后的伪代码
// 1.脱掉了 suspend 关键字
// 2.增加了一个 Continuation 对象
fun postItem(item: Item, cont: Continuation) {

// 判断传入的是否是 postItem 的 `ContiuationImpl`
// * false: 初始化一个对应本次调用 postItem 的状态机
// * true: 对应 postItem 内其他 suspend 函数回调回来情况
// 其中 ThisSM 指的 object: ContinuationImpl 这个匿名类
val sm = (cont as? ThisSM) ?: object: ContinuationImpl {

// 实际源码中 override 的是
// kotlin.coroutine.jvm.internal.BaseContinuationImpl
// 的 invokeSuspend 方法
override fun resume(..) {
// 通过 ContinuationImpl.resume
// 重新回调回这个方法
postItem(null, this) }
}

switch (sm.label) {
case 0:
// 捕获后续步骤需要的局部变量
sm.item = item
// 设置下一步的 label
sm.label = 1

// 当 requestToken 里的耗时操作完成后会更新状态机
// 并通过 sm.resume 再次调用这个 postItem 函数
// 「我们在前面提供了 sm.resume 的实现,即再次调用 postItem」
requestToken(sm)
case 1:
val item = sm.item
// 前一个异步操作的结果
val token = sm.result as Token
sm.label = 2
createPost(token, item, sm)
case 2:
procesPost(post)
// ...
}
}

编译器将 suspend 编译成带有 continuation 参数的方法叫做 CPS (Continuation-Passing-Style) 变换。

使用 suspend 函数无须关心线程切换

suspend 提供了这样一个**约定(Convention)**:调用这个函数不会阻塞当前调用的线程。

但前提是这个 suspend 函数实现正确,真正做到了不阻塞当前线程。单纯地给函数加上 suspend 关键字并不会神奇地让函数变成非阻塞的

这对 UI 编程是非常有用的,因为 UI 的主线程需要不断相应各种图形绘制、用户操作的请求,如果主线程上有耗时操作会让其他请求无法及时响应,造成 UI 卡顿。

Android 社区流行的网络请求库 Retrofit、官方出品的数据库 ORM Room 都已经通过提供 suspend API 的形式支持了协程。Android 官方也利用 Kotlin 扩展属性的方式给 Activity 等具有生命周期的组件提供了开启协程所需的 CoroutineScope ,其中的 context 指定了使用 Dispatchers.Main ,即通过 lifecycleScope 开启的协程都会被调度到主线程执行。因此我们可以在调用 suspend 函数,拿到结果后直接更新 UI,无须做任何线程切换的动作。这样的 suspend 函数叫作「main 安全」的。

1
2
3
4
lifecycleScope.launch {
val posts = 🏹 retrofit.get<PostService>().fetchPosts();
// 由于在主线程,可以拿着 posts 更新 UI
}

这相比 callback 和 RxJava 的 API 是要好很多的。这些异步的 API 最终都得依靠回调,但回调回来在哪个线程需要调用方自己搞清楚,得看这些函数里面是怎么实现的。而有了 suspend 不阻塞当前线程的约定,调用方其实无须关心这个函数内部是在哪个线程执行的。

1
2
lifecycleScope.launch(Dispatchers.Main) {
🏹 foo()}

比如上面这个代码块,我们指定这个协程块调度到主线程执行,里面调用了一个不知道哪里来的 suspend foo 方法。这个方法内部可能是耗时的 CPU 计算,可能是耗时的 IO 请求,但是我在写这个协程块的时候,其实并不需要关心这里面到底是怎么回事,运行在哪个线程。类似地,在阅读这段协程块的时候,我们可以清楚地知道眼前的这段代码会在主线程执行,suspend foo 里面的代码是一个潜在的耗时操作,具体在哪个线程执行是这个函数的实现细节,对于当前代码的逻辑是「透明」的。

但前提是这个 suspend 函数实现正确,真正做到了不阻塞当前线程。单纯地给函数加上 suspend 关键字并不会神奇地让函数变成非阻塞的,比如假设 suspend foo 里面的实现是这样的:

1
2
// 😖
suspend fun foo() = BigInteger.probablePrime(4096, Random())

这里这个 suspend 函数的内部实现是一段耗时的 CPU 操作,类似地也可以想象成是一段时间复杂度特别高的代码。我们如果在主线程调用这个函数还是会阻塞 UI。问题出在这个 foo 函数的实现没有遵守 suspend 的语义,是错误的。正确的做法应该修改这个 foo 函数:

1
2
3
suspend fun findBigPrime(): BigInteger =
withContext(Dispatchers.Default) { BigInteger.probablePrime(4096, Random())
}

借助 withContext 我们把耗时操作从当前主线程挪到了一个默认的后台线程池。因此有人说,即使是用了协程,最终还是会「阻塞」某个线程,「所有的代码本质上都是阻塞式的」。这种理解可以帮助我们认识到 Android / JVM 上最终需要线程作为执行协程的载体,但忽略了阻塞和非阻塞 IO 之分。CPU 执行线程,而上面 BigInteger.probablePrime 是一个耗时的 CPU 计算,只能等待 CPU 把结果算出来,但 IO 造成的等待并不一定要阻塞 CPU。

链接

抽丝剥茧kotlin-协程

Kotlin笔记之协程工作原理

理解 Kotlin 的 suspend 函数

https://blog.yujinyan.me/posts/understanding-kotlin-suspend-functions/

Other

以GlobalScope.launch{ … } 为例,编译期通过编译时插入代码,将”…”协程体包裹封装成一个继承与SuspendLambda,而SuspendLambda是一个抽象类,继承于ContinuationImpl。

ContinuationImpl就是核心协程类Continuation接口的实现(一个Continuation就代表了一个协程)\

协程的启动是通过 BaseContinuationImpl.resumeWith 方法调用到了子类 SuspendLambda.invokeSuspend 方法,然后通过状态机来控制顺序运行。

Kotlin 编译器会为 协程体 生成继承自 SuspendLambda 的子类,协程的真正运算逻辑都在其 invokeSuspend 方法中。

Author

white crow

Posted on

2021-11-02

Updated on

2024-03-25

Licensed under