What is Flow

Kotlin’s coroutines handle asynchronous tasks, and flows handle asynchronous streams of data.

That one sentence is simple, but I always felt it left something out. Let’s look at the code from the Kotlin website.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend to do some asynchronous work
        emit(i)    // emit the next value
    }
}
fun main() = runBlocking<Unit> {
    // The original example also runs a loop here to check whether the main thread is blocked
    simple().collect { value -> println(value) }
}

In main, runBlocking blocks the current thread and starts a coroutine, which calls simple() and then reaches collect.

So what exactly is Flow? First of all, Flow is an interface.

public interface Flow<out T> {

    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

A quick review of out: it marks the type parameter as covariant, so T may only appear in output positions, as something the flow produces. If simple() returns Flow<Int>, then the flow can only emit Int values.

Flow is cold

The code inside the flow builder does not run until the flow is collected. To see this, let’s remove the collect call.

fun simple(): Flow<Int> = flow {
    println("start")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}
fun main() = runBlocking<Unit> {
    simple()
}

"start" is never printed. The block does not run until collect is called.
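To make the ordering visible, here is a minimal sketch that records events in a list instead of printing them. The log parameter and the coldDemo name are my own additions for illustration, and the snippet assumes kotlinx-coroutines-core is on the classpath.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// `log` is a hypothetical parameter added only to record the order of events.
fun simple(log: MutableList<String>): Flow<Int> = flow {
    log += "start"                 // runs only once the flow is collected
    for (i in 1..3) emit(i)
}

fun coldDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val f = simple(log)            // builds the flow; the block has not run yet
    log += "created"
    val values = f.toList()        // toList() collects, and only now does the block run
    log += "collected $values"
    log
}

fun main() {
    println(coldDemo())            // [created, start, collected [1, 2, 3]]
}
```

"created" is logged before "start", even though simple() was called first.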

Principle of implementation

Enter the flow method

public fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

The flow function takes a suspending lambda whose receiver is a FlowCollector<T> and returns a Flow<T>. The actual instance is a SafeFlow, a class that simply stores the block passed in as a constructor parameter.

So calling simple() only creates a SafeFlow instance and does nothing else.
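The same idea can be modelled in a few lines. This is a simplified sketch, not the library code: MiniCollector, MiniFlow and miniFlow are hypothetical stand-ins for FlowCollector, Flow and the flow builder, with none of the safety checks that SafeFlow and SafeCollector add.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical, simplified stand-ins for FlowCollector and Flow.
fun interface MiniCollector<T> {
    suspend fun emit(value: T)
}

interface MiniFlow<T> {
    suspend fun collect(collector: MiniCollector<T>)
}

// Like SafeFlow, the builder only stores the block. Nothing runs here.
fun <T> miniFlow(block: suspend MiniCollector<T>.() -> Unit): MiniFlow<T> =
    object : MiniFlow<T> {
        override suspend fun collect(collector: MiniCollector<T>) {
            collector.block()      // the block runs only now, with the collector as receiver
        }
    }

fun miniDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val f = miniFlow<Int> {
        log += "block started"
        emit(1)
        emit(2)
    }
    log += "flow created"
    f.collect { value -> log += "got $value" }
    log
}

fun main() {
    println(miniDemo())            // [flow created, block started, got 1, got 2]
}
```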

Take a look at simple().collect

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

Here action is the suspending lambda we passed in, that is, println(value).

Lines 2-4 create an anonymous FlowCollector object whose emit calls action, and pass it to the flow’s own collect method, which in this case is the one SafeFlow inherits from AbstractFlow.

public final override suspend fun collect(collector: FlowCollector<T>) {
    val safeCollector = SafeCollector(collector, coroutineContext)
    try {
        collectSafely(safeCollector)
    } finally {
        safeCollector.releaseIntercepted()
    }
}

The collector in the first line is the anonymous object that was just created.

In line 2 it is wrapped in a SafeCollector, which also implements the FlowCollector interface.

The fourth line calls collectSafely(..).

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

What is collector.block()? Look at the parameter: block has the type suspend FlowCollector<T>.() -> Unit, a function whose receiver is a FlowCollector<T>, so it can be invoked directly on the collector. You can see this when writing simple(): inside the flow { } builder, this is the FlowCollector, and the actual instance is the SafeCollector, so the emit called there is SafeCollector’s emit.
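If function types with a receiver are unfamiliar, the call shape collector.block() can be tried without any coroutines. RecordingCollector and runBlock below are hypothetical names used only for this illustration.

```kotlin
// Hypothetical collector type, used only to illustrate a lambda with receiver.
class RecordingCollector {
    val received = mutableListOf<Int>()
    fun emit(value: Int) {
        received += value
    }
}

// `block` has RecordingCollector as its receiver, the same shape as FlowCollector<T>.() -> Unit.
fun runBlock(block: RecordingCollector.() -> Unit): List<Int> {
    val collector = RecordingCollector()
    collector.block()              // same call shape as collector.block() in SafeFlow
    return collector.received
}

fun main() {
    val result = runBlock {
        emit(1)                    // `this` is the RecordingCollector
        this.emit(2)               // the explicit form of the same call
    }
    println(result)                // [1, 2]
}
```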

override suspend fun emit(value: T) {
    return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->  // 2
        try {
            emit(uCont, value)   // 4
        } catch (e: Throwable) {
            lastEmissionContext = DownstreamExceptionElement(e)
            throw e
        }
    }
}
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
    val currentContext = uCont.context
    currentContext.ensureActive()
    val previousContext = lastEmissionContext
    if (previousContext !== currentContext) {
        checkContext(currentContext, previousContext, value)
    }
    completion = uCont
    return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}

Judging by its name, the second line hands us the continuation of the current coroutine (uCont).

In line 4, emit calls the private emit overload and passes the value along. The important part is the last line, emitFun: its first argument, collector, is the one passed in through the constructor, which wraps the lambda we wrote in simple().collect { } at the very beginning. Everything ends up in this emitFun call.

private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

The double colon is Kotlin’s callable reference syntax, part of its reflection features, and opening the bytecode reveals the familiar invoke.
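A small, self-contained illustration of a callable reference follows. Greeter is a hypothetical class; the point is only that an unbound reference takes the receiver as its first parameter, which is the same shape as the emitFun above, where the first argument is the FlowCollector.

```kotlin
// Hypothetical class, for illustration only.
class Greeter(private val greeting: String) {
    fun greet(name: String): String = "$greeting, $name"
}

// Unbound reference: the receiver becomes the first parameter of the function value.
val greetFun: (Greeter, String) -> String = Greeter::greet

fun main() {
    println(greetFun(Greeter("Hello"), "Flow"))         // Hello, Flow
    println(greetFun.invoke(Greeter("Hi"), "Kotlin"))   // Hi, Kotlin
}
```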

Summary:

Flow is cold because the block containing the emit calls runs only when collect is called.

The implementation uses a function reference (Kotlin’s reflection syntax) to pass each emitted value to the collect lambda.

Flow operators

transform: converts each value into something else, for example an Int into a String, and emits it.

(1..4).asFlow().transform {
    emit("this ${it}")
}.collect {
    println(it)
}

map

map turns each value into another value through a function. Here each Int is turned into a Flow<String> built with the flow function, so the outer collect has to collect every inner flow in turn.

(1..5).asFlow().map {
    flow {
        emit("$it: Second")
    }
}.collect { inner ->
    inner.collect { println(it) }
}
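For comparison, when the goal is just Int to String, map can return the String directly and no nested collect is needed. A minimal sketch, with mapDemo as a name of my own choosing:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun mapDemo(): List<String> = runBlocking {
    (1..5).asFlow()
        .map { "$it: Second" }     // Int -> String, one output per input
        .toList()                  // collects the flow into a list
}

fun main() {
    println(mapDemo())             // [1: Second, 2: Second, 3: Second, 4: Second, 5: Second]
}
```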

flowOn: the upstream flow is executed on the given dispatcher, here the IO dispatcher.

(1..4).asFlow()
    .flowOn(Dispatchers.IO)
    .collect {
        println("take ${it}")
    }

combine: combines the latest values emitted by two flows.

zip: pairs up the values of two flows one to one.

catch: catches an upstream exception and can emit again in response.

onStart: called when collection starts.

onCompletion: called when the flow completes.

There are many more operators; I won’t give an example for each.
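For reference, a short sketch of zip, catch, onStart and onCompletion. The function names are mine, and combine is left out because its output depends on timing.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// zip pairs values one to one and stops when the shorter flow ends.
fun zipDemo(): List<String> = runBlocking {
    flowOf(1, 2, 3).zip(flowOf("a", "b")) { n, s -> "$n$s" }.toList()
}

// onStart runs first, catch turns the upstream failure into a value,
// and onCompletion runs after everything else has been delivered.
fun catchDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow<String> {
        emit("first")
        throw IllegalStateException("boom")
    }
        .onStart { log += "onStart" }
        .catch { e -> emit("recovered from ${e.message}") }
        .onCompletion { log += "onCompletion" }
        .collect { log += it }
    log
}

fun main() {
    println(zipDemo())    // [1a, 2b]
    println(catchDemo())  // [onStart, first, recovered from boom, onCompletion]
}
```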

SharedFlow Shared flow

SharedFlow is itself an interface, just as Flow is. The inheritance chain is StateFlow : SharedFlow : Flow.

Unlike Flow, values can be emitted before anyone collects. Whatever is emitted before collect is kept, or not, according to the configured replay count and overflow policy.

val mSharedFlow = MutableSharedFlow<String>(
    0,  // replay: how many past values are replayed to a new collector
    0,  // extraBufferCapacity: how many values can be buffered on top of replay
    onBufferOverflow = BufferOverflow.SUSPEND  // overflow policy
)

First parameter (replay): the number of previously emitted values replayed to a new subscriber.

Second parameter (extraBufferCapacity): how many values can be buffered in addition to replay.

Third parameter (onBufferOverflow): the overflow policy. BufferOverflow has three constants: SUSPEND suspends the emitter, DROP_OLDEST drops the oldest buffered value, and DROP_LATEST drops the newest value.
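A minimal sketch of what replay means in practice. The replayDemo name is mine; take(2) is used only so that the collection ends and the function can return.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun replayDemo(): List<String> = runBlocking {
    val shared = MutableSharedFlow<String>(replay = 2)
    // No collector yet: with replay = 2 only the two most recent values are kept.
    shared.tryEmit("a")
    shared.tryEmit("b")
    shared.tryEmit("c")
    // A late collector first receives the replayed values; take(2) then ends the collection.
    shared.take(2).toList()
}

fun main() {
    println(replayDemo())          // [b, c]
}
```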

How to use

fun main(): Unit = runBlocking {
    // Define a SharedFlow object
    val mSharedFlow = MutableSharedFlow<String>(
        4,   // replay count
        10,  // extra buffer capacity
        onBufferOverflow = BufferOverflow.SUSPEND  // overflow policy
    )
    launch {
        mSharedFlow.collect {
            println(it)
        }
    }
    launch {
        repeat(10) {
            mSharedFlow.emit("abc:${it}")
        }
    }
}

The object is defined, and two launch calls start two coroutines: one collects and the other emits repeatedly.

You’ll find that the program keeps running and never ends, because collect on a SharedFlow never completes and runBlocking keeps waiting for the collecting coroutine.
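One way to let such a program finish is to bound the collection, for example with take, or to cancel the collecting job. The sketch below is an assumption-laden variant of the example above: collectTen is my own name, and CoroutineStart.UNDISPATCHED is used so the collector is subscribed before the first emit.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun collectTen(): List<String> = runBlocking {
    val shared = MutableSharedFlow<String>(replay = 4, extraBufferCapacity = 10)
    // UNDISPATCHED runs the collector up to its first suspension point,
    // so it is already subscribed when the emits below happen.
    val received = async(start = CoroutineStart.UNDISPATCHED) {
        shared.take(10).toList()   // completes after ten values
    }
    repeat(10) { shared.emit("abc:$it") }
    received.await()
}

fun main() {
    println(collectTen())          // abc:0 through abc:9, then the program exits
}
```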

Principle of implementation

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
     // Check parameters
    val bufferCapacity0 = replay + extraBufferCapacity
    val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
    return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}

bufferCapacity0 = replay + extraBufferCapacity, that is, the replay count plus the extra buffer capacity.

private class SharedFlowImpl<T>(
    private val replay: Int,
    private val bufferCapacity: Int,
    private val onBufferOverflow: BufferOverflow
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    /*
        Logical structure of the buffer

                  buffered values
             /-----------------------\
                          replayCache      queued emitters
                          /----------\/----------------------\
         +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
         |   | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E |   |   |   |
         +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+

        head == minOf(minCollectorIndex, replayIndex) // by definition
        totalSize == bufferSize + queueSize // by definition
        //...
     */
    private var buffer: Array<Any?>? = null
}

The Kotlin authors have already drawn the data structure in the comment, with two definitions: head is always the smaller of the minimum collector index and the replay index, and totalSize is bufferSize plus queueSize. There is also the buffer field itself, an Array<Any?>?, which on the JVM is an array of Java objects.

emit

override suspend fun emit(value: T) {
    if (tryEmit(value)) return
    emitSuspend(value)
}

override fun tryEmit(value: T): Boolean {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val emitted = synchronized(this) {     // 8
        if (tryEmitLocked(value)) {
            resumes = findSlotsToResumeLocked(resumes)
            true
        } else {
            false
        }
    }
    for (cont in resumes) cont?.resume(Unit)
    return emitted
}

private fun tryEmitLocked(value: T): Boolean {
    if (nCollectors == 0) return tryEmitNoCollectorsLocked(value)    // 21
    if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
        when (onBufferOverflow) {
            BufferOverflow.SUSPEND -> return false  
            BufferOverflow.DROP_LATEST -> return true  
            BufferOverflow.DROP_OLDEST -> {}  
        }
    }
    enqueueLocked(value)
    bufferSize++ 
    if (bufferSize > bufferCapacity) dropOldestLocked()
    if (replaySize > replay) { 
        updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
    }
    return true
}

Let’s look at the key points of emit in order.

Line 8: the work is done inside synchronized.

Line 21: if nCollectors == 0, that is, there are no collectors, tryEmitNoCollectorsLocked is called (the name reads as "try to emit with no collectors, while holding the lock"). If there is at least one collector, the value goes through the buffering path, where the overflow policy is checked once the buffer is full.

So a value emitted before any collector has subscribed goes through tryEmitNoCollectorsLocked, and once a collector exists the buffering path with the overflow policy runs instead.

private fun tryEmitNoCollectorsLocked(value: T): Boolean {
    if (replay == 0) return true
    enqueueLocked(value) 
    bufferSize++ 
    if (bufferSize > replay) dropOldestLocked()
    minCollectorIndex = head + bufferSize 
    return true
}

private fun enqueueLocked(item: Any?) {
    val curSize = totalSize
    val buffer = when (val curBuffer = buffer) {
        null -> growBuffer(null, 0, 2)
        else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize, curBuffer.size * 2) else curBuffer
    }
    buffer.setBufferAt(head + curSize, item)
}
private fun growBuffer(curBuffer: Array<Any?>?, curSize: Int, newSize: Int): Array<Any?> {
    val newBuffer = arrayOfNulls<Any?>(newSize).also { buffer = it }
    if (curBuffer == null) return newBuffer
    val head = head
    for (i in 0 until curSize) {
        newBuffer.setBufferAt(head + i, curBuffer.getBufferAt(head + i))
    }
    return newBuffer
}

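If replay == 0 it returns right away: with no collectors and nothing to replay, the value is simply dropped.

Otherwise it goes to enqueueLocked. The key part there is growBuffer, the buffer growth method, which allocates a longer array and moves the data into it. The size starts at 2 and doubles each time, so it is always a power of two.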
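One convenient property of a power-of-two length is that a logical index can be mapped to an array slot with a bit mask instead of a modulo. The class below is a simplified model under that assumption, not the library code: GrowableRing and all of its members are hypothetical names.

```kotlin
// Hypothetical, simplified model of a growable buffer whose length is always a power of two.
class GrowableRing {
    private var buffer = arrayOfNulls<Any?>(2)
    private var head = 0L          // logical index of the oldest stored item
    var size = 0
        private set

    val capacity: Int get() = buffer.size

    // For a power-of-two length, `index and (length - 1)` equals index % length.
    private fun slot(index: Long, length: Int): Int = index.toInt() and (length - 1)

    fun enqueue(item: Any?) {
        if (size >= buffer.size) grow(buffer.size * 2)
        buffer[slot(head + size, buffer.size)] = item
        size++
    }

    fun dropOldest(): Any? {
        check(size > 0) { "buffer is empty" }
        val i = slot(head, buffer.size)
        val item = buffer[i]
        buffer[i] = null
        head++
        size--
        return item
    }

    private fun grow(newSize: Int) {
        val newBuffer = arrayOfNulls<Any?>(newSize)
        // Copy every live item to the slot its logical index maps to in the new array.
        for (i in 0 until size) {
            newBuffer[slot(head + i, newSize)] = buffer[slot(head + i, buffer.size)]
        }
        buffer = newBuffer
    }
}

fun main() {
    val ring = GrowableRing()
    listOf(1, 2, 3).forEach(ring::enqueue)
    println(ring.capacity)         // 4
    println(ring.dropOldest())     // 1
}
```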

That covers one insertion path.

Back in tryEmitLocked: if nCollectors is not 0 and the buffer is full, the overflow policy applies. With BufferOverflow.SUSPEND it returns false, which takes us back to tryEmit(..).

override fun tryEmit(value: T): Boolean {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val emitted = synchronized(this) {
        if (tryEmitLocked(value)) {
            resumes = findSlotsToResumeLocked(resumes)
            true
        } else {
            false
        }
    }
    for (cont in resumes) cont?.resume(Unit)
    return emitted
}
override suspend fun emit(value: T) {
    if (tryEmit(value)) return // fast-path
    emitSuspend(value)
}

The false propagates all the way back to emit, which then calls emitSuspend(value). By this point the synchronized lock has already been released.

private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val emitter = synchronized(this) lock@{
        if (tryEmitLocked(value)) {
            cont.resume(Unit)
            resumes = findSlotsToResumeLocked(resumes)
            return@lock null
        }
        // add suspended emitter to the buffer
        Emitter(this, head + totalSize, value, cont).also {
            enqueueLocked(it)
            queueSize++
            if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
        }
    }
    emitter?.let { cont.disposeOnCancellation(it) }
    for (r in resumes) r?.resume(Unit)
}

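Now the coroutine is suspended via suspendCancellableCoroutine. It takes the lock again and tries tryEmitLocked once more; on success it resumes the continuation and returns. If that still fails, an Emitter object holding the value and the continuation is created and enqueued in the buffer, and the coroutine stays suspended. Let’s look one level deeper.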

internal open class CancellableContinuationImpl<in T>() ... {
    private val _state = atomic<Any?>(Active)
}

Seeing atomic, the continuation’s state must be updated with CAS (compare-and-set) operations. In other words, when tryEmitLocked fails the emitter is suspended, and its continuation is resumed later through a CAS state change.

That brings us back to tryEmitLocked, and we have essentially covered the emit path.
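The two outcomes of tryEmitLocked can be observed from the outside with tryEmit. A minimal sketch, assuming a default MutableSharedFlow (no replay, no extra buffer, SUSPEND); tryEmitResults is a name of my own.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun tryEmitResults(): Pair<Boolean, Boolean> = runBlocking {
    val shared = MutableSharedFlow<Int>()      // replay = 0, extraBufferCapacity = 0, SUSPEND
    // No collectors: the value is dropped and tryEmit reports success.
    val withoutSubscriber = shared.tryEmit(1)
    // Subscribe a collector; UNDISPATCHED makes sure it is registered before the next line.
    val job = launch(start = CoroutineStart.UNDISPATCHED) { shared.collect { } }
    // One collector and no buffer space: a plain emit would have to suspend here.
    val withSubscriber = shared.tryEmit(2)
    job.cancel()
    withoutSubscriber to withSubscriber
}

fun main() {
    println(tryEmitResults())      // (true, false)
}
```

When tryEmit returns false, a plain emit would take the emitSuspend path described above.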

Collect

mSharedFlow.collect {    // 1
    println(it)
}

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

override suspend fun collect(collector: FlowCollector<T>) {
    val slot = allocateSlot()          // 11
    try {
        if (collector is SubscribedFlowCollector) collector.onSubscription()
        val collectorJob = currentCoroutineContext()[Job]
        while (true) {         // 15
            var newValue: Any?
            while (true) {     // 17
                newValue = tryTakeValue(slot)
                if (newValue !== NO_VALUE) break
                awaitValue(slot)
            }
            collectorJob?.ensureActive()
            collector.emit(newValue as T)
        }
    } finally {
        freeSlot(slot)
    }
}

private fun tryTakeValue(slot: SharedFlowSlot): Any? {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val value = synchronized(this) {
        val index = tryPeekLocked(slot)
        if (index < 0) {
            NO_VALUE
        } else {
            val oldIndex = slot.index
            val newValue = getPeekedValueLockedAt(index)
            slot.index = index + 1
            resumes = updateCollectorIndexLocked(oldIndex)
            newValue
        }
    }
    for (resume in resumes) resume?.resume(Unit)
    return value
}

The collect in line 1 is still the extension function: the lambda is wrapped in a FlowCollector and handed to the concrete collect implementation at line 10.

Line 11, allocateSlot(): I won’t paste its code. Each mSharedFlow.collect { } call gets its own slot.

Line 15 is a while (true) loop, which is why collect never finishes on its own.

tryTakeValue tries to take a value, inside another synchronized block. tryPeekLocked() returns an index, and getPeekedValueLockedAt fetches the value at that index. After the value is fetched, the slot is pointed at the next index.

updateCollectorIndexLocked is more complex, so I won’t go through it; judging by the name, it updates the collector index and the buffer positions. Dropping the first value from the array, for example, means adjusting quite a few bookkeeping indices.

Back to the while (true) at line 17: it breaks if the new value is not NO_VALUE, otherwise it calls awaitValue and suspends. So the inner loop keeps checking the index and reading the value at that index until one is available.

The outer loop at line 15 is the one that delivers each value: it calls emit on the FlowCollector that wraps our collect { } lambda.
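Because every collect call works through its own slot and index, each collector sees every value. A minimal sketch, with twoCollectors as my own name; take(3) is there only so the collectors finish and the function can return.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun twoCollectors(): Pair<List<Int>, List<Int>> = runBlocking {
    val shared = MutableSharedFlow<Int>(extraBufferCapacity = 3)
    // UNDISPATCHED subscribes both collectors before anything is emitted.
    val first = async(start = CoroutineStart.UNDISPATCHED) { shared.take(3).toList() }
    val second = async(start = CoroutineStart.UNDISPATCHED) { shared.take(3).toList() }
    for (i in 1..3) shared.emit(i)
    first.await() to second.await()
}

fun main() {
    println(twoCollectors())       // ([1, 2, 3], [1, 2, 3])
}
```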


A quick summary:

1. collect on a SharedFlow never completes

2. Both emit and collect use synchronized; it is a producer-consumer pattern

3. There is no de-duplication: a value equal to the previous one is still emitted

4. replay can be configured for sticky events. If it is 0, there are no sticky events

5. It is essentially an array structure

StateFlow state flow

How to use

fun main() {
    runBlocking {
        val stateFlow = MutableStateFlow("aaa")
        launch {
            stateFlow.collect {
                println("launch1:${it}")
            }
        }
        launch {
            stateFlow.emit("bbb")
        }
    }
}

A MutableStateFlow must be created with an initial value, so it always has a state. Here there is one collect and one emit.

It doesn’t look very different from SharedFlow, except that a StateFlow holds only a single value.

Principle of implementation

StateFlow’s structure is not very complicated, thanks to what it inherits and to its own constraint of holding a single value.

private class StateFlowImpl<T>(
    initialState: Any
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialState)

    public override var value: T
        get() = NULL.unbox(_state.value)
        set(value) { updateState(null, value ?: NULL) }

    override suspend fun emit(value: T) {    // 10
        this.value = value
    }

    private fun updateState(expectedState: Any?, newState: Any): Boolean {
        // ..
        synchronized(this) {         // 16
            // ..
            if (oldState == newState) return true
            _state.value = newState
            // ..
        }
        // ..
    }

    override suspend fun collect(collector: FlowCollector<T>) {
        val slot = allocateSlot()
        try {
            // ..
            while (true) {
                // ..
                val newState = _state.value
                // ..
                collectorJob?.ensureActive()    // 25
                // Conflate value emissions using equality
                if (oldState == null || oldState != newState) {
                    collector.emit(NULL.unbox(newState))
                    oldState = newState
                }
                // ..
            }
        } finally {
            // ..
        }
    }
}

Line 10: emit assigns to value, which in turn calls updateState.

Line 16 is synchronized again; if oldState == newState, the old value equals the new one and it returns without updating.

Line 25 is again inside while (true): it makes sure the coroutine is still active and emits the new value if it differs from the last one.

A quick summary:

1. collect on a StateFlow never completes

2. Writes are synchronized: one writer at a time, with any number of readers

3. If oldValue == newValue, the state is not changed

4. It is sticky by default: a new collector immediately gets the latest value

5. It is essentially a single atomic reference and must have an initial value
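Points 3 and 4 can be seen in a small sketch. The stateFlowDemo name is mine, and the yield() calls are only there to give the collector a turn on runBlocking’s single thread after each assignment; without them a slow collector may only see the latest value.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun stateFlowDemo(): List<String> = runBlocking {
    val state = MutableStateFlow("aaa")
    val received = mutableListOf<String>()
    // UNDISPATCHED: the collector starts at once and immediately receives the current value.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { received += it }
    }
    state.value = "bbb"; yield()
    state.value = "bbb"; yield()   // equal to the current value, so nothing new is emitted
    state.value = "ccc"; yield()
    job.cancel()                   // collect never completes on its own
    received
}

fun main() {
    println(stateFlowDemo())       // [aaa, bbb, ccc]
}
```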

Other methods

SharedFlow and StateFlow inherit from Flow, so you can use most of the Flow extension functions, such as map, onStart, and onCompletion.

When you try flowOn(), you will find it only has an effect on a plain Flow. My guess is that a cold Flow’s block is invoked by the collector through the function reference seen above, while the other two receive their values from whichever coroutine calls emit. If anyone knows for certain, I would appreciate an explanation.

If you want to switch threads, you can configure the dispatcher of the emitting or collecting coroutine directly.
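A minimal sketch of that approach: the emitting coroutine is launched on Dispatchers.Default while the collector stays on the runBlocking thread. The emitOnDefault name is mine, and take(3) is there only so the collection ends.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun emitOnDefault(): List<Int> = runBlocking {
    val shared = MutableSharedFlow<Int>()
    // Subscribe first, on the runBlocking thread.
    val received = async(start = CoroutineStart.UNDISPATCHED) { shared.take(3).toList() }
    // The emitter runs on the Default dispatcher instead of using flowOn.
    launch(Dispatchers.Default) {
        for (i in 1..3) shared.emit(i)
    }
    received.await()
}

fun main() {
    println(emitOnDefault())       // [1, 2, 3]
}
```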

MutableSharedFlow and MutableStateFlow can be exposed as read-only objects via .asSharedFlow() and .asStateFlow(), so that outside code cannot emit into them.
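A common shape for this is a private mutable flow with a public read-only view. Counter below is a hypothetical holder class, for illustration only.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow

// Hypothetical holder class, for illustration only.
class Counter {
    private val _count = MutableStateFlow(0)        // writable only inside the class
    val count: StateFlow<Int> = _count.asStateFlow() // read-only view for callers

    fun increment() {
        _count.value += 1
    }
}

fun main() {
    val counter = Counter()
    counter.increment()
    counter.increment()
    println(counter.count.value)   // 2
}
```

Callers can read counter.count.value or collect it, but the StateFlow type exposes no setter or emit.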

Conclusion

collect on a SharedFlow or a StateFlow never completes.

SharedFlow does not de-duplicate equal values, but StateFlow does.

synchronized is used in both the read and write paths of SharedFlow, and in the write path of StateFlow.

SharedFlow can be configured to be sticky through replay, while StateFlow is always sticky.

Both accept null when the type parameter is nullable; StateFlow boxes null internally as the NULL symbol seen above.

Flow is not lifecycle-aware on its own; this can be handled through the coroutine that collects it.

Reference documentation

Kotlin website

Don’t just follow the crowd: a comparison of LiveData, StateFlow, and SharedFlow usage scenarios
