RxSwift (1) The idea of functional reactive programming

RxSwift (II) Sequence core logic analysis

Create, subscribe, and destroy an RxSwift Observable

RxSwift (4) higher-order functions

RxSwift (v)

(6) Dispose source code analysis

RxSwift (7) RxSwift vs. Swift usage

RxSwift (x) Basic Usage part 1- Sequence, subscribe, destroy

RxSwift Learning 12 (Basics 3- UI Control Extensions)

RxSwift sequence core logic

The previous article, on the idea of functional reactive programming, only briefly analyzed the core logic of sequences. This post discusses that topic in more depth. If any part of the analysis is wrong, please leave a message (QQ: 282889543) so that we can improve together.

In general, the core logic of RxSwift can be analyzed as a trilogy: create a sequence, subscribe to the sequence, destroy the sequence. The central idea is that everything is a sequence.

1. Sequence creation

Observable sequences

Let’s look at the class inheritance involved in creating an Observable:

Based on the class diagram above, here is a brief analysis of how the classes relate and of the design idea behind them. The layering is very thorough: each layer solves exactly one problem, and the structure is clear when the layers are stacked: AnonymousObservable -> Producer -> Observable -> ObservableType -> ObservableConvertibleType

Next, let’s briefly break down what each class does:

  • ObservableConvertibleType: As the name implies, a type that can be converted to an Observable. The protocol has only one method, asObservable. What is the benefit of this?
  1. The user does not need to care what concrete type the object is.
  2. The user can focus on its core function.
  • ObservableType: A protocol that inherits from ObservableConvertibleType (and therefore asObservable) and adds the abstract method subscribe. In other words, you can only observe an object by subscribing to it.
  • Observable: A concrete class that can be regarded as the base class. From the user's point of view it already exposes the complete Observable interface, although some methods are not implemented and remain abstract.
  • Producer: Inherits from Observable and implements the subscribe method.
  • AnonymousObservable: Inherits from Producer and adds the property let _subscribeHandler: SubscribeHandler, which holds the closure passed in when the sequence is created, so it is able to invoke that closure. It also implements the run method, the core method of the sequence. In run() it creates an AnonymousObservableSink. A sink can be thought of as a pipe; it plays a role similar to a manager and ties together the sequence, the subscription, and disposal. This raises two puzzles:

Question 1. Why is AnonymousObservableSink defined as a final private class that cannot be inherited or accessed externally?

Question 2. How does a created Observable relate to a subscription?

We will discuss these two questions later.

Finally, we summarize the design idea:

In fact, all the observables that users work with are subclasses of Producer, siblings of AnonymousObservable, but users do not need to care about how they are implemented. Each has a companion class such as AnonymousObservableSink (a sink, or pipe), and it is the combination of the two that makes everything actually work. AnonymousObservableSink also has access to both the sequence and the subscription, similar to the manager role we often use in our own projects. The whole design describes its features upward through composed protocols and hides implementation details downward through subclassing, similar to the factory pattern. Such a family of classes can also be called a class cluster.
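To make the layering concrete, here is a minimal sketch in plain Swift. It is not the RxSwift source: every `Mini…` name is hypothetical, and observers are reduced to a single `onNext` closure. It only illustrates how two protocols, a base class with an abstract method, a middle layer, and a leaf class that stores a closure can stack on top of each other.

```swift
// Simplified model of the layering. Every "Mini" name is hypothetical.

// Top layer: anything that can be turned into an observable.
protocol MiniObservableConvertibleType {
    associatedtype Element
    func asObservable() -> MiniObservable<Element>
}

// Second layer: adds the ability to be subscribed to.
protocol MiniObservableType: MiniObservableConvertibleType {
    func subscribe(_ onNext: @escaping (Element) -> Void)
}

// Concrete base class: asObservable() returns self, subscribe stays "abstract".
class MiniObservable<Element>: MiniObservableType {
    func asObservable() -> MiniObservable<Element> { return self }
    func subscribe(_ onNext: @escaping (Element) -> Void) {
        fatalError("abstract method")
    }
}

// Middle layer: implements subscribe once and delegates the work to run.
class MiniProducer<Element>: MiniObservable<Element> {
    override func subscribe(_ onNext: @escaping (Element) -> Void) {
        run(onNext)
    }
    func run(_ onNext: @escaping (Element) -> Void) {
        fatalError("abstract method")
    }
}

// Leaf: stores the closure given at creation time and calls it in run.
final class MiniAnonymousObservable<Element>: MiniProducer<Element> {
    typealias SubscribeHandler = ((Element) -> Void) -> Void
    private let subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self.subscribeHandler = subscribeHandler
    }
    override func run(_ onNext: @escaping (Element) -> Void) {
        subscribeHandler(onNext)
    }
}

var miniReceived: [String] = []
// The caller only sees the base type, not the concrete subclass.
let miniSource: MiniObservable<String> = MiniAnonymousObservable<String> { send in
    send("hello")
}
miniSource.asObservable().subscribe { miniReceived.append($0) }
```

The caller works against `MiniObservable<String>` only, which is the point of the design: the protocols describe the features, and the subclasses hide the implementation.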

The flow of sequence creation

With the class inheritance above in mind, the sequence creation process is not hard to understand. It really is just a few lines of code; the hard part is answering the questions raised above.

Let's look at the flow of, and the relationships between, sequence creation, subscription, and destruction through a simple RxSwift example.

Example 1:

    // 1: create the sequence
    let ob = Observable<Any>.create { (obserber) -> Disposable in
        // 3: send signals
        obserber.onNext("Kyl sent a signal.")
        obserber.onCompleted()
        return Disposables.create()
    }

    // 2: subscribe to the sequence
    let _ = ob.subscribe(onNext: { (text) in
        print("Subscribe to:\(text)")
    }, onError: { (error) in
        print("error: \(error)")
    }, onCompleted: {
        print("Complete")
    }, onDisposed: {
        print("Destroyed")
    })

The code in Example 1 can be expressed clearly with a diagram from Cool Teacher C:

From the code and diagram above, we might be wondering:

Question 3: When we call ob.subscribe(), why is closure A (the closure passed in when ob was created) invoked, so that obserber.onNext("Kyl sent a signal.") runs? And why does sending a signal from ob trigger the onNext closure passed to subscribe(), that is, let _ = ob.subscribe(onNext: { (text) in print("Subscribe to:\(text)") })?

The ob.subscribe() method must do something to call closure A somewhere in order to do this. How does it work? Below we will analyze the source code to answer this question.

The line let ob = Observable<Any>.create { (obserber) -> Disposable in ... } does a lot of things.

Let’s start with a flow chart to get a glimpse of the sequence creation process:

Following the source, create() simply does return AnonymousObservable(subscribe). We have not found the answer we were looking for here, and things are getting a little confusing.

  • AnonymousObservable class source
final private class AnonymousObservable<Element> :Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}
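The important point in this class is that creation only stores the closure and does not run it. A minimal sketch of that idea in plain Swift follows; `StoredClosureSequence` is a hypothetical stand-in, not the library's class, and the observer is reduced to a single `send` closure.

```swift
// Hypothetical stand-in for AnonymousObservable: it only stores the closure.
final class StoredClosureSequence<Element> {
    typealias SubscribeHandler = ((Element) -> Void) -> Void
    private let subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        // Creation just saves the closure; nothing is executed yet.
        self.subscribeHandler = subscribeHandler
    }

    func subscribe(_ onNext: @escaping (Element) -> Void) {
        // Only subscribing runs the stored closure.
        subscribeHandler(onNext)
    }
}

var creationLog: [String] = []
let storedOb = StoredClosureSequence<String> { send in
    creationLog.append("closure A runs")
    send("Kyl sent a signal.")
}
// Snapshot taken after creation but before any subscription.
let logAfterCreate = creationLog
storedOb.subscribe { creationLog.append("got: \($0)") }
```

After creation the log is still empty. The closure only runs once `subscribe` is called, which is why the answer to Question 3 has to be found on the subscription side.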

Let's take a deep breath, relax, and approach the problem from another direction instead of pushing down a single path: the subscription process.

2. Subscription

Recall the subscription code in Example 1: let _ = ob.subscribe(onNext: { (text) in ... }). What does this line do? Let's go through the source code for an in-depth analysis:

  • The source code of RxSwift's subscribe() is as follows:
  public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable {
            // ... (code that is not the focus of this analysis is omitted)

            /* The AnonymousObserver initializer takes a trailing closure, eventHandler, which is triggered whenever an event is received. The closures we passed in `let _ = ob.subscribe(onNext: { (text) in ...` are called from inside it. */
            let observer = AnonymousObserver<E> { event in
                // ...
                switch event {
                case .next(let value):
                    onNext?(value) // the closure passed as ob.subscribe(onNext:)
                case .error(let error):
                    if let onError = onError {
                        onError(error) // the closure passed as ob.subscribe(onError:)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?() // the closure passed as ob.subscribe(onCompleted:)
                    disposable.dispose()
                }
            }
            /* Returns a Disposable that is used to release resources. While building it, self.asObservable().subscribe(observer) is called. asObservable() is the sequence ob that we created, so this is ob.subscribe(observer), and the local AnonymousObserver created above is passed in. */
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }

From the source code above we can see that subscribe() takes the onNext, onError, and onCompleted closures as arguments and creates an AnonymousObserver object, observer. That observer is created with a closure that calls whichever of the three closures matches the event it receives. The most important line is return Disposables.create(self.asObservable().subscribe(observer), disposable), which calls the real subscribe() function and passes in the AnonymousObserver object. self.asObservable() is the ob sequence created by our create() function, so here we can clearly see that the closures we pass in when subscribing are handed on to ob.
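The way subscribe() folds three optional closures into one event handler can be sketched in plain Swift. The names `SketchEvent`, `SketchError`, and `makeEventHandler` are hypothetical. Unlike the real method, this sketch simply ignores an error when no onError closure is supplied, and it does no disposal.

```swift
// Hypothetical, simplified types. They are not the RxSwift declarations.
enum SketchEvent<Element> {
    case next(Element)
    case error(Error)
    case completed
}

struct SketchError: Error {}

// Folds the optional callbacks into a single event handler, similar in
// spirit to the closure that subscribe() hands to AnonymousObserver.
func makeEventHandler<Element>(
    onNext: ((Element) -> Void)? = nil,
    onError: ((Error) -> Void)? = nil,
    onCompleted: (() -> Void)? = nil
) -> (SketchEvent<Element>) -> Void {
    return { event in
        switch event {
        case .next(let value): onNext?(value)
        case .error(let error): onError?(error)
        case .completed: onCompleted?()
        }
    }
}

var handlerCalls: [String] = []
let handler: (SketchEvent<String>) -> Void = makeEventHandler(
    onNext: { (value: String) in handlerCalls.append("next(\(value))") },
    onCompleted: { handlerCalls.append("completed") }
)
handler(.next("a"))
handler(.error(SketchError()))  // no onError closure was supplied, so nothing is recorded
handler(.completed)
```

Whoever holds `handler` only needs to deliver events. Which user closure runs is decided inside the handler, which is the role the AnonymousObserver plays in the real code.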

Why is self.asObservable() the ob returned by our create() function?

To answer this question, recall the Observable inheritance chain analyzed above: Observable -> ObservableType -> ObservableConvertibleType. Observable conforms to the ObservableType protocol, ObservableType inherits from the ObservableConvertibleType protocol, and ObservableConvertibleType declares the abstract method asObservable(). The Observable class implements asObservable() by returning self directly.

The following is verified by the source code:

///
/// It represents a push style sequence.
public class Observable<Element> : ObservableType {
    ...
    public func asObservable() -> Observable<E> {
        return self
    }
    ...
}

After analyzing the source code of RxSwift's subscribe(), we can see a chain connecting the closure passed in when ob was created and the closures passed in when we subscribe: as long as ob sends a signal, our subscriber is bound to receive it along this chain. But we still do not know how the chain is invoked or what triggers it.

We also notice that in self.asObservable().subscribe(observer) it is the AnonymousObservable that calls subscribe(). There is no definition of subscribe() in the AnonymousObservable class, so we need to look at its parent class, Producer.

  • The source code for Producer is as follows:
class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            /* run() is passed the observer and creates the sink. The sink has the sequence's capabilities and can call the on() method. */
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

    func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        rxAbstractMethod()
    }
}


As expected, we find the definition of subscribe() in Producer. From there, we can summarize several clear clues:

  • (1) From the class inheritance above, we know that it is Producer that implements the subscribe() method of the ObservableType protocol. Inside this method it calls self.run(observer, cancel: disposer).
  • (2) self.run() is actually AnonymousObservable.run(), which does three things:
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        // 1. Create a sink object and pass it the observer, i.e. the
        //    AnonymousObserver that subscribe() built around our closures.
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        // 2. The sink calls its own run() method and passes the AnonymousObservable (self) as the argument.
        let subscription = sink.run(self)
        // 3. Return a tuple containing the sink and the subscription.
        return (sink: sink, subscription: subscription)
    }
  • (3) The run() method of the AnonymousObservableSink class calls parent._subscribeHandler(AnyObserver(self)), where parent is the self passed in by sink.run(self) in (2), i.e. the AnonymousObservable object. We already know that _subscribeHandler is the closure that was passed to create() and saved when the sequence was created: let ob = Observable<Any>.create { (obserber) -> Disposable in obserber.onNext("Kyl sent a signal."); obserber.onCompleted(); return Disposables.create() }. It is now clear that parent._subscribeHandler(AnyObserver(self)) executes this closure, and that is what runs the line obserber.onNext("Kyl sent a signal.").

  • Now we can summarize the flow of our code through a flow chart:

We now understand the path from subscribing to the sequence to the call of the closure that was passed to create(). What is still unclear is how obserber.onNext() inside that closure reaches the onNext closure passed to subscribe(). For that we need to analyze AnonymousObserver.
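The path from subscribe() to closure A can be traced with a small plain-Swift model. This is only a sketch under simplifying assumptions: the `Chain…` names are hypothetical, elements are fixed to `String`, the observer is a single `onNext` closure, and disposal and schedulers are left out.

```swift
// All names are hypothetical miniatures of the classes discussed above.
var chainTrace: [String] = []

final class ChainSink {
    let onNext: (String) -> Void
    init(onNext: @escaping (String) -> Void) { self.onNext = onNext }

    // Mirrors AnonymousObservableSink.run: call the closure saved at creation.
    func run(_ parent: ChainAnonymousObservable) {
        chainTrace.append("sink.run")
        parent.subscribeHandler(self)
    }
}

class ChainProducer {
    // Mirrors Producer.subscribe: it only delegates to run.
    func subscribe(onNext: @escaping (String) -> Void) {
        chainTrace.append("Producer.subscribe")
        run(onNext: onNext)
    }
    func run(onNext: @escaping (String) -> Void) {
        fatalError("abstract method")
    }
}

final class ChainAnonymousObservable: ChainProducer {
    let subscribeHandler: (ChainSink) -> Void
    init(_ subscribeHandler: @escaping (ChainSink) -> Void) {
        self.subscribeHandler = subscribeHandler
    }
    // Mirrors AnonymousObservable.run: create a sink, then call sink.run(self).
    override func run(onNext: @escaping (String) -> Void) {
        chainTrace.append("AnonymousObservable.run")
        ChainSink(onNext: onNext).run(self)
    }
}

let chainOb = ChainAnonymousObservable { sink in
    chainTrace.append("closure A")
    sink.onNext("Kyl sent a signal.")
}
chainOb.subscribe(onNext: { chainTrace.append("onNext: \($0)") })
```

Reading `chainTrace` afterwards gives the order of the calls: Producer.subscribe, AnonymousObservable.run, sink.run, closure A, and finally the subscriber's onNext closure.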

Let’s start with the AnonymousObserver class

  • The source code of AnonymousObserver is defined as follows:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
    typealias Element = ElementType
    
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
    
    /* Constructor that holds the EventHandler trailing closure */
    init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self._eventHandler = eventHandler
    }

    // Override the onCore method and call the EventHandler closure
    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
    
#if TRACE_RESOURCES
    deinit {
        _ = Resources.decrementTotal()
    }
#endif
}


The onNext() method is not found in AnonymousObserver source code, so we can only look up its inheritance chain. Here we need to know the class inheritance relationship:

  • AnonymousObserver:

By analyzing the inheritance relationship of the class, we know that:

The on() method of an AnonymousObserver object calls onCore(), and the onNext, onError, and onCompleted methods of ObserverType in turn call on(). But how and when is on() called?
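The relationship between onNext(), on(), and onCore() can be sketched in plain Swift as follows. The `Demo…` names are hypothetical, the error case is left out, and the base class here forwards every event without any stop check, so this is a simplified model rather than the library's implementation.

```swift
enum DemoEvent<Element> {
    case next(Element)
    case completed
}

protocol DemoObserverType {
    associatedtype Element
    func on(_ event: DemoEvent<Element>)
}

// Convenience methods live in a protocol extension and all funnel into on().
extension DemoObserverType {
    func onNext(_ element: Element) { on(.next(element)) }
    func onCompleted() { on(.completed) }
}

// Base class: on() hands the event to onCore(), which subclasses override.
class DemoObserverBase<Element>: DemoObserverType {
    func on(_ event: DemoEvent<Element>) {
        onCore(event)
    }
    func onCore(_ event: DemoEvent<Element>) {
        fatalError("abstract method")
    }
}

// Leaf: onCore() just runs the stored event handler.
final class DemoAnonymousObserver<Element>: DemoObserverBase<Element> {
    private let eventHandler: (DemoEvent<Element>) -> Void
    init(_ eventHandler: @escaping (DemoEvent<Element>) -> Void) {
        self.eventHandler = eventHandler
    }
    override func onCore(_ event: DemoEvent<Element>) {
        eventHandler(event)
    }
}

var demoSeen: [String] = []
let demoObserver = DemoAnonymousObserver<Int> { event in
    switch event {
    case .next(let value): demoSeen.append("next \(value)")
    case .completed: demoSeen.append("completed")
    }
}
demoObserver.onNext(1)      // onNext -> on -> onCore -> eventHandler
demoObserver.onCompleted()  // onCompleted -> on -> onCore -> eventHandler
```

Because the convenience methods are defined once on the protocol, any type that implements on() gets onNext() and onCompleted() for free.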

To solve this problem, we need to go back to the code where we created the sequence:

public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }

The create() method takes a subscribe closure and returns an AnonymousObservable object. The subscribe closure is the closure we pass in when creating the sequence, and AnonymousObservable saves it during initialization: self._subscribeHandler = subscribeHandler. AnonymousObservable also has a run() method, which creates the AnonymousObservableSink object sink.

final private class AnonymousObservable<Element> :Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

After all this analysis and going around in circles, the key turns out to be the AnonymousObservableSink object. The sink is a magic pipe: it has access to the sequence, the subscriber, and the disposal logic, so it is involved in creating, subscribing to, and destroying the sequence all at once.

The source code of the AnonymousObservableSink class is as follows:

final private class AnonymousObservableSink<O: ObserverType> :Sink<O>, ObserverType {
    typealias E = O.E
    // Parent is AnonymousObservable
    typealias Parent = AnonymousObservable<E>

    // state
    private let _isStopped = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

// The constructor passes in an observer sequence and Cancelable
    override init(observer: O, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

// This implements the on() method of the ObserverType protocol
    func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            // Call the superclass's forwardOn(), which forwards the event to the stored observer's on() method
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

    func run(_ parent: Parent) -> Disposable {
        /* Calls the _subscribeHandler closure that we passed in when creating the sequence. parent is the AnonymousObservable that was passed in. self (the sink) is wrapped in AnyObserver and handed to _subscribeHandler, so the closure can send events to the subscriber through the sink. */
        return parent._subscribeHandler(AnyObserver(self))
    }
}
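The effect of the _isStopped flag in on() can be shown with a small plain-Swift sketch. `GateEvent` and `GateSink` are hypothetical names, and a plain Bool stands in for the AtomicInt flag, so this version is not thread-safe and also leaves out the dispose() call.

```swift
enum GateEvent: Equatable {
    case next(Int)
    case error(String)
    case completed
}

// Hypothetical miniature of the sink's on() method.
final class GateSink {
    private var isStopped = false
    private let forward: (GateEvent) -> Void

    init(forward: @escaping (GateEvent) -> Void) {
        self.forward = forward
    }

    func on(_ event: GateEvent) {
        switch event {
        case .next:
            if isStopped { return }   // values after termination are ignored
            forward(event)
        case .error, .completed:
            if !isStopped {           // only the first terminal event passes
                isStopped = true
                forward(event)
            }
        }
    }
}

var gateForwarded: [GateEvent] = []
let gate = GateSink(forward: { gateForwarded.append($0) })
gate.on(.next(1))
gate.on(.completed)
gate.on(.next(2))         // dropped: the sink has already stopped
gate.on(.error("late"))   // dropped: a terminal event was already forwarded
```

Only the first value and the first terminal event reach the subscriber, so a sequence can emit at most one of completed or error, and nothing after it.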

The source code of the Sink class is as follows:

class Sink<O : ObserverType> : Disposable {
    fileprivate let _observer: O
    fileprivate let _cancel: Cancelable
    fileprivate let _disposed = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    init(observer: O, cancel: Cancelable) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self._observer = observer
        self._cancel = cancel
    }

    final func forwardOn(_ event: Event<O.E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        // Call the passed observer.on() method,
        self._observer.on(event)
    }

    final func forwarder() -> SinkForward<O> {
        return SinkForward(forward: self)
    }

    final var disposed: Bool {
        return isFlagSet(self._disposed, 1)
    }

    func dispose() {
        fetchOr(self._disposed, 1)
        self._cancel.dispose()
    }

    deinit {
#if TRACE_RESOURCES
       _ =  Resources.decrementTotal()
#endif
    }
}
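The interplay of forwardOn() and dispose() in Sink can be sketched in plain Swift. `DisposableSink` is a hypothetical miniature: the observer and the cancelable are reduced to closures, and a plain Bool replaces the atomic flag, so it is not thread-safe.

```swift
// Hypothetical miniature of Sink: forwardOn() delivers values until dispose().
final class DisposableSink {
    private var disposed = false
    private let observer: (String) -> Void
    private let cancel: () -> Void

    init(observer: @escaping (String) -> Void, cancel: @escaping () -> Void) {
        self.observer = observer
        self.cancel = cancel
    }

    func forwardOn(_ value: String) {
        if disposed { return }   // a disposed sink delivers nothing
        observer(value)
    }

    func dispose() {
        disposed = true
        cancel()                 // propagate disposal, like _cancel.dispose()
    }
}

var sinkDelivered: [String] = []
var sinkCancelCount = 0
let demoSink = DisposableSink(
    observer: { sinkDelivered.append($0) },
    cancel: { sinkCancelCount += 1 }
)
demoSink.forwardOn("first")
demoSink.dispose()
demoSink.forwardOn("second")   // dropped: the sink is already disposed
```

The sink sits between the source closure and the subscriber, so disposing it is enough to cut the delivery path even if the source keeps sending.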

From source code analysis we know:

  • The sink holds the observer. When closure A calls obserber.onNext() to send a signal, that obserber is an AnyObserver wrapping the sink, so the call reaches the sink's on(), which calls self.forwardOn(event). forwardOn() in turn calls self._observer.on(event), i.e. the on() method of the AnonymousObserver created in subscribe().

  • Here we summarize the general process again:

  1. When a sequence is created, create() returns ob. ob is a sequence that was created with closure A passed in, and closure A calls obserber.onNext() to send a signal.
  2. Subscribing to the sequence calls ob.subscribe(), which creates an AnonymousObserver object and calls self.asObservable().subscribe(observer).
  3. self.asObservable() is actually our ob, so ob calls subscribe(observer). ob is an AnonymousObservable, and AnonymousObservable does not define subscribe().
  4. We find subscribe() in its parent class, Producer. That method calls the run() method of AnonymousObservable.
  5. In the run() method of AnonymousObservable, a sink is created and sink.run(self) is called. sink is an AnonymousObservableSink object, and in its run() method, parent._subscribeHandler(AnyObserver(self)) calls closure A, which was saved when the sequence was created (parent is the AnonymousObservable). This explains why closure A is called back when we subscribe.
  6. As for how onNext() gets called, this is also implemented through the sink.
  7. The sink holds the observer. When we call obserber.onNext() in closure A to send a signal, the call actually goes through sink.on(), and sink.on() calls forwardOn().
  8. forwardOn() calls self._observer.on(event).
  9. _observer.on() calls _observer.onCore().
  10. _observer.onCore(event) runs the event handler, which calls the onNext, onError, or onCompleted closure depending on the type of the event. For a .next event, this is the closure we passed to subscribe(onNext:) when subscribing.
  11. The reason for the callback is:
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable {
            // ... (code that is not the focus of this analysis is omitted)

            /* The AnonymousObserver initializer takes a trailing closure, eventHandler, which is triggered whenever an event is received. The closures we passed in `let _ = ob.subscribe(onNext: { (text) in ...` are called from inside it. */
            let observer = AnonymousObserver<E> { event in
                // ...
                switch event {
                case .next(let value):
                    onNext?(value) // the onNext closure passed in when subscribing
                ...

When we call ob.subscribe(), an AnonymousObserver is created (let observer = AnonymousObserver<E> { ... }) and bound to the onNext closure we passed to subscribe(), so a .next event delivered to the AnonymousObserver must call back that onNext closure.

  • Again, the simplest way to explain it is with this picture:

3. Destruction

RxSwift shows us the design thinking

Common iOS design patterns