Many people who have used RxSwift are curious about the dark magic in this framework. This time we will try to demystify RxSwift and implement its core functionality with a small amount of code:

  • Observable – an observable sequence
  • Observer – an observer
  • Disposable – a disposable resource
  • Filter operator – filtering
  • Map operator – transformation
  • Scan operator – accumulation

All of the following code runs in a blank Swift Playground without relying on any framework.

Let’s get down to business:

Observable & Observer & Disposable

enum Event<Element> {
    case next(Element)
    case error(Swift.Error)
    case completed
}
    
typealias Observer<Element> = (Event<Element>) -> Void
typealias Disposable = () -> Void
typealias Observable<Element> = (@escaping Observer<Element>) -> Disposable

The end 🎉🎉🎉!

Just kidding, it’s not quite that simple. There are some details to work out.

Observable, Observer, and Disposable are each just a different function type.

  • Event<Element> – the Event<Element> we already know from RxSwift
  • typealias Observer<Element> = (Event<Element>) -> Void – an observer is a function that consumes an Event
  • typealias Disposable = () -> Void – a disposable resource is a function with no parameters and no return value. It is called to clean up resources
  • typealias Observable<Element> = (@escaping Observer<Element>) -> Disposable – an observable sequence is a function that takes an Observer as its parameter and returns a Disposable. This function is sometimes called the subscribe function
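To make the three roles concrete, here is a minimal sketch in which the Disposable really does release something. The handlers dictionary, the nextKey counter, and the messages sequence are hypothetical names invented for this illustration, standing in for any event source that observers register with.

```swift
enum Event<Element> {
    case next(Element)
    case error(Swift.Error)
    case completed
}

typealias Observer<Element> = (Event<Element>) -> Void
typealias Disposable = () -> Void
typealias Observable<Element> = (@escaping Observer<Element>) -> Disposable

// Hypothetical event source: a table of currently registered observers.
var handlers: [Int: Observer<String>] = [:]
var nextKey = 0

// Subscribing registers the observer; the returned Disposable unregisters it.
let messages: Observable<String> = { observer in
    let key = nextKey
    nextKey += 1
    handlers[key] = observer
    return { handlers[key] = nil }
}

var received: [String] = []
let disposable = messages { event in
    if case .next(let text) = event { received.append(text) }
}

handlers.values.forEach { $0(.next("hello")) }   // delivered
disposable()                                     // unregisters the observer
handlers.values.forEach { $0(.next("ignored")) } // nobody is listening any more

print(received) // ["hello"]
```

Calling the Disposable is the only way the subscriber can tell this source to stop, which is why subscribe hands one back.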

Let’s create an Observable:

let observable: Observable<Int> = { observer in
    observer(Event.next(0))
    observer(Event.next(1))
    observer(Event.next(2))
    observer(Event.next(3))
    observer(Event.next(4))
    observer(Event.completed)
    observer(Event.next(5))
    return { print("dispose") }
}

This is similar to how we normally use Observable.create.

Then subscribe to it:

let observer: Observer<Int> = { event in
    switch event {
    case .next(let element):
        print("next: \(element)")
    case .error(let error):
        print("error: \(error)")
    case .completed:
        print("completed")
    }
}

let disposable = observable(observer) // subscribe

Results:

next: 0
next: 1
next: 2
next: 3
next: 4
completed
next: 5

Wow, it worked.

Wait, something is wrong: next: 5 is printed after the completed event. A sequence should end once a terminating event such as completed is emitted, and it must not produce any new elements after that.

Yes, let’s fix this:

func createObservable<Element>(_ subscribe: @escaping Observable<Element>) -> Observable<Element> {
    
    return { observer in
        // states {
        var isDisposed = false
        var disposable: Disposable?
        // }
        disposable = subscribe { event in  // subscribe
            if isDisposed { return }
            switch event {
            case .next:
                observer(event)
            case .error, .completed:
                isDisposed = true
                observer(event)
                disposable?()
            }
        }
        if isDisposed {
            disposable?()
        }
        return {
            if isDisposed { return }
            isDisposed = true
            disposable?()
        }
    }
}

We introduce a global function called createObservable, which to some extent reproduces Observable.create.

This function takes an Observable and returns a new Observable that is based on the source Observable with some extra logic added, such as:

  • With the isDisposed state, the sequence knows whether it has terminated. Once it has terminated, no new events are pushed to the observer
  • Cleanup logic is performed when necessary: disposable?()
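The same isDisposed flag also covers the case where the subscriber disposes before the sequence terminates. The sketch below assumes a source that can be driven by hand after subscription: pushEvent is a hypothetical handle invented for this illustration, and the log array only exists so the outcome can be inspected.

```swift
enum Event<Element> {
    case next(Element)
    case error(Swift.Error)
    case completed
}

typealias Observer<Element> = (Event<Element>) -> Void
typealias Disposable = () -> Void
typealias Observable<Element> = (@escaping Observer<Element>) -> Disposable

func createObservable<Element>(_ subscribe: @escaping Observable<Element>) -> Observable<Element> {
    return { observer in
        var isDisposed = false
        var disposable: Disposable?
        disposable = subscribe { event in
            if isDisposed { return }
            switch event {
            case .next:
                observer(event)
            case .error, .completed:
                isDisposed = true
                observer(event)
                disposable?()
            }
        }
        if isDisposed {
            disposable?()
        }
        return {
            if isDisposed { return }
            isDisposed = true
            disposable?()
        }
    }
}

var log: [String] = []

// Hypothetical handle that lets us push events after subscribing.
var pushEvent: Observer<Int> = { _ in }

let manual: Observable<Int> = createObservable { observer in
    pushEvent = observer
    return { log.append("dispose") }
}

let dispose = manual { event in
    switch event {
    case .next(let value): log.append("next \(value)")
    case .error(let error): log.append("error \(error)")
    case .completed: log.append("completed")
    }
}

pushEvent(.next(1))   // delivered
dispose()             // runs the cleanup closure once
pushEvent(.next(2))   // dropped: the subscription is already disposed
pushEvent(.completed) // dropped as well
dispose()             // second call is a no-op

print(log) // ["next 1", "dispose"]
```

Because both the terminal-event path and the returned closure check the same flag, the cleanup closure runs at most once, whichever side ends the subscription first.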

Let’s try creating an Observable with the new function:

let observable1: Observable<Int> = createObservable { observer in
    observer(Event.next(0))
    observer(Event.next(1))
    observer(Event.next(2))
    observer(Event.next(3))
    observer(Event.next(4))
    observer(Event.completed)
    observer(Event.next(5))
    return { print("dispose") }
}

let observer1: Observer<Int> = { event in
    switch event {
    case .next(let element):
        print("origin next: \(element)")
    case .error(let error):
        print("origin error: \(error)")
    case .completed:
        print("origin completed")
    }
}

let disposable1 = observable1(observer1) // subscribe

Results:

origin next: 0
origin next: 1
origin next: 2
origin next: 3
origin next: 4
origin completed
dispose

Well, this time it turned out very well!

Let’s try adding some operators.

The filter operator – filtering

Let’s start with a common filter operator that filters elements by criteria:

func filter<Element>(
    _ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> { ... }

It is also a global function:

  • The input parameter is the predicate – predicate: @escaping (Element) -> Bool
  • The return value is a “weird thing” – (@escaping Observable<Element>) -> Observable<Element>

Oh, my God. What is this? To explain this “weird thing”, let’s go back to the createObservable function.

func createObservable<Element>(_ subscribe: @escaping Observable<Element>) -> Observable<Element> { ... }

If we remove the function name and parameter name of createObservable, we find that its type is (@escaping Observable<Element>) -> Observable<Element>.

createObservable generates a new Observable based on the source Observable and adds some custom logic.
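One way to convince yourself of that type is to store createObservable in a constant annotated with exactly this function type, specialized to Int. This is only a sketch: the names wrap, source, received, and didComplete are made up for the illustration.

```swift
enum Event<Element> {
    case next(Element)
    case error(Swift.Error)
    case completed
}

typealias Observer<Element> = (Event<Element>) -> Void
typealias Disposable = () -> Void
typealias Observable<Element> = (@escaping Observer<Element>) -> Disposable

func createObservable<Element>(_ subscribe: @escaping Observable<Element>) -> Observable<Element> {
    return { observer in
        var isDisposed = false
        var disposable: Disposable?
        disposable = subscribe { event in
            if isDisposed { return }
            switch event {
            case .next:
                observer(event)
            case .error, .completed:
                isDisposed = true
                observer(event)
                disposable?()
            }
        }
        if isDisposed {
            disposable?()
        }
        return {
            if isDisposed { return }
            isDisposed = true
            disposable?()
        }
    }
}

// createObservable, viewed purely as a value of the "weird" function type.
let wrap: (@escaping Observable<Int>) -> Observable<Int> = createObservable

// Applying it to a raw subscribe function yields a guarded Observable.
let source = wrap { observer in
    observer(.next(1))
    observer(.completed)
    observer(.next(2)) // dropped by the wrapper
    return {}
}

var received: [Int] = []
var didComplete = false
_ = source { event in
    switch event {
    case .next(let value): received.append(value)
    case .completed: didComplete = true
    case .error: break
    }
}

print(received, didComplete) // [1] true
```

Anything that fits this type can be applied to an Observable in the same way, and that is the property the operators rely on.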

The filter operator likewise generates a new Observable based on the source Observable, adding filtering logic. So what it returns is also of type (@escaping Observable<Element>) -> Observable<Element>:

func filter<Element>(
    _ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {

    return { source -> Observable<Element> in ... }
}

Let’s unfold the code step by step, since it takes a while to digest:

func filter<Element>(
    _ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {

    return { source -> Observable<Element> in

        return createObservable { observer in ... }
    }
}

...

func filter<Element>(
    _ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {

    return { source -> Observable<Element> in

        return createObservable { observer in

            return source { event in ... }
        }
    }
}

...

func filter<Element>(
    _ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {

    return { source -> Observable<Element> in

        return createObservable { observer in

            return source { event in
                switch event {
                case .next(let element):
                    let shouldEmitElement = predicate(element)
                    if shouldEmitElement {
                        observer(.next(element))
                    }
                case .error(let error):
                    observer(.error(error))
                case .completed:
                    observer(.completed)
                }
            }
        }
    }
}

This is the filter operator.

Run and see:

let observable1: Observable<Int> = createObservable { observer in
    observer(Event.next(0))
    observer(Event.next(1))
    observer(Event.next(2))
    observer(Event.next(3))
    observer(Event.next(4))
    observer(Event.completed)
    observer(Event.next(5))
    return { print("dispose") }
}

...

let filteredObservable1 = filter({ $0 > 1 })(observable1)

let filteredDisposable1 = filteredObservable1({ event in  // subscribe
    switch event {
    case .next(let element):
        print("filter next: \(element)")
    case .error(let error):
        print("filter error: \(error)")
    case .completed:
        print("filter completed")
    }
})

Results:

filter next: 2
filter next: 3
filter next: 4
filter completed
dispose

Well, the result is nice, but filter({ $0 > 1 })(observable1) is awkward to write. Here, filter({ $0 > 1 }) returns a function of type (@escaping Observable<Element>) -> Observable<Element>, which adds the filtering logic to the observable1 that is passed in.

What we would like to write is something like this:

// The following code is only hypothetical
observable1
    .filter({ $0 > 1 })

To fix this, we can introduce a custom Swift operator (Swift loves you!):

infix operator |> : AdditionPrecedence

func |><A, B>(
    left: A,
    right: (A) -> B) -> B {
    return right(left)
}

In this way:

let filteredObservable1 = filter({$0 > 1 })(observable1)

Can be rewritten as:

let filteredObservable2 = observable1
    |> filter({$0 > 1 })

There you go. Now it finally reads the way we wanted.
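Nothing about |> is specific to Observables: it is plain function application written left to right. A small sketch with two ordinary functions (twice and describe are hypothetical helpers, invented only for this example) shows that the piped form and the nested call give the same result.

```swift
infix operator |> : AdditionPrecedence

// Applies the function on the right to the value on the left.
func |> <A, B>(left: A, right: (A) -> B) -> B {
    return right(left)
}

// Hypothetical helper functions used only for this illustration.
func twice(_ x: Int) -> Int { return x * 2 }
func describe(_ x: Int) -> String { return "value: \(x)" }

let nested = describe(twice(3))    // reads inside-out
let piped = 3 |> twice |> describe // reads left to right

print(nested) // value: 6
print(piped)  // value: 6
```

AdditionPrecedence is left-associative, so a chain such as a |> f |> g is evaluated as (a |> f) |> g, which is the order needed for chaining operators.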

Next, let’s introduce another commonly used operator, map.

The map operator – transformation

map transforms each element:

func map<Element, Result>(
    _ transform: @escaping (Element) -> Result
    ) -> (@escaping Observable<Element>) -> Observable<Result> { ... }

It is still a global function:

  • The input parameter is the transform function – transform: @escaping (Element) -> Result
  • The return value is a familiar function type – (@escaping Observable<Element>) -> Observable<Result>

The input parameter is a function, and the return value is a function too. Oh, my God! Somebody give me a hand!

func map<Element, Result>(
    _ transform: @escaping (Element) -> Result
    ) -> (@escaping Observable<Element>) -> Observable<Result> {
    
    return { source -> Observable<Result> in

        return createObservable { observer in

            return source { event in
                switch event {
                case .next(let element):
                    let result = transform(element)
                    observer(.next(result))
                case .error(let error):
                    observer(.error(error))
                case .completed:
                    observer(.completed)
                }
            }
        }
    }
}

It creates a new Observable and adds transformation logic.

Let’s run it:

let observable1: Observable<Int> = createObservable { observer in
    observer(Event.next(0))
    observer(Event.next(1))
    observer(Event.next(2))
    observer(Event.next(3))
    observer(Event.next(4))
    observer(Event.completed)
    observer(Event.next(5))
    return { print("dispose") }
}

...

let mappedObservable1 = observable1
    |> filter { $0 > 1 }
    |> map { "\($0) mapped" }

let mappedDisposable1 = mappedObservable1({ event in  // subscribe
    switch event {
    case .next(let element):
        print("map next: \(element)")
    case .error(let error):
        print("map error: \(error)")
    case .completed:
        print("map completed")
    }
})

Results:

map next: 2 mapped
map next: 3 mapped
map next: 4 mapped
map completed
dispose

Great! That’s exactly what we expected.
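The run above only exercises the .next and .completed branches of map. As a sketch of the remaining branch, the example below feeds map a sequence that fails partway through. DemoError and the failing sequence are hypothetical, introduced only for this illustration.

```swift
enum Event<Element> {
    case next(Element)
    case error(Swift.Error)
    case completed
}

typealias Observer<Element> = (Event<Element>) -> Void
typealias Disposable = () -> Void
typealias Observable<Element> = (@escaping Observer<Element>) -> Disposable

func createObservable<Element>(_ subscribe: @escaping Observable<Element>) -> Observable<Element> {
    return { observer in
        var isDisposed = false
        var disposable: Disposable?
        disposable = subscribe { event in
            if isDisposed { return }
            switch event {
            case .next:
                observer(event)
            case .error, .completed:
                isDisposed = true
                observer(event)
                disposable?()
            }
        }
        if isDisposed {
            disposable?()
        }
        return {
            if isDisposed { return }
            isDisposed = true
            disposable?()
        }
    }
}

func map<Element, Result>(
    _ transform: @escaping (Element) -> Result
) -> (@escaping Observable<Element>) -> Observable<Result> {
    return { source -> Observable<Result> in
        return createObservable { observer in
            return source { event in
                switch event {
                case .next(let element):
                    observer(.next(transform(element)))
                case .error(let error):
                    observer(.error(error)) // errors pass through untransformed
                case .completed:
                    observer(.completed)
                }
            }
        }
    }
}

struct DemoError: Error {} // hypothetical error type

let failing: Observable<Int> = createObservable { observer in
    observer(.next(1))
    observer(.error(DemoError()))
    observer(.next(2)) // dropped: the sequence has already terminated
    return {}
}

var log: [String] = []
let mapped: Observable<String> = map({ (value: Int) in "\(value) mapped" })(failing)
_ = mapped { event in
    switch event {
    case .next(let text): log.append("next: \(text)")
    case .error(let error): log.append("error: \(error is DemoError)")
    case .completed: log.append("completed")
    }
}

print(log) // ["next: 1 mapped", "error: true"]
```

The transform only ever sees elements. Terminal events are forwarded as they are, so an error ends the mapped sequence just as it ends the source.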

Finally, we implement the scan operator:

The scan operator – accumulation

scan accumulates the elements and emits each running result:

func scan<Element, Result>(
    _ seed: Result,
    accumulator: @escaping (Result, Element) -> Result
) -> (@escaping Observable<Element>) -> Observable<Result> {
    return { source -> Observable<Result> in
        return createObservable { observer in
            // states {
            var total = seed
            // }
            return source { event in
                switch event {
                case .next(let element):
                    total = accumulator(total, element)
                    observer(.next(total))
                case .error(let error):
                    observer(.error(error))
                case .completed:
                    observer(.completed)
                }
            }
        }
    }
}

...

let observable1: Observable<Int> = createObservable { observer in
    observer(Event.next(0))
    observer(Event.next(1))
    observer(Event.next(2))
    observer(Event.next(3))
    observer(Event.next(4))
    observer(Event.completed)
    observer(Event.next(5))
    return { print("dispose") }
}

...

let scannedObservable1 = observable1
    |> filter { $0 > 1 }
    |> scan(0) { $0 + $1 }

let scannedDisposable2 = scannedObservable1({ event in  // subscribe
    switch event {
    case .next(let element):
        print("scan next: \(element)")
    case .error(let error):
        print("scan error: \(error)")
    case .completed:
        print("scan completed")
    }
})

Results:

scan next: 2
scan next: 5
scan next: 9
scan completed
dispose

The emitted elements are computed as follows: 2 = 0 + 2, 5 = 0 + 2 + 3, 9 = 0 + 2 + 3 + 4
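Since the seed has its own type parameter (Result), the accumulated value does not have to be of the same type as the elements. The sketch below reproduces the running sum and also accumulates into an array. The names numbers, runningSum, history, sums, and snapshots are hypothetical and only serve this illustration.

```swift
enum Event<Element> {
    case next(Element)
    case error(Swift.Error)
    case completed
}

typealias Observer<Element> = (Event<Element>) -> Void
typealias Disposable = () -> Void
typealias Observable<Element> = (@escaping Observer<Element>) -> Disposable

func createObservable<Element>(_ subscribe: @escaping Observable<Element>) -> Observable<Element> {
    return { observer in
        var isDisposed = false
        var disposable: Disposable?
        disposable = subscribe { event in
            if isDisposed { return }
            switch event {
            case .next:
                observer(event)
            case .error, .completed:
                isDisposed = true
                observer(event)
                disposable?()
            }
        }
        if isDisposed {
            disposable?()
        }
        return {
            if isDisposed { return }
            isDisposed = true
            disposable?()
        }
    }
}

func scan<Element, Result>(
    _ seed: Result,
    accumulator: @escaping (Result, Element) -> Result
) -> (@escaping Observable<Element>) -> Observable<Result> {
    return { source -> Observable<Result> in
        return createObservable { observer in
            var total = seed // fresh state for every subscription
            return source { event in
                switch event {
                case .next(let element):
                    total = accumulator(total, element)
                    observer(.next(total))
                case .error(let error):
                    observer(.error(error))
                case .completed:
                    observer(.completed)
                }
            }
        }
    }
}

let numbers: Observable<Int> = createObservable { observer in
    for value in [2, 3, 4] { observer(.next(value)) }
    observer(.completed)
    return {}
}

// Same element and result type: a running sum.
let runningSum: Observable<Int> =
    scan(0, accumulator: { (total: Int, value: Int) in total + value })(numbers)
// Different result type: every element seen so far.
let history: Observable<[Int]> =
    scan([Int](), accumulator: { (seen: [Int], value: Int) in seen + [value] })(numbers)

var sums: [Int] = []
var snapshots: [[Int]] = []
_ = runningSum { event in
    if case .next(let total) = event { sums.append(total) }
}
_ = history { event in
    if case .next(let seen) = event { snapshots.append(seen) }
}

print(sums)      // [2, 5, 9]
print(snapshots) // [[2], [2, 3], [2, 3, 4]]
```

Because total is declared inside the closure passed to createObservable, each subscription starts again from the seed instead of sharing state with earlier subscribers.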

And you’re done!

If you found this article useful, please like and share it, or leave a comment below.

Source:

Also, if you’re interested in the code, you can manually type it into the Playground. You can also try adding some other operators.

enum Event<Element> {
    case next(Element)
    case error(Swift.Error)
    case completed
}
    
typealias Observer<Element> = (Event<Element>) -> Void
typealias Disposable = () -> Void
typealias Observable<Element> = (@escaping Observer<Element>) -> Disposable


let observable: Observable<Int> = { observer in
    observer(Event.next(0))
    observer(Event.next(1))
    observer(Event.next(2))
    observer(Event.next(3))
    observer(Event.next(4))
    observer(Event.completed)
    observer(Event.next(5))
    return { print("dispose") }
}

let observer: Observer<Int> = { event in
    switch event {
    case .next(let element):
        print("next: \(element)")
    case .error(let error):
        print("error: \(error)")
    case .completed:
        print("completed")
    }
}

let disposable = observable(observer) // subscribe


func createObservable<Element>(_ subscribe: @escaping Observable<Element>) -> Observable<Element> {
    
    return { observer in
        // states {
        var isDisposed = false
        var disposable: Disposable?
        // }
        disposable = subscribe { event in  // subscribe
            if isDisposed { return }
            switch event {
            case .next:
                observer(event)
            case .error, .completed:
                isDisposed = true
                observer(event)
                disposable?()
            }
        }
        if isDisposed {
            disposable?()
        }
        return {
            if isDisposed { return }
            isDisposed = true
            disposable?()
        }
    }
}

let observable1: Observable<Int> = createObservable { observer in
    observer(Event.next(0))
    observer(Event.next(1))
    observer(Event.next(2))
    observer(Event.next(3))
    observer(Event.next(4))
    observer(Event.completed)
    observer(Event.next(5))
    return { print("dispose") }
}

let observer1: Observer<Int> = { event in
    switch event {
    case .next(let element):
        print("origin next: \(element)")
    case .error(let error):
        print("origin error: \(error)")
    case .completed:
        print("origin completed")
    }
}

let disposable1 = observable1(observer1) // subscribe


func filter<Element>(
    _ predicate: @escaping (Element) -> Bool
) -> (@escaping Observable<Element>) -> Observable<Element> {

    return { source -> Observable<Element> in

        return createObservable { observer in

            return source { event in
                switch event {
                case .next(let element):
                    let shouldEmitElement = predicate(element)
                    if shouldEmitElement {
                        observer(.next(element))
                    }
                case .error(let error):
                    observer(.error(error))
                case .completed:
                    observer(.completed)
                }
            }
        }
    }
}

let filteredObservable1 = filter({$0 > 1 })(observable1)

let filteredDisposable1 = filteredObservable1({ event in  // subscribe
    switch event {
    case .next(let element):
        print("filter next: \(element)")
    case .error(let error):
        print("filter error: \(error)")
    case .completed:
        print("filter completed")
    }
})

infix operator |> : AdditionPrecedence

func |><A, B>(
    left: A,
    right: (A) -> B) -> B {
    return right(left)
}

let filteredObservable2 = observable1
    |> filter({$0 > 1 })


func map<Element, Result>(
    _ transform: @escaping (Element) -> Result
    ) -> (@escaping Observable<Element>) -> Observable<Result> {
    
    return { source -> Observable<Result> in

        return createObservable { observer in

            return source { event in
                switch event {
                case .next(let element):
                    let result = transform(element)
                    observer(.next(result))
                case .error(let error):
                    observer(.error(error))
                case .completed:
                    observer(.completed)
                }
            }
        }
    }
}


let mappedObservable1 = observable1
    |> filter { $0 > 1 }
    |> map { "\($0) mapped" }

let mappedDisposable1 = mappedObservable1({ event in  // subscribe
    switch event {
    case .next(let element):
        print("map next: \(element)")
    case .error(let error):
        print("map error: \(error)")
    case .completed:
        print("map completed")
    }
})

func scan<Element, Result>(
    _ seed: Result,
    accumulator: @escaping (Result, Element) -> Result
) -> (@escaping Observable<Element>) -> Observable<Result> {
    return { source -> Observable<Result> in
        return createObservable { observer in
            // states {
            var total = seed
            // }
            return source { event in
                switch event {
                case .next(let element):
                    total = accumulator(total, element)
                    observer(.next(total))
                case .error(let error):
                    observer(.error(error))
                case .completed:
                    observer(.completed)
                }
            }
        }
    }
}

let scannedObservable1 = observable1
    |> filter { $0 > 1 }
    |> scan(0) { $0 + $1 }

let scannedDisposable2 = scannedObservable1({ event in  // subscribe
    switch event {
    case .next(let element):
        print("scan next: \(element)")
    case .error(let error):
        print("scan error: \(error)")
    case .completed:
        print("scan completed")
    }
})
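As a starting point for adding operators of your own, here is a sketch of a take operator built with the same recipe. It is not part of the original code: the name take, its count parameter, and the decision to support only positive counts are all assumptions made for this illustration.

```swift
enum Event<Element> {
    case next(Element)
    case error(Swift.Error)
    case completed
}

typealias Observer<Element> = (Event<Element>) -> Void
typealias Disposable = () -> Void
typealias Observable<Element> = (@escaping Observer<Element>) -> Disposable

func createObservable<Element>(_ subscribe: @escaping Observable<Element>) -> Observable<Element> {
    return { observer in
        var isDisposed = false
        var disposable: Disposable?
        disposable = subscribe { event in
            if isDisposed { return }
            switch event {
            case .next:
                observer(event)
            case .error, .completed:
                isDisposed = true
                observer(event)
                disposable?()
            }
        }
        if isDisposed {
            disposable?()
        }
        return {
            if isDisposed { return }
            isDisposed = true
            disposable?()
        }
    }
}

// Hypothetical operator: forwards the first `count` elements, then completes.
func take<Element>(_ count: Int) -> (@escaping Observable<Element>) -> Observable<Element> {
    precondition(count > 0, "this sketch only handles positive counts")
    return { source -> Observable<Element> in
        return createObservable { observer in
            var remaining = count // per-subscription state
            return source { event in
                switch event {
                case .next(let element):
                    guard remaining > 0 else { return }
                    remaining -= 1
                    observer(.next(element))
                    if remaining == 0 { observer(.completed) }
                case .error(let error):
                    observer(.error(error))
                case .completed:
                    observer(.completed)
                }
            }
        }
    }
}

var log: [String] = []

let numbers: Observable<Int> = createObservable { observer in
    for value in 0..<5 { observer(.next(value)) }
    observer(.completed)
    return { log.append("dispose") }
}

let firstTwo: Observable<Int> = take(2)(numbers)
_ = firstTwo { event in
    switch event {
    case .next(let value): log.append("next \(value)")
    case .error(let error): log.append("error \(error)")
    case .completed: log.append("completed")
    }
}

print(log) // ["next 0", "next 1", "completed", "dispose"]
```

One limitation of this sketch: with a synchronous source like numbers, the source still runs to its end after take has completed, because the Disposable is not available until subscribe returns. The extra events are simply dropped.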