引言
本文分析示例代码如下:
launch(Dispatchers.Main) { flow { emit(1) emit(2) }.collect { delay(1000) withContext(Dispatchers.IO) { Log.d("liduo", "$it") } Log.d("liduo", "$it") } }
一.Flow的创建
在协程中,可以通过flow方法创建一个Flow对象,一个Flow对象代表一个冷流。其中参数block是FlowCollector的扩展方法,并且可挂起。代码入下:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
FlowCollector是一个接口,用于收集上游的流发出的值,代码如下:
public interface FlowCollector<in T> { // 可挂起,非线程安全 public suspend fun emit(value: T) }
调用flow方法,会返回一个Flow接口指向的对象,代码如下:
public interface Flow<out T> { @InternalCoroutinesApi public suspend fun collect(collector: FlowCollector<T>) }
这里flow方法的返回对象是一个SafeFlow类型的对象。至此Flow就创建完毕了。
二.Flow的消费
在协程中,当需要消费流时,会调用collect方法,触发流的消费,代码如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> { override suspend fun emit(value: T) = action(value) })
这里的collect方法不是Flow接口定义的方法,而是Flow的扩展方法,内部创建了一个匿名的FlowCollector对象,并且把action封装到了FlowCollector对象的emit方法中,最后将FlowCollector对象作为参数传入到了另一个collect方法,这个collect方法才是Flow接口定义的方法。
1.SafeFlow类
根据上面的分析,Flow对象最后返回的是一个SafeFlow类型的对象。因此,这里调用的另一个collect方法,就是SafeFlow类中的collect方法,代码如下:
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() { override suspend fun collectSafely(collector: FlowCollector<T>) { collector.block() } }
SafeFlow类继承自AbstractFlow类,类中重写了collectSafely方法。因此调用的collect方法实际上是AbstractFlow类的方法。
2.AbstractFlow类
AbstractFlow类是一个抽象类,实现了Flow接口和CancellableFlow接口。实际上CancellableFlow接口继承自Flow接口,因此AbstractFlow类只重写了collect方法,代码如下:
@FlowPreview public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> { // 核心方法 @InternalCoroutinesApi public final override suspend fun collect(collector: FlowCollector<T>) { // 创建SafeCollector对象,对collector进行包裹 val safeCollector = SafeCollector(collector, coroutineContext) try { // 调用collectSafely方法 collectSafely(safeCollector) } finally { // 释放拦截的续体 safeCollector.releaseIntercepted() } } public abstract suspend fun collectSafely(collector: FlowCollector<T>) }
collect方法内部调用了collectSafely方法,collectSafely方法在SafeFlow中被重写。collectSafely方法中会调用flow中的block,并提供一个SafeCollector类的环境。
3. SafeCollector类
当flow方法中的代码在执行时,会调用emit方法发射数据,这时由于block执行在SafeCollector类的环境中,因此调用的emit方法是SafeCollector类的方法。
SafeCollector类实现了FlowCollector接口并且继承自ContinuationImpl类,代码如下:
internal actual class SafeCollector<T> actual constructor( @JvmField internal actual val collector: FlowCollector<T>, @JvmField internal actual val collectContext: CoroutineContext ) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame { ... // 保存上下文中元素数量,用于检查上下文是否变化 @JvmField internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count 1 } // 保存上一次的上下文 private var lastEmissionContext: CoroutineContext? = null // 执行结束后的续体 private var completion: Continuation<Unit>? = null // 协程上下文 override val context: CoroutineContext get() = completion?.context ?: EmptyCoroutineContext // 挂起的核心方法 override fun invokeSuspend(result: Result<Any?>): Any? { result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) } completion?.resumeWith(result as Result<Unit>) return COROUTINE_SUSPENDED } // 释放拦截的续体 public actual override fun releaseIntercepted() { super.releaseIntercepted() } // 发射数据 override suspend fun emit(value: T) { // 获取当前suspend方法续体 return suspendCoroutineUninterceptedOrReturn sc@{ uCont -> try { // 调用重载的方法 emit(uCont, value) } catch (e: Throwable) { // 出现异常时,将异常封装成上下文,保存到lastEmissionContext lastEmissionContext = DownstreamExceptionElement(e) // 抛出异常 throw e } } } // 重载的emit方法 private fun emit(uCont: Continuation<Unit>, value: T): Any? { // 从续体中获取上下文 val currentContext = uCont.context // 保证当前协程的Job是active的 currentContext.ensureActive() // 获取上次的上下文 val previousContext = lastEmissionContext // 如果前后上下文发生变化 if (previousContext !== currentContext) { // 检查上下文是否发生异常 checkContext(currentContext, previousContext, value) } // 保存续体 completion = uCont // 调用emitFun方法,传入collector,value,continuation return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>) } // 检查上下文变化,防止并发 private fun checkContext( currentContext: CoroutineContext, previousContext: CoroutineContext?, value: T ) { // 如果上次执行过程中发生了异常 if (previousContext is DownstreamExceptionElement) { // 抛出异常 exceptionTransparencyViolated(previousContext, value) } // 检查上下文是否发生变化,如果变化,则抛出异常 checkContext(currentContext) lastEmissionContext = currentContext } // 用于抛出异常 private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) { error(""" Flow exception transparency is violated: Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected. Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead. For a more detailed explanation, please refer to Flow documentation. """.trimIndent()) } }
emit方法最终会调用emitFun方法方法,代码如下:
private val emitFun = FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
emitFun是一个lambda表达式,它将只有一个参数的emit方法转换成三个参数的方法。emitFun方法在编译时会被编译器处理,反编译后的代码逻辑大致如下:
@Nullable public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) { InlineMarker.mark(0); // 核心执行 Object var10000 = p1.emit(p2, continuation); InlineMarker.mark(2); InlineMarker.mark(1); return var10000; }
可以看到,emitFun方法内部会调用FlowCollector类对象的emit方法,同时传入value和continuation作为参数。
而这个FlowCollector类对象就是一开始的collect方法封装的匿名类对象,代码如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> { override suspend fun emit(value: T) = action(value) })
调用它的emit方法,会直接调用action的invoke方法,并传入发射的数据,流在这里被最终消费。
通过上面的分析,可以知道消费的过程是在emit方法中被调用的,如果在消费的过程,没有发生挂起,那么emit方法执行完毕后,会继续执行flow方法里剩下的代码,而如果在消费的过程中发生了挂起,情况会稍有不同。
4.消费过程中的挂起
如果消费过程中发生挂起,那么emit方法会返回一个COROUTINE_SUSPENDED对象,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED对象后,会挂起当前协程。代码如下:
override suspend fun emit(value: T) { // 获取当前suspend方法续体 return suspendCoroutineUninterceptedOrReturn sc@{ uCont -> try { // 调用重载的方法 emit(uCont, value) } catch (e: Throwable) { // 出现异常时,将异常封装成上下文,保存到lastEmissionContext lastEmissionContext = DownstreamExceptionElement(e) // 抛出异常 throw e } } }
当消费过程执行完毕时,会通过传入的续体唤起外部协程恢复挂起状态。根据emitFun可以知道,这里传入的续体为this,也就是当前的SafeCollector类对象,代码如下:
emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
恢复挂起需要调用续体的resumeWith方法,上面提到SafeCollector类继承自ContinuationImpl类,SafeCollector类中没有重写resumeWith方法,而ContinuationImpl类中也没有重写resumeWith方法,因此实际调用的是ContinuationImpl类的父类BaseContinuationImpl类的resumeWith方法。如下图所示:
在Kotlin协程:创建、启动、挂起、恢复中提到过,调用BaseContinuationImpl类的resumeWith方法,内部会调用invokeSuspend方法,而SafeCollector类重写了invokeSuspend方法,代码如下:
override fun invokeSuspend(result: Result<Any?>): Any? { // 尝试获取异常 result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) } // 如果没有异常,则恢复flow方法续体的执行 completion?.resumeWith(result as Result<Unit>) // 返回挂起标识,这里挂起的是消费过程 return COROUTINE_SUSPENDED }
在invokeSuspend方法中,会调用resumeWith方法恢复生产过程——flow方法的执行,同时挂起消费过程的执行。全部过程如下图所示:
以上就是Kotlin协程之Flow基础原理示例解析的详细内容,更多关于Kotlin协程Flow原理的资料请关注Devmax其它相关文章!