经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 移动开发 » Kotlin » 查看文章
Kotlin图文并茂讲解续体与续体拦截器和调度器
来源:jb51  时间:2022/8/1 13:08:44  对本文有异议

一.Continuation

Continuation接口是协程中最核心的接口,代表着挂起点之后的续体,代码如下:

  1. public interface Continuation<in T> {
  2. // 续体的上下文
  3. public val context: CoroutineContext
  4. // 该方法用于恢复续体的执行
  5. // result为挂起点执行完成的返回值,T为返回值的类型
  6. public fun resumeWith(result: Result<T>)
  7. }

Continuation图解

二.ContinuationInterceptor

ContinuationInterceptor接口继承自Element接口,是协程中的续体拦截器,代码如下:

  1. public interface ContinuationInterceptor : CoroutineContext.Element {
  2. // 拦截器的Key
  3. companion object Key : CoroutineContext.Key<ContinuationInterceptor>
  4. // 拦截器对续体进行拦截时会调用该方法,并对continuation进行缓存
  5. // 拦截判断:根据传入的continuation对象与返回的continuation对象是否相同
  6. public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
  7. // 当interceptContinuation方法拦截的协程执行完毕后,会调用该方法
  8. public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
  9. /* do nothing by default */
  10. }
  11. // get方法多态实现
  12. public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
  13. @OptIn(ExperimentalStdlibApi::class)
  14. if (key is AbstractCoroutineContextKey<*, *>) {
  15. @Suppress("UNCHECKED_CAST")
  16. return if (key.isSubKey(this.key)) key.tryCast(this) as? E else null
  17. }
  18. @Suppress("UNCHECKED_CAST")
  19. return if (ContinuationInterceptor === key) this as E else null
  20. }
  21. // minusKey方法多态实现
  22. public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext {
  23. @OptIn(ExperimentalStdlibApi::class)
  24. if (key is AbstractCoroutineContextKey<*, *>) {
  25. return if (key.isSubKey(this.key) && key.tryCast(this) != null) EmptyCoroutineContext else this
  26. }
  27. return if (ContinuationInterceptor === key) EmptyCoroutineContext else this
  28. }
  29. }

三.CoroutineDispatcher

CoroutineDispatcher类继承自AbstractCoroutineContextElement类,实现了ContinuationInterceptor接口,是协程调度器的基类,代码如下:

  1. public abstract class CoroutineDispatcher :
  2. AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
  3. // ContinuationInterceptor的多态实现,调度器本质上就是拦截器
  4. @ExperimentalStdlibApi
  5. public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
  6. ContinuationInterceptor,
  7. { it as? CoroutineDispatcher })
  8. // 用于判断调度器是否要调用dispatch方法进行调度,默认为true
  9. public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
  10. // 调度的核心方法,在这里进行调度,执行block
  11. public abstract fun dispatch(context: CoroutineContext, block: Runnable)
  12. // 如果调度是由Yield方法触发的,默认通过dispatch方法实现
  13. @InternalCoroutinesApi
  14. public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
  15. // ContinuationInterceptor接口的方法,将续体包裹成DispatchedContinuation,并传入当前调度器
  16. public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
  17. DispatchedContinuation(this, continuation)
  18. // 释放父协程与子协程的关联。
  19. @InternalCoroutinesApi
  20. public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
  21. (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
  22. }
  23. // 重载了"+"操作,直接返回others
  24. // 因为两个调度器相加没有意义,同一个上下文中只能有一个调度器
  25. // 如果需要加的是调度器对象,则直接替换成最新的,因此直接返回
  26. public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
  27. override fun toString(): String = "$classSimpleName@$hexAddress"
  28. }

四.EventLoop

EventLoop类继承自CoroutineDispatcher类,用于协程中任务的分发执行,只在runBlocking方法中和Dispatchers.Unconfined调度器中使用。与Handler中的Looper类似,在创建后会存储在当前线程的ThreadLocal中。EventLoop本身不支持延时执行任务,如果需要可以自行继承EventLoop并实现Delay接口,EventLoop中预留了一部分变量和方法用于延时需求的扩展。

为什么协程需要EventLoop呢?协程的本质是续体传递,而续体传递的本质是回调,假设在Dispatchers.Unconfined调度下,要连续执行多个suspend方法,就会有多个续体传递,假设suspend方法达到一定数量后,就会造成StackOverflow,进而引起崩溃。同样的,我们知道调用runBlocking会阻塞当前线程,而runBlocking阻塞的原理就是执行“死循环”,因此需要在循环中做任务的分发,去执行内部协程在Dispatchers.Unconfined调度器下加入的任务。

