RxSwift introduction and ridicule

Functional Reactive Programming(FRP) RxSwift is the Swift version of the ReactiveX family. If you have used ReactiveCocoa(RAC) before, you will be familiar with the concept of Functional Reactive Programming. Yes, RxSwift is also an FRP frame. It’s worth noting that RAC’s README has these lines:

ReactiveCocoa was originally inspired, and therefore heavily influenced, by Microsoft’s Reactive Extensions (Rx) library.

Although ReactiveCocoa was started as an Objective-C framework, as of version 3.0, all major feature development is concentrated on the Swift API.

RAC is inspired by Microsoft’s Reactive Extensions, so it is also heavily influenced by them. (Of course, there are some differences between RAC and Rx, and amway iOS developers use RAC.) RAC started out as an OC framework, but since version 3.0, all major feature development has been centered on the Swift API. In other words, rain or no rain, RAC will match Swift better.

If you’re an iOS developer using Swift and tired of the ubiquitous OOP and want to open the door to a new world, both frameworks are available. However, since I am interested in the concrete implementation of the framework, AND I prefer Swfit to OC, I choose RxSwift, which is a pure Swift implementation.

An API for asynchronous programming

with observable streams

Swift’s support for functional programming is very good, so I thought that RxSwift’s internal implementation would use delayed tables as signal streams to represent the time history of an object’s sequential state. This way everything is a function, no state changes, and no synchronization mechanism is needed to keep the thread safe. It turns out I’m still pattern! RxSwift still has all kinds of classes and all kinds of inheritance, and of course there are all kinds of synchronization mechanisms: spin locks, recursive locks, atomic operations… What about Functional? Is functional the only API exposed to the user? This was a bit of a disappointment at first, but it turns out that the framework uses a lot of functional features, just not as pure as I expected (a framework in pure functional might not really catch on…). .

Ok, after all, let’s look at the introduction of the official website:

ReactiveX is a combination of the best ideas from

the Observer pattern, the Iterator pattern, and functional programming

This means that Rx is a combination of the best of the observer pattern, the iterator pattern, and functional programming. It’s not implemented in purely functional code as I would like, but the idea of “flow” is true. So far, I’ve only seen a little bit of the code and have a rough idea of what the observer mode part of the implementation looks like, so I’ll share it with you.

Observables and Observer

RxSwift project has rX. playground, which has this quote on the introduction page:

The key to understanding RxSwift is in understanding the notion of Observables. Creating them, manipulating them, and subscribing to them in order to react to changes.

The key to understanding RxSwfit is to understand the concept of being observed, creating them, manipulating them, and then subscribing to them in response to change. Observable is so important. Let’s look at an example using an Observable:


empty creates an empty sequence. The only message it sends is the .Completed message.

The empty function creates an empty sequence that sends only the message.completed. This sequence is shown as follows:

 let emptySequence: Observable = empty()
 let subscription = emptySequence
     .subscribe { event in
     }Copy the code

This code gets an Observable from empty. Now look at Empty:

public func empty() -> Observable {
    return Empty()
}Copy the code

EmptySequence = empty () allows you to get an Observable without emptySequence = empty ().

