preface

In our previous article APT Tools for Android Annotations (iii), we introduced APT technology and how it is used, and also mentioned that some well-known open source frameworks such as Dagger2, ButterKnife, and EventBus use it. In order to give you a better understanding of the use of APT technology, I will focus on the use of APT technology in EventBus in the following articles. Before understanding this knowledge, we need to be familiar with the internal principles of EventBus. If you are already familiar with its internal mechanism, you can skip this article. Read EventBus3’s “acceleration engine” for Android.

Reading the article, we can learn the following:

  • Internal principles of EventBus3
  • EventBus3 Subscription and Message sending principles
  • EventBus3 Thread switching principle
  • EventBus3 Handle sticky events

This article is based on EventBus version 3.1.1.

EventBus profile

EventBus should be familiar to Android programmers. It is an observatory-based event publish/subscribe framework that we often use to communicate between different components, background threads, etc.

Even though EventBus is very simple and easy to use, the code structure is very confusing because of the EventBus flying around, making it difficult to test and track. Even though EventBus has a lot of problems, it still doesn’t stop us from learning its principles and programming ideas

About the process

Before we look at the inner workings of EventBus, let’s take a look at the general flow of the EventBus framework. As shown below:

In the figure above, green is the subscription process, and red is the sending event process. You can combine the figure above to understand the source code.

In the figure above, we subscribed to AEvent in A. Java, and subscribed to AEvent and BEvent in B. Java. Let’s analyze the two processes of registering and sending events in EventBus. Let’s start with what’s included in Subscription and SubscriberMethod.

The Subscription class contains the following:

  • Current Registered object
  • SubscriberMethod, the encapsulated object corresponding to the subscription method

The SubscriberMethod class contains the following:

  • contains@SubscribeAnnotated Method Method (java.lang.reflectObjects under the package).
  • @SubscribeThe ThreadMode set in the annotation, ThreadMode
  • Method to register the event type of the Class object
  • @SubscribeThe priority set in
  • @SubscribeIs set to whether the event is sticky

The registration process

When we register objects A and B by calling eventbus.register (), EventBus does the following:

  • Through the interiorSubscriberMethodFinderTo obtain the contents of class A and B@SubscribeAnnotation method, and encapsulates the content of the annotation with the corresponding method asSubscriberMethodObject. The current subscription object is then matched with the correspondingSubscriberMethodThen encapsulated in theSubscriptionObject.
  • All of theSubscriptionIn the calledsubscriptionsByEventTypeA type ofMap<Class<? >, CopyOnWriteArrayList<Subscription>>Data structure (the Class object whose key is the event type), becauseSubscriptionThe object containsSubscriberMethod, then we know the event type of the subscription, so we can distinguish by the event typeSubscriptionAnd because the same event can be subscribed to by methods in different subscribers, events of the same type can correspond to differentSubscription.
  • Encapsulate all subscribed events in a subscriber with a nametypesBySubscriberA type ofMap<Object, List<Class<? >>>Data structure (Key is the subscription object and value is the Class object of the event type to which the object is subscribed). This collection is primarily used to unsubscribe, and is described below.

The main flow in the registration process is for EventBus to get the Subscribe method with the @Subscribe annotation in the class through the SubscriberMethodFinder. Prior to EventBus 3.0, this process was always retrieved by reflection. In versions 3.0 and later, EventBus uses APT technology to optimize the SubscriberMethodFinder process for finding subscription methods so that it knows the method for the related subscription event before the EventBus.register() method is called. This reduces the amount of time consumed by using the reflection traversal fetch method at run time. We will also point out specific optimization points below.

Event Sending Process

Once you know the registration process for EventBus, it’s easy to understand how events happen. Since we already store the Subscription for the event via subscriptionsByEventType, once the Subscription is found, we can get the subscriber that subscribed to the event. And the corresponding subscription Method (an object under the java.lang.Reflect package). Then call through reflection:

Subscription contains both a subscriber and a SubscriberMethod

 method.invoke(subscription.subscriber, event)
Copy the code

With the above method, the corresponding events can be sent to the relevant subscribers. Of course, this is just a brief introduction to how events are sent to relevant subscribers. How sticky events are handled in EventBus and how threads are switched. This will be described in more detail below.

Source code analysis

