Series of articles

  • Go | Go language pack static files and how to use the Go – bindata with Gin
  • Go | Gin solve the problem of cross-domain cross-domain configuration

[TOC]


preface

EventBus is Guava’s event handling mechanism and an implementation of the Observer pattern (production/consumption model).

The observer mode is widely used in our daily development. For example, in the order system, changes in order status or logistics information will send APP push, SMS to users, and notify sellers and buyers, etc. In the examination and approval system, the flow of the examination and approval order will be notified to the users who initiated the examination and approval and the leaders of the examination and approval, etc.

The Observer mode is also built-in in the JDK. It has been in the Observer since version 1.0, but the way it is used has not changed as Java versions have evolved rapidly, with many libraries providing simpler implementations. For example, Guava EventBus, RxJava, and EventBus

Why use the Observer mode and the benefits of EventBus?

EventBus advantages

  • Compared to Observer programming is simple and convenient
  • By customizing parameters, synchronous and asynchronous operations and exception handling can be realized
  • It is used by a single process and has no network impact

disadvantages

  • This parameter can only be used by a single process
  • An abnormal restart or exit of the project does not guarantee message persistence

If distributed use is required, MQ will be used

2. How to use EventBus

1. The import library

Gradle

compile group: 'com.google.guava'.name: 'guava'.version: '29.0-jre'
Copy the code

Maven

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>29.0 the jre</version>
</dependency>

Copy the code

After introducing depend on, here we mainly use com.google.com mon. The eventbus. The eventbus class, it provides the register and unregister, post registered to subscribe, unsubscribe, and release information

public void register(Object object);

public void unregister(Object object);

public void post(Object event);
Copy the code

2. Use it synchronously

1. Create an EventBus first

EventBus eventBus = new EventBus();
Copy the code

2. Create a subscriber

In Guava EventBus, the subscription is based on the parameter type, and each subscription method can have only one parameter, with the @Subscribe identifier

class EventListener {

  /** * listen for messages of type Integer */
  @Subscribe
  public void listenInteger(Integer param) {
    System.out.println("EventListener#listenInteger ->" + param);
  }

  /** * listen for String messages */
  @Subscribe
  public void listenString(String param) {
    System.out.println("EventListener#listenString ->"+ param); }}Copy the code

3. Register with EventBus and post messages

EventBus eventBus = new EventBus();

eventBus.register(new EventListener());

eventBus.post(1);
eventBus.post(2);
eventBus.post("3");
Copy the code

The run result is

EventListener#listenInteger ->1
EventListener#listenInteger ->2
EventListener#listenString ->3
Copy the code

We can create as many subscribers as needed to complete the subscription information, and if there are more than one subscriber for a type, all subscription methods are executed

Why is this synchronous?

Guava Event is actually using a thread pool to process the subscribe message, through the source code, you can see that when we use the default constructor to create EventBus, the executor to MoreExecutors directExecutor (), Its implementation calls the Runnable#run method directly, so that it is still executed in the same thread, so that the default operation is still synchronized. This approach also has some advantages, such as decoupling and allowing methods to execute in the same thread for the convenience of the same thread, such as transaction processing

EventBus part of the source

public class EventBus {
  private static final Logger logger = Logger.getLogger(EventBus.class.getName());
  private final String identifier;
  private final Executor executor;
  private final SubscriberExceptionHandler exceptionHandler;
  private final SubscriberRegistry subscribers;
  private final Dispatcher dispatcher;

  public EventBus(a) {
    this("default");
  }

  public EventBus(String identifier) {
    this(identifier, MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), EventBus.LoggingHandler.INSTANCE);
  }

  public EventBus(SubscriberExceptionHandler exceptionHandler) {
    this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler);
  }

  EventBus(String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) {
    this.subscribers = new SubscriberRegistry(this);
    this.identifier = (String)Preconditions.checkNotNull(identifier);
    this.executor = (Executor)Preconditions.checkNotNull(executor);
    this.dispatcher = (Dispatcher)Preconditions.checkNotNull(dispatcher);
    this.exceptionHandler = (SubscriberExceptionHandler)Preconditions.checkNotNull(exceptionHandler); }}Copy the code

DirectExecutor part of the source

enum DirectExecutor implements Executor {
  INSTANCE;

  private DirectExecutor(a) {}public void execute(Runnable command) {
    command.run();
  }

  public String toString(a) {
    return "MoreExecutors.directExecutor()"; }}Copy the code

3. Use it asynchronously

Guava EventBus provides a simplified solution called AsyncEventBus to simplify operations

EventBus eventBus = new AsyncEventBus(Executors.newCachedThreadPool());
Copy the code

This enables asynchronous use

AsyncEventBus source

public class AsyncEventBus extends EventBus {
  public AsyncEventBus(String identifier, Executor executor) {
    super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
  }

  public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
    super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
  }

  public AsyncEventBus(Executor executor) {
    super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE); }}Copy the code

4. Exception handling

What should I do if an exception occurs during processing? In the source, whether EventBus or can be introduced into the custom AsyncEventBus SubscriberExceptionHandler the handler is called when abnormal, I can be available from the parameters of the exception exception information, Retrieves message information from the context for specific processing

Its interface is declared as

public interface SubscriberExceptionHandler {
  /** Handles exceptions thrown by subscribers. */
  void handleException(Throwable exception, SubscriberExceptionContext context);
}
Copy the code

conclusion

On the basis of the above, we can define some message type to implement different news monitoring and processing, by implementing SubscriberExceptionHandler to deal with the abnormal situation, no matter when the synchronous or asynchronous can

reference

  • Github.com/google/guav…
  • Github.com/greenrobot/…
  • Github.com/ReactiveX/R…