[TOC]

Mostly code, no analysis, mainly wisps of clues

RxSwift scheduler

Scheduler, as one of the core schedulers in RxSwift, mainly switches the working mechanism of tasks in threads or queues #### Types of schedulers

  • CurrentThreadScheduler: Gets the current scheduling
  • MainScheduler: Main scheduler, which switches to the main thread
  • SerialDispatchQueueScheduler: serial scheduling, using this schedule to run serial tasks, serial queue encapsulation based on GCD
  • ConcurrentDispatchQueueScheduler: Parallel scheduling, using this scheduler to run parallel tasks, gCD-based parallel queue encapsulation
  • OperationQueueScheduler: Operation queue scheduling, based on NSOperationQueue encapsulation

The inheritance structure of the scheduler

The realization of each scheduler

  1. MainShedulerThe scheduler
public final class MainScheduler : SerialDispatchQueueScheduler {

    private let _mainQueue: DispatchQueue

    var numberEnqueued = AtomicInt(0)

    /// Initializes new instance of `MainScheduler`.
    public init() {
        self._mainQueue = DispatchQueue.main
        super.init(serialQueue: self._mainQueue)
    }
}
Copy the code

Code analysis MainScheduler inherited from SerialDispatchQueueScheduler serial scheduler, the determination of the home side column in the constructor is based on the self. _mainQueue = DispatchQueue. The main, and call the superclass constructor, Incoming queue type 2. SerialDispatchQueueScheduler scheduler

public class SerialDispatchQueueScheduler : SchedulerType {
    public typealias TimeInterval = Foundation.TimeInterval
    public typealias Time = Date
    
    /// - returns: Current time.
    public var now : Date {
        return Date()}let configuration: DispatchQueueConfiguration
    
    /** Constructs new `SerialDispatchQueueScheduler` that wraps `serialQueue`. - parameter serialQueue: Target dispatch queue. - parameter leeway: The amount of time, in nanoseconds, that the system will defer the timer. */
    init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
    }

    public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((DispatchQueue) - >Void)? = nil, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        let queue = DispatchQueue(label: internalSerialQueueName, attributes: []) serialQueueConfiguration? (queue)self.init(serialQueue: queue, leeway: leeway)
    }
}
Copy the code

The code analysis This code to tell the truth didn’t see anything, just to save incoming queue and leeway, here should be the specific use of specific implementation in the parent class or the corresponding extension 3. ConcurrentDispatchQueueScheduler scheduler

public class ConcurrentDispatchQueueScheduler: SchedulerType {
    public typealias TimeInterval = Foundation.TimeInterval
    public typealias Time = Date
    
    public var now : Date {
        return Date()}let configuration: DispatchQueueConfiguration
     
    public init(queue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.configuration = DispatchQueueConfiguration(queue: queue, leeway: leeway)
    }
    @available(iOS 8.OSX 10.10*),public convenience init(qos: DispatchQoS, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
        self.init(queue: DispatchQueue(
            label: "rxswift.queue.\(qos)",
            qos: qos,
            attributes: [DispatchQueue.Attributes.concurrent],
            target: nil),
            leeway: leeway
        )
    }
}
Copy the code

Code analysis set the queue type attributes Settings, concurrent for parallel 4. OperationQueueScheduler scheduler

public class OperationQueueScheduler: ImmediateSchedulerType {
    public let operationQueue: OperationQueue
    public let queuePriority: Operation.QueuePriority

    public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
        self.operationQueue = operationQueue
        self.queuePriority = queuePriority
    }
}
Copy the code

5. CurrentThreadScheduler scheduler

public class CurrentThreadScheduler : ImmediateSchedulerType {
    typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>

    /// The singleton instance of the current thread scheduler.
    public static let instance = CurrentThreadScheduler(a)private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
        let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
        defer{#if swift(>=4.1)
            key.deallocate()
#else
            key.deallocate(capacity: 1)
#endif
        }

    static var queue : ScheduleQueue? {
        get {
            return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
        }
        set {
            Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
        }
    }

    /// Gets a value that indicates whether the caller must call a `schedule` method.
    public static fileprivate(set) var isScheduleRequired: Bool {
        get {
            return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
        }
        set(isScheduleRequired) {
            if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil: scheduleInProgressSentinel) ! =0 {
                rxFatalError("pthread_setspecific failed")}}}/** Schedules an action to be executed as soon as possible on current thread. If this method is called on some thread that doesn't have `CurrentThreadScheduler` installed, scheduler will be automatically installed and uninstalled after all work is performed. - parameter state: State passed to the action to be executed. - parameter action: Action to be executed. - returns: The disposable object used to cancel the scheduled action (best effort). */
    public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) - >Disposable {
        if CurrentThreadScheduler.isScheduleRequired {
            CurrentThreadScheduler.isScheduleRequired = false

            let disposable = action(state)

            defer {
                CurrentThreadScheduler.isScheduleRequired = true
                CurrentThreadScheduler.queue = nil
            }

            guard let queue = CurrentThreadScheduler.queue else {
                return disposable
            }

            while let latest = queue.value.dequeue() {
                if latest.isDisposed {
                    continue
                }
                latest.invoke()
            }

            return disposable
        }

        let existingQueue = CurrentThreadScheduler.queue

        let queue: RxMutableBox<Queue<ScheduledItemType>>
        if let existingQueue = existingQueue {
            queue = existingQueue
        }
        else {
            queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
            CurrentThreadScheduler.queue = queue
        }

        let scheduledItem = ScheduledItem(action: action, state: state)
        queue.value.enqueue(scheduledItem)

        return scheduledItem
    }
}
Copy the code