Now that you’ve looked at the internal workflow of EventBus, let’s take a closer look at the internal implementation through the source code. Again, the subscription process and the sending of events are explained.

Subscription process source code analysis

The entry to subscribe to EventBus is the register() method, as shown below:

  public void register(Object subscriber) { Class<? > subscriberClass = subscriber.getClass();// Flow 1: Get all subscription methods in the corresponding class
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            // Process 2: The actual subscription
            for(SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); }}}Copy the code

In this method, the SubscriberMethodFinder lookup method and the actual subscription process are mainly involved, which are described below.

SubscriberMethodFinder Finds method flow

In this process, all subscriberMethods in subscribers are obtained mainly through SubscriberMethodFinder. Let’s first look at the findSubscriberMethods() method:

List<SubscriberMethod> findSubscriberMethods(Class<? > subscriberClass) {// Get the subscription method from the cache and read the cache if there is one, List<SubscriberMethod> subscriberMethods = (List) method_cache. get(subscriberClass); if (subscriberMethods ! = null) { return subscriberMethods; } else {the if (this. IgnoreGeneratedIndex) {/ / if you ignore the index class, using reflection. subscriberMethods = this.findUsingReflection(subscriberClass); } else {// otherwise use the index class subscriberMethods = this.findusingInfo (subscriberClass); } // If the subscriber does not have a subscription method, Will throw an exception if (subscriberMethods isEmpty ()) {throw new EventBusException subscriberClass (" Subscriber "+ +" and its super Classes have no public methods with the @subscribe annotation "); } else {// Add subscriberMethods from the corresponding class to the cache to improve efficiency and facilitate the next search for method_cache. put(subscriberClass, subscriberMethods); return subscriberMethods; }}}Copy the code

The logic of this method is very simple as follows:

  • Step 1: Start from the cache (METHOD_CACHE) to get the corresponding subscriberSubscriberMethodIf so, fetch it from the cache.
  • Step 2: If not in the cache, pass the Boolean variableignoreGeneratedIndexTo determine whether to use reflection directly to get the subscription method or throughThe index classEventBus 3.0 uses classes added to APT. becauseignoreGeneratedIndexIf the default value is false, it will go by defaultfindUsingInfo()methods
  • Step 3: Store the subscription method set obtained in Step 2 in the cache to facilitate the next acquisition and improve efficiency.

Since the findUsingInfo() method is used by default, let’s continue with this method:

    private List<SubscriberMethod> findUsingInfo(Class
        subscriberClass) {
        // Step 1: build a query status cache pool to cache the query status of up to 4 classes
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while(findState.clazz ! =null) {
            // Step 2, obtain the subscription information corresponding to the lookup status, 👇 where EventBus 3.0 uses index classes,
            findState.subscriberInfo = getSubscriberInfo(findState);
            if(findState.subscriberInfo ! =null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    // Add all subscription methods of the subscriber to the FindState collection
                    if(findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) { findState.subscriberMethods.add(subscriberMethod); }}}else {// Step 3: If the subscription information is null, reflection is used to fetch all methods in the class
                findUsingReflectionInSingleClass(findState);
            }// Continue to find the method of the parent class
            findState.moveToSuperclass();
        }
        // Step 4, get all the methods in findState and empty the object pool
        return getMethodsAndRelease(findState);
    }
Copy the code
  • Step 1: Create the FindState object associated with the subscriber. Is retrieved from the FinState object cache pool (up to four), with one FindState for each subscriber object and one or more subscription methods for each subscriber object.
  • Step 2: Call from the FindState objectgetSubscriberInfo()Method to get subscriber – related subscription method information. This method uses APT technology to build index classes for EventBus. Specific optimizations will be covered in the next articleEventBus3: acceleration engine for AndroidDescribe it, just to give you an impression.
  • Step 3: If the subscription method information cannot be obtained through Step 2, passreflectionTo get all the subscription methods in the class. And encapsulate the obtained methods into the subscriberMethods set in FindState.
  • Step 4: Return the subscriberMethods collection in the FindState object.

In the above method, we need to note that if there is no associated subscription method currently subscribed, the subscription methods of its parent class are iterated. It is also important to note that FindState in this method uses an object cache pool and does not create a FindState object every time a subscriber is registered. This saves memory usage.

