The design concept of EventBus is based on the Observer pattern, which can be understood by referring to Design Pattern (1) – Observer Pattern.

1. Program example

Using EventBus is very simple, first you need to add Guava’s dependencies to your own project. Here’s a basic example of how EveentBus is used.

public static void main(String... EventBus EventBus = new EventBus("Joker") {// Define an EventBus object, where Joker is the id of the object. Eventbus.register (new EventListener()); Eventbus. post(new Event("Hello every listener, joke begins... )); Public static class Event {public String message; Event(String message) { this.message = message; }} public static class EventListener {public static class EventListener {public static class EventListener {public static class EventListener { Subscribe public void Listen (Event Event) {system.out.println ("Event Listener 1 Event. Message = ") {Subscribe public void Listen (Event Event) {system.out.println ("Event Listener 1 Event  " + event.message); }}Copy the code

First, we encapsulate an Event object and a listener object, EventListener. We then create an EventBus instance using the Constructor of EventBus and register the listener instance above. We then publish an Event using the above EventBus instance. Then, methods in the above registered listeners that use the @SUBSCRIBE annotation declaration and have only one argument of type Event will be raised when the Event is raised.

Conclusion: The difference between EventBus and observer mode is that when a listener is registered, the method is triggered only if it uses the @SUBSCRIBE annotation declaration and the argument matches the published event type. That is, the same listener can listen for multiple types of events, or it can listen for the same event multiple times.

2. Source code analysis of EventBus

2.1 Before Analysis

Ok, so with the example above, you’ve learned the basics of how to use EventBus. Let’s take a look at how this API is implemented for us in Guava. But first, let’s try to think about how we designed the API, ask a few questions, and then take those questions to the source code for answers.

If we were to design such an API, the easiest way would be to extend the observer mode: Each time the eventbus.post () method is called, all observer objects are iterated over and all of their methods are retrieved to determine whether the method uses @SUBSCRIBE and whether the argument type matches the event type published by post(). If so, So we use reflection to trigger this method. In the Observer mode, each observer implements an interface, and when an event is published, we simply call the interface’s methods, but EventBus makes this restriction more general: listeners don’t need to implement any interfaces, as long as the methods use annotations and their parameters match.

As you can see from the above analysis, this involves not only traversing all listeners, but also traversing their methods, and then using reflection to trigger the method when a match is found. First, when the number of registered listeners is large, the efficiency of chain call is not high; Then we have to use reflection to trigger the matching method, which is definitely less efficient. So how do you solve these two problems in Guava’s EventBus?

In addition, we should also pay attention to the difference between observer and listener in the following. Listener refers to the object registered with eventBus.register (). Observer is the object Subscriber in EventBus, which encapsulates all information of a listener, such as the method of listening, etc. Generally, we will not operate the Subscriber object directly, and its access rights can only be accessed in the package of EventBus.

2.2 Start analysis

First, when we initialize an EventBus with new, we actually call the following method:

EventBus(String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) {
    this.subscribers = new SubscriberRegistry(this);
    this.identifier = (String)Preconditions.checkNotNull(identifier);
    this.executor = (Executor)Preconditions.checkNotNull(executor);
    this.dispatcher = (Dispatcher)Preconditions.checkNotNull(dispatcher);
    this.exceptionHandler = (SubscriberExceptionHandler)Preconditions.checkNotNull(exceptionHandler);
}
Copy the code

Here the identifier is a string type, similar to the ID of EventBus; Subscribers are SubscriberRegistry type. In fact, EventBus uses methods of this instance when adding, removing and iterating observers, and all observer information is maintained in this instance. Executors are thread pools used in event distribution and can be implemented by themselves. Dispatcher is a subclass of the Dispatcher type used to distribute messages to listeners when an event is posted. There are several default implementations of dispatcher for different distribution modes. ExceptionHandler SubscriberExceptionHandler type, it is used to deal with abnormal information, in the default EventBus implementation, will print out the log when abnormal, of course, we can also define your own exception handling strategies.

So, as you can see from the above analysis, if we want to understand how EventBus is registered and unregistered and how events are triggered throughout, we should start with the SubscriberRegistry. Indeed, personally, the implementation of this class is one of the best parts of EventBus.

2.2.1 SubscriberRegistry

Based on the analysis in 2.1, we need to maintain several mappings in EventBus to find and notify all listeners when an event is published, starting with the mapping of the event type -> observer list. As mentioned above, event release in EventBus is for each method. We maintain the type information and method information corresponding to an event in an object, which is observer Subscriber in EventBus. Then, map to the list of observers by event type. When an event is published, simply look for all observers in the list based on the event type and trigger the listener method. This mapping is done in the SubscriberRegistry using the following data structure:

private final ConcurrentMap<Class<? >, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();Copy the code

From the above definition, we can see that the Class type of the event is used to map to the Subscriber list. The Subscriber list here uses the CopyOnWriteArraySet set in Java, which uses CopyOnWriteArrayList at the bottom and encapsulates it, that is, adding the deduplicate operation on the basic set. This is a collection suitable for read more write less scenario, read data is not locked, write data is locked, and a copy of the array.

Now that we know that a mapping is inserted into the above data structure at registration time within the SubscriberRegistry, we can look at how it does this.

Before looking at the register() method, let’s take a look at a few methods that are commonly used within SubscriberRegistry and whose principles are closely related to the issues we’ve raised above. First is the findAllSubscribers() method, which gets the entire set of observers for a given listener. Here’s the code:

private Multimap<Class<? >, Subscriber> findAllSubscribers(Object Listener) {// Create hash table Multimap<Class<? >, Subscriber> methodsInListener = HashMultimap.create(); // Get the listener's type Class<? > clazz = listener.getClass(); UnmodifiableIterator var4 = getAnnotatedMethods(clazz).iterator(); While (var4.hasNext()) {Method Method = (Method)var4.next(); // 1 // iterate over the above methods and create an observer based on the Method and type parameters and insert it into the mapping table. Class<? >[] parameterTypes = method.getParameterTypes(); // Event type Class<? > eventType = parameterTypes[0]; methodsInListener.put(eventType, Subscriber.create(this.bus, listener, method)); } return methodsInListener; }Copy the code

Note here that the Multimap data structure, which is a collection structure provided in Guava, differs from a regular hash table in that it can do one-to-many operations. This is used to store a one-to-many mapping of event types to observers. Note the code in the next section. As we mentioned above, when registering a listener, using reflection to fetch all methods and make a judgment is a waste of performance, and here is the answer to this question:

Here’s how the getAnnotatedMethods() method attempts to get all the registered listening methods (that is, with annotations and only one parameter) from the subscriberMethodsCache:

private static ImmutableList<Method> getAnnotatedMethods(Class<? > clazz) { return (ImmutableList)subscriberMethodsCache.getUnchecked(clazz); }Copy the code

The definition of subscriberMethodsCache here is:

private static final LoadingCache<Class<? >, ImmutableList<Method>> subscriberMethodsCache = CacheBuilder.newBuilder().weakKeys().build(new CacheLoader<Class<? >, ImmutableList<Method>>() { public ImmutableList<Method> load(Class<? > concreteClass) throws Exception { // 2 return SubscriberRegistry.getAnnotatedMethodsNotCached(concreteClass); }});Copy the code

The mechanism of action here is: When using subscriberMethodsCache. GetUnchecked (clazz) method to obtain the specified listener will try to obtain from the cache, if the cache does not exist in the code will be executed 2 place, Call the getAnnotatedMethodsNotCached SubscriberRegistry () method to obtain the surveillance method. Here we omit the definition of this method, can look at the source code, in fact, use reflection and do some verification, not complicated.

Method findAllSubscribers() () when I register a listener, I first get the listener’s type and then try to get all listener methods from the cache. If not, I go through the methods of that class and add them to the cache. It then iterates over the collection of methods retrieved above, creates an observer based on information such as the event type (known from method parameters) and listener, inserts the event type-observer key-value pair into a one-to-many mapping table and returns it.

Let’s look at the code for the register() method in EventBus:

Void register(Object listener) {// Get the event type - observer map Multimap<Class<? >, Subscriber> listenerMethods = this.findAllSubscribers(listener); Collection eventMethodsInListener; CopyOnWriteArraySet eventSubscribers; Subscribers (Iterator var3 = Listenermethods.asmap ().entryset ().iterator(); var3.hasNext(); eventSubscribers.addAll(eventMethodsInListener)) { Entry<Class<? >, Collection<Subscriber>> entry = (Entry)var3.next(); Class<? > eventType = (Class)entry.getKey(); eventMethodsInListener = (Collection)entry.getValue(); eventSubscribers = (CopyOnWriteArraySet)this.subscribers.get(eventType); If (eventSubscribers == NULL) {CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet(); eventSubscribers = (CopyOnWriteArraySet)MoreObjects.firstNonNull(this.subscribers.putIfAbsent(eventType, newSet), newSet); }}}Copy the code

The register() method in SubscriberRegistry is similar to the unregister() method, and we will not explain it. Let’s look at the logic when calling the eventbus.post () method. Here’s the code:

Public void POST (Object event) {// Call the SubscriberRegistry getSubscribers method to get all observer iterators <Subscriber> eventSubscribers = this.subscribers.getSubscribers(event); If (eventSubscribers. HasNext ()) {this.dispatcher. Dispatch (event, eventSubscribers); } else if (! (event instanceof DeadEvent)) { this.post(new DeadEvent(this, event)); }}Copy the code

As can be seen from the above code, when the EventBus.post() method is called, the SubscriberRegistry getSubscribers method is used to obtain all observers corresponding to the event, so we need to look at this logic first. Here is the definition of this method:

Iterator<Subscriber> getSubscribers(Object event) {ImmutableSet<Class<? >> eventTypes = flattenHierarchy(event.getClass()); // 3 List<Iterator<Subscriber>> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size()); UnmodifiableIterator var4 = eventTypes.iterator(); Subscribers while(var4.hasnext ()) {Class<? > eventType = (Class)var4.next(); CopyOnWriteArraySet<Subscriber> eventSubscribers = (CopyOnWriteArraySet)this.subscribers.get(eventType); if (eventSubscribers ! = null) { subscriberIterators.add(eventSubscribers.iterator()); } } return Iterators.concat(subscriberIterators.iterator()); }Copy the code

If we trigger an event of type Interger, all listener methods of type Number and Object will receive the event and fire. The logic here is simple: find the observer corresponding to the type of the event and all its superclasses and return it.

2.2.2 the Dispatcher

Let’s look at what the logic of the actual distribution event looks like.

As you can see from the eventbus.post () method, when we use the Dispatcher for event distribution, we need to pass in the current event and all observers as parameters to the method. The distribution is then done inside the method. Finally, the listening method of a listener is triggered by reflection. This part of logic is inside Subscriber, and Dispatcher is the policy interface of event distribution. EventBus provides three default Dispatcher implementations for event distribution in different scenarios:

  1. ImmediateDispatcher: directly traverses all observers in the current thread and distributes events;
  2. LegacyAsyncDispatcher: Asynchronous method, there are two cycles, one first and one after, the former is used to continuously load encapsulated observer objects into the global queue, the latter is used to continuously remove observer objects from the queue for event distribution; In fact, EventBus has a class AsyncEventBus that uses this dispenser for event distribution.
  3. PerThreadQueuedDispatcher: This dispenser uses two thread-local variables for control whendispatch()Method is called to get the observer queue of the current thread and pass the list of incoming observers to the queue. A Boolean thread-local variable is then used to determine whether the current thread is distributing, and if not, events are distributed by traversing the above queue.

All the above three dispatchers will eventually call the Subscriber dispatchEvent() method for event distribution:

Final void dispatchEvent(Final Object event) {this.executor.execute(new Runnable() {public void run() {the try {/ / use reflection to trigger monitor method. The Subscriber enclosing invokeSubscriberMethod (event); } the catch (InvocationTargetException var2) {/ / using internal EventBus SubscriberExceptionHandler handle exceptions Subscriber.this.bus.handleSubscriberException(var2.getCause(), Subscriber.this.context(event)); }}}); }Copy the code

The executor in the above method is the executor, which is obtained via EventBus; Deal with abnormal SubscriberExceptionHandler type is also through the EventBus is available. (This is where fields from the original EventBus constructor are used!) There isn’t much logic to reflection triggering method calls.

In addition, it should be noted that there is also a word class SynchronizedSubscriber, which is different from general Subscriber in that its reflection trigger method is modified by sychronized keyword, that is, its trigger method is locked and thread-safe.

Conclusion:

At this point, we have completed the source code analysis of EventBus. A quick summary:

Three caches and four mappings are maintained in EventBus:

  1. Mapping of event types to observer lists (caching);
  2. Mapping of event types to listener method lists (caching);
  3. Mapping of event types to a list of event types and the types of all their parents (caching);
  4. Observer to listener mapping, observer to listener method mapping;

Observer Subscriber encapsulates listener and listening method inside, which can be triggered by direct reflection. If it is mapped to a listener, it is triggered by the type of listener’s method. Personally, I think this design is very good, because we no longer need to maintain a cache of mapping in EventBus, because the one-to-one mapping has been completed in Subscriber.

Each time listeners are registered or unregistered using EventBus, they are retrieved from the cache first. Reflection is not always used. This improves retrieval efficiency and answers our initial efficiency question. When using reflection to trigger a method call seems inevitable.

Finally, there are a number of data structures used in EventBus, such as MultiMap, CopyOnWriteArraySet, and a number of caching and mapping libraries, most of which come from Guava.

After seeing the implementation of EventBus, I can’t help but feel that Google’s engineers are really great! There’s a lot more to Guava than that!

To learn more about thread-local traversal, see another of my blog posts on the use of ThreadLocal and its source code implementation

Thank you very much for your attention