Github.com/agelessman/…

Writing a custom Operator is the hardest part of this Combine tutorial, because an Operator sits between a Publisher and a Subscriber and acts as the bridge between them.

So where does the difficulty lie? Keep the following three questions in mind while reading this article:

  • How does an Operator receive data from the upstream Publisher?
  • The downstream may be another Operator or a Subscriber. How do we handle both cases?
  • If the downstream is a Subscriber, how do we receive its request and pass it on to the upstream?

These three questions are the core of this article. The code below comes from CombineExt.

For the full Combine tutorial, visit: FuckingSwiftUI

The simplest custom Operator

The simplest custom Operator is a composition: existing Publishers and Operators are combined into an Operator with a new function, for example:

public extension Publisher where Output: Collection {
    func mapMany<Result>(_ transform: @escaping (Output.Element) -> Result) -> Publishers.Map<Self, [Result]> {
        map { $0.map(transform) }
    }
}

The .mapMany() in the code above is a new Operator built by composition. It can be used as follows:

let intArrayPublisher = PassthroughSubject<[Int], Never>()

// Keep the returned AnyCancellable alive, otherwise the subscription is cancelled.
let cancellable = intArrayPublisher
    .mapMany(String.init)
    .sink(receiveValue: { print($0) })

intArrayPublisher.send([10, 2, 2, 4, 3, 8])

// Output: ["10", "2", "2", "4", "3", "8"]

As you can see, .mapMany() maps every element of the Collection using the given transform. The code is very simple, and we can follow the same pattern to build other Operators.

An interesting point is that the Output of the upstream Publisher is constrained to be a Collection by the declaration public extension Publisher where Output: Collection. In other words, the input data for this Operator must be a Collection.
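The same pattern can be reused for other element-wise operations. The following is only a sketch: filterMany is a hypothetical name invented here for illustration and is not taken from CombineExt.

```swift
import Combine

// `filterMany` is a hypothetical operator name used only for this sketch.
extension Publisher where Output: Collection {
    /// Keeps only the elements of each emitted collection that satisfy `isIncluded`.
    func filterMany(_ isIncluded: @escaping (Output.Element) -> Bool)
        -> Publishers.Map<Self, [Output.Element]> {
        // Composition only: the work is delegated to the built-in `map` operator.
        map { $0.filter(isIncluded) }
    }
}

let numbers = PassthroughSubject<[Int], Never>()
var received: [[Int]] = []

// Keep the AnyCancellable alive for as long as values are expected.
let cancellable = numbers
    .filterMany { $0.isMultiple(of: 2) }
    .sink(receiveValue: { received.append($0) })

numbers.send([10, 2, 2, 4, 3, 8])
print(received)
```

Because the constraint is again Output: Collection, the compiler rejects a call to filterMany on a Publisher whose Output is, say, a single Int.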

Of course, in most cases it is not necessary to write code like this. This is a matter of personal preference. The code above is equivalent to the code below:

let intArrayPublisher = PassthroughSubject<[Int], Never>()

let cancellable = intArrayPublisher
    .map {
        $0.map { String($0) }
    }
    .sink(receiveValue: { print($0) })

intArrayPublisher.send([10, 2, 2, 4, 3, 8])

Fully custom Operator

We will use amb from CombineExt to demonstrate how to write a custom Operator. To follow this part you should already have a working knowledge of Combine, have looked into CombineExt a little, and be eager to learn how custom Operators are written. amb is an interesting Operator, so let’s first see how it is used:

let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<Int, Never>()

// Keep the returned AnyCancellable alive, otherwise the subscription is cancelled.
let cancellable = subject1
  .amb(subject2)
  .sink(receiveCompletion: { print("amb: completed with \($0)") },
        receiveValue: { print("amb: \($0)") })

subject2.send(3) // Since this subject emits first, it becomes the active publisher
subject1.send(1)
subject2.send(6)
subject1.send(8)
subject1.send(7)

subject1.send(completion: .finished)
// Only when subject2 finishes, amb itself finishes as well, since it's the active publisher
subject2.send(completion: .finished)

Print result:

amb: 3
amb: 6
amb: completed with .finished

As the code above shows, whichever of subject1 and subject2 sends data first becomes the active publisher and the other one is ignored. It works like a knockout round in which only first place survives.

This Operator is a good vehicle for explaining how to write a custom Operator because it is not too complicated, so let’s get to the point.

To explain how amb is written, we work backwards. First, let’s look at what happens when we call the following code:

subject1
  .amb(subject2)

amb() is declared in an extension of Publisher:

public extension Publisher {
    func amb<Other: Publisher>(_ other: Other)
        -> Publishers.Amb<Self, Other> where Other.Output == Output, Other.Failure == Failure {
        Publishers.Amb(first: self, second: other)
    }
}

From the code above we can read off the following:

  • The argument of amb() must be a Publisher.
  • The return value of amb() is a Publishers.Amb, which is also a Publisher. The where clause constrains both publishers to have the same Output and Failure types.

As the code shows, an Operator is just a function in an extension of the Publisher protocol. Inside it we have access to the current Publisher (self), and we return a new Publisher so that calls can be chained.

So the problem now moves to Publishers.Amb. The questions to answer are: how is the elimination logic described above implemented, and how does it respond to a Subscriber’s subscription and requests?

Let’s look at the code for Publishers.Amb:

public extension Publishers {
    struct Amb<First: Publisher, Second: Publisher>: Publisher where First.Output == Second.Output, First.Failure == Second.Failure {
        public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            subscriber.receive(subscription: Subscription(first: first,
                                                          second: second,
                                                          downstream: subscriber))
        }

        public typealias Output = First.Output
        public typealias Failure = First.Failure

        private let first: First
        private let second: Second

        public init(first: First, second: Second) {
            self.first = first
            self.second = second
        }
    }
}

The code looks pretty simple: it just holds the two Publishers. Since Amb conforms to the Publisher protocol, all the real work is in handling the subscription:

Subscription(first: first,
            second: second,
            downstream: subscriber)

As mentioned in previous articles, Subscription is a bridge between Publisher and Subscriber, so the logic in it is very important.

Let’s look at its code:

private extension Publishers.Amb {
    class Subscription<Downstream: Subscriber>: Combine.Subscription where Output == Downstream.Input, Failure == Downstream.Failure {
        private var firstSink: Sink<First, Downstream>?
        private var secondSink: Sink<Second, Downstream>?
        private var preDecisionDemand = Subscribers.Demand.none
        private var decision: Decision? {
            didSet {
                guard let decision = decision else { return }
                switch decision {
                case .first:
                    secondSink = nil
                case .second:
                    firstSink = nil
                }

                request(preDecisionDemand)
                preDecisionDemand = .none
            }
        }

        init(first: First, second: Second, downstream: Downstream) {
            self.firstSink = Sink(upstream: first,
                                  downstream: downstream) { [weak self] in
                guard let self = self, self.decision == nil else { return }

                self.decision = .first
            }

            self.secondSink = Sink(upstream: second,
                                   downstream: downstream) { [weak self] in
                guard let self = self, self.decision == nil else { return }

                self.decision = .second
            }
        }

        func request(_ demand: Subscribers.Demand) {
            guard decision != nil else {
                preDecisionDemand += demand
                return
            }

            firstSink?.demand(demand)
            secondSink?.demand(demand)
        }

        func cancel() {
            firstSink = nil
            secondSink = nil
        }
    }
}

The code above is quite long, so let’s break it up. Let’s look at the initialization method first:

init(first: First, second: Second, downstream: Downstream) {
    self.firstSink = Sink(upstream: first,
                          downstream: downstream) { [weak self] in
        guard let self = self, self.decision == nil else { return }

        self.decision = .first
    }

    self.secondSink = Sink(upstream: second,
                           downstream: downstream) { [weak self] in
        guard let self = self, self.decision == nil else { return }

        self.decision = .second
    }
}

Downstream here is the Subscriber. Don’t worry about Sink for now; it is explained below. For the moment, just think of it as another bridge that connects a Publisher and a Subscriber.

The closure passed to the initializer of firstSink is called when an event from the first Publisher is received, whether that event is data or a completion. We will see how when we look at Sink.

secondSink works the same way as firstSink. In this initializer we find the answer to the first question. The first time an event arrives from first or second, we assign decision, which is an enum that distinguishes first from second:

private enum Decision {
    case first
    case second
}

At this point you are probably still confused, because we have not looked at Sink yet. We need to explain Sink before we can continue:

class Sink<Upstream: Publisher, Downstream: Subscriber>: Subscriber {
    typealias TransformFailure = (Upstream.Failure) -> Downstream.Failure?
    typealias TransformOutput = (Upstream.Output) -> Downstream.Input?

    private(set) var buffer: DemandBuffer<Downstream>
    private var upstreamSubscription: Subscription?
    private let transformOutput: TransformOutput?
    private let transformFailure: TransformFailure?

    init(upstream: Upstream,
         downstream: Downstream,
         transformOutput: TransformOutput? = nil,
         transformFailure: TransformFailure? = nil) {
        self.buffer = DemandBuffer(subscriber: downstream)
        self.transformOutput = transformOutput
        self.transformFailure = transformFailure
        // Sink subscribes itself to the upstream as soon as it is created.
        upstream.subscribe(self)
    }

    func demand(_ demand: Subscribers.Demand) {
        let newDemand = buffer.demand(demand)
        upstreamSubscription?.requestIfNeeded(newDemand)
    }

    func receive(subscription: Subscription) {
        upstreamSubscription = subscription
    }

    func receive(_ input: Upstream.Output) -> Subscribers.Demand {
        ...
    }

    func receive(completion: Subscribers.Completion<Upstream.Failure>) {
        ...
    }

    func cancelUpstream() {
        upstreamSubscription.kill()
    }

    deinit { cancelUpstream() }
}

I’ve omitted some unimportant code. Let’s examine the code above in detail:

  • Sink conforms to the Subscriber protocol, so it is a subscriber in its own right. It subscribes to the upstream, which lets it both receive the upstream’s data and send requests to the upstream.
  • DemandBuffer, covered in previous articles, manages the data: it only sends values downstream when the downstream has asked for them.
  • transformOutput and transformFailure are data conversion functions; we won’t discuss them here.

Sink’s core idea is to subscribe to the upstream Publisher itself in order to receive data and events, manage that data and those events through DemandBuffer, and send them to the downstream subscriber when needed.

The design of Sink is very important. It is an intermediate stage, and because it is itself a Subscriber it can both obtain the upstream’s data and decide when to send a request upstream.
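To make the buffering idea concrete, here is a deliberately simplified sketch. MiniDemandBuffer is a hypothetical stand-in written for this article; it is not CombineExt’s DemandBuffer, it is not thread-safe, and it ignores completion events. It only shows the rule “hold values until the downstream has asked for them”.

```swift
import Combine

// Hypothetical, simplified stand-in for a demand buffer (not CombineExt's DemandBuffer).
final class MiniDemandBuffer<Value> {
    private var pending: [Value] = []                    // values waiting for demand
    private var demand: Subscribers.Demand = .none       // what the downstream still wants
    private let deliver: (Value) -> Void                 // stands in for the downstream

    init(deliver: @escaping (Value) -> Void) {
        self.deliver = deliver
    }

    /// Called when the upstream produces a value.
    func buffer(_ value: Value) {
        pending.append(value)
        flush()
    }

    /// Called when the downstream requests more values.
    func request(_ more: Subscribers.Demand) {
        demand += more
        flush()
    }

    /// Delivers buffered values for as long as there is outstanding demand.
    private func flush() {
        while demand > .none, !pending.isEmpty {
            demand -= 1                                  // .unlimited stays .unlimited
            deliver(pending.removeFirst())
        }
    }
}

var delivered: [Int] = []
let buffer = MiniDemandBuffer<Int> { delivered.append($0) }

buffer.buffer(1)
buffer.buffer(2)            // nothing delivered yet: no demand
buffer.request(.max(1))     // delivers 1
buffer.request(.unlimited)  // delivers 2
buffer.buffer(3)            // delivered immediately
print(delivered)
```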

Walking through the process

Let’s go through the whole process once more, starting with the following image:

What happens when the following code is executed?

subject1
  .amb(subject2)
  .sink(receiveCompletion: { print("amb: completed with \($0)") },
        receiveValue: { print("amb: \($0)") })

subject1 is the Publisher, .amb() returns an Amb, and when .sink() is called the Amb receives the subscription and the following code runs:

public extension Publishers {
    struct Amb<First: Publisher, Second: Publisher>: Publisher where First.Output == Second.Output, First.Failure == Second.Failure {
        public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            subscriber.receive(subscription: Subscription(first: first,
                                                          second: second,
                                                          downstream: subscriber))
        }
    }
}

When a Publisher receives a subscriber, it must hand back a Subscription. This is the subscription credential: the Subscriber later uses it to send requests or to cancel the pipeline.

Since the green .sink() in the figure above is a system method, we can’t see its implementation, but we do know that .sink() sends a request as soon as it receives the subscription credential. This is the purple dotted line in the figure above.
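Although the implementation of .sink() is not visible, the same handshake can be reproduced with a hand-written Subscriber. The sketch below is an illustration only: CollectingSubscriber is a hypothetical class invented here, and it is assumed to behave like .sink() only in the sense that it requests .unlimited as soon as it receives the subscription.

```swift
import Combine

// Hypothetical subscriber used only to illustrate the subscription handshake.
final class CollectingSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var values: [Int] = []
    private(set) var isFinished = false
    private var subscription: Subscription?

    // Step 1: the Publisher hands over the subscription credential.
    func receive(subscription: Subscription) {
        self.subscription = subscription
        // Step 2: use the credential to request values from the upstream.
        subscription.request(.unlimited)
    }

    // Step 3: values arrive; the return value is additional demand.
    func receive(_ input: Int) -> Subscribers.Demand {
        values.append(input)
        return .none
    }

    // Step 4: the stream ends, so the credential is no longer needed.
    func receive(completion: Subscribers.Completion<Never>) {
        isFinished = true
        subscription = nil
    }
}

let subscriber = CollectingSubscriber()
[1, 2, 3].publisher.subscribe(subscriber)
print(subscriber.values, subscriber.isFinished)
```

If the call to request(_:) is removed, the subscriber receives the subscription but no values, which shows that nothing flows until demand is sent.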

Note that everything inside Amb is custom code, so we have full control. When the request from .sink() arrives, the request(_:) function of Subscription is called:

private extension Publishers.Amb {
    class Subscription<Downstream: Subscriber>: Combine.Subscription where Output == Downstream.Input, Failure == Downstream.Failure {
        ...

        func request(_ demand: Subscribers.Demand) {
            guard decision != nil else {
                preDecisionDemand += demand
                return
            }

            firstSink?.demand(demand)
            secondSink?.demand(demand)
        }

        ...
    }
}

The demand sent by .sink() is .unlimited. decision records which Publisher is in use, that is, whether subject1 or subject2 sent data first.

Because .sink() sends its request immediately after receiving the subscription credential, decision is still nil at that point: neither subject1 nor subject2 has sent any data. The code therefore stores the demand passed by .sink() in the preDecisionDemand property. This demand is later passed through to the winning Publisher (subject1 or subject2).
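The accumulation in preDecisionDemand relies on how Subscribers.Demand values add up. A small sketch, using a local variable that merely borrows the property’s name:

```swift
import Combine

// A local variable that mimics the role of the preDecisionDemand property.
var preDecisionDemand = Subscribers.Demand.none

// Bounded demands are summed.
preDecisionDemand += .max(2)
preDecisionDemand += .max(3)
let boundedTotal = preDecisionDemand

// Adding .unlimited to anything yields .unlimited.
preDecisionDemand += .unlimited
let unboundedTotal = preDecisionDemand

print(boundedTotal == .max(5), unboundedTotal == .unlimited)
```

This is why several requests that arrive before a decision is made can be replayed later as a single request.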

So the key question is: where is the code that decides between subject1 and subject2? It is in the Subscription initializer shown earlier:

private extension Publishers.Amb {
    class Subscription<Downstream: Subscriber>: Combine.Subscription where Output == Downstream.Input, Failure == Downstream.Failure {
        private var firstSink: Sink<First, Downstream>?
        private var secondSink: Sink<Second, Downstream>?
        private var preDecisionDemand = Subscribers.Demand.none
        private var decision: Decision? {
            didSet {
                guard let decision = decision else { return }
                switch decision {
                case .first:
                    secondSink = nil
                case .second:
                    firstSink = nil
                }

                request(preDecisionDemand)
                preDecisionDemand = .none
            }
        }

        init(first: First, second: Second, downstream: Downstream) {
            self.firstSink = Sink(upstream: first,
                                  downstream: downstream) { [weak self] in
                guard let self = self, self.decision == nil else { return }

                self.decision = .first
            }

            self.secondSink = Sink(upstream: second,
                                   downstream: downstream) { [weak self] in
                guard let self = self, self.decision == nil else { return }

                self.decision = .second
            }
        }

        ...
    }
}

Remember when the Subscription is initialized? It is created when Amb receives the subscription from .sink(). Its init() creates two Sinks: firstSink for subject1 and secondSink for subject2.

As explained earlier, the closure passed to Sink is called when the Sink receives an event from its Publisher. Combined with the code above, this means the value of decision is fixed as soon as either firstSink or secondSink receives data for the first time. In the didSet of decision, the Publisher that sent that data is selected and the Sink for the other Publisher is set to nil. request(preDecisionDemand) is then called again, passing the previously saved demand through to the winning Publisher.
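The “first emitter wins” rule can be modelled without Combine at all. The sketch below is a simplification with hypothetical names (Race, shouldForward): the real Amb releases the losing Sink instead of filtering its values, but the observable result is the same. The event list mirrors the subject1/subject2 example from earlier.

```swift
// Hypothetical, Combine-free model of the decision logic.
enum Decision {
    case first
    case second
}

final class Race {
    private(set) var decision: Decision?

    /// Returns true when a value coming from `source` should reach the downstream.
    func shouldForward(from source: Decision) -> Bool {
        if decision == nil {
            decision = source          // the first emitter wins, and only once
        }
        return decision == source
    }
}

let race = Race()

// The same order of sends as in the amb usage example.
let events: [(source: Decision, value: Int)] = [
    (.second, 3), (.first, 1), (.second, 6), (.first, 8), (.first, 7)
]

let forwarded = events
    .filter { race.shouldForward(from: $0.source) }
    .map { $0.value }

print(forwarded) // [3, 6]
```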

At this point, there should be two questions in our minds:

  1. If firstSink wins, firstSink?.demand(demand) is called. How does that pass the demand through to subject1?
  2. How does Sink receive the data from subject1 or subject2?

Both questions point at Sink. As mentioned earlier in this article, Sink conforms to the Subscriber protocol, and this is the important part. What happens in its initializer?

init(upstream: Upstream,
     downstream: Downstream,
     transformOutput: TransformOutput? = nil,
     transformFailure: TransformFailure? = nil) {
    self.buffer = DemandBuffer(subscriber: downstream)
    self.transformOutput = transformOutput
    self.transformFailure = transformFailure
    upstream.subscribe(self)
}

Does that make sense? Since Sink is itself a Subscriber, it subscribes to the upstream Publisher that is passed in.

func receive(subscription: Subscription) {
    upstreamSubscription = subscription
}

It also receives the subscription from the upstream Publisher, so it can use that subscription to send requests.

At this point the answers to the two questions above are clear.

To sum up: the Subscription inside Amb is the bridge to the downstream .sink() and receives its requests. The Sinks held by that Subscription subscribe to the upstream Publishers and act as the bridge between each Publisher and .sink(), passing demand up and data down.
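The three roles in this summary can be put together in one small operator. The sketch below is an illustration under stated assumptions, not CombineExt code: relay(), Publishers.Relay, RelaySubscription and RelaySink are hypothetical names, the operator simply forwards everything unchanged, and buffering and thread safety are left out.

```swift
import Combine

extension Publishers {
    // Hypothetical pass-through operator; it exists only for this sketch.
    struct Relay<Upstream: Publisher>: Publisher {
        typealias Output = Upstream.Output
        typealias Failure = Upstream.Failure

        let upstream: Upstream

        func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure {
            // Hand the downstream its subscription credential.
            subscriber.receive(subscription: RelaySubscription(upstream: upstream,
                                                               downstream: subscriber))
        }
    }
}

// Bridge towards the downstream: receives its requests and cancellation.
final class RelaySubscription<Upstream: Publisher, Downstream: Subscriber>: Subscription
    where Upstream.Output == Downstream.Input, Upstream.Failure == Downstream.Failure {
    private var sink: RelaySink<Upstream, Downstream>?

    init(upstream: Upstream, downstream: Downstream) {
        sink = RelaySink(upstream: upstream, downstream: downstream)
    }

    func request(_ demand: Subscribers.Demand) {
        sink?.demand(demand)            // pass the demand up
    }

    func cancel() {
        sink?.cancelUpstream()
        sink = nil
    }
}

// Bridge towards the upstream: a Subscriber that forwards what it receives.
final class RelaySink<Upstream: Publisher, Downstream: Subscriber>: Subscriber
    where Upstream.Output == Downstream.Input, Upstream.Failure == Downstream.Failure {
    typealias Input = Upstream.Output
    typealias Failure = Upstream.Failure

    private var upstreamSubscription: Subscription?
    private let downstream: Downstream

    init(upstream: Upstream, downstream: Downstream) {
        self.downstream = downstream
        upstream.subscribe(self)        // the sink subscribes itself to the upstream
    }

    func demand(_ demand: Subscribers.Demand) {
        upstreamSubscription?.request(demand)
    }

    func receive(subscription: Subscription) {
        upstreamSubscription = subscription
    }

    func receive(_ input: Upstream.Output) -> Subscribers.Demand {
        downstream.receive(input)       // pass the data down
    }

    func receive(completion: Subscribers.Completion<Upstream.Failure>) {
        downstream.receive(completion: completion)
    }

    func cancelUpstream() {
        upstreamSubscription?.cancel()
        upstreamSubscription = nil
    }
}

extension Publisher {
    func relay() -> Publishers.Relay<Self> {
        Publishers.Relay(upstream: self)
    }
}

var received: [Int] = []
var didFinish = false
let cancellable = [1, 2, 3].publisher
    .relay()
    .sink(receiveCompletion: { _ in didFinish = true },
          receiveValue: { received.append($0) })
print(received, didFinish)
```

Amb follows the same shape, with two sinks instead of one and the decision logic sitting inside the Subscription.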

So, back to the three questions from the beginning. Do you have the answers now?

  • How does an Operator receive data from the upstream Publisher?
  • The downstream may be another Operator or a Subscriber. How do we handle both cases?
  • If the downstream is a Subscriber, how do we receive its request and pass it on to the upstream?

Conclusion

I was always curious about how the Operators in Combine are implemented, because an Operator is special: its upstream is a Publisher or another Operator, and its downstream is another Operator or a Subscriber. If you need to write a custom Operator, this article can serve as a pattern to follow.