Source: YingKe

review

EventBus is one of the most commonly used open source frameworks in our projects. Using EventBus is also very simple. However, the internal implementation of EventBus is not very complicated. EventBus is an Android event publish/subscribe framework that simplifies Android event delivery by decoupling publishers and subscribers.

The working process for EventBus is simple and can be summed up in one sentence: After an event is submitted to EventBus, it looks up all methods subscribed to the event and executes those subscribed methods

Universal noun

  1. Subscriber: The event Subscriber to receive events
  2. OnEvent: this method handles the event in the thread on which the sender sent it, i.eThe handler and the sender are on the same thread
  3. OnEventMainThread: Regardless of which thread the sender sent it from,This method handles tasks on the main thread
  4. OnEventBackgroundThread: This method processes events in the same child thread if the sender is in the main thread, and in one thread if the sender is in the main thread, i.eHandling events must be in the child thread
  5. OnEventAsync: regardless of which thread the sender sent from,The methods are all executed in the thread pool.

Method of use

First we need to register the class that we want to subscribe to events with the EventBus class. The registration code is as follows:

// Registration for version 3.0
EventBus.getDefault().register(this);
       
// Register for version 2.x
EventBus.getDefault().register(this);
EventBus.getDefault().register(this.100);
EventBus.getDefault().registerSticky(this.100);
EventBus.getDefault().registerSticky(this);

Copy the code

As you can see, there are four registration methods in 2.x, which distinguish between normal registration and sticky event registration, and you can choose the priority of receiving events during registration. We won’t go too far into 2.x here. Since version 3.0 prioritizes stickiness and subscription events in a better way, registration in version 3.0 is simple, with just a register() method.

After registering, we need to write a method that responds to the event. The code is as follows:

/3.0version@Subscribe(threadMode = ThreadMode.BACKGROUND, sticky = true, priority = 100)
public void test(XXXEvent str) {}/ / 2. X version
public void onEvent(XXXEvent event) {}public void onEventMainThread(XXXEvent str) {}public void onEventBackgroundThread(XXXEvent str) {}Copy the code

In version 2. X only methods that start with onEvent are registered, and threads that respond to event methods are distinguished by method names onEventMainThread or onEventBackgroundThread, as opposed to 3.0. The @subscribe annotation is used to determine whether the running thread threadMode accepts sticky events and the event priority. Besides, the method name does not need to start with onEvent, so it is more simple and flexible.

3. Send events

You can send an event via EventBus’s post() method, which executes the method of the class that registered the event. Or send a sticky event via postSticky(). The code is the same for version 2.x and version 3.0.

EventBus.getDefault().post(new XXXEvent());
EventBus.getDefault().postSticky(new XXXEvent());
Copy the code

4. Unregister We need to unregister when we no longer need to receive events, and the same applies to the unregister of 2.x and 3.0. The code is as follows:

EventBus.getDefault().unregister(this);

Copy the code

Source code analysis

1. The class diagram

GetDefault (), register(), post(), register(), register(), post(), etc.

// The default EventBus builder
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
 
/ / the singleton
public static EventBus getDefault(a) {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = newEventBus(); }}}return defaultInstance;
    }

 /**
     * Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a
     * central bus, consider {@link #getDefault()}.
     */
    public EventBus(a) {
        this(DEFAULT_BUILDER);
    }
    // constructor
    EventBus(EventBusBuilder builder) {
        / / log
        logger = builder.getLogger();
        //key: the subscribed event type,value: the collection of all subscribers subscribed to this event is thread-safe sorted by subscribed event
        subscriptionsByEventType = new HashMap<>();
        // Key: subscriber object,value: This subscriber subscribes to a collection of events sorted by subscriber
        typesBySubscriber = new HashMap<>();
        // Sticky event key: class object of the sticky event, value: event object
        stickyEvents = new ConcurrentHashMap<>();
        // The event is handled by the main threadmainThreadSupport = builder.getMainThreadSupport(); mainThreadPoster = mainThreadSupport ! =null ? mainThreadSupport.createPoster(this) : null;
        // Handle the event Background
        backgroundPoster = new BackgroundPoster(this);
        // Events are handled asynchronously by threads
        asyncPoster = new AsyncPoster(this); indexCount = builder.subscriberInfoIndexes ! =null ? builder.subscriberInfoIndexes.size() : 0;
        
        // Subscriber response function information storage and lookup classes are important
        // By default subscriberInfoIndexes == NULL and ignoreGeneratedIndex == false are used to find Method
        subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);
                
        logSubscriberExceptions = builder.logSubscriberExceptions;
        logNoSubscriberMessages = builder.logNoSubscriberMessages;
        sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
        sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
        throwSubscriberException = builder.throwSubscriberException;
        eventInheritance = builder.eventInheritance;
        executorService = builder.executorService;
    }