class Empty : Producer {
    override init() {


    override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
        return NopDisposable.instance
}Copy the code

At first glance, this class is a bit confusing. What does this empty constructor mean? Well, probably to avoid calling the superclass constructor during initialization, just to make sure nothing is done. And then this run function is kind of confusing, this whole set of parameters, and what is this Disposable? It is a protocol that is similar to the IDisposable interface used in C# to explicitly release resources:

/** similar to the IDisposable interface in C#, used to release resources. Since Swift uses ARC, the Dispose method mostly just unreferences a resource, Dispose Dispose () {/** Dispose Dispose ()}Copy the code

Since this article focuses on the observer mode, I would like to put aside the things related to Disposable for the moment, because there are some thread-related operations related to saving and releasing resources, which are quite troublesome, but they have nothing to do with the observer mode. Based on this, I slightly simplified some classes related to “empty” and “just” in RxSwfit, deleted some contents related to “Disposable”, and then added some comments. After being put together, “Empty” and “Just” can still run normally.

Ok, so the simplified Empty class looks like this:

Class Empty: Producer {override func run(observer: O) {// The observer subscribed to an observer.on(.completed)}}Copy the code

Ok, now that we have an instance of Empty, we call its SUBSCRIBE method, which takes (Event) -> Void, which is a closure type. We found a qualified subscribe method in the extension to the ObservableType protocol:

extension ObservableType { func subscribe(on: (event: Void) {// Create an anonymous observer and assign on to the observer's eventHandler, Let observer = AnonymousObserver(on) {e in on(event: e) } self.subscribeSafe(observer) }Copy the code

The SUBSCRIBE method first constructs an anonymous observer after accepting the closure. The Event closure is passed to the observer as a constructor argument. Take a look at AnonymousObserver:

class AnonymousObserver : ObserverBase { typealias Element = ElementType typealias EventHandler = Event -> Void private let eventHandler : EventHandler init(_ eventHandler: EventHandler) {// Trace resources (for development purposes to solve memory leaks) #if TRACE_RESOURCES // atomic operations: OSAtomicIncrement32(&resourceCount) #endif self.eventHandler = eventHandler} // onCore will be called on (on) Override func onCore(event: Event) {return self.eventhandler (Event)} #if TRACE_RESOURCES deinit { OSAtomicDecrement32(&resourceCount)} #endif}Copy the code

Ignoring the memory-tracking code, this class basically takes a closure in the constructor, assigns it to the private eventHandler property, and then in the onCore method, eventHandler is called. The observer’s on method is called in run, not onCore, but ObserverBase.

class ObserverBase: ObserverType { typealias E = ElementType var isStopped: Int32 = 0 init() { } func on(event: Event) { switch event { case .Next: If isStopped == 0 {onCore(event)} // Once an Error or Completed event occurs, onCore will not be executed after that.Error,.completed: / / OSAtomicCompareAndSwap32: comparison and exchange of atomic operations, if isStopped = = 0, then isStoppend = 1, return true, otherwise it returns false if! OSAtomicCompareAndSwap32(0, 1, &isstopped) {return} onCore(event)}} // The func onCore(event: Event) { abstractMethod() } }Copy the code

Okay, so that’s clear. OnCore is going to be called by on. Go back to subscribe and continue down, get the observer instance, and it will be passed as an argument along the way. You call self.subscribesafe (Observer), which is passed to the subscribeSafe method, which is also in the ObserverType extension:

SubscribeSafe (observer: O) {subscribe self.subscribe(observer)}Copy the code

In subscribeSafe, the subscribe method is called at the end, but the argument to subscribe is an implementation class of ObserverType, not a closure, so this is an overloaded method. It is declared in the protocol ObservableType:

protocol ObservableType { /** hack: Since there is no generic protocol in Swift, you can only declare an alias in the protocol, then declare the implementation class as a generic class, and name the generic name passed as E (e.g. Typealias E = Element) where the generic arguments are accepted as follows: func demo(ob: */ TypeAlias E subscribe func (Observer: O)}Copy the code

We find that this method does not appear in Empty, so we can only look up the inheritance tree of Empty. Its implementation can be found in the parent class Producer of Empty:

Override func subscribe(observer) {// Override func subscribe(observer) O) {// There will be some resource release and thread-related operations //... run(observer) } func run(observer: O) { abstractMethod() } }Copy the code

Subscribe calls run, but abstractMethod is called inside this run method. Let’s see what it looks like:

@noreturn func abstractMethod() -> Void {
    fatalError("Abstract method")
}Copy the code

Once called, fatalError is raised, so run must be overridden by subclasses or the program terminates. I guess because Swift does not have the concept of abstract class and abstract method, it cannot force the subclass to rewrite the method by adding abstract before the function, and can only simulate the abstract method in this rewrite or terminate way. In that case, let’s look at the run method in our subclasses:

Override func run(observer: O) {// The observer subscribed to an observer.on(.completed)}} class Just: Producer {let element: element init(element: Element) { self.element = element } override func run(observer: O) { observer.on(.Next(element)) observer.on(.Completed) } }Copy the code

In Empty, the on method of the passed observer is called once, and the run implementation of Empty and Just is called. Completed is taken as the parameter. We know that the on method is actually the same closure that we passed in when we called the subscribe method in the first place, {event in print(event)}, so it will print out. Completed. As for the.completed, which is obviously an enumeration, it is of type Event:

enum Event {
    case Next(Element)
    case Error(ErrorType)
    case Completed
}Copy the code

The initializer of Just takes a value and assigns it to the instance element property. Then, when the run method is called, the on method of the passed observer is called twice, once with a value. Next(Element) as an argument, once with. The parameter Completed indicates the end. Something like this:

// MARK: - Call print("just Observable demo:") let justObservable = just(1) justObservable. Subscribe {event in print(event)} print("----") print("empty observable demo:") let emptyObservable: Observables = empty () emptyObservable. Subscribe {event in print (event)} output: just observables demo: Next(1) Completed ---- empty observable demo: CompletedCopy the code

It’s a little convoluted, right, mainly because there’s so much inheritance, so many methods have to be found in the parent class. My simplified version is here, perhaps I say so much is not as clear as you clone down to take a look.


Because I’ve only seen the beginning of the code, I don’t yet understand the need for so many inheritance hierarchies in RxSwift. It was a heavy frame after all, and I had to read a little bit, or ELSE I would forget the front. If there are any achievements so far, they are as follows:

  • Swift implementation of the observer pattern.
  • Simulate the specifics of a generic protocol with typeAlias.
  • Simulate the concrete practice of an abstract method with fatalError.
  • Use of spin locks, recursive locks, and two atomic operations.