Knowledge about index class, will be introduced in the next article, here we see directly findUsingReflectionInSingleClass () method:

private void findUsingReflectionInSingleClass(FindState findState) { Method[] methods; Try {/ / the current subscribers access to all the way. The methods = findState clazz. GetDeclaredMethods (); } the catch (Throwable th) {/ / get all the public methods of this class include inherit the public methods of the methods. = findState clazz. GetMethods (); findState.skipSuperClasses = true; } // Loop through all the methods to find the corresponding subscription method through the relevant annotations. for (Method method : methods) { int modifiers = method.getModifiers(); If ((modifiers & modifiers. Public)! = 0 && (modifiers & MODIFIERS_IGNORE) == 0) { Class<? >[] parameterTypes = method.getParameterTypes(); // find the parameter 1, If (parametertypes. length == 1) {Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); if (subscribeAnnotation ! = null) { Class<? > eventType = parameterTypes[0]; If (findstate.checkadd (method, eventType)) {// create a subscribe method object and encapsulate the corresponding method object, eventType, thread mode, priority, stickiness event into the SubscriberMethod object. ThreadMode threadMode = subscribeAnnotation.threadMode(); findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); } } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length); } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException(methodName + " is a illegal @Subscribe method: Must be public, non-static, and non-abstract "); }}}Copy the code

The logic of this Method is also very simple, by retrieving the Class object of the subscriber in FindState, and then by reflection retrieving all Method objects with @SUBSCRIBE and parameter 1, and reading the type EventType of this parameter. Then read thredMode, Priority, and sticy in the annotations, and finally assemble all these data into the newly created SubscriberMethod object, which is finally added to the subscriberMethods collection in FindState.

The actual subscription method is subscribe

When all of the subscriber’s methods are found, the subscribe() method is finally iterated over and viewed:

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { Class<? > eventType = subscriberMethod.eventType;// Step 1, wrap each Subscription method and subscriber as Subscription
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);

        // Step 2, get all the Subscription in the corresponding event and check if it is added repeatedly
        CopyOnWriteArrayList<Subscription> subscriptions = (CopyOnWriteArrayList)this.subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList();
            this.subscriptionsByEventType.put(eventType, subscriptions);
        } else if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
        }

        // Step 3, add the currently newly encapsulated Subscription object to subscriptionsByEventType according to the priority
        int size = subscriptions.size();
        for(int i = 0; i <= size; ++i) {
            if (i == size || subscriberMethod.priority > ((Subscription)subscriptions.get(i)).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break; }}// Step 4, add the event types subscribed by the current subscriber to typesBySubscriberList<Class<? >> subscribedEvents = (List)this.typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList();
            this.typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);

        // step 5, if the method has subscribed to stickyEvents, fetch the corresponding sticky event from stickyEvents and send it
        if (subscriberMethod.sticky) {
            if(eventInheritance) { Set<Map.Entry<Class<? >, Object>> entries = stickyEvents.entrySet();for(Map.Entry<Class<? >, Object> entry : entries) { Class<? > candidateEventType = entry.getKey();if(eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); }}}else {
                Object stickyEvent = this.stickyEvents.get(eventType);
                this.checkPostStickyEventToSubscription(newSubscription, stickyEvent); }}}Copy the code

The main process in the above method is as follows:

  • Step 1, wrap each Subscription method and subscriber as Subscription.
  • Step 2, get all the Subscription in the corresponding event and determine if it is added repeatedly.
  • Step 3, according topriorityTo add the currently newly encapsulated Subscription object to subscriptionsByEventType. (Once the priority is set, EvenBus can send events to subscribers in priority order)
  • Step 4 add the event types that the current subscriber subscribes to to typesBySubscriber.
  • Step 5, if the method has subscribed to stickyEvents, fetch the corresponding sticky event from stickyEvents and send it.

Combined with the rough EventBus flow we drew at the beginning, this method does exactly what the red dotted box shows below:

As for the knowledge of sticky events, we need to understand the sending process of events, which will be introduced in detail in the following paragraphs.

Event sending process source code analysis

The sending of events can be divided into simple events and sticky events. The corresponding methods are post() and postSticky() respectively. Here we first look at the sending of simple events, the code is as follows:

