The Reactor Network Programming Model is a high-performance network programming service architecture pattern proposed by Dung Lea, author of the Java.util. concurrent package, in Scalable IO in Java, Netty, High-performance NIO service frameworks such as Mina use the Reactor schema.

1. Internet services

Network services exist in different forms, such as Web services, distributed services, and so on, but they mostly have the same processing flow.

  1. Receives a request to read byte stream data from network I/O.
  2. Decodes the request, turning the byte stream into a convention object.
  3. Process the business logic.
  4. The response is encoded, processed by the business logic, into a byte stream.
  5. Returns the response and sends the response to the network I/O.

2. Traditional service design pattern

Start a new thread in the network service for each connection processing, as shown below:

As I’ve analyzed in how I took NIO down from its pedestal, this model relies heavily on threads and the system doesn’t scale well enough to handle the volume of requests.

3. High-performance scalable service design pattern

In building high-performance scalable network services, we hope to achieve the following goals:

  1. Graceful downgrading under high volume requests and high load.

  2. Upgrading hardware resources can continuously improve system performance (CPU, memory, disk, bandwidth).

  3. Also meet availability and performance goals

    • Low latency
    • High load
    • Adjustable quality of service

Divide-and-conquer is usually the best way to achieve any scalability goal.

4. Divide and Conquer

  1. Break down the entire process into smaller tasks.
  2. Each small task performs one action and does not block.
  3. Execute the task when it is ready. Here, an I/O event is usually used as a trigger.

Java.nio provides the basic mechanism for implementing divide-and-conquer

  • Non-blocking reads and writes.
  • Distribute related tasks (read ready, write ready, connect events) based on I/O events.

Event-driven design pattern brings rich scalability to high-performance network service architecture.

5. Event-driven Designs

Event-driven design patterns are often more effective than other options.

  • Fewer resources: no need to start a new thread for each client.
  • Low overhead: fewer thread context switches, fewer locks.

But task scheduling is slower and harder to program. Because operations must be manually bound to events, the associated functionality must be unwrapped into simple non-blocking operations similar to GUI event-driven mechanisms, and of course it is not possible to eliminate all blocking such as GC, page faults, etc. You must track the logical state of the service (because it is event-driven, you need to determine the actions to be performed based on the state).

5.1 Event-driven design in AWT

6. Reactor model

  • The Reactor pattern responds to I/O events by assigning appropriate handlers, similar to AWT threads.
  • Each Handler performs a non-blocking action, similar to an ActionListeners.
  • Handlers are managed by binding them to events, similar to addActionListeners of AWT.

Reactor Model Core roles

  • Reactor: distributes I/O events to acceptors or corresponding handlers.
  • Acceptor: Creates a Handler for new client connections.
  • Handler: Performs non-blocking read/write tasks.

6.1 Single-threaded Mode

  1. Reator thread initialization
