preface

The subscribeOn and observeOn source codes for RxSwift have been collated. This article will focus on summarizing RxSwift Scheduler to use frequency is higher in the development of daily ConcurrentDispatchQueueScheduler, MainScheduler mainly. As mentioned before, subscribeOn and observeOn only control the thread scheduling on the process based on the scheduler set by the developer. It can be understood that the Scheduler is only a tool for subscribeOn and observeOn. (PS: This paper is based on RxSwift 5.1.1)

What does Rx’s Observable. Create do?

RxSwift subscribeOn and observeOn source parsing

ImmediateSchedulerType&SchedulerType

ImmediateSchedulerType and SchedulerType protocols specify rules for the Scheduler class. SchedulerType inherits from ImmediateSchedulerType. In RxSwift, most of the Scheduler classes inherit from the SchedulerType protocol, and some only inherit from ImmediateSchedulerType protocol.

SchedulerType extends two methods: scheduleRelative and schedulePeriodic.

// ImmediateSchedulerType.swift
public protocol ImmediateSchedulerType {
    func schedule<StateType> (_ state: StateType.action: @escaping (StateType) - >Disposable) -> Disposable
}

// SchedulerType.swift
public protocol SchedulerType: ImmediateSchedulerType {
    /// - returns: Current time.
    var now : RxTime {
        get
    }

    func scheduleRelative<StateType> (_ state: StateType.dueTime: RxTimeInterval.action: @escaping (StateType) - >Disposable) -> Disposable

    func schedulePeriodic<StateType> (_ state: StateType.startAfter: RxTimeInterval.period: RxTimeInterval.action: @escaping (StateType) - >StateType) -> Disposable
}
Copy the code

Source:

  • ImmediateSchedulerType.swift
  • SchedulerType.swift

ConcurrentDispatchQueueScheduler

ConcurrentDispatchQueueScheduler create can be directly introduced into a qos, in which we are familiar with the GCD DispatchQoS. Will help us to create a source of DispatchQueue, attribute is set to concurrency, namely ConcurrentDispatchQueueScheduler runs a GCD within concurrent queue.

Of course, the constructor here can also pass in a DispatchQueue directly. This means that we can also pass a serial queue into it (although this makes no sense, haha…). .

By constructing method, we can also see that DispatchQueue is entrusted to a DispatchQueueConfiguration finally. DispatchQueueConfiguration DispatchQueue real call for thread scheduling.

// ConcurrentDispatchQueueScheduler.swift
public class SerialDispatchQueueScheduler : SchedulerType

ConcurrentDispatchQueueScheduler.init(qos:.userInteractive)

@available(iOS8.OSX 10.10, *)
public convenience init(qos: DispatchQoS.leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
    self.init(queue: DispatchQueue(
            label: "rxswift.queue.\(qos)",
            qos: qos,
            attributes: [DispatchQueue.Attributes.concurrent],
            target: nil),
            leeway: leeway)
}

public init(queue: DispatchQueue.leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
    self.configuration = DispatchQueueConfiguration(queue: queue, leeway: leeway)
}
Copy the code

Source:

  • ConcurrentDispatchQueueScheduler.swift

DispatchQueueConfiguration

Structure DispatchQueueConfiguration attributes as follows, the expansion of the definition of DispatchQueue calls on its code, which we’ll talk about later.

struct DispatchQueueConfiguration {
    let queue: DispatchQueue
    let leeway: DispatchTimeInterval
}
Copy the code

Let’s think for a moment here that when it comes to subscribing, the code ends up going to the subscribeonsink.run () method. This triggers the scheduler’s thread scheduling:

// SubscribeOn.swift - class SubscribeOnSink
func run(a) -> Disposable {
        let disposeEverything = SerialDisposable(a)let cancelSchedule = SingleAssignmentDisposable()
        
        disposeEverything.disposable = cancelSchedule
        
        let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
            let subscription = self.parent.source.subscribe(self)
            disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
            return Disposables.create()
        }

        cancelSchedule.setDisposable(disposeSchedule)
    
        return disposeEverything
}
Copy the code

If we set the scheduler for ConcurrentDispatchQueueScheduler, its method of the schedule for the following code. Internal call is DispatchQueueConfiguration. Schedule method. The incoming closure is eventually thrown to the queue for asynchronous execution.

As you can see, ConcurrentDispatchQueueScheduler multi-threading logic ultimately relies on the GCD.

// ConcurrentDispatchQueueScheduler.swift
public final func schedule<StateType> (_ state: StateType.action: @escaping (StateType) - >Disposable) -> Disposable {
        return self.configuration.schedule(state, action: action)
}