Sending of simple events

Public void post(Object event) {// Step 1, obtain the PostingThreadState independently owned by the current thread, and obtain the eventQueue from it. Will send the event is added to the queue EventBus. PostingThreadState postingState = (EventBus.PostingThreadState)this.currentPostingThreadState.get(); List<Object> eventQueue = postingState.eventQueue; eventQueue.add(event); // Step 2: Determine whether the current thread is distributing events. If not, loop through the events in the event queue and distribute the events until the current event queue is empty. postingState.isPosting) { postingState.isMainThread = this.isMainThread(); postingState.isPosting = true; // If the current distribution event status is cancelled, Canceled) {throw new EventBusException("Internal error. Abort state was not reset "); } // Loop through the event queue and send the message. Try {while(! eventQueue.isEmpty()) { this.postSingleEvent(eventQueue.remove(0), postingState); } } finally { postingState.isPosting = false; postingState.isMainThread = false; }}}Copy the code

A unique PostingThreadState object is created in EventBus for each thread that calls the POST () method, which records the status of the current thread storing and sending messages. The internal structure is as follows:

PostingThreadState uses a partner of ThreadLocal that is not familiar with ThreadLocal. Check out this article on Android’s ThreadLocal Handler mechanism

That is, when we call the eventBus.post () method, we fetch the message from the EventQueue queue and then actually send the message by calling the postSingleEvent () method, which looks like this:

  private void postSingleEvent(Object event, EventBus.PostingThreadState postingState) throws Error { Class<? > eventClass = event.getClass();boolean subscriptionFound = false;
        // Step 1: 👇 Check whether the event is sent
        if (this.eventInheritance) { List<Class<? >> eventTypes = lookupAllEventTypes(eventClass);int countTypes = eventTypes.size();
            for(int h = 0; h < countTypes; ++h) { Class<? > clazz = (Class)eventTypes.get(h);//👇 loop over the event and send
                subscriptionFound |= this.postSingleEventForEventType(event, postingState, clazz); }}else {
            // Step 2: 👇 If event delivery is not supported, start sending events here.
            subscriptionFound = this.postSingleEventForEventType(event, postingState, eventClass);
        }
        // Step 3: If no way to subscribe is found, prompt the user
        if(! subscriptionFound) {if (this.logNoSubscriberMessages) {
                this.logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }

            if (this.sendNoSubscriberEvent && eventClass ! = NoSubscriberEvent.class && eventClass ! = SubscriberExceptionEvent.class) {this.post(new NoSubscriberEvent(this, event)); }}}Copy the code

The method consists of the following three steps:

  • Step 1: Pass Boolean variableseventInheritanceCheck whether the event is supported. If yes, the event is passedlookupAllEventTypes()Method to get the sent event ancestor class and its interface. Then throughpostSingleEventForEventType()Method, send them all out,
  • Step 2: Step 1 returns false so use it directlypostSingleEventForEventType()Method to send events.
  • Step 3: If no relevant subscription method is found, prompt the user that there is no relevant subscription method.

The Boolean variable eventInheritance defaults to false and can be configured using EventBusBuilder.

So what is event delivery and sending? Let’s look at the lookupAllEventTypes() method:


    private staticList<Class<? >> lookupAllEventTypes(Class<? > eventClass) {synchronized(eventTypesCache) { List<Class<? >> eventTypes = eventTypesCache.get(eventClass);if (eventTypes == null) {
                eventTypes = newArrayList<>(); Class<? > clazz = eventClass;//👇 gets all the ancestor classes of this class and their interfaces
                while(clazz ! =null) {
                    eventTypes.add(clazz);
                    addInterfaces(eventTypes, clazz.getInterfaces());
                    clazz = clazz.getSuperclass();
                }
                eventTypesCache.put(eventClass, eventTypes);
            }
            returneventTypes; }}// Add the interface to the collection
    static void addInterfaces(List
       
        > eventTypes, Class
        [] interfaces)
       > {
        for(Class<? > interfaceClass : interfaces) {if(! eventTypes.contains(interfaceClass)) { eventTypes.add(interfaceClass); addInterfaces(eventTypes, interfaceClass.getInterfaces()); }}}Copy the code

In this method, all of the ancestor classes and their interfaces that sent the event are retrieved and returned as a collection. Once the collection is retrieved in the postSingleEvent method, all of the data in the collection is sent. What is the effect of this? If our current inheritance system is Aevent -> Bevent -> Cevent (-> inheritance), then all other subscribers who subscribed to Bevent and Cevent will receive the message by sending Aevent.

We continue to view the postSingleEventForEventType () method, the code is shown below:

    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class
        eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        //👇 takes previously accessed Subscription from the cache
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if(subscriptions ! =null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    //👇 here to find the corresponding method, start to switch threads.
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break; }}return true;
        }
        return false;
    }

