Rxswift (1) Functional responsive programming idea

RxSwift (II) Sequence core logic analysis

Create, subscribe, and destroy an RxSwift Observable

RxSwift (4) higher-order functions

RxSwift (v)

(6) Dispose source code analysis

RxSwift (7) RxSwift vs. SWIFT usage

RxSwift (x) Basic Usage part 1- Sequence, subscribe, destroy

RxSwift Learning 12 (Basics 3- UI Control Extensions) @TOC

Introduction of Rxswift destroyer Dispose

  1. First of all, please use a mind map to get a preliminary understanding of what Dispose owned and did:
  2. This paper mainly focuses on the above picture, focusing on the analysis of how Dispose() destroys the sequence.
  3. From the figure above we can see that dispose and disposeBag are the first root nodes after the destroyer. What are they? The answers are explained below.

Introduction to Rxswift destroyer classes and important functions

1. DisposeBag

1.1 What is a DisposeBag

RxSwift and RxCocoa also have an additional tool to assist with ARC and memory management: DisposeBag. This is a virtual “package” of observers that is discarded when their parent object is released. When an object with the DisposeBag attribute calls deinit(), the virtual package is emptied and each Disposable Observer automatically unsubscribs to what it observes. This allows ARC to reclaim memory as usual. If there is no DisposeBag, there are two outcomes: either the Observer generates a retain cycle that is bound to the observed object indefinitely; Or accidentally released, causing the program to crash. So to be a good ARC person, set up Observables and add them to DisposeBag. In this way, they can be cleaned well.

When an Observable is observed and subscribed, it generates a Disposable instance that can be used to free resources. There are two ways to release resources in RxSwift, namely unbind and free space, respectively explicit and implicit:

  • Explicit release allows us to call the release method directly in our code to release resources as shown in the following example:
let dispose = textField.rx_text
           .bindTo(label.rx_sayHelloObserver)
dispose.dispose()
Copy the code
  • Implicit release implicit release passesDisposeBagThis is similar to the automatic release pool mechanism in Objective-C ARC. When we create an instance, it is added to the automatic release pool of the thread. The automatic release pool releases and rebuilds the pool after a RunLoop cycle. The DisposeBag is like an automatic release pool for RxSwift. We add resources to the DisposeBag and release them along with the DisposeBag.

Examples are as follows:

let disposeBag = DisposeBag(a)func binding(a) {
       textField.rx_text
           .bindTo(label.rx_sayHelloObserver)
           .addDisposableTo(self.disposeBag)
}
Copy the code

In this code, the method addDisposableTo makes a weak reference to DisposeBag, so this DisposeBag will be referenced by the instance, usually as a member variable of the instance, and when the instance is destroyed, the member DisposeBag will follow. This frees resources bound by RxSwift on this instance.

From the above we can see that DisposeBag is like our automatic release pool in OC memory management. It acts as a garbage collection bag, you just add the sequence to the disposeBag, and the disposeBag will help us release resources at the right time, so how does it do that?

1.2 DisposeBag implementation source code analysis

1.2.1. Take a look at the class diagram:

1.2.2. Specifically analyze the source code process

  1. When we invoke the Disposed () method, we call the Insert () method of the Dispose class to add the disposables to the _Disposables array. The specific source code is as follows:
public final class DisposeBag: DisposeBase {
    
    private var _lock = SpinLock(a)// state
    fileprivate var _disposables = [Disposable]()
    fileprivate var _isDisposed = false
    
    /// Constructs new empty dispose bag.
    public override init() {
        super.init()}/// Adds `disposable` to be disposed when dispose bag is being deinited.
    ///
    /// - parameter disposable: Disposable to add.
    public func insert(_ disposable: Disposable) {
        self._insert(disposable)? .dispose() }private func _insert(_ disposable: Disposable) -> Disposable? {
        // In order to prevent multiple threads from preempting resources, need to lock control synchronous access
        self._lock.lock(); defer { self._lock.unlock() }
        if self._isDisposed {// Call _dispose(), please dispose of it and return it directly
            return disposable
        }
        // Save to array
        self._disposables.append(disposable)

        return nil
    }

    /// This is internal on purpose, take a look at `CompositeDisposable` instead.
    private func dispose(a) {
        // 1. Retrieve all saved destroyers
        let oldDisposables = self._dispose()

        // 2. Run through each disposer, dispose() of each disposer to release resources
        for disposable in oldDisposables {
            disposable.dispose()
        }
    }

    private func _dispose(a)- > [Disposable] {
        self._lock.lock(); defer { self._lock.unlock() }

        // Retrieve all saved destroyers
        let disposables = self._disposables
        
        self._disposables.removeAll(keepingCapacity: false)
        self._isDisposed = true // This variable is used to record whether the garbage bag array has been emptied
        
        return disposables
    }
    
