RxJava basics

RxJava creation operators

Note: add the following dependencies before using RxJava

    dependencies {
        implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
        implementation 'io.reactivex.rxjava2:rxjava:2.2.0'
        implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'
        // Note: RxJava2 and RxJava1 cannot coexist, i.e. the two dependencies cannot be declared at the same time
    }

Operator      Role
create()      Creates an Observable from scratch using a function
just()        Converts one or more objects into an Observable that emits them
from()        Converts an Iterable, a Future, or an array into an Observable
defer()       Creates the Observable only when an observer subscribes, and creates a new one for each subscription
range()       Creates an Observable that emits a sequence of integers in the specified range
interval()    Creates an Observable that emits an integer sequence at the given interval
timer()       Creates an Observable that emits a single item after a given delay
empty()       Creates an Observable that emits no items and completes immediately
error()       Creates an Observable that emits no items and terminates immediately with an error
never()       Creates an Observable that emits no items and never terminates

1. Create

Create an Observable from scratch using a function

Official document: reactivex.io/documentati…

RxJava recommends that the function passed to create() check the emitter's isDisposed state before emitting, so that the Observable stops emitting data and skips expensive operations once no observer is listening

  • Practical example

    fun testCreate() {
        Observable.create<Int> {
            // Check the emitter's isDisposed state before emitting
            if (!it.isDisposed) {
                it.onNext(1)
                it.onNext(2)
                it.onNext(3)
                it.onComplete()
            }
        }.subscribeBy(
            onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
            onComplete = { Log.e("TAG", "onComplete") }
        )
    }

The execution result

    onNext: 1
    onNext: 2
    onNext: 3
    onComplete
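
The isDisposed check matters most when the producer loops or does expensive work. The following is a minimal sketch rather than part of the original demo: countingSource() is a hypothetical helper, and println replaces Log.e so the code can run outside Android.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: emits 1..limit, checking isDisposed before every item.
fun countingSource(limit: Int): Observable<Int> =
    Observable.create<Int> { emitter ->
        var n = 1
        // Stop producing as soon as the observer has been disposed.
        while (n <= limit && !emitter.isDisposed) {
            emitter.onNext(n)
            n++
        }
        if (!emitter.isDisposed) emitter.onComplete()
    }

fun main() {
    // take(3) disposes the upstream after three items, so the loop exits early
    // instead of counting all the way to one million.
    val items = countingSource(1_000_000).take(3).toList().blockingGet()
    println(items) // [1, 2, 3]
}
```

Without the check inside the loop, the producer would keep running to the limit even though nothing downstream is listening.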

2. Just

Creates an Observable that emits the specified value

Official document: reactivex.io/documentati…

just() takes one to ten arguments and returns an Observable that emits them in argument order

  • Practical example

    fun testJust() {
        Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .subscribeBy(
                onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
                onComplete = { Log.e("TAG", "onComplete") }
            )
    }

The execution result

    onNext: 1
    onNext: 2
    onNext: 3
    onNext: 4
    onNext: 5
    onNext: 6
    onNext: 7
    onNext: 8
    onNext: 9
    onNext: 10
    onComplete

Passing null to just() throws a NullPointerException

3. From

Transform other kinds of objects and data types into Observables

  • Practical example

    fun testFrom() {
        val list = arrayListOf<Int>(1, 2, 3, 4, 5)
        list.toObservable()
            .subscribeBy(
                onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
                onComplete = { Log.e("TAG", "onComplete") }
            )
    }

The execution result

    onNext: 1
    onNext: 2
    onNext: 3
    onNext: 4
    onNext: 5
    onComplete

For a Future, fromFuture() emits the single value returned by the future.get() method

    class MyCallable : Callable<String> {
        override fun call(): String {
            Log.e("TAG", "Simulate some time-consuming operations...")
            Thread.sleep(5000)
            return "OK"
        }
    }

    fun testFromFuture() {
        val executorService = Executors.newSingleThreadExecutor()
        val future = executorService.submit(MyCallable())
        Observable.fromFuture(future)
            .subscribeBy(
                onNext = { Log.e("TAG", "onNext: $it") }
            )
    }

The execution result

    Simulate some time-consuming operations...
    onNext: OK

fromFuture() has an overload that accepts two additional parameters: the timeout length and its time unit. If the Future does not return within the specified time, the Observable emits an error notification and terminates

    fun testFromFuture() {
        val executorService = Executors.newSingleThreadExecutor()
        val future = executorService.submit(MyCallable())
        // Wait at most 3 seconds for the Future to complete
        Observable.fromFuture(future, 3, TimeUnit.SECONDS)
            .subscribeBy(
                onNext = { Log.e("TAG", "onNext: $it") }
            )
    }

The execution result

    Simulate some time-consuming operations...
    AndroidRuntime: FATAL EXCEPTION: main
    io.reactivex.exceptions.OnErrorNotImplementedException
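
The crash above happens because the subscriber supplies no onError handler, so the timeout error is reported as an OnErrorNotImplementedException. The following is a minimal sketch of handling it instead. It assumes a hypothetical helper awaitWithTimeout() that is not in the original demo, uses the plain RxJava subscribe(onNext, onError) overload, and shortens the delays to milliseconds.

```kotlin
import io.reactivex.Observable
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical helper: runs a task that takes taskMillis and waits at most
// timeoutMillis for it, returning a description of the event that arrived.
fun awaitWithTimeout(taskMillis: Long, timeoutMillis: Long): String {
    val executor = Executors.newSingleThreadExecutor()
    try {
        val future = executor.submit(Callable { Thread.sleep(taskMillis); "OK" })
        var result = "no event"
        // fromFuture() blocks the subscribing thread, so result is set on return.
        Observable.fromFuture(future, timeoutMillis, TimeUnit.MILLISECONDS)
            .subscribe(
                { value -> result = "onNext: $value" },
                { error -> result = "onError: ${error.javaClass.simpleName}" }
            )
        return result
    } finally {
        executor.shutdownNow() // interrupt the task if it is still sleeping
    }
}

fun main() {
    println(awaitWithTimeout(taskMillis = 50, timeoutMillis = 2000))  // onNext: OK
    println(awaitWithTimeout(taskMillis = 2000, timeoutMillis = 100)) // onError: TimeoutException
}
```

With subscribeBy, passing an onError lambda has the same effect.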

4. Repeat

Creates an Observable that emits a particular item or sequence multiple times

repeat does not create an Observable from scratch. It re-emits the data sequence of the original Observable, either indefinitely with repeat() or a fixed number of times with repeat(n)

    fun testRepeat() {
        Observable.just("hello")
            // .repeat()  // repeat indefinitely
            .repeat(3)    // repeat 3 times
            .subscribeBy(
                onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
                onComplete = { Log.e("TAG", "onComplete") },
                onError = { it.printStackTrace() }
            )
    }

The execution result

    onNext: hello
    onNext: hello
    onNext: hello
    onComplete
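
repeat(n) replays the whole source sequence, not just a single item. The following is a small sketch that assumes a hypothetical helper repeatedRange(), which is not part of the original demo, and collects the items into a list so the order is easy to see.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: emits 1..count and replays that sequence `times` times.
fun repeatedRange(count: Int, times: Long): List<Int> =
    Observable.range(1, count)
        .repeat(times)
        .toList()
        .blockingGet()

fun main() {
    println(repeatedRange(count = 2, times = 3)) // [1, 2, 1, 2, 1, 2]
}
```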

RxJava 2.x also has two repeat-related operators: repeatWhen and repeatUntil

1. repeatWhen

repeatWhen does not cache and replay the data sequence of the original Observable. Instead, it re-subscribes to the original Observable, which then emits again, when the specified condition is met

    fun testRepeatWhen() {
        Observable.range(0, 5)
            // Re-subscribe to the source once the 10-second timer fires
            .repeatWhen { Observable.timer(10, TimeUnit.SECONDS) }
            .subscribeBy(
                onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
                onComplete = { Log.e("TAG", "onComplete") },
                onError = { it.printStackTrace() }
            )
    }

The execution result

    09-06 05:55:14.901 22472-22472/com.mufeng.rxjavademo E/TAG: onNext: 0
    onNext: 1
    onNext: 2
    onNext: 3
    onNext: 4
    09-06 05:55:24.903 22472-22505/com.mufeng.rxjavademo E/TAG: onNext: 0
    09-06 05:55:24.904 22472-22505/com.mufeng.rxjavademo E/TAG: onNext: 1
    onNext: 2
    onNext: 3
    onNext: 4
    09-06 05:55:24.911 22472-22505/com.mufeng.rxjavademo E/TAG: onComplete

2. repeatUntil

repeatUntil is a new operator in RxJava 2.x. It keeps repeating the source until the given condition returns true

    fun testRepeatUntil() {
        val time = System.currentTimeMillis()
        Observable.just("hello")
            .repeatUntil {
                // Stop repeating once more than 5 seconds have elapsed
                System.currentTimeMillis() - time > 5000
            }
            .subscribeBy(
                onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
                onComplete = { Log.e("TAG", "onComplete") },
                onError = { it.printStackTrace() }
            )
    }

The execution result

    09-06 06:02:15.220 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:16.552 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:16.552 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:16.568 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:16.569 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:16.578 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:16.579 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:16.579 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:16.579 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:16.580 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:16.580 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:16.581 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:16.581 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:16.583 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:16.584 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:16.609 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:16.609 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:18.384 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:18.384 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:20.177 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
    09-06 06:02:20.178 22728-22728/com.mufeng.rxjavademo E/TAG: onComplete
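
The time-based condition makes the number of repetitions vary between runs. For a deterministic illustration, the sketch below swaps in a simple counter as the stop condition. helloUntil() is a hypothetical helper, not part of the original demo.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: repeats "hello" until the stop condition has been
// evaluated `times` times. The condition is checked after each completion.
fun helloUntil(times: Int): List<String> {
    var completedRuns = 0
    return Observable.just("hello")
        .repeatUntil { ++completedRuns >= times } // true means stop repeating
        .toList()
        .blockingGet()
}

fun main() {
    println(helloUntil(3)) // [hello, hello, hello]
}
```

Because the condition is evaluated only after the source completes, the source always runs at least once.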

5. Empty/Never/Error

1. empty()

Creates an Observable that sends only the onComplete event

    Observable.empty<Int>()
        .subscribeBy(
            onNext = { Log.e("TAG", "Accept the onNext event ==$it") },
            onError = { Log.e("TAG", "In response to an Error event:${it.localizedMessage}") },
            onComplete = { Log.e("TAG", "Response to Complete Event") }
        )

2. error()

The Observable created by this method sends only an Error event, notifying the observer of the exception directly. The exception can be a custom one

    Observable.error<Int>(Throwable("Unknown exception"))
        .subscribeBy(
            onError = { Log.e("TAG", "In response to an Error event:${it.localizedMessage}") }
        )

3. never()

No events are sent

    Observable.never<Int>()
        .subscribeBy(
            onNext = { Log.e("TAG", "Accept the onNext event ==$it") },
            onError = { Log.e("TAG", "In response to an Error event:${it.localizedMessage}") },
            onComplete = { Log.e("TAG", "Response to Complete Event") }
        )
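
The three operators differ only in which terminal event, if any, reaches the observer. The sketch below compares them side by side using RxJava's TestObserver, obtained through test(). terminalEvent() is a hypothetical helper, not part of the original demo.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: subscribes with a TestObserver and reports which
// terminal event, if any, has been received so far.
fun terminalEvent(source: Observable<Int>): String {
    val observer = source.test()
    return when {
        observer.errorCount() > 0 -> "onError"
        observer.completions() > 0L -> "onComplete"
        else -> "none"
    }
}

fun main() {
    println(terminalEvent(Observable.empty<Int>()))                        // onComplete
    println(terminalEvent(Observable.error<Int>(Throwable("Unknown exception")))) // onError
    println(terminalEvent(Observable.never<Int>()))                        // none
}
```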

6. Defer

The Observable is not created until an observer subscribes, and a new Observable is created for each observer

Every time you subscribe, you get a newly created Observable, which ensures that the data in the Observable is up to date

  • Common operators
  1. RxJava 2.x: defer
  2. RxKotlin: defer

    var i = 100
    val observable = Observable.defer { Observable.just(i) }
    // On subscription, defer() creates the Observable object
    observable.subscribeBy(
        onNext = { Log.e("TAG", "Accept the onNext event ==$it") },
        onError = { Log.e("TAG", "In response to an Error event:${it.localizedMessage}") },
        onComplete = { Log.e("TAG", "Response to Complete Event") }
    )
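
The "up to date" guarantee is easiest to see when the variable changes between creation and subscription. The sketch below compares just() with defer(). eagerVersusDeferred() is a hypothetical helper, not part of the original demo.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: returns the value seen by a just() Observable and by a
// defer() Observable when the variable changes before subscription.
fun eagerVersusDeferred(): Pair<Int, Int> {
    var i = 100
    val eager = Observable.just(i)                         // captures 100 right now
    val deferred = Observable.defer { Observable.just(i) } // reads i on each subscribe
    i = 200
    return Pair(eager.blockingFirst(), deferred.blockingFirst())
}

fun main() {
    println(eagerVersusDeferred()) // (100, 200)
}
```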

7. Timer

Creates an Observable that emits a single value (0L) after a given delay

    Observable.timer(2, TimeUnit.SECONDS)
        .subscribeBy(
            onNext = { Log.e("TAG", "Accept the onNext event ==$it") },
            onError = { Log.e("TAG", "In response to an Error event:${it.localizedMessage}") },
            onComplete = { Log.e("TAG", "Response to Complete Event") }
        )

8. Interval

An Observable that emits integer sequences at a fixed time interval

  • Common operators
  1. RxJava 2.X: interval
  2. RxKotlin: interval

    /**
     * First argument: the initial delay
     * Second argument: the interval between events
     * Third argument: the time unit
     */
    Observable.interval(2, 1, TimeUnit.SECONDS)
        .subscribeBy(
            onNext = { Log.e("TAG", "Accept the onNext event ==$it") },
            onError = { Log.e("TAG", "In response to an Error event:${it.localizedMessage}") },
            onComplete = { Log.e("TAG", "Response to Complete Event") }
        )

9. Range

Emits a consecutive sequence of integers within a specified range

    /**
     * range: emits a sequence of integers without delay
     * First argument: the first value of the sequence
     * Second argument: the number of events
     */
    Observable.range(3, 10)
        .subscribeBy(
            onNext = { Log.e("TAG", "Accept the onNext event ==$it") },
            onError = { Log.e("TAG", "In response to an Error event:${it.localizedMessage}") },
            onComplete = { Log.e("TAG", "Response to Complete Event") }
        )

    /**
     * intervalRange: emits a sequence of integers; the delay can be set
     * First argument: the first value of the sequence
     * Second argument: the number of events
     * Third argument: the initial delay
     * Fourth argument: the interval between events
     * Fifth argument: the time unit
     */
    Observable.intervalRange(3, 10, 2, 1, TimeUnit.SECONDS)
        .subscribeBy(
            onNext = { Log.e("TAG", "Accept the onNext event ==$it") },
            onError = { Log.e("TAG", "In response to an Error event:${it.localizedMessage}") },
            onComplete = { Log.e("TAG", "Response to Complete Event") }
        )
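
Note that the second argument of range() and intervalRange() is a count, not an end value. The sketch below collects the emitted items to make that visible. rangeItems() and intervalRangeItems() are hypothetical helpers, not part of the original demo, and the delays are shortened to milliseconds.

```kotlin
import io.reactivex.Observable
import java.util.concurrent.TimeUnit

// Hypothetical helper: collects everything range(start, count) emits.
fun rangeItems(start: Int, count: Int): List<Int> =
    Observable.range(start, count).toList().blockingGet()

// Hypothetical helper: same idea for intervalRange, which emits Long values
// after a 20 ms initial delay and then every 10 ms.
fun intervalRangeItems(start: Long, count: Long): List<Long> =
    Observable.intervalRange(start, count, 20L, 10L, TimeUnit.MILLISECONDS)
        .toList()
        .blockingGet()

fun main() {
    println(rangeItems(3, 10))           // [3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
    println(intervalRangeItems(3L, 10L)) // [3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
}
```

Both calls emit ten values starting at 3, so the last value is 12 rather than 10.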

The above is the use of Observable creation operators. These operators can be used not only in Observables, but also in Flowable

The Demo address:
RxJavaDemo