Some time ago, a new face called EventBus showed up in our company's codebase. What on earth is this? A quick search on Baidu turned up the following:

  • Traditionally, Java’s in-process event distribution has been achieved through explicit registration between publisher and subscriber.
  • EventBus was designed to replace this explicit registration approach and to decouple components more cleanly.
  • EventBus is not a general-purpose publish-subscribe implementation for interprocess communication.

So it is a publish-subscribe model that replaces explicit event distribution. But how does it actually work? I could not guess what the senior dev had in mind, so I went and asked:

Rookie KK: Hey, I see you used EventBus in your project. Can you tell me how it works?
Big Guy: Do you know the observer pattern?
Rookie KK: I don't...
Big Guy: Go read up on it quickly and you will understand.
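For readers who are in the same spot as Rookie KK, here is a minimal sketch of the classic observer pattern in plain Java. The names `ObserverDemo`, `Observer` and `Subject` are made up for illustration and have nothing to do with EventBus itself. The point to notice is that the subject keeps an explicit list of observers and notifies every one of them.

```java
import java.util.ArrayList;
import java.util.List;

class ObserverDemo {
    /** Hypothetical observer interface: every observer receives every message. */
    interface Observer {
        void update(String message);
    }

    /** Hypothetical subject that keeps an explicit list of registered observers. */
    static class Subject {
        private final List<Observer> observers = new ArrayList<>();

        void register(Observer observer) {
            observers.add(observer);
        }

        // Notifies all observers, in registration order.
        void notifyObservers(String message) {
            for (Observer observer : observers) {
                observer.update(message);
            }
        }
    }

    // Registers two observers, publishes one message, and returns what they recorded.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Subject subject = new Subject();
        subject.register(message -> log.add("A got " + message));
        subject.register(message -> log.add("B got " + message));
        subject.notifyObservers("hello");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Both observers receive "hello" because the subject has no notion of event types; it simply walks its list.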

After reading up on the observer pattern, I thought, “That’s it?” The senior dev happened to walk by, seemed to notice my doubt, and said:

Big Guy: Are you done with the observer pattern?
Rookie KK: Yep, all done!
Big Guy: Then go write a demo yourself. Practice is the sole criterion for testing truth!
Rookie KK: OK, I'm on it!

First, create an EventBus instance, register a subscriber on it, and call post to publish events. Each call to post notifies the listeners that subscribe to that event type:

import com.google.common.eventbus.EventBus;

public class Test {
    public static void main(String[] args) {
        // A default EventBus delivers events synchronously on the posting thread.
        EventBus eventBus = new EventBus();
        eventBus.register(new Subscriber());
        eventBus.post(new Event1("Test a hammer."));
        eventBus.post(new Event2("Test a fruit."));
    }
}

Create two different entity classes:

// Event1.java
import lombok.Data;

@Data // Lombok generates getMessage() and the other accessors
public class Event1 {
    private String message;

    public Event1(String message) {
        this.message = message;
    }
}

// Event2.java
import lombok.Data;

@Data
public class Event2 {
    private String message;

    public Event2(String message) {
        this.message = message;
    }
}

Every listener method must be annotated with @Subscribe:

import com.google.common.eventbus.Subscribe;

public class Subscriber {
    // Called for every posted Event1.
    @Subscribe
    public void subscribe1(Event1 event1) {
        System.out.println("1st event" + event1.getMessage());
    }

    // Called for every posted Event2.
    @Subscribe
    public void subscribe2(Event2 event2) {
        System.out.println("2nd event" + event2.getMessage());
    }
}

Running Test prints:

1st eventTest a hammer.
2nd eventTest a fruit.

The idea matches the observer pattern, but is the usage really identical? Curious, I went back to the senior dev:

Rookie KK: Hey, I compared the observer pattern with EventBus, and there seems to be a difference.
Big Guy: Oh? What's different?
Rookie KK: The way you use it doesn't seem to notify every observer?
Big Guy: There is indeed a difference. A subscriber is notified only when its parameter type matches the event. Go read the source code!

The senior dev really does work differently from the rest of us: to truly understand something, read the source code and solve the problem at its root. Let's take a closer look:

/** Registers all subscriber methods on the given listener object. */
void register(Object listener) {
  Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

  for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
    Class<?> eventType = entry.getKey();
    Collection<Subscriber> eventMethodsInListener = entry.getValue();

    CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

    if (eventSubscribers == null) {
      CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
      eventSubscribers =
          MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
    }

    eventSubscribers.addAll(eventMethodsInListener);
  }
}

First comes the registration method. listenerMethods is a one-to-many Multimap that maps each event type to the Subscriber objects found on this listener. Step into findAllSubscribers to see how it is built:

/**
 * Returns all subscribers for the given listener grouped by the type of event they subscribe to.
 */
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
  Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
  Class<?> clazz = listener.getClass();
  for (Method method : getAnnotatedMethods(clazz)) {
    Class<?>[] parameterTypes = method.getParameterTypes();
    Class<?> eventType = parameterTypes[0];
    methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
  }
  return methodsInListener;
}
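To convince myself what this grouping step does, I tried imitating it with nothing but JDK reflection. This is only a rough sketch and not Guava's code: the `@Listen` annotation, the `Listener` class and `findHandlers` are hypothetical stand-ins for `@Subscribe`, a real listener and `findAllSubscribers`, and a plain `Map` of lists replaces the Multimap.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SubscriberScanDemo {
    /** Hypothetical stand-in for @Subscribe; must be retained at runtime to be visible to reflection. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Listen {}

    /** Hypothetical listener with two annotated handlers and one ordinary method. */
    public static class Listener {
        @Listen public void onText(String text) {}
        @Listen public void onNumber(Integer number) {}
        public void notAHandler(String text) {}
    }

    // Groups the annotated methods of the listener by the type of their single parameter.
    static Map<Class<?>, List<Method>> findHandlers(Object listener) {
        Map<Class<?>, List<Method>> byEventType = new HashMap<>();
        for (Method method : listener.getClass().getDeclaredMethods()) {
            if (!method.isAnnotationPresent(Listen.class)) {
                continue; // ordinary methods are ignored
            }
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length != 1) {
                throw new IllegalArgumentException(
                    "Handler " + method + " must have exactly 1 parameter");
            }
            byEventType.computeIfAbsent(parameterTypes[0], k -> new ArrayList<>()).add(method);
        }
        return byEventType;
    }

    public static void main(String[] args) {
        findHandlers(new Listener()).forEach((type, methods) ->
            System.out.println(type.getSimpleName() + " -> " + methods.get(0).getName()));
    }
}
```

The resulting map has one key per event type, which is why a posted event only reaches the handlers whose parameter type matches.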

findAllSubscribers uses reflection to collect every method on the listener that carries the @Subscribe annotation, then groups those methods by the type of their first parameter and stores them in a Multimap. A @Subscribe method may declare only one parameter. With two, registration fails with an error like this: Method public void com.test.bus.test.Subscriber.subscribe2(com.test.bus.test.event.Event2,java.lang.String) has @Subscribe annotation but has 2 parameters. Subscriber methods must have exactly 1 parameter. Then EventBus notifies the receivers via post:

public void post(Object event) {
  Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
  if (eventSubscribers.hasNext()) {
    dispatcher.dispatch(event, eventSubscribers);
  } else if (!(event instanceof DeadEvent)) {
    // the event had no subscribers and was not itself a DeadEvent
    post(new DeadEvent(this, event));
  }
}
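The else branch in post is easy to overlook, so here is a small sketch of the same fallback idea in plain Java. Everything in it is hypothetical (`MiniBus`, `Dead`, the `on` method), and it matches handlers on the exact event class only, which is a simplification I chose for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class DeadEventDemo {
    /** Hypothetical wrapper for events that nobody subscribed to. */
    static final class Dead {
        final Object original;

        Dead(Object original) {
            this.original = original;
        }
    }

    /** Hypothetical bus that routes by the exact class of the event. */
    static final class MiniBus {
        private final Map<Class<?>, List<Consumer<Object>>> handlers = new HashMap<>();

        void on(Class<?> eventType, Consumer<Object> handler) {
            handlers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
        }

        void post(Object event) {
            List<Consumer<Object>> matching = handlers.get(event.getClass());
            if (matching != null) {
                for (Consumer<Object> handler : matching) {
                    handler.accept(event);
                }
            } else if (!(event instanceof Dead)) {
                // No subscriber and not already wrapped: re-post wrapped in Dead.
                post(new Dead(event));
            }
            // A Dead event without subscribers is dropped, which ends the recursion.
        }
    }

    // Posts one handled event and one unhandled event, and returns what was recorded.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniBus bus = new MiniBus();
        bus.on(String.class, e -> log.add("string: " + e));
        bus.on(Dead.class, e -> log.add("dead: " + ((Dead) e).original));
        bus.post("hello");
        bus.post(42); // nobody listens for Integer
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The Integer has no handler, so it comes back wrapped and reaches the `Dead` handler instead of vanishing silently.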

subscribers.getSubscribers(event) returns every Subscriber registered for the event's class or one of its supertypes, and dispatcher.dispatch(event, eventSubscribers) then delivers the event to them. There are three Dispatcher implementations:

  • ImmediateDispatcher (delivers the event directly to all matching subscribers)
private static final class ImmediateDispatcher extends Dispatcher {
  private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher();

  @Override
  void dispatch(Object event, Iterator<Subscriber> subscribers) {
    checkNotNull(event);
    while (subscribers.hasNext()) {
      subscribers.next().dispatchEvent(event);
    }
  }
}
  • LegacyAsyncDispatcher (two loops: the first puts every event/subscriber pair into a shared queue, the second polls the queue and delivers each event to its subscriber)
void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  while (subscribers.hasNext()) {
    queue.add(new EventWithSubscriber(event, subscribers.next()));
  }

  EventWithSubscriber e;
  while ((e = queue.poll()) != null) {
    e.subscriber.dispatchEvent(e.event);
  }
}
  • PerThreadQueuedDispatcher (two ThreadLocal fields: the first holds the queue of events waiting to be dispatched on the current thread, the second records whether the current thread is already dispatching, which prevents reentrant dispatch)
/** Per-thread queue of events to dispatch. */
private final ThreadLocal<Queue<Event>> queue =
    new ThreadLocal<Queue<Event>>() {
      @Override
      protected Queue<Event> initialValue() {
        return Queues.newArrayDeque();
      }
    };

/** Per-thread dispatch state, used to avoid reentrant event dispatching. */
private final ThreadLocal<Boolean> dispatching =
    new ThreadLocal<Boolean>() {
      @Override
      protected Boolean initialValue() {
        return false;
      }
    };

void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  checkNotNull(subscribers);
  Queue<Event> queueForThread = queue.get();
  queueForThread.offer(new Event(event, subscribers));

  if (!dispatching.get()) {
    dispatching.set(true);
    try {
      Event nextEvent;
      while ((nextEvent = queueForThread.poll()) != null) {
        while (nextEvent.subscribers.hasNext()) {
          nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
        }
      }
    } finally {
      dispatching.remove();
      queue.remove();
    }
  }
}
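The difference between immediate and per-thread queued dispatch only shows up when a handler posts another event while it is being notified. The sketch below is my own simplified imitation in plain Java, not Guava's classes: `MiniBus` and its `queued` flag are hypothetical, events are plain strings, and the queue stores events rather than event/subscriber pairs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class QueuedDispatchDemo {
    /** Hypothetical bus that can deliver immediately or through a per-thread queue. */
    static final class MiniBus {
        private final boolean queued;
        private final List<Consumer<String>> handlers = new ArrayList<>();
        private final ThreadLocal<Queue<String>> queue = ThreadLocal.withInitial(ArrayDeque::new);
        private final ThreadLocal<Boolean> dispatching = ThreadLocal.withInitial(() -> false);

        MiniBus(boolean queued) {
            this.queued = queued;
        }

        void register(Consumer<String> handler) {
            handlers.add(handler);
        }

        void post(String event) {
            if (!queued) {
                deliver(event); // immediate: nested posts run inside the current delivery
                return;
            }
            Queue<String> queueForThread = queue.get();
            queueForThread.offer(event);
            if (!dispatching.get()) {
                // Only the outermost post on this thread drains the queue.
                dispatching.set(true);
                try {
                    String next;
                    while ((next = queueForThread.poll()) != null) {
                        deliver(next);
                    }
                } finally {
                    dispatching.remove();
                    queue.remove();
                }
            }
        }

        private void deliver(String event) {
            for (Consumer<String> handler : handlers) {
                handler.accept(event);
            }
        }
    }

    // Handler A posts "second" while handling "first"; handler B only records.
    static List<String> run(boolean queued) {
        List<String> log = new ArrayList<>();
        MiniBus bus = new MiniBus(queued);
        bus.register(e -> {
            log.add("A:" + e);
            if (e.equals("first")) {
                bus.post("second");
            }
        });
        bus.register(e -> log.add("B:" + e));
        bus.post("first");
        return log;
    }

    public static void main(String[] args) {
        System.out.println("immediate: " + run(false));
        System.out.println("queued:    " + run(true));
    }
}
```

With immediate delivery the log is A:first, A:second, B:second, B:first, so B sees the events out of order. With the per-thread queue it is A:first, B:first, A:second, B:second, because the nested post is parked until the first event has reached everyone.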

Which of these three dispatch strategies a bus uses is fixed when the EventBus is constructed, together with its executor:

EventBus(
    String identifier,
    Executor executor,
    Dispatcher dispatcher,
    SubscriberExceptionHandler exceptionHandler) {
  this.identifier = checkNotNull(identifier);
  this.executor = checkNotNull(executor);
  this.dispatcher = checkNotNull(dispatcher);
  this.exceptionHandler = checkNotNull(exceptionHandler);
}

Each subscriber then hands the event to the bus's executor, which decides on which thread the handler method runs. The core code is as follows:

/** Dispatches {@code event} to this subscriber using the proper executor. */
final void dispatchEvent(final Object event) {
  executor.execute(
      new Runnable() {
        @Override
        public void run() {
          try {
            invokeSubscriberMethod(event);
          } catch (InvocationTargetException e) {
            bus.handleSubscriberException(e.getCause(), context(event));
          }
        }
      });
}
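Since everything goes through executor.execute, the executor alone decides which thread runs the handler. Here is a small JDK-only sketch of that idea. `ExecutorDispatchDemo` and `handlerThread` are hypothetical names, and the `Runnable::run` executor is my stand-in for a "direct" executor that runs the task on the caller's thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class ExecutorDispatchDemo {
    // Runs a trivial handler through the executor and returns the name of the thread that ran it.
    static String handlerThread(Executor executor) {
        AtomicReference<String> threadName = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            threadName.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(); // wait in case the executor is asynchronous
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for handler", e);
        }
        return threadName.get();
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // executes the task on the calling thread
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println("direct executor ran on: " + handlerThread(direct));
            System.out.println("pool executor ran on:   " + handlerThread(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

With the direct executor the handler runs on the posting thread, so post returns only after the handler finishes. With a pool the handler runs on a worker thread instead.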

The method is then executed by reflection:

void invokeSubscriberMethod(Object event) throws InvocationTargetException {
  try {
    method.invoke(target, checkNotNull(event));
  } catch (IllegalArgumentException e) {
    throw new Error("Method rejected target/argument: " + event, e);
  } catch (IllegalAccessException e) {
    throw new Error("Method became inaccessible: " + event, e);
  } catch (InvocationTargetException e) {
    if (e.getCause() instanceof Error) {
      throw (Error) e.getCause();
    }
    throw e;
  }
}
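The part worth remembering here is that Method.invoke wraps whatever the handler throws in an InvocationTargetException, and the real exception sits in getCause(). A minimal sketch of that behaviour in plain Java follows. The `Handler` class and the `invoke` helper are hypothetical and exist only for this demo.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class ReflectiveInvokeDemo {
    /** Hypothetical handler with one well-behaved method and one that throws. */
    public static class Handler {
        public String greet(String name) {
            return "hello " + name;
        }

        public void fail(String name) {
            throw new IllegalStateException("boom for " + name);
        }
    }

    // Invokes target.methodName(arg) reflectively and describes the outcome.
    static String invoke(Object target, String methodName, String arg) {
        try {
            Method method = target.getClass().getMethod(methodName, String.class);
            return "returned: " + method.invoke(target, arg);
        } catch (InvocationTargetException e) {
            // The handler itself threw; the original exception is the cause.
            return "handler threw: " + e.getCause().getMessage();
        } catch (ReflectiveOperationException e) {
            // Lookup or access problems, such as a missing method.
            return "reflection failed: " + e;
        }
    }

    public static void main(String[] args) {
        Handler handler = new Handler();
        System.out.println(invoke(handler, "greet", "KK"));
        System.out.println(invoke(handler, "fail", "KK"));
    }
}
```

This is why dispatchEvent passes e.getCause() to the exception handler rather than the wrapper itself.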

There are still plenty of details and features I have not studied in depth, but at least I now have a basic understanding of how EventBus works. If you want to understand more, read the source code! I also learned something about how to grow as a developer. If I keep this up, one day I will be able to say to EventBus: “Your name is EventBus!”