Copy the code

The logic of this method is very simple, which is to get the Subscription stored in our previous subscriptionsByEventType collection, Canceled, subscription, and isMainThread are set to PostingState based on the current thread state, and postToSubscription() is used to implement event delivery.

The process so far looks like this:

postToSubscription()

The postToSubscription() method is the code that actually passes the event to the subscriber. To view the method:

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if(mainThreadPoster ! =null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: "+ subscription.subscriberMethod.threadMode); }}Copy the code

From this method, we get the threadMode threadMode in the Subscription member variable SubscriberMethod to determine which threads the Subscription method needs to execute. If the current thread pattern is POSTING, the default is to call the invokeSubscriber() method directly. The specific code is as follows:

    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            //👇 calls subscription methods directly through reflection.
            subscription.subscriberMethod.method.
            invoke(subscription.subscriber, event);
        }
        // Omit some code
    }
Copy the code

In other schemas, the enqueue() method is called based on the corresponding poster to control the thread on which the subscription method is executed. The following three posters are provided in EventBus to control the thread on which the subscription method runs.

  • HandlerPoster (switch to main thread)
  • BackgroundPoster (Switch to background thread)
  • AsyncPoster (Switch to background thread)

The above three posters all implement the Poster interface, and internally maintain a queue named PendingPostQueue, which takes PendingPost as the storage unit. The PendingPost stores the Subscription we found based on the current event and the current event.

Then combining the whole process, we can get the following figure:

A brief explanation of the figure above.

  • When we callEventBus.post()When a simple event is sent, the event is placed in a thread-specificPostingThreadState çš„ EventQueueIn the.
  • And then it’s going to change from beforesubscriptionsByEventTypeCollection associated with the eventSubscription.
  • And then we’ll findSubscriptionIs encapsulated with the currently sent eventPendingPostAnd add to the correspondingPosterIn thePendingPostQueueIn the queue.
  • Finally correspondingPosterRetrieves the corresponding from the queuePendingPostCall the subscriber’s subscription method through reflection.

Where the subscribing method executes the thread’s rule as follows:

Thread switching

In the previous section, the thread on which the subscriber’s subscription method executes is implemented by three posters inside EventBus. Let’s look at the implementation of these three posters.

HandlerPoster
public class HandlerPoster extends Handler implements Poster { private final PendingPostQueue queue; private final int maxMillisInsideHandleMessage; private final EventBus eventBus; private boolean handlerActive; Looper protected HandlerPoster(EventBus EventBus, Looper Looper, int maxMillisInsideHandleMessage) { super(looper); this.eventBus = eventBus; this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage; queue = new PendingPostQueue(); } public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); Synchronized (this) {//👇 place PedingPost in PendingPostQueue and send message queue.enqueue(pendingPost); if (! handlerActive) { handlerActive = true; if (! SendMessage (obtainMessage())) {throw new EventBusException("Could not send handler message "); } } } } @Override public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); PendingPost PendingPost = queue.poll(); PendingPost PendingPost = queue.poll(); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { handlerActive = false; return; }}} //👇 calls the subscriber's subscription method directly through reflection. eventBus.invokeSubscriber(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (! SendMessage (obtainMessage())) {throw new EventBusException("Could not send handler message "); } rescheduled = true; return; } } } finally { handlerActive = rescheduled; }}}Copy the code

The logic in HanderPoster is easy to understand, inherits from the Handler, and initializes by default with the main thread’s Looper so that messages sent by that Handler will be processed in the main thread.

Analyze the main steps in HanderPoster:

  • In the callenqueue()Method, will encapsulate what we had beforePendingPostIn thePendingPostQueueA message is sent simultaneously in a queue.
  • inhandleMessage()Method, fromPendingPostQueueRetrieves the nearest from the queuePendingPostAnd then straight througheventBus.invokeSubscriber()Reflection executes the subscriber’s subscription method.
BackgroundPoster
final class BackgroundPoster implements Runnable.Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        // Use a thread pool to submit tasks, which is thread-safe.
        synchronized (this) {
            queue.enqueue(pendingPost);
            if(! executorRunning) { executorRunning =true;
                eventBus.getExecutorService().execute(this); }}}@Override
    public void run(a) {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return; } } } eventBus.invokeSubscriber(pendingPost); }}catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e); }}finally {
            executorRunning = false; }}}Copy the code

The biggest difference between BackgroundPoster and HandlerPoster is that it uses a thread pool internally, and the class also implements the Runnable interface.

In the enqueue() method in BackgroundPoster, the default thread pool DEFAULT_EXECUTOR_SERVICE in EventBus is used to submit tasks by default, which is declared as follows:

 private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
Copy the code

CachedThreadPool is suitable for a large number of small tasks

Similarly, BackgroundPoster is simply a reflection call to the subscriber’s subscription method, except that it is executed on a non-main thread in the thread pool.

Note that EventBus is always thread-safe when sending messages in any thread. This can be seen in the code for BackgroundPoster.

AsyncPoster
class AsyncPoster implements Runnable, Poster { private final PendingPostQueue queue; private final EventBus eventBus; AsyncPoster(EventBus eventBus) { this.eventBus = eventBus; queue = new PendingPostQueue(); } public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); queue.enqueue(pendingPost); eventBus.getExecutorService().execute(this); } @Override public void run() { PendingPost pendingPost = queue.poll(); If (pendingPost == null) {throw new IllegalStateException("No pendingPost available "); } eventBus.invokeSubscriber(pendingPost); }}Copy the code

I’m not going to explain AsyncPoster here, but I’m sure you can understand from the previous content.

Sending of sticky events

Now we have one last piece of knowledge left, which is the delivery of sticky events. To send sticky events in EventBus, we need to call the method postSticky(), which looks like this:

  public void postSticky(Object event) {
        synchronized (stickyEvents) {
            stickyEvents.put(event.getClass(), event);
        }
        post(event);
    }
Copy the code

From the code, we can see that the only difference between sending a sticky event and sending a simple event is that the sent event is added to the stickyEvents collection. So why do it? Before we can understand exactly why, we need to understand the concept of sticky events.

The concept of sticky events: when the subscriber has not subscribed to relevant event A, the program has already sent some event A. According to normal logic, when the subscriber starts to subscribe to event A, it cannot receive the event A that has already been sent by the program, but we want to receive those messages that have already been sent. This kind of out-of-date, re-accepted event is called a sticky event.

So according to the idea of sticky events, we need to store the events that have been sent and do special processing during the subscription of sticky events, that is, in the eventBus.register () method. Remember the subscribe() method from earlier in the registration process? This method internally handles sticky events in a special way, as shown below:

 private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        // Omit some code
        // Determine if it is a sticky event
        if (subscriberMethod.sticky) {
            //👇 supports sticky events for event delivery
            if(eventInheritance) { Set<Map.Entry<Class<? >, Object>> entries = stickyEvents.entrySet();for(Map.Entry<Class<? >, Object> entry : entries) { Class<? > candidateEventType = entry.getKey();if(eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); }}}else {
                //👇 starts executing the subscription method.Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); }}}Copy the code

In the above logic, it derives from the stickyEvents before sending the event, and then call checkPostStickyEventToSubscription (). The code for this method is as follows:

 private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
        if(stickyEvent ! =null) { postToSubscription(newSubscription, stickyEvent, isMainThread()); }}Copy the code

And because checkPostStickyEventToSubscription () method invokes the internal postToSubscription () method. Finally, the subscriber can receive the previously sent event and execute the corresponding subscription method.

The last

The main flow of EventBus has now been covered. From the actual code, we can not only see its good code specification and encapsulation ideas. You can also see performance improvements in the framework, especially with the addition of some necessary caching. I believe the above points are worthy of our reference and reference. In the next article, we’ll look at the “acceleration engine” indexing class in EventBus. Interested partners can continue to pay attention to.