Copy the code

3. Source code analysis of the registration process

3.1 Implementation of register() method

3.0 only provides a register() method, so let’s first look at what the register() method does:



    /** * Register an event subscriber to receive events. When the subscriber is no longer interested in receiving events, it must call {@link #unregister(Object)}
     * <p/>
     * Subscribers have event handling methods that must be annotated by {@link Subscribe}.
     * The {@link Subscribe} annotation also allows configuration like {@link* ThreadMode} and priority. * Subscriber methods for handling events must use annotations@SubscribeAnd allows ThreadMode and priority */ to be configured
    public void register(Object subscriber) {
        // First get the subscriber's class objectClass<? > subscriberClass = subscriber.getClass();// Use the subscriberMethodFinder to find which events the subscriber subscribed to. Returns a List of SubscriberMethod objects,SubscriberMethod
        // Contains the Method object for this Method, the ThreadMode in which the subscription will be responded to, the eventType of the subscription, and the subscription preference
        // Pre-priority, and Boolean values for whether sticky events are accepted.
        
        // Subscribers can subscribe to multiple events
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        
        synchronized (this) {
            // Iterate over the subscription
            for(SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); }}}// Subscriptions are registered to subscriptionsByEventType and typesBySubscriber collections
    // Must be called in synchronized block
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
       // Subscribe to the event type classClass<? > eventType = subscriberMethod.eventType;// Encapsulate subscriber and subscriberMethod
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        // Based on the event type, get all subscribers interested in that event type
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "+ eventType); }}// Iterate over all subscribers interested in this event type
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            // Subscriptions were subscriptions at the end and subscriptions were subscriptions at a priority location
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break; }}// Based on the subscriber, get all the event types to which it has subscribedList<Class<? >> subscribedEvents = typesBySubscriber.get(subscriber);if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        // Add the current event, one more
        subscribedEvents.add(eventType);
         
        // If the event is sticky
        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).Set<Map.Entry<Class<? >, Object>> entries = stickyEvents.entrySet();for(Map.Entry<Class<? >, Object> entry : entries) { Class<? > candidateEventType = entry.getKey();if(eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); }}}else{ Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); }}}// Subscriber Method
    public class SubscriberMethod {
        // The Method object that handles event methods
        final Method method;
        // In which thread is the response subscription ThreadMode,
        final ThreadMode threadMode;
        // Subscribe to eventType
        finalClass<? > eventType;/ / priority
        final int priority;
        // Whether it is sticky
        final boolean sticky;
        /** Used for efficient comparison */
        String methodString;
   }
   / / subscribe
   final class Subscription {
        / / subscriber
        final Object subscriber;
        // Subscriber Method
        final SubscriberMethod subscriberMethod;
    }
    
    
Copy the code

You can see the register () method is very concise, the comments in the code is also very clear, we can see through subscriberMethodFinder. FindSubscriberMethods (subscriberClass) method can return a SubscriberM Ethod object, and the SubscriberMethod contains all the information we need to subscribe() next. So let’s take a look at how findSubscriberMethods() is implemented, right

3.2 Implementation of SubscriberMethodFinder

