What is RxJava

RxJava (Reactive Extensions for the JVM) is a library for composing asynchronous and event-based programs using observable sequences on the Java VM. (Simply put: it's a library for asynchronous operations.)

Role

Used to implement asynchronous operations, much like AsyncTask and Handler on Android.

The advantages of RxJava

RxJava provides an asynchronous programming API that is observer-based and uses chained calls, so logic written with RxJava stays very concise.

Usage scenarios

Database reads and writes, loading large images, file compression/decompression, and other time-consuming operations that need to run in the background can all be implemented with RxJava.
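As a rough illustration of moving slow work to the background, here is a minimal sketch. It assumes RxJava 3 (the `io.reactivex.rxjava3` packages) is on the classpath; `loadUserName` and `loadUserNameAsync` are hypothetical names invented for this example, and the sleep merely stands in for a real database read.

```kotlin
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers

// Hypothetical stand-in for a slow operation such as a database read.
fun loadUserName(id: Int): String {
    Thread.sleep(100) // simulate blocking I/O
    return "user-$id"
}

// Wraps the blocking call so that it runs on RxJava's I/O thread pool.
fun loadUserNameAsync(id: Int): Single<String> =
    Single.fromCallable { loadUserName(id) }
        .subscribeOn(Schedulers.io())

fun main() {
    // blockingGet() only keeps this demo alive until the result arrives;
    // real code would subscribe with callbacks instead.
    println(loadUserNameAsync(42).blockingGet())
}
```

On Android, the result would typically be observed on the main thread rather than fetched with a blocking call.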

Two common usages

    // Create the observable (the emitted values here are only illustrative)
    val mObservable = Observable.create(ObservableOnSubscribe<Int> { emitter ->
        emitter.onNext(1)
        emitter.onComplete()
    })

    // Create the observer
    val mObserver = object : Observer<Int> {
        override fun onSubscribe(d: Disposable) {}
        override fun onNext(t: Int) {}
        override fun onError(e: Throwable) {}
        override fun onComplete() {}
    }

    // Subscribe
    mObservable.subscribe(mObserver)

The Observable/Observer pair does not support backpressure: when the observable emits a large amount of data quickly, the downstream does nothing special about it. Even if a lot of data piles up, the chain will not throw MissingBackpressureException; excessive memory use simply ends in an OOM. So when using Observable/Observer, consider whether the data volume is very large (the official guideline uses 1000 events as a reference point).

    Flowable.range(0, 100).subscribe(object : Subscriber<Int> {
        override fun onSubscribe(s: Subscription) {
            // Nothing is emitted until the downstream requests items
            s.request(Long.MAX_VALUE)
        }
        override fun onNext(t: Int) {}
        override fun onError(t: Throwable) {}
        override fun onComplete() {}
    })

Flowable supports backpressure: in general, the upstream source responds to data requests from the downstream observer, and the downstream calls request(n) to tell the upstream how many items to send. This keeps large amounts of data from piling up in the call chain and keeps memory use low.
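The request(n) handshake described above can be sketched as follows. This is only an illustration, assuming RxJava 3 and the Reactive Streams API are on the classpath; `BatchingSubscriber` is a hypothetical class written for this example, not part of RxJava.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Hypothetical subscriber that pulls items from upstream in batches of `batchSize`.
class BatchingSubscriber(private val batchSize: Long) : Subscriber<Int> {
    val received = mutableListOf<Int>()
    private lateinit var subscription: Subscription
    private var pendingInBatch = 0L

    override fun onSubscribe(s: Subscription) {
        subscription = s
        requestMore() // first batch
    }

    override fun onNext(t: Int) {
        received.add(t)
        // Ask for the next batch only after the current one has been consumed.
        if (--pendingInBatch == 0L) requestMore()
    }

    override fun onError(t: Throwable) = println("error: $t")
    override fun onComplete() = println("completed")

    private fun requestMore() {
        pendingInBatch = batchSize
        subscription.request(batchSize)
    }
}

fun main() {
    val subscriber = BatchingSubscriber(batchSize = 10)
    Flowable.range(0, 100).subscribe(subscriber)
    println("received ${subscriber.received.size} items")
}
```

Because the upstream never sends more than the downstream has requested, at most one batch is in flight at any time.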

Flowable can also be created with create(), but a backpressure strategy must be specified (backpressure strategies will be covered in a later post).
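A minimal sketch of Flowable.create() with an explicit strategy, assuming RxJava 3. BackpressureStrategy.BUFFER is chosen here purely for illustration, and `numbers` is a hypothetical helper name.

```kotlin
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable

// Emits 0 until `count`. With BUFFER, items the downstream has not
// requested yet are queued instead of being dropped or causing an error.
fun numbers(count: Int): Flowable<Int> =
    Flowable.create<Int>({ emitter ->
        for (i in 0 until count) {
            if (emitter.isCancelled) return@create // stop once the downstream cancels
            emitter.onNext(i)
        }
        emitter.onComplete()
    }, BackpressureStrategy.BUFFER)

fun main() {
    println(numbers(5).toList().blockingGet()) // [0, 1, 2, 3, 4]
}
```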

Components

  1. The observable (the observed)
    • Observable: the observed source; it decides when events are triggered and what kind of events they are
    • Flowable: can be viewed as an Observable that also supports backpressure
    • Single: has only onSuccess and onError events. It emits either one item through onSuccess or an error notification; anything emitted after that is ignored
    • Completable: has only onComplete and onError events. It emits no data and has no map or flatMap operators. Often used together with the andThen operator
    • Maybe: has no onNext method; data is emitted through onSuccess, and only 0 or 1 item can be emitted; anything further is ignored
  2. The observer (Observer)
    • onSubscribe: called when the observer subscribes
    • onNext: when a data event is sent, the observer's onNext() method is called back
    • onError: when an error event is sent, the observer's onError() method is called back; no other events are sent after it
    • onComplete: when the completion event is sent, the observer's onComplete() method is called back; no other events are sent after it
  3. Subscription (subscribe)
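To make the three simplified types more concrete, here is a small sketch assuming RxJava 3. The functions `greeting`, `save`, and `findNickname` are hypothetical examples, not library APIs.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Single

// Single: exactly one item, or an error.
fun greeting(): Single<String> = Single.just("hello")

// Completable: no item at all, only completion or an error.
fun save(log: MutableList<String>): Completable =
    Completable.fromAction { log.add("saved") }

// Maybe: zero or one item.
fun findNickname(name: String): Maybe<String> =
    if (name == "alice") Maybe.just("ali") else Maybe.empty()

fun main() {
    greeting().subscribe(
        { value -> println("onSuccess: $value") },
        { error -> println("onError: $error") }
    )

    // andThen() runs the next source only after the Completable has completed.
    val log = mutableListOf<String>()
    save(log).andThen(greeting())
        .subscribe { value -> println("after $log: $value") }

    findNickname("bob").subscribe(
        { value -> println("onSuccess: $value") },
        { error -> println("onError: $error") },
        { println("onComplete without an item") }
    )
}
```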

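The five observable types can be converted into one another through methods such as toObservable, toFlowable, toSingle, toCompletable, and toMaybe. Not every type offers every method; for example, an Observable becomes a Completable through ignoreElements().

Single, Completable, and Maybe are simplified versions of Observable.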
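A few of these conversions, sketched under the assumption of RxJava 3 (method names differ slightly between major versions, so treat this as an illustration rather than a complete list):

```kotlin
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single

val source: Observable<Int> = Observable.just(1, 2, 3)

// Observable -> Flowable needs a backpressure strategy.
val asFlowable: Flowable<Int> = source.toFlowable(BackpressureStrategy.BUFFER)

// Observable -> Single: take the first item, or fail if there is none.
val first: Single<Int> = source.firstOrError()

// Single -> Maybe.
val asMaybe: Maybe<Int> = first.toMaybe()

// Observable -> Completable: drop the items, keep only the terminal event.
val done: Completable = source.ignoreElements()

// Flowable -> Observable.
val back: Observable<Int> = asFlowable.toObservable()

fun main() {
    println(back.toList().blockingGet()) // [1, 2, 3]
    println(asMaybe.blockingGet())       // 1
    done.blockingAwait()                 // returns once the source has completed
}
```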

Operators

  1. Creation operators

    Method name: Function
    create(): Creates an observable
    just(): Creates an observable that emits the given items (at most 10)
    fromArray(): Similar to just(), but accepts more than 10 items and can also take an array
    fromCallable(): Takes a Callable from java.util.concurrent, which works like a Runnable except that it returns a result; that result is emitted to the observer
    fromFuture(): Takes a Future from java.util.concurrent, which adds methods such as cancel() and exposes the Callable's result through get(); that result is emitted to the observer
    fromIterable(): Emits the items of an Iterable, such as a List, to the observer one by one
    defer(): Does not create the observable until an observer subscribes
    timer(): Emits a single value, 0L, to the observer after the specified delay
    interval(): Emits a number at a fixed interval, starting at 0 and increasing by 1 each time
    intervalRange(): Like interval(), but lets you specify the start value and the number of events
    range(): Emits a sequence of consecutive integers, given a start value and a count
    rangeLong(): Same as range(), but the values are of type Long
    empty(): Emits only the onComplete() event
    never(): Emits no events at all
    error(): Emits only the onError() event
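A short sketch of several creation operators, assuming RxJava 3. `itemsOf` and `deferredSeed` are hypothetical helpers written for this example; `itemsOf` simply blocks until the source completes and returns everything it emitted.

```kotlin
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit

// Demo helper: subscribes, waits for completion, and returns all emitted items.
fun <T : Any> itemsOf(source: Observable<T>): List<T> = source.toList().blockingGet()

// defer() builds the observable at subscription time, so it sees the latest seed.
fun deferredSeed(): Int {
    var seed = 1
    val deferred = Observable.defer { Observable.just(seed) }
    seed = 2
    return deferred.blockingFirst()
}

fun main() {
    println(itemsOf(Observable.just(1, 2, 3)))               // [1, 2, 3]
    println(itemsOf(Observable.fromArray("a", "b")))         // [a, b]
    println(itemsOf(Observable.fromIterable(listOf(4, 5))))  // [4, 5]
    println(itemsOf(Observable.range(10, 3)))                // [10, 11, 12]
    println(itemsOf(Observable.fromCallable { "computed" })) // [computed]
    println(itemsOf(Observable.empty<Int>()))                // []
    println(deferredSeed())                                  // 2

    // intervalRange(start, count, initialDelay, period, unit)
    println(itemsOf(Observable.intervalRange(5, 3, 0, 10, TimeUnit.MILLISECONDS))) // [5, 6, 7]
}
```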
  2. Conversion operators

    Method name: Function
    map(): Converts each item emitted by the observable into another value or type
    flatMap(): Maps each item to a new observable and merges the items emitted by those observables into a single sequence
    concatMap(): Essentially the same as flatMap(), except that concatMap() forwards events in order, while flatMap() does not guarantee order
    buffer(): Collects a given number of events into a list and emits the list as one item
    groupBy(): Groups the emitted items by key; each group is emitted as its own observable
    scan(): Aggregates the items with a given function, emitting every intermediate result
    window(): Groups events once a specified number have been emitted. The count argument is that number: with count set to 2, every two items form one group, which is emitted as an observable
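The conversion operators above can be tried out with a sketch like the following, assuming RxJava 3; `itemsOf` is a hypothetical demo helper. No expected output is given for flatMap(), since its ordering is not guaranteed in general.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Demo helper: subscribes, waits for completion, and returns all emitted items.
fun <T : Any> itemsOf(source: Observable<T>): List<T> = source.toList().blockingGet()

val numbers: Observable<Int> = Observable.just(1, 2, 3, 4)

fun main() {
    // map(): one input item becomes one output item.
    println(itemsOf(numbers.map { it * 10 }))                          // [10, 20, 30, 40]

    // concatMap(): each item becomes an inner observable; source order is preserved.
    println(itemsOf(numbers.concatMap { n -> Observable.just(n, -n) })) // [1, -1, 2, -2, 3, -3, 4, -4]

    // flatMap(): same idea, but inner observables are merged without an ordering guarantee.
    println(itemsOf(numbers.flatMap { n -> Observable.just(n, -n) }))

    // buffer(3): items are grouped into lists of up to 3.
    println(itemsOf(numbers.buffer(3)))                                // [[1, 2, 3], [4]]

    // scan(): running total, emitting every intermediate result.
    println(itemsOf(numbers.scan { acc, n -> acc + n }))               // [1, 3, 6, 10]
}
```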
  3. Combination operators

    Method name: Function
    concat(): Joins multiple observables and emits their events in order, one observable after another. Note that the overloads taking individual sources accept at most four observables
    concatArray(): Same as concat(), but accepts more than four observables
    merge(): Works much like concat(), except that concat() emits the sources one after another, while merge() subscribes to all of them at once and interleaves their events as they arrive
    zip(): Merges multiple observables by pairing their events one by one in emission order. The number of events emitted equals the smallest event count among the source observables
    reduce(): Like scan(), aggregates the emitted data with a given function. The difference is that scan() emits a result each time an item is processed, while reduce() aggregates everything and emits only the final result
    collect(): Collects the data into a data structure
    concatArrayDelayError() & mergeArrayDelayError(): With concatArray() and mergeArray(), an onError() event from one source stops the whole sequence. To postpone the onError() event until every source has finished emitting, use concatArrayDelayError() and mergeArrayDelayError()
    combineLatest() & combineLatestDelayError(): Similar to zip(), but what combineLatest() emits depends on timing. Once every source observable has emitted at least one event, each new event from any source is combined with the most recent event of each of the others
    count(): Emits the number of events emitted by the observable
    startWith() & startWithArray(): Prepend events to the sequence: startWith() prepends one event (in RxJava 3 the single-item form is named startWithItem()) and startWithArray() prepends several. The prepended events are emitted first
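A brief sketch of some combination operators, assuming RxJava 3 (hence startWithItem() instead of startWith()); `itemsOf` is a hypothetical demo helper. The BiFunction is written out explicitly to keep Kotlin's overload resolution unambiguous.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.functions.BiFunction

// Demo helper: subscribes, waits for completion, and returns all emitted items.
fun <T : Any> itemsOf(source: Observable<T>): List<T> = source.toList().blockingGet()

val digits: Observable<Int> = Observable.just(1, 2)
val letters: Observable<String> = Observable.just("a", "b", "c")

fun main() {
    // concat(): the second source starts only after the first has completed.
    println(itemsOf(Observable.concat(Observable.just(1, 2), Observable.just(3, 4)))) // [1, 2, 3, 4]

    // zip(): pairs items by position; stops with the shorter source.
    val zipped = Observable.zip(digits, letters, BiFunction<Int, String, String> { n, s -> "$n$s" })
    println(itemsOf(zipped))                                        // [1a, 2b]

    // reduce(): only the final aggregate is emitted (as a Maybe).
    println(Observable.just(1, 2, 3, 4).reduce { a, b -> a + b }.blockingGet()) // 10

    // count(): number of items (as a Single<Long>).
    println(letters.count().blockingGet())                          // 3

    // startWithItem(): the prepended item is emitted first.
    println(itemsOf(Observable.just(2, 3).startWithItem(1)))        // [1, 2, 3]
}
```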
  4. Functional operators

    Method name: Function
    delay(): Delays the emission of events by a given time
    doOnEach(): Called back once for every event the Observable sends, before the event reaches the observer
    doOnNext(): Called back before each onNext() is delivered
    doAfterNext(): Called back after each onNext() has been delivered
    doOnComplete(): Called back before onComplete() is delivered
    doOnError(): Called back before onError() is delivered
    doOnSubscribe(): Called back when the observer subscribes, before onSubscribe() is delivered
    doOnDispose(): Called back after dispose() is called on the Disposable
    doOnLifecycle(): Takes two callbacks; the first is called before onSubscribe() and can be used to decide whether to cancel the subscription, and the second is called on dispose()
    doOnTerminate() & doAfterTerminate(): doOnTerminate() is called back before onError() or onComplete() is delivered, and doAfterTerminate() is called back after
    doFinally(): Called back once the sequence ends, whether through onComplete(), onError(), or disposal
    onErrorReturn(): Called back when an onError() event is received; the value it returns is delivered through onNext() and the sequence then completes normally
    onErrorResumeNext(): When an onError() event is received, a new Observable is returned to take over, and the sequence then terminates normally
    onExceptionResumeNext(): Similar to onErrorResumeNext(), but it only handles Exception, not other Throwable types such as Error
    retry(): If an error event occurs, resubscribes so that the whole sequence is sent again. The times argument is the maximum number of retries
    retryUntil(): After an error event, the supplied condition decides whether to keep retrying (returning true stops the retries)
    retryWhen(): Called back when the observable hits an exception or error event, and returns a new observable. If that observable emits an error event, the original observable stops; if it emits a normal event, the original observable retries
    repeat(): Resubscribes after completion so that the events are sent again; times is the number of repetitions
    repeatWhen(): Returns a new observable whose events decide whether the sequence is sent again
    subscribeOn(): Specifies the thread on which the observable does its work. If it is called several times, only the first call takes effect
    observeOn(): Specifies the thread on which the observer receives events; every call takes effect for the operators that follow it
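The sketch below exercises a few of the functional operators, assuming RxJava 3. `lifecycleLog`, `flaky`, and `threadNames` are hypothetical helpers made up for this example; the failing source is simulated with a counter.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.atomic.AtomicInteger

// Records the order in which the side-effect callbacks fire.
fun lifecycleLog(): List<String> {
    val log = mutableListOf<String>()
    Observable.just(1, 2)
        .doOnSubscribe { log.add("subscribe") }
        .doOnNext { log.add("next $it") }
        .doOnComplete { log.add("complete") }
        .doFinally { log.add("finally") }
        .blockingSubscribe()
    return log
}

// Simulated unreliable source: fails on the first two subscriptions, then emits 42.
fun flaky(attempts: AtomicInteger): Observable<Int> = Observable.fromCallable {
    if (attempts.incrementAndGet() < 3) throw IllegalStateException("attempt failed")
    42
}

// Returns (thread that produced the item, thread that handled it after observeOn).
fun threadNames(): Pair<String, String> =
    Observable.fromCallable { Thread.currentThread().name }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .map { producer -> producer to Thread.currentThread().name }
        .blockingFirst()

fun main() {
    println(lifecycleLog())

    // onErrorReturn(): the error is replaced by a fallback item.
    val recovered = Observable.error<Int>(RuntimeException("boom")).onErrorReturn { -1 }
    println(recovered.toList().blockingGet()) // [-1]

    // retry(5): resubscribes after each failure, up to 5 times.
    val attempts = AtomicInteger()
    println("${flaky(attempts).retry(5).blockingFirst()} after ${attempts.get()} attempts")

    println(threadNames())
}
```

The two thread names printed at the end should differ from each other and from the calling thread, since subscribeOn() and observeOn() use different schedulers here.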
  5. Filter operators

    Method name: Function
    filter(): Filters the events emitted by the observable with a predicate. An event is passed on if the predicate returns true and dropped otherwise
    ofType(): Passes on only events of the given type and filters out the rest
    skip(): Skips the first events; count is the number of events to skip
    distinct(): Filters out duplicate events in the sequence
    distinctUntilChanged(): Filters out consecutive duplicate events
    take(): Limits the number of events the observer receives
    debounce(): If two events arrive closer together than the configured interval, the earlier one is not sent to the observer
    firstElement() & lastElement(): firstElement() takes the first element of the event sequence, and lastElement() takes the last
    elementAt() & elementAtOrError(): elementAt() takes the event at a given index, but emits nothing if the index is beyond the end of the sequence. To get an error in that case, use elementAtOrError()
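A compact sketch of the filter operators, assuming RxJava 3; `itemsOf` is a hypothetical demo helper.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Demo helper: subscribes, waits for completion, and returns all emitted items.
fun <T : Any> itemsOf(source: Observable<T>): List<T> = source.toList().blockingGet()

val repeated: Observable<Int> = Observable.just(1, 1, 2, 1)
val mixed: Observable<Any> = Observable.just<Any>(1, "a", 2.0)

fun main() {
    println(itemsOf(Observable.range(1, 6).filter { it % 2 == 0 })) // [2, 4, 6]
    println(itemsOf(mixed.ofType(String::class.java)))              // [a]
    println(itemsOf(Observable.range(1, 6).skip(2).take(3)))        // [3, 4, 5]

    // distinct() drops every repeat; distinctUntilChanged() only drops consecutive ones.
    println(itemsOf(repeated.distinct()))                           // [1, 2]
    println(itemsOf(repeated.distinctUntilChanged()))               // [1, 2, 1]

    println(repeated.firstElement().blockingGet())                  // 1
    println(repeated.lastElement().blockingGet())                   // 1

    // elementAt() past the end completes empty; elementAtOrError() fails instead.
    println(repeated.elementAt(10).blockingGet())                   // null
    println(runCatching { repeated.elementAtOrError(10).blockingGet() }.isFailure) // true
}
```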
  6. Conditional operators

    Method name: Function
    all(): Checks whether every event in the sequence satisfies a condition; emits true if so and false otherwise
    takeWhile(): Emits items as long as they satisfy the condition and stops at the first item that does not
    skipWhile(): Skips items as long as they satisfy the condition and emits everything from the first item that does not
    takeUntil(): Once an item satisfies the condition, no further items are emitted
    skipUntil(): The original Observable's events reach the observer only after the Observable passed to skipUntil() has emitted an event
    sequenceEqual(): Checks whether two Observables emit the same sequence of events
    contains(): Checks whether the event sequence contains a given element; emits true if it does and false if it does not
    isEmpty(): Checks whether the event sequence is empty
    amb(): Takes a collection of Observables but emits only the events of whichever Observable emits first; the others are discarded
    defaultIfEmpty(): Emits a default value if the observable emits nothing but onComplete()
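Finally, a sketch of the conditional operators, assuming RxJava 3; `itemsOf` is a hypothetical demo helper. The Predicate for takeUntil() is spelled out because that method also has an overload taking another observable, which a bare Kotlin lambda could match ambiguously.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.functions.Predicate

// Demo helper: subscribes, waits for completion, and returns all emitted items.
fun <T : Any> itemsOf(source: Observable<T>): List<T> = source.toList().blockingGet()

val values: Observable<Int> = Observable.just(1, 2, 3, 4, 1)

fun main() {
    println(values.all { it > 0 }.blockingGet())                   // true
    println(itemsOf(values.takeWhile { it < 3 }))                  // [1, 2]
    println(itemsOf(values.skipWhile { it < 3 }))                  // [3, 4, 1]

    // The item that satisfies the condition is still emitted; later ones are not.
    println(itemsOf(values.takeUntil(Predicate<Int> { it == 3 }))) // [1, 2, 3]

    println(values.contains(4).blockingGet())                      // true
    println(values.isEmpty().blockingGet())                        // false
    println(Observable.sequenceEqual(values, Observable.just(1, 2, 3, 4, 1)).blockingGet()) // true
    println(itemsOf(Observable.empty<Int>().defaultIfEmpty(0)))    // [0]
}
```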