    deinit {
        Dispose () when DisposeBag's own object is disposed, dispose() is called to iterate over all the saved destroyers in the dispose array.
        self.dispose()
    }
}
Copy the code
  1. The source flow above is identified by a diagram
  2. To summarize the DisposeBag process above:
  • When we call the sequence ofdispose()Method is,DisposeBagcallinsert()Method saves the sequence we need to destroy for storage_disposablesIn the array.
  • When our DisposeBag is destroyed, such as the local variable defined out of scope, it will be destroyed. Our Deinit () method is called as shown in figure 4, and its dispose() method is executed in deinit(), and then all the previously saved variables that need to be released_disposablesArray, in turn calling their own Dispose () method.

2. FetchOr () function

  1. FetchOr (); fetchOr (); fetchOr ();
func fetchOr(_ this: AtomicInt, _ mask: Int32) -> Int32 {
    this.lock()
    let oldValue = this.value
    this.value |= mask
    this.unlock()
    return oldValue
}
Copy the code

The source code is very simple, but the effect is not small. In the code this is the AtomicInt value passed in, with only one value inside. FetchOr returns a copy of this.value as the result. And enclosing the value and the mask do or (|) operation. And assigns the result of the or operation to this.value.

  1. To understand the result of this function, we use a table:
this.value mask oldValue Or this.value after the operation The return value
0 1 0 1 0
1 1 1 1 1
0 2 0 2 0
1 2 1 3 1

Is made a or operations, the actual decimal result remains the same, just change the inside of the binary, can be used to make marks, just inside the C language often use method, namely the value of an Int type processing itself can use, can also through the bitwise and, or, to change its logo, achieve the purpose of passing values, This allows each bit to replace a bool, often used as an enumeration.

The operator binary The decimal system instructions
0000, 0001, 1
0000, 0010, 2
Or operation 0000, 0011, 3
  1. Through the analysis of the above, I learned that fetchOr () function in the role of, can make sure that every piece of code is executed only once, is equivalent to a flag, if the initial value of 0, 1 if the incoming parameters, assuming that this code is executed repeatedly five times, only the first will be from 0 to 1, the back four times call for 1, don’t send changes.

Dispose core logic

Dispose instance code analysis

  • Children who have learned Rxswift know that dispose() will release our resources just like the reference counter in our OC. We can also listen for destroyed event callbacks at release time. Have you ever thought about how Dispose did it?

To know the answer, we can only analyze the source code step by step:

  • First, let’s look at an example code:

Example 1:

func limitObservable(a){
        // Create sequence
        let ob = Observable<Any>.create { (observer) -> Disposable in
            observer.onNext("kongyulu")
            return Disposables.create { print("Destroyed and released.")} // dispose.dispose()
        }
        // Sequence subscription
        let dispose = ob.subscribe(onNext: { (anything) in
            print("Subscribed to:\(anything)")
        }, onError: { (error) in
            print("Subscribed to:\(error)")
        }, onCompleted: {
            print("Done") {})print("Destroy callback")}print("Executed")
        //dispose.dispose()
    }
Copy the code
  1. The above code executes as follows:
  2. From the above results, we know that the sequence created is not destroyed, that is, “destroyed free” is not printed, nor “Destroy callback” is printed. Why is that? This problem we later through the analysis of the source code Rx source will know.
  3. Now let’s uncomment that line of code abovedispose.dispose()This line of code, uncommented and rerun, produces the following output:
  4. As we can see from the above code, the sequence created is destroyed and the destruction callback is executed. Then why did you add itdispose.dispose()Is that enough?
  5. In addition, let’s modify our code again:

Example 2:

func limitObservable(a){
        // Create sequence
        let ob = Observable<Any>.create { (observer) -> Disposable in
            observer.onNext("kongyulu")
            observer.onCompleted()
            return Disposables.create { print("Destroyed and released.")} // dispose.dispose()
        }
        // Sequence subscription
        let dispose = ob.subscribe(onNext: { (anything) in
            print("Subscribed to:\(anything)")
        }, onError: { (error) in
            print("Subscribed to:\(error)")
        }, onCompleted: {
            print("Done") {})print("Destroy callback")}print("Executed")
        //dispose.dispose()
    }
Copy the code

Example 2: Observer.oncompleted () : Observer.oncompleted ()

observer.onCompleted()

  • Let’s explore the underlying implementation of Rxswift with three questions

Dispose process source code parsing