// DispatchQueueConfiguration.swift
func schedule<StateType> (_ state: StateType.action: @escaping (StateType) - >Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable(a)self.queue.async {
            if cancel.isDisposed {
                return
            }

            cancel.setDisposable(action(state))
        }

        return cancel
}
Copy the code

Here is the scheduleRelative method, which is a thread schedule that is triggered periodically (there is also a schedulePeriodic method that can execute logic that is triggered repeatedly, which is analogous to this method). From the source of DispatchQueueConfiguration as you can see, here is the GCD DispatchSourceTimer use regularly.

Interestingly, the DispatchSourceTimer does not require a global hold to work properly, and the authors have added a TODO comment. I went out of my way to check out the latest version of RxSwift and didn’t see any code updates. If you know friends can discuss below.

// ConcurrentDispatchQueueScheduler.swift
public final func scheduleRelative<StateType> (_ state: StateType.dueTime: RxTimeInterval.action: @escaping (StateType) - >Disposable) -> Disposable {
        return self.configuration.scheduleRelative(state, dueTime: dueTime, action: action)
}

// DispatchQueueConfiguration.swift
func scheduleRelative<StateType> (_ state: StateType.dueTime: RxTimeInterval.action: @escaping (StateType) - >Disposable) -> Disposable {
        let deadline = DispatchTime.now() + dueTime

        let compositeDisposable = CompositeDisposable(a)let timer = DispatchSource.makeTimerSource(queue: self.queue)
        timer.schedule(deadline: deadline, leeway: self.leeway)

        // TODO:
        // This looks horrible, and yes, it is.
        // It looks like Apple has made a conceputal change here, and I'm unsure why.
        // Need more info on this.
        // It looks like just setting timer to fire and not holding a reference to it
        // until deadline causes timer cancellation.
        var timerReference: DispatchSourceTimer? = timer
        let cancelTimer = Disposables.create {
            timerReference?.cancel()
            timerReference = nil
        }

        timer.setEventHandler(handler: {
            if compositeDisposable.isDisposed {
                return
            }
            _ = compositeDisposable.insert(action(state))
            cancelTimer.dispose()
        })
        timer.resume()

        _ = compositeDisposable.insert(cancelTimer)

        return compositeDisposable
    }
Copy the code

Source:

  • DispatchQueueConfiguration.swift

SerialDispatchQueueScheduler

Before speaking MainScheduler, let’s look at SerialDispatchQueueScheduler, because MainScheduler inherited from SerialDispatchQueueScheduler. And similar ConcurrentDispatchQueueScheduler logic, but it mainly as a serial queue logic encapsulation.

// MainScheduler.swift
public final class MainScheduler : SerialDispatchQueueScheduler
Copy the code

Because are inherited from SchedulerType, so SerialDispatchQueueScheduler with ConcurrentDispatchQueueScheduler logic instead of the same, but here more than here. Also managed to DispatchQueueConfiguration management DispatchQueue queue, logic is the same as the above.

// SerialDispatchQueueScheduler.swift
public class SerialDispatchQueueScheduler : SchedulerType

init(serialQueue: DispatchQueue.leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
}
Copy the code

Difference is bigger SerialDispatchQueueScheduler. The schedule changed to call their own scheduleInternal method. By default, or do we call DispatchQueueConfiguration. Schedule method. As described above, the default is to throw an incoming closure to the queue for asynchronous execution.

MainScheduler overrides scheduleInternal to fit the logic of the main thread.

// SerialDispatchQueueScheduler.swift 
public final func schedule<StateType> (_ state: StateType.action: @escaping (StateType) - >Disposable) -> Disposable {
        return self.scheduleInternal(state, action: action)
}

func scheduleInternal<StateType> (_ state: StateType.action: @escaping (StateType) - >Disposable) -> Disposable {
        return self.configuration.schedule(state, action: action)
}
Copy the code

Source:

  • SerialDispatchQueueScheduler.swift

MainScheduler

The above mentioned, MainScheduler inherited from SerialDispatchQueueScheduler, because for single-threaded, give priority to the thread here can understand MainScheduler SerialDispatchQueueScheduler for a special, Special serial queues.

In the following code, MainScheduler has a member variable _mainQueue, which is dispatchQueue.main. NumberEnqueued is for dealing with concurrent threads, which we’ll talk about later.

// MainScheduler.swift
public final class MainScheduler : SerialDispatchQueueScheduler {

    private let _mainQueue: DispatchQueue

    let numberEnqueued = AtomicInt(0)

    /// Initializes new instance of `MainScheduler`.
    public init(a) {
        self._mainQueue = DispatchQueue.main
        super.init(serialQueue: self._mainQueue)
    }
    .
Copy the code

It is worth mentioning that the MainScheduler is a singleton and provides two entities for us to use.