See how findSubscriberMethods() works:

 // List of cache methods
 private static finalMap<Class<? >, List<SubscriberMethod>> METHOD_CACHE =new ConcurrentHashMap<>();

 List<SubscriberMethod> findSubscriberMethods(Class
        subscriberClass) {
        // Caches a list of methods for the current class
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        
        if(subscriberMethods ! =null) {
            // If there was a cache before, return it
            return subscriberMethods;
        }
        // EvenBus instantiation is generally false
        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            // Get the subscription Method
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            // Put it in cache and return it
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            returnsubscriberMethods; }}Copy the code

Look at the implementation of the findUsingInfo method:

  private List<SubscriberMethod> findUsingInfo(Class
        subscriberClass) {
        // Get a FindState object from the cache
        FindState findState = prepareFindState();
        // Initialize the findState we just got
        findState.initForSubscriber(subscriberClass);
        
        while(findState.clazz ! =null) {
             // Get the subscriberInfo and store it in findState, usually null
            findState.subscriberInfo = getSubscriberInfo(findState);
            //一般null
            if(findState.subscriberInfo ! =null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if(findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) { findState.subscriberMethods.add(subscriberMethod); }}}else {
                // Use reflection in a class clock to get Method
                findUsingReflectionInSingleClass(findState);
            }
            findState.moveToSuperclass();
        }
        // Get Method and release it
        return getMethodsAndRelease(findState);
    }
    
    
   
    // Four FindState objects are cached
    private static final int POOL_SIZE = 4;
    private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];
    // Get it from the cache
    private FindState prepareFindState(a) {
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                FindState state = FIND_STATE_POOL[i];
                // If not null, return
                if(state ! =null) {
                    FIND_STATE_POOL[i] = null;
                    returnstate; }}}// No, just a new one
        return new FindState();
    }
      / / FindState class
      static class FindState {
        // declare fianl to avoid reallocating memory
        // A list of subscribed methods
        final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
        // Categorize by event type
        final Map<Class, Object> anyMethodByEventType = new HashMap<>();
        // Categorize by Method
        final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
        final StringBuilder methodKeyBuilder = new StringBuilder(128);

        // Subscriber typeClass<? > subscriberClass;// Subscriber typeClass<? > clazz;// Skip the parent class is generally false
        boolean skipSuperClasses;
        // Subscriber info, usually null
        SubscriberInfo subscriberInfo;

        void initForSubscriber(Class
        subscriberClass) {
            this.subscriberClass = clazz = subscriberClass;
            skipSuperClasses = false;
            subscriberInfo = null; }}// Use reflection to get Method in a class, and subscriberMethods are stored in findState
 private void findUsingReflectionInSingleClass(FindState findState) {
        / / array Method
        Method[] methods;
        try {
            // This is faster than getMethods, especially when subscribers are fat classes like Activities
            // Get the methods already defined by the subscriber
            methods = findState.clazz.getDeclaredMethods();
            
        } catch (Throwable th) {
            // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        // Iterate over each method
        for (Method method : methods) {
            // Public /private/protect
            int modifiers = method.getModifiers();
            
            // Subscribe methods must be public, non-static, and non-abstract
            if((modifiers & Modifier.PUBLIC) ! =0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                
                // The parameter type of the methodClass<? >[] parameterTypes = method.getParameterTypes();// Only one parameter can be specified
                if (parameterTypes.length == 1) {
                    
                    // Method gets a Subscribe annotation
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if(subscribeAnnotation ! =null) {
                        // in the case of a method modified by a Subscribe annotation, the first argument type is the eventType that the subscriber cares about, eventTypeClass<? > eventType = parameterTypes[0];
                        // Add eventType and method to findState
                        if (findState.checkAdd(method, eventType)) {
                            // Annotate the object's threadMode variable
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            // Add subscriberMethod to findState
                            findState.subscriberMethods.add(newSubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); }}}else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has "+ parameterTypes.length); }}else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract"); }}}// Get subscriberMethods from findState and release them
     private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
        / / findState subscriberMethods
        List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
        
        // The findState object is recycled to empty the 4 fianL collections to avoid rebuilding
        findState.recycle();
        // cache it
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                if (FIND_STATE_POOL[i] == null) {
                    FIND_STATE_POOL[i] = findState;
                    break; }}}/ / return
        return subscriberMethods;
    }