EventLoop代码如下:

  1. internal abstract class EventLoop : CoroutineDispatcher() {
  2. // 用于记录使用当前EventLoop的runBlocking方法和Dispatchers.Unconfined调度器的数量
  3. private var useCount = 0L
  4. // 表示当前的EventLoop是否被暴露给其他的线程
  5. // runBlocking会将EventLoop暴露给其他线程
  6. // 因此,当runBlocking使用时,shared必须为true
  7. private var shared = false
  8. // Dispatchers.Unconfined调度器的任务执行队列
  9. private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
  10. // 处理任务队列的下一个任务,该方法只能在EventLoop所在的线程调用
  11. // 返回值<=0,说明立刻执行下一个任务
  12. // 返回值>0,说明等待这段时间后,执行下一个任务
  13. // 返回值为Long.MAX_VALUE,说明队列里没有任务了
  14. public open fun processNextEvent(): Long {
  15. if (!processUnconfinedEvent()) return Long.MAX_VALUE
  16. return 0
  17. }
  18. // 队列是否为空
  19. protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
  20. // 下一个任务多长时间后执行
  21. protected open val nextTime: Long
  22. get() {
  23. val queue = unconfinedQueue ?: return Long.MAX_VALUE
  24. return if (queue.isEmpty) Long.MAX_VALUE else 0L
  25. }
  26. // 任务的核心处理方法
  27. public fun processUnconfinedEvent(): Boolean {
  28. // 若队列为空,则返回
  29. val queue = unconfinedQueue ?: return false
  30. // 从队首取出一个任务,如果为空,则返回
  31. val task = queue.removeFirstOrNull() ?: return false
  32. // 执行
  33. task.run()
  34. return true
  35. }
  36. // 表示当前EventLoop是否可以在协程上下文中被调用
  37. // EventLoop本质上也是协程上下文
  38. // 如果EventLoop在runBlocking方法中使用,必须返回true
  39. public open fun shouldBeProcessedFromContext(): Boolean = false
  40. // 向队列中添加一个任务
  41. public fun dispatchUnconfined(task: DispatchedTask<*>) {
  42. // 若队列为空,则创建一个新的队列
  43. val queue = unconfinedQueue ?:
  44. ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
  45. queue.addLast(task)
  46. }
  47. // EventLoop当前是否还在被使用
  48. public val isActive: Boolean
  49. get() = useCount > 0
  50. // EventLoop当前是否还在被Unconfined调度器使用
  51. public val isUnconfinedLoopActive: Boolean
  52. get() = useCount >= delta(unconfined = true)
  53. // 判断队列是否为空
  54. public val isUnconfinedQueueEmpty: Boolean
  55. get() = unconfinedQueue?.isEmpty ?: true
  56. // 下面三个方法用于计算使用当前的EventLoop的runBlocking方法和Unconfined调度器的数量
  57. // useCount是一个64位的数,
  58. // 它的高32位用于记录Unconfined调度器的数量,低32位用于记录runBlocking方法的数量
  59. private fun delta(unconfined: Boolean) =
  60. if (unconfined) (1L shl 32) else 1L
  61. fun incrementUseCount(unconfined: Boolean = false) {
  62. useCount += delta(unconfined)
  63. // runBlocking中使用,shared为true
  64. if (!unconfined) shared = true
  65. }
  66. fun decrementUseCount(unconfined: Boolean = false) {
  67. useCount -= delta(unconfined)
  68. // 如果EventLoop还在被使用
  69. if (useCount > 0) return
  70. assert { useCount == 0L }
  71. // 如果EventLoop不被使用了,并且在EventLoop中使用过
  72. if (shared) {
  73. // 关闭相关资源,并在ThreadLocal中移除
  74. shutdown()
  75. }
  76. }
  77. protected open fun shutdown() {}
  78. }

协程中提供了EventLoopImplBase类,间接继承自EventLoop,实现了Delay接口,用来延时执行任务。同时,协程中还提供单例对象ThreadLocalEventLoop用于EventLoop在ThreadLocal中的存储。

到此这篇关于Kotlin图文并茂讲解续体与续体拦截器和调度器的文章就介绍到这了,更多相关Kotlin续体内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号