The main modules of EventBus

Subscribe

An annotation that marks which methods can be registered and notified. The annotated method must take exactly one argument, and the type of that argument is the event the method listens for. For example:

class EventBusChangeRecorder {
  @Subscribe
  public void recordCustomerChange(ChangeEvent e) {
    recordChange(e.getChange());
  }
}

You do not need to name the event type explicitly when registering; it is taken from the parameter of the annotated method:

eventBus.register(new EventBusChangeRecorder());
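
To make the implicit registration concrete, here is a minimal sketch of how a registry could discover handler methods through reflection. It is not Guava's implementation: the Handles annotation, MiniRegistry and ChangeRecorder are hypothetical stand-ins for @Subscribe, SubscriberRegistry and a listener class, and the sketch is single-threaded and skips superclass scanning and caching.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Guava's @Subscribe.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Handles {}

// Hypothetical, single-threaded stand-in for SubscriberRegistry.
class MiniRegistry {
    // One discovered handler: the listener instance plus its reflective method.
    private static final class Handler {
        final Object target;
        final Method method;

        Handler(Object target, Method method) {
            this.target = target;
            this.method = method;
        }
    }

    private final Map<Class<?>, List<Handler>> handlersByEventType = new HashMap<>();

    // Scans the listener for annotated methods; the parameter type becomes the event type.
    void register(Object listener) {
        for (Method m : listener.getClass().getDeclaredMethods()) {
            if (!m.isAnnotationPresent(Handles.class)) {
                continue;
            }
            if (m.getParameterCount() != 1) {
                throw new IllegalArgumentException(m + " must take exactly one argument");
            }
            m.setAccessible(true);
            handlersByEventType
                .computeIfAbsent(m.getParameterTypes()[0], k -> new ArrayList<>())
                .add(new Handler(listener, m));
        }
    }

    // Delivers the event to handlers registered for its exact class; returns how many ran.
    int post(Object event) {
        List<Handler> handlers =
            handlersByEventType.getOrDefault(event.getClass(), Collections.<Handler>emptyList());
        for (Handler h : handlers) {
            try {
                h.method.invoke(h.target, event);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("handler invocation failed", e);
            }
        }
        return handlers.size();
    }
}

// Hypothetical listener: the String parameter is what makes it a String-event handler.
class ChangeRecorder {
    final List<String> seen = new ArrayList<>();

    @Handles
    public void onChange(String change) {
        seen.add(change);
    }
}
```

Registering a ChangeRecorder and posting a String reaches onChange, while posting an Integer reaches nobody, because no method declares that parameter type.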

Subscriber

A Subscriber wraps a method annotated with @Subscribe so that it can be invoked as an event handler.

private EventBus bus;
final Object target;
private final Method method;
private final Executor executor;

final void dispatchEvent(final Object event) {
    this.executor.execute(new Runnable() {
        public void run() {
            try {
                Subscriber.this.invokeSubscriberMethod(event);
            } catch (InvocationTargetException var2) {
                Subscriber.this.bus.handleSubscriberException(var2.getCause(), Subscriber.this.context(event));
            }
        }
    });
}

target: the listener instance that was registered

method: the java.lang.reflect.Method object for the annotated handler method

dispatchEvent: the exposed entry point that hands an event to the handler, running it on the subscriber's executor

SubscriberRegistry

The event registry class. It mainly provides the register and unregister methods.

void register(Object listener) {
  Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

  for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
    Class<?> eventType = entry.getKey();
    Collection<Subscriber> eventMethodsInListener = entry.getValue();

    // CopyOnWriteArraySet was introduced to address thread-safety issues with collections
    CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

    if (eventSubscribers == null) {
      CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
      // Publish the new set; if another thread registered this type first, use its set instead
      eventSubscribers =
          MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
    }

    eventSubscribers.addAll(eventMethodsInListener);
  }
}

void unregister(Object listener) {
  Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

  for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
    Class<?> eventType = entry.getKey();
    Collection<Subscriber> listenerMethodsForType = entry.getValue();

    CopyOnWriteArraySet<Subscriber> currentSubscribers = subscribers.get(eventType);
    // Fail if the listener's methods were never registered for this event type
    if (currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType)) {
      throw new IllegalArgumentException();
    }
  }
}

As the code shows, both methods are thread-safe. The subscribers for each event type live in a CopyOnWriteArraySet (and the map that holds those sets is a ConcurrentMap), so registration and unregistration can run concurrently with each other and with event dispatch.

CopyOnWriteArraySet: on every write, it copies the existing elements into a new backing array, applies the change to that copy, and then swaps the copy in for the old array. Readers keep working on the old array in the meantime, so writes never interfere with reads, which solves the read/write concurrency problem. The price is a full copy per write, so it suits collections that are read far more often than they are modified.
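
The snapshot behaviour is easy to observe with the JDK class itself. In this small sketch (the class and method names are my own, chosen only for the demonstration), an iterator is created first and the set is modified before the iteration finishes:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;

class CopyOnWriteDemo {
    // Iterates over a snapshot while the set is modified, and returns what the iterator saw.
    static List<String> iterateWhileModifying(CopyOnWriteArraySet<String> set) {
        List<String> seen = new ArrayList<>();
        Iterator<String> it = set.iterator(); // bound to the backing array as it is right now
        set.add("late");                      // the write goes to a fresh copy of the array
        while (it.hasNext()) {
            seen.add(it.next());              // no ConcurrentModificationException is thrown
        }
        return seen;
    }

    public static void main(String[] args) {
        CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
        set.add("a");
        set.add("b");
        System.out.println(iterateWhileModifying(set)); // the snapshot: [a, b]
        System.out.println(set);                        // the live set: [a, b, late]
    }
}
```

The iterator never sees "late", which is the property a dispatcher relies on when it walks the subscribers while another thread registers a new listener.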

I plan to walk through the source of CopyOnWriteArraySet in a later post, when I have time.

EventBus

The class that ties the event bus together. It combines the event dispatcher (the Dispatcher covered later), the event registry and the other pieces, and exposes registration, posting and the related functions through one unified interface.

private final String identifier;
private final Executor executor;
private final SubscriberExceptionHandler exceptionHandler;

private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
private final Dispatcher dispatcher;

public void register(Object object) {
   subscribers.register(object);
}

public void unregister(Object object) {
   subscribers.unregister(object);
}

public void post(Object event) {
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
}

In summary, the modules above make up the main framework of EventBus, and they solve the problems mentioned in the previous chapter:

  • Thread safety: solved by introducing the thread-safe collection CopyOnWriteArraySet.

  • Explicit registration: solved by defining the Subscribe annotation. It marks the methods that can be registered, the event to listen for is the method's only parameter, and Java reflection is used to discover both, so registration becomes implicit.

Other modules

Dispatcher

EventBus has a dedicated event dispatcher. This section looks at two distribution strategies it offers: breadth-first and depth-first.

private static final class PerThreadQueuedDispatcher extends Dispatcher {

  // Per-thread queue of events waiting to be dispatched (initializer omitted in this excerpt)
  private final ThreadLocal<Queue<Event>> queue;
  // Per-thread flag: true while this thread is already draining its queue (initializer omitted)
  private final ThreadLocal<Boolean> dispatching;

  @Override
  void dispatch(Object event, Iterator<Subscriber> subscribers) {
    Queue<Event> queueForThread = queue.get();
    queueForThread.offer(new Event(event, subscribers));

    // A nested post() from inside a handler only enqueues; the outermost call drains the queue
    if (!dispatching.get()) {
      dispatching.set(true);
      try {
        Event nextEvent;
        while ((nextEvent = queueForThread.poll()) != null) {
          while (nextEvent.subscribers.hasNext()) {
            nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
          }
        }
      } finally {
        dispatching.remove();
        queue.remove();
      }
    }
  }
}

The dispatcher above guarantees that events posted from the same thread (that is, the same thread calls post) are dispatched in order. The queue is what makes it breadth-first: an event posted from inside a handler waits in the queue until the current event has been delivered to all of its subscribers. Now let's look at the other, depth-first implementation:

private static final class ImmediateDispatcher extends Dispatcher {

  @Override
  void dispatch(Object event, Iterator<Subscriber> subscribers) {
    while (subscribers.hasNext()) {
      subscribers.next().dispatchEvent(event);
    }
  }
}
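
The difference between the two strategies only shows up when a handler posts another event while it is still handling one. The following single-threaded sketch is not Guava code: DispatchOrderDemo, its two fixed listeners L1 and L2, and the string events are all hypothetical, and they exist only to make the resulting order visible.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded sketch; it does not use Guava's Dispatcher classes.
class DispatchOrderDemo {
    private final boolean queued;                 // true = breadth-first, false = depth-first
    private final Queue<String> pending = new ArrayDeque<>();
    private boolean dispatching = false;
    final List<String> log = new ArrayList<>();

    DispatchOrderDemo(boolean queued) {
        this.queued = queued;
    }

    void post(String event) {
        if (!queued) {
            deliver(event);                       // depth-first: deliver immediately, even when nested
            return;
        }
        pending.offer(event);                     // breadth-first: always enqueue first
        if (!dispatching) {                       // only the outermost post() drains the queue
            dispatching = true;
            try {
                String next;
                while ((next = pending.poll()) != null) {
                    deliver(next);
                }
            } finally {
                dispatching = false;
            }
        }
    }

    // Two fixed listeners; L1 reacts to "first" by posting "second".
    private void deliver(String event) {
        log.add("L1:" + event);
        if (event.equals("first")) {
            post("second");
        }
        log.add("L2:" + event);
    }

    static List<String> run(boolean queued) {
        DispatchOrderDemo bus = new DispatchOrderDemo(queued);
        bus.post("first");
        return bus.log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // [L1:first, L2:first, L1:second, L2:second]
        System.out.println(run(false));  // [L1:first, L1:second, L2:second, L2:first]
    }
}
```

With the queue, L2 receives "first" before anyone receives "second". Without it, the nested event overtakes the one that caused it, so L2 sees "second" before "first".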

SubscriberExceptionHandler

The previous section also raised the problem of handling exceptions thrown while a subscriber is being invoked. Guava has a solution for this as well: it defines an interface.

public interface SubscriberExceptionHandler {
  /** Handles exceptions thrown by subscribers. */
  void handleException(Throwable exception, SubscriberExceptionContext context);
}

When a subscriber call throws an exception, the bus calls back into this interface (see Subscriber's dispatchEvent method above for the full code):

try {
    Subscriber.this.invokeSubscriberMethod(event);
} catch (InvocationTargetException var2) {
    // Call custom exception handling when an exception occurs
    Subscriber.this.bus.handleSubscriberException(var2.getCause(), Subscriber.this.context(event));
}
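
The reason the catch clause names InvocationTargetException and then calls getCause() is that reflection wraps whatever the handler threw. Here is a minimal sketch of that unwrapping, using only the JDK. FailureHandler is a hypothetical, simplified stand-in for SubscriberExceptionHandler (it has no context object), and FailingListener and ExceptionRoutingDemo are made up for the example.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for SubscriberExceptionHandler.
interface FailureHandler {
    void handle(Throwable cause, Object event);
}

// Hypothetical listener whose handler always fails.
class FailingListener {
    public void onEvent(String event) {
        throw new IllegalStateException("cannot handle " + event);
    }
}

class ExceptionRoutingDemo {
    // Invokes listener.onEvent(event) reflectively and routes a handler failure to the callback.
    static void dispatch(Object listener, String event, FailureHandler handler) {
        try {
            Method method = listener.getClass().getMethod("onEvent", String.class);
            method.setAccessible(true);
            method.invoke(listener, event);
        } catch (InvocationTargetException e) {
            // Reflection wraps the handler's exception; unwrap it before reporting.
            handler.handle(e.getCause(), event);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("listener has no accessible onEvent(String)", e);
        }
    }

    public static void main(String[] args) {
        List<String> reported = new ArrayList<>();
        dispatch(new FailingListener(), "ping",
            (cause, event) -> reported.add(cause.getMessage()));
        System.out.println(reported); // [cannot handle ping]
    }
}
```

The caller of dispatch never sees the exception, which is the point of the design: one failing subscriber is reported to the handler and does not stop the caller from continuing.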

A custom exception handler can also be passed in when the EventBus is constructed:

public EventBus(SubscriberExceptionHandler exceptionHandler) {
    this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler);
}

That completes the analysis of Guava's EventBus, organized around the questions raised in the previous chapter. The EventBus code Google wrote is elegant and robust, and its design accounts for many concerns at once, which offers plenty of inspiration for our own programming and framework design.

Final thoughts

That wraps up the series on EventBus. I kept going back to Guava's source code while writing it and learned a lot in the process. I suggest you read the Guava source as well, to see how some of the world's top Java developers write code.

Github.com/google/guav…