Copy the code

EventBus register() : EventBus register() : EventBus register()

3.3 Event distribution process source code analysis

Eventbus.getdefault ().post(XXXEvent event); To send an event, so let’s start with this line of code, first looking at how the POST () method is implemented:


    /** Posts the given event to the event bus. */
    public void post(Object event) {
        PostingState is a variable saved by the current calling thread
        PostingThreadState postingState = currentPostingThreadState.get();
        // A collection of events stored in the current thread
        List<Object> eventQueue = postingState.eventQueue;
        // Save the event to be sent in the event queue
        eventQueue.add(event);
         
         // If the current thread's send state, postingState, is not sent, other events will be queued directly
        if(! postingState.isPosting) {// Whether the current sending thread is the main thread
            postingState.isMainThread = isMainThread();
            // In process of sending
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                // Loop through a single event
                while(! eventQueue.isEmpty()) {Send a single event
                    postSingleEvent(eventQueue.remove(0), postingState); }}finally {
                // Initialize the send state maintained by the current thread
                postingState.isPosting = false;
                postingState.isMainThread = false; }}}Copy the code

The first is through currentPostingThreadState. Get () method to get the current thread PostingThreadState object, why is that the current thread let’s take a look at currentPostingThreadState implementation:

private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue(a) {
            return newPostingThreadState(); }};Copy the code

CurrentPostingThreadState implementation is a contains PostingThreadState ThreadLocal object, about the ThreadLocal: internal data storage class ThreadLocal is a thread, It allows you to store data in a specified thread that is not shared with other threads. The idea is that by generating an array of wrapped generic objects, different threads will have different array indexes, so that each thread can fetch only the data corresponding to its own thread through the get() method. Take a look at PostingThreadState:

 /** For ThreadLocal, much faster to set (and get multiple values). */
     // The state of the data stored in the thread
    final static class PostingThreadState {
        // The event queue saved by the thread
        final List<Object> eventQueue = new ArrayList<>();
        // In process of sending
        boolean isPosting;
        // Post indicates whether the sending thread is the main thread
        boolean isMainThread;
        
        // Event subscribers and subscribers to Method
        Subscription subscription;
        / / the event object
        Object event;
        // Whether to cancel
        boolean canceled;
    }

Copy the code

Next we look at the postSingleEvent() method:

Send a single event
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        // Event classClass<? > eventClass = event.getClass();// Whether the subscription flag bit is found
        boolean subscriptionFound = false;
        See EventBus Initialization, EventBusBuilder says true
        if (eventInheritance) {
            // Find the class collection of all the parent classes of the current event and the interface that implements themList<Class<? >> eventTypes = lookupAllEventTypes(eventClass);int countTypes = eventTypes.size();
            // Iterate over each event/event's parent class
            for (int h = 0; h < countTypes; h++) { Class<? > clazz = eventTypes.get(h);// Send a single event to a single event classsubscriptionFound |= postSingleEventForEventType(event, postingState, clazz); }}else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if(! subscriptionFound) {if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if(sendNoSubscriberEvent && eventClass ! = NoSubscriberEvent.class && eventClass ! = SubscriberExceptionEvent.class) { post(new NoSubscriberEvent(this, event)); }}}Copy the code

Let’s look at lookupAllEventTypes:

 /** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */
    
    // Find the class set of all the parent classes and interfaces that are currently sending events and put it in the cache
    private staticList<Class<? >> lookupAllEventTypes(Class<? > eventClass) {synchronized(eventTypesCache) { List<Class<? >> eventTypes = eventTypesCache.get(eventClass);// If there is no new add
            if (eventTypes == null) {
                eventTypes = newArrayList<>(); Class<? > clazz = eventClass;// loop add
                while(clazz ! =null) {
                    // Add the current eventClass
                    eventTypes.add(clazz);
                    // Add the interface class of the current eventClass recursively
                    addInterfaces(eventTypes, clazz.getInterfaces());
                    
                    // Find the parent of the current eventClass
                    clazz = clazz.getSuperclass();
                }
                eventTypesCache.put(eventClass, eventTypes);
            }
            returneventTypes; }}/** Recurses through super interfaces. */
    // Add the interface class of the current eventClass recursively
    static void addInterfaces(List
       
        > eventTypes, Class
        [] interfaces)
       > {
        for(Class<? > interfaceClass : interfaces) {if(! eventTypes.contains(interfaceClass)) { eventTypes.add(interfaceClass); addInterfaces(eventTypes, interfaceClass.getInterfaces()); }}}Copy the code

When an Event is posted, its parent Event is also posted.

Finally see postSingleEventForEventType


   Send a single event for a specific event
    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class
        eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            // All subscribers to the current event + subscribe to Method
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        
        if(subscriptions ! =null && !subscriptions.isEmpty()) {
            // Iterate over all subscribers
            for (Subscription subscription : subscriptions) {
            
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    // Actually send to execute
                    postToSubscription(subscription, event, postingState.isMainThread);
                    
                
                    aborted = postingState.canceled;
                } finally {
                    // Reset postingState
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break; }}return true;
        }
        return false;
    }

 Execute the subscription method according to the thread pattern of the subscription method
 private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                The default is to call the subscriber's event-response method directly on the thread that performed the Post operation
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    // If the thread of post is the main thread, the response method is executed on the main thread
                    invokeSubscriber(subscription, event);
                } else {
                    // If the POST thread is not the main thread, it is added to the mainThreadPoster queue and the response method is executed on the main thread
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:// priority the main thread
            
                if(mainThreadPoster ! =null) {
                    // priority the main thread
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    //mainThreadPoster equals null to proceed with the response method in the POST thread
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    // If the post thread is the main thread, the backgroundPoster is queued and executed in the thread pool
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    // If the POST thread is a child thread, the response method continues in the child thread
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                // Whether the POST thread is in the main thread or the child thread, the response method is executed in the child thread
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: "+ subscription.subscriberMethod.threadMode); }}Copy the code

To summarize the code above, get all subscriptions to the event from subscriptionsByEventType and distribute the event through postToSubscription() Invoke () subscriber methods in different threads using different threadmodes. There are four classes of threadmodes:

  • PostThread: The default ThreadMode that calls the subscriber’s event-response method directly on the thread that performs the Post operation, whether it is the master thread (UI thread) or not. When the thread is the master thread, there must be no time-consuming operations in the response method, otherwise there is a risk of blocking the main thread. Application scenario: There is no requirement on whether the operation is performed on the main thread. However, if the Post thread is the main thread, time-consuming operations cannot be performed.

  • MainThread: executes response methods in the MainThread. If the publishing thread is the main thread, the subscriber’s event-response method is called directly, otherwise the message is sent through the main thread’s Handler and processed in the main thread — calling the subscriber’s event-response function. Obviously, methods of the MainThread class also cannot have time-consuming operations to avoid blocking the MainThread. Application scenario: Operations that must be performed on the main thread.

  • BackgroundThread: Executes response methods in background threads. If the publishing thread is not the main thread, the subscriber’s event response function is called directly, otherwise a unique background thread is started to process. Since background threads are unique, they are queued for execution when more than one event occurs. Therefore, although this type of response method is not as performance-sensitive as PostThread and MainThread methods, it is better not to have heavy time-consuming operations or light time-consuming operations that are too frequent, causing other operations to wait. Application scenario: The operation is light and time-consuming and not too frequent, that is, general time-consuming operations can be placed here.

  • Async: An idle thread is used for processing regardless of whether the publishing thread is the master thread or not. Unlike backgroundThreads, Async threads are independent of each other, so there is no problem with threads getting stuck. Application scenario: Time-consuming operations, such as network access.

In the main thread, the child thread executes the response function.

 // Switch to the main thread to execute the response function
 mainThreadPoster.enqueue(subscription, event);
 // Switch to the child thread to execute the response function
 backgroundPoster.enqueue(subscription, event);
Copy the code

During EvnetBus initialization:

mainThreadSupport = builder.getMainThreadSupport(); mainThreadPoster = mainThreadSupport ! =null ? mainThreadSupport.createPoster(this) : null;
 backgroundPoster = new BackgroundPoster(this);
        
 / / then see mainThreadSupport createPoster (this)
  // The main thread supports interfaces
  public interface MainThreadSupport {

    boolean isMainThread(a);

    Poster createPoster(EventBus eventBus);

    class AndroidHandlerMainThreadSupport implements MainThreadSupport {
        // This is mainlooper
        private final Looper looper;

        public AndroidHandlerMainThreadSupport(Looper looper) {
            this.looper = looper;
        }

        @Override
        public boolean isMainThread(a) {
            return looper == Looper.myLooper();
        }

        @Override
        public Poster createPoster(EventBus eventBus) {
            // New a HandlerPoster object
            return new HandlerPoster(eventBus, looper, 10); }}}// Look at HandlerPoster. MainThreadPoster is a HandlerPoster object
    
    public class HandlerPoster extends Handler implements Poster {
    // Queue that did not perform post
    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        // Create a queue
        queue = new PendingPostQueue();
    }

    // mainThreadPoster. Enqueue (subscription, event);
    
    public void enqueue(Subscription subscription, Object event) {
        // Get a post to execute
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        
        synchronized (this) {
            // Join the queue
            queue.enqueue(pendingPost);
            if(! handlerActive) { handlerActive =true;
                 // Then send a Message to the main thread, which is the following handleMessage
                if(! sendMessage(obtainMessage())) {throw new EventBusException("Could not send handler message"); }}}}@Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            // Queue is processed in an infinite loop
            while (true) {
                // The queue is null and exits
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return; }}}// The real processing response method is through reflection
                eventBus.invokeSubscriber(pendingPost);
                
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if(! sendMessage(obtainMessage())) {throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return; }}}finally{ handlerActive = rescheduled; }}/ / see BackgroundPoster
  
