In project development, a single page often has to perform several tasks at once. When those tasks run asynchronously on multiple threads, it is hard to control which one finishes first and when all the results are in. A typical case is a page that fires several concurrent network requests and can only process the data once every result has arrived, while the user should perceive just one data load. If that is not handled properly, the user experience suffers. RxJava's merge operators make these situations easy to handle.

Several commonly used merge operators

Let’s create three observables with different data types as sample data

    // Imports assume RxJava 3; RxJava 2 uses the io.reactivex.* packages instead.
    import io.reactivex.rxjava3.core.Observable
    import io.reactivex.rxjava3.core.ObservableSource
    import io.reactivex.rxjava3.functions.Function
    import java.util.concurrent.TimeUnit

    // Emits 1, 2, 3 with 1000 ms between items
    private val observable = Observable.fromArray(1, 2, 3)
            .concatMap(object : Function<Int, ObservableSource<Int>> {
                override fun apply(t: Int): ObservableSource<Int> {
                    return Observable.just(t).delay(1000, TimeUnit.MILLISECONDS)
                }
            })

    // Emits "a", "b", "c" with 400 ms between items
    private val observable1 = Observable.just("a", "b", "c")
            .concatMap(object : Function<String, ObservableSource<String>> {
                override fun apply(t: String): ObservableSource<String> {
                    return Observable.just(t).delay(400, TimeUnit.MILLISECONDS)
                }
            })

    // Emits 1.0f, 2.0f, 3.0f with 1000 ms between items
    private val observable2 = Observable.fromArray(1.0f, 2.0f, 3.0f)
            .concatMap(object : Function<Float, ObservableSource<Float>> {
                override fun apply(t: Float): ObservableSource<Float> {
                    return Observable.just(t).delay(1000, TimeUnit.MILLISECONDS)
                }
            })

concat()

The concat() operator merges multiple Observables in sequence: it subscribes to the next source only after the previous one has completed. When the merged Observables have different generic types, the resulting event stream can only be typed as Object (Java) or Any (Kotlin). The order in which events are emitted after concat() can be seen in the following example code:

        Observable.concat(observable, observable1)
                .subscribe(object : AppCallBack<Any>() {
                    override fun success(data: Any?) {
                        println("concat----->$data")
                    }

                    override fun fail(code: Long?, msg: String?) {}

                    override fun finish(success: Boolean) {
                        // Continue with the next example, merge()
                        merge()
                    }
                })

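Assuming success() is called once per emitted item, the resulting output is 1, 2, 3 followed by a, b, c: every item of observable is printed before the first item of observable1.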

To concatenate more than four Observables, there are two ways:

  • concat(): pass in an Iterable<? extends ObservableSource<? extends T>> object.
  • concatArray(): pass in an array of Observables.
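
As a minimal sketch of both options, the snippet below concatenates five sources. It assumes RxJava 3 package names, and the function names concatFromList and concatFromArray are made up for illustration. toList().blockingGet() is used only to collect the result synchronously.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Option 1: the Iterable overload of concat().
// Sources are subscribed one after another, in list order.
fun concatFromList(): List<Int> {
    val sources: List<Observable<Int>> = listOf(
        Observable.just(1),
        Observable.just(2, 3),
        Observable.just(4),
        Observable.just(5),
        Observable.just(6)
    )
    return Observable.concat(sources).toList().blockingGet()
}

// Option 2: concatArray() takes the sources as varargs.
// The sources have different types, so the stream is typed as Any.
fun concatFromArray(): List<Any> =
    Observable.concatArray<Any>(
        Observable.just(1),
        Observable.just("a"),
        Observable.just(1.0f),
        Observable.just(2),
        Observable.just("b")
    ).toList().blockingGet()
```

concatFromList() returns the items in source order, 1 through 6, and concatFromArray() returns 1, "a", 1.0f, 2, "b".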

merge()

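The merge() operator merges multiple Observables without preserving source order: it subscribes to all sources at once and forwards each item as soon as any source emits it. When the merged Observables have different generic types, the resulting event stream can only be typed as Object (Java) or Any (Kotlin).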

        Observable.merge(observable, observable1)
                .subscribe(object : AppCallBack<Any>() {
                    override fun success(data: Any?) {
                        println("merge----->$data")
                    }

                    override fun fail(code: Long?, msg: String?) {}

                    override fun finish(success: Boolean) {
                        // Continue with the next example, zip()
                        zip()
                    }
                })

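With the delays used above (one item every 1000 ms for observable, one every 400 ms for observable1), the resulting output should interleave the two sources by arrival time: a, b, 1, c, 2, 3.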
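
The interleaving can be checked without waiting for real time to pass. The sketch below assumes RxJava 3 and its TestScheduler. The helper spaced() and the function mergedOrder() are hypothetical names that mimic the concatMap-plus-delay sample Observables on a virtual clock.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Emits each item periodMs after the previous one, on the given virtual-time scheduler.
fun <T : Any> spaced(items: List<T>, periodMs: Long, scheduler: TestScheduler): Observable<T> =
    Observable.fromIterable(items)
        .concatMap { Observable.just(it).delay(periodMs, TimeUnit.MILLISECONDS, scheduler) }

fun mergedOrder(): List<Any> {
    val scheduler = TestScheduler()
    val numbers = spaced(listOf(1, 2, 3), 1000, scheduler)      // due at 1000, 2000, 3000 ms
    val letters = spaced(listOf("a", "b", "c"), 400, scheduler) // due at 400, 800, 1200 ms

    // test() subscribes immediately and records everything the merged stream emits.
    val observer = Observable.merge<Any>(numbers, letters).test()

    // Advance the virtual clock far enough for both sources to finish.
    scheduler.advanceTimeBy(3, TimeUnit.SECONDS)
    return observer.values()
}
```

Under these assumptions mergedOrder() returns a, b, 1, c, 2, 3, which is the arrival order of the items and not the order of the sources.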


To merge more than four Observables, there are two ways:

  • merge(): pass in an Iterable<? extends ObservableSource<? extends T>> object.
  • mergeArray(): pass in an array of Observables.
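
A minimal sketch of both options with five sources follows. It assumes RxJava 3. fiveSources(), mergeFromList() and mergeFromArray() are hypothetical names, and each source stands in for a request running on an I/O thread.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

// Five stand-in "requests"; each emits two values on an I/O thread.
fun fiveSources(): List<Observable<Int>> =
    (1..5).map { id -> Observable.just(id, id * 10).subscribeOn(Schedulers.io()) }

// Option 1: the Iterable overload of merge().
// The arrival order depends on thread timing and is not guaranteed.
fun mergeFromList(): List<Int> =
    Observable.merge(fiveSources()).toList().blockingGet()

// Option 2: mergeArray() takes the sources as varargs;
// the spread operator (*) unpacks the array.
fun mergeFromArray(): List<Int> =
    Observable.mergeArray(*fiveSources().toTypedArray()).toList().blockingGet()
```

Both functions return the same ten values, but the order can differ from run to run, so callers should not rely on it.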

zip()

Both of the operators above deliver items to the subscriber one at a time, whichever source they come from. If you want to process the items of several sources together, pairing them up once each source has emitted, you need the zip() operator.

        Observable.zip(observable, observable1, object : BiFunction<Int, String, String> {
            override fun apply(t1: Int, t2: String): String {
                return "t1=$t1 t2=$t2"
            }
        }).subscribe(object : AppCallBack<String>() {
            override fun success(data: String?) {
                println("zip----->$data")
            }

            override fun fail(code: Long?, msg: String?) {}

            override fun finish(success: Boolean) {}
        })

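Assuming success() is called once per zipped item, the resulting output is t1=1 t2=a, then t1=2 t2=b, then t1=3 t2=c: each pair is emitted only once both sources have produced their next item.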

Merging multiple Observables: zip() is somewhat more powerful here. It has typed overloads for up to nine Observables, each keeping its own data type. With two sources the zipper is a BiFunction; with three to nine sources it is a FunctionX, where X is the number of sources being merged (Function3 to Function9). For more than nine sources, use zipArray() or pass an Iterable to zip(), as with the two operators above. In that case the zipper receives an Object array, and you need to check and cast the type of each element yourself.
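
The two cases can be sketched as follows, assuming RxJava 3. zipThree() and zipMany() are hypothetical names. The second function uses the Iterable overload of zip(), whose zipper receives the same kind of Object array that zipArray() passes to its zipper.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.functions.Function3

// Three typed sources: the zipper is a Function3 and each argument keeps its type.
fun zipThree(): List<String> =
    Observable.zip(
        Observable.just(1, 2, 3),
        Observable.just("a", "b", "c"),
        Observable.just(1.0f, 2.0f, 3.0f),
        Function3<Int, String, Float, String> { n, s, f -> "$n-$s-$f" }
    ).toList().blockingGet()

// Ten sources: the zipper receives an untyped array,
// so every element has to be cast back to its real type.
fun zipMany(): List<Int> {
    val sources = (1..10).map { id -> Observable.just(id, id * 10) }
    return Observable.zip(sources) { row -> row.map { it as Int }.sum() }
        .toList()
        .blockingGet()
}
```

zipThree() returns "1-a-1.0", "2-b-2.0", "3-c-3.0". zipMany() sums the first item of every source and then the second, giving 55 and 550.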

Conclusion

Besides the Observable-level merge operators above, there are operators that combine streams from inside an event stream, such as concatMap(), used in the first code sample, and startWith(), which emits other events ahead of the stream's own events. Together they cover most merging needs.
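
As a short sketch of these two in-stream operators, assuming RxJava 3 (withPrefix() and expandEach() are hypothetical names): the startWith() overload that takes another Observable is used because the single-item variant has different names in RxJava 2 and RxJava 3.

```kotlin
import io.reactivex.rxjava3.core.Observable

// startWith(): the given source is emitted in full before the stream's own items.
fun withPrefix(): List<Int> =
    Observable.just(1, 2, 3)
        .startWith(Observable.just(0))
        .toList()
        .blockingGet()

// concatMap(): each item is mapped to an inner Observable,
// and the inner Observables are concatenated in order.
fun expandEach(): List<Int> =
    Observable.just(1, 2)
        .concatMap { Observable.just(it, it * 10) }
        .toList()
        .blockingGet()
```

withPrefix() returns 0, 1, 2, 3 and expandEach() returns 1, 10, 2, 20.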