The code allows the current scheduling, which is executed by the current thread by default. There is an internal mechanism to bind queue, store and determine whether to call schedule 6.SchedulerType protocol and extended implementation

public protocol SchedulerType: ImmediateSchedulerType {

    /// - returns: Current time.
    var now : RxTime {
        get
    }
    /** Schedules an action to be executed. - parameter state: State passed to the action to be executed. - parameter dueTime: Relative time after which to execute the action. - parameter action: Action to be executed. - returns: The disposable object used to cancel the scheduled action (best effort). */
    func scheduleRelative<StateType>(_ state: StateType, dueTime: RxTimeInterval, action: @escaping (StateType) -> Disposable) - >Disposable
 
    /** Schedules a periodic piece of work. - parameter state: State passed to the action to be executed. - parameter startAfter: Period after which initial work should be run. - parameter period: Period for running the work periodically. - parameter action: Action to be executed. - returns: The disposable object used to cancel the scheduled action (best effort). */
    func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) - >Disposable
}
extension SchedulerType {

    /** Periodic task will be emulated using recursive scheduling. - parameter state: Initial state passed to the action upon the first iteration. - parameter startAfter: Period after which initial work should be run. - parameter period: Period for running the work periodically. - returns: The disposable object used to cancel the scheduled recurring action (best effort). */
    public func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) - >Disposable {
        let schedule = SchedulePeriodicRecursive(scheduler: self, startAfter: startAfter, period: period, action: action, state: state)
            
        return schedule.start()
    }

    func scheduleRecursive<State>(_ state: State, dueTime: RxTimeInterval, action: @escaping (State, AnyRecursiveScheduler<State>) -> Void) - >Disposable {
        let scheduler = AnyRecursiveScheduler(scheduler: self, action: action)
         
        scheduler.schedule(state, dueTime: dueTime)
            
        return Disposables.create(with: scheduler.dispose)
    }
}
Copy the code

ImmediateSchedulerType Dynamic scheduling protocol implementation

 extension ImmediateSchedulerType {
    /** Schedules an action to be executed recursively. - parameter state: State passed to the action to be executed. - parameter action: Action to execute recursively. The last parameter passed to the action is used to trigger recursive scheduling of the action, passing in recursive invocation state. - returns: The disposable object used to cancel the scheduled action (best effort). */
    public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) - >Void) - >Disposable {
        let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
        
        recursiveScheduler.schedule(state)
        
        return Disposables.create(with: recursiveScheduler.dispose)
    }
}
Copy the code

Code analysis in 6 is by constructing a SchedulePeriodicRecursive or AnyRecursiveScheduler; In 7 RecursiveImmediateScheduler SchedulePeriodicRecursive and RecursiveImmediateScheduler is in a swift file, Internal main implementation used to implement a schedule method, specific here will not repeat, also did not particularly carefully look at AnyRecursiveScheduler is similar

GCD implements an asynchronous processing and returns to the main thread

DispatchQueue.init(label: "com.baozi.testGCD",qos: .default,attributes:.concurrent).async {
    var num = 0
    for i in 0.250{
        num += I
    }
    DispatchQueue.main.sync {
        print("total Baozi count :\(num)  \(Thread.current)")}}Copy the code

RxSwiftImplement asynchronous return to main thread 🌰

Observable<Any>.create { (observer) -> Disposable in
        var num = 0
        for i in 0.250{
            num += I
        }
        observer.onNext(num)
        return Disposables.create()
    }
    .subscribeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "com.baozi.rxswift"))
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { (val) in
        print(val)
    }).disposed(by: disposeBag)

Copy the code

The logic of the scheduler

Start by writing a little 🌰

let _ = Observable.of(1.2.3.4.5).subscribeOn(SerialDispatchQueueScheduler.init(internalSerialQueueName: "com.baozi.rxswift"))
    .subscribe(onNext: { (val) in
        print(val)
    }).disposed(by: disposeBag)
    
    let _ = Observable.of(1.2.3.4.5)
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { (val) in
        print(val)
    }).disposed(by: disposeBag)
Copy the code

How to implement multi-threaded scheduling, you can clearly see that it is mainly based on subscribeOn and observeOn to specify the related scheduling, in fact, is to develop the related threads

Now, they’re pretty much the same thing, so look at subscribeOn

public func subscribeOn(_ scheduler: ImmediateSchedulerType)
    -> Observable<Element> {
    return SubscribeOn(source: self, scheduler: scheduler)
}
Copy the code

The SubscribeOn implementation is as follows

final private class SubscribeOn<Ob: ObservableType> :Producer<Ob.Element> {
    let source: Ob
    let scheduler: ImmediateSchedulerType
    
    init(source: Ob, scheduler: ImmediateSchedulerType) {
        self.source = source
        self.scheduler = scheduler
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element= =Ob.Element {
        let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}
Copy the code

It looks very similar to the core logic, as well as the corresponding Sink. There is indeed a corresponding Sink implementation in the source code, which should be the same logic. This piece should be the basis of the core logic, or to lay a solid foundation