/** * Posts events in background@author Markus
 */
final class BackgroundPoster implements Runnable.Poster {
    // There is still a queue
    private final PendingPostQueue queue;
    //eventbus
    private final EventBus eventBus;
    
    // Whether the thread is executing
    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        // Create a queue
        queue = new PendingPostQueue();
    }
    // Join the queue as backgroundposter.enqueue (subscription, event);
    
    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            // Get a POST and queue
            queue.enqueue(pendingPost);
            
            // If the thread is not executing, join the thread pool and execute the following run method
            if(! executorRunning) { executorRunning =true;
                eventBus.getExecutorService().execute(this); }}}@Override
    public void run(a) {
        try {
            try {
                
                // Process the POST queue in an infinite loop
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    // The pair column is null
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return; }}}//// really handles the response method through reflectioneventBus.invokeSubscriber(pendingPost); }}catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e); }}finally {
            executorRunning = false; }}}Copy the code

This explains how to switch response functions in the main thread and child threads.

Take a look at the implementation method invokeSubscriber by reflection:



    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            // reflection execution method
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e); }}// Final class post that has not yet been executed
final class PendingPost {

    // as a buffer pool to avoid duplicate object creation
    private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
    
    // The event object
    Object event;
    // The subscriber + subscribe method for this event
    Subscription subscription;
    // Next post not yet executed
    PendingPost next;

    private PendingPost(Object event, Subscription subscription) {
        this.event = event;
        this.subscription = subscription;
    }

    static PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > 0) {
                PendingPost pendingPost = pendingPostPool.remove(size - 1);
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                returnpendingPost; }}// If there are no objects in the cache pool, a new object will be created
        return new PendingPost(event, subscription);
    }

    static void releasePendingPost(PendingPost pendingPost) {
        pendingPost.event = null;
        pendingPost.subscription = null;
        pendingPost.next = null;
        synchronized (pendingPostPool) {
            // Don't let the pool grow indefinitely
            if (pendingPostPool.size() < 10000) { pendingPostPool.add(pendingPost); }}}}Copy the code