Schedulers are the core module of Rx to implement multithreading. They are mainly used to control which thread or queue tasks run on.

  • MainScheduler: represents the main thread.
  • SerialDispatchQueueScheduler:Abstract theserialDispatchQueueThis scheduler is used to perform serial tasks.
  • ConcurrentDispatchQueueScheduler:Abstract theparallelDispatchQueueThis scheduler can be used to perform parallel tasks.
  • OperationQueueScheduler:Abstract theNSOperationQueue.

Common use

Observable.of(1.2.3.4)
    .subscribe(on: ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .observe(on: MainScheduler.instance)
    .subscribe(onNext: { data in
        print(data)
    })
    .disposed(by: disposeBag)
Copy the code
  1. subscribe(on:): Determines where the constructor of the data sequence is locatedSchedulerTo run on.
  2. observe(on:): indicates whichSchedulerListen on and perform response processing.

Subscribe (on:) process analysis

How subscribe(on:) causes the sequence builder function to run on the specified Scheduler? Let’s start with the method source code

public func subscribe(on scheduler: ImmediateSchedulerType)
    -> Observable<Element> {
    SubscribeOn(source: self, scheduler: scheduler)
}
Copy the code

SubscribeOn is a subclass of Producer. It can be found that the processing of sequence inherits Producer, carries out corresponding logical processing, and then creates corresponding Sink, so as to achieve flexible expansion.

final private class SubscribeOn<Ob: ObservableType> :Producer<Ob.Element> {
    let source: Ob
    let scheduler: ImmediateSchedulerType
    
    init(source: Ob.scheduler: ImmediateSchedulerType) {
        self.source = source
        self.scheduler = scheduler
    }
    
    override func run<Observer: ObserverType> (_ observer: Observer.cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element = = Ob.Element {
        let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}
Copy the code

Source stores the source sequence, and Scheduler stores the queue where tasks need to be executed. The final logic is to look at Sink’s run method.

// SubscribeOnSink
func run(a) -> Disposable {
    let disposeEverything = SerialDisposable(a)let cancelSchedule = SingleAssignmentDisposable()
    
    disposeEverything.disposable = cancelSchedule
    
    let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
        let subscription = self.parent.source.subscribe(self)
        disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
        return Disposables.create()
    }

    cancelSchedule.setDisposable(disposeSchedule)
    
    return disposeEverything
}
Copy the code
  • Call theSubscribeOnThe savedschedulerthescheduleMethod, passing in a closure callback.
  • scheduleMethod isImmediateSchedulerTypeThe protocol method that we created isConcurrentDispatchQueueScheduler, it followsImmediateSchedulerTypeProtocol, corresponding toscheduleThe method is implemented as follows.
/ / ConcurrentDispatchQueueScheduler schedule
public final func schedule<StateType> (_ state: StateType.action: @escaping (StateType) - >Disposable) -> Disposable {
    / / call the DispatchQueueConfiguration schedule
    self.configuration.schedule(state, action: action)
}

/ / DispatchQueueConfiguration schedule
func schedule<StateType> (_ state: StateType.action: @escaping (StateType) - >Disposable) -> Disposable {
    let cancel = SingleAssignmentDisposable(a)// self.queue is the queue to which we pass values, executed asynchronously
    self.queue.async {
        if cancel.isDisposed {
            return
        }
        // Execute the action closure
        cancel.setDisposable(action(state))
    }

    return cancel
}
Copy the code
  • The final call is going to beDispatchQueueConfiguration the schedule of implementation.
  • Execute asynchronously, in an asynchronous execution function, according to the queue that the value comes inscheduleIs the closure ofSubscribeOnSink.run()The red box part of the method.

  • Closure that executes the source sequencesubscribeMethod, and that’s itsubscribeIn what we specifyscheduleIn the execution.

Observe (on:) process analysis

Observe (on:) is similar to subscribe(on:) except subscribe(on:) is executed in schedule and observe(on:) is executed in schedule.

// ObserveOnSink
override func onCore(_ event: Event<Element>) {
    let shouldStart = self._lock.calculateLocked { () -> Bool in
        self._queue.enqueue(event)

        switch self._state {
        case .stopped:
            self._state = .running
            return true
        case .running:
            return false}}if shouldStart {
        self._scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
    }
}
Copy the code

ObserveOnSink called the scheduleRecursive(_: Action 🙂 method of the ImmediateSchedulerType protocol in the method by rewriting the onCore(_:) method

// ImmediateSchedulerType
public func scheduleRecursive<State> (_ state: State.action: @escaping (_ state: State._ recurse: (State) - >Void) - >Void) -> Disposable {
    let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
    
    recursiveScheduler.schedule(state)
    
    return Disposables.create(with: recursiveScheduler.dispose)
}

// RecursiveImmediateScheduler
func schedule(_ state: State) {
    var scheduleState: ScheduleState = .initial
    / / self. _scheduler preservation is ConcurrentDispatchQueueScheduler
    let d = self._scheduler.schedule(state) { state -> Disposable in
        // best effort
        if self._group.isDisposed {
            return Disposables.create()
        }
        
        let action = self._lock.calculateLocked { () -> Action? in
            switch scheduleState {
            case let .added(removeKey):
                self._group.remove(for: removeKey)
            case .initial:
                break
            case .done:
                break
            }

            scheduleState = .done

            return self._action
        }
        // The action callback is executed in the specified _scheduler
        if let action = action {
            action(state, self.schedule)
        }
        
        return Disposables.create()
    }
    .
}

// ConcurrentDispatchQueueScheduler
public final func schedule<StateType> (_ state: StateType.action: @escaping (StateType) - >Disposable) -> Disposable {
    return self.configuration.schedule(state, action: action)
}
Copy the code
  • scheduleRecursive(_:action:)->RecursiveImmediateScheduler.schedule(_:)->ConcurrentDispatchQueueScheduler.schedule(_:action:)->configuration.schedule, through this call chain, you can see andsubscribe(on:)Same thing, it’s always called at the endconfiguration.schedule.
  • The action callback is inscheduleClosure, implemented in the specifiedscheduleListen in.