The previous article covered some NIO basics and a simple NIO server example. In that example, however, accepting client connections, reading, writing, and processing client data all happen in one thread. Handling everything in a single thread performs poorly and cannot make full use of the server's hardware. This article introduces the Reactor thread model, which builds on NIO's multiplexing to improve server-side performance.

Single Reactor thread model

The single Reactor thread model still runs everything on one thread. It is essentially the same as the example at the end of the previous article, except that the logic is split into three classes.
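As a rough illustration of this single-thread arrangement, here is a minimal echo server sketch. The names `SingleReactorSketch`, `Acceptor` and `Handler`, the loopback address, and the echo behaviour are assumptions made for this example; it is not the code from the previous article. The point it tries to show is that accepting, reading and writing all run on the one thread that calls `select()`.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/** Hypothetical single-Reactor echo server: accept, read and write share one thread. */
public class SingleReactorSketch implements Runnable {
    private final Selector selector;
    private final ServerSocketChannel server;

    public SingleReactorSketch(int port) throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress("127.0.0.1", port)); // port 0 picks a free port
        // The Acceptor is attached to the key so the loop can dispatch to it.
        server.register(selector, SelectionKey.OP_ACCEPT, new Acceptor());
    }

    public int port() throws IOException {
        return ((InetSocketAddress) server.getLocalAddress()).getPort();
    }

    public void close() throws IOException {
        server.close();
        selector.close(); // also wakes up a blocked select()
    }

    @Override
    public void run() {
        try {
            while (true) {
                selector.select();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    // The attachment is either the Acceptor or a Handler.
                    ((Runnable) key.attachment()).run();
                }
            }
        } catch (IOException | ClosedSelectorException e) {
            // Selector closed or failed: leave the loop.
        }
    }

    /** Accepts a connection and registers a Handler for it on the same selector. */
    class Acceptor implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel client = server.accept();
                if (client != null) {
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ, new Handler(client));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Reads whatever is available and echoes it back. */
    class Handler implements Runnable {
        private final SocketChannel client;

        Handler(SocketChannel client) {
            this.client = client;
        }

        @Override
        public void run() {
            try {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                if (client.read(buffer) == -1) {
                    client.close(); // peer closed; closing also cancels the key
                    return;
                }
                buffer.flip();
                while (buffer.hasRemaining()) {
                    client.write(buffer);
                }
            } catch (IOException e) {
                try { client.close(); } catch (IOException ignored) { }
            }
        }
    }
}
```

Running it amounts to `new Thread(new SingleReactorSketch(8056)).start()`. Because a slow `Handler` stalls every other connection, the models in the following sections move work off this thread.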

Reactor thread model based on worker thread

In this thread model, once a client has connected, the business processing of its data is pulled out and handed to a thread pool. The advantage is that even if the business processing takes a long time, the Reactor thread's read and write operations for clients are not affected. The diagram above shows this simple worker-thread pattern: business data processing is isolated from I/O and runs in a thread pool.
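The hand-off itself can be sketched without any sockets. In the hypothetical `WorkerHandoffSketch` below, `onRequest` stands for the point where the Reactor thread has finished reading a request: it submits the business logic to a pool and returns at once, so the caller is free to go back to `select()`. The pool size, the method names and the upper-casing "business logic" are placeholders chosen for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical hand-off of business logic from an I/O thread to a worker pool. */
public class WorkerHandoffSketch {
    // Assumed pool size, chosen only for this illustration.
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    // Name of the thread that ran the most recent task, kept only for illustration.
    private volatile String lastWorkerThread;

    /** Called on the Reactor thread after a request was read; returns immediately. */
    public CompletableFuture<String> onRequest(String request) {
        return CompletableFuture.supplyAsync(() -> process(request), workers);
    }

    /** Placeholder business logic; runs on a pool thread, not on the caller. */
    private String process(String request) {
        lastWorkerThread = Thread.currentThread().getName();
        return request.toUpperCase();
    }

    public String lastWorkerThread() {
        return lastWorkerThread;
    }

    public void shutdown() {
        workers.shutdown();
    }

    public static void main(String[] args) throws Exception {
        WorkerHandoffSketch sketch = new WorkerHandoffSketch();
        CompletableFuture<String> result = sketch.onRequest("hello");
        // The calling thread could return to its selector loop here.
        System.out.println(result.get() + " computed on " + sketch.lastWorkerThread());
        sketch.shutdown();
    }
}
```

In a real server the future's completion would trigger writing the response back to the client channel; how that write is scheduled is a design choice this sketch leaves open.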

Multiple Reactor threading model

The final thread model is the multiple Reactor thread model. Here the main Reactor thread only accepts connections, while reading and writing client data is spread across several sub-Reactor threads, so I/O for different clients can proceed in parallel.

Example of a multiple Reactor thread model

MainReactor-Thread and Acceptor code:

    /**
     * mainReactor-Thread:
     * receives client connections and hands them to the Acceptor
     */
    class MainReactor extends Thread {
        // Create a Selector
        public Selector selector;

        AtomicInteger integer = new AtomicInteger(0);

        public MainReactor() throws IOException {
            selector = Selector.open();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    // Start the Selector (blocks until an event is ready)
                    selector.select();
                    // Get the events
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    // Iterate over events
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        if (selectionKey.isAcceptable()) {
                            // Get the client channel
                            SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
                            Acceptor acceptor = new Acceptor();
                            // Assign the client channel to the Acceptor for processing
                            acceptor.register(socketChannel);
                        }
                        // Remove it after processing
                        iterator.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        public void register(ServerSocketChannel serverSocketChannel) throws ClosedChannelException {
            // Register the OP_ACCEPT event
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        }

        /**
         * Assigns a subReactor-Thread to each client connection
         */
        class Acceptor {

            public void register(SocketChannel socketChannel) throws IOException {
                // Set to non-blocking mode
                socketChannel.configureBlocking(false);
                int index = integer.getAndIncrement() % subReactors.length;
                SubReactor subReactor = subReactors[index];
                // Assign a subReactor thread to the client connection
                subReactor.register(socketChannel);
                // Start the subReactor thread
                subReactor.start();
                System.out.println("Received new connection:" + socketChannel.getRemoteAddress());
            }
        }
    }
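The Acceptor spreads connections over the subReactors by taking an incrementing counter modulo the array length. One detail to keep in mind: an `AtomicInteger` wraps to negative values after `Integer.MAX_VALUE`, and Java's `%` then yields a negative remainder, which would not be a valid array index. Below is a small sketch of one possible guard using `Math.floorMod`; the class name `RoundRobinChooser` and its constructor parameters are made up for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin index chooser that survives counter overflow. */
public class RoundRobinChooser {
    private final AtomicInteger counter;
    private final int size;

    public RoundRobinChooser(int size, int start) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
        this.counter = new AtomicInteger(start);
    }

    /** Returns an index in [0, size), also after the counter has wrapped to a negative value. */
    public int next() {
        return Math.floorMod(counter.getAndIncrement(), size);
    }

    public static void main(String[] args) {
        // Start at Integer.MAX_VALUE to force the wrap-around on the second call.
        RoundRobinChooser chooser = new RoundRobinChooser(10, Integer.MAX_VALUE);
        System.out.println(chooser.next()); // 7
        System.out.println(chooser.next()); // 2 (plain % would give -8 here)
    }
}
```

For a demo server the wrap-around takes about two billion connections to reach, so this is a robustness detail rather than something the example depends on.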

SubReactor-Thread code:

    /**
     * subReactor-Thread:
     * one thread serves multiple client connections,
     * reading data from and writing data to their channels
     */
    class SubReactor extends Thread {

        // Create a Selector
        public Selector selector;
        // Whether the SubReactor thread has been started
        public volatile boolean isRunning = false;

        public SubReactor() throws IOException {
            selector = Selector.open();
        }

        @Override
        public void start() {
            // Start the SubReactor thread only if it has not been started yet
            if (!isRunning) {
                isRunning = true;
                super.start();
            }
        }

        @Override
        public void run() {
            while (isRunning) {
                try {
                    // Start the Selector (blocks until an event is ready)
                    selector.select();
                    // Get the events
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    // Iterate over events
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        if (selectionKey.isReadable()) {
                            try {
                                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                                new Handler(socketChannel);
                            } catch (Exception e) {
                                e.printStackTrace();
                                selectionKey.cancel();
                            }
                        }
                        // Remove it after processing
                        iterator.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        public void register(SocketChannel socketChannel) throws IOException {
            // Register the OP_READ event
            socketChannel.register(selector, SelectionKey.OP_READ);
        }

        /** Reads or writes data */
        class Handler {
            // Reads the request and writes the response
            public Handler(SocketChannel socketChannel) throws IOException {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                while (socketChannel.isOpen()) {
                    if (socketChannel.read(readBuffer) == -1) {
                        // The client closed the connection. Closing the channel also
                        // cancels its key, so the selector stops reporting it as readable.
                        socketChannel.close();
                        return;
                    }
                    // Stop as soon as some data has been read
                    if (readBuffer.position() > 0) {
                        break;
                    }
                }
                // If there is no data to read, return directly
                if (readBuffer.position() == 0) {
                    return;
                }
                // Switch to read mode
                readBuffer.flip();
                byte[] bytes = new byte[readBuffer.limit()];
                readBuffer.get(bytes);
                System.out.println("New data obtained:" + new String(bytes));
                System.out.println("New data obtained from:" + socketChannel.getRemoteAddress());
                // Thread pool, used to process business data
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        // Business logic would go here
                    }
                });
                // Write data to the client
                String response = "HTTP/1.1 200 OK\r\n" +
                        "Content-Length: 11\r\n\r\n" +
                        "hello world";
                ByteBuffer writeBuffer = ByteBuffer.wrap(response.getBytes());
                while (writeBuffer.hasRemaining()) {
                    socketChannel.write(writeBuffer);
                }
            }
        }
    }
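One thing to watch in the code above: `register` is called from the mainReactor thread, while the subReactor thread may already be blocked in `selector.select()`. Depending on the JDK version, such a cross-thread registration can block until the selection finishes, or it only takes effect at the next selection, so a newly accepted connection might sit idle for a while. One common remedy is to queue the registration and call `selector.wakeup()` so that the selector thread performs it itself. The sketch below illustrates that idea under stated assumptions: `WakeupRegisterSketch`, its `onReady` callback and `shutdown` are hypothetical names and not part of the article's code.

```java
import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Hypothetical reactor thread that performs channel registrations on its own thread. */
public class WakeupRegisterSketch extends Thread {
    private final Selector selector;
    private final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();
    private final Consumer<SelectionKey> onReady;

    public WakeupRegisterSketch(Consumer<SelectionKey> onReady) throws IOException {
        this.selector = Selector.open();
        this.onReady = onReady;
        setDaemon(true);
    }

    /** Safe to call from any thread: queue the registration, then wake the selector. */
    public void register(SelectableChannel channel, int ops) {
        pending.add(() -> {
            try {
                channel.configureBlocking(false);
                channel.register(selector, ops);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        // Makes a blocked select() return; if none is in progress, the next one returns at once.
        selector.wakeup();
    }

    public void shutdown() throws IOException {
        selector.close();
    }

    @Override
    public void run() {
        try {
            while (true) {
                selector.select();
                // Run queued registrations on the selector thread.
                Runnable task;
                while ((task = pending.poll()) != null) {
                    task.run();
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    onReady.accept(key);
                }
            }
        } catch (IOException | ClosedSelectorException e) {
            // Selector closed or failed: exit the loop.
        }
    }
}
```

With this shape, an Acceptor would call `subReactor.register(socketChannel, SelectionKey.OP_READ)` and never touch the other thread's selector directly.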

Initialization code:

    /** Server channel */
    public ServerSocketChannel serverSocketChannel;
    /** Receives client connections */
    public MainReactor mainReactor;
    /** Read from and write to client connections */
    public SubReactor[] subReactors = new SubReactor[10];
    /** Thread pool, used to process business logic after a client connects */
    public ExecutorService threadPool = Executors.newCachedThreadPool();

    public static void main(String[] args) throws IOException {
        NioReactor nioReactor = new NioReactor();
        nioReactor.initAndRegister();
        nioReactor.init();
        nioReactor.bind();
    }

    /** Initialize the server */
    public void init() throws IOException {
        // Create a server channel
        serverSocketChannel = ServerSocketChannel.open();
        // Set to non-blocking mode
        serverSocketChannel.configureBlocking(false);
        // Register with the mainReactor-Thread
        mainReactor.register(serverSocketChannel);
        // Start the mainReactor-Thread
        mainReactor.start();
    }

    /** Bind the server to a port */
    public void bind() throws IOException {
        serverSocketChannel.socket().bind(new InetSocketAddress(8056));
        System.out.println("Server started successfully");
    }

    /** Initialize the MainReactor and the SubReactors */
    public void initAndRegister() throws IOException {
        mainReactor = new MainReactor();

        for (int i = 0; i < subReactors.length; i++) {
            subReactors[i] = new SubReactor();
        }
    }

MainReactor and SubReactor contain a lot of duplicate code, which can be extracted into a common base class:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** NIO multiplexing with the Reactor thread model */
public class NioReactor2 {
    abstract class ReactorThread extends Thread {
        // Create a Selector
        public Selector selector;
        // Whether the thread has been started
        public volatile boolean isRunning = false;

        /** Called when an event occurs on a channel */
        public abstract void handler(SelectableChannel channel) throws IOException;

        public ReactorThread() throws IOException {
            selector = Selector.open();
        }

        @Override
        public void run() {
            while (isRunning) {
                try {
                    // Start the Selector (blocks until an event is ready)
                    selector.select();
                    // Get the events
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    // Iterate over events
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        int readyOps = key.readyOps();
                        // Focus only on OP_ACCEPT and OP_READ events
                        if ((readyOps & (SelectionKey.OP_ACCEPT | SelectionKey.OP_READ)) != 0 || readyOps == 0) {
                            try {
                                // Get the channel
                                SelectableChannel channel = key.channel();
                                // Set to non-blocking mode
                                channel.configureBlocking(false);
                                // If there is an event, call handler
                                handler(channel);
                                if (!channel.isOpen()) {
                                    // If the channel is closed, cancel the key
                                    key.cancel();
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                                // If there is an exception, cancel the key
                                key.cancel();
                            }
                        }
                        // Remove it after processing
                        iterator.remove();
                    }
                    selector.selectNow();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        public SelectionKey register(SelectableChannel channel) throws ClosedChannelException {
            // Register with the Selector first, without any interest ops
            return channel.register(selector, 0);
        }

        @Override
        public void start() {
            // Start the thread only if it has not been started yet
            if (!isRunning) {
                isRunning = true;
                super.start();
            }
        }
    }

    /** Server channel */
    public ServerSocketChannel serverSocketChannel;
    /** Receive client connections */
    public ReactorThread[] mainReactors = new ReactorThread[1];
    /** Read from and write to client connections */
    public ReactorThread[] subReactors = new ReactorThread[10];
    /** Thread pool, used to process business logic after a client connects */
    public ExecutorService threadPool = Executors.newCachedThreadPool();

    /**
     * Initialize mainReactors and subReactors
     */
    public void initAndRegister() throws IOException {
        // subReactor threads: read and write data after a client connects
        for (int i = 0; i < subReactors.length; i++) {
            subReactors[i] = new ReactorThread() {
                @Override
                public void handler(SelectableChannel channel) throws IOException {
                    SocketChannel socketChannel = (SocketChannel) channel;
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    while (socketChannel.isOpen()) {
                        if (socketChannel.read(readBuffer) == -1) {
                            // The client closed the connection; close our side so that
                            // ReactorThread.run() cancels the key
                            socketChannel.close();
                            return;
                        }
                        // Stop as soon as some data has been read
                        if (readBuffer.position() > 0) {
                            break;
                        }
                    }
                    // If there is no data to read, return directly
                    if (readBuffer.position() == 0) {
                        return;
                    }
                    // Switch to read mode
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.limit()];
                    readBuffer.get(bytes);
                    System.out.println("New data obtained:" + new String(bytes));
                    System.out.println("New data obtained from:" + socketChannel.getRemoteAddress());
                    // Thread pool, used to process business data
                    threadPool.execute(new Runnable() {
                        @Override
                        public void run() {
                            // Business logic would go here
                        }
                    });
                    // Write data to the client
                    String response = "HTTP/1.1 200 OK\r\n" +
                            "Content-Length: 11\r\n\r\n" +
                            "hello world";
                    ByteBuffer writeBuffer = ByteBuffer.wrap(response.getBytes());
                    while (writeBuffer.hasRemaining()) {
                        socketChannel.write(writeBuffer);
                    }
                }
            };
        }
        // mainReactor threads: accept client connections
        for (int i = 0; i < mainReactors.length; i++) {
            mainReactors[i] = new ReactorThread() {
                AtomicInteger integer = new AtomicInteger(0);

                @Override
                public void handler(SelectableChannel channel) throws IOException {
                    // Get the client channel
                    SocketChannel socketChannel = ((ServerSocketChannel) channel).accept();
                    // Set to non-blocking mode
                    socketChannel.configureBlocking(false);
                    int index = integer.getAndIncrement() % subReactors.length;
                    ReactorThread subReactor = subReactors[index];
                    // Start the thread
                    subReactor.start();
                    // Register events
                    SelectionKey key = subReactor.register(socketChannel);
                    key.interestOps(SelectionKey.OP_READ);
                    System.out.println("Received new connection:" + socketChannel.getRemoteAddress());
                }
            };
        }
    }

    /** Initialize the server */
    public void init() throws IOException {
        // Create a server channel
        serverSocketChannel = ServerSocketChannel.open();
        // Set to non-blocking mode
        serverSocketChannel.configureBlocking(false);
        // Register with one of the mainReactor threads
        int index = new Random().nextInt(mainReactors.length);
        SelectionKey keys = mainReactors[index].register(serverSocketChannel);
        keys.interestOps(SelectionKey.OP_ACCEPT);
        // Start the mainReactor thread
        mainReactors[index].start();
    }

    /** Bind the server to a port */
    public void bind() throws IOException {
        serverSocketChannel.socket().bind(new InetSocketAddress(8056));
        System.out.println("Server started successfully");
    }

    public static void main(String[] args) throws IOException {
        NioReactor2 nioReactor = new NioReactor2();
        nioReactor.initAndRegister();
        nioReactor.init();
        nioReactor.bind();
    }
}

Conclusion

That completes the Reactor thread model in NIO. The example above can be split into several classes, and with parsing of HTTP requests added it could serve as the basis for a simple Tomcat-like HTTP server.