@Slf4j
public class ServerReactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocketChannel;
    @Setter
    private volatile boolean stop = false;

    // ************ Reactor 1: Setup ***********************
    
    public ServerReactor(int port, int backlog) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(port), backlog);
        serverSocket.setReuseAddress(true);
        serverSocketChannel.configureBlocking(false); // Register a channel with the multiplexer, And to monitor the ACCEPT events SelectionKey SelectionKey = serverSocketChannel. Register (selector, SelectionKey. OP_ACCEPT); log.info("ServerSocket-Key: [{}]",selectionKey); SelectionKey. Attach (new Acceptor(Selector, selectionKey, serverSocketChannel)); } @Override public voidrun() {/ / * * * * * * * * * * * * Reactor 2: Dispatch Loop * * * * * * * * * * * * * * * * * * * * * * * try {/ / unlimited receive client connectionwhile(! stop && ! Thread.interrupted()) { int num = selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator();while(it.hasNext()) { SelectionKey key = it.next(); Remove (); // Remove key, otherwise it will cause events to consume it.remove(); KeyUtil.keyOps(key); try { dispatch(key); } catch (Exception e) {if(key ! = null) { key.cancel();if(key.channel() ! = null) { key.channel().close(); } } } } } } catch (IOException e) { log.error("{}", e);
        }
        if(selector ! = null) { try { selector.close(); } catch (IOException e) { log.error("{}", e); }} private void dispatch(SelectionKey key) {// This is an Acceptor. // This is a Handler. Runnable Runnable = (Runnable) key.attachment(); runnable.run(); }}Copy the code
  1. Step 2: Acceptor creation
@Slf4j
public class Acceptor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocketChannel;
    final SelectionKey selectionKey;

    // ************ Reactor 3: Acceptor ***********************

    public Acceptor(Selector selector, SelectionKey selectionKey, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.selectionKey = selectionKey;
        this.serverSocketChannel = serverSocketChannel;
    }

    @Override
    public void run() {
        try {
            if (selectionKey.isValid() && selectionKey.isAcceptable()) {
                SocketChannel socketChannel = serverSocketChannel.accept();
                log.info("channel [{}->{}] establish", socketChannel.getRemoteAddress(), socketChannel.getLocalAddress()); // Create the corresponding Handler new BasicHandler(selector, socketChannel); } } catch (IOException e) { log.error("{}", e); }}}Copy the code
  1. Step 3: Handler creation
@Slf4j
public class BasicHandler implements Runnable {
    final SelectionKey selectionKey;
    final SocketChannel socketChannel;

    // ************ Reactor 4: Handler ***********************

    public BasicHandler(Selector selector, SocketChannel socketChannel) throws IOException {
        this.socketChannel = socketChannel;
        this.socketChannel.configureBlocking(false); selectionKey = this.socketChannel.register(selector, 0); // attach replace selectionKey. Attach (this); selectionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); log.info("Socket-Key: [{}]", selectionKey);

    }

    @Override
    public void run() {
        try {
            if (selectionKey.isReadable()) {
                doRead();
            } else if (selectionKey.isWritable()) {
                doWrite();

            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void doWrite() {
        Scanner scanner = new Scanner(System.in);
        new Thread(() -> {
            while (scanner.hasNext()) {
                try {

                    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                    writeBuffer.put(scanner.nextLine().getBytes());
                    writeBuffer.flip();
                    socketChannel.write(writeBuffer);
                } catch (Exception e) {

                }
            }
        }).start();
        selectionKey.interestOps(SelectionKey.OP_READ);
    }

    protected void doRead() throws IOException {// setup1: **** Reads data once *** ByteBufferreadBuffer = ByteBuffer.allocate(1024);
        int readBytes = socketChannel.read(readBuffer);
        if (readBytes > 0) {//readBuffer);
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        } else if (readBytes < 0) {
            selectionKey.cancel();
            socketChannel.close();
        }

    }

    protected void process(ByteBuffer readBuffer) {/ / setup2: * * * * * * * decoding / / setup3: * * * * * * * processing datareadBuffer.flip();
        byte[] bytes = new byte[readBuffer.remaining()];
        readBuffer.get(bytes);
        log.info("recv client content: " + new String(bytes));
        try {
            TimeUnit.SECONDS.sleep(10);
            log.info("Business processing completed"); } catch (InterruptedException e) { e.printStackTrace(); } // setup4: **** ***}}Copy the code

This model applies to scenarios where the Handler can quickly process business logic. Serialization can minimize lock contention, but it can’t make full use of CPU multi-core resources.

6.2 Single-reactor multi-work model

In order to make full use of the advantage of multi-core, we can use multithreading model used to handle the IO operations to obtain higher scalability, business processing method (process) in general may take longer, and business processing block will affect the performance of Reacotr, so we can put the business process of * * IO operations (encoding/decoding, Business logic)** to be handled by the Work thread pool.

  1. Unloading non-IO operations to improve the Reactor thread’s processing performance is similar to the Proactor design in POSA2.
  2. Is simpler than redesigning non-IO operations to be event-driven.
  3. It is difficult to overlap with IO. It is better to read all input into the buffer first.
  4. Tuning and controlling thread resources using thread pools typically requires a much smaller number of threads than the number of clients.

@Slf4j
public class MultiThreadHandler extends BasicHandler {
    static final int threadPoolSize = Runtime.getRuntime().availableProcessors() + 1;
    static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            threadPoolSize,
            threadPoolSize,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<Runnable>(1000));

    public MultiThreadHandler(Selector selector, SocketChannel socketChannel) throws IOException {
        super(selector, socketChannel);
    }


    @Override
    protected void doRead() throws IOException {// setup1: **** Reads data once *** ByteBufferreadBuffer = ByteBuffer.allocate(1024);
        int readBytes = socketChannel.read(readBuffer);
        if (readBytes > 0) {// Thread pool for business processing executor.execute(new Processer(readBuffer));
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        } else if (readBytes < 0) {
            selectionKey.cancel();
            socketChannel.close();
        }
    }

    class Processer implements Runnable {
        final ByteBuffer buffer;

        Processer(ByteBuffer buffer) {
            this.buffer = buffer;
        }

        @Override
        public void run() { process(buffer); }}}Copy the code

6.3 Multiple Reactor and Worker thread model

To further coordinate CPU and I/O read and write efficiencies and improve system resource utilization. The Reactor can be unwrapped into two parts. The MainReactor listens to sockets for processing accept events, and distributes the established connections to the SubReactor, which is executed by the thread pool for processing I/O read and write events. Netty uses this model.

@Slf4j
public class MainReactor implements Runnable {
    final Selector selector;

    final ServerSocketChannel serverSocketChannel;

    @Setter
    private volatile boolean stop = false;

    public MainReactor(int port, int backlog) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(port), backlog);
        serverSocket.setReuseAddress(true);
        serverSocketChannel.configureBlocking(false); // Register a channel with the multiplexer, And to monitor the ACCEPT events SelectionKey SelectionKey = serverSocketChannel. Register (selector, SelectionKey. OP_ACCEPT); log.info("ServerSocket-Key: [{}]", selectionKey); SelectionKey. Attach (new MultiAcceptor(selectionKey, serverSocketChannel)); } @Override public voidrun() {
        try {
            while(! stop && ! Thread.interrupted()) { int num = selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator();while(it.hasNext()) { SelectionKey key = it.next(); Remove (); // Remove key, otherwise it will cause events to consume it.remove(); KeyUtil.keyOps(key); try { dispatch(key); } catch (Exception e) {if(key ! = null) { key.cancel();if(key.channel() ! = null) { key.channel().close(); } } } } } } catch (IOException e) { log.error("{}", e); } } private void dispatch(SelectionKey key) { Runnable runnable = (Runnable) key.attachment(); runnable.run(); }}Copy the code
@Slf4j
public class MultiAcceptor implements Runnable {
    static final int subReactorSize = Runtime.getRuntime().availableProcessors();
    static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            subReactorSize,
            subReactorSize,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<Runnable>(1000), new NameThreadFactory("subReactor")); private Selector[] selectors = new Selector[subReactorSize]; // The number of multiplexers is the same as the number of REACTOR threads private ServerSocketChannel ServerSocketChannel; final SelectionKey selectionKey; private volatile int next = 0; public MultiAcceptor(SelectionKey selectionKey, ServerSocketChannel serverSocketChannel) throws IOException { this.selectionKey = selectionKey; this.serverSocketChannel = serverSocketChannel; init(); } public void init() throws IOException {for (int i = 0; i < subReactorSize; i++) {
            selectors[i] = Selector.open();
        }
    }

    @Override
    public synchronized void run() {
        try {
            if (selectionKey.isValid() && selectionKey.isAcceptable()) {

                SocketChannel socketChannel = serverSocketChannel.accept();
                log.info("channel [{}->{}] establish", socketChannel.getRemoteAddress(), socketChannel.getLocalAddress());

                if(socketChannel ! = null) { executor.execute(new SubReactor(selectors[next], socketChannel)); }if(++next == selectors.length) next = 0; } } catch (IOException e) { e.printStackTrace(); }}}Copy the code
@Slf4j
public class SubReactor implements Runnable {

    private Selector selector;

    public SubReactor(Selector selector, SocketChannel socketChannel) throws IOException {
        this.selector = selector;
        new MultiThreadHandler(selector, socketChannel);
    }

    @Setter
    private boolean stop = false;

    @Override
    public void run() {
        // ************ Reactor 2: Dispatch Loop ***********************

        try {
            while(! stop && ! Thread.interrupted()) { int num = selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator();while(it.hasNext()) { SelectionKey key = it.next(); Remove (); // Remove key, otherwise it will cause events to consume it.remove(); KeyUtil.keyOps(key); try { dispatch(key); } catch (Exception e) {if(key ! = null) { key.cancel();if(key.channel() ! = null) { key.channel().close(); } } } } } } catch (IOException e) { log.error("{}", e);
        }
        if(selector ! = null) { try { selector.close(); } catch (IOException e) { log.error("{}", e); } } } private void dispatch(SelectionKey key) { Runnable runnable = (Runnable) key.attachment(); runnable.run(); }}Copy the code

The Reactor model after splitting is more extensible. The responsibilities of different reactors are more specific, and typically a single MainReactor thread and multiple subreactors threads can handle millions of client connections.

References:

  1. Gee.cs.oswego.edu/dl/cpjslide…