Kotlin - 协程结构化并发Structured Concurrency

news/2024/12/23 16:19:39 标签: kotlin, CoroutineScope, 协程, 结构化同步

前言

KotlinProject LeadRoman Elizarov一片文章https://elizarov.medium.com/structured-concurrency-722d765aa952介绍了Structured Concurrency发展背景。相对Kotlin1.1时代后来新增Structured Concurrency理念,也就是我们现在所熟悉的协程版本所具备的特性,解决各种复杂业务场景例如协程嵌套、异步等等使用方式所面临生命周期管理问题本文通过梳理源码试图理解Structured Concurrency的具体含义实现原理

概念理解

常见业务场景如下

suspend fun loadAndCombine(name1: String, name2: String): Image { 
    val deferred1 = async { loadImage(name1) }
    val deferred2 = async { loadImage(name2) }
    return combineImages(deferred1.await(), deferred2.await())
}

deferred1deferred2都是异步执行最终需要二者执行结果合并后返回如果此时其中一个loadImage执行异常或者主动取消很难通知另一个LoadImage及时停止执行释放资源

或者如下场景:

val scope = CoroutineScope(Job())
    scope.launch {
        printLog("launch1")
        launch {
            delay(20000)
            printLog("launch1-1")
        }
        printLog("launch1 done")
        cancel()
    }

外层launch执行最后希望cancel内部所有协程没有Structrued Concurrency特性时候要实现这种逻辑需要类似使用线程处理方式Structrued Concurrency特性可以让我们cancel外层协程自动cancel里面所有的子协程

这就是所谓的对协程生命周期管理为了能够将所有协程生命周期完全管理起来Kotlin使用CoroutineScope

Coroutines are always related to some local scope in your application, which is an entity with a limited life-time, like a UI element.

CoroutineScope相当于圈定一个空间所有协程这个空间里面执行这样所有协程声明周期可以通过CoroutineScope进行管理

实现原理

我们知道launch都是一个JobJobCoroutineScope关系如下

再次根据这个例子这种关系如何实现

val scope = CoroutineScope(Job())
    scope.launch {
        printLog("launch1")
        launch {
            delay(20000)
            printLog("launch1-1")
        }
        printLog("launch1 done")
        cancel()
    }

首先新建CoroutineScope(Job())

kotlinx.coroutines-master\kotlinx-coroutines-core\common\src\CoroutineScope.kt

public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
    ContextScope(if (context[Job] != null) context else context + Job())
internal class ContextScope(context: CoroutineContext) : CoroutineScope {
    override val coroutineContext: CoroutineContext = context
    // CoroutineScope is used intentionally for user-friendly representation
    override fun toString(): String = "CoroutineScope(coroutineContext=$coroutineContext)"
}

CoroutineScope本身是一个接口这里CoroutineScope不是构造函数而是一个顶层函数这里两个关注点

context[Job]context + Job()

所有JobCoroutineDispatcher继承于CoroutineContext因此CoroutineScope函数参数我们可以新建一个Job(), 也可以一个CoroutineDispatcherJob()为例看下实现

public interface Job : CoroutineContext.Element {
    /**
     * Key for [Job] instance in the coroutine context.
     */
    public companion object Key : CoroutineContext.Key<Job>

Job继承于CoroutineContext.Element

public interface Element : CoroutineContext {
        /**
         * A key of this coroutine context element.
         */
        public val key: Key<*>

        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }

注意这里get函数返回值取决于keykey哪里赋值

Job也是一个接口Job()也是顶层函数

public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)
internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {

JobImp继承JobSupportJobSupportJob具体实现

public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob {
    final override val key: CoroutineContext.Key<*> get() = Job

可以看到key实际值Job

所以如果CoroutineScope(...)的参数传入Job(), context[Job]返回Job

那context + Job()代表什么

CoroutineContext接口声明看到plus操作符重载:

public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }

是将两个CoroutineContext合并成了CombinedContextCombinedContext本身一个CoroutineContext

综上 CoroutineScope时候如果传入一个Job使用这个Job如果没有传入Job(可能传入一个CoroutineDispatcher)新建一个Job然后Job赋值ContextScopecoroutineContext成员变量。

如此一来一个新建CoroutineScope关联一个顶层Job

使用launch创建一个协程
kotlinx.coroutines-master\kotlinx-coroutines-core\common\src\Builders.common.kt

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

首先launchCoroutineScope扩展函数也就是只能在CoroutineScope创建协程newCoroutineContext(context)

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = foldCopies(coroutineContext, context, true)
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}

