Preface

The previous article walked through definition, subscription, and event generation and consumption in RxJava/RxSwift, together with the relevant source code. Building on that, this article looks at how subscribeOn and observeOn are implemented in RxSwift. If you have not read the previous article, it is best to start there. (PS: the source code discussed here is from RxSwift 5.1.1.) If you have read the RxJava version of this summary, some of the content will be familiar.

Process

Starting from the Observable.create example analyzed in the previous article, we add the subscribeOn and observeOn operators. The code is accompanied by a flow chart that extends the data flow from the previous article with the subscribeOn and observeOn steps discussed here.

Observable<String>.create { (ob) -> Disposable in
            print("Event generation thread:\(Thread.current.description)")
            ob.onNext("I'm ")
            ob.onCompleted()
            return Disposables.create()
        }
        .subscribeOn(ConcurrentDispatchQueueScheduler.init(qos: .userInteractive))
        .observeOn(MainScheduler.instance)
        .subscribe(onNext: { (str) in
            print("Event consuming thread:\(Thread.current.description)")
            print(str)
        }, onError: nil, onCompleted: nil, onDisposed: nil)

As the source analysis in the previous article showed, the subscription phase in RxSwift relies on implementations of the abstract subscribe method; classes that inherit from Producer typically implement run instead. Event delivery relies on implementations of onNext/onError and related methods. The rest of this article therefore examines subscribeOn and observeOn from these two angles.

subscribeOn

Calling subscribeOn creates a SubscribeOn object, with the caller (self) as its source.

// SubscribeOn.swift
public func subscribeOn(_ scheduler: ImmediateSchedulerType)
    -> Observable<Element> {
    return SubscribeOn(source: self, scheduler: scheduler)
}
// ...

final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
    let source: Ob
    let scheduler: ImmediateSchedulerType

    init(source: Ob, scheduler: ImmediateSchedulerType) {
        self.source = source
        self.scheduler = scheduler
    }
    // ...
}

Source:

  • SubscribeOn.swift

run

Since SubscribeOn inherits from Producer, we can go straight to its run method.

  • run creates a SubscribeOnSink object that wraps the incoming observer (the observer passed to run) and then calls SubscribeOnSink's run method.
  • In SubscribeOnSink.run, self.parent.scheduler.schedule is the code that triggers thread scheduling. Here parent is the SubscribeOn object passed in at creation, and parent.source is the source (the level above) of SubscribeOn.
  • The closure contains the code executed after the thread switch, which in this case subscribes to the level above. In other words, the subscription continues upward after the thread has been switched.

// SubscribeOn.swift
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
    let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
    let subscription = sink.run()
    return (sink: sink, subscription: subscription)
}

// SubscribeOn.swift
final private class SubscribeOnSink<Ob: ObservableType, Observer: ObserverType>: Sink<Observer>, ObserverType where Ob.Element == Observer.Element {
    typealias Element = Observer.Element
    typealias Parent = SubscribeOn<Ob>
    // ...

    func run() -> Disposable {
        let disposeEverything = SerialDisposable()
        let cancelSchedule = SingleAssignmentDisposable()

        disposeEverything.disposable = cancelSchedule

        // The thread switch happens here; the closure runs on the scheduler.
        let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
            let subscription = self.parent.source.subscribe(self)
            disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
            return Disposables.create()
        }

        cancelSchedule.setDisposable(disposeSchedule)

        return disposeEverything
    }
    // ...
}

Source:

  • SubscribeOn.swift
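
The idea behind SubscribeOnSink.run can be sketched without RxSwift at all. The following is a minimal model using only Dispatch, not the library's implementation: Source, queueTag, currentTag, the free function subscribeOn, and producerTag are all hypothetical names introduced for illustration. It shows that the hop happens first and the upstream subscription is made from inside the scheduled closure, so the source closure runs on the target queue.

```swift
import Dispatch

// Hypothetical stand-in for an observable: calling it with an observer
// closure plays the role of subscribe.
typealias Source = (@escaping (String) -> Void) -> Void

// Hypothetical tag so a block of code can report which queue it is running on.
let queueTag = DispatchSpecificKey<String>()

func currentTag() -> String {
    return DispatchQueue.getSpecific(key: queueTag) ?? "caller"
}

// Model of the subscribeOn idea: hop first, then subscribe upstream from
// inside the scheduled closure.
func subscribeOn(_ queue: DispatchQueue, _ source: @escaping Source) -> Source {
    return { observer in
        queue.async { source(observer) }
    }
}

// Returns the tag of the queue on which the source closure ran.
func producerTag(useSubscribeOn: Bool) -> String {
    let background = DispatchQueue(label: "background")
    background.setSpecific(key: queueTag, value: "background")

    // The source emits the tag of whatever queue it is subscribed on.
    let source: Source = { observer in observer(currentTag()) }
    let chain: Source = useSubscribeOn ? subscribeOn(background, source) : source

    var seen = ""
    let done = DispatchSemaphore(value: 0)
    chain({ tag in
        seen = tag
        done.signal()
    })
    done.wait() // block until the value has been delivered
    return seen
}
```

Under these assumptions, producerTag(useSubscribeOn: true) reports the background queue, while producerTag(useSubscribeOn: false) reports the caller, because nothing in the chain performs a hop.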

onNext

According to the flow chart at the beginning, once events are generated they flow downward through onNext, which here means SubscribeOnSink's on method. As mentioned in the previous article, most sinks in RxSwift handle events by implementing func on(_ event: Event<Element>) from the ObserverType protocol.

The on method calls forwardOn, which is defined in the parent class Sink. forwardOn simply calls on on the member variable _observer (the observer passed to the run method); no thread switch takes place.

// SubscribeOn.swift - class SubscribeOnSink
func on(_ event: Event<Element>) {
        self.forwardOn(event)
        
        if event.isStopEvent {
            self.dispose()
        }
}

// Sink.swift - class Sink
final func forwardOn(_ event: Event<Observer.Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        self._observer.on(event)
}

Source:

  • SubscribeOn.swift
  • Sink.swift

observeOn

Calling observeOn creates an ObserveOn or ObserveOnSerialDispatchQueue object, with the caller as its source. Which one is created depends on the scheduler type; MainScheduler, for example, is a subclass of SerialDispatchQueueScheduler, so it takes the ObserveOnSerialDispatchQueue path.

We will look at ObserveOn and ObserveOnSerialDispatchQueue separately below; they differ mainly in how they handle thread safety for events such as onNext.

// ObserveOn.swift
public func observeOn(_ scheduler: ImmediateSchedulerType) -> Observable<Element> {
            if let scheduler = scheduler as? SerialDispatchQueueScheduler {
                return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
            }
            else {
                return ObserveOn(source: self.asObservable(), scheduler: scheduler)
            }
}

// ObserveOn.swift
final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
    let scheduler: SerialDispatchQueueScheduler
    let source: Observable<Element>

    init(source: Observable<Element>, scheduler: SerialDispatchQueueScheduler) {
        self.scheduler = scheduler
        self.source = source
        // ...
    }
    // ...
}

// ObserveOn.swift
final private class ObserveOn<Element>: Producer<Element> {
    let scheduler: ImmediateSchedulerType
    let source: Observable<Element>

    init(source: Observable<Element>, scheduler: ImmediateSchedulerType) {
        self.scheduler = scheduler
        self.source = source
        // ...
    }
    // ...
}

Source:

  • ObserveOn.swift

run

Because ObserveOn and ObserveOnSerialDispatchQueue both inherit from Producer, we can go straight to their run methods.

  • ObserveOn.run creates the wrapper class ObserveOnSink and calls subscribe on source (the source passed in when ObserveOn was created) with the ObserveOnSink object as the argument, passing the subscription upward.
  • ObserveOnSerialDispatchQueue.run creates the wrapper class ObserveOnSerialDispatchQueueSink and calls subscribe on source (the source passed in when ObserveOnSerialDispatchQueue was created) with the ObserveOnSerialDispatchQueueSink object as the argument, passing the subscription upward.

Because the subscription phase involves no thread scheduling here, the code is nearly identical in both classes.

// ObserveOn.swift
final private class ObserveOn<Element>: Producer<Element> {
    let scheduler: ImmediateSchedulerType
    let source: Observable<Element>
    // ...
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObserveOnSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
    // ...
}

// ObserveOn.swift
final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
    let scheduler: SerialDispatchQueueScheduler
    let source: Observable<Element>
    // ...
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
    // ...
}

Source:

  • ObserveOn.swift

onNext

Here we again focus on the func on(_ event: Event<Element>) method. Both ObserveOnSink and ObserveOnSerialDispatchQueueSink inherit from ObserverBase, which implements on:

// ObserverBase.swift
class ObserverBase<Element>: Disposable, ObserverType {
    private let _isStopped = AtomicInt(0)

    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }

    func onCore(_ event: Event<Element>) {
        rxAbstractMethod()
    }
    // ...
}

The on method ultimately calls onCore, so onCore is the method that ObserveOnSink and ObserveOnSerialDispatchQueueSink override.
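
The gating behaviour of ObserverBase.on can be illustrated with a small single-threaded model. This is only a sketch: SimpleEvent, GatedObserver, and gatedDemo are hypothetical names, and a plain Bool stands in for the AtomicInt that RxSwift uses, so the model is not thread-safe.

```swift
// Hypothetical, simplified event type standing in for Event<Element>.
enum SimpleEvent: Equatable {
    case next(String)
    case completed
}

// Single-threaded model of the gate in ObserverBase.on.
class GatedObserver {
    private var isStopped = false
    private(set) var received: [SimpleEvent] = []

    func on(_ event: SimpleEvent) {
        switch event {
        case .next:
            // next events pass only while the gate is open.
            if !isStopped { onCore(event) }
        case .completed:
            // The first terminal event passes through and closes the gate.
            if !isStopped {
                isStopped = true
                onCore(event)
            }
        }
    }

    // Subclasses would override this; here it just records the event.
    func onCore(_ event: SimpleEvent) {
        received.append(event)
    }
}

func gatedDemo() -> [SimpleEvent] {
    let observer = GatedObserver()
    observer.on(.next("a"))
    observer.on(.completed)
    observer.on(.next("b"))   // dropped: the gate is already closed
    observer.on(.completed)   // dropped as well
    return observer.received
}
```

In this model only the first next event and the first completed event reach onCore; everything after the terminal event is ignored, which is the guarantee the real class provides to its subclasses.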

ObserveOnSink

1. The onCore method:

  • Its ultimate job is to trigger thread scheduling: self._scheduler.scheduleRecursive((), action: self.run).
  • Before scheduling there is a shouldStart check, which ensures that under concurrency only one scheduling pass is active at a time, keeping the sink thread-safe.
  • The shouldStart logic: (1) the event is cached in the member variable var _queue = Queue<Event<Element>>(capacity: 10); (2) the member variable _state is used to determine whether a pass is already running on _scheduler.

2. The run method is passed as the action to _scheduler.scheduleRecursive in onCore, so it can be understood as the code executed after the thread switch:

  • It starts with a closure executed under the lock. If the cache queue is not empty, it dequeues an event cached by onCore and returns it together with the member variable _observer (the observer passed to ObserveOn.run, i.e. the next level down). Otherwise it sets the member variable _state to .stopped and returns nil for the event.
  • If an event was dequeued and the sink has not been cancelled, observer.on is called to pass the event down. Otherwise the method returns.
  • Finally, self._shouldContinue_synchronized() checks whether the queue is empty. If it is not, the closure parameter recurse(()) is called. This is effectively a recursive call, which can be read as a loop that drains the events in self._queue. It also involves some of the RecursiveImmediateScheduler source code; schedulers are outside the scope of this article, so interested readers can consult the source.

// ObserveOn.swift - class ObserveOnSink
override func onCore(_ event: Event<Element>) {
    let shouldStart = self._lock.calculateLocked { () -> Bool in
        self._queue.enqueue(event)

        switch self._state {
        case .stopped:
            self._state = .running
            return true
        case .running:
            return false
        }
    }

    if shouldStart {
        self._scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
    }
}

func run(_ state: (), _ recurse: (()) -> Void) {
    let (nextEvent, observer) = self._lock.calculateLocked { () -> (Event<Element>?, Observer) in
        if !self._queue.isEmpty {
            return (self._queue.dequeue(), self._observer)
        }
        else {
            self._state = .stopped
            return (nil, self._observer)
        }
    }

    if let nextEvent = nextEvent, !self._cancel.isDisposed {
        observer.on(nextEvent)
        if nextEvent.isStopEvent {
            self.dispose()
        }
    }
    else {
        return
    }

    let shouldContinue = self._shouldContinue_synchronized()

    if shouldContinue {
        recurse(())
    }
}

func _shouldContinue_synchronized() -> Bool {
    self._lock.lock(); defer { self._lock.unlock() } // {
        if !self._queue.isEmpty {
            return true
        }
        else {
            self._state = .stopped
            return false
        }
    // }
}
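
The queue-plus-state pattern described above can be modelled with Foundation alone. The sketch below is not the RxSwift implementation: DrainingSink, scheduleDrain, and drainDemo are hypothetical names, Int values stand in for events, and a DispatchQueue stands in for the scheduler. It keeps the essential shape: producers enqueue under a lock, and at most one drain pass is active on the target queue at a time.

```swift
import Foundation

// Hypothetical model of the ObserveOnSink pattern.
final class DrainingSink: @unchecked Sendable {
    private enum State { case stopped, running }

    private let lock = NSLock()
    private var pending: [Int] = []
    private var state = State.stopped
    private let target: DispatchQueue
    private let deliver: (Int) -> Void

    init(target: DispatchQueue, deliver: @escaping (Int) -> Void) {
        self.target = target
        self.deliver = deliver
    }

    // Counterpart of onCore: cache the event, start a pass only if none is running.
    func on(_ value: Int) {
        lock.lock()
        pending.append(value)
        let shouldStart = (state == .stopped)
        if shouldStart { state = .running }
        lock.unlock()

        if shouldStart { scheduleDrain() }
    }

    // Counterpart of run plus recurse(()): deliver one event per scheduled
    // step, then reschedule while events remain.
    private func scheduleDrain() {
        target.async {
            self.lock.lock()
            let next: Int? = self.pending.isEmpty ? nil : self.pending.removeFirst()
            if next == nil { self.state = .stopped }
            self.lock.unlock()

            guard let value = next else { return }
            self.deliver(value)

            self.lock.lock()
            let shouldContinue = !self.pending.isEmpty
            if !shouldContinue { self.state = .stopped }
            self.lock.unlock()

            if shouldContinue { self.scheduleDrain() }
        }
    }
}

// Pushes `count` values through a sink that targets a concurrent queue and
// returns them in the order they were delivered.
func drainDemo(count: Int) -> [Int] {
    let target = DispatchQueue(label: "target", attributes: .concurrent)
    var delivered: [Int] = []
    let done = DispatchSemaphore(value: 0)

    let sink = DrainingSink(target: target) { value in
        delivered.append(value) // safe: only one drain step runs at a time
        if delivered.count == count { done.signal() }
    }

    for value in 0..<count { sink.on(value) }
    done.wait()
    return delivered
}
```

Even though the target queue in drainDemo is concurrent, the values arrive in the order they were enqueued, because the state flag allows only one drain step to run at any moment. That is the reason the lock and the cache queue exist in the general ObserveOn case.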

ObserveOnSerialDispatchQueueSink

1. The onCore method:

  • It only triggers thread scheduling. The closure cachedScheduleLambda is passed in and executed after the thread switch. cachedScheduleLambda is instantiated in the initializer.

2. The cachedScheduleLambda closure:

  • Its parameter is a (sink: ObserveOnSerialDispatchQueueSink<Observer>, event: Event<Element>) tuple, where sink is the ObserveOnSerialDispatchQueueSink object and event is the event to be passed on.
  • The closure's pair argument (the tuple above) is supplied in onCore: self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!).
  • pair.sink.observer.on(pair.event) calls on on the sink's member variable observer (the observer passed to ObserveOnSerialDispatchQueue.run, i.e. the next level down), passing the event down.

PS: the scheduler that ObserveOnSerialDispatchQueue receives is backed by a serial queue, so events are delivered one at a time and no lock is needed.

// ObserveOn.swift - class ObserveOnSerialDispatchQueueSink
final private class ObserveOnSerialDispatchQueueSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
    // ...
    var cachedScheduleLambda: (((sink: ObserveOnSerialDispatchQueueSink<Observer>, event: Event<Element>)) -> Disposable)!

    init(scheduler: SerialDispatchQueueScheduler, observer: Observer, cancel: Cancelable) {
        // ...
        self.cachedScheduleLambda = { pair in
            guard !cancel.isDisposed else { return Disposables.create() }

            pair.sink.observer.on(pair.event)

            if pair.event.isStopEvent {
                pair.sink.dispose()
            }

            return Disposables.create()
        }
    }

    override func onCore(_ event: Event<Element>) {
        _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
    }
    // ...
}

Source:

  • ObserveOn.swift
  • ObserverBase.swift
  • RecursiveScheduler.swift
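
For contrast with ObserveOnSink, the serial-queue variant can be modelled in a few lines. This is a sketch under stated assumptions rather than the library's code: SerialHopSink and serialHopDemo are hypothetical names, String values stand in for events, and a serial DispatchQueue stands in for SerialDispatchQueueScheduler. Each event becomes its own block on the queue, and the queue's FIFO ordering replaces the lock and the event cache.

```swift
import Dispatch

// Hypothetical model of the ObserveOnSerialDispatchQueueSink idea.
final class SerialHopSink {
    private let queue: DispatchQueue      // assumed to be a serial queue
    private let deliver: (String) -> Void

    init(queue: DispatchQueue, deliver: @escaping (String) -> Void) {
        self.queue = queue
        self.deliver = deliver
    }

    // Counterpart of onCore: schedule the event and return immediately.
    func on(_ event: String) {
        queue.async { self.deliver(event) }
    }
}

func serialHopDemo() -> [String] {
    let serial = DispatchQueue(label: "serial") // queues are serial by default
    var delivered: [String] = []                // only touched on `serial`
    let sink = SerialHopSink(queue: serial) { delivered.append($0) }

    for event in ["a", "b", "c"] { sink.on(event) }

    // A sync block queues behind the three async blocks, so it acts as a
    // barrier before the result is read.
    return serial.sync { delivered }
}
```

The design trade-off is visible when the two models are compared: the serial version does less bookkeeping per event, but it is only correct because the queue itself guarantees one-at-a-time, in-order execution.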

Conclusion

That completes the source analysis of subscribeOn and observeOn. The key points are:

  • Thread scheduling for subscribeOn happens in the subscribe/run method, that is, in the subscription phase.
  • Thread scheduling for observeOn happens in the event processing/consumption phase, in onNext/onError and so on (the on method).

Next, let's apply this source analysis to some threading questions that come up often in Rx.

Q&A

Some tutorials state that "the thread on which events are generated depends on subscribeOn" and that "the thread on which events are processed is by default the same as the thread on which they are consumed, and is affected by observeOn". The source code explains where these rules come from.

Which thread does Rx run on by default?

In the code below, the chain is set up on the main thread, and every line of output shows the main thread.

Note, however, that Rx does not default to the main thread; it defaults to the current thread. Only the subscribeOn and observeOn operators can switch threads. More precisely, it is the scheduler passed to subscribeOn or observeOn that switches threads, while the two operators only control at which point in the flow the switch happens.

So, without a scheduler to switch threads, Rx code runs on the current thread.

Observable<String>.create { (ob) -> Disposable in
            print("Event generation thread:\(Thread.current.description)")
            ob.onNext("I'm ")
            ob.onCompleted()
            return Disposables.create()
        }
        .map({ (str) -> String in
            print("Event processing thread:\(Thread.current.description)")
            return str + "rx"
        })
        .subscribe(onNext: { (str) in
            print("Event consuming thread:\(Thread.current.description)")
            print(str)
        }, onError: nil, onCompleted: nil, onDisposed: nil)

Output:

Event generation thread: main
Event processing thread: main
Event consuming thread: main

Determining which thread a block of code runs on

Now that we know a thread switch happens only where subscribeOn or observeOn introduces one, let's look at how to determine the thread on which each block of code runs. Recall the two rules:

1. Thread scheduling for subscribeOn happens in subscribe/run, that is, in the subscription phase.
2. Thread scheduling for observeOn happens in the event processing/consumption phase, in onNext/onError and so on (the on method).

Here is a slightly more complex example (the code follows the analysis); we can try to work out the thread of each block directly from the code:

Thread of the closure passed to Observable<String>.create

In the Observable.create example, events are generated as part of subscribing, and the subscription goes through a thread switch in SubscribeOn's run method. This block therefore runs on a thread created by ConcurrentDispatchQueueScheduler. (PS: in many real cases event generation does not happen synchronously during subscription, so events are not necessarily generated on the subscription thread. This can be a source of confusion.)

Thread of the first map operator

The map operator's logic is triggered by an onNext call, and the onNext coming from the level above passes through an observeOn operator, where a thread switch takes place. This block therefore runs on a thread created by SerialDispatchQueueScheduler.

Thread of the second map operator

By the same reasoning (rule 2), this block runs on the MainScheduler (main) thread, because the nearest observeOn above it uses MainScheduler.

Thread of the final event consumer

Event consumption is also part of the onNext event flow, so we look for the nearest observeOn above it. Because onNext does not pass through another observeOn operator after the second map, the thread is not switched again, and the final event consumption also runs on the MainScheduler (main) thread.

Observable<String>.create { (ob) -> Disposable in
            print("Event generation thread:\(Thread.current.description)")
            ob.onNext("I'm ")
            ob.onCompleted()
            return Disposables.create()
        }
        .subscribeOn(ConcurrentDispatchQueueScheduler.init(qos: .userInteractive))
        .observeOn(SerialDispatchQueueScheduler.init(qos: .userInteractive))
        .map({ (str) -> String in
            print("Event processing thread:\(Thread.current.description)")
            return str + "rx"
        })
        .observeOn(MainScheduler.instance)
        .map({ (str) -> String in
            print("Event processing thread 2:\(Thread.current.description)")
            return str + "!"
        })
        .subscribe(onNext: { (str) in
            print("Event consuming thread:\(Thread.current.description)")
            print(str)
        }, onError: nil, onCompleted: nil, onDisposed: nil)

Output:

Event generation thread: number = 6, name = (null)
Event processing thread: number = 3, name = (null)
Event processing thread 2: main
Event consuming thread: main

Conclusion:

From the analysis above we can summarize:

  • The subscription phase proceeds bottom-up, and the observation phase (event processing/consumption) proceeds top-down.
  • Determining the subscription thread: because subscription proceeds bottom-up and the thread switch happens in the subscribe/run method, scan the chain from top to bottom and take the first subscribeOn operator you find.
  • Determining the observation thread: because events flow top-down, find the nearest observeOn above the block of code. This is why many tutorials say that operators such as map and the event consumer are affected by observeOn.
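
The two rules above can be written down as a small pure function. This is only a sketch of the reasoning, not part of RxSwift: Step and resolveThreads are hypothetical names, schedulers are represented by plain strings, and the model assumes that the create closure emits synchronously while it is being subscribed to, as in the examples in this article.

```swift
// Hypothetical description of an operator chain, listed top to bottom.
enum Step {
    case create
    case subscribeOn(String)
    case observeOn(String)
    case map
    case subscribe
}

// Returns one "block: thread" entry per code block in the chain.
func resolveThreads(_ chain: [Step], caller: String) -> [String] {
    // Rule 1: the subscription ends up on the topmost subscribeOn's
    // scheduler, or stays on the caller's thread if there is none.
    var producer = caller
    for step in chain {
        if case .subscribeOn(let scheduler) = step {
            producer = scheduler
            break
        }
    }

    // Rule 2: events travel downward, and each observeOn switches the
    // thread for everything below it.
    var current = producer
    var result: [String] = []
    for step in chain {
        switch step {
        case .create:
            result.append("create: \(producer)")
        case .observeOn(let scheduler):
            current = scheduler
        case .map:
            result.append("map: \(current)")
        case .subscribe:
            result.append("subscribe: \(current)")
        case .subscribeOn:
            break // no hop on the downward event path
        }
    }
    return result
}
```

Applied to the chain from the example above, resolveThreads([.create, .subscribeOn("concurrent"), .observeOn("serial"), .map, .observeOn("main"), .map, .subscribe], caller: "main") yields "create: concurrent", "map: serial", "map: main", and "subscribe: main", which matches the analysis.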

Is subscribeOn only effective the first time?

You will often see people say that subscribeOn only takes effect the first time it is set. Does that mean the thread is switched only once? The analysis above already hints at the answer; the example and flow diagram here illustrate it. (Event processing is omitted to keep the analysis simple.)

Observable<String>.create { (ob) -> Disposable in
      print("Event generation thread:\(Thread.current.description)")
      ob.onNext("I'm ")
      ob.onCompleted()
      return Disposables.create()
    }
    .subscribeOn(ConcurrentDispatchQueueScheduler.init(qos: .userInteractive))
    .subscribeOn(SerialDispatchQueueScheduler.init(qos: .userInteractive))
    .subscribeOn(MainScheduler.instance)
    .subscribe(onNext: { (str) in
        print("Event consuming thread:\(Thread.current.description)")
        print(str)
    }, onError: nil, onCompleted: nil, onDisposed: nil)

The flow diagram shows the data flow during subscription: every call to a SubscribeOn run method performs a thread switch. So "only the first one is valid" does not mean there is a single thread switch; it means the subscription ends up on the scheduler of the first subscribeOn in the chain.
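
The hop sequence can be traced with a small model built on Foundation only. This is a sketch, not RxSwift code: hopTag, HopLog, and hopTrace are hypothetical names, and the queue names "first", "second", and "third" are labels for freshly created queues (the real main queue is deliberately avoided, because the demo blocks its calling thread while waiting).

```swift
import Foundation

// Hypothetical tag so a block can report which queue it is running on.
let hopTag = DispatchSpecificKey<String>()

// Lock-protected log shared by the hops.
final class HopLog: @unchecked Sendable {
    private let lock = NSLock()
    private var entries: [String] = []

    func add(_ entry: String) {
        lock.lock(); defer { lock.unlock() }
        entries.append(entry)
    }

    var snapshot: [String] {
        lock.lock(); defer { lock.unlock() }
        return entries
    }
}

// `names` lists the stacked subscribeOn schedulers top to bottom, the way the
// chain reads in code. Each layer hops to its queue, then subscribes upstream.
func hopTrace(topToBottom names: [String]) -> [String] {
    let log = HopLog()
    let done = DispatchSemaphore(value: 0)

    // Innermost step: the closure passed to create.
    var subscribe: () -> Void = {
        log.add("source on " + (DispatchQueue.getSpecific(key: hopTag) ?? "caller"))
        done.signal()
    }

    // Wrap from the top down, so the outermost wrapper is the bottom operator.
    for name in names {
        let queue = DispatchQueue(label: name)
        queue.setSpecific(key: hopTag, value: name)
        let upstream = subscribe
        subscribe = {
            queue.async {
                log.add("hop to " + name)
                upstream()
            }
        }
    }

    subscribe()
    done.wait()
    return log.snapshot
}
```

With three stacked layers, hopTrace(topToBottom: ["first", "second", "third"]) records a hop to third, then second, then first, and finally the source running on first. All three switches happen, but the source only observes the last one, which belongs to the topmost subscribeOn.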

Final thoughts

This article analyzed the source code of the subscribeOn and observeOn operators and worked through some common questions. Although subscribeOn and observeOn are associated with thread scheduling, their role is really to control where in the Rx flow a switch happens, so the two operators can be understood separately from the schedulers that do the actual thread scheduling. A follow-up article will analyze the scheduler source code in RxSwift.

Series of articles:

  • What does Rx’s Observable.create do?
  • RxJava subscribeOn and observeOn source code analysis
  • Take a look at the Scheduler of RxJava.
  • How is RxSwift Scheduler implemented