Preface

The previous two articles introduced the wrapping pattern Rx uses and the subscribeOn and observeOn source code that controls the thread scheduling flow. This article analyzes the source of the two schedulers most commonly used in Android development: IoScheduler and AndroidSchedulers.mainThread(). As mentioned before, subscribeOn and observeOn only control the flow of thread scheduling, based on the scheduler set by the developer. In other words, for subscribeOn and observeOn, a Scheduler is just a tool. This article therefore focuses on how the schedulers are implemented (PS: this article is based on RxJava 2.1.16 and RxAndroid 2.0.1).

What does Rx’s Observable.create do?

RxJava subscribeOn and observeOn source code analysis

IoScheduler

Based on how subscribeOn is invoked during subscribeActual, the process can be drawn as a flow chart: the data flows from subscribeActual calling the Scheduler to the task being submitted to the thread pool. We will follow this flow to analyze IoScheduler.

Observable.create<String> {
     Log.d(TAG, "Event generation thread: ${Thread.currentThread().name}")
     it.onNext("I'm ")
     it.onComplete()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
      Log.d(TAG, "Event consuming thread: ${Thread.currentThread().name}")
      Log.d(TAG, it)
}

Scheduler

Scheduler.java is the parent class of the thread schedulers in RxJava. The ObservableSubscribeOn.subscribeActual method calls scheduler.scheduleDirect to switch threads. Here the scheduler is IoScheduler, and the scheduleDirect method is implemented in its parent class. It does three things:

  • Calls the createWorker method implemented by the subclass.
  • Wraps the incoming Runnable object in the Scheduler.DisposeTask class (also a Runnable).
  • Calls the schedule method of the Scheduler.Worker object for thread scheduling.

Next, we take this method as the starting point and analyze each step in turn.

// ObservableSubscribeOn.java
@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

// Scheduler.java
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
    DisposeTask task = new DisposeTask(decoratedRun, w);
    
    w.schedule(task, delay, unit);
    
    return task;
}
...
@NonNull
public abstract Worker createWorker();
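
The three steps of scheduleDirect can be sketched with plain java.util.concurrent types. This is only a simplified illustration, not RxJava's code: ScheduleDirectSketch, SketchWorker and SketchDisposeTask are hypothetical names, and a single-threaded executor stands in for the real worker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins for Scheduler.Worker and Scheduler.DisposeTask.
final class ScheduleDirectSketch {

    /** Minimal worker: owns one thread and can be disposed. */
    static final class SketchWorker {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        final CountDownLatch disposed = new CountDownLatch(1);

        void schedule(Runnable task) {
            executor.execute(task);
        }

        void dispose() {
            executor.shutdown();
            disposed.countDown();
        }
    }

    /** Wraps the user task and disposes the worker once the task has run. */
    static final class SketchDisposeTask implements Runnable {
        private final Runnable decoratedRun;
        private final SketchWorker worker;

        SketchDisposeTask(Runnable decoratedRun, SketchWorker worker) {
            this.decoratedRun = decoratedRun;
            this.worker = worker;
        }

        @Override
        public void run() {
            try {
                decoratedRun.run();
            } finally {
                worker.dispose();
            }
        }
    }

    /** Mirrors the three steps: create a worker, wrap the task, schedule it. */
    static SketchWorker scheduleDirect(Runnable run) {
        SketchWorker w = new SketchWorker();                    // 1. createWorker()
        SketchDisposeTask task = new SketchDisposeTask(run, w); // 2. wrap in DisposeTask
        w.schedule(task);                                       // 3. worker.schedule(...)
        return w;
    }

    /** Runs one task and returns the events in the order they were observed. */
    static List<String> demo() {
        List<String> events = Collections.synchronizedList(new ArrayList<String>());
        SketchWorker w = scheduleDirect(() -> events.add("task ran"));
        try {
            if (!w.disposed.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker was not disposed in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        events.add("worker disposed");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the wrapper is that the caller never has to dispose the worker by hand: the cleanup rides along with the task itself.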

Thread scheduling

Two logical kinds of thread pool are involved in IoScheduler: 1. the worker thread pools, which process externally submitted tasks (such as the subscription or observer logic set by the developer); 2. the evictor thread pool, which periodically reclaims worker thread pools that have expired.

IoScheduler’s inner classes are described here to give you a sense of what they are.

  • IoScheduler.ThreadWorker: the class in IoScheduler that actually manages a thread. It holds a Java thread pool, which it uses for thread scheduling.
  • IoScheduler.CachedWorkerPool: plays a factory-like role; it is mainly used to create, cache, reuse and destroy IoScheduler.ThreadWorker objects.
  • IoScheduler.EventLoopWorker: a decorator for IoScheduler.ThreadWorker; it controls how the ThreadWorker it holds is managed by the CachedWorkerPool.

Member variables

IoScheduler member variables and their definitions:

  • XXX_THREAD_NAME_PREFIX and the RxThreadFactory objects: the configuration used to create threads for the thread pools. RxThreadFactory is an implementation of Java's ThreadFactory, used mainly to standardize thread names and priorities. They are initialized in IoScheduler's static code block.
  • KEEP_ALIVE_TIME and KEEP_ALIVE_UNIT: the default keep-alive time of a worker after its task finishes. They are used by the start() method described later.
  • SHUTDOWN_THREAD_WORKER: an already-disposed IoScheduler.ThreadWorker object, handed out by CachedWorkerPool.get() once the pool itself has been disposed.
  • threadFactory: a ThreadFactory that can be customized through the constructor to replace the default WORKER_THREAD_FACTORY described above.
  • pool: the IoScheduler.CachedWorkerPool object, a factory-like object mainly used to create the IoScheduler.ThreadWorker objects above and to manage and reuse threads.

PS: A quick read is enough here to get a general idea. These members are introduced in detail later and tied together in the overall flow.

// IoScheduler.java
public final class IoScheduler extends Scheduler {
    private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler";
    static final RxThreadFactory WORKER_THREAD_FACTORY;

    private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor";
    static final RxThreadFactory EVICTOR_THREAD_FACTORY;

    private static final long KEEP_ALIVE_TIME = 60;
    private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;

    static final ThreadWorker SHUTDOWN_THREAD_WORKER;
    final ThreadFactory threadFactory;
    final AtomicReference<CachedWorkerPool> pool;

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_IO_PRIORITY = "rx2.io-priority";

    static final CachedWorkerPool NONE;

Static code block

Here some of the static variables mentioned above are created. NONE can be understood as IoScheduler's default CachedWorkerPool object; the start() and shutdown() methods described later swap it in and out.

KEY_IO_PRIORITY = "rx2.io-priority" specifies the priority of the threads. You can change the priority by setting the system property rx2.io-priority.

static {
    SHUTDOWN_THREAD_WORKER = new ThreadWorker(new RxThreadFactory("RxCachedThreadSchedulerShutdown"));
    SHUTDOWN_THREAD_WORKER.dispose();

    int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, Integer.getInteger(KEY_IO_PRIORITY, Thread.NORM_PRIORITY)));

    WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);

    EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);

    NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
    NONE.shutdown();
}
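
The priority lookup in the static block can be tried out in isolation. The sketch below reuses the same Integer.getInteger and Math.max/Math.min expression; the class name IoPrioritySketch and its helper methods are made up for illustration.

```java
// Hypothetical helper that isolates the priority expression from the static block.
final class IoPrioritySketch {
    // Same key as KEY_IO_PRIORITY in the listing above.
    static final String KEY_IO_PRIORITY = "rx2.io-priority";

    /** Clamps a requested priority into [Thread.MIN_PRIORITY, Thread.MAX_PRIORITY]. */
    static int clamp(int requested) {
        return Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, requested));
    }

    /** Reads the system property; falls back to NORM_PRIORITY when it is unset or not a number. */
    static int resolvePriority() {
        return clamp(Integer.getInteger(KEY_IO_PRIORITY, Thread.NORM_PRIORITY));
    }

    public static void main(String[] args) {
        System.setProperty(KEY_IO_PRIORITY, "42"); // deliberately out of range
        System.out.println(resolvePriority());     // clamped to Thread.MAX_PRIORITY
    }
}
```

PS: Because IoScheduler reads the property in a static initializer, the value has to be in place before the class is first loaded, for example by passing -Drx2.io-priority=7 on the JVM command line.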

Initialize, start, and close

The call to Schedulers.io() returns the default global static IoScheduler object, which is created with the no-argument constructor.

  • The default threadFactory is the WORKER_THREAD_FACTORY object.
  • pool is initialized with the default CachedWorkerPool object NONE, and then the start() method is called.
  • The start() method actually creates a CachedWorkerPool object with the default keep-alive time KEEP_ALIVE_TIME (60s). PS: CAS is used here to ensure thread safety.
  • The shutdown() method reverses start() by resetting pool to NONE. PS: a CAS spin loop is used here to ensure thread safety.

In fact, I personally do not understand why the logic of start and shutdown is needed here. If you know, you can comment below.

// IoScheduler.java
public IoScheduler() {
    this(WORKER_THREAD_FACTORY);
}

public IoScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    this.pool = new AtomicReference<CachedWorkerPool>(NONE);
    start();
}

@Override
public void start() {
    CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
    if (!pool.compareAndSet(NONE, update)) {
        update.shutdown();
    }
}

@Override
public void shutdown() {
    for (;;) {
        CachedWorkerPool curr = pool.get();
        if (curr == NONE) {
            return;
        }
        if (pool.compareAndSet(curr, NONE)) {
            curr.shutdown();
            return;
        }
    }
}
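
To make the CAS logic easier to follow, here is a stripped-down sketch of the same start()/shutdown() pair. StartShutdownSketch and its Pool class are hypothetical stand-ins; the real CachedWorkerPool does much more than flip a flag.

```java
import java.util.concurrent.atomic.AtomicReference;

final class StartShutdownSketch {

    /** Hypothetical stand-in for CachedWorkerPool. */
    static final class Pool {
        volatile boolean shutDown;

        void shutdown() {
            shutDown = true;
        }
    }

    /** Sentinel meaning "not started", already shut down like NONE in IoScheduler. */
    static final Pool NONE = new Pool();
    static {
        NONE.shutdown();
    }

    final AtomicReference<Pool> pool = new AtomicReference<Pool>(NONE);

    /** Installs a fresh pool only if the current one is NONE; returns true when installed. */
    boolean start() {
        Pool update = new Pool();
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown(); // already started (or lost a race): discard the new pool
            return false;
        }
        return true;
    }

    /** Swaps the current pool back to NONE, retrying if another thread changed it meanwhile. */
    void shutdown() {
        for (;;) {
            Pool curr = pool.get();
            if (curr == NONE) {
                return;
            }
            if (pool.compareAndSet(curr, NONE)) {
                curr.shutdown();
                return;
            }
        }
    }

    public static void main(String[] args) {
        StartShutdownSketch s = new StartShutdownSketch();
        System.out.println(s.start()); // first start installs a pool
        System.out.println(s.start()); // second start is a no-op
        s.shutdown();
        System.out.println(s.pool.get() == NONE);
    }
}
```

In this sketch a second start() throws its freshly built pool away, and shutdown() always leaves the reference at NONE, so the pair can be called repeatedly and from several threads without leaking an active pool.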

createWorker

Back to the main flow: the Scheduler.scheduleDirect method calls createWorker(). IoScheduler.createWorker() creates and returns an EventLoopWorker object whose CachedWorkerPool member variable pool is the IoScheduler's pool object.

// IoScheduler.java
@NonNull
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

EventLoopWorker

The EventLoopWorker holds the CachedWorkerPool passed in from IoScheduler and a ThreadWorker retrieved from that pool.

As mentioned earlier, in EventLoopWorker the CachedWorkerPool acts as the manager of the ThreadWorker, and the ThreadWorker uses its internal thread pool for thread scheduling.

In the EventLoopWorker constructor, the threadWorker is retrieved via CachedWorkerPool.get().

PS: The CachedWorkerPool.get() method contains the reuse logic for ThreadWorker, which is covered later.

// IoScheduler.java - class EventLoopWorker
static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    final AtomicBoolean once = new AtomicBoolean();

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        this.threadWorker = pool.get();
    }
    ...

The Scheduler.scheduleDirect method finally invokes Worker.schedule, which here actually calls threadWorker.scheduleActual.

// IoScheduler.java - class EventLoopWorker
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }

    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

ThreadWorker

ThreadWorker's scheduleActual is inherited from its parent class, NewThreadWorker:

  • The scheduleActual method actually calls the Java thread pool (executor) for thread scheduling.
  • The ScheduledRunnable class is used to decorate the incoming Runnable object once more.
  • Taking ObservableSubscribeOn as an example, the decoration chain for the Runnable object is SubscribeTask -> Scheduler.DisposeTask (in the Scheduler.scheduleDirect method) -> ScheduledRunnable (in NewThreadWorker.scheduleActual).

// IoScheduler.java - class ThreadWorker
static final class ThreadWorker extends NewThreadWorker {
    private long expirationTime;
    ...

// NewThreadWorker.java
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

By default, NewThreadWorker creates a single-threaded thread pool. The key point to understand is that IoScheduler does not schedule on one shared thread pool, but on multiple single-threaded thread pools. Each of these pools is managed by an IoScheduler.ThreadWorker.

// NewThreadWorker.java
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    ...

// SchedulerPoolFactory.java
public static ScheduledExecutorService create(ThreadFactory factory) {
    final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
    tryPutIntoPool(PURGE_ENABLED, exec);
    return exec;
}
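
The "one single-threaded pool per worker" idea is easy to check with the JDK alone. In the sketch below, SingleThreadWorkerSketch, its thread factory and the SketchWorker-N thread names are all assumptions for illustration; only the Executors.newScheduledThreadPool(1, factory) call is taken from the listing above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class SingleThreadWorkerSketch {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    /** Hypothetical factory that names threads "SketchWorker-1", "SketchWorker-2", ... */
    static final ThreadFactory FACTORY = r -> {
        Thread t = new Thread(r, "SketchWorker-" + COUNTER.incrementAndGet());
        t.setDaemon(true);
        return t;
    };

    /** Same executor call as SchedulerPoolFactory.create, minus the purge bookkeeping. */
    static ScheduledExecutorService newWorkerExecutor() {
        return Executors.newScheduledThreadPool(1, FACTORY);
    }

    /** Runs a task on the executor and returns the name of the thread that ran it. */
    static String threadNameOf(ScheduledExecutorService executor) {
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return executor.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService a = newWorkerExecutor();
        ScheduledExecutorService b = newWorkerExecutor();
        System.out.println(threadNameOf(a)); // first worker's thread
        System.out.println(threadNameOf(a)); // same thread again
        System.out.println(threadNameOf(b)); // a different thread
        a.shutdown();
        b.shutdown();
    }
}
```

Tasks submitted to the same executor always land on the same thread, while a second executor brings its own thread. That is the behavior each ThreadWorker gets from its executor.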

CachedWorkerPool

CachedWorkerPool.get()

  • The get() method ultimately returns a ThreadWorker object, so CachedWorkerPool can be understood as playing a factory role.
  • expiringWorkerQueue is CachedWorkerPool's cache queue, of type ConcurrentLinkedQueue.
  • If expiringWorkerQueue is not empty, a ThreadWorker is taken from it; otherwise a new ThreadWorker is created.

A ThreadWorker is added to expiringWorkerQueue when its Runnable has finished. We'll talk about that later.

// IoScheduler.java - class CachedWorkerPool
ThreadWorker get() {
    if (allWorkers.isDisposed()) {
        return SHUTDOWN_THREAD_WORKER;
    }
    while (!expiringWorkerQueue.isEmpty()) {
        ThreadWorker threadWorker = expiringWorkerQueue.poll();
        if (threadWorker != null) {
            return threadWorker;
        }
    }

    // No cached worker found, so create a new one.
    ThreadWorker w = new ThreadWorker(threadFactory);
    allWorkers.add(w);
    return w;
}

ThreadWorker management and destruction

After the Runnable passed in by Rx has been executed, its resources need to be reclaimed. Let's look at how ThreadWorker is managed.

One thing not yet covered from the earlier Scheduler.scheduleDirect method is Scheduler.DisposeTask (which implements Runnable). Here, the decoratedRun passed in at construction is the SubscribeTask object from the previous step, and w is the EventLoopWorker mentioned above.

// Scheduler.java
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);
    ...

When the run method of DisposeTask is called:

  • First, the run method of the member Runnable object is called (PS: here it is the SubscribeTask, which performs the subscription logic of the next step).
  • Finally, its own dispose() method is called. Since w is an EventLoopWorker rather than a NewThreadWorker, the else branch runs and the EventLoopWorker.dispose() method is executed.

// Scheduler.java - class DisposeTask
@Override
public void run() {
    runner = Thread.currentThread();
    try {
        decoratedRun.run();
    } finally {
        dispose();
        runner = null;
    }
}

@Override
public void dispose() {
    if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
        ((NewThreadWorker)w).shutdown();
    } else {
        w.dispose();
    }
}

The EventLoopWorker.dispose method calls pool.release, where pool is the CachedWorkerPool object. This again shows that EventLoopWorker drives the CachedWorkerPool to manage the ThreadWorker.

// IoScheduler.java - class EventLoopWorker
@Override
public void dispose() {
    if (once.compareAndSet(false, true)) {
        tasks.dispose();

        // releasing the pool should be the last action
        pool.release(threadWorker);
    }
}
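
The once flag in dispose() is worth a second look. A minimal sketch of the same guard, assuming a counter in place of the real pool.release(threadWorker) call (DisposeOnceSketch and releases are hypothetical names):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class DisposeOnceSketch {
    private final AtomicBoolean once = new AtomicBoolean();

    /** Counts how often the "release" step ran; stands in for pool.release(threadWorker). */
    final AtomicInteger releases = new AtomicInteger();

    /** Only the first caller wins the compareAndSet, so the release step runs at most once. */
    void dispose() {
        if (once.compareAndSet(false, true)) {
            releases.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        DisposeOnceSketch worker = new DisposeOnceSketch();
        worker.dispose();
        worker.dispose();
        System.out.println(worker.releases.get()); // release ran once
    }
}
```

My reading is that this guard keeps the same ThreadWorker from being put back into the cache queue twice if dispose() is called more than once, for example once by DisposeTask and once by the downstream.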

In release, the expirationTime property of the ThreadWorker is set to the current time plus keepAliveTime (60s by default), and the threadWorker is then added to the expiringWorkerQueue.

// IoScheduler.java - class CachedWorkerPool
void release(ThreadWorker threadWorker) {
    // Refresh expire time before putting worker back in pool
    threadWorker.setExpirationTime(now() + keepAliveTime);

    expiringWorkerQueue.offer(threadWorker);
}
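
Putting get() and release() side by side makes the reuse cycle visible. The following is a reduced sketch, not the RxJava implementation: WorkerCacheSketch and its Worker class are hypothetical, the clock is passed in as a parameter so the behavior is deterministic, and the allWorkers bookkeeping is left out.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

final class WorkerCacheSketch {

    /** Hypothetical stand-in for IoScheduler.ThreadWorker. */
    static final class Worker {
        long expirationTime;
    }

    final ConcurrentLinkedQueue<Worker> expiringWorkerQueue = new ConcurrentLinkedQueue<Worker>();
    final long keepAliveTime;
    int created; // only touched from a single thread in this sketch

    WorkerCacheSketch(long keepAliveTime) {
        this.keepAliveTime = keepAliveTime;
    }

    /** Reuses a cached worker when one is available, otherwise creates a new one. */
    Worker get() {
        Worker cached = expiringWorkerQueue.poll();
        if (cached != null) {
            return cached;
        }
        created++;
        return new Worker();
    }

    /** Stamps the worker with a fresh expiration time and puts it back into the cache. */
    void release(Worker worker, long now) {
        worker.expirationTime = now + keepAliveTime;
        expiringWorkerQueue.offer(worker);
    }

    public static void main(String[] args) {
        WorkerCacheSketch cache = new WorkerCacheSketch(60L);
        Worker first = cache.get();            // nothing cached yet: created
        cache.release(first, 100L);            // expires at 160
        System.out.println(cache.get() == first); // reused from the queue
        System.out.println(cache.created);        // still only one worker created
    }
}
```

Note that get() never looks at expirationTime. A released worker stays reusable for as long as it is still in the queue, and removing stale entries is left entirely to the evictor.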

CachedWorkerPool is also a Runnable. Its other job is to periodically destroy IoScheduler.ThreadWorker objects that have expired. evictor is the thread pool that performs the destruction, with the CachedWorkerPool itself as the Runnable. (PS: it is scheduled to run once every keepAliveTime, i.e. once every 60s by default.)

// IoScheduler.java - class CachedWorkerPool
static final class CachedWorkerPool implements Runnable {
    ...
    CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
        ...
        ScheduledExecutorService evictor = null;
        Future<?> task = null;
        if (unit != null) {
            evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
            task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
        }
        evictorService = evictor;
        evictorTask = task;
    }

Each time the evictor fires, the evictExpiredWorkers() method is called. It walks the expiringWorkerQueue, removes the ThreadWorkers whose expiration time has passed, and releases them.

// IoScheduler.java - class CachedWorkerPool
@Override
public void run() {
    evictExpiredWorkers();
}
...
void evictExpiredWorkers() {
    if (!expiringWorkerQueue.isEmpty()) {
        long currentTimestamp = now();

        for (ThreadWorker threadWorker : expiringWorkerQueue) {
            if (threadWorker.getExpirationTime() <= currentTimestamp) {
                if (expiringWorkerQueue.remove(threadWorker)) {
                    allWorkers.remove(threadWorker);
                }
            } else {
                // Queue is ordered with the worker that will expire first in the beginning, so when we
                // find a non-expired worker we can stop evicting.
                break;
            }
        }
    }
}
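
The eviction pass can be exercised without any timers by passing the current time in as an argument. This is a sketch under that assumption; EvictionSketch, its Worker class and the disposed flag are illustrative names, and marking the flag stands in for removing the worker from allWorkers.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

final class EvictionSketch {

    /** Hypothetical stand-in for a cached ThreadWorker. */
    static final class Worker {
        final long expirationTime;
        boolean disposed;

        Worker(long expirationTime) {
            this.expirationTime = expirationTime;
        }
    }

    /** Removes workers whose expiration time has passed; returns how many were evicted. */
    static int evictExpired(ConcurrentLinkedQueue<Worker> queue, long now) {
        int evicted = 0;
        for (Worker w : queue) {
            if (w.expirationTime <= now) {
                // remove() can fail if get() polled the worker in the meantime
                if (queue.remove(w)) {
                    w.disposed = true;
                    evicted++;
                }
            } else {
                // Workers are queued in release order, so the rest expire later.
                break;
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<Worker> queue = new ConcurrentLinkedQueue<Worker>();
        queue.offer(new Worker(10L));
        queue.offer(new Worker(20L));
        queue.offer(new Worker(30L));
        System.out.println(evictExpired(queue, 20L)); // the first two have expired
        System.out.println(queue.size());             // one worker is still cached
    }
}
```

Removing elements while iterating is safe here because ConcurrentLinkedQueue iterators are weakly consistent and never throw ConcurrentModificationException.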

Summary

In summary:

  • ThreadWorker holds a Java thread pool internally.
  • EventLoopWorker triggers CachedWorkerPool to create, reuse, and destroy ThreadWorkers.
  • CachedWorkerPool internally holds the ThreadWorker cache queue, and thereby indirectly manages the creation, reuse, and destruction of the Java thread pools.
  • When a task completes, the decorator DisposeTask triggers the EventLoopWorker.dispose method, which adds the current ThreadWorker to CachedWorkerPool's cache queue and sets an expiration time.
  • CachedWorkerPool removes expired ThreadWorkers through a periodically triggered evictor thread.
  • A ThreadWorker that has not yet expired and been evicted is reused in the CachedWorkerPool.get() method.

PS: According to the code analysis above, IoScheduler creates a new thread pool for Rx thread scheduling whenever no cached ThreadWorker can be reused. If a lot of time-consuming logic runs at the same time, Schedulers.io() may need to be used sparingly.

Source:

  • ObservableSubscribeOn.java
  • Scheduler.java
  • IoScheduler.java

AndroidSchedulers.mainThread()

AndroidSchedulers, as the name suggests, is a class that encapsulates schedulers for Android. Android thread scheduling relies on the Handler-Looper-Message messaging mechanism, which will not be covered here.

The following code shows that the Scheduler returned by AndroidSchedulers.mainThread() is a HandlerScheduler object that holds a main-thread Handler. You can infer that the subsequent thread switching is done through Handler messaging. PS: Note that AndroidSchedulers is not itself a Scheduler type; like Schedulers with its io() method, its role is to create and provide Schedulers.

// AndroidSchedulers.java
public final class AndroidSchedulers {
    private static final class MainHolder {
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
    ...

// HandlerScheduler.java
final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }

Let’s look at the thread scheduling implementation of the HandlerScheduler using observeOn as an example.

Observable.create<String> {
     Log.d(TAG, "Event generation thread:${Thread.currentThread().name}")
     it.onNext("I'm ")
     it.onComplete()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
      Log.d(TAG, "Event consuming thread:${Thread.currentThread().name}")
      Log.d(TAG, it)
}

During subscription, ObservableObserveOn.subscribeActual calls scheduler.createWorker(), similar to the IoScheduler case above. (In the sample code above, the scheduler is the HandlerScheduler returned by AndroidSchedulers.mainThread().) The Scheduler.Worker object it creates is handed to the ObserveOnObserver, which uses it for thread scheduling when it later observes event updates.

HandlerScheduler.createWorker() creates a HandlerWorker object whose handler is the main-thread Handler (new Handler(Looper.getMainLooper())).

// ObservableObserveOn.java
@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

// HandlerScheduler.java
@Override
public Worker createWorker() {
    return new HandlerWorker(handler);
}

As mentioned in the previous article, when an Rx event is emitted and reaches the onNext method of the observeOn operator, ObserveOnObserver.schedule() is invoked. Its worker.schedule(this) call in turn invokes HandlerWorker.schedule.

// ObservableObserveOn.java - class ObserveOnObserver
void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}
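
The getAndIncrement() == 0 check deserves a short illustration. The sketch below shows the general "work in progress" counter technique with plain JDK types; it is my simplified reading rather than the actual ObserveOnObserver code. DrainSketch, its String queue and the delivered list are hypothetical, and a java.util.concurrent.Executor stands in for the Scheduler.Worker. As in the listing above, the object itself serves as the counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// The AtomicInteger value counts schedule() calls that have not been accounted for yet.
final class DrainSketch extends AtomicInteger implements Runnable {
    private final Queue<String> queue = new ConcurrentLinkedQueue<String>();
    private final Executor worker; // stand-in for Scheduler.Worker
    final List<String> delivered = new ArrayList<String>(); // written only by the drain

    DrainSketch(Executor worker) {
        this.worker = worker;
    }

    void onNext(String item) {
        queue.offer(item);
        schedule();
    }

    /** Hands the drain to the worker only when no drain is pending or running. */
    void schedule() {
        if (getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    /** Drains the queue, then loops again if more schedule() calls arrived meanwhile. */
    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            String item;
            while ((item = queue.poll()) != null) {
                delivered.add(item);
            }
            missed = addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    public static void main(String[] args) {
        List<Runnable> scheduled = new ArrayList<Runnable>();
        DrainSketch d = new DrainSketch(scheduled::add); // records instead of running
        d.onNext("a");
        d.onNext("b");
        System.out.println(scheduled.size()); // one drain scheduled for two events
        scheduled.get(0).run();
        System.out.println(d.delivered);
    }
}
```

However many events arrive while a drain is pending, only one Runnable is handed to the worker. For HandlerWorker that would mean one Message posted to the main thread instead of one per event.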

In the HandlerWorker.schedule method, the Runnable object to be executed is decorated with the ScheduledRunnable class and then posted through the Handler in the form of a Message, which switches execution to the Handler's thread (the main thread).

// HandlerScheduler.java - class HandlerWorker
private static final class HandlerWorker extends Worker {
    private final Handler handler;
    ...

    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        if (disposed) {
            return Disposables.disposed();
        }

        run = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.

        handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }

        return scheduled;
    }
    ...

Source:

  • ObservableObserveOn.java
  • AndroidSchedulers.java
  • HandlerScheduler.java

Conclusion

Finally, to summarize the general logic of Scheduler:

  • The role of Scheduler.Worker is thread management; it is where RxJava actually encapsulates thread scheduling.
  • The role of Scheduler is to manage Scheduler.Worker, including its creation, reuse and destruction.

Final words

This article walked through the source code of Schedulers.io() (IoScheduler) and AndroidSchedulers.mainThread(). RxJava Schedulers source code introduction

Series of articles:

  • What does Rx’s Observable.create do?
  • RxJava subscribeOn and observeOn source code analysis
  • RxSwift subscribeOn and observeOn source code analysis
  • How is RxSwift Scheduler implemented