[Good book sharing: Spring Reactive Programming - JINGdong]

C2 Basic concepts of reactive programming in Spring

RxJava library, Java's first reactive library

2.1 Early approaches

  • Approach 1: Use callbacks to communicate across components.
  • Approach 2: Use Future (java.util.concurrent.Future).
  • Approach 3: Better still, use CompletionStage and CompletableFuture.
  • Approach 4: Use Spring 4's ListenableFuture and AsyncRestTemplate.
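
To make the difference between the first three approaches concrete, here is a minimal sketch using only the JDK. The class name EarlyApproaches and the helper loadGreeting are hypothetical, invented for illustration; they do not come from the book.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class EarlyApproaches {
    // Hypothetical operation shared by all three variants.
    static String loadGreeting(String name) {
        return "Hello, " + name;
    }

    // Approach 1: the caller passes a callback that receives the result.
    static void loadWithCallback(String name, Consumer<String> onSuccess) {
        onSuccess.accept(loadGreeting(name));
    }

    // Approach 2: the caller gets a Future and has to block on get() to read it.
    static Future<String> loadWithFuture(ExecutorService pool, String name) {
        return pool.submit(() -> loadGreeting(name));
    }

    // Approach 3: CompletableFuture lets the caller chain further steps
    // without blocking between them.
    static CompletableFuture<String> loadWithCompletableFuture(String name) {
        return CompletableFuture.supplyAsync(() -> loadGreeting(name))
                .thenApply(String::toUpperCase);
    }

    public static void main(String[] args) throws Exception {
        loadWithCallback("callback", System.out::println);

        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(loadWithFuture(pool, "future").get());
        } finally {
            pool.shutdown();
        }

        System.out.println(loadWithCompletableFuture("stage").join());
    }
}
```

The callback variant pushes the result to the caller, the Future variant makes the caller pull it, and the CompletableFuture variant lets the caller describe what should happen next.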

To better understand the principles involved, we need to understand a classic design pattern.

Observer pattern: the Subject holds a list of its dependents (the observers) and notifies them of state changes, usually by calling one of their methods.

The observer pattern lets us register one-to-many dependencies between objects at run time. Communication is one-way, the implementations stay decoupled, and events are distributed efficiently.

public interface Subject<T> {
    void registerObserver(Observer<T> observer); 
    void unregisterObserver(Observer<T> observer); 
    void notifyObservers(T event);
}

public interface Observer<T> {
    void observe(T event);
}

When a dependency injection container is used, the container can take over the job of finding all Subject instances and registering observers with them (for example, through the @EventListener annotation).

Let’s do a simple implementation:

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

public class ConcreteObserverA implements Observer<String> {
    @Override
    public void observe(String event) {
        System.out.println("ConcreteObserverA:" + event);
    }
}

public class ConcreteObserverB implements Observer<String> {
    @Override
    public void observe(String event) {
        System.out.println("ConcreteObserverB:" + event);
    }
}

// Subject whose events are plain Strings
public class ConcreteSubject implements Subject<String> {
    private final Set<Observer<String>> observers = new CopyOnWriteArraySet<>();

    @Override
    public void registerObserver(Observer<String> observer) {
        observers.add(observer);
    }

    @Override
    public void unregisterObserver(Observer<String> observer) {
        observers.remove(observer);
    }

    @Override
    public void notifyObservers(String event) {
        // Deliver the event to every registered observer on the calling thread.
        observers.forEach(ob -> ob.observe(event));
    }
}

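CopyOnWriteArraySet is a thread-safe Set implementation, so observers can be registered and removed while a notification is in progress.

Any communication between components has to take thread safety into account.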

Observer is a functional interface (it declares exactly one abstract method), so it can be implemented directly with a lambda.

@Test
public void test() {
    Subject<String> subOne = new ConcreteSubject();
    Observer<String> obsA1 = Mockito.spy(new ConcreteObserverA());
    Observer<String> obsB1 = Mockito.spy(new ConcreteObserverB());
    subOne.registerObserver(obsA1);
    subOne.registerObserver(obsB1);

    // Both spies should receive the event exactly once.
    subOne.notifyObservers("hello");
    Mockito.verify(obsA1, Mockito.times(1)).observe("hello");
    Mockito.verify(obsB1, Mockito.times(1)).observe("hello");

    // Lambda way: equivalent to overriding observe(String event) in a class.
    Subject<String> subTwo = new ConcreteSubject();
    subTwo.registerObserver(event -> System.out.println("ConcreteObserverA:" + event));
    subTwo.registerObserver(event -> System.out.println("ConcreteObserverB:" + event));
    subTwo.notifyObservers("hello");
}

Delivering messages in parallel:

private final ExecutorService executorService = Executors.newCachedThreadPool();

public void notifyObservers(String event){
    observers.forEach(ob -> executorService.submit(
        () -> ob.observe(event)
    ));
}

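Be careful with multithreading. A cached thread pool creates a new thread whenever no idle one is available, so slow observers can exhaust resources. To prevent excessive resource usage we can limit the thread pool size, but that comes at the cost of the application's liveness property: if every pool thread is stuck in a slow observer, the remaining notifications have to wait.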
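
As a rough illustration of bounding the pool, the sketch below swaps the cached pool for a fixed-size one. The class name BoundedSubject, the nested Observer interface, and the shutdownAndWait helper are assumptions made so that the example is self-contained; only the notifyObservers body mirrors the listing above.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BoundedSubject {
    // Same shape as the Observer interface above, nested to keep the sketch self-contained.
    public interface Observer<T> {
        void observe(T event);
    }

    private final Set<Observer<String>> observers = new CopyOnWriteArraySet<>();
    // At most poolSize observers run at once; further notifications wait in the queue.
    private final ExecutorService executorService;

    public BoundedSubject(int poolSize) {
        this.executorService = Executors.newFixedThreadPool(poolSize);
    }

    public void registerObserver(Observer<String> observer) {
        observers.add(observer);
    }

    public void notifyObservers(String event) {
        observers.forEach(ob -> executorService.submit(() -> ob.observe(event)));
    }

    // Hypothetical helper: stop accepting work and wait for queued notifications.
    public boolean shutdownAndWait(long timeoutMillis) {
        executorService.shutdown();
        try {
            return executorService.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BoundedSubject subject = new BoundedSubject(2);
        subject.registerObserver(event -> System.out.println("A: " + event));
        subject.registerObserver(event -> System.out.println("B: " + event));
        subject.notifyObservers("temperature changed");
        subject.shutdownAndWait(1000);
    }
}
```

With a pool of two threads, a third slow observer would only start once one of the first two finishes, which is exactly the liveness trade-off described above.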

Building on the observer pattern, we can derive a variant: the publish-subscribe pattern. Spring provides the @EventListener annotation for this kind of event distribution.

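  • Observer pattern: Subject <==> Observer (the Subject triggers the Observer; the Observer subscribes to the Subject)
  • Publish-subscribe pattern: Publisher – publish -> Event channel <==> Subscriber (the event channel triggers the Subscriber; the Subscriber subscribes to the event channel)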

Event channel, also known as message broker or event bus.
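
The role of the event channel can be sketched in a few lines of plain Java. SimpleEventBus below is a hypothetical toy written for this note; it is not the API of MBassador, Guava, or Spring. The point it illustrates is that publishers and subscribers only know the channel and a topic name, never each other.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class SimpleEventBus {
    // topic name -> subscribers interested in that topic
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    // A subscriber registers with the channel, not with a particular publisher.
    public void subscribe(String topic, Consumer<String> subscriber) {
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(subscriber);
    }

    // A publisher hands the event to the channel, which triggers matching subscribers.
    public void publish(String topic, String event) {
        subscribers.getOrDefault(topic, List.of()).forEach(s -> s.accept(event));
    }

    public static void main(String[] args) {
        SimpleEventBus bus = new SimpleEventBus();
        bus.subscribe("temperature", event -> System.out.println("got " + event));
        bus.publish("temperature", "21.5"); // delivered to the subscriber
        bus.publish("humidity", "40");      // no subscriber for this topic, so dropped
    }
}
```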

Libraries that implement the publish-subscribe model:

  • MBassador
  • EventBus library provided by Guava

Demo exercise: displaying room temperature. Implement a simple reactive application using only what Spring provides out of the box.

Domain model:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Temperature {
    // Not final: the no-args constructor has to be able to leave the field unset.
    private double value;
}

Simulated sensor:

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

@Component
public class TemperatureSensor {
    private final ApplicationEventPublisher publisher;
    private final Random random = new Random();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    public TemperatureSensor(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    @PostConstruct
    public void startProcessing() {
        // Take the first reading one second after the bean is ready.
        executor.schedule(this::probe, 1, SECONDS);
    }

    private void probe() {
        var temp = random.nextGaussian() * 10 + 8;
        // Publish the reading as a Temperature event.
        publisher.publishEvent(new Temperature(temp));
        // Schedule the next reading after a random delay of less than 3 seconds.
        executor.schedule(this::probe, random.nextInt(3000), MILLISECONDS);
    }
}

@Component registers the class as a bean. Once the bean is ready, Spring calls the non-static method annotated with @PostConstruct, which starts publishing a random temperature sequence. Events are generated on a separate ScheduledExecutorService executor.

In Spring Web MVC, a @Controller method can return not only a plain value of type T, but also:

  • Callable<T>: runs a blocking call on a non-container thread.
  • DeferredResult<T>: lets you produce the response asynchronously from a non-container thread by calling setResult(T res).

Since version 4.2, a controller can also return ResponseBodyEmitter to send multiple objects, each written separately by a message converter. SseEmitter extends ResponseBodyEmitter and can send many outgoing messages for a single incoming request.

StreamingResponseBody sends raw data asynchronously, making it easier to stream large amounts of data without blocking Servlet threads.

Expose SSE endpoints:

Let’s continue building components and write controllers for HTTP communication to implement the demo.

@RestController
public class TemperatureController {
    private final Set<SseEmitter> clients = new CopyOnWriteArraySet<>();

    @GetMapping("/temperature-stream")
    public SseEmitter events(HttpServletRequest request) {
        SseEmitter emitter = new SseEmitter();
        // Forget the emitter once its connection times out or completes.
        emitter.onTimeout(() -> clients.remove(emitter));
        emitter.onComplete(() -> clients.remove(emitter));
        clients.add(emitter);
        return emitter;
    }

    @Async
    @EventListener
    public void handleMessage(Temperature t) {
        List<SseEmitter> deadEmitters = new ArrayList<>();
        clients.forEach(emitter -> {
            try {
                emitter.send(t, MediaType.APPLICATION_JSON);
            } catch (Exception e) {
                // Sending failed, so treat this client as disconnected.
                deadEmitters.add(emitter);
            }
        });
        clients.removeAll(deadEmitters);
    }
}

SseEmitter's only purpose is to send SSE events. Each client requesting the URI gets a new SseEmitter instance, which is registered in the server's Set<SseEmitter> clients. If the SseEmitter times out or completes processing, it is removed from the clients set.

The SseEmitter establishes the communication channel, but the server still has to notify each subscriber when it receives a temperature change event. @EventListener lets the method receive events from Spring, and @Async marks it for asynchronous execution, so it is called in a manually configured thread pool. handleMessage() accepts the Temperature event and sends it to each client. Because SseEmitter does not provide a callback for handling errors, we use a try/catch here to handle exceptions.

Configure asynchronous support:

@SpringBootApplication
@EnableAsync
public class Application implements AsyncConfigurer {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public Executor getAsyncExecutor() {
        // Executor used for methods annotated with @Async.
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(60);
        executor.setQueueCapacity(5);
        executor.initialize();
        return executor;
    }
}

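CorePoolSize is the number of threads the executor keeps for normal operation. Additional threads beyond this number are created only when the task queue is full.

QueueCapacity is the capacity of the task queue. The default value is Integer.MAX_VALUE.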

Set the capacity for the ThreadPoolExecutor’s BlockingQueue. Default is Integer.MAX_VALUE. Any positive value will lead to a LinkedBlockingQueue instance; any other value will lead to a SynchronousQueue instance. See Also: LinkedBlockingQueue, SynchronousQueue

MaxPoolSize caps the total number of threads. When the pool already has MaxPoolSize threads and the queue is full, a new task is handled according to the policy set by RejectedExecutionHandler.
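
How the three settings interact is easier to see on a plain java.util.concurrent.ThreadPoolExecutor. The sketch below uses deliberately small, made-up numbers (core 2, max 3, queue capacity 1) rather than the values configured above, and the class name PoolSizingDemo is hypothetical. Every task blocks until the end, so the pool has to queue, grow, or reject.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizingDemo {
    // Submits five blocking tasks and records what the pool did with each one.
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 3, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 5; i++) {
                try {
                    pool.execute(() -> awaitQuietly(release));
                    log.add("task " + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    // Default policy (AbortPolicy): the submitter gets an exception.
                    log.add("task " + i + ": rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
        // task 1: threads=1 queued=0   (core thread)
        // task 2: threads=2 queued=0   (core thread)
        // task 3: threads=2 queued=1   (queued, pool does not grow yet)
        // task 4: threads=3 queued=1   (queue full, extra thread up to max)
        // task 5: rejected             (queue full and max reached)
    }
}
```

Note that the pool prefers queuing over growing: with a large queue capacity, the maximum pool size may never be reached.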

We can test our reactive endpoint by running the server program written above:

> curl http://localhost:8080/temperature-stream
data:{"value":14.51242412312}

data:{"value":23.08610531321}
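
The data lines in this output follow the Server-Sent Events text format: each event is one or more lines starting with "data:", terminated by a blank line. The sketch below shows that framing in plain Java. SseFrame is a hypothetical helper written for illustration; it is not how SseEmitter is implemented.

```java
public class SseFrame {
    // Frames a payload as one SSE event: a "data:" line per payload line,
    // followed by the blank line that ends the event.
    static String format(String payload) {
        StringBuilder sb = new StringBuilder();
        for (String line : payload.split("\n", -1)) {
            sb.append("data:").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(format("{\"value\":14.51242412312}"));
        System.out.print(format("{\"value\":23.08610531321}"));
    }
}
```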

Questions and reflections:

  • Spring's event publish-subscribe mechanism was designed for application lifecycle events and is not suited to high-load, high-performance scenarios. Building business logic on it is also risky, because changes and updates to the framework may lead to errors.
  • Manual allocation of thread pools for asynchronous broadcast events is primitive, so we should use a better asynchronous reactive framework, and reactive