这里contextEmptyCoroutineContextcoroutineContext刚才CoroutineScope(Job())传入的顶层Job经过foldCopies返回combined可以看做顶层Job的封装。return语句中可以看到debug(即顶层Job)加上debug + Dispatchers.Default这就是为什么默认运行Dispatchers.Default线程原因

创建newContext,如果start.isLazy构建LazyStandaloneCoroutine否则构建StandaloneCoroutinestart协程执行方式默认立即执行也可以懒加载执行具体kotlinx.coroutines-master\kotlinx-coroutines-core\common\src\CoroutineStart.kt

这里构建的是默认StandaloneCoroutine

private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

parentContext参数传入刚才构建newContext也就顶层JobinitParentJob默认值true接着看下他的继承AbstractCoroutine

public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

    init {
        /*
         * Setup parent-child relationship between the parent in the context and the current coroutine.
         * It may cause this coroutine to become _cancelling_ if the parent is already cancelled.
         * It is dangerous to install parent-child relationship here if the coroutine class
         * operates its state from within onCancelled or onCancelling
         * (with exceptions for rx integrations that can't have any parent)
         */
        if (initParentJob) initParentJob(parentContext[Job])
    }

AbstractCoroutine继承了JobSupportJob也就是StandaloneCoroutine实际上构造一个Job看下这里initParentJob(parentContext[Job])parentContext进来顶层Job封装newContext这里取出Job传进initParentJob

protected fun initParentJob(parent: Job?) {
        assert { parentHandle == null }
        if (parent == null) {
            parentHandle = NonDisposableHandle
            return
        }
        parent.start() // make sure the parent is started
        val handle = parent.attachChild(this)
        parentHandle = handle
        // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
        if (isCompleted) {
            handle.dispose()
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
    }

这里执行parent.attachChild(this)字面理解launch创建出来Job作为Child加入顶层的Job

关联父子Job

看下具体实现

kotlinx.coroutines-master\kotlinx-coroutines-core\common\src\JobSupport.kt

public final override fun attachChild(child: ChildJob): ChildHandle {
        val node = ChildHandleNode(child).also { it.job = this }
        val added = tryPutNodeIntoList(node) { _, list ->
            // First, try to add a child along the cancellation handlers
            val addedBeforeCancellation = list.addLast(
                node,
                LIST_ON_COMPLETION_PERMISSION or LIST_CHILD_PERMISSION or LIST_CANCELLATION_PERMISSION
            )
            ...
                node.invoke(rootCause)
                if (addedBeforeCompletion) {
                    /** The root cause can't be null: since the earlier addition to the list failed, this means that
                     * the job was already cancelled or completed. */
                    assert { rootCause != null }
                    true
                } else {
                    /** No sense in retrying: we know it won't succeed, and we already invoked the handler. */
                    return NonDisposableHandle
                }
            }
        }
        if (added) return node
        /** We can only end up here if [tryPutNodeIntoList] detected a final state. */
        node.invoke((state as? CompletedExceptionally)?.cause)
        return NonDisposableHandle
    }

首先构造一个ChildHandleNode

private class ChildHandleNode(
    @JvmField val childJob: ChildJob
) : JobNode(), ChildHandle {
    override val parent: Job get() = job
    override val onCancelling: Boolean get() = true
    override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
    override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}

