1 Reactor Model Introduction

High-performance network programming today is hard to discuss without the Reactor pattern. Many well-known servers and middleware are built on it: the web server Nginx, the high-performance cache server Redis, and Netty, the high-performance communication framework widely used in open source projects.

The Reactor pattern consists of two roles, the Reactor thread and the Handlers, with the following responsibilities:

  • The Reactor thread responds to IO events and dispatches them to Handlers.
  • Handlers execute the business processing logic in a non-blocking way.

2 Fatal defects of multithreaded OIO

In Java OIO (old, blocking IO) programming, the earliest network servers used a while loop to keep listening on a port for new connections. When a connection arrives, a handler is called to process it.

import lombok.extern.slf4j.Slf4j;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

@Slf4j
public class ServerDemo {

    private final int port;

    private final ServerSocket serverSocket;

    public ServerDemo(int port) throws IOException {
        this.port = port;
        this.serverSocket = new ServerSocket();
    }

    public void start() throws IOException {
        // Bind the local port
        serverSocket.bind(new InetSocketAddress(this.port));
        log.info("the server start success on port [{}]", this.port);
        // Loop, waiting for clients to connect
        while (true) {
            // The accept method blocks until a client connects
            Socket client = serverSocket.accept();
            handler(client);
        }
    }

    private void handler(Socket client) {
        if (client == null) return;
        try {
            log.info("handle client request from {}", client.getInetAddress().getHostAddress());
        } finally {
            // Close the current client connection after processing it
            this.close(client);
        }
    }

    private void close(Closeable closeable) {
        if (null != closeable) {
            try {
                closeable.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

The biggest problem with this approach is that until handler(client) finishes for the current connection, the server cannot accept later connections. Subsequent requests are blocked, which results in low server throughput.

To solve this serious connection-blocking problem, a classic pattern emerged: the Connection Per Thread pattern.

import lombok.extern.slf4j.Slf4j;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

@Slf4j
public class ConnectionPerDemo {

    private final int port;

    private final ServerSocket serverSocket;

    public ConnectionPerDemo(int port) throws IOException {
        this.port = port;
        this.serverSocket = new ServerSocket();
    }

    public void start() throws IOException {
        // Bind the local port
        serverSocket.bind(new InetSocketAddress(this.port));
        log.info("the server start success on port [{}]", this.port);
        // Loop, waiting for clients to connect
        while (true) {
            // The accept method blocks until a client connects
            Socket client = serverSocket.accept();
            // Start a dedicated thread to process this connection
            new Thread(new Handler(client)).start();
        }
    }

    static class Handler implements Runnable {
        final Socket socket;

        Handler(Socket s) {
            socket = s;
        }

        @Override
        public void run() {
            handle(socket);
        }

        private void handle(Socket client) {
            if (client == null) return;
            try {
                log.info("handle client request from {}", client.getInetAddress().getHostAddress());
            } finally {
                // Close the current client connection after processing it
                this.close(client);
            }
        }

        private void close(Closeable closeable) {
            if (null != closeable) {
                try {
                    closeable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

Each new network connection is assigned its own thread, and each thread handles the input and output of its own socket connection alone. The server’s listening thread is also independent (in this demo, the main thread accepts connections), so the input and output processing of any socket connection does not block the listening for and establishment of later connections, which improves server throughput. Earlier versions of the Tomcat server were implemented this way.

The disadvantage of the Connection Per Thread pattern is that a large number of connections consumes a large number of threads. Threads are an expensive system resource, and too many of them will overwhelm the system. Repeatedly creating, destroying, and switching between threads also has a cost. In high-concurrency scenarios, therefore, this defect of multithreaded OIO is fatal.

In traditional OIO programming, every read and write on a socket is blocking. A thread can process only one socket read or write at a time; while it is blocked on one socket, it cannot process IO for other connections. Therefore, in OIO, even if one thread is made responsible for the input and output of multiple socket connections, it can still only handle the IO of one connection at a time.

2 Single-threaded Reactor model

The Reactor pattern is similar to the event-driven pattern. In the event-driven pattern, when an event fires, the event source dispatches the event to a Handler, which takes care of handling it. The Reactor role in the Reactor pattern plays the part of the event dispatcher in the event-driven pattern.

There are two important components in the Reactor pattern, the Reactor and the Handler:

  • Reactor: polls for I/O events and, when one is detected, sends it to the corresponding Handler. The I/O events here are the channel I/O events queried by the selector in NIO.
  • Handler: is bound to an I/O event (or selection key) and processes it: establishing the actual connection, reading from the channel, running the service logic, and writing the result back to the channel.
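The division of labour between the two roles can be illustrated without any networking. The following is only a minimal sketch of the dispatching idea: the class name EventDispatcher and its methods register and dispatch are hypothetical, invented for this illustration, and plain strings stand in for real I/O events.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical illustration: plays the Reactor's dispatching role for string events
class EventDispatcher {
    // Maps an event type to the handler bound to it
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    // Bind a handler to an event type (comparable to attaching a Handler to a key)
    void register(String eventType, Consumer<String> handler) {
        handlers.put(eventType, handler);
    }

    // Route the event to its handler; returns false when no handler is bound
    boolean dispatch(String eventType, String payload) {
        Consumer<String> handler = handlers.get(eventType);
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }
}
```

In a real Reactor the event types would be the ready operations reported by a selector, and the handlers would be the objects attached to the selection keys.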

2.1 What is the single-threaded Reactor pattern

Simply put, the Reactor and the Handlers execute in the same thread. This is the simplest Reactor model.

2.2 APIs used

Two member methods of SelectionKey are important here:

Object attach(Object o): attaches the given object to the selection key and returns the previously attached object, if any.

Any Java object can be attached to a SelectionKey instance this way. The method matters because, in the single-threaded Reactor implementation, Handler instances are attached to SelectionKey instances.

Object attachment(): returns the object currently attached to the selection key.

This method is the counterpart of attach(Object o): it retrieves the object previously attached to the SelectionKey instance. It is just as important: when an IO event occurs, the select method reports the selection key, and the attached object can be fetched directly from that key.

In the Reactor implementation, the Handler instance bound with attach(Object o) is retrieved with attachment(), and that Handler instance then completes the corresponding processing.

In summary, attach and attachment are used as a pair in the Reactor pattern. After a channel is registered and its selection key obtained, attach() binds the Handler instance to the key. When an IO event occurs, attachment() retrieves the Handler instance from the key, and the event is dispatched to that Handler to complete the business processing.
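The pairing can be tried in isolation before reading the full demo. The sketch below is an assumption-laden illustration, not part of the demo: the class AttachmentDemo and its nested Handler interface are hypothetical names, the server channel is left unbound because binding is not needed to show attachment, and the handler is called directly instead of waiting for a real event.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class AttachmentDemo {
    // Hypothetical handler type, used only for this illustration
    interface Handler {
        String handle(SelectionKey key);
    }

    static String registerAndDispatch() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // A channel must be non-blocking before it can be registered
            server.configureBlocking(false);
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT);
            // Bind the handler to the key right after registration
            key.attach((Handler) k -> "accept handler invoked");
            // Later, when the selector reports this key, recover the handler and call it
            Handler handler = (Handler) key.attachment();
            return handler.handle(key);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Since attachment() returns Object, the cast back to the handler type is the caller's responsibility, which is why the demo's dispatch method casts before invoking.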

2.3 Demo

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/**
 * @author wyaoyao
 * @date 2021/6/23 13:36
 */
@Slf4j
public class EchoServerReactor implements Runnable {

    private final int port;

    private ServerSocketChannel serverChannel;

    private final Selector selector;

    private final String host = "localhost";

    private volatile boolean closed = false;

    private Thread innerThread;

    public EchoServerReactor(int port) throws IOException {
        this.port = port;
        // Open a selector
        this.selector = Selector.open();
        // Open the channel
        this.serverChannel = ServerSocketChannel.open();
        // Non-blocking mode
        this.serverChannel.configureBlocking(false);
        // Register the connection event
        SelectionKey acceptKey = this.serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        // Bind a handler to the connection event key
        acceptKey.attach(new AcceptHandler(this.serverChannel, this.selector));

    }

    /** Start the service */
    public void start() throws IOException {
        // Bind the port
        this.serverChannel.bind(new InetSocketAddress(this.host, this.port));
        innerThread = new Thread(this);
        innerThread.start();
        log.info("echo server start success on port [{}]", this.port);
    }

    public void close() throws IOException {
        if (!closed) {
            this.closed = true;
            innerThread.interrupt();
        }
    }

    private void server() throws IOException {
        while (!closed && !Thread.interrupted()) {
            // Query for events of interest. This call blocks while no event is ready
            this.selector.select();
            // Iterate over the keys whose events fired and process each one
            Set<SelectionKey> selectionKeys = this.selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                // The Reactor dispatches the received event
                dispatch(iterator.next());
                // Remove the processed key so it is not handled again
                iterator.remove();
            }
        }
    }

    void dispatch(SelectionKey sk) throws IOException {
        // Retrieve the handler that was attached to the selection key
        Handler handler = (Handler) sk.attachment();
        if (handler != null) {
            try {
                handler.handler(sk);
            } catch (Exception e) {
                sk.cancel();
                e.printStackTrace();
            }
        }
    }

    @Override
    public void run() {
        try {
            this.server();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private interface Handler {
        void handler(SelectionKey selectionKey) throws IOException;
    }

    class AcceptHandler implements Handler {

        private final ServerSocketChannel serverSocketChannel;

        private final Selector selector;

        private AcceptHandler(ServerSocketChannel serverSocketChannel, Selector selector) {
            this.serverSocketChannel = serverSocketChannel;
            this.selector = selector;
        }


        @Override
        public void handler(SelectionKey selectionKey) throws IOException {
            // Accept the connection
            SocketChannel client = this.serverSocketChannel.accept();
            if (client == null) {
                // Nothing to accept on this non-blocking channel
                return;
            }
            // Switch the client channel to non-blocking mode
            client.configureBlocking(false);
            InetSocketAddress remoteAddress = (InetSocketAddress) client.getRemoteAddress();
            log.info("client connect [{}:{}] has accept", remoteAddress.getHostName(), remoteAddress.getPort());

            // No wakeup() is needed here: this handler runs on the reactor thread,
            // so no select() call is blocking on the selector while we register
            // Register a read-ready event for the SocketChannel
            SelectionKey read = client.register(this.selector, SelectionKey.OP_READ);
            read.attach(new ReadHandler(selector));
        }
    }

    class ReadHandler implements Handler {

        private ByteBuffer buffer;
        private final Selector selector;

        ReadHandler(Selector selector) {
            this.selector = selector;
            this.buffer = ByteBuffer.allocate(1024);
        }

        @Override
        public void handler(SelectionKey selectionKey) throws IOException {
            // Get the channel associated with the current key
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            // Read whatever is available; the buffer accumulates bytes across read events
            int len;
            while ((len = channel.read(buffer)) > 0) {
                // Keep reading until no more data is available for now
            }
            if (len < 0) {
                // The client closed the connection
                selectionKey.cancel();
                channel.close();
                return;
            }
            // Decode everything received so far; position marks the end of the data
            String request = new String(buffer.array(), 0, buffer.position(), StandardCharsets.UTF_8);
            if (!request.contains("\r\n") && buffer.hasRemaining()) {
                // The request (one line in this demo) is still incomplete, so wait for the next read event
                return;
            }
            // A full request has arrived; reset the buffer
            buffer.clear();
            if (request.contains("bye")) {
                // If the client sends a bye message, the session ends and the key is invalidated
                selectionKey.cancel();
                channel.close();
                return;
            }
            log.info("accept request ==> {}", request);

            // Switch this key to write events and hand the request to a WriteHandler
            selectionKey.interestOps(SelectionKey.OP_WRITE);
            selectionKey.attach(new WriteHandler(request, selector));
        }
    }

    class WriteHandler implements Handler {

        private String request;
        private Selector selector;


        WriteHandler(String s, Selector selector) {
            this.request = s;
            this.selector = selector;
        }

        @Override
        public void handler(SelectionKey selectionKey) throws IOException {
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            if (request != null && request.length() > 0) {
                String response = "echo: " + request;
                log.info("send response ==> {}", response);
                ByteBuffer out = StandardCharsets.UTF_8.encode(response);
                // A non-blocking write may send only part of the buffer, so loop until it is drained
                while (out.hasRemaining()) {
                    channel.write(out);
                }
            }
            // Make this key focus on read events again
            selectionKey.interestOps(SelectionKey.OP_READ);
            selectionKey.attach(new ReadHandler(selector));
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        EchoServerReactor echoServerReactor = new EchoServerReactor(10010);
        echoServerReactor.start();
    }
}

A simple blocking client for testing the server:

import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;


@Slf4j
public class BlockEchoClient {

    private final SocketChannel socketChannel;
    private final String serverHost;
    private final int serverPort;

    public BlockEchoClient(String serverHost, int serverPort) throws IOException {
        this.serverHost = serverHost;
        this.serverPort = serverPort;
        this.socketChannel = SocketChannel.open();
        // Connect to the server
        SocketAddress remote = new InetSocketAddress(serverHost, serverPort);
        socketChannel.connect(remote);
        log.info("connect echo server success");
    }

    public void send(String message) {
        try {
            BufferedReader reader = getReader(socketChannel.socket());
            PrintWriter writer = getWriter(socketChannel.socket());
            // Send data
            writer.println(message + "\r\n");
            log.info("send request success; content is [{}]", message);
            // Read the response from the server
            String s1 = reader.readLine();
            log.info("get response success; response is [{}]", s1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void close() throws IOException {
        if (socketChannel != null) {
            socketChannel.close();
        }
    }

    public BufferedReader getReader(Socket socket) throws IOException {
        InputStream inputStream = socket.getInputStream();
        return new BufferedReader(new InputStreamReader(inputStream));
    }

    public PrintWriter getWriter(Socket socket) throws IOException {
        return new PrintWriter(socket.getOutputStream(), true);
    }

    public static void main(String[] args) throws IOException {
        BlockEchoClient blockEchoClient = new BlockEchoClient("localhost", 10010);
        blockEchoClient.send("java");
        blockEchoClient.send("hello");
        blockEchoClient.send("bye");
        // The server closes the session on bye, so this last message gets no echo
        blockEchoClient.send("hhhhhhh");
        blockEchoClient.close();
    }
}

Multithreaded Reactor

The multithreaded Reactor evolves the design in two ways: (1) Upgrade the Handler. To use multiple threads as efficiently as possible, consider a thread pool. (2) Upgrade the Reactor. Consider introducing multiple selectors to increase the capacity for monitoring a large number of channels.

The multithreaded version of the Reactor pattern works like this:

  1. Move the execution of the IO handlers responsible for data transfer processing into a separate thread pool. This isolates the business processing threads from the reactor thread that listens for new connections, so the server’s connection listening is not blocked.
  2. If the server has a multi-core CPU, split the reactor into multiple SubReactor threads, each with its own selector: one thread per SubReactor, each responsible for polling the events of one selector. This makes fuller use of system resources and greatly improves the reactor’s ability to manage a large number of connections or monitor a large number of channels.
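
The structural pieces of these two upgrades might be sketched as follows. This is a rough outline under stated assumptions rather than a working server: SubReactorGroup and its methods are hypothetical names, the per-selector polling loops are omitted, and round-robin is just one plausible way to spread new connections across selectors.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one Selector per sub-reactor plus a shared worker pool
class SubReactorGroup implements AutoCloseable {
    private final List<Selector> selectors = new ArrayList<>();
    private final ExecutorService workers;
    private final AtomicInteger next = new AtomicInteger();

    SubReactorGroup(int subReactors, int workerThreads) {
        try {
            for (int i = 0; i < subReactors; i++) {
                selectors.add(Selector.open());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        workers = Executors.newFixedThreadPool(workerThreads);
    }

    // Round-robin index of the selector a newly accepted channel would be registered with
    int nextSelectorIndex() {
        return Math.floorMod(next.getAndIncrement(), selectors.size());
    }

    Selector selector(int index) {
        return selectors.get(index);
    }

    // Business logic runs on the worker pool, leaving the polling thread free to return to select()
    Future<String> process(String request) {
        return workers.submit(() -> "echo: " + request);
    }

    @Override
    public void close() {
        workers.shutdown();
        for (Selector s : selectors) {
            try {
                s.close();
            } catch (IOException ignored) {
                // Best-effort cleanup in this sketch
            }
        }
    }
}
```

In a complete implementation each selector would be polled by its own thread, and a channel accepted on another thread would typically be queued and registered by the owning thread after a wakeup() call, because registration can block while that thread is inside select().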