Make writing a habit together! This is the fourth day of my participation in the “Gold Digging Day New Plan · April More text Challenge”. Click here for more details. Now that we’ve learned the core logic of RxSwift, how do you create, subscribe, and destroy Observable sequences? This article mainly introduces their methods.

1. Observale created

For Observale, we can understand that everything is in sequence, like a tube with glass beads running through it, and these represent every change that happens and we observe the change and make the corresponding change.

So for the creation of Observale we usually create it by creating it

1.1 create create

// MARK: -create create

    

    func create(){

        

        let ob = Observable<String>.create { (observer) -> Disposable in

            

            observer.onNext("hello")

            

            return Disposables.create()

            

        }

        

        ob.subscribe(onNext: {(str) in

            

            print("str:\(str)")

        }).disposed(by: disposeBag) 

    }
Copy the code

1.2 bindTo binding

We can bind our sequence to another observer

// MARK: -bindto binding



    func bind(){

        

        let timer = Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance)

        
        timer.map{"Current timer count :\($0/3600):\($0/60):\($0%60)"}

          .bind(to: self.label.rx.text)

            .disposed(by: disposeBag)

        
    }
Copy the code

Of course, we can also bind the processing directly and display it on the label as above

let timer = Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance)
timer.map{"Current timer count :\($0/3600):\($0/60):\($0%60)"}

            .bind{ [weak self](str) inself? .label.text = str }.disposed(by: disposeBag)Copy the code

1.3 AnyObserver create

AnyObserver means any kind of observer, and we can define its closure callback eventHandler

func anyObserver() {

        

        let observer : AnyObserver<String> = AnyObserver { (event) in

            

            switch event{

            case .next(let str):

                print(str)

            case .error(let error):

                print(error)

            case .completed:

                print("Complete")}}let observable = Observable.of("1"."2"."3")

        

        observable.subscribe(observer)

            .disposed(by: disposeBag)

        

        

    }
Copy the code

Our sequence can be subscribed directly.

  • Binder

AnyObserver is a uniform general observer, and Binder has two characteristics:

  • Error events are not handled
  • Make sure the binding is in the specifiedSchedulerExecute, default isMainScheduler

Events in the UI are usually updated in the main thread, so we can bind with a binder type observer

// MARK: - Binder

    

    func binder() {

        

        let observable = Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.asyncInstance)

        

        let binder:Binder<String> = Binder(label) {

            (lab,text) in

            lab.text = text

        }

        observable.map{"Item :\($0)"}

            .subscribe(binder)

            .disposed(by: disposeBag)

        

    }
Copy the code

Of course we can also use bindto

observable.map{"Item :\($0)"}

// .subscribe(binder)

            .bind(to: binder)

            .disposed(by: disposeBag)
Copy the code

1.4 Factory Method

In RxSwift, the author provides us with some quick factory methods. Let’s take a look at them below:

1.4.1 the empty

Empty sequence: Used to create an empty sequence that accepts only complete events

func empty() {

        

        let emptyObservable = Observable<Int>.empty()

        emptyObservable.subscribe { (num) in

            print(num)

        } onError: { (error) in

            print("error")

        } onCompleted: {

            print("complete")

        } onDisposed: {

            print("dispose")

        }.disposed(by: disposeBag)
     

    }
Copy the code

Print complete, dispose. It means it was done without sending a signal.

The direct observer.on(.completed) was completed when we subscribe

1.4.2 just

Just: The build has only one default value to initialize, the Observable queue has only one element, and sends complete when the subscription is complete

func just() {

        

        let arr = [1.2.3]

        Observable<[Int]>.just(arr)// Single element

            .subscribe { (event) in

                print(event)

            }.disposed(by: disposeBag)

        

        Observable<[Int]>.just(arr).subscribe { arr in

            print(arr)

        } onError: { error in

            print(error)

        } onCompleted: {

            print("complete")

        } onDisposed: {

            print("dispose")

        }.disposed(by: disposeBag)
        
    }
Copy the code

Send the initialization element and completion event directly when you subscribe.

1.4.3 of

