The using operator

Introduction

The official description of the using operator:

Create a cleanable resource that has the same lifetime as an Observable. When you create an Observable with the using operator, you create a cleanable resource. Once the Observable terminates, the resource is cleared.

beeth0ven.github.io/RxSwift – Chi…

The using method is static and has two implementations:

  1. The implementation in the ObservableType protocol extension

This one is called on ordinary sources (Observable, Relay, ControlProperty, etc.). The method creates an instance of the private Using type.

extension ObservableType {
    /**
     Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence's lifetime.

     - seealso: [using operator on reactivex.io](http://reactivex.io/documentation/operators/using.html)

     - parameter resourceFactory: Factory function to obtain a resource object.
     - parameter observableFactory: Factory function to obtain an observable sequence that depends on the obtained resource.
     - returns: An observable sequence whose lifetime controls the lifetime of the dependent resource object.
     */
    public static func using<Resource: Disposable>(_ resourceFactory: @escaping () throws -> Resource, observableFactory: @escaping (Resource) throws -> Observable<Element>) -> Observable<Element> {
        return Using(resourceFactory: resourceFactory, observableFactory: observableFactory)
    }
}
  2. The implementation in the PrimitiveSequence structure extension

This one is called on trait sequences (Single, Maybe, etc.). It still forwards to the implementation in the ObservableType extension.

extension PrimitiveSequence {
    public static func using<Resource: Disposable>(_ resourceFactory: @escaping () throws -> Resource, primitiveSequenceFactory: @escaping (Resource) throws -> PrimitiveSequence<Trait, Element>)
        -> PrimitiveSequence<Trait, Element> {
            return PrimitiveSequence(raw: Observable.using(resourceFactory, observableFactory: { (resource: Resource) throws -> Observable<Element> in
                return try primitiveSequenceFactory(resource).asObservable()
            }))
    }
}

What the method does

The method takes two arguments:

  1. The resourceFactory closure

This closure takes no parameters and returns a Disposable, which is the resource that can be cleaned up. When the subscription to the Observable returned by the second closure is released, the resource's dispose method is called as well.

  2. The observableFactory closure

This closure takes the Disposable returned by the resourceFactory closure as its parameter. Its return value is the Observable source, which is also what the using method returns for the caller to subscribe to.

Because the resourceFactory closure creates a disposable object, and that object is then passed to the observableFactory closure, which can process it before returning the source that the caller subscribes to, using can be put to work in many ways.
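To make the relationship between the two closures concrete, here is a minimal plain-Swift sketch of the idea. It does not import RxSwift and is not how the library implements Using; every name in it (Event, MiniSource, MiniDisposable, miniUsing, Recorder, LoggingResource, demoUsing) is hypothetical. It also simplifies one thing: the resource is only disposed on completion, whereas the real operator also disposes it when the subscription is cancelled.

```swift
// Plain-Swift model of the idea behind `using`; all names are hypothetical.

enum Event<T> {
    case next(T)
    case completed
}

/// A synchronous "source": subscribing runs the closure, which pushes events to the observer.
struct MiniSource<T> {
    let subscribe: (@escaping (Event<T>) -> Void) -> Void
}

/// The resource only needs to know how to clean itself up.
protocol MiniDisposable {
    func dispose()
}

/// On every subscription: create the resource, build the source from it,
/// forward the events, and dispose the resource once the source completes.
func miniUsing<R: MiniDisposable, T>(
    _ resourceFactory: @escaping () -> R,
    sourceFactory: @escaping (R) -> MiniSource<T>
) -> MiniSource<T> {
    MiniSource { observer in
        let resource = resourceFactory()
        let source = sourceFactory(resource)
        source.subscribe { event in
            observer(event)
            if case .completed = event { resource.dispose() }
        }
    }
}

/// Collects log lines so the order of calls can be inspected.
final class Recorder {
    private(set) var lines: [String] = []
    func add(_ line: String) { lines.append(line) }
}

struct LoggingResource: MiniDisposable {
    let recorder: Recorder
    func dispose() { recorder.add("dispose resource") }
}

func demoUsing() -> [String] {
    let recorder = Recorder()
    let numbers = miniUsing({ () -> LoggingResource in
        recorder.add("create resource")
        return LoggingResource(recorder: recorder)
    }, sourceFactory: { (_: LoggingResource) -> MiniSource<Int> in
        recorder.add("create source")
        return MiniSource { observer in
            [1, 2].forEach { observer(.next($0)) }
            observer(.completed)
        }
    })
    numbers.subscribe { event in
        switch event {
        case .next(let value): recorder.add("next \(value)")
        case .completed: recorder.add("completed")
        }
    }
    return recorder.lines
}
```

Calling demoUsing() returns the lines in the order: create resource, create source, next 1, next 2, completed, dispose resource. Nothing happens until subscribe is called, which mirrors the fact that the resource belongs to a subscription rather than to the source object itself.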

Simple example

Let's start with the simplest possible demo: use using to create a source that just emits a few numbers, with a resource that does nothing except log when it is disposed.

  1. The Disposable protocol represents a resource that can be released. It has a single dispose method, and RxSwift does not expose a concrete implementation class for us to use directly, so we define our own TestDisposable:
import Foundation
import RxSwift

class TestDisposable: NSObject, Disposable {
    func dispose() {
        // Simply print the address
        print(String(format: "dp: %p dispose", self))
    }

    deinit {
        print("Dp release")
    }
}
  2. We then use using to create a source that holds a TestDisposable and disposes it when the subscription ends:
_ = Observable<Int>.using({
    () -> TestDisposable in
    let dp = TestDisposable()
    print(String(format: "Create source: %p", dp))
    return dp
}, observableFactory: {
    dp in
    // Do nothing with dp, just print its address
    print(String(format: "Create factory, dp: %p", dp))
    // Return a simple source that emits a few numbers
    return Observable.from([1, 2, 3, 4, 5]).debug("factory", trimOutput: false)
}).debug("using", trimOutput: false).subscribe()

In the resourceFactory closure of using, a TestDisposable is created and its address is printed. In the observableFactory closure, the address of the TestDisposable passed in is printed as well. The source emits 1, 2, 3, 4, 5 and then a completed event, after which the subscription is disposed. I use debug both on the Observable created in the observableFactory closure and on the source returned by using, and then run it.

Running results:

As you can see, the resource is created first and then the observableFactory closure is called. After the factory's subscription is disposed, the resource is disposed and then deallocated.

Advanced usage

In the simple example above, the resourceFactory creates a very simple disposable object and the observableFactory closure does nothing with it. If we instead return a richer disposable object, one that even carries a source of its own, the observableFactory closure can process that source and return a new one. This enables more advanced operations. RxExample contains such a use of using: ActivityIndicator.

ActivityIndicator

Introduction

This is an activity indicator whose element type is Bool. A token is bound to every source that needs to be monitored, and when that source's subscription ends, the token is disposed. ActivityIndicator holds a relay that counts the tokens currently alive. When the count is 0, it emits false, meaning nothing is in progress. When the count is greater than 0, it emits true, meaning at least one monitored source is still running. This makes it suitable as, for example, a download indicator.

Usage

Create a delayed Single to simulate a network request, then fire four such requests of different durations. Use debug to print when each request completes, and subscribe to the ActivityIndicator to print its status.

// Closure that creates a mock request; the parameter is the request duration in seconds
let testRequestBlock: (Int) -> Single<String> = {
    seconds in
    return .create(subscribe: {
        singler in
        print("The simulation begins to request\(seconds)s")
        // Complete the mock request after a delay
        DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(seconds), execute: {
            singler(.success("Simulation\(seconds)Request completed"))
        })
        return Disposables.create()
    })
}

// ActivityIndicator, used to indicate whether all requests have completed
let indicator = ActivityIndicator()

// Simulate four requests
testRequestBlock(1).trackActivity(indicator).debug("req1", trimOutput: false).subscribe()
testRequestBlock(7).trackActivity(indicator).debug("req7", trimOutput: false).subscribe()
testRequestBlock(3).trackActivity(indicator).debug("req3", trimOutput: false).subscribe()
testRequestBlock(5).trackActivity(indicator).debug("req5", trimOutput: false).subscribe()

// Subscribe to the ActivityIndicator to print its status
indicator.asObservable().subscribe(onNext: {
    isRequesting in
    print(isRequesting ? "Requesting" : "Request terminated")
})

Execution Result:

As you can see, the ActivityIndicator emits true when the first request starts and false once all four requests have completed.

Using it in a signal flow

In real business logic the flow is usually: tap a button, run the event through a number of transformations, then send the request. When flatMap or concatMap is used to turn an old source into a new one, pay attention to where trackActivity() is called. If it is applied in the wrong place, it monitors the wrong source and the indicator ends up in an incorrect state.

A simple flow: tap the button, let flatMap turn the tap into a network request, and monitor the status of that request.

let req1 = testRequestBlock(1).debug("req1", trimOutput: false)

// Correct: track the request itself, inside the flatMap closure
btn1.rx.tap.flatMap({
    _ -> Observable<String> in
    print("Button click")
    return req1.trackActivity(indicator)
}).debug("btn1", trimOutput: false).subscribe()

// Incorrect: this tracks the button's tap sequence, not the request
btn1.rx.tap.flatMap({
    _ -> Observable<String> in
    print("Button click")
    return req1.asObservable()
})
.trackActivity(indicator)
.debug("btn1", trimOutput: false).subscribe()

Result of the correct version:

The button event being subscribed to is a ControlEvent. After a tap occurs, flatMap replaces that element with a new source for the subscriber to handle, so trackActivity() must be applied to req1 inside the flatMap closure.

Result of the incorrect version:

Compared with the correct result above, the indicator has already switched to the requesting state before the button is tapped. With the incorrect version, the indicator monitors the button's tap sequence, so it becomes true as soon as that sequence is subscribed. Because the subscription to the button's taps is never released, the end of the request is never printed.

PS: Sources such as Driver, ControlEvent and Relay never fail, and in typical use they do not complete while the screen is alive, so their subscriptions are not released automatically. Disposal must be triggered from outside, otherwise memory leaks. This is why RxSwift has the DisposeBag type: it holds the Disposables returned by subscriptions, and the bag itself is held by some object (usually a view controller or a view model). When the bag is released, it disposes everything it holds, which avoids the leak. Another way to avoid leaks is to use take(count) or take(until) explicitly: take only the elements you want from the source, and the subscription is released automatically once that target is reached. This approach is less safe, though. It is recommended to manage the release of subscriptions with a bag or with Disposable objects you hold yourself.
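The ownership idea behind the bag can be sketched in plain Swift. This is only a model of the concept, not RxSwift's DisposeBag: MiniBag, Screen and demoBag are hypothetical names, and a dispose action is modeled as a simple closure. In real code you would create a DisposeBag and call disposed(by:) on the Disposable that subscribe returns.

```swift
// Plain-Swift model of "the owner's lifetime ends the subscriptions"; all names are hypothetical.

final class MiniBag {
    private var actions: [() -> Void] = []

    /// Stores a dispose action until the bag itself is released.
    func insert(_ dispose: @escaping () -> Void) {
        actions.append(dispose)
    }

    deinit {
        // Releasing the bag disposes everything it holds, in insertion order.
        actions.forEach { $0() }
    }
}

/// Stands in for a view controller or view model that owns the bag.
final class Screen {
    let bag = MiniBag()
}

func demoBag() -> [String] {
    var log: [String] = []
    var screen: Screen? = Screen()
    screen?.bag.insert { log.append("tap subscription disposed") }
    screen?.bag.insert { log.append("relay subscription disposed") }
    log.append("screen alive")
    screen = nil   // last reference gone, so the bag's deinit runs
    return log
}
```

The point of the design is that nobody has to remember to dispose each subscription individually: tying them to an object that already has a well-defined lifetime is enough.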

How it works

ActivityIndicator works by calling using to create an ActivityToken for each source whose status needs to be monitored. Creating a token increments the count, and disposing the token decrements it. The indicator emits false when the count equals 0 and true when it is greater than 0, with distinctUntilChanged filtering out duplicate values.

import Foundation
import RxSwift
import RxCocoa

// Token that wraps the monitored source; its dispose method is called when the source's subscription ends
private struct ActivityToken<E> : ObservableConvertibleType, Disposable {
    private let _source: Observable<E>
    private let _dispose: Cancelable

    init(source: Observable<E>, disposeAction: @escaping () -> Void) {
        _source = source
        _dispose = Disposables.create(with: disposeAction)
    }

    func dispose() {
        _dispose.dispose()
    }

    func asObservable() -> Observable<E> {
        return _source
    }
}

// The type is a shared sequence source
public class ActivityIndicator : SharedSequenceConvertibleType {
    // Element type is Bool
    public typealias Element = Bool
    // Sharing strategy is Driver: never fails, delivers events on the main thread, and replays the latest element to each new subscriber
    public typealias SharingStrategy = DriverSharingStrategy

    // Recursive lock protecting the counter
    private let _lock = NSRecursiveLock()
    // Number of monitored sources that have not finished yet
    private let _relay = BehaviorRelay(value: 0)
    // Backing sequence used to implement SharedSequenceConvertibleType
    private let _loading: SharedSequence<SharingStrategy, Bool>

    public init() {
        // Create _loading: convert the relay to a Driver, map the count to a Bool, and filter out duplicates
        _loading = _relay.asDriver()
            .map { $0 > 0 }
            .distinctUntilChanged()
    }

    // Monitor signal source
    fileprivate func trackActivityOfObservable<Source: ObservableConvertibleType> (_ source: Source) -> Observable<Source.Element> {
        // Create a new source with using
        return Observable.using({ () -> ActivityToken<Source.Element> in
            // Start with +1
            self.increment()
            // Create the token and return the token to the observableFactory
            return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)
        }) { t in
            // Return source from token
            return t.asObservable()
        }
    }

    // Count +1 and let the relay emit the new value
    private func increment() {
        _lock.lock()
        _relay.accept(_relay.value + 1)
        _lock.unlock()
    }

    // Count -1 and let the relay emit the new value
    private func decrement() {
        _lock.lock()
        _relay.accept(_relay.value - 1)
        _lock.unlock()
    }

    // Implementation of the SharedSequenceConvertibleType protocol
    public func asSharedSequence() -> SharedSequence<SharingStrategy, Element> {
        return _loading
    }
}

extension ObservableConvertibleType {
    // Add a monitoring method to every source type through an extension
    public func trackActivity(_ activityIndicator: ActivityIndicator) -> Observable<Element> {
        // Let the indicator monitor self
        return activityIndicator.trackActivityOfObservable(self)
    }
}

Life cycle

  1. ActivityIndicator is initialized. The _relay count is 0 and the status is false.
  2. Some source A calls the trackActivity method so that it can be monitored.
  3. ActivityIndicator calls trackActivityOfObservable, which calls using. When the returned source is subscribed, the resourceFactory closure first increments _relay and then creates an ActivityToken. The token holds A and builds its Disposable with the ActivityIndicator's decrement method as the closure parameter.
  4. Once the ActivityToken has been created, using calls the observableFactory closure, which returns the source held by the token (source A). The source returned by trackActivity therefore emits the same elements as A. It is just wrapped in extra layers.
  5. The _relay count of ActivityIndicator is 1 and the status is true.
  6. A finishes and its subscription is released.
  7. The ActivityToken's dispose method is triggered, which calls the ActivityIndicator's decrement method, and the ActivityToken is released.
  8. The _relay count of ActivityIndicator is 0 and the status is false.
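The counting in the steps above can be traced with a small plain-Swift model. This is a sketch of the bookkeeping only, not the RxExample code: MiniIndicator, begin and states are hypothetical names, there is no locking, and the token's dispose action is modeled as a closure that runs at most once.

```swift
// Plain-Swift model of the counter behind the indicator; all names are hypothetical.

final class MiniIndicator {
    private var count = 0
    private var lastState = false

    /// Every emitted state, starting with the initial "not loading".
    private(set) var states: [Bool] = [false]

    /// Plays the role of creating a token: increments the count and
    /// returns the token's dispose action, which decrements it once.
    func begin() -> () -> Void {
        update(by: 1)
        var disposed = false
        return {
            guard !disposed else { return }
            disposed = true
            self.update(by: -1)
        }
    }

    private func update(by delta: Int) {
        count += delta
        let state = count > 0
        // Only record a change of state, like distinctUntilChanged.
        if state != lastState {
            lastState = state
            states.append(state)
        }
    }
}

let miniIndicator = MiniIndicator()
let endA = miniIndicator.begin()   // count 1, state becomes true
let endB = miniIndicator.begin()   // count 2, state is still true, nothing recorded
endA()                             // count 1, still true
endB()                             // count 0, state becomes false
// miniIndicator.states is now [false, true, false]
```

With two overlapping sources the state flips only twice, which is why the indicator in the four-request example reports the end of activity only after the last request has finished.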

Conclusion

The core of the using method is to create a disposable object, bind it to the source, and dispose it together with the source's subscription.

ActivityIndicator cleverly uses the source A that needs to be monitored to create a token, wraps A in the token, and then extracts A from the token as the return value of using. The elements emitted by the source are the same before and after the call; the source as a whole is simply wrapped. In fact, the whole RxSwift framework wraps sources in closures this way: each operator call (filter, map, etc.) wraps the current source and returns a new source object. The object is new, but the elements flowing through it are just processed by the closure the operator was given.

It's like a water pipe: each operator is another fitting attached after the tap. Some fittings add a filter, some change the color of the water, and some even swallow the water and release something new. After all these transformations, we finally get what we want.
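The pipe picture can be sketched in a few lines of plain Swift. This is a hypothetical model, not RxSwift's Observable: Pipe and demoPipe are made-up names, and values are delivered synchronously. It only illustrates that each operator returns a new object whose subscribe closure wraps the previous one.

```swift
// Plain-Swift model of operators wrapping a source; all names are hypothetical.

struct Pipe<T> {
    /// Subscribing runs the closure, which pushes values to the observer.
    let subscribe: (@escaping (T) -> Void) -> Void

    /// Returns a new pipe that transforms every value of this one.
    func map<U>(_ transform: @escaping (T) -> U) -> Pipe<U> {
        Pipe<U> { observer in
            self.subscribe { value in observer(transform(value)) }
        }
    }

    /// Returns a new pipe that only lets matching values through.
    func filter(_ isIncluded: @escaping (T) -> Bool) -> Pipe<T> {
        Pipe { observer in
            self.subscribe { value in
                if isIncluded(value) { observer(value) }
            }
        }
    }
}

func demoPipe() -> [String] {
    // The tap: emits 1 through 5 to whoever subscribes.
    let tap = Pipe<Int> { observer in (1...5).forEach(observer) }
    // Two fittings: keep odd numbers, then turn them into labels.
    let labels = tap.filter { $0 % 2 == 1 }.map { "drop \($0)" }

    var received: [String] = []
    labels.subscribe { received.append($0) }
    return received
}
```

demoPipe() returns the labels for 1, 3 and 5. The subscriber only sees strings and never needs to know that the original tap produced integers.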

The final subscriber doesn't care what type the original source emitted or how it was transformed along the way, only that it receives data of the right type. When the intermediate logic needs to change, you just remove the existing section of pipe and fit a new one. That's the beauty of functional chain programming.