In the previous article, we covered the core concepts and core logic of RxSwift. Interested readers can refer to: RxSwift Source Code Analysis for Beginners (0): Core Concepts and Core Logic. In this article, we will work through the next topic together: how events are generated and transmitted in RxSwift. We illustrate this with one of the most commonly used operators: create.

import RxSwift

let disposeBag = DisposeBag()

Observable<Int>.create { observer in
    // Subscription closure
    observer.onNext(1)
    observer.onNext(2)
    observer.onNext(3)
    observer.onCompleted()
    return Disposables.create()
}.subscribe { value in
    // Listener closure 1
    print("Next: \(value)")
} onError: { error in
    // Listener closure 2
    print("Error: \(error)")
} onCompleted: {
    // Listener closure 3
    print("Completed")
} onDisposed: {
    // Listener closure 4
    print("Disposed")
}.disposed(by: disposeBag)

For ease of understanding, we call the first closure the subscription closure and the closures that follow it listener closures.

1. Event generation

In the code above, we call create, passing in a subscription closure. The closure takes a parameter named observer of type AnyObserver, which is a struct that conforms to the ObserverType protocol. Inside the subscription closure, we call the observer's onNext and onCompleted methods, producing a sequence of three next events followed by a completed event. So when is this closure called?

Let’s take a look at the create implementation:

extension ObservableType {
    public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        AnonymousObservable(subscribe)
    }
}

create takes a subscription closure and returns an AnonymousObservable. Nothing here tells us when the subscription closure is called, so we still don't know when the sequence of events is generated. To find out, we have to move on and look at how events are listened for.
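To make the idea concrete, here is a minimal sketch in plain Swift of an operator that only stores a closure. It is a toy model, not RxSwift's code: MiniObservable, its create and subscribe methods, and runLazyCreateDemo are hypothetical names made up for illustration. The only point is that storing the closure does not run it.

```swift
// Toy model, not RxSwift: every name below is hypothetical.
final class MiniObservable<Element> {
    typealias SubscribeHandler = (@escaping (Element) -> Void) -> Void

    private let subscribeHandler: SubscribeHandler

    private init(_ subscribeHandler: @escaping SubscribeHandler) {
        self.subscribeHandler = subscribeHandler
    }

    // Like `create`: stores the closure and returns, without calling it.
    static func create(_ subscribeHandler: @escaping SubscribeHandler) -> MiniObservable<Element> {
        MiniObservable(subscribeHandler)
    }

    // The stored closure only runs when someone subscribes.
    func subscribe(_ onNext: @escaping (Element) -> Void) {
        subscribeHandler(onNext)
    }
}

func runLazyCreateDemo() -> [String] {
    var log: [String] = []

    let numbers = MiniObservable<Int>.create { emit in
        log.append("subscription closure called")
        emit(1)
        emit(2)
    }

    // The closure has been stored but has not run yet.
    log.append("after create")

    numbers.subscribe { value in log.append("next \(value)") }
    return log
}
```

In this sketch, the entry "after create" is logged before the subscription closure runs, which is the same question we are asking of the real create.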

2. Event listening

As we saw in the previous article, we can listen for events by calling the subscribe method of ObservableType. Now let's see what this subscribe method does.

From the analysis above, we know that create gave us an AnonymousObservable, which is an ObservableType. We then call its subscribe method. Note that the method we call is not the one required by the protocol itself:

func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element

It is a method provided in the protocol extension:

    public func subscribe(
        onNext: ((Element) -> Void)? = nil,
        onError: ((Swift.Error) -> Void)? = nil,
        onCompleted: (() -> Void)? = nil,
        onDisposed: (() -> Void)? = nil
    ) -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }
            
            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif
            
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
            
            let observer = AnonymousObserver<Element> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }

This method basically creates an AnonymousObserver and subscribes it to the sequence via self.asObservable().subscribe(observer). Since self is an AnonymousObservable, which inherits from Observable, asObservable() simply returns self.

So we now have an idea of what the second step of the call chain does: it creates an AnonymousObserver and passes it to the subscribe method of the AnonymousObservable that was created in the first step from the subscription closure. Next, we look at what that subscribe method does.
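The way several optional callbacks get folded into a single observer can be sketched in a few lines of plain Swift. This is a simplified model under our own assumptions, not RxSwift's implementation: MiniEvent, MiniAnonymousObserver, makeObserver, and runObserverDemo are hypothetical names, and the sketch leaves out the default error handler, the debug tracking, and the real Disposable machinery.

```swift
// Toy model, not RxSwift: every name below is hypothetical.
enum MiniEvent<Element> {
    case next(Element)
    case error(Error)
    case completed
}

final class MiniAnonymousObserver<Element> {
    private let eventHandler: (MiniEvent<Element>) -> Void

    init(_ eventHandler: @escaping (MiniEvent<Element>) -> Void) {
        self.eventHandler = eventHandler
    }

    func on(_ event: MiniEvent<Element>) {
        eventHandler(event)
    }
}

// Folds the optional callbacks into one observer with a single `on` method.
func makeObserver<Element>(
    onNext: ((Element) -> Void)? = nil,
    onError: ((Error) -> Void)? = nil,
    onCompleted: (() -> Void)? = nil,
    onDisposed: (() -> Void)? = nil
) -> MiniAnonymousObserver<Element> {
    MiniAnonymousObserver { event in
        switch event {
        case .next(let value):
            onNext?(value)
        case .error(let error):
            onError?(error)
            onDisposed?()   // a terminal event also triggers disposal
        case .completed:
            onCompleted?()
            onDisposed?()
        }
    }
}

func runObserverDemo() -> [String] {
    var log: [String] = []
    let observer: MiniAnonymousObserver<Int> = makeObserver(
        onNext: { log.append("Next: \($0)") },
        onCompleted: { log.append("Completed") },
        onDisposed: { log.append("Disposed") }
    )
    observer.on(.next(1))
    observer.on(.completed)
    return log
}
```

The design choice worth noticing is that whoever produces events only needs to know about one method, on, no matter how many listener closures the caller supplied.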

To find out, we need to look at the source code of AnonymousObservable. It inherits from the Producer class, which in turn inherits from Observable:

AnonymousObservable => Producer => Observable

In order to understand what AnonymousObservable is, we need to briefly introduce its parent Producer.

Producer

Producer is a subclass of Observable and is itself an abstract class. From its name, we can guess that it is the class responsible for producing the sequence of events.

class Producer<Element>: Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

    func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
}

Producer inherits from Observable and overrides the subscribe method. We'll focus on these lines:

    let disposer = SinkDisposer()
    let sinkAndSubscription = self.run(observer, cancel: disposer)
    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

subscribe first creates a SinkDisposer, then calls its own run method, passing in both the observer it received and the SinkDisposer. The SinkDisposer and the return value of run exist mainly for releasing resources and are not the focus of this article. What matters here is that calling subscribe ends up calling the run method of AnonymousObservable, and that run method is the starting point of the core logic. So let's dive into the source code of AnonymousObservable.
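The relationship between subscribe and run is a template method: the base class fixes the entry point, and each subclass fills in the step that actually produces events. Here is a minimal sketch of that pattern in plain Swift. MiniProducer, MiniJust, MiniRange, and collect are hypothetical names for illustration, and the sketch omits scheduling and disposal entirely.

```swift
// Toy model of the template-method pattern; not RxSwift's code.
class MiniProducer<Element> {
    // Fixed entry point shared by all subclasses in this sketch.
    final func subscribe(_ observer: (Element) -> Void) {
        run(observer)
    }

    // Abstract step: each subclass decides how events are produced.
    func run(_ observer: (Element) -> Void) {
        fatalError("Abstract method: subclasses must override run")
    }
}

// Emits a single stored element.
final class MiniJust<Element>: MiniProducer<Element> {
    private let element: Element

    init(_ element: Element) {
        self.element = element
    }

    override func run(_ observer: (Element) -> Void) {
        observer(element)
    }
}

// Emits every integer in a closed range.
final class MiniRange: MiniProducer<Int> {
    private let range: ClosedRange<Int>

    init(_ range: ClosedRange<Int>) {
        self.range = range
    }

    override func run(_ observer: (Int) -> Void) {
        for value in range {
            observer(value)
        }
    }
}

// Helper that subscribes and gathers everything the producer emits.
func collect<Element>(_ producer: MiniProducer<Element>) -> [Element] {
    var values: [Element] = []
    producer.subscribe { values.append($0) }
    return values
}
```

Callers only ever see subscribe, so adding a new kind of producer in this sketch means writing one run override and nothing else.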

AnonymousObservable

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self.subscribeHandler = subscribeHandler
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

First, we see that AnonymousObservable holds the subscription closure that was passed to create in step 1.

Second, AnonymousObservable, as a subclass of Producer, overrides the run method. In the run method, an AnonymousObservableSink is created and its run method is called.

This brings in another concept: the Sink. Sink is an important concept in RxSwift. It serves as the bridge connecting an Observable and its Observer: each Observable type connects to its Observer through a Sink, and the Sink transmits the events.

AnonymousObservableSink

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
  
    ......

    func run(_ parent: Parent) -> Disposable {
        parent.subscribeHandler(AnyObserver(self))
    }
}

We see that the run method does two main things:
1. It creates an AnyObserver, using self as the argument.
2. It calls the subscribeHandler of parent (the AnonymousObservable), passing in that AnyObserver.

Now we know when the subscription closure created in step 1 is called:

The subscribe method of AnonymousObservable calls self.run, which in turn calls AnonymousObservableSink.run. That run method calls the subscription closure, and inside the subscription closure we produce a series of events.

In fact, most other Observables in RxSwift follow a similar process, which will be analyzed in later articles.
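The order of calls described above can be checked with a small trace. The following is a toy model in plain Swift, not RxSwift itself: Trace, MiniSink, MiniAnonymousObservable, and runChainDemo are hypothetical names, elements are fixed to Int, and disposal is left out. It only records which method runs when.

```swift
// Toy model, not RxSwift: records the order of calls made during subscribe.
final class Trace {
    var steps: [String] = []
}

final class MiniSink {
    private let observer: (Int) -> Void
    private let trace: Trace

    init(observer: @escaping (Int) -> Void, trace: Trace) {
        self.observer = observer
        self.trace = trace
    }

    // Receives values from the subscription closure and forwards them.
    func on(_ value: Int) {
        trace.steps.append("sink.on(\(value))")
        observer(value)
    }

    // Invokes the subscription closure stored on the parent.
    func run(_ parent: MiniAnonymousObservable) {
        trace.steps.append("sink.run")
        parent.subscribeHandler(self.on)
    }
}

final class MiniAnonymousObservable {
    typealias SubscribeHandler = (@escaping (Int) -> Void) -> Void

    let subscribeHandler: SubscribeHandler
    private let trace: Trace

    init(trace: Trace, subscribeHandler: @escaping SubscribeHandler) {
        self.trace = trace
        self.subscribeHandler = subscribeHandler
    }

    func subscribe(_ observer: @escaping (Int) -> Void) {
        trace.steps.append("observable.subscribe")
        run(observer)
    }

    private func run(_ observer: @escaping (Int) -> Void) {
        trace.steps.append("observable.run")
        MiniSink(observer: observer, trace: trace).run(self)
    }
}

func runChainDemo() -> [String] {
    let trace = Trace()
    let observable = MiniAnonymousObservable(trace: trace) { emit in
        trace.steps.append("subscription closure")
        emit(1)
    }
    observable.subscribe { value in
        trace.steps.append("listener(\(value))")
    }
    return trace.steps
}
```

Running runChainDemo in this sketch yields the steps in the order observable.subscribe, observable.run, sink.run, subscription closure, sink.on(1), listener(1).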

The next question is: the events have been generated and someone is listening, so how are the events passed to the listener closures we provided in step 2? Observables and observers are connected by sinks, which means events are transmitted through sinks, so let's look at Sink.

Sink

class Sink<Observer: ObserverType>: Disposable {
    fileprivate let observer: Observer
    fileprivate let cancel: Cancelable
    private let disposed = AtomicInt(0)

    #if DEBUG
        private let synchronizationTracker = SynchronizationTracker()
    #endif

    init(observer: Observer, cancel: Cancelable) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self.observer = observer
        self.cancel = cancel
    }

    final func forwardOn(_ event: Event<Observer.Element>) {
        #if DEBUG
            self.synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self.synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self.disposed, 1) {
            return
        }
        self.observer.on(event)
    }

    final func forwarder() -> SinkForward<Observer> {
        SinkForward(forward: self)
    }

    final var isDisposed: Bool {
        isFlagSet(self.disposed, 1)
    }

    func dispose() {
        fetchOr(self.disposed, 1)
        self.cancel.dispose()
    }

    deinit {
#if TRACE_RESOURCES
       _ =  Resources.decrementTotal()
#endif
    }
}


Sink holds an observer, and forwardOn is a final method: subclasses can call it to forward an event to that observer, but they cannot override it. Let's look again at the AnonymousObservableSink created in the preceding code.

AnonymousObservableSink

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Element = Observer.Element
    typealias Parent = AnonymousObservable<Element>

    // state
    private let isStopped = AtomicInt(0)

    #if DEBUG
        private let synchronizationTracker = SynchronizationTracker()
    #endif

    override init(observer: Observer, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<Element>) {
        #if DEBUG
            self.synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self.synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self.isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self.isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

    func run(_ parent: Parent) -> Disposable {
        parent.subscribeHandler(AnyObserver(self))
    }
}

As we have seen, the run method of AnonymousObservableSink creates an AnyObserver with self as the argument and calls the subscription closure, passing that AnyObserver in. So when observer.onNext and observer.onCompleted are called in the subscription closure, it is actually the on method of AnyObserver that gets called. How does the event travel from this on method to the listener closures?

Here is the source code of AnyObserver:

public struct AnyObserver<Element> : ObserverType {
    /// Anonymous event handler type.
    public typealias EventHandler = (Event<Element>) -> Void

    private let observer: EventHandler

    /// Construct an instance whose `on(event)` calls `eventHandler(event)`
    ///
    /// - parameter eventHandler: Event handler that observes sequences events.
    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }

    /// Construct an instance whose `on(event)` calls `observer.on(event)`
    ///
    /// - parameter observer: Observer that receives sequence events.
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }

    /// Send `event` to this observer.
    ///
    /// - parameter event: Event instance.
    public func on(_ event: Event<Element>) {
        self.observer(event)
    }

    /// Erases type of observer and returns canonical observer.
    ///
    /// - returns: type erased observer.
    public func asObserver() -> AnyObserver<Element> {
        self
    }
}
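The trick in the second initializer, keeping only the on method of the wrapped observer, is a common way to erase a concrete type. Below is a minimal sketch of the same idea in plain Swift. MiniObserverType, MiniAnyObserver, CollectingObserver, and runErasureDemo are hypothetical names used only for illustration, and plain elements stand in for RxSwift's Event type.

```swift
// Toy model of the type-erasure idea; names are hypothetical, not RxSwift API.
protocol MiniObserverType {
    associatedtype Element
    func on(_ element: Element)
}

struct MiniAnyObserver<Element>: MiniObserverType {
    private let handler: (Element) -> Void

    // Erases the concrete observer type by keeping only its `on` method.
    init<Observer: MiniObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.handler = observer.on
    }

    // Calls straight through to the wrapped observer's `on`.
    func on(_ element: Element) {
        handler(element)
    }
}

// A concrete observer that remembers everything it receives.
final class CollectingObserver: MiniObserverType {
    private(set) var received: [Int] = []

    func on(_ element: Int) {
        received.append(element)
    }
}

func runErasureDemo() -> [Int] {
    let collector = CollectingObserver()
    let erased = MiniAnyObserver(collector)   // inferred as MiniAnyObserver<Int>
    erased.on(1)
    erased.on(2)
    return collector.received
}
```

Code that holds a MiniAnyObserver<Int> in this sketch does not need to know that a CollectingObserver sits behind it, yet every call still reaches that object.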

We see that AnyObserver(self) (note: self here is the AnonymousObservableSink) stores self.on as the observer closure of AnyObserver, and the on method of AnyObserver simply calls that closure. The event is therefore passed to the on method of AnonymousObservableSink. That on method calls forwardOn, which in turn calls the on method of the observer held by the AnonymousObservableSink:

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    ...
    func on(_ event: Event<Element>) {
        #if DEBUG
            self.synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self.synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self.isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self.isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
    ...
}

class Sink<Observer: ObserverType>: Disposable {
    ...
    final func forwardOn(_ event: Event<Observer.Element>) {
        #if DEBUG
            self.synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self.synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self.disposed, 1) {
            return
        }
        self.observer.on(event)
    }
    ...
}


This self.observer is the AnonymousObserver created in the subscribe method, and its on method passes events to the listener closures.
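The two guards in the code above, isStopped in the sink's on method and disposed in forwardOn, can be illustrated with a small stand-alone sketch. It is a simplified model under stated assumptions, not RxSwift's code: SinkEvent, GatedSink, and runGateDemo are hypothetical names, elements are fixed to Int, error events are omitted, and plain Bool properties stand in for the AtomicInt flags, so the sketch is not thread-safe.

```swift
// Toy model, not RxSwift: plain Bools replace the atomic flags,
// so this sketch is not thread-safe.
enum SinkEvent {
    case next(Int)
    case completed
}

final class GatedSink {
    private let observer: (SinkEvent) -> Void
    private var isStopped = false   // set by the first terminal event
    private var disposed = false    // set by dispose()

    init(observer: @escaping (SinkEvent) -> Void) {
        self.observer = observer
    }

    func on(_ event: SinkEvent) {
        switch event {
        case .next:
            // Elements arriving after a terminal event are dropped.
            if isStopped { return }
            forwardOn(event)
        case .completed:
            // Only the first terminal event is forwarded.
            if !isStopped {
                isStopped = true
                forwardOn(event)
                dispose()
            }
        }
    }

    private func forwardOn(_ event: SinkEvent) {
        // Nothing reaches the observer once the sink is disposed.
        if disposed { return }
        observer(event)
    }

    func dispose() {
        disposed = true
    }
}

func runGateDemo() -> [String] {
    var log: [String] = []
    let sink = GatedSink { event in
        switch event {
        case .next(let value):
            log.append("next \(value)")
        case .completed:
            log.append("completed")
        }
    }
    sink.on(.next(1))
    sink.on(.completed)
    sink.on(.next(2))     // dropped: the sink has already stopped
    sink.on(.completed)   // dropped: only the first terminal event counts
    return log
}
```

In this sketch the listener sees one element and one completion, no matter how many events the subscription closure tries to send afterwards.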

3. Summary

At this point, we have a thorough understanding of how events are generated, transmitted, and listened for:
1. Observable.create: creates an AnonymousObservable.
2. Observable.subscribe(onNext:onError:onCompleted:onDisposed:): creates an AnonymousObserver and calls AnonymousObservable.subscribe(_ observer:).
3. AnonymousObservable.subscribe(_ observer:): resolves to Producer.subscribe(_ observer:).
4. Producer.subscribe(_ observer:): calls AnonymousObservable.run.
5. AnonymousObservable.run: creates an AnonymousObservableSink whose observer is the AnonymousObserver, and calls AnonymousObservableSink.run.
6. AnonymousObservableSink.run: creates an AnyObserver and calls AnonymousObservable.subscribeHandler, passing in the AnyObserver.
7. AnonymousObservable.subscribeHandler (the subscription closure): calls AnyObserver.on.
8. AnyObserver.on: calls self.observer, namely AnonymousObservableSink.on.
9. AnonymousObservableSink.on: calls Sink.forwardOn.
10. Sink.forwardOn: calls AnonymousObservableSink.observer.on, that is, AnonymousObserver.on.
11. AnonymousObserver.on: calls the listener closures.

Writing this up took some effort, so if you spot any mistakes, corrections are welcome. If it helped, please give it a thumbs up. ^_^