The building blocks of RxJava

Overview

RxJava = Observable + Operator + Scheduler + Observer?

Observable
    .fromArray(1, 2, 3, 4)
    .map { it * 5 }
    .filter { it > 10 }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { println(it) }


Observable

Characteristics of observable data sources:

  • Usually do work when you start or stop listening
  • Synchronous or asynchronous
  • Single item, many items, or empty
  • Terminates with an error or succeeds to completion
  • May never terminate!
  • Just an implementation of the Observer pattern

GOTO 2016 • Exploring RxJava 2 for Android • Jake Wharton

What counts as an observable data source:

  • View events (touch, click, ...)
  • Data returned by a network request
  • Data returned by a database query
  • Phone sensor notifications (GPS, power)
  • Alarm and timer notifications…

How to create an Observable

  • Observable.just("Hello")
  • Observable.fromArray("Hello", "Hello2")
  • Observable.fromCallable { "Hello" }
  • Observable.create { it.onNext("Hello") }
  • Observable.interval(200, TimeUnit.MILLISECONDS)
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) {
        emitter.onNext("Hello");
        emitter.onComplete();
    }
});

Observable.fromCallable(new Callable<String>() {
    @Override
    public String call() throws Exception {
        return "Hello";
    }
});

Meanwhile, in the Android usage scenario:

println("in main:${Thread.currentThread()}")
Observable.create<Int> {
        println("in create:${Thread.currentThread()}")
        it.onNext(1)
    }
    //.subscribeOn(Schedulers.newThread())
    .subscribe { println("in next :${Thread.currentThread()} $it") }

// Output
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next :Thread[main,5,main] 1

With subscribeOn added, both the upstream of subscribeOn (the create block) and its downstream (the subscriber) run on the new thread.

println("in main:${Thread.currentThread()}")
Observable.create<Int> {
        println("in create:${Thread.currentThread()}")
        it.onNext(1)
    }
    .subscribeOn(Schedulers.newThread())
    .subscribe { println("in next :${Thread.currentThread()} $it") }

// Output
in main:Thread[main,5,main]
in create:Thread[RxNewThreadScheduler-1,5,main]
in next :Thread[RxNewThreadScheduler-1,5,main] 1

observeOn

Specifies the thread on which the observer runs. First, a run without observeOn:

println("in main:${Thread.currentThread()}")
Observable.create<Int> {
        println("in create:${Thread.currentThread()}")
        it.onNext(1)
    }
    //.observeOn(Schedulers.newThread())
    .subscribe { println("in next :${Thread.currentThread()} $it") }

// Output
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next :Thread[main,5,main] 1

With observeOn added, only the code downstream of observeOn runs on the new thread; the create block stays on the main thread.

println("in main:${Thread.currentThread()}")
Observable.create<Int> {
        println("in create:${Thread.currentThread()}")
        it.onNext(1)
    }
    .observeOn(Schedulers.newThread())
    .subscribe { println("in next :${Thread.currentThread()} $it") }

// Output
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next :Thread[RxNewThreadScheduler-1,5,main] 1





Observer

Omitted.

Application scenarios in the app

The following are the scenarios where our app currently uses RxJava.

Network requests: Retrofit + RxJava

@POST("/content/user/info")
Observable<BaseResponse<UserInfo>> userInfo_Ob(@Body UserInfoReq req);

Here is a piece of code that uses operators to synchronize server data when our app initializes:

// Synchronize server data during app initialization.
fun checkRequiredObservable(): Observable<PersonRequiredInfo> {
    // Personal information required later
    var personRequiredInfo = PersonRequiredInfo()
    val allObservables = arrayListOf(
        ...
        // A series of separate requests
        appTabSetting(), // Bottom tab configuration
        userInfoObservable(), // User information
        queryBindConfig(), // The user's bound account information
        syncBabyInfoObservable(userId) // Synchronize baby information
        ...
    )
    var zipObservable = Observable.zip(allObservables) {
        return@zip personRequiredInfo
    }
    return apolloAppConfig() // Get the Apollo configuration and save it
        .flatMap { zipObservable } // Run the series of requests above in parallel
        .onErrorReturn { personRequiredInfo }
        .doOnNext { registerUserSuperProperties() }
        .compose(RxHelper.io2MainThread())
}

private fun syncBabyInfoObservable(): Observable<Any> {
    return getBabyInfoObservable() // Get the server's baby information first
        .flatMap { updateBabyInfoObservable() } // Upload if required
        .flatMap { getBabyInfoObservable().map { Any() } } // Get the synchronized baby information again
        .onErrorReturn { Any() }
}

Given how our server APIs are currently structured, it is hard to imagine how I would have combined these requests without RxJava.





Debounce and Throttle

debounce

Returns an Observable that mirrors the source ObservableSource, except that it drops items emitted by the source ObservableSource that are followed by newer items before a timeout value expires. The timer resets on each emission.
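
To make the drop rule concrete, here is a minimal, library-free sketch in plain Java that replays a list of timestamped emissions and keeps only the ones debounce would let through. It simulates the semantics described above and is not RxJava's implementation. The class name DebounceSketch, the parallel-array input, and the sample timestamps are all assumptions made for illustration, as is the boundary rule that a newer item arriving exactly when the timeout expires does not cancel the previous one.

```java
import java.util.ArrayList;
import java.util.List;

// Library-free simulation of debounce semantics; NOT RxJava's implementation.
final class DebounceSketch {

    /**
     * Returns the values that survive debouncing.
     * timesMs[i] is the emission time of values[i], in ascending order (hypothetical input shape).
     */
    static List<String> debounce(long[] timesMs, String[] values, long timeoutMs) {
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean isLast = i == values.length - 1;
            // An item is dropped when a newer item arrives before its timeout expires.
            // Assumption: a newer item arriving exactly at the timeout does not cancel it.
            if (isLast || timesMs[i + 1] - timesMs[i] >= timeoutMs) {
                kept.add(values[i]);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        long[] times = {0, 100, 250, 1000};
        String[] typed = {"r", "rx", "rxj", "rxjava"};
        System.out.println(debounce(times, typed, 300)); // prints [rxj, rxjava]
    }
}
```

With a 300 ms timeout, "r" and "rx" are dropped because a newer item follows each of them within 300 ms. The last item is always kept, since nothing newer follows it.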

throttleFirst

Returns an Observable that emits only the first item emitted by the source ObservableSource during sequential time windows of a specified duration.
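
The window rule can be illustrated with a similar library-free sketch in plain Java. It is a simulation under stated assumptions, not RxJava's implementation: the class name ThrottleFirstSketch, the parallel-array input, and the sample timestamps are hypothetical, and each window is modelled as starting at the item that was just let through.

```java
import java.util.ArrayList;
import java.util.List;

// Library-free simulation of throttleFirst semantics; NOT RxJava's implementation.
final class ThrottleFirstSketch {

    /**
     * Returns the values that pass the throttle.
     * timesMs[i] is the emission time of values[i], in ascending order (hypothetical input shape).
     */
    static List<String> throttleFirst(long[] timesMs, String[] values, long windowMs) {
        List<String> kept = new ArrayList<>();
        long windowEnd = Long.MIN_VALUE; // no window is open before the first item
        for (int i = 0; i < values.length; i++) {
            // Assumption: a window starts at each emitted item and lasts windowMs;
            // every other item that falls inside the open window is ignored.
            if (timesMs[i] >= windowEnd) {
                kept.add(values[i]);
                windowEnd = timesMs[i] + windowMs;
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        long[] times = {0, 200, 500, 900, 1000, 1800};
        String[] clicks = {"click@0", "click@200", "click@500", "click@900", "click@1000", "click@1800"};
        System.out.println(throttleFirst(times, clicks, 800)); // prints [click@0, click@900, click@1800]
    }
}
```

Unlike debounce, the first item of a burst is delivered immediately and the rest of the burst is ignored, which is why throttleFirst suits repeated-click protection while debounce suits search input.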

Preventing repeated view clicks

RxView.clicks(container)
    .throttleFirst(800,TimeUnit.MILLISECONDS)
    .subscribe { onclick() }

Search debouncing

RxTextView.textChanges(etSearch)
    .debounce(searchDebounceTime, TimeUnit.MILLISECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { chars -> goSearch(chars) }

Page refresh frequency control

class RefreshDebounce {
    private var rxEmitter: ObservableEmitter<Long>? = null
    private var observable = Observable
        .create<Long> { rxEmitter = it }
        .debounce(1000L, TimeUnit.MILLISECONDS)

    constructor(consumer: (Long) -> Unit?) {
        observable.subscribe { consumer.invoke(it) }
    }

    fun callRefresh() = rxEmitter?.onNext(System.currentTimeMillis())
}

// Usage
var refreshDebounce = RefreshDebounce { println("refresh:$it") }
refreshDebounce.callRefresh()

Timed/delayed execution

private void startCountDown15Min() {
    countDown15MinDisposable = Observable.timer(900, TimeUnit.SECONDS)
            .subscribe(aLong -> Log.i(TAG, "The countdown is up."));
}

private void cancelCountDown15Min() {
    if (countDown15MinDisposable != null && !countDown15MinDisposable.isDisposed()) {
        countDown15MinDisposable.dispose();
    }
}

Requesting system permissions

new RxPermissions(activity)
    .request(Manifest.permission.READ_PHONE_STATE)
    .subscribe(aBoolean -> Log.d(TAG, "result:" + aBoolean));