这里parent传入顶层JobchildJob是launch新建Job

tryPutNodeIntoList

private inline fun tryPutNodeIntoList(
        node: JobNode,
        tryAdd: (Incomplete, NodeList) -> Boolean
    ): Boolean {
        loopOnState { state ->
            when (state) {
                is Empty -> { // EMPTY_X state -- no completion handlers
                    if (state.isActive) {
                        // try to move to the SINGLE state
                        if (_state.compareAndSet(state, node)) return true
                    } else
                        promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
                }
                is Incomplete -> when (val list = state.list) {
                    null -> promoteSingleToNodeList(state as JobNode)
                    else -> if (tryAdd(state, list)) return true
                }
                else -> return false
            }
        }
    }
private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)

private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
        while (true) {
            block(state)
        }
    }

state是什么

kotlinx.coroutines-master\kotlinx-coroutines-core\common\src\JobSupport.kt

private val EMPTY_NEW = Empty(false)
private val EMPTY_ACTIVE = Empty(true)

JobSupport维护一个状态机管理Job不同状态阶段这里EMPTY_NEW和 EMPTY_ACTIVE具体状态

private class Empty(override val isActive: Boolean) : Incomplete {
    override val list: NodeList? get() = null
    override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
}

其内维护一个list

简言之就是tryAdd(state, list)自己的state内的list传递调用tryPutNodeIntoList回头tryPutNodeIntoList

val addedBeforeCompletion = list.addLast(
                    node,
                    LIST_CHILD_PERMISSION or LIST_ON_COMPLETION_PERMISSION
                )

Job加到list

由此一来CoroutineScope构建Job树。

Job的执行

回到CoroutineScope.launch

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

构建coroutine执行coroutine.start

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        start(block, receiver, this)
    }
public enum class CoroutineStart {
...
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit // will start lazily
        }

在这里开始执行协程

Structured Concurrency的典型作用:协程的cancel

当执行scopecancel

public fun CoroutineScope.cancel(cause: CancellationException? = null) {
    val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
    job.cancel(cause)
}

是通过coroutineContext[Job]获取顶层Job然后执行cancel

kotlinx.coroutines-master\kotlinx-coroutines-core\common\src\JobSupport.kt
public override fun cancel(cause: CancellationException?) {
        cancelInternal(cause ?: defaultCancellationException())
    }

 public open fun cancelInternal(cause: Throwable) {
        cancelImpl(cause)
    }
    
internal fun cancelImpl(cause: Any?): Boolean {
        var finalState: Any? = COMPLETING_ALREADY
        if (onCancelComplete) {
            // make sure it is completing, if cancelMakeCompleting returns state it means it had make it
            // completing and had recorded exception
            finalState = cancelMakeCompleting(cause)
            if (finalState === COMPLETING_WAITING_CHILDREN) return true
        }
        if (finalState === COMPLETING_ALREADY) {
            finalState = makeCancelling(cause)
        }
        return when {
            finalState === COMPLETING_ALREADY -> true
            finalState === COMPLETING_WAITING_CHILDREN -> true
            finalState === TOO_LATE_TO_CANCEL -> false
            else -> {
                afterCompletion(finalState)
                true
            }
        }
    }

makeCancelling为例

