There are four main members in RxSwift:

  • Observable sequence (Observable)
  • Observer (Observer)
  • Scheduler (Scheduler)
  • Disposal (Disposable)

Once you understand these four, you can fairly say you understand RxSwift as a whole. This article focuses on the Scheduler.

What is a scheduler

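Schedulers are the core module Rx uses for multithreading; they control which thread or queue a task runs on. They wrap GCD and OperationQueue. If you have used GCD before, the following code should look familiar: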

DispatchQueue.global(qos: .userInitiated).async {
    let data = try? Data(contentsOf: url)
    DispatchQueue.main.async {
        self.data = data
    }
}

If implemented with RxSwift, it looks something like this:

let rxData: Observable<Data> = ...

rxData
    .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { [weak self] data in
        self?.data = data
    })
    .disposed(by: disposeBag)

After we get a feel for the Scheduler, let’s see how it works.

An introduction to several schedulers in RxSwift

CurrentThreadScheduler

  • Represents the current thread. This is the scheduler used by default.
public class CurrentThreadScheduler : ImmediateSchedulerType {
    typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>

    /// The singleton instance of the current thread scheduler.
    public static let instance = CurrentThreadScheduler()

    private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
        let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
        defer { key.deallocate() }
                                                               
        guard pthread_key_create(key, nil) == 0 else {
            rxFatalError("isScheduleRequired key creation failed")
        }

        return key.pointee
    }()

    private static var scheduleInProgressSentinel: UnsafeRawPointer = { () -> UnsafeRawPointer in
        return UnsafeRawPointer(UnsafeMutablePointer<Int>.allocate(capacity: 1))
    }()

    static var queue : ScheduleQueue? {
        get {
            return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
        }
        set {
            Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
        }
    }

    /// Gets a value that indicates whether the caller must call a `schedule` method.
    public static fileprivate(set) var isScheduleRequired: Bool {
        get {
            return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
        }
        set(isScheduleRequired) {
            if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
                rxFatalError("pthread_setspecific failed")
            }
        }
    }
    ......
}
  • isScheduleRequired indicates whether the caller must go through the schedule method. The getter and setter of queue store the current schedule queue in thread-local storage under a static key, so the data is shared by everything running on the same thread.
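
The thread-local idea can be sketched with Foundation alone. This is only an illustration under stated assumptions, not RxSwift's implementation: ThreadLocalFlag, Locked, and the key string are hypothetical names, and the flag lives in Thread.current.threadDictionary rather than in a raw pthread key.

```swift
import Foundation

/// Small lock-protected box so a result can be written from another thread.
final class Locked<Value>: @unchecked Sendable {
    private let lock = NSLock()
    private var value: Value
    init(_ value: Value) { self.value = value }
    func set(_ newValue: Value) { lock.lock(); value = newValue; lock.unlock() }
    func get() -> Value { lock.lock(); defer { lock.unlock() }; return value }
}

/// Hypothetical helper (not part of RxSwift): a per-thread Bool stored in
/// the current thread's dictionary under a static key.
enum ThreadLocalFlag {
    private static let key = "example.scheduleInProgress"

    /// True only on threads that have set the flag themselves.
    static var inProgress: Bool {
        get { Thread.current.threadDictionary[key] != nil }
        set {
            if newValue {
                Thread.current.threadDictionary[key] = true
            } else {
                Thread.current.threadDictionary.removeObject(forKey: key)
            }
        }
    }
}

/// Sets the flag on the calling thread, then reads it from a second thread.
func readFlagOnOtherThread() -> (here: Bool, there: Bool) {
    ThreadLocalFlag.inProgress = true
    defer { ThreadLocalFlag.inProgress = false }

    let seenThere = Locked(true)
    let done = DispatchSemaphore(value: 0)
    Thread {
        // A different thread has its own dictionary, so the flag is not set here.
        seenThere.set(ThreadLocalFlag.inProgress)
        done.signal()
    }.start()
    done.wait()
    return (ThreadLocalFlag.inProgress, seenThere.get())
}
```

The flag set on one thread is invisible to the other, which is the property that lets a scheduler keep per-thread state without passing it around explicitly.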

SerialDispatchQueueScheduler

  • SerialDispatchQueueScheduler abstracts a serial DispatchQueue. Switch to this Scheduler if you need to run tasks serially.
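
As a rough illustration of what a serial DispatchQueue guarantees, here is a minimal sketch using GCD directly rather than the RxSwift scheduler. OrderLog, runOnSerialQueue, and the queue label are hypothetical names chosen for this example.

```swift
import Foundation

/// Lock-protected log of the order in which tasks ran.
final class OrderLog: @unchecked Sendable {
    private let lock = NSLock()
    private var entries: [Int] = []
    func append(_ value: Int) { lock.lock(); entries.append(value); lock.unlock() }
    func snapshot() -> [Int] { lock.lock(); defer { lock.unlock() }; return entries }
}

/// Submits `count` tasks to a private serial queue and returns the order
/// in which they actually executed.
func runOnSerialQueue(count: Int) -> [Int] {
    let queue = DispatchQueue(label: "example.serial")  // serial by default
    let log = OrderLog()
    for index in 0..<count {
        queue.async { log.append(index) }
    }
    // An empty sync block returns only after everything queued before it has run.
    queue.sync { }
    return log.snapshot()
}
```

Because the queue is serial, the tasks run one at a time in submission order, so the returned array matches the order in which the tasks were enqueued.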

ConcurrentDispatchQueueScheduler

  • ConcurrentDispatchQueueScheduler abstracts a concurrent DispatchQueue. Switch to this Scheduler if you need to run tasks concurrently.

OperationQueueScheduler

  • OperationQueueScheduler abstracts NSOperationQueue. It inherits some of NSOperationQueue's features; for example, you can set maxConcurrentOperationCount to control the maximum number of tasks that execute at the same time.
public class OperationQueueScheduler: ImmediateSchedulerType {
    public let operationQueue: OperationQueue
    public let queuePriority: Operation.QueuePriority

    /// Constructs new instance of `OperationQueueScheduler` that performs work on `operationQueue`.
    ///
    /// - parameter operationQueue: Operation queue targeted to perform work on.
    /// - parameter queuePriority: Queue priority which will be assigned to new operations.
    public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
        self.operationQueue = operationQueue
        self.queuePriority = queuePriority
    }
    ......
}
  • When initializing an OperationQueueScheduler object, you pass in an OperationQueue and a queuePriority (which defaults to .normal) as the initialization parameters.
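
The effect of maxConcurrentOperationCount can be observed with a plain OperationQueue, without RxSwift. The following is a minimal sketch; ConcurrencyMeter and measurePeakConcurrency are hypothetical names, and the 10 ms sleep merely stands in for real work.

```swift
import Foundation

/// Tracks how many operations are running at once (lock-protected).
final class ConcurrencyMeter: @unchecked Sendable {
    private let lock = NSLock()
    private var running = 0
    private(set) var peak = 0
    private(set) var finished = 0

    func enter() { lock.lock(); running += 1; peak = max(peak, running); lock.unlock() }
    func leave() { lock.lock(); running -= 1; finished += 1; lock.unlock() }
}

/// Runs `operations` short tasks on a queue capped at `limit` concurrent operations.
func measurePeakConcurrency(limit: Int, operations: Int) -> ConcurrencyMeter {
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = limit
    let meter = ConcurrencyMeter()
    for _ in 0..<operations {
        let operation = BlockOperation {
            meter.enter()
            Thread.sleep(forTimeInterval: 0.01)  // simulated work
            meter.leave()
        }
        operation.queuePriority = .normal  // the same kind of priority the scheduler assigns
        queue.addOperation(operation)
    }
    queue.waitUntilAllOperationsAreFinished()
    return meter
}
```

With limit set to 2, the recorded peak never exceeds 2 regardless of how many operations are queued.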

MainScheduler

  • MainScheduler represents the main thread. Switch to this Scheduler if you need to run UI-related tasks.
public final class MainScheduler : SerialDispatchQueueScheduler {

    private let _mainQueue: DispatchQueue

    let numberEnqueued = AtomicInt(0)
    public init() {
        self._mainQueue = DispatchQueue.main
        super.init(serialQueue: self._mainQueue)
    }
    public static let instance = MainScheduler()
}
  • The source shows that MainScheduler inherits from SerialDispatchQueueScheduler, because the main queue is inherently a special serial queue. When the object is initialized, the queue is fixed as the main queue: self._mainQueue = DispatchQueue.main.

Usage

Based on the previous example, let's analyze how subscribeOn and observeOn are implemented.

Using subscribeOn

  • We use subscribeOn to decide which Scheduler the sequence's producer (its construction closure) runs on. In the example above, fetching the Data takes a long time, so subscribeOn moves that work to a background Scheduler, which keeps the main thread from being blocked.
  • By default, an Observable is created, has its operators applied, and sends its notifications on the Scheduler where the subscribe method is called. The subscribeOn operator changes this behavior by specifying a different Scheduler for the Observable to execute on.
  • Source code analysis
public func subscribeOn(_ scheduler: ImmediateSchedulerType)
    -> Observable<Element> {
    return SubscribeOn(source: self, scheduler: scheduler)
}
  • The return type is Observable, which tells us that subscribeOn wraps the source sequence in an intermediate sequence, SubscribeOn.
final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
    let source: Ob
    let scheduler: ImmediateSchedulerType
    
    init(source: Ob, scheduler: ImmediateSchedulerType) {
        self.source = source
        self.scheduler = scheduler
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
        let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}
  • The source sequence and scheduler are saved at initialization.
  • As covered previously, when an RxSwift sequence is subscribed, its run method is always executed (see my previous article on the core logic of RxSwift if you are not familiar with this). That brings us to the SubscribeOnSink.run method:
func run() -> Disposable {
    let disposeEverything = SerialDisposable()
    let cancelSchedule = SingleAssignmentDisposable()
    
    disposeEverything.disposable = cancelSchedule
    
    let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
        let subscription = self.parent.source.subscribe(self)
        disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
        return Disposables.create()
    }

    cancelSchedule.setDisposable(disposeSchedule)

    return disposeEverything
}
  • Here is the line that does the scheduling: self.parent.scheduler.schedule(). self.parent.scheduler is the scheduler (queue) that was passed as the argument to subscribeOn, and its schedule method is now executed.
  • That calls self.scheduleInternal(state, action: action),
  • which in turn calls self.configuration.schedule(state, action: action):
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    let cancel = SingleAssignmentDisposable()

    self.queue.async {
        if cancel.isDisposed {
            return
        }
        cancel.setDisposable(action(state))
    }

    return cancel
}
  • This makes it clear: the task is simply dispatched asynchronously onto the specified queue.
  • action(state) is the trailing closure passed in from outside, so the code now runs that closure. It executes let subscription = self.parent.source.subscribe(self), which subscribes to the source sequence, so execution must arrive at Producer's subscribe method.
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    if !CurrentThreadScheduler.isScheduleRequired {
        // The returned disposable needs to release all references once it was disposed.
        let disposer = SinkDisposer()
        let sinkAndSubscription = self.run(observer, cancel: disposer)
        disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

        return disposer
    }
    else {
        return CurrentThreadScheduler.instance.schedule(()) { _ in
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
    }
}
  • Which branch runs depends on the current scheduling environment. On the first pass isScheduleRequired is true, so the else branch is taken, which leads into the schedule method of CurrentThreadScheduler.
  • defer is a deferred call; it is guaranteed to run before the function returns.
  • The method executes action(state). Inside that closure, ObservableSequenceSink.run is executed, and the code eventually re-enters this same schedule method. Because isScheduleRequired was set to false on the first entry, this time the code takes block 3 (as marked in the original figure).
  • Block 3 essentially wraps the task in a ScheduledItem object and adds it to the queue.
  • Execution then reaches code block 2, which dequeues the previously queued tasks and runs them in order, starting with the closure below:
func schedule(_ state: State) {
    var scheduleState: ScheduleState = .initial

    let d = self._scheduler.schedule(state) { state -> Disposable in
        // best effort
        if self._group.isDisposed {
            return Disposables.create()
        }

        // Because this runs in a recursive context, a recursive lock is used for safety.
        let action = self._lock.calculateLocked { () -> Action? in
            switch scheduleState {
            case let .added(removeKey):
                self._group.remove(for: removeKey)
            case .initial:
                break
            case .done:
                break
            }

            scheduleState = .done

            return self._action
        }
        
        if let action = action {
            action(state, self.schedule)
        }
        
        return Disposables.create()
    }
    
    ......
}
  • First, a lock is taken to guarantee thread safety and FIFO ordering. This is why RxSwift delivers events in order.
  • Then action is executed. It is the closure that the outside passed to the recursive scheduler, and it is followed by the normal event-sending flow, self.forwardOn(.next(next)).
  • PS: RxSwift call chains are cumbersome, deeply nested, and full of closures, so you need to set breakpoints repeatedly and step through slowly to follow them.
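
The CurrentThreadScheduler behaviour described earlier in this section (run the first action immediately, enqueue any action scheduled while it is running, then drain the queue) is a trampoline. Here is a minimal single-threaded sketch of that idea. Trampoline and trampolineDemo are hypothetical names, and the state is kept in plain properties, whereas RxSwift keeps it per thread and returns disposables.

```swift
import Foundation

/// Hypothetical single-threaded trampoline, loosely modelled on the flow above.
final class Trampoline {
    private var isScheduleRequired = true
    private var pending: [() -> Void] = []

    func schedule(_ action: @escaping () -> Void) {
        guard isScheduleRequired else {
            // Nested call: do not run now, only enqueue the work.
            pending.append(action)
            return
        }
        isScheduleRequired = false
        defer {
            // Runs as the function exits, after the queue has been drained.
            isScheduleRequired = true
            pending.removeAll()
        }
        action()                  // run the outermost action immediately
        while !pending.isEmpty {  // then drain queued work in FIFO order
            let next = pending.removeFirst()
            next()
        }
    }
}

/// Schedules a nested action from inside an outer one and records the order.
func trampolineDemo() -> [String] {
    let trampoline = Trampoline()
    var log: [String] = []
    trampoline.schedule {
        log.append("outer start")
        trampoline.schedule { log.append("nested") }
        log.append("outer end")
    }
    return log
}
```

The nested action runs only after the outer one has finished, which keeps the call stack flat and preserves ordering when a sequence schedules more work on itself.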

Using observeOn

  • We use observeOn to decide which Scheduler listens to the data sequence. In the example above, observeOn switches to the main thread to receive and handle the result.
  • The observeOn operator specifies a different Scheduler on which the Observable notifies its observers.
  • The overall flow and idea of observeOn are much the same as for subscribeOn, so it is not broken down here.
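
To make the difference between the two operators concrete, here is a toy model built on GCD only. It is a sketch under stated assumptions, not RxSwift code: MiniObservable, Locked, miniObservableDemo, and the queue labels are hypothetical, and disposal, errors, and completion are left out.

```swift
import Foundation

/// Small lock-protected box so results can be written from another queue.
final class Locked<Value>: @unchecked Sendable {
    private let lock = NSLock()
    private var value: Value
    init(_ value: Value) { self.value = value }
    func set(_ newValue: Value) { lock.lock(); value = newValue; lock.unlock() }
    func get() -> Value { lock.lock(); defer { lock.unlock() }; return value }
}

/// Hypothetical stand-in for Observable: calling `subscribe` starts the producer.
struct MiniObservable<Element: Sendable>: Sendable {
    typealias Observer = @Sendable (Element) -> Void
    let subscribe: @Sendable (@escaping Observer) -> Void

    /// Sketch of subscribeOn: the producer itself is started on `queue`.
    func subscribeOn(_ queue: DispatchQueue) -> MiniObservable<Element> {
        let source = self
        return MiniObservable { observer in
            queue.async { source.subscribe(observer) }
        }
    }

    /// Sketch of observeOn: each element is forwarded to the observer on `queue`.
    func observeOn(_ queue: DispatchQueue) -> MiniObservable<Element> {
        let source = self
        return MiniObservable { observer in
            source.subscribe { element in
                queue.async { observer(element) }
            }
        }
    }
}

/// Reports which queue the producer ran on and which queue the observer ran on.
func miniObservableDemo() -> (producedOn: String?, observedOn: String?) {
    let nameKey = DispatchSpecificKey<String>()
    let background = DispatchQueue(label: "example.background")
    let delivery = DispatchQueue(label: "example.delivery")
    background.setSpecific(key: nameKey, value: "background")
    delivery.setSpecific(key: nameKey, value: "delivery")

    let producedOn = Locked<String?>(nil)
    let observedOn = Locked<String?>(nil)
    let done = DispatchSemaphore(value: 0)

    let source = MiniObservable<Int> { observer in
        producedOn.set(DispatchQueue.getSpecific(key: nameKey))
        observer(42)
    }

    source
        .subscribeOn(background)
        .observeOn(delivery)
        .subscribe { _ in
            observedOn.set(DispatchQueue.getSpecific(key: nameKey))
            done.signal()
        }
    done.wait()
    return (producedOn.get(), observedOn.get())
}
```

In this model both operators return a new wrapper around the source, mirroring the intermediate-sequence idea: subscribeOn moves the act of subscribing, while observeOn moves the delivery of each element.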

Conclusion

  • [Figure: Scheduler inheritance diagram]
  • Schedulers are the core module Rx uses for multithreading; they control which thread or queue a task runs on.
  • subscribeOn decides which Scheduler the sequence's producer runs on.
  • observeOn decides which Scheduler listens to the data sequence.
  • Both subscribeOn and observeOn create an intermediate sequence, so internally there is another round of subscribing and responding, in which the intermediate sink is the observer of the source sequence.

If you have any questions or suggestions, feel free to leave a comment or send me a message. If you found this useful, please follow and like; more articles will follow.