Before analyzing Dispose source code, we must first deeply understand the creation of sequence, subscription process is the basis, only by understanding this, can we truly understand the principle of Dispose. This has actually been analyzed in a previous blog, see my previous blog: Sequence Core Logic for more details

In order to better understand, I will clarify the specific process again:

1. Sequence creation and subscription process

  • (1) When we execute the codeLet ob = Observable<Any>. Create {(observer) -> Disposable in this is A closure we call closure A}“, you actually come to line 20 of the create. swift file:
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) - >Observable<Element> {
        return AnonymousObservable(subscribe)
    }
Copy the code
  • (2) The create() function returns oneAnonymousObservable(subscribe)Object and pass our closure A into the constructor of AnonymousObservable, which saves the closure Alet _subscribeHandler: SubscribeHandlerIt’s stored in this variable._subscribeHandlerThis variable holds the closure A passed in when the sequence OB was created (where closure A is required to be passed in)AnyObserverType as argument)
final private class AnonymousObservable<Element> :Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element- > >)Disposable
    // This variable holds the closure A passed in when the sequence ob was created (where closure A requires the type AnyObserver to be passed in as an argument)
    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler // This variable holds the closure A passed in when the sequence ob was created}... The following code will not be omitted}Copy the code
  • (3) We callDispose = ob.subscribe(onNext: {(anything) in print(" subscribe to :\(anything)"}Line 39 of ObservableType+Extensions. Swift (ObservableType+Extensions. Swift)
 public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) - >Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) - >Disposable {
            let disposable: Disposable. The code will not analyze ellipsis herelet observer = AnonymousObserver<Element> < span style = "box-sizing: border-box; color: RGB (74, 74, 74);return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
Copy the code
  • (4) As you can see from the above source code, we create an AnonymousObserver in the function, and then directly return Disposables. Create ().

  • Subscribe (observer) : self.asObservable().subscribe(observer) : self.asObservable().subscribe(observer) : self.asObservable().subscribe(observer)

  • (6) To understand the line in (5), we need to understand the class integration relationship: AnonymousObservable – > Producer – > observables – > ObservableType – > ObservableConvertibleType details below:

  • (7) Through the inheritance relationship, we can follow the inheritance chain to find the parent class. We can find that the asObservable () method is defined in the Observable class:

public class Observable<Element> : ObservableType {... Omitted code that is not of interestpublic func asObservable(a) -> Observable<Element> {
        return self}... Omitted code that is not concerned}Copy the code
  • (8) Through source code analysis, I know that asObservable() returns self, and (3) calls yesself.asObservable().subscribe(observer)Self in this line of code is the sequence ob that we created, soself.asObservable()What is returned is the ob observable sequence that we originally created.self.asObservable().subscribe(observer)The observer in is where we arepublic func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)Local variables created in method implementation:Let observer = AnonymousObserver<Element>We pass this local variable to the ObservableThe subscribe ()Methods.
  • (9) Next we share Observable’sThe subscribe ()Method does something.
  • (10) When we call this line in Example 2:Dispose = ob.subscribe(onNext: {(anything) in print(" subscribe to :\(anything)")}When the actual call is madeObservableTypeThe subscribe() method of the protocol, in which we create oneAnonymousObserverObject and passself.asObservable().subscribe(observer)Ob. Susbscribe (Observer) (Pay attention toOb is the AnonymousObservable object created by create(), and observer is the temporary local AnonymousObserver created by subscribe.
  • (11) However, we can see from the above class diagram that there is no subscribe() method in ob (AnonymousObservable), so we can only look for its parent Producer first.
  • (12) According to the above class diagram analysis, we can see that Producer inherits the Observerable observable sequence and follows the ObservableType protocol (which defines a SUBSCRIBE () interface), so we must implement this interface in Producer. Let me look at the source code:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element= =Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer(a)// Call the run() method with two arguments:
            // Parameter 1: observer: the AnonymousObserver object passed in by 'self.asObservable().subscribe(observer)'
            // Parameter 2: Disposer: SinkDisposer() object will be destroyed and analyzed again.
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer(a)let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }
Copy the code
  • (13) Through the above source code analysis, we know thatProducerImplementation of theThe subscribe ()Interface, called its ownrun()Method, and inrun()The observer is passed to the method: that’s usself.asObservable().subscribe(observer)The incomingAnonymousObserverObject. So let’s see what run does:
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element= =Element {
        rxAbstractMethod()
    }
Copy the code
  • (14) From the run() method in Producer above, we can see that the method is not doing anything, just one linerxAbstractMethod()RxAbstractMethod () is just an abstract method. Our subclass AnonymousObservable must have overridden the run() method. So let’s seeAnonymousObservabletherun()Source:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element= =Element {
        // Create a tube AnonymousObservableSink and pass the tube two parameters:
         // Parameter 1: observer: the AnonymousObserver object passed in by 'self.asObservable().subscribe(observer)'
            // Parameter 2: Disposer: SinkDisposer() object will be destroyed and analyzed again.
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
Copy the code
  • (15) on the toprun()Source code we can see: inAnonymousObservabletherun()Methods. First, you create oneAnonymousObservableSinkobjectsinkAnd willobserver(That is, usself.asObservable().subscribe(observer)The incomingAnonymousObserverObject) passed in; Next, callsink.run(self)The subscription method then returns a tuple directly, that is, the run() method returns a tuple:(sink: sink, subscription: subscription). But our focus is on the sink tube.AnonymousObservableSinkIs a role similar to manager, which holds sequence, subscriber, destroyer information, and scheduling capabilities. It is through this tube that our series and subscribers communicate.
  • (16) Next, let’s analyzeAnonymousObservableSinkWhat does the tube do? Let’s seeAnonymousObservableSinkSource:
final private class AnonymousObservableSink<Observer: ObserverType> :Sink<Observer>, ObserverType {
    typealias Element = Observer.Element 
    Parent is the AnonymousObservable sequence
    typealias Parent = AnonymousObservable<Element>

    // state
    private let _isStopped = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    override init(observer: Observer, cancel: Cancelable) {
    // Pass observer: the AnonymousObserver object passed in by 'self.asObservable().subscribe(observer)'
        super.init(observer: observer, cancel: cancel)
    }

func on(_ event: Event<Element>){#if DEBUG
        self._synchronizationTracker.register(synchronizationErrorMessage: .default)
        defer { self._synchronizationTracker.unregister() }
    #endif
    switch event {
    case .next:
        if load(self._isStopped) == 1 {If the.error,.completed is completed, the self.forwardon (event) code is not executed, which means the.complete,.error event is executed during the object's lifetime. Unless reactivation changes the condition value.
            return
        }
        self.forwardOn(event)
    case .error, .completed:
    FetchOr () ¶ fetchOr() ¶ fetchOr() ¶
        if fetchOr(self._isStopped, 1) = =0 {// Execute once if never executed, otherwise not. To ensure that the following code executes only once in the object's life cycle, no matter how many times on() is called.
            self.forwardOn(event)
            self.dispose()
        }
    }
}

// This is an important method,
    func run(_ parent: Parent) -> Disposable {
    // We call parent the AnonymousObservable sequence that we call create() ob,_subscribeHandler which is the closure A that we pass in when we create the sequence. This argument is AnyObserver(self).
        return parent._subscribeHandler(AnyObserver(self))}}Copy the code
  • (17) Through the source code of AnonymousObservableSink above, we know the following conclusions:
    • AnonymousObservableSink. Init initialization time introduced to the observer: is usself.asObservable().subscribe(observer)The incomingAnonymousObserverObject.
    • AnonymousObservableSink has an on() method that does different things with the event parameter passed in, but is called at least onceself.forwardOn(event)Methods. Each time if onNext event is called onceforwardOn(). However, the. Error,. Completed event is called at most onceforwardOn().
    • AnonymousObservableSink’s run() method is the core method that calls back to the closure A we passed when we first created create() and creates ob.subscribe() internallyAnonymousObserverObjects sink through our AnonymousObservableSink objects, that isAnyObserver(self)In theselfOnce wrapped as an AnyObserver structure, we pass closure A as A parameter, thus linking our sequence to the subscribers.
    • ** Special note: ** Many people think that passing our closure A isAnonymousObserverIn fact, it is not; closure A is passed as an AnyObserver structure
    • With the run() method of AnonymousObservableSink we successfully passed the closure we created with our original ob.subscibe() subscriptionAnyObserver(self)When we call this line of code inside closure A:observer.onNext("kongyulu")After ob.subscribe(),AnyObserver(self)This is our observer, and the observer is a structure that owns our tubeAnonymousObservableSinkObject’s on() method.
    • In example 1: when we sendobserver.onNext("kongyulu")When you sequence messages, they actually pass through our pipeAnonymousObservableSink.on()To schedule, and finally schedule the closure we subscribe to: onNext()Closure B:Dispose = ob.subscribe(onNext: {(anything) in print(" subscribe to :\(anything)")}.
    • So now the biggest question is ** :AnonymousObservableSink.on()How to fromobserver.onNext("kongyulu")Dispatch to our closure B? 六四屠杀
  • To analyze the above problem, we need to analyze the structure firstAnyObserver(self)What you did: First look at AnyObsevrer’s source code
public struct AnyObserver<Element> : ObserverType {
    /// Anonymous event handler type.
    
    public typealias EventHandler = (Event<Element- > >)Void
    // The alias EventHandler is defined as a closure for an incoming event
    private let observer: EventHandler

    /// Construct an instance whose `on(event)` calls `eventHandler(event)`
    ///
    /// - parameter eventHandler: Event handler that observes sequences events.
    
    public init(eventHandler: @escaping EventHandler) {
    Self. Observer saves the AnonymousObservableSink object
        self.observer = eventHandler
    }
    
    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    // This is self in AnyObserver(self) code, which is the AnonymousObservableSink object,
    public init<Observer: ObserverType> (_ observer: Observer) where Observer.Element= =Element {
    / / this code directly save the AnonymousObservableSink. On () method
    //self.observer is actually an on() method
        self.observer = observer.on
    }
    
    /// Send `event` to this observer.
    ///
    /// - parameter event: Event instance.
    public func on(_ event: Event<Element>) {
    / / here call on practical approach is to call AnonymousObservableSink. On (event) method
        return self.observer(event)
    }

    /// Erases type of observer and returns canonical observer.
    ///
    /// - returns: type erased observer.
    public func asObserver(a) -> AnyObserver<Element> {
        return self}}Copy the code
  • (19) Through the above AnyObserver source code analysis, we know that AnyObserver initialization saves our pipe AnonymousObservableSink on() method, and has its own on method, In his own on methods to call AnonymousObservableSink. On () method. This is just a wrapper around our AnonymousObservableSink class. Why is that? This design has several advantages:

    • It’s completely encapsulated, so the outside world doesn’t need to know about our tubesAnonymousObservableSinkClass, they don’t care about usAnonymousObservableSinkThe user only needs to use the interface on(). It doesn’t matter how on() is implemented or by whom.
    • Acts as a decoupling effect,AnyObserverDoesn’t own usAnonymousObservableSinkObject, it just owns itAnonymousObservableSinkThe on() interface of theAnonymousObservableSinkThat’s all you need to do to implement the on() interface. As for theAnonymousObservableSinkInternal changes (as long as the ON () interface is not changed) will not affectAnyObserver
  • (20) Now we focus on the on() method:

    • When weExample 1To perform:observer.onNext("kongyulu")This line of code actually calls:AnyObserver.onNext()Methods. Since we AnyObserver inherit the ObserverType protocol, we have itObserverTypetheOnNext ()Method, you can go back to class inheritance if you don’t know.
  • Anyobserver.onnext () calls its own on() method: anyobserver.onnext ()

Interface definition for ObserverType

extension ObserverType {
    public func onNext(_ element: Element) {
        self.on(.next(element))AnyObserver inherits ObserverType and overrides the ON () interface
    }
    public func onCompleted(a) {
        self.on(.completed)
    }
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}
Copy the code
  • (22)AnyObserver.on()Method will callAnonymousObservableSink.on()Methods.
  • (23)AnonymousObservableSink.on(event)Will be calledAnonymousObservableSink.forwardOn(event)
  • (24) It is not defined in AnonymousObservableSinkForwardOn ()Method, which we find implemented in its parent class SinkForwardOn ()The source code is as follows:
class Sink<Observer: ObserverType> : Disposable {
    fileprivate let _observer: Observer
    fileprivate let _cancel: Cancelable
    fileprivate let _disposed = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    init(observer: Observer, cancel: Cancelable) {#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
// initializable saves self._observer as an 'AnonymousObserver' object passed to us by 'self.asObservable().subscribe(observer)'
        self._observer = observer
        self._cancel = cancel
    }

    final func forwardOn(_ event: Event<Observer.Element>){#if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        // The 'anonymousobServer.on ()' method is actually called here.
        self._observer.on(event) } ... This time the code omitted, do not need to worry about}Copy the code
  • (25) From the above source we can see:Sink. ForwardOn ()Actually calledAnonymousObserver.on()To put it bluntly: we started with instance 1observer.onNext("kongyulu")Ob.onnext () is called first when this line of code executesAnyObserver.on().AnyObserver.on()Will call againAnonymousObservableSink.on().AnonymousObservableSink.on()Will call againAnonymousObservableSink.forwardOn()And thenAnonymousObservableSink.forwardOn()The AnonymousObservableSink parent will be called againSink. ForwardOn ()And finally theSink. ForwardOn ()Call theAnonymousObserver.on().
  • (26) Now that we’re pretty clear, let’s go back toAnonymousObserver.on()Method definition:
  1. First we looked at the class definition as follows, and did not find it by the on() method:
final class AnonymousObserver<Element> :ObserverBase<Element> {
// We gave the trailing closure an alias
    typealias EventHandler = (Event<Element- > >)Void
    
    private let _eventHandler : EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
// There is an incoming trailing closure: Subscribe () let Observer = AnonymousObserver
      
        {event in here is a tag closure B} we pass _eventHandler to save tag closure B
      
        self._eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) {
    // Here we call back our trailing closure B
        return self._eventHandler(event)
    }
    
#if TRACE_RESOURCES
    deinit {
        _ = Resources.decrementTotal()
    }
#endif
}
Copy the code
  1. So let’s look for its parent class ObserverBase:
class ObserverBase<Element> : Disposable.ObserverType {
    private let _isStopped = AtomicInt(0)

    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)// Call the subclass onCore()
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) = =0 {
                self.onCore(event)
            }
        }
    }

    func onCore(_ event: Event<Element>) {
        rxAbstractMethod()
    }

    func dispose(a) {
        fetchOr(self._isStopped, 1)}}Copy the code
  1. We know by analyzing the parent source codeObserverBase.on()And finally calledAnonymousObserver.onCore(), and inAnonymousObserver.onCore()The _eventHandler(event) closure B is the trailing closure that created AnonymousObserver when we subscribed to the ob.subscribe() sequence in the first place, so that the trailing closure ends up calling the onNext() method we subscribed to. In example 1, executeobserver.onNext("kongyulu")This line of code is going to call backDispose = ob.subscribe(onNext: {(anything) in print(" subscribe to :\(anything)")}So it prints“Subscribe to: Kongyulu”

The code for the trailing closure B of AnonymousObserver {B} is as follows:

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) - >Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) - >Disposable{... This time the irrelevant code is omitted// Special note: AnonymousObserver
      
        {B} 
       
         {B} 
         
         
           {B} 
           
           
             {B} 
             
             
               {B} 
               
               
                 {B} 
                 
                 
                   {B} 
                   
                   
                     {B
                   
                  
                 
                
               
              
             
            
           
          
         
        
       
      
            let observer = AnonymousObserver<Element> { event in. This time the irrelevant code is omittedswitch event {
                case .next(letvalue): onNext? (value)// call onNext(value)
                    
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
Copy the code
  • (27) Through the analysis of point (26), we should understand the whole subscription process, which can be summarized as follows:
  1. Our ob. Create (The closure of ASave closure A inAnonymousObservableIn the variable_subscribeHandler.
  2. When we call ob.subscribe(Closure BWhen subscribing to a sequence, one is created firstAnonymousObserverObject, and will take a trailingClosure C. Then throughself.asObservable().subscribe(AnonymousObserver)Through a series of transformationsAnyObserverPassed to theThe closure of A.
  3. In 2 of them, a series of transformations can be simply explained as:
  • Self.asobservable ().subscribe(AnonymousObserver) is ob.subscribe(AnonymousObserver)

  • Ob. Subscribe (AnonymousObserver) is actually Producer. Subscribe (AnonymousObserver)

  • Subscribe (AnonymousObserver) calls self.run(AnonymousObserver)

  • Self. run(AnonymousObserver) creates an AnonymousObservableSink tube object sink, and then calls sink.run(AnonymousObservable) which calls the tube’s run() method, Ob was transmitted to the pipe sink.

  • In the sink. Run (AnonymousObservable) method of our pipe, parent-_subscribeHandler (AnyObserver(self)) is actually ob._subscribeHandler(AnyObserve) R (AnonymousObservableSink) calls closure A

  • And our closure is A need to pass in A parameter AnyObserver (AnonymousObservableSink), AnyObserver is actually A structure, it preserved the AnonymousObservableSink. On () method.

  • OnNext (” kongYulu “) is actually anyObserver.onNext (” KongYulu “) when we call observer.onNext(“kongyulu”) in closure A, Anyobserver.onnext (“kongyulu”) will call anyObserver.on ()

  • AnyObserver. On () then call AnonymousObservableSink. On the event (event) here

  • AnonymousObservableSink class AnonymousObservableSink. On (event) Then go to call its own forwardOn (event) is AnonymousObservableSink. ForwardOn (event)

  • AnonymousObservableSink. ForwardOn (event) is, in fact, call its superclass Sink. ForwardOn (event) in the Sink when the parent class initialization has preserved the _observer AnonymousObserver object.

  • Sink.forwardOn(event) calls anonymousobServer. on(event)

  • Anonymousobserver. on(event) actually calls its parent’s ObserverBase. On (event)

  • Observerbase.on (event) actually calls the subclass anonyMousobServer.oncore (event)

  • Anonymousobserver.oncore (Event) calls self._eventhandler (event), where _eventHandler saves the following closure C passed in when AnonymousObserver was created and calls back to closure C

  • Closure C calls back closure B according to the different events of the event. For example, event=. OnNext will call back closure B onNext{}, that is, let dispose = ob.subscribe(onNext: {(anything) in print(” subscribed :\(anything)”)}, onError: {(error) in print(” subscribed :\(anything)”)}, onCompleted: {print(” done “)}) {print(” destroy callback “)} : onNext: {(anything) in print(” subscribe :\(anything)”)

  • (28)

  • (29)

Finally, there is a flow chart to illustrate the whole creation and subscription process

2. Sequence creation, subscription diagram

3. Sequential subscription process

3.1 Sequence destruction mode

Above explained the sequence creation, subscription process, in the analysis of the sequence creation, subscription sequence source, we have vaguely seen our initial analysis of Dispose (), it seems that there are code everywhere in the entire source code, so how is the sequence destroyed?

To solve this problem, we will explore the sequence destruction process by analyzing the source code.

Here is a sequence life cycle sequence diagram:

  • Method 1: Release sequence resources by sending events that automatically end the sequence life cycle. Once an error or completed event is emitted, the life cycle of a sequence ends and all internal resources are released without manual release. (This conclusion was verified in this blog when we discussed instances 1 and 2. OnComplete is called and a “destroyed” message is printed whenever completed and error events are sent.)

  • Method 2: Dispose () is called to release. For example, if you need to pre-dispose a sequence resource or unsubscribe, you can call dispose on the returned Disposable.

  • Method 3: Recycle resources through garbage bag DisposeBag to achieve automatic release, which is officially recommended. The official recommendation for managing the life cycle of a subscription is to add resources to a global DisposeBag that follows the life cycle of the page, and when the page is destroyed, the DisposeBag is also destroyed and the resources in the DisposeBag are released. (This conclusion is also confirmed in the DisposeBag analysis above)

3.2 Sequence destruction instance analysis

Let’s start by reviewing example 1, the code for example 1 that this blog began analyzing:

func limitObservable(a){
        // Create sequence
        let ob = Observable<Any>.create { (observer) -> Disposable in
            observer.onNext("kongyulu")
            return Disposables.create { print("Destroyed and released.")} // dispose.dispose()
        }
        // Sequence subscription
        let dispose = ob.subscribe(onNext: { (anything) in
            print("Subscribed to:\(anything)")
        }, onError: { (error) in
            print("Subscribed to:\(error)")
        }, onCompleted: {
            print("Done") {})print("Destroy callback")}print("Executed")
        //dispose.dispose()
    }
Copy the code
  1. The above code executes as follows:
  2. From the above results, we know that the sequence created is not destroyed, that is, “destroyed free” is not printed, nor “Destroy callback” is printed. Why is that? This problem we later through the analysis of the source code Rx source will know.
  3. Now let’s uncomment that line of code abovedispose.dispose()This line of code, uncommented and rerun, produces the following output:
3.3 Sequence destruction source code analysis
  1. From the code in example 1 above, you can first see that the sequence is being createdObservable<Any>.create()Method has a trailing closure and needs to return an implementationDisposableAn instance of a protocol. And that is throughReturn Disposables. Create {print(" Disposables ")}This line of code returns. From this we confirmDisposables. Create {print(" Destroy release ")}It’s very important. Let’s analyze it firstDisposables.createThe source code.
  2. Enter Disposables. Create () source: We want to just click On It and find Disposables is an empty structure
public struct Disposables {
    private init() {}}Copy the code

Since this structure is private even the initialization method cannot be inherited, we infer that Disposables. Create () must be implemented by extension. So we’ll search for extension Disposables in the project, and you’ll find the following:

extension Disposables {

    /// Constructs a new disposable with the given action used for disposal.
    ///
    /// - parameter dispose: Disposal action which will be run upon calling `dispose`.
    public static func create(with dispose: @escaping (a) -> Void) - >Cancelable {
        return AnonymousDisposable(disposeAction: dispose)Dispose is the tag closure we passed in}}Copy the code

Return AnonymousDisposable(disposeAction: Dispose dispose() closes, which is Disposables in example 1. Create {print(” dispose released “)} // dispose. Dispose ()} {print(” destroy free “)} here we give it an alias: closure D

  1. Don’t think about it. We’re definitely going inAnonymousDisposableClass implementation to explore:
fileprivate final class AnonymousDisposable : DisposeBase.Cancelable {
    public typealias DisposeAction= () - >Void

    private let _isDisposed = AtomicInt(0)
    private var _disposeAction: DisposeAction?

    /// - returns: Was resource disposed.
    public var isDisposed: Bool {
        return isFlagSet(self._isDisposed, 1)
    }

    fileprivate init(_ disposeAction: @escaping DisposeAction) {
        self._disposeAction = disposeAction
        super.init()}// Non-deprecated version of the constructor, used by `Disposables.create(with:)`
    fileprivate init(disposeAction: @escaping DisposeAction) {
        self._disposeAction = disposeAction
        super.init()}/// Calls the disposal action if and only if the current instance hasn't been disposed yet.
    ///
    /// After invoking disposal action, disposal action will be dereferenced.
    fileprivate func dispose(a) {
        if fetchOr(self._isDisposed, 1) = =0 {
            if let action = self._disposeAction {
                self._disposeAction = nil
                action()
            }
        }
    }
}
Copy the code
  1. Analyzing the above AnonymousDisposable class definition source code, we can draw the following conclusions:
  • Initialization saves the closure passed in from the outside world, which is what we analyzed in point 2Closure D:{print(" destroy free ")}
  • There is adispose()By means offetchOr(self._isDisposed, 1) == 0This line of code controlsdispose()The contents are executed only once. (no matterdispose()How many times the method is executed,if let action = self._disposeAction { self._disposeAction = nil action() }This code will be executed at most once.
  • dispose()I’m going to do it firstself._disposeActionAssign a value to a temporary variableaction, and then emptyself._disposeActionAnd then to performaction(). The reason for doing this is if_disposeActionClosures are a time-consuming operation and can be guaranteed_disposeActionCapable of immediate release.
  1. AnonymousDisposable we only saw some routine save operations, combined with our experience in the creation process of analyzing sequences at the beginning (AnonymousDisposable is similar to AnonymousObservable), We can infer that the core code implementation must be in the subscription area.

  2. Next, we’ll dive into the observable.subscribe() method to explore some of the subscribe() source code implementation.

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) - >Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) - >Disposable {
    //1. Define the disposable local variable here
    let disposable: Disposable
    //2. Create a Disposables object
    if let disposed = onDisposed {
        disposable = Disposables.create(with: disposed)
    }
    else {
        disposable = Disposables.create()
    }
    //3. Create an AnonymousObserver object with an important trailing closure
    let observer = AnonymousObserver<Element> { event in
        switch event {
        case .next(letvalue): onNext? (value)case .error(let error):
            if let onError = onError {
                onError(error)
            }
            else {
                Hooks.defaultErrorHandler(callStack, error)
            }
            disposable.dispose() // If an error event is received, the resource will be reclaimed
        case .completed:
            onCompleted?()
            disposable.dispose() // When a COMPLETED event is received, the resources are reclaimed}}return Disposables.create(
        self.asObservable().subscribe(observer),
        disposable// We pass the local variable we created to self.asObservable().subscribe, which is our producer.subscribe)}Copy the code

Analysis of the above subscribe () source, combined with the beginning of the analysis, we can draw the following conclusions:

  • The subscribe ()Created aDisposableObject and holds the destruction callback closure, which calls the message back out when the destruction is performed.
  • Executed when an error or completion event is receiveddisposable.dispose()Release resources.
  • return Disposables.create( self.asObservable().subscribe(observer), disposable )That’s returned hereDisposableObject is what we call outside manuallydispose.dispose()methodsdisposeObject, or added to the globalDisposeBagThe destroyers of.
  1. From the analysis of 6, we clearly know the last line of codereturn Disposables.create( self.asObservable().subscribe(observer), disposable )Key points, let’s enter:Disposables. The create ()Source:
public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
    return BinaryDisposable(disposable1, disposable2)// Returns a binary destroyer object.
}
Copy the code

So in the code above we see that Create () directly returns a Binary disposableDisposable1, Disposable2 to BinaryDisposable.

  • Here,disposable1isself.asObservable().subscribe(observer)That isProducer.. subscribe(observer)Returns the disposer
  • disposable2We subscribe() to create a local variablelet disposable: Disposable
  1. And then let’s analyze itBinaryDisposableWhat a class really is:
private final class BinaryDisposable : DisposeBase.Cancelable {

    private let _isDisposed = AtomicInt(0)

    // state
    private var _disposable1: Disposable?
    private var _disposable2: Disposable?

    /// - returns: Was resource disposed.
    var isDisposed: Bool {
        return isFlagSet(self._isDisposed, 1)}init(_ disposable1: Disposable._ disposable2: Disposable) {
        self._disposable1 = disposable1
        self._disposable2 = disposable2
        super.init()}func dispose(a) {
        if fetchOr(self._isDisposed, 1) = =0 {
            self._disposable1? .dispose()self._disposable2? .dispose()self._disposable1 = nil
            self._disposable2 = nil}}}Copy the code