RxSwiftElderly drivers in China

Flow using observable sequences

  • ObservableGenerate an observable sequence
  • If you need to transform the observable sequence into a new observable sequence by higher-order functions
  • subscribeTo subscribe to
  • onNext.onError.onCompleteTo deal with

But problems can arise when dealing with multithreading and UI processing of observable sequences, such as 🌰

  • The mock request interface queries data
func dealwithData(inputText:String)-> Observable<Any> {print("Requested the network\(Thread.current)") // data
        return Observable<Any>.create({ (ob) -> Disposable in
            if inputText == "1234" {
                ob.onError(NSError.init(domain: "com.lgcooci.cn", code: 10086, userInfo: nil))}DispatchQueue.global().async {
                print("Look before you send:\(Thread.current)")
                ob.onNext("Already entered:\(inputText)")
                ob.onCompleted()
            }
            return Disposables.create()
        })
    }

Copy the code
  • Listen through observable sequences
let result = inputTF.rx.text.skip(1).flatMap { [weak self](input) -> Observable<Any> in
     return (self? .dealwithData(inputText: input ??""))!
}
Copy the code
  • Subscribe to the observable sequence
// First subscription
result.subscribe { (even) in
    print(even)
    print(Thread.current)
}.disposed(by: disposeBag)

// Second subscription
result.subscribe { (even) in
    print(even)
    print(Thread.current)
}.disposed(by: disposeBag)

Copy the code
  • After running, enter a 1 as follows:

Since we subscribed twice, our simulated request operation ran twice, and the thread receiving the signal in the child thread we want to get the result that the request operation runs only once. RxSwift provides a mechanism to share(Replay:), which returns a new sequence of events. It listens for events in the underlying sequence and notifies its subscribers. Fixed a problem where the map would be executed multiple times with multiple subscribers


let result = inputTF.rx.text.skip(1).flatMap { [weak self](input) -> Observable<Any> in
    return (self? .dealwithData(inputText: input ??""))!
}.share(replay: 1, scope: .whileConnected)


Copy the code

Run again, enter 1, and the result is as follows

However, the event being listened to is still in the child thread, and the response asynchrony is specified by the thread of execution to listen on

 let result = inputTF.rx.text.skip(1).flatMap { [weak self](input) -> Observable<Any> in
     return (self? .dealwithData(inputText: input ??""))!
            .observeOn(MainScheduler.instance)
}.share(replay: 1, scope: .whileConnected)
Copy the code

Run it again and print the following

In this way, we perfectly deal with the problem of multiple processing and asynchronous response, but it does increase a lot of work, but it should also be dealt with, so is there a simpler and faster way of development and use, then the old driver appeared

DriverIntroduction and principle of

introduce
  • DriverProvides a simple way to program responsiveness at the UI level
  • Use it to satisfy the condition
    • No error events are generated
    • Listen for events on the main thread
    • Shared state changes
Advantages of older drivers
  1. Data-driven UI
  2. Stop responding to error events
  3. Be sure to handle logic such as UI in the main thread
  4. Shared state changes
The principle of
extension ControlProperty {
    public func asDriver(a) -> Driver<Element> {
        return self.asDriver { _ -> Driver<Element> in
            #if DEBUG
                rxFatalError("Somehow driver received error from a source that shouldn't fail.")
            #else
                return Driver.empty()
            #endif
        }
    }
}
Copy the code
public typealias Driver<Element> = SharedSequence<DriverSharingStrategy.Element>
Copy the code

AsDriver () is an extension of the ControlProperty structure. Its return value is the closure of self.asDriver, which is an alias for ShareSequence, called a shared sequence

Implementation of shared sequences


public struct DriverSharingStrategy: SharingStrategyProtocol {
    public static var scheduler: SchedulerType { return SharingScheduler.make() }
    public static func share<Element>(_ source: Observable<Element>) -> Observable<Element> {
        return source.share(replay: 1, scope: .whileConnected)
    }
}
public enum SharingScheduler {
    /// Default scheduler used in SharedSequence based traits.
    public private(set) static var make: () -> SchedulerType = { MainScheduler()}}public final class MainScheduler : SerialDispatchQueueScheduler {

    private let _mainQueue: DispatchQueue
    /// Initializes new instance of `MainScheduler`.
    public init() {
        self._mainQueue = DispatchQueue.main
        super.init(serialQueue: self._mainQueue)
    }
}
Copy the code

Note that the Driver constructor above specifies scheduling on the main thread