Of: Whereas just is a sequence of a single element, of can represent a sequence of multiple elements

func of() {

        

        // Multiple elements

        let observable = Observable<Int>.of(1.2.3.4)

        observable.subscribe { (num) in

            print(num)

        } onError: { error in

            print(error)

        } onCompleted: {

            print("complete")

        } onDisposed: {

            print("dispose")

        }.disposed(by: disposeBag)

        / / a dictionary

        Observable<[String:Any]>.of(["name":"fish"."age":18], ["name":"fishMan"."age":18]).subscribe { (dic) in

            print(dic["name"]!)

        } onError: { error in

            print(error)

        } onCompleted: {

            print("complete")

        } onDisposed: {

            print("dispose")

        }.disposed(by: disposeBag)

        

        / / array

        

        Observable<[Int]>.of([1.2.3]).subscribe { (arr) in

            print(arr[0])

        } onError: { error in

            print(error)

        } onCompleted: {

            print("complete")

        } onDisposed: {

            print("dispose")

        }.disposed(by: disposeBag)

    }
Copy the code

Element can be an array or a dictionary as an element of a sequence

The subscription process also calls run by sink and sends signals through iterator traversal.

1.4.4 the from

Form: gets sequence from collection: Array, collection,set gets sequence

  • There are options for processing
  • A more secure
func from() {

        

        Observable<[String:Any]>.from(optional: ["name":"fish"."age":18])

            .subscribe { (dic) in

                print(dic["age"] as Any)

            }  onError: { error in

                print(error)

            } onCompleted: {

                print("complete")

            } onDisposed: {

                print("dispose")

            }.disposed(by: disposeBag)

    }

Copy the code

To determine if the element exists, send observer.on(.next(element)) or else automatically observer.on(.completed) completes the sequence

1.4.5 deferred

Deferred: Delays the initialization of the Observable sequence, returning an Observable sequence that calls the specified factory function when a new observer subscribes.

func deferred() {

    

        var isEven = true

        

        Observable<Int>.deferred { () -> Observable<Int> inisEven = ! isEvenif isEven {

        

                return Observable<Int>.of(2.4.6.8.10)}else{

                

                return Observable<Int>.of(1.3.5.7.9)

            }

        }.subscribe(onNext: {print($0)})

            .disposed(by: disposeBag)
        

    }
Copy the code

We can control the initialization we want. For example, our login and registration pages are similar, but the requested interfaces are different. We can make a relative judgment outside and select the corresponding registration interface or login interface sequence to initialize the subscription, thus reducing our logical judgment.

The sequence is created using the observableFactory method when subscribing, and an error event is sent if none is present.

1.4.6 reaching

Range: Generates a sequence of observable integers in a specified range

_ =  Observable.range(start: 3.count: 6)

            .subscribe(onNext: {print($0)})

            .disposed(by: disposeBag)

Copy the code

Definition:

When we run, we continue to send incrementing Int values that are less than count.

1.4.7 generate

This method creates an Observable that gives an action only if all of the provided criteria are true. Given an initial value and then condition 1 and then condition 2 will keep recursing until condition 1 or condition 2 doesn’t satisfy an array-like loop

Observable.generate(initialState: 0.condition: {$0<10},

                            iterate: {$0+3})

        .subscribe { (num) in

            print(num)

        } onError: { error in

            print(error)

        } onCompleted: {

            print("complete")

        } onDisposed: {

            print("dispose")

        }.disposed(by: disposeBag)
Copy the code

The results

An array of operating

Realize the principle of

When condition is satisfied, it sends a sequence signal, iterate through the iterator iterate, and check if condition is satisfied next time. If condition is not satisfied, it sends a complete signal.

1.4.8 timer

For timers we can return a specified scheduling thread