  • Instance uses the MainScheduler itself.
  • AsyncInstance it is usingDispatchQueue.mainTo create aSerialDispatchQueueSchedulerObject. The logic followsThe same SerialDispatchQueueScheduler introduction.
// MainScheduler.swift
public static let instance = MainScheduler(a)public static let asyncInstance = SerialDispatchQueueScheduler(serialQueue: DispatchQueue.main)
Copy the code

MainScheduler overrides the scheduleInternal method. The end result is that the passed closure is thrown to the main thread for asynchronous execution. Of course, if the current thread is the master and no logic is executing (previousNumberEnqueued == 0), synchronous execution will be used.

There’s a numberEnqueued property here +1, -1 logic. NumberEnqueued is an AtomicInt. NumberEnqueued is an AtomicInt. +1 before closure, -1 after closure. My personal understanding of this operation is in the case of concurrent invocation of multiple threads, to ensure that the sequence of events is not modified, and can be executed synchronously in the case of the main thread idle. The +1 and -1 here are also a little bit like recursive locks.

// MainScheduler.swift
override func scheduleInternal<StateType> (_ state: StateType.action: @escaping (StateType) - >Disposable) -> Disposable {
        let previousNumberEnqueued = increment(self.numberEnqueued)

        if DispatchQueue.isMain && previousNumberEnqueued = = 0 {
            let disposable = action(state)
            decrement(self.numberEnqueued)
            return disposable
        }

        let cancel = SingleAssignmentDisposable(a)self._mainQueue.async {
            if !cancel.isDisposed {
                _ = action(state)
            }

            decrement(self.numberEnqueued)
        }

        return cancel
}
Copy the code

Source:

  • MainScheduler.swift

ConcurrentMainScheduler

There is a comment at the beginning of the mainScheduler. swift file:

This scheduler is optimized for observeOn operator. To ensure observable sequence is subscribed on main thread using subscribeOn operator please use ConcurrentMainScheduler because it is more optimized for that purpose.

MainScheduler is optimized for the observeOn operator, and ConcurrentMainScheduler is optimized for the subscribeOn operator. So next, let’s take a look at what ConcurrentMainScheduler is.

ConcurrentMainScheduler can be understood as a decorator to the MainScheduler, because the logic calls the corresponding methods of the MainScheduler, except overriding the SchedulerType.schedule() method.

// ConcurrentMainScheduler.swift
public final class ConcurrentMainScheduler : SchedulerType {
    public typealias TimeInterval = Foundation.TimeInterval
    public typealias Time = Date

    private let _mainScheduler: MainScheduler
    private let _mainQueue: DispatchQueue

    /// - returns: Current time.
    public var now: Date {
        return self._mainScheduler.now as Date
    }

    private init(mainScheduler: MainScheduler) {
        self._mainQueue = DispatchQueue.main
        self._mainScheduler = mainScheduler
    }

    /// Singleton instance of `ConcurrentMainScheduler`
    public static let instance = ConcurrentMainScheduler(mainScheduler: MainScheduler.instance)
    .
Copy the code

In the schedule method with MainScheduler. ScheduleInternal difference is that fewer atoms attribute as a thread safe handling. In combination with the subscribeOn operator recommended by the author, the author personally understands that there is no concurrent subscription process, so the thread-safety problem can be ignored.

// ConcurrentMainScheduler.swift
public func schedule<StateType> (_ state: StateType.action: @escaping (StateType) - >Disposable) -> Disposable {
        if DispatchQueue.isMain {
            return action(state)
        }

        let cancel = SingleAssignmentDisposable(a)self._mainQueue.async {
            if cancel.isDisposed {
                return
            }

            cancel.setDisposable(action(state))
        }

        return cancel
}
Copy the code

Source:

