Github.com/agelessman/…

What do we mean by sequence operations? A pipeline is a pipe through which data flows in order. If you only want the last piece of data in the pipe, or the first, or something in the middle, operations like these are called sequence operations.

Of course, we could collect all the data and then process it, but that is not in line with modern programming ideas. Asking for exactly what we need, and receiving only that, is the most logical way to program.

first

One distinctive aspect of first is that it terminates the pipeline as soon as the first value is received. As shown in the figure above, the pipeline ends when data 1 is received, and there is no need to accept subsequent data.

The pipeline ends with the publisher sending a .finished event.

_ = [1, 2, 3, 4]
    .publisher
    .first()
    .sink(receiveCompletion: { completion in
        print("It's over.")
        switch completion {
        case .finished:
            print("Complete")
        case .failure(let error):
            print("Error:\(error.localizedDescription)")
        }
    }, receiveValue: { someValue in
        print("someValue: \(someValue)")
    })

In many cases, first is too inflexible, since it can only return the first value it receives. In keeping with declarative programming, it becomes much more flexible when we pass it a closure that tests a condition.

This variant is known as firstWhere, but it is written as first(where:) rather than .firstWhere, and is used as follows:

.first { value -> Bool in
    value > 2
}

It's important to understand what firstWhere is for: its core idea is to return the first value that meets a condition.

For example, find the first student whose total score in math, Chinese, and English exceeds 300, and so on. To demonstrate these operators conveniently, we use [].publisher to send data. In real development, the data may arrive at irregular intervals.

Wherever an operator takes a closure, there is usually a try variant that allows the closure to throw, so the code for tryFirstWhere looks like this:
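The student example above can be sketched as follows. The Student type, its fields, and the sample scores are hypothetical and exist only for illustration; the sketch assumes a synchronous [].publisher source, so the result is available as soon as sink returns.

```swift
import Combine

// Hypothetical model, invented for this illustration only.
struct Student {
    let name: String
    let math: Int
    let chinese: Int
    let english: Int

    var total: Int { math + chinese + english }
}

// Hypothetical sample data.
let students = [
    Student(name: "A", math: 90, chinese: 95, english: 100),   // total 285
    Student(name: "B", math: 105, chinese: 100, english: 110), // total 315
    Student(name: "C", math: 120, chinese: 110, english: 115)  // total 345
]

/// Records what the pipeline delivers: the first matching name, then "finished".
func firstStudentOver300() -> [String] {
    var events: [String] = []
    _ = students
        .publisher
        .first { $0.total > 300 }   // first(where:) with trailing-closure syntax
        .sink(receiveCompletion: { _ in
            events.append("finished")
        }, receiveValue: { student in
            events.append(student.name)
        })
    return events
}

print(firstStudentOver300())   // ["B", "finished"]
```

Student C also exceeds 300, but the pipeline has already finished by then, so C never reaches the sink.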

.tryFirst { value -> Bool in
    if value == 2 {
        throw MyError.customError
    }
    return value > 2
}
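To see how the throwing closure changes the outcome, here is a minimal self-contained sketch. MyError is defined here as an assumption, since the snippet above refers to it without showing its declaration, and runTryFirst is a hypothetical helper that records events instead of printing them.

```swift
import Combine

// Assumed definition; the article's snippet uses MyError.customError without declaring it.
enum MyError: Error {
    case customError
}

/// Runs tryFirst(where:) over a synchronous sequence and records what the sink sees.
func runTryFirst(_ values: [Int]) -> [String] {
    var events: [String] = []
    _ = values
        .publisher
        .tryFirst { value -> Bool in
            if value == 2 {
                throw MyError.customError
            }
            return value > 2
        }
        .sink(receiveCompletion: { completion in
            switch completion {
            case .finished:
                events.append("finished")
            case .failure:
                events.append("failure")
            }
        }, receiveValue: { value in
            events.append("value \(value)")
        })
    return events
}

// The closure throws on 2 before any value matches, so the pipeline fails.
print(runTryFirst([1, 2, 3, 4]))   // ["failure"]
// 3 matches first, the pipeline finishes, and 2 is never inspected.
print(runTryFirst([1, 3, 2, 4]))   // ["value 3", "finished"]
```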

last

last is the counterpart of first: it returns the last piece of data in the pipeline. It must wait for the publisher's .finished event before it can return data.

Since it mirrors first, we won't explain it in more detail here. It also has lastWhere and tryLastWhere variants, as follows:

.last { value -> Bool in
    value > 2
}

.tryLast { value -> Bool in
    if value == 2 {
        throw MyError.customError
    }
    return value > 2
}
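The point that last must wait for .finished is easier to see with a subject that is driven by hand. The following is a sketch under that assumption; runLast and the marker string "all values sent" are hypothetical names used only to show the ordering of events.

```swift
import Combine

/// Shows that last(where:) delivers nothing until the upstream finishes.
func runLast() -> [String] {
    var events: [String] = []
    let subject = PassthroughSubject<Int, Never>()

    let cancellable = subject
        .last { $0 > 2 }
        .sink(receiveCompletion: { _ in
            events.append("finished")
        }, receiveValue: { value in
            events.append("value \(value)")
        })

    subject.send(1)
    subject.send(3)
    subject.send(4)
    subject.send(2)
    events.append("all values sent")      // nothing has reached the sink yet
    subject.send(completion: .finished)   // only now is the last match (4) delivered

    cancellable.cancel()
    return events
}

print(runLast())   // ["all values sent", "value 4", "finished"]
```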

drop

drop is an interesting operator that can be used in three ways:

  • dropUntilOutput
  • dropWhile
  • tryDropWhile

The most interesting is dropUntilOutput, which lets another publisher trigger the current one. What does that mean? Think of the other publisher as a switch in a pipe: only when the switch is turned on does the water flow on to the next stage.

This metaphor is slightly off. In reality, if the switch stays off, water keeps accumulating behind it. In a pipeline that uses drop, however, data does not accumulate; the earlier data is discarded, which is exactly what drop means.

Take a look at the following code:

var cancellables = Set<AnyCancellable>()

/// Sends data
let publisher = PassthroughSubject<String, Never>()
/// Trigger
let triggerPublisher = PassthroughSubject<Int, Never>()

publisher
    .drop(untilOutputFrom: triggerPublisher)
    .sink(receiveCompletion: { completion in
        print("It's over.")
        switch completion {
        case .finished:
            print("Complete")
        case .failure(let error):
            print("Error:\(error.localizedDescription)")
        }
    }, receiveValue: { someValue in
        print("someValue: \(someValue)")
    })
    // Keep the subscription alive; discarding it with `_ =` would cancel it immediately.
    .store(in: &cancellables)

publisher.send("Hello")
triggerPublisher.send(1)
publisher.send("Zhang")
publisher.send(completion: Subscribers.Completion.finished)

Instead of using [].publisher to send the data, we used PassthroughSubject, which lets us manually control when the data is sent.

We use publisher, the main pipe, to send data, and triggerPublisher as a switch. Note that triggerPublisher may send three kinds of events:

  • Int, normal data
  • .finished, the completion event
  • .failure(Error), an error

In the code above, we sent a normal value through triggerPublisher.send(1). Here is the printed output:

someValue: Zhang
It's over.
Complete

As you can see, no data is printed until triggerPublisher is triggered. The print results were exactly as expected.

Now let's look at the second case:

/// Sends data
let publisher = PassthroughSubject<String, Never>()
/// Trigger
let triggerPublisher = PassthroughSubject<Int, Never>()

publisher
    .drop(untilOutputFrom: triggerPublisher)
    .sink(receiveCompletion: { completion in
        print("It's over.")
        switch completion {
        case .finished:
            print("Complete")
        case .failure(let error):
            print("Error:\(error.localizedDescription)")
        }
    }, receiveValue: { someValue in
        print("someValue: \(someValue)")
    })
    .store(in: &cancellables)

publisher.send("Hello")
triggerPublisher.send(completion: Subscribers.Completion.finished)
publisher.send("Zhang")

The print result is as follows:

It's over.
Complete

As you can see, the triggerPublisher switch is very useful: it works not only as a data switch but can also end the main pipeline. publisher ends when triggerPublisher sends a .finished event. We know this because the code above never calls the following to end the main pipeline:

publisher.send(completion: Subscribers.Completion.finished)

Let’s look at the last case:

enum MyCustomError: Error {
    case customError
}

/// Sends data
let publisher = PassthroughSubject<String, Error>()
/// Trigger
let triggerPublisher = PassthroughSubject<Int, Error>()

publisher
    .drop(untilOutputFrom: triggerPublisher)
    .sink(receiveCompletion: { completion in
        print("It's over.")
        switch completion {
        case .finished:
            print("Complete")
        case .failure(let error):
            print("Error:\(error.localizedDescription)")
        }
    }, receiveValue: { someValue in
        print("someValue: \(someValue)")
    })
    .store(in: &cancellables)

publisher.send("Hello")
triggerPublisher.send(completion: Subscribers.Completion.failure(MyCustomError.customError))
publisher.send("Zhang")

Print result:

It's over.
Error:The operation couldn't be completed. (MCMarbleDiagramSwiftUITests.MCMarbleDiagramSwiftUITests.(unknown context at $1068f51d0).(unknown context at $1068f5224).MyCustomError error 0.)

This is basically the same as the .finished case: the pipeline ends.

To summarize: publisher starts delivering data once triggerPublisher sends a normal value, and publisher is closed when triggerPublisher sends .finished or .failure.

The second use of drop is dropWhile. The core idea of dropWhile is that we state the drop condition ourselves through a closure, so we are free to define any condition.

Once the closure returns false for the first time, it stops filtering the data. From then on, every value goes on to the next stage, whether or not it meets the closure's condition.

To summarize, dropWhile is more like a trigger switch, and when it returns false for the first time, it’s time to turn the switch on.

_ = [-40, -10, 0, 10, 0, 2, -30]
    .publisher
    .drop { $0 <= 0 }
    .sink { print($0) }

dropWhile takes a closure, so there is also a tryDropWhile that allows errors to be thrown from the closure, which I won't explain here.
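For readers who want to see the throwing variant anyway, here is a minimal sketch. In code it is written tryDrop(while:). DropError and runTryDrop are hypothetical names introduced for this illustration, and the rule "zero is invalid" is an arbitrary assumption.

```swift
import Combine

// Hypothetical error type for this sketch.
enum DropError: Error {
    case invalidValue
}

/// Drops leading negative values; treats 0 as an error while still dropping.
func runTryDrop(_ values: [Int]) -> [String] {
    var events: [String] = []
    _ = values
        .publisher
        .tryDrop { value -> Bool in
            if value == 0 {
                throw DropError.invalidValue
            }
            return value < 0
        }
        .sink(receiveCompletion: { completion in
            switch completion {
            case .finished:
                events.append("finished")
            case .failure:
                events.append("failure")
            }
        }, receiveValue: { value in
            events.append("value \(value)")
        })
    return events
}

// Once 3 makes the closure return false, later negatives pass through untouched.
print(runTryDrop([-2, -1, 3, -5, 4]))   // ["value 3", "value -5", "value 4", "finished"]
// The closure throws while it is still dropping, so the pipeline fails.
print(runTryDrop([-2, 0, 3]))           // ["failure"]
```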

prepend

prepend places another publisher in front of the current one. In code, it corresponds to this:

Publishers.Concatenate(prefix: firstPublisher, suffix: secondPublisher)

Literally, firstPublisher should be followed by secondPublisher, and the data flows through each publisher in turn. But that’s not the case. It’s more like this picture:

As you can see, while the sink is receiving data from firstPublisher, the switch on secondPublisher is off. Only when firstPublisher sends the .finished event is the switch on secondPublisher turned on, and data then flows from secondPublisher to the sink.

Whenever either publisher encounters an error, the pipeline ends.

let firstPublisher = PassthroughSubject<String, Never>()
let secondPublisher = PassthroughSubject<String, Never>()

secondPublisher
    .prepend(firstPublisher)
    .sink(receiveCompletion: { completion in
        print("It's over.")
        switch completion {
        case .finished:
            print("Complete")
        case .failure(let error):
            print("Error:\(error.localizedDescription)")
        }
    }, receiveValue: { someValue in
        print("someValue: \(someValue)")
    })
    // Keep the subscription alive; discarding it with `_ =` would cancel it immediately.
    .store(in: &cancellables)

firstPublisher.send("Hello")
firstPublisher.send(completion: Subscribers.Completion.finished)
secondPublisher.send("Zhang")
secondPublisher.send(completion: Subscribers.Completion.finished)
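The correspondence between prepend and Publishers.Concatenate mentioned earlier can be checked with a small sketch. collectValues is a hypothetical helper that only works for synchronous publishers such as [].publisher, and the sample strings are arbitrary.

```swift
import Combine

/// Hypothetical helper: gathers every value of a synchronous, non-failing publisher.
func collectValues<P: Publisher>(_ publisher: P) -> [P.Output] where P.Failure == Never {
    var values: [P.Output] = []
    _ = publisher.sink { values.append($0) }
    return values
}

let prefixPublisher = ["a", "b"].publisher
let suffixPublisher = ["c", "d"].publisher

// The operator form used in the article.
let viaOperator = collectValues(suffixPublisher.prepend(prefixPublisher))

// The explicit type named in the article.
let viaConcatenate = collectValues(
    Publishers.Concatenate(prefix: prefixPublisher, suffix: suffixPublisher)
)

print(viaOperator)      // ["a", "b", "c", "d"]
print(viaConcatenate)   // ["a", "b", "c", "d"]
```

In both forms the prefix publisher's values come first, and the suffix publisher is heard only after the prefix has finished.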

prepend also has two convenience overloads for prepending plain values. For example, when firstPublisher simply emits a sequence, the code looks something like this:

let firstPublisher = ["1", "2", "3"].publisher
let secondPublisher = PassthroughSubject<String, Never>()

secondPublisher
    .prepend(firstPublisher)
    .sink(receiveCompletion: { completion in
        // ...
    }, receiveValue: { someValue in
        print("someValue: \(someValue)")
    })
    .store(in: &cancellables)

secondPublisher.send("4")

The print result is as follows:

someValue: 1
someValue: 2
someValue: 3
someValue: 4

We can simplify the above code to:

let secondPublisher = PassthroughSubject<String, Never>()

secondPublisher
    .prepend(["1", "2", "3"])
    .sink(receiveCompletion: { completion in
        // ...
    }, receiveValue: { someValue in
        print("someValue: \(someValue)")
    })
    .store(in: &cancellables)

secondPublisher.send("4")

Of course, you can pass only one value:

secondPublisher
    .prepend("1")

dropFirst

As you can see above, the main purpose of dropFirst is to filter out the first value in the pipeline. This is the default behavior. There is also an overload that lets us specify how many values to filter out.

For example, dropFirst(2) filters out the first two values of the pipeline.

So what is it good for? Because data in a pipeline is so unpredictable and we don't know when it will arrive, dropFirst is a simple way to skip the initial values, but you will need to find the scenarios in your own development where it fits.

_ = [1, 2, 3, 4]
    .publisher
    .dropFirst(2)
    .sink(receiveCompletion: { completion in
        print("It's over.")
        switch completion {
        case .finished:
            print("Complete")
        case .failure(let error):
            print("Error:\(error.localizedDescription)")
        }
    }, receiveValue: { someValue in
        print("someValue: \(someValue)")
    })
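To compare the default with an explicit count, here is a short sketch. collectDropped is a hypothetical helper that relies on [].publisher delivering synchronously.

```swift
import Combine

/// Hypothetical helper: gathers every value of a synchronous, non-failing publisher.
func collectDropped<P: Publisher>(_ publisher: P) -> [P.Output] where P.Failure == Never {
    var values: [P.Output] = []
    _ = publisher.sink { values.append($0) }
    return values
}

let source = [1, 2, 3, 4]

let defaultDrop = collectDropped(source.publisher.dropFirst())     // count defaults to 1
let dropTwo = collectDropped(source.publisher.dropFirst(2))
let dropTooMany = collectDropped(source.publisher.dropFirst(10))   // more than the source has

print(defaultDrop)   // [2, 3, 4]
print(dropTwo)       // [3, 4]
print(dropTooMany)   // []
```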

prefix

Prefix is a prefix, and when something becomes a prefix for another thing, it usually has a limiting effect.

So the essence of a prefix is to add constraints to a publisher.

It has the following four uses:

  • prefix(untilOutputFrom:)
  • prefix(_ maxLength:)
  • prefix(while:)
  • tryPrefix(while:)

prefix(untilOutputFrom:) is very interesting. It is somewhat similar to the prepend covered earlier. With secondPublisher.prepend(firstPublisher), the second publisher starts delivering data only after the first publisher has sent .finished. With firstPublisher.prefix(untilOutputFrom: secondPublisher), as soon as the second publisher sends its first value, the first publisher's pipeline terminates immediately.

This shows that prefix(untilOutputFrom:) lets the second publisher act as an off switch for the entire pipeline, which can be useful in some development scenarios.

let firstPublisher = PassthroughSubject<Int, Never>()
let secondPublisher = PassthroughSubject<Int, Never>()

firstPublisher
    .prefix(untilOutputFrom: secondPublisher)
    .sink(receiveCompletion: { completion in
        print("It's over.")
        switch completion {
        case .finished:
            print("Complete")
        case .failure(let error):
            print("Error:\(error.localizedDescription)")
        }
    }, receiveValue: { someValue in
        print("someValue: \(someValue)")
    })
    .store(in: &cancellables)

firstPublisher.send(1)
firstPublisher.send(2)

secondPublisher.send(3)
firstPublisher.send(4)

Print result:

someValue: 1
someValue: 2
It's over.
Complete

Now that you know prefix is essentially a constraint on a publisher, prefix(_ maxLength:) limits the maximum number of values the publisher can output. That is, it controls how much data the publisher can emit.

Let’s verify this with the code:

[1, 2, 3, 4]
    .publisher
    .prefix(3)
    .sink(receiveCompletion: { completion in
        print("It's over.")
        switch completion {
        case .finished:
            print("Complete")
        case .failure(let error):
            print("Error:\(error.localizedDescription)")
        }
    }, receiveValue: { someValue in
        print("someValue: \(someValue)")
    })
    .store(in: &cancellables)

The print result is as follows:

someValue: 1
someValue: 2
someValue: 3
It's over.
Complete

From the code above, we can see that .prefix(3) terminates the pipeline immediately after receiving three values, rather than filtering out the remaining data, which is why it can be called a constraint on the publisher.

As shown above, prefix(while:) is also a constraint on the publisher. while is a closure parameter that receives the upstream output and returns a Bool, and the pipeline ends immediately the first time the closure returns false.

[1, 2, 3, 4]
    .publisher
    .prefix { value -> Bool in
        value <= 2
    }
    .sink(receiveCompletion: { completion in
        print("It's over.")
        switch completion {
        case .finished:
            print("Complete")
        case .failure(let error):
            print("Error:\(error.localizedDescription)")
        }
    }, receiveValue: { someValue in
        print("someValue: \(someValue)")
    })
    .store(in: &cancellables)

Print result:

someValue: 1
someValue: 2
It's over.
Complete

tryPrefix(while:) is a variant of prefix(while:) that allows the closure to throw, which we won't cover in detail.
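As a brief illustration of the throwing variant, here is a sketch. PrefixError and runTryPrefix are hypothetical names, and treating negative values as errors is an assumption made only for this example.

```swift
import Combine

// Hypothetical error type for this sketch.
enum PrefixError: Error {
    case negativeValue
}

/// Lets values through while they are <= 2; a negative value fails the pipeline.
func runTryPrefix(_ values: [Int]) -> [String] {
    var events: [String] = []
    _ = values
        .publisher
        .tryPrefix { value -> Bool in
            if value < 0 {
                throw PrefixError.negativeValue
            }
            return value <= 2
        }
        .sink(receiveCompletion: { completion in
            switch completion {
            case .finished:
                events.append("finished")
            case .failure:
                events.append("failure")
            }
        }, receiveValue: { value in
            events.append("value \(value)")
        })
    return events
}

// 3 makes the closure return false, so the pipeline finishes normally.
print(runTryPrefix([1, 2, 3, 4]))   // ["value 1", "value 2", "finished"]
// -1 makes the closure throw, so the pipeline ends with a failure instead.
print(runTryPrefix([1, -1, 2]))     // ["value 1", "failure"]
```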

To summarize, consider using prefix when we want a condition to control when the pipeline closes. This condition can be a publisher, a closure, or an upper limit.

output

output lets us control exactly which positions in the stream are emitted. It is also an operation on a sequence of data. As you can see from the figure above, 4 is actually an index, just like an array index: it starts at 0, so it fetches the fifth value in the sequence.

_ = [1, 2, 3, 4, 5, 6, 7]
    .publisher
    .output(at: 4)
    .sink(receiveCompletion: { completion in
        // ...
    }, receiveValue: { someValue in
        print("someValue: \(someValue)")
    })

output(at:) emits only a single value, while output(in:), which takes a RangeExpression, emits a range of values, as shown below:

_ = [1, 2, 3, 4, 5, 6, 7]
    .publisher
    .output(in: (2...4))
    .sink(receiveCompletion: { completion in
        // ...
    }, receiveValue: { someValue in
        print("someValue: \(someValue)")
    })

output is used to fetch data from the middle of a data stream.
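To make the zero-based indexing concrete, the following sketch collects what each form emits. collectOutput is a hypothetical helper that assumes a synchronous source such as [].publisher.

```swift
import Combine

/// Hypothetical helper: gathers every value of a synchronous, non-failing publisher.
func collectOutput<P: Publisher>(_ publisher: P) -> [P.Output] where P.Failure == Never {
    var values: [P.Output] = []
    _ = publisher.sink { values.append($0) }
    return values
}

let numbers = [1, 2, 3, 4, 5, 6, 7]

let atIndexFour = collectOutput(numbers.publisher.output(at: 4))      // index 4 is the fifth value
let closedRange = collectOutput(numbers.publisher.output(in: 2...4))  // indices 2, 3 and 4
let halfOpen = collectOutput(numbers.publisher.output(in: 2..<4))     // indices 2 and 3

print(atIndexFour)   // [5]
print(closedRange)   // [3, 4, 5]
print(halfOpen)      // [3, 4]
```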