print("start")

        /* The first argument dueTime indicates how long it will take to start the timer. The second argument period indicates how long the timer interval is. The third argument indicates the thread on which the timer sequence is located

        Observable<Int>.timer( DispatchTimeInterval.seconds(2), period: DispatchTimeInterval.seconds(1), scheduler: MainScheduler.asyncInstance)

            .subscribe { (time) in

                print(time)

            } onError: { (error) in

                print(error)

            } onCompleted: {

                print("completed")

            } onDisposed: {

                print("disposed")

            }.disposed(by: disposeBag)
Copy the code

If no period interval is specified, it is sent once by default

Similar to the factory method, it will determine whether there is a time interval, if not, it is a one-time

For one-time direct send the default 0 digit end sequence

For time interval, the defined interval is continuously accumulated to send the signal

1.4.9 interval

An interval, like a timer, returns an observable sequence that generates a value after each cycle, runs the timer and sends observer messages using the specified scheduler.

Observable<Int>.interval( DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance)

            .subscribe { (time) in

                print(time)

            } onError: { (error) in

                print(error)

            } onCompleted: {

                print("completed")

            } onDisposed: {

                print("disposed")

            }.disposed(by: disposeBag)

Copy the code

Interval uses the timer method, but the element of the sequence must be of type Integer.

1.4.10 repeatElement

The sequence generated by repeatElement is sent to the observer indefinitely

Observable<Int>.repeatElement(2)

            .subscribe { (time) in

                print(time)

            } onError: { (error) in

                print(error)

            } onCompleted: {

                print("completed")

            } onDisposed: {

                print("disposed")

            }.disposed(by: disposeBag)
Copy the code

You can see that we keep recursively sending element elements that we define

1.4.11 error

Generates an observable sequence that sends only error events

Observable<Any>.error(NSError.init(domain: "networkError".code: 400.userInfo: ["message":"Network error"]))

            .subscribe { (element) in

            print(element)

        } onError: { (error) in

            print(error)

        }  onCompleted: {

            print("completed")

        } onDisposed: {

            print("disposed")

        }.disposed(by: disposeBag)

Copy the code

Only error events are sent

1.4.11 never

This method creates an Observable sequence that never emits an Event (and never terminates).

Observable<Int>.never()

            .subscribe { (num) in

                print(num)

            } onError: { (error) in

                print(error)

            } onCompleted: {

                print("completed")

            } onDisposed: {

                print("disposed")

            }.disposed(by: disposeBag)


Copy the code

Nothing will happen

2. To subscribe to

We usually subscribe in two ways

  • Subscribe to set events
let observable = Observable<Int>.of(1.2.3.4)

        observable.subscribe { (num) in

            print(num)

        } onError: { error in

            print(error)

        } onCompleted: {

            print("complete")
` `
        } onDisposed: {

            print("dispose")

        }.disposed(by: disposeBag)
Copy the code

Of course we can also specify the event callbacks that we care about

// Subscribe to the specified event

        observable.subscribe(onNext:{(num) in

            print(num)

        }).disposed(by: disposeBag)
Copy the code

Or subscribe to all events

// Subscribe to all events

        observable.subscribe { (event) in

            print(event)

        }.disposed(by: disposeBag
Copy the code

3. The destruction of

Usually we use garbage bag DisposeBag() to manage sequences like our timer, redefining dispaosBag

Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance)

            .subscribe { [weak self] (num) in

                

                print(num)

                if (num == 4){ self! .disposeBag = DisposeBag()// Dump the previous garbage bag

                }

            } onError: { (error) in

                print(error)

            } onCompleted: {

                print("completed")

            } onDisposed: {

                print("disposed")

            }.disposed(by: disposeBag)

Copy the code

Execution Result:

Our sequence is destroyed with the disposebag we defined earlier.

  • Active send destruction
let ob =  Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance)

    

        let subscirber = ob.subscribe {  (num) in

            

            print(num)

            if (num == 4){

                

            }

        } onError: { (error) in

            print(error)

        } onCompleted: {

            print("completed")

        } onDisposed: {

            print("disposed")

        }

        

        subscirber.dispose()
Copy the code

4. To summarize

For sequence creation and subscription, destroy. We pay more attention to the way of sequence creation, through the factory method can quickly and easily create our appropriate sequence, and there is usually no special requirement for subscription, according to their own choice whether to subscribe to the complete event. We can do this by actively invoking or replacing garbage bags.