private fun makeCancelling(cause: Any?): Any? {
        var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
        loopOnState { state ->
            when (state) {
                is Finishing -> { // already finishing -- collect exceptions
                    val notifyRootCause = synchronized(state) {
                        if (state.isSealed) return TOO_LATE_TO_CANCEL // already sealed -- cannot add exception nor mark cancelled
                        // add exception, do nothing is parent is cancelling child that is already being cancelled
                        val wasCancelling = state.isCancelling // will notify if was not cancelling
                        // Materialize missing exception if it is the first exception (otherwise -- don't)
                        if (cause != null || !wasCancelling) {
                            val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                            state.addExceptionLocked(causeException)
                        }
                        // take cause for notification if was not in cancelling state before
                        state.rootCause.takeIf { !wasCancelling }
                    }
                    notifyRootCause?.let { notifyCancelling(state.list, it) }
                    return COMPLETING_ALREADY
                }
                is Incomplete -> {
                    // Not yet finishing -- try to make it cancelling
                    val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                    if (state.isActive) {
                        // active state becomes cancelling
                        if (tryMakeCancelling(state, causeException)) return COMPLETING_ALREADY
                    } else {
                        // non active state starts completing
                        val finalState = tryMakeCompleting(state, CompletedExceptionally(causeException))
                        when {
                            finalState === COMPLETING_ALREADY -> error("Cannot happen in $state")
                            finalState === COMPLETING_RETRY -> return@loopOnState
                            else -> return finalState
                        }
                    }
                }
                else -> return TOO_LATE_TO_CANCEL // already complete
            }
        }
    }

假如协程运行执行tryMakeCancelling

private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean {
        assert { state !is Finishing } // only for non-finishing states
        assert { state.isActive } // only for active states
        // get state's list or else promote to list to correctly operate on child lists
        val list = getOrPromoteCancellingList(state) ?: return false
        // Create cancelling state (with rootCause!)
        val cancelling = Finishing(list, false, rootCause)
        if (!_state.compareAndSet(state, cancelling)) return false
        // Notify listeners
        notifyCancelling(list, rootCause)
        return true
    }

state.compareAndSet进行状态机切换随后执行notifyCancelling

private fun notifyCancelling(list: NodeList, cause: Throwable) {
        // first cancel our own children
        onCancelling(cause)
        list.close(LIST_CANCELLATION_PERMISSION)
        notifyHandlers(list, cause) { it.onCancelling }
        // then cancel parent
        cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
    }
