Besides creating, filtering, and transforming data, RxJava offers another very common feature: combining multiple data sources. The combination operators join the sources in different ways and produce a result that can then be processed further.

CombineLatest operator.

When any one of the Observables emits an item, a function combines the most recent item from each Observable and emits the function's result. In other words, whenever one of several data sources emits, its new value is combined with the latest values from the other sources. Nothing is emitted until every source has emitted at least once.

  • On Android we often validate forms. On a login screen, for example, we watch the account and password input boxes for changes and, in the callback, check that neither text is empty and that both are valid before enabling the login or submit button. In RxJava, if each input field is treated as a data source and each input change as an emitted event, the CombineLatest operator can perform the form validation.
import com.jakewharton.rxbinding2.widget.RxTextView;
import io.reactivex.Observable;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;

Observable.combineLatest(
        RxTextView.textChanges(vTopicTitleEdit),
        RxTextView.textChanges(vTopicDescriptionEdit),
        new BiFunction<CharSequence, CharSequence, Boolean>() {
            @Override
            public Boolean apply(CharSequence titleInputText, CharSequence descriptionInputText) throws Exception {
                // Re-validate both fields whenever either one changes
                return InputVerifyUtil.verifyTitle(titleInputText.toString().trim())
                        && InputVerifyUtil.verifyDesc(descriptionInputText.toString().trim());
            }
        })
        .as(RxLifecycleUtil.bindLifecycle(this))
        .subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean isReady) throws Exception {
                // Enable the button only when both inputs are valid
                commitBtn.setEnabled(isReady);
            }
        });
  • RxBinding2 listens for input changes in the two input boxes and turns them into emitted events. Combined with the combineLatest operator, every change triggers one validation pass: if it returns true the button is enabled, and if it returns false the button is disabled.
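
To make the emission rule concrete, here is a minimal plain-Java model of what combineLatest does with two sources. It is only a sketch of the semantics, not RxJava itself: the class CombineLatestSketch, its method names, and the non-blank validation rule are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model of the combineLatest rule; not part of RxJava.
final class CombineLatestSketch<A, B, R> {
    private final BiFunction<A, B, R> combiner;
    private final List<R> emitted = new ArrayList<>();
    private A latestA;
    private B latestB;
    private boolean hasA;
    private boolean hasB;

    CombineLatestSketch(BiFunction<A, B, R> combiner) {
        this.combiner = combiner;
    }

    // Called when the first source emits.
    void onFirst(A value) {
        latestA = value;
        hasA = true;
        emitIfReady();
    }

    // Called when the second source emits.
    void onSecond(B value) {
        latestB = value;
        hasB = true;
        emitIfReady();
    }

    // Nothing is emitted until every source has produced at least one value.
    private void emitIfReady() {
        if (hasA && hasB) {
            emitted.add(combiner.apply(latestA, latestB));
        }
    }

    List<R> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        // Assumed rule for illustration: both fields must be non-blank.
        CombineLatestSketch<String, String, Boolean> form = new CombineLatestSketch<>(
                (title, desc) -> !title.trim().isEmpty() && !desc.trim().isEmpty());
        form.onFirst("");       // second source has not emitted yet: no output
        form.onSecond("");      // false
        form.onFirst("Hello");  // false
        form.onSecond("World"); // true
        form.onFirst(" ");      // false
        System.out.println(form.emitted()); // [false, false, true, false]
    }
}
```

Each change after the first complete pair produces exactly one result, which is why the button state always reflects the latest text in both fields.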

Merge operator.

Merges the emissions of multiple Observables into a single stream. Items are forwarded in the order they arrive, so the output can interleave items from the different sources.

  • For example, suppose there are two groups of data and the product requires their items to be combined into one list for display in a RecyclerView. Without RxJava, a for loop is needed: items are taken from the two collections by index and added to a third output collection, and index-out-of-bounds errors must be guarded against. With RxJava, the Merge operator combines the two sources in one call. Note that Merge orders items by arrival time: two synchronous sources such as fromIterable are drained one after the other, so strict alternation only appears when the sources emit concurrently.
Items itemsOne = new Items();
// ... add data
Items itemsTwo = new Items();
// ... add data
Observable.merge(Observable.fromIterable(itemsOne), Observable.fromIterable(itemsTwo))
        .toList()
        .toObservable()
        .as(RxLifecycleUtil.bindLifecycle(this))
        .subscribe(new Consumer<List<Object>>() {
            @Override
            public void accept(List<Object> items) throws Exception {
                // items is the combined list built from both sources
            }
        });
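
For comparison, the loop-based approach described above might look like the following in plain Java. This is a sketch under assumptions: interleave is a hypothetical helper name, and appending the leftover items of the longer list is one possible way to handle unequal sizes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper showing strict alternation without RxJava.
final class InterleaveSketch {
    // Alternates items from both lists; leftovers from the longer list are appended in order.
    static <T> List<T> interleave(List<? extends T> first, List<? extends T> second) {
        List<T> result = new ArrayList<>(first.size() + second.size());
        int max = Math.max(first.size(), second.size());
        for (int i = 0; i < max; i++) {
            // The bounds checks avoid reading past the end of the shorter list
            if (i < first.size()) {
                result.add(first.get(i));
            }
            if (i < second.size()) {
                result.add(second.get(i));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> merged = interleave(Arrays.asList("A1", "A2", "A3"), Arrays.asList("B1"));
        System.out.println(merged); // [A1, B1, A2, A3]
    }
}
```

The index checks are exactly the bookkeeping that an operator-based pipeline hides from the caller.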

StartWith operator.

Inserts the specified items at the beginning of a data sequence. Just as List.add(index, object) can insert an element at the head of a list, StartWith in RxJava inserts events at the head of the stream.

  • For example, in a comment list the first item is a tag entry holding several comment tags, followed by the list of comments for the selected tag. The two requests are chained: the tag data is requested first, the first tag is extracted and used as the parameter for pulling the comment list, and the tag data is then inserted at the front of the result.
Observable<HttpModel<HttpListModel<TeacherEvaluateTagModel>>> tagsObservable =
        getTeacherEvaluateTags(context, tag, teacherUid);
// The tag interface must be requested before the comment list.
// Note: startWith subscribes to tagsObservable first, and the flatMap chain then subscribes to it again,
// so a cold source performs the tag request twice unless it is cached.
return tagsObservable.flatMap(new Function<HttpModel<HttpListModel<TeacherEvaluateTagModel>>, ObservableSource<HttpModel<?>>>() {
    @Override
    public ObservableSource<HttpModel<?>> apply(HttpModel<HttpListModel<TeacherEvaluateTagModel>> model) throws Exception {
        return Observable.create(new ObservableOnSubscribe<HttpModel<HttpListModel<TeacherEvaluateTagModel>>>() {
            @Override
            public void subscribe(ObservableEmitter<HttpModel<HttpListModel<TeacherEvaluateTagModel>>> emitter) throws Exception {
                // Fail fast if the tag response carries no data
                if (model == null || model.getData() == null || model.getData().getList() == null) {
                    emitter.onError(new NullPointerException("TeacherEvaluateTags return data is null"));
                } else {
                    emitter.onNext(model);
                    emitter.onComplete();
                }
            }
        })
        .flatMap(new Function<HttpModel<HttpListModel<TeacherEvaluateTagModel>>, ObservableSource<HttpModel<?>>>() {
            @Override
            public ObservableSource<HttpModel<?>> apply(HttpModel<HttpListModel<TeacherEvaluateTagModel>> model) throws Exception {
                // Use the id of the first tag to request the comment list
                return getTeacherEvaluateList(context, tag, teacherUid, model.getData().getList().get(0).getId(), page)
                        .map(new Function<HttpModel<?>, HttpModel<?>>() {
                            @Override
                            public HttpModel<?> apply(HttpModel<?> model) throws Exception {
                                return model;
                            }
                        });
            }
        });
    }
}).startWith(tagsObservable).toList().toObservable();
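
The comparison with List.add(index, object) can be made concrete with a small plain-Java model. It is only a sketch of the ordering that StartWith produces, not RxJava code; StartWithSketch and the sample strings are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical list-based model of the startWith ordering.
final class StartWithSketch {
    // Returns a new list holding the prefix items first, then the original sequence.
    static <T> List<T> startWith(List<? extends T> prefix, List<? extends T> sequence) {
        List<T> result = new ArrayList<>(sequence);
        result.addAll(0, prefix); // insert at index 0, like List.add(0, item) for several items
        return result;
    }

    public static void main(String[] args) {
        List<String> comments = Arrays.asList("comment 1", "comment 2");
        List<String> page = startWith(Arrays.asList("tags"), comments);
        System.out.println(page); // [tags, comment 1, comment 2]
    }
}
```

In the request chain above the same ordering applies: the tag data comes first in the collected list and the comment data follows it.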

Zip operator.

Combines the emissions of multiple Observables through a function and emits a single item for each combination, based on the function's result. Items are paired by position. Note that if the sources emit different numbers of items, only as many pairs are emitted as the shortest source provides; the surplus items from longer sources have no match and are discarded.

  • Continuing the previous example, we iterate over the item data and pair each item with its index in a DataIndexModel. The result can then be used for sorting and similar operations, or to look up an item by its index.
public class DataIndexModel<T> {
    private T data;
    private int index;

    public DataIndexModel(T data, int index) {
        this.data = data;
        this.index = index;
    }

    public T getData() {
        return data;
    }

    public int getIndex() {
        return index;
    }
}
// range(start, count): emit one index per item; a count of size() - 1 would make zip drop the last item
Observable.zip(Observable.range(0, mItems.size()), Observable.fromIterable(mItems),
        new BiFunction<Integer, Object, DataIndexModel<Object>>() {
            @Override
            public DataIndexModel<Object> apply(Integer index, Object data) throws Exception {
                // Pair each item with its position
                return new DataIndexModel<>(data, index);
            }
        })
        .filter(new Predicate<DataIndexModel<Object>>() {
            @Override
            public boolean test(DataIndexModel<Object> model) throws Exception {
                // Skip the item at index 15
                return model.getIndex() != 15;
            }
        })
        .map(new Function<DataIndexModel<Object>, Object>() {
            @Override
            public Object apply(DataIndexModel<Object> model) throws Exception {
                // Unwrap the original item again
                return model.getData();
            }
        })
        .subscribe();
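
The pairing rule, including the loss of unmatched items, can be modeled in a few lines of plain Java. This is a sketch of the semantics only, not RxJava; ZipSketch and the sample data are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical list-based model of zip's positional pairing.
final class ZipSketch {
    // Pairs items by position and stops at the end of the shorter list.
    static <A, B, R> List<R> zip(List<A> first, List<B> second,
                                 BiFunction<? super A, ? super B, ? extends R> zipper) {
        int count = Math.min(first.size(), second.size());
        List<R> result = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            result.add(zipper.apply(first.get(i), second.get(i)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> indexes = Arrays.asList(0, 1, 2, 3);
        List<String> items = Arrays.asList("a", "b", "c");
        // Index 3 has no matching item, so it is discarded
        List<String> pairs = zip(indexes, items, (index, item) -> index + ":" + item);
        System.out.println(pairs); // [0:a, 1:b, 2:c]
    }
}
```

This is why the index source has to supply at least as many values as there are items: any shortfall silently removes items from the end of the result.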