  • ConcurrentMainScheduler.swift

RecursiveImmediateScheduler

RecursiveImmediateScheduler not a Scheduler, here we first review simple observeOn.

, when creating observeOn code according to the incoming scheduler is to create ObserveOnSerialDispatchQueue SerialDispatchQueueScheduler object or observeOn object.

// ObserveOn.swift - extension ObservableType
public func observeOn(_ scheduler: ImmediateSchedulerType) -> Observable<Element> {
     if let scheduler = scheduler as? SerialDispatchQueueScheduler {
         return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
     }
     else {
         return ObserveOn(source: self.asObservable(), scheduler: scheduler)
     }
}
Copy the code
  • If ObserveOnSerialDispatchQueue, will eventually pass the event triggered when ObserveOnSerialDispatchQueueSink. OnCore method, the corresponding is that we told by the method of the schedule.
// ObserveOn.swift - class ObserveOnSerialDispatchQueueSink
override func onCore(_ event: Event<Element>) {
     _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)}Copy the code
  • If it is ObserveOn, the observeonsinks. onCore method will eventually be triggered when the event is passed, which calls an unseen method scheduleRecursive recursive thread scheduling.
// ObserveOn.swift - class ObserveOnSink
override func onCore(_ event: Event<Element>) {
     let shouldStart = self._lock.calculateLocked { () -> Bool in
         self._queue.enqueue(event)

         switch self._state {
         case .stopped:
             self._state = .running
             return true
         case .running:
             return false}}if shouldStart {
         self._scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
     }
}
Copy the code

ScheduleRecursive is an extension from ImmediateSchedulerType. Here are executed according to the need of closures and scheduler object to create a RecursiveImmediateScheduler object. Here is also understandable RecursiveImmediateScheduler as we set the Scheduler object decorators.

// ImmediateSchedulerType.swift - extension ImmediateSchedulerType
public func scheduleRecursive<State> (_ state: State.action: @escaping (_ state: State._ recurse: (State) - >Void) - >Void) -> Disposable {
    let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
        
    recursiveScheduler.schedule(state)
        
    return Disposables.create(with: recursiveScheduler.dispose)
}
Copy the code

RecursiveImmediateScheduler. We set the schedule method is simply call the Scheduler. The schedule method to switch from the thread, it is important to note that Perform the closure will RecursiveImmediateScheduler. The schedule as closure parameters.

// RecursiveScheduler.swift - class RecursiveImmediateScheduler
func schedule(_ state: State) {
        var scheduleState: ScheduleState = .initial

        let d = self._scheduler.schedule(state) { state -> Disposable in
            // best effort
            if self._group.isDisposed {
                return Disposables.create()
            }
            
            let action = self._lock.calculateLocked { () -> Action? in
                .

                return self._action
            }
            
            if let action = action {
                action(state, self.schedule)
            }
            
            return Disposables.create()
        }
        
        .
}
Copy the code

This can be combined with the run method passed in the observeonsink.oncore method when scheduleRecursive is called. Method is finally will recurse call closure namely again for the above mentioned RecursiveImmediateScheduler. The method of the schedule. In order to achieve the effect of recursive invocation, the thread corresponds to the corresponding thread of the scheduler. The exit of the recursion is the following Boolean shouldContinue.

// ObserveOn.swift - class ObserveOnSink
func run(_ state: ()._ recurse: () () - >Void) {
        let (nextEvent, observer) = self._lock.calculateLocked { () -> (Event<Element>? .Observer) in
            if !self._queue.isEmpty {
                return (self._queue.dequeue(), self._observer)
            }
            else {
                self._state = .stopped
                return (nil.self._observer)
            }
        }

        if let nextEvent = nextEvent, !self._cancel.isDisposed {
            observer.on(nextEvent)
            if nextEvent.isStopEvent {
                self.dispose()
            }
        }
        else {
            return
        }

        let shouldContinue = self._shouldContinue_synchronized()

        if shouldContinue {
            recurse(())
        }
    }
Copy the code

Lead a question here: why ObserveOnSerialDispatchQueue finally can directly call the scheduler. The schedule method, and need ObserveOn may event cache first, then the recursive call?

  • Recall observeOn method need to determine whether the scheduler for SerialDispatchQueueScheduler or its subclasses. This means that if the DispatchQueue is a serial queue, there is no problem with thread concurrency. So the ObserveOnSerialDispatchQueue don’t need this cache operation processing.

  • And if create ObserveOn means set the scheduler threads concurrent ability (such as ConcurrentDispatchQueueScheduler). Using queue sets to first cache events and then pass them downstream recursively is equivalent to a concurrent data confluence effect, so that the data can be guaranteed to pass down in their relative order.

// ObserveOn.swift - extension ObservableType
public func observeOn(_ scheduler: ImmediateSchedulerType) -> Observable<Element> {
     if let scheduler = scheduler as? SerialDispatchQueueScheduler {
         return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
     }
     else {
         return ObserveOn(source: self.asObservable(), scheduler: scheduler)
     }
}
Copy the code

Source:

  • RecursiveScheduler.swift
  • ObserveOn.swift

The last

This paper analyzes the commonly used ConcurrentDispatchQueueScheduler, MainScheduler and their related classes of the source code. Because it uses gCD-related logic, the logic of the code is relatively easy to understand.

Series of articles:

  • What does Rx’s Observable. Create do?
  • RxJava subscribeOn and observeOn source parsing
  • RxSwift subscribeOn and observeOn source parsing
  • Let’s take a look at RxJava Scheduler!