With Xcode 13.2, Apple made async/await back-deployable, lowering the minimum system requirement to iOS 13. This encouraged more and more people to start experimenting with async/await. When they come across AsyncSequence, they find that it behaves somewhat like Combine. Given that the Combine framework has hardly changed in the past two years, many people have asked whether Apple intends to replace Combine with AsyncSequence and AsyncStream.

In recent development, I happened to run into a scenario that seemed to call for combining Combine with async/await. This article discusses the respective strengths of Combine and async/await, whether they can work together, and how.

The original post was published on my blog, www.fatbobman.com

Welcome to subscribe to my public account: [Elbow’s Swift Notepad]

Problems that need to be solved

In recent development, I came across this requirement:

  • During the app's life cycle, a series of events occurs irregularly, at varying frequencies and from different sources
  • Processing each event consumes significant system resources and calls the async/await version of a system-provided API
  • The app does not need the results of event processing promptly
  • The system load of event processing must be limited, so multiple events must not be processed simultaneously
  • GCD and OperationQueue are not considered

A little analysis of these requirements quickly establishes a direction for the solution:

  • Combine is excellent at observing and receiving events, so it should be a good fit for the first requirement
  • The async/await programming pattern is bound to be used in the solution

There are only two problems left to solve:

  • How to serialize event handling (one event must finish processing before the next one starts)
  • How to combine Combine with async/await

Comparison between Combine and AsyncSequence

Because Combine and AsyncSequence are similar in several ways, many developers believe that AsyncSequence may replace Combine. For example:

  • Both allow future values to be processed asynchronously
  • Both allow developers to manipulate values using functions such as map and flatMap
  • Both end the data flow when an error occurs

But in fact, there are quite a few differences.

Event observation and reception

Combine is a tool born for reactive programming, and as its name suggests, it is very good at transforming and merging disparate event streams to generate new ones. Combine focuses on responding to change. When a property changes, a user taps a button, or a notification is sent through NotificationCenter, developers can respond with the built-in tools provided by Combine.

Combine provides Subject (PassthroughSubject, CurrentValueSubject), which makes it very easy for developers to inject values into data streams, especially valuable when your code is written in an imperative style.

In async/await, we can observe and receive data from network streams, files, Notification, and other sources through AsyncSequence. However, compared with Combine, it still lacks data binding and a Subject-like way to inject data.

Combine has a great advantage in the observation and reception of events.
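As a minimal sketch of what injecting values through a Subject looks like from imperative code: the names taps and received are made up for illustration, and the example relies on PassthroughSubject delivering values synchronously to its subscriber.

```swift
import Combine

// Hypothetical event source: imperative code pushes values into the stream.
let taps = PassthroughSubject<String, Never>()
var received: [String] = []

// The pipeline reacts to every injected value.
let cancellable = taps
    .map { $0.uppercased() }
    .sink { received.append($0) }

// Anywhere in imperative code (a button handler, a delegate callback, ...):
taps.send("tap")
taps.send("refresh")

print(received) // ["TAP", "REFRESH"]
```

The subject is the bridge here: the code that produces events does not need to know anything about the pipeline that consumes them.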

Data processing and transformation

In terms of the number of methods available for processing and transforming data, AsyncSequence still lags far behind Combine. But AsyncSequence also provides some very useful methods and properties that Combine does not yet offer, such as characters and lines.

Because the two have different emphases, their data processing and transformation capabilities will not converge even if both gain more built-in methods over time. Each is more likely to keep expanding in its own area of expertise.

Error handling

In Combine, the type of the Failure value is specified explicitly. In a data processing chain, not only must the Output value types match, the error types must match as well. To achieve this, Combine provides a number of operators for handling errors and error types, such as mapError, setFailureType, and retry.

Handling errors this way brings compiler-level guarantees. On the other hand, in a logically complex data processing chain it significantly reduces the readability of the code, and it demands a solid grasp of error handling from the developer.

async/await uses the throw-catch approach that developers are most familiar with for error handling. It is easy to learn, and the resulting code is more in line with most people’s reading habits.

There is no big difference in error handling capability between the two; the difference lies mainly in style.
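To make the contrast in style concrete, here is a small sketch of the same parsing task in both approaches. ParseError, AppError, parse, parseLater, and parseAll are hypothetical names invented for this illustration; only tryMap, mapError, and sink are Combine APIs.

```swift
import Combine

struct ParseError: Error { let text: String }
enum AppError: Error, Equatable { case invalidInput(String), unknown }

func parse(_ text: String) throws -> Int {
    guard let number = Int(text) else { throw ParseError(text: text) }
    return number
}

// Combine: the failure type is part of the pipeline's type.
var values: [Int] = []
var failure: AppError?

let cancellable = ["1", "2", "x"].publisher
    .tryMap { try parse($0) }              // Failure is now the general `Error`
    .mapError { error -> AppError in       // map it to one concrete error type
        if let parseError = error as? ParseError {
            return .invalidInput(parseError.text)
        }
        return .unknown
    }
    .sink(
        receiveCompletion: { completion in
            if case .failure(let error) = completion { failure = error }
        },
        receiveValue: { values.append($0) }
    )
// values == [1, 2], failure == .invalidInput("x")

// async/await: ordinary do/catch around the awaited calls.
func parseLater(_ text: String) async throws -> Int { try parse(text) }

func parseAll(_ texts: [String]) async -> (values: [Int], failure: AppError?) {
    var values: [Int] = []
    do {
        for text in texts {
            values.append(try await parseLater(text))
        }
        return (values, nil)
    } catch let error as ParseError {
        return (values, .invalidInput(error.text))
    } catch {
        return (values, .unknown)
    }
}
```

In both versions the stream stops at the first error. The difference is where the error type lives: in the publisher's type in Combine, and in the catch clauses with async/await.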

Life cycle management

In Combine, developers can clearly define the lifecycle of a data link through code from subscription to unsubscription. When using AsyncSequence, the life cycle of an asynchronous sequence is less explicit.
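A rough sketch of the difference follows, assuming iOS 15 / macOS 12 or later for the values property. The names subject and received are illustrative only.

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()
var received: [Int] = []

// Combine: the subscription starts at sink and ends at cancel().
let cancellable = subject.sink { received.append($0) }
subject.send(1)
cancellable.cancel()
subject.send(2)   // nobody is subscribed any more, so this value is not delivered
print(received)   // [1]

// AsyncSequence: the loop has no handle of its own. It lives inside a task,
// and from the outside it is stopped by cancelling that task
// (or it ends when the sequence itself finishes).
let task = Task {
    for await value in subject.values {
        print("async received \(value)")
    }
}
task.cancel()
```

With Combine the lifetime is visible in the code as a value you store and cancel. With AsyncSequence you manage the lifetime of the surrounding Task instead.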

Scheduling and organization

In Combine, developers can explicitly control how and where asynchronous events are handled by specifying schedulers. Combine also provides several other means of control, such as limiting the number of pipelines and adjusting the processing frequency.

AsyncSequence lacks the ability to control the processing location, frequency and concurrent number of data streams.
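For illustration, a sketch of the kind of control Combine offers: debounce adjusts the frequency, and receive(on:) chooses where the values are delivered. The subject events and the 500 ms interval are arbitrary choices for this example.

```swift
import Combine
import Foundation

let events = PassthroughSubject<String, Never>()
var received: [String] = []

let cancellable = events
    // Frequency: only forward a value after 500 ms of silence.
    .debounce(for: .milliseconds(500), scheduler: DispatchQueue.global(qos: .utility))
    // Location: deliver the surviving values on the main queue.
    .receive(on: DispatchQueue.main)
    .sink { received.append($0) }

events.send("a")
events.send("b")
events.send("c")
// Nothing has been delivered yet at this point: debounce waits for the quiet
// period, then hands the latest value ("c") to the main queue.
```

In an app, the value arrives once the main run loop gets a chance to process it. A plain for await loop over an AsyncSequence has no built-in equivalent of these two operators.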

Below, we try to meet the requirements described earlier. Each solution uses Combine together with async/await.

Scheme 1

In Combine, you can limit how many values are processed concurrently in two ways: by setting the maxPublishers parameter of flatMap, or by writing a custom Subscriber. In this scheme, we use flatMap to serialize event processing.

To call an asynchronous API inside Combine, the current official approach is to wrap the upstream value in a Future publisher and switch to it through flatMap.

In Scheme 1, we create a new operator that fulfills our requirements by combining flatMap, Deferred (which ensures that the Future is executed only after subscription), and Future.

import Combine

public extension Publisher {
    // Runs an async transform for each upstream value.
    // maxPublishers limits how many transforms may be in flight at once.
    func task<T>(maxPublishers: Subscribers.Demand = .unlimited,
                 _ transform: @escaping (Output) async -> T) -> Publishers.FlatMap<Deferred<Future<T, Never>>, Self> {
        flatMap(maxPublishers: maxPublishers) { value in
            Deferred {
                Future { promise in
                    Task {
                        let output = await transform(value)
                        promise(.success(output))
                    }
                }
            }
        }
    }
}

public extension Publisher where Self.Failure == Never {
    // A sink that ignores the values; it only keeps the pipeline alive.
    func emptySink() -> AnyCancellable {
        sink(receiveValue: { _ in })
    }
}

For the full version of the code (with Error and SetFailureType support), see the Gist. The code is based on Sundell's article.

The usage method is as follows:

var cancellables = Set<AnyCancellable>()

func asyncPrint(value: String) async {
    print("hello \(value)")
    try? await Task.sleep(nanoseconds: 1000000000)
}

["abc", "sdg", "353"].publisher
    .task(maxPublishers: .max(1)) { value in
        await asyncPrint(value: value)
    }
    .emptySink()
    .store(in: &cancellables)
// Output
// hello abc
// Wait 1 second
// hello sdg
// Wait 1 second
// hello 353

If you replace ["abc", "sdg", "353"].publisher with a PassthroughSubject or a Notification publisher, you will lose data. Because we limit the number of values processed in parallel, consuming a value takes longer than producing one, so values sent while an earlier one is still being processed are dropped. To avoid this, add a buffer after the publisher to hold the pending values.

let publisher = PassthroughSubject<String, Never>()
publisher
    .buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest) // Choose the buffer size and strategy according to the needs of your app
    .task(maxPublishers: .max(1)) { value in
        await asyncPrint(value: value)
    }
    .emptySink()
    .store(in: &cancellables)

publisher.send("fat")
publisher.send("bob")
publisher.send("man")

Scheme 2

In Scheme 2, we use a custom Subscriber to limit the number of values processed in parallel, and try to call the async/await method from within the Subscriber.

Create a custom Subscriber:

extension Subscribers {
    public class OneByOneSink<Input, Failure: Error>: Subscriber, Cancellable {
        let receiveValue: (Input) -> Void
        let receiveCompletion: (Subscribers.Completion<Failure>) -> Void

        var subscription: Subscription?

        public init(receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
                    receiveValue: @escaping (Input) -> Void) {
            self.receiveCompletion = receiveCompletion
            self.receiveValue = receiveValue
        }

        public func receive(subscription: Subscription) {
            self.subscription = subscription
            subscription.request(.max(1)) // Request one value when subscribing
        }

        public func receive(_ input: Input) -> Subscribers.Demand {
            receiveValue(input)
            return .max(1) // Request one more value once receiveValue has returned
        }

        public func receive(completion: Subscribers.Completion<Failure>) {
            receiveCompletion(completion)
        }

        public func cancel() {
            subscription?.cancel()
            subscription = nil
        }
    }
}

In receive(subscription: Subscription), subscription.request(.max(1)) sets the number of values the subscriber requests when it subscribes. In receive(_ input: Input), returning .max(1) sets the number of values requested after each call to receiveValue. In this way, we create a subscriber that requests one value at a time and processes them one by one.

However, when we use a Task inside receiveValue to call async/await code, we find that, because there is no callback mechanism, the subscriber requests the next value immediately, whether or not the asynchronous code has finished. This does not meet our requirements.

A callback mechanism can be added to the Subscriber in a variety of ways, such as a callback method, Notification, or @Published. In the following code, we use Notification for the callback.

public extension Subscribers {
    class OneByOneSink<Input, Failure: Error>: Subscriber, Cancellable {
        let receiveValue: (Input) -> Void
        let receiveCompletion: (Subscribers.Completion<Failure>) -> Void

        var subscription: Subscription?
        var cancellable: AnyCancellable?

        public init(notificationName: Notification.Name,
                    receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
                    receiveValue: @escaping (Input) -> Void) {
            self.receiveCompletion = receiveCompletion
            self.receiveValue = receiveValue
            // After receiving the callback notification, request the next value from the publisher
            cancellable = NotificationCenter.default.publisher(for: notificationName, object: nil)
                .sink(receiveValue: { [weak self] _ in self?.resume() })
        }

        public func receive(subscription: Subscription) {
            self.subscription = subscription
            subscription.request(.max(1))
        }

        public func receive(_ input: Input) -> Subscribers.Demand {
            receiveValue(input)
            return .none // Do not request a new value here; resume() does that later
        }

        public func receive(completion: Subscribers.Completion<Failure>) {
            receiveCompletion(completion)
        }

        public func cancel() {
            subscription?.cancel()
            subscription = nil
        }

        private func resume() {
            subscription?.request(.max(1))
        }
    }
}

public extension Publisher {
    func oneByOneSink(
        _ notificationName: Notification.Name,
        receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
        receiveValue: @escaping (Output) -> Void
    ) -> Cancellable {
        let sink = Subscribers.OneByOneSink<Output, Failure>(
            notificationName: notificationName,
            receiveCompletion: receiveCompletion,
            receiveValue: receiveValue
        )
        self.subscribe(sink)
        return sink
    }
}

public extension Publisher where Failure == Never {
    func oneByOneSink(
        _ notificationName: Notification.Name,
        receiveValue: @escaping (Output) -> Void
    ) -> Cancellable {
        let sink = Subscribers.OneByOneSink<Output, Failure>(
            notificationName: notificationName,
            receiveCompletion: { _ in },
            receiveValue: receiveValue
        )
        self.subscribe(sink)
        return sink
    }
}

Call:

let resumeNotification = Notification.Name("resume")

publisher
    .buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest)
    .oneByOneSink(
        resumeNotification,
        receiveValue: { value in
            Task {
                await asyncPrint(value: value)
                NotificationCenter.default.post(name: resumeNotification, object: nil)
            }
        }
    )
    .store(in: &cancellables)

Because Scheme 2 needs a callback to complete the processing logic, Scheme 1 is significantly more elegant for the requirements of this article.

On the other hand, Scheme 2 allows the data processing chain to be paused, which suits scenarios where execution should continue only after a certain condition is met.

Scheme 3

As mentioned earlier, Apple has provided AsyncSequence support for Notification. If we only send events through the NotificationCenter, the following code directly satisfies our needs:

let n = Notification.Name("event")
Task {
    for await value in NotificationCenter.default.notifications(named: n, object: nil) {
        if let str = value.object as? String {
            await asyncPrint(value: str)
        }
    }
}

NotificationCenter.default.post(name: n, object: "event1")
NotificationCenter.default.post(name: n, object: "event2")
NotificationCenter.default.post(name: n, object: "event3")

Surprisingly simple, isn’t it?

Unfortunately, Combine’s Subject and other Publishers do not directly conform to the AsyncSequence protocol.

But this year, Combine added a very small but very important feature to Publisher: values.

values is of type AsyncPublisher, which conforms to the AsyncSequence protocol. It is designed to convert a Publisher into an AsyncSequence. With it, the following code meets our needs for every kind of Publisher:

let publisher = PassthroughSubject<String, Never>()
let p = publisher
    .buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest)
Task {
    for await value in p.values {
        await asyncPrint(value: value)
    }
}

Since a for await loop over an AsyncSequence processes values one at a time, we do not need to worry about serializing the data ourselves.
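The one-at-a-time behavior can be checked without Combine at all. The following sketch uses a plain AsyncStream; processAll and the log array are hypothetical names for this illustration, and the 0.1 second sleep stands in for real work.

```swift
// Iterates an AsyncStream and records when each value starts and ends processing.
func processAll() async -> [String] {
    // All three values are available immediately (AsyncStream buffers them).
    let stream = AsyncStream<String> { continuation in
        for value in ["fat", "bob", "man"] {
            continuation.yield(value)
        }
        continuation.finish()
    }

    var log: [String] = []
    for await value in stream {
        log.append("start \(value)")
        try? await Task.sleep(nanoseconds: 100_000_000) // simulated async work
        log.append("end \(value)")
    }
    return log
}

// Calling `await processAll()` from an async context returns:
// ["start fat", "end fat", "start bob", "end bob", "start man", "end man"]
```

Even though every value is ready up front, the loop does not ask for the next one until the body, including its await, has finished for the current one.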

The principle behind converting a Publisher to an AsyncSequence is not complicated: create a struct that conforms to AsyncSequence, forward the data received from the Publisher through an AsyncStream, and point the struct's iterator at the AsyncStream's iterator.

We can implement the functionality of values ourselves. Below we create a sequence property that behaves like values.

public struct CombineAsyncPublsiher<P>: AsyncSequence, AsyncIteratorProtocol where P: Publisher, P.Failure == Never {
    public typealias Element = P.Output
    public typealias AsyncIterator = CombineAsyncPublsiher<P>

    public func makeAsyncIterator() -> Self {
        return self
    }

    private let stream: AsyncStream<P.Output>
    private var iterator: AsyncStream<P.Output>.Iterator
    private var cancellable: AnyCancellable?

    public init(_ upstream: P, bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded) {
        var subscription: AnyCancellable?
        stream = AsyncStream<P.Output>(P.Output.self, bufferingPolicy: limit) { continuation in
            subscription = upstream
                .sink(receiveValue: { value in
                    continuation.yield(value)
                })
        }
        cancellable = subscription
        iterator = stream.makeAsyncIterator()
    }

    public mutating func next() async -> P.Output? {
        await iterator.next()
    }
}

public extension Publisher where Self.Failure == Never {
    var sequence: CombineAsyncPublsiher<Self> {
        CombineAsyncPublsiher(self)
    }
}

See the Gist for the complete code. This example is based on Marin Todorov’s article.

The implementations of sequence and values differ slightly. If you are interested, you can use the following code to analyze the differences.

let p = publisher
    .print()  // Observe the subscriber's requests. values requests data in the same way as Scheme 2.
    // sequence uses AsyncStream's buffer, so there is no need to add a buffer operator

Task {
    for await value in p.sequence {
        await asyncPrint(value: value)
    }
}

Conclusion

In the foreseeable future, Apple will surely provide more built-in ways to bridge Combine and async/await. Perhaps within the next year or two, the first two schemes will be achievable directly with official APIs.

I hope this article has been helpful to you.