private fun notifyCancelling(list: NodeList, cause: Throwable) {
        // first cancel our own children
        onCancelling(cause)
        list.close(LIST_CANCELLATION_PERMISSION)
        notifyHandlers(list, cause) { it.onCancelling }
        // then cancel parent
        cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
    }
    
 private inline fun notifyHandlers(list: NodeList, cause: Throwable?, predicate: (JobNode) -> Boolean) {
        var exception: Throwable? = null
        list.forEach { node ->
            if (node is JobNode && predicate(node)) {
                try {
                    node.invoke(cause)
                } catch (ex: Throwable) {
                    exception?.apply { addSuppressed(ex) } ?: run {
                        exception = CompletionHandlerException("Exception in completion handler $node for $this", ex)
                    }
                }
            }
        }
        exception?.let { handleOnCompletionException(it) }
    

node.invoke(cause)实现

private class InvokeOnCancelling(
    private val handler: CompletionHandler
) : JobNode()  {
    // delegate handler shall be invoked at most once, so here is an additional flag
    private val _invoked = atomic(false)
    override val onCancelling get() = true
    override fun invoke(cause: Throwable?) {
        if (_invoked.compareAndSet(expect = false, update = true)) handler.invoke(cause)
    }
}
private fun cancelParent(cause: Throwable): Boolean {
        // Is scoped coroutine -- don't propagate, will be rethrown
        if (isScopedCoroutine) return true

        /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
         * This allow parent to cancel its children (normally) without being cancelled itself, unless
         * child crashes and produce some other exception during its completion.
         */
        val isCancellation = cause is CancellationException
        val parent = parentHandle
        // No parent -- ignore CE, report other exceptions.
        if (parent === null || parent === NonDisposableHandle) {
            return isCancellation
        }

        // Notify parent but don't forget to check cancellation
        return parent.childCancelled(cause) || isCancellation
    }

将自己状态切换取消随后notifyHandlers通过遍历list通知自己children执行cancel最后通过cancelParent告知Job自己的分支cancel完毕

总结

  1. 所有协程运行CoroutineScope这种限定通过launchasyncrunBlock构建协程函数都是作为CoroutineScope扩展函数实现
  2. CoroutineScope创建过程中必定构建一个顶层Job(后者外部传入),通过coroutineContext与其关联
  3. 每个launch响应构建一个JobJob加入到Joblist由此维护了Job
  4. Structure Concurrency 具体实现 通过 维护 Job 的生命周期 完成


http://www.niftyadmin.cn/n/5796740.html

相关文章

【Linux系统编程】:信号(2)——信号的产生

1.前言 我们会讲解五种信号产生的方式: 通过终端按键产生信号&#xff0c;比如键盘上的CtrlC。kill命令。本质上是调用kill()调用函数接口产生信号硬件异常产生信号软件条件产生信号 前两种在前一篇文章中做了介绍&#xff0c;本文介绍下面三种. 2. 调用函数产生信号 2.1 k…

HarmonyOS NEXT 技术实践-基于意图框架服务实现智能分发

在智能设备的交互中&#xff0c;如何准确理解并及时响应用户需求&#xff0c;成为提升用户体验的关键。HarmonyOS Next 的意图框架服务&#xff08;Intents Kit&#xff09;为这一目标提供了强大的技术支持。本文将通过一个项目实现的示例&#xff0c;展示如何使用意图框架服务…

《开启微服务之旅:Spring Boot 从入门到实践》(三)

自动配置原理 配置文件到底能写什么&#xff1f;怎么写&#xff1f;自动配置原理&#xff1b; https://docs.spring.io/spring-boot/docs/1.5.9.RELEASE/reference/htmlsingle/#common-application-properties 自动配置原理 SpringBoot启动的时候加载主配置类&#xff0c;开启…

【Matlab】绘制混淆矩阵示意图+colormap调整方法

主代码 %https://blog.csdn.net/weixin_42943114/article/details/81811556 %https://blog.csdn.net/Mark711/article/details/141144280 clc clear close all warning off %% 原始数据 % 假设groundTruth和predictions是已经定义好的向量 TrueLabels [1 2 1 3 2 3 1 3 2 1 4…

【落羽的落羽 C语言篇】数据存储简介

文章目录 一、整型提升1. 概念2. 规则 二、大小端字节序1. 概念2. 练习练习1练习2 三、浮点数在内存中的存储1. 规则2. 练习 一、整型提升 1. 概念 C语言中&#xff0c;整型算术运算至少是以“缺省整型类型”&#xff08;int&#xff09;的精度来进行的。为了达到这个精度&am…

Android笔试面试题AI答之Android基础(3)

文章目录 1.谈一谈 Android 的安全机制一、系统架构层面的安全设计二、核心安全机制三、其他安全机制与措施 2.Android 的四大组件是哪四大&#xff1f;3.Android 的四大组件都需要在清单文件中注册吗&#xff1f;4.介绍几个常用的Linux命令一、文件和目录管理二、用户和权限管…

GESP202309 二级【小杨的 X 字矩阵】题解(AC)

》》》点我查看「视频」详解》》》 [GESP202309 二级] 小杨的 X 字矩阵 题目描述 小杨想要构造一个 的 X 字矩阵&#xff08; 为奇数&#xff09;&#xff0c;这个矩阵的两条对角线都是半角加号 &#xff0c;其余都是半角减号 - 。例如&#xff0c;一个 5 5 5 \times 5 5…

C++的封装(十四):《设计模式》这本书

很多C学习者学到对C语言有一定自信后&#xff0c;会去读一下《设计模式》这本书。希望能够提升自己的设计水平。 据我所知&#xff0c;围绕C语言出了很多书。因为正好赶上泡沫经济时代。大家一拥而上&#xff0c;自己半懂不懂就出书&#xff0c;抢着出书收割读者&#xff0c;出…