Java NIO has selectors, the kernel provides a variety of non-blocking IO models, and excellent NIO frameworks such as Netty have emerged in the Java community. What is the relationship between NIO and the kernel's IO models? Why does Java have both the NIO API and the Netty framework? What is a reactor? This article walks step by step through code to explain how Java NIO relates to system calls, and how Java NIO evolved toward the Netty framework.

NIO concept

In the previous articles we introduced the concept of non-blocking IO. Java has a family of NIO packages, and most sources on the web equate Java NIO with non-blocking IO, which is not accurate. NIO in Java refers to a new set of APIs, available since version 1.4, offered as an alternative to standard Java IO. There are three components:

  • Buffer

  • Channel

  • Selector

The specific use of the API is not covered in this article.
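Even without covering the API in detail, one small check illustrates why NIO is not the same thing as non-blocking IO. The sketch below is not part of the original example, and the class name BlockingModeCheck is made up for illustration. It relies only on the documented behaviour that a newly created selectable channel starts in blocking mode and becomes non-blocking only when asked:

```java
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;

class BlockingModeCheck {
    /** Returns {mode right after open(), mode after configureBlocking(false)}. */
    static boolean[] modes() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            boolean atCreation = server.isBlocking(); // NIO channels start out blocking
            server.configureBlocking(false);          // non-blocking mode is opt-in
            return new boolean[] { atCreation, server.isBlocking() };
        }
    }

    public static void main(String[] args) throws IOException {
        boolean[] m = modes();
        System.out.println("blocking at creation: " + m[0]);
        System.out.println("blocking after configureBlocking(false): " + m[1]);
    }
}
```

In other words, a program can use Buffer and Channel in a fully blocking style; the non-blocking behaviour comes from the configureBlocking(false) call together with a Selector.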

The code template

Using NIO roughly breaks down into these steps:

  1. Obtain a channel

  2. Set it to non-blocking

  3. Create a multiplexer (selector)

  4. Associate the channel with the selector

  5. Handle the channel's state according to the events the selector returns

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// Set to non-blocking
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(PORT));
Selector selector = Selector.open();
// Bind the multiplexer and the channel
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// Get the events that arrive
while (selector.select() > 0) {
    // selectedKeys() holds only the ready keys; keys() would return every registered key
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = keys.iterator();
    while (iterator.hasNext()) {
        SelectionKey selectionKey = iterator.next();
        if (selectionKey.isAcceptable()) {
            // processing logic
        }
        if (selectionKey.isReadable()) {
            // processing logic
        }
        // Remove the handled key so it is not processed again on the next select()
        iterator.remove();
    }
}

Single-threaded example

Referring to the code template, we use NIO to implement an Echo Server. The server code is as follows:

public static void main(String[] args) throws IOException {
    Selector selector = initServer();
    while (selector.select() > 0) {
        Set<SelectionKey> set = selector.selectedKeys();
        Iterator<SelectionKey> iterator = set.iterator();
        while (iterator.hasNext()) {
            SelectionKey selectionKey = iterator.next();
            try {
                if (selectionKey.isAcceptable()) {
                    ServerSocketChannel serverSocketChannel = 
                        (ServerSocketChannel) selectionKey.channel();
                    SocketChannel channel = serverSocketChannel.accept();
                    System.out.println("Create a link:" + channel.getRemoteAddress());
                    channel.configureBlocking(false);
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    channel.register(selector, SelectionKey.OP_READ, buffer);
                } else if (selectionKey.isReadable()) {
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                    buffer.clear();
                    StringBuilder sb = new StringBuilder();
                    int read = 0;
                    while ((read = channel.read(buffer)) > 0) {
                        buffer.flip();
                        sb.append(Charset.forName("UTF-8").
                                  newDecoder().decode(buffer));
                        buffer.clear();
                    }
                    System.out.printf("Received from %s: %s\n", channel.getRemoteAddress(), sb);
                    buffer.clear();
                    // Simulate processing time with a random sleep
                    Thread.sleep((int) (Math.random() * 1000));
                    buffer.put(("Roger that. You sent:" + sb + "\r\n").getBytes("utf-8"));
                    buffer.flip();
                    channel.write(buffer);
                    System.out.printf("Reply %s: %s\n", channel.getRemoteAddress(), sb);
                    channel.register(selector, SelectionKey.OP_READ, buffer.clear());
                }
            } catch (IOException | InterruptedException e) {
                selectionKey.cancel();
                selectionKey.channel().close();
                System.err.println(e.getMessage());
            }
            iterator.remove();
        }
    }
}
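One detail the read loop above does not distinguish is the difference between read() returning 0 (nothing available right now) and -1 (the peer closed the connection). The following sketch, which is an illustration and not the author's code, uses a java.nio.channels.Pipe instead of a socket so it runs without a network; the class and method names are hypothetical:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

class EndOfStreamDemo {
    /**
     * Performs one read, appends any decoded bytes to out, and returns the raw
     * result of read(): greater than 0 for bytes read, 0 for nothing available,
     * -1 when the writing side has been closed.
     */
    static int readOnce(ReadableByteChannel channel, ByteBuffer buffer, StringBuilder out)
            throws IOException {
        buffer.clear();
        int n = channel.read(buffer);
        if (n > 0) {
            buffer.flip(); // switch the buffer from filling to draining
            out.append(StandardCharsets.UTF_8.decode(buffer));
        }
        return n;
    }

    /** Writes a message into a pipe, closes the writing end, then reads until -1. */
    static String readAll(String message) throws IOException {
        Pipe pipe = Pipe.open();
        try (Pipe.SinkChannel sink = pipe.sink()) {
            sink.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
        } // closing the sink is what lets the reader observe -1
        StringBuilder out = new StringBuilder();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try (Pipe.SourceChannel source = pipe.source()) {
            while (readOnce(source, buffer, out) != -1) {
                // keep reading until end-of-stream
            }
        }
        return out.toString();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readAll("hello"));
    }
}
```

In a selector-based server the usual reaction to -1 is to cancel the key and close the channel instead of replying.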

Next, write a client that uses 50 threads to send requests to the server at the same time, while the server simulates processing time with a random sleep in its read handling. The client code:

public static void main(String[] args) throws IOException {
    for (int i = 0; i < 50; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    clientHandler();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
    return;
}

private static void clientHandler() throws IOException {
    long start = System.currentTimeMillis();
    Socket socket = new Socket();
    socket.setSoTimeout(10000);
    socket.connect(new InetSocketAddress(9999));
    OutputStream outputStream = socket.getOutputStream();
    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(outputStream));
    bw.write("Hello, this is Client." + socket.getLocalSocketAddress() + "\r\n");
    bw.flush();

    InputStream inputStream = socket.getInputStream();
    BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
    System.out.printf("Received server response: %s, processed %d\r\n", br.readLine(), (System.currentTimeMillis() - start));
    br.close();
    inputStream.close();

    bw.close();
    outputStream.close();
}



Implementation principle of Selector

Trace the system calls of the single-threaded server with strace:

[root@f00e68119764 TMP]# strace -ff -o out /usr/lib/jvm/java-1.8.0/bin/java NIOServerSingle

You can see the following snippet in the log:

20083 socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 4
20084 setsockopt(4, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
20085 clock_gettime(CLOCK_MONOTONIC, {tv_sec=242065, tv_nsec=887240727}) = 0
20086 fcntl(4, F_GETFL)                       = 0x2 (flags O_RDWR)
20087 fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
20088 bind(4, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
20089 listen(4, 50)                           = 0
20090 getsockname(4, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("0.0.0.0")}, [16]) = 0
20091 getsockname(4, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("0.0.0.0")}, [16]) = 0
20092 epoll_create(256)                       = 7
21100 epoll_ctl(7, EPOLL_CTL_ADD, 4, {EPOLLIN, {u32=4, u64=158913789956}}) = 0
21101 epoll_wait(7, [{EPOLLIN, {u32=4, u64=158913789956}}], 8192, -1) = 1

As you can see, Java NIO (Java 1.8) ultimately relies on the system's epoll under the hood. epoll itself is covered elsewhere and is not repeated here.

The source code shows the same thing:

public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}

openSelector is an abstract method, so the actual work is done by an implementation class. On Linux the code is as follows:

public class EPollSelectorProvider
    extends SelectorProviderImpl
{
    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }

    public Channel inheritedChannel() throws IOException {
        return InheritedChannel.getChannel();
    }
}

Java NIO takes advantage of the capabilities provided by the system kernel.
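A quick way to check which implementation a particular JVM picks is to print the class names behind Selector.open(). This is a small sketch and not part of the original example; the class name WhichSelector is made up for illustration, and the output depends on the platform and JDK (on the Linux JDK 1.8 traced above it should name the EPoll classes shown in the source):

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

class WhichSelector {
    /** Class name of the system-wide default selector provider. */
    static String providerName() {
        return SelectorProvider.provider().getClass().getName();
    }

    /** Class name of the selector that Selector.open() actually returns. */
    static String selectorName() throws IOException {
        try (Selector selector = Selector.open()) {
            return selector.getClass().getName();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("provider: " + providerName());
        System.out.println("selector: " + selectorName());
    }
}
```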

Multithreading

Let's tweak the random sleep in the read handling of the single-threaded example a little, to simulate the server taking too long on one request.

When the 15th request comes, randomly sleep:

// Simulate server processing time
// (t appears to be a request counter; its declaration is not shown)
if (t.get() == 15) {
    Thread.sleep((int) (Math.random() * 10000));
}

As a result, every client served after the 15th request has to wait for a short while.



This is easy to explain: with single-threaded processing, accepting connections and reading and writing I/O all happen on one thread, so with 50 clients the I/O work has to queue up. This is also why, as mentioned in the Redis series, you should avoid operations on a single key that take a long time in Redis.
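The queueing effect can be reproduced without any sockets. The sketch below is an illustration under simplified assumptions (plain tasks stand in for client requests, and the class name QueueingDemo is hypothetical): it runs five tasks, one of them slow, first on a single thread and then on a small pool, and records the order in which they finish.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class QueueingDemo {
    /** Runs `tasks` jobs on the executor; job `slowIndex` sleeps. Returns the completion order. */
    static List<Integer> completionOrder(ExecutorService executor, int tasks,
                                         int slowIndex, long slowMillis)
            throws InterruptedException {
        List<Integer> finished = new CopyOnWriteArrayList<>();
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            executor.execute(() -> {
                if (id == slowIndex) {
                    try {
                        Thread.sleep(slowMillis); // the one slow "request"
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                finished.add(id);
            });
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        // One thread: every task behind the slow one has to wait for it.
        System.out.println("one thread:   "
                + completionOrder(Executors.newSingleThreadExecutor(), 5, 1, 300));
        // Four threads: the other tasks overtake the slow one.
        System.out.println("four threads: "
                + completionOrder(Executors.newFixedThreadPool(4), 5, 1, 300));
    }
}
```

With one thread the tasks finish strictly in submission order, so the slow task delays everything queued behind it. With several threads the slow task typically finishes last while the rest complete right away.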

We made a small change: the client code stays the same, while the server hands the read/write handling to a separate thread, as follows:

if (selectionKey.isAcceptable()) {
    ServerSocketChannel serverSocketChannel = 
        (ServerSocketChannel) selectionKey.channel();
    SocketChannel channel = serverSocketChannel.accept();
    System.out.println("Create a link:" + channel.getRemoteAddress());
    channel.configureBlocking(false);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    channel.register(selector, SelectionKey.OP_READ, buffer);
} else if (selectionKey.isReadable()) {
    service.execute(new Runnable() {
        @Override
        public void run() {
            try {
                // processing logic ……………………
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}

This is equivalent to having two kinds of threads on the server side: the main thread runs the selector and listens for the channel's OP_ACCEPT state, while the other handles reading from and writing to the channels. The program still runs correctly, and slightly faster.
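One caveat with handing a readable key to another thread: until the worker has actually read the data, the key stays readable, so the next select() reports it again. A common remedy, shown here as a hedged sketch and not as the author's approach, is to clear OP_READ from the key's interest set before the hand-off and restore it afterwards. The demo uses a Pipe instead of a socket so it runs locally, and the class name InterestOpsDemo is hypothetical:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class InterestOpsDemo {
    /** Returns ready-key counts: {OP_READ set, OP_READ cleared, OP_READ restored}. */
    static int[] readyCounts() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            source.configureBlocking(false);
            SelectionKey key = source.register(selector, SelectionKey.OP_READ);
            sink.write(ByteBuffer.wrap(new byte[] {42})); // unread data keeps the key readable

            int before = selector.select(1000);
            selector.selectedKeys().clear();

            // What the selector thread could do just before handing the key to a worker
            key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
            int whileWorkerBusy = selector.selectNow();
            selector.selectedKeys().clear();

            // What the worker could do once it has finished with the channel
            key.interestOps(key.interestOps() | SelectionKey.OP_READ);
            int after = selector.select(1000);
            return new int[] {before, whileWorkerBusy, after};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] counts = readyCounts();
        System.out.println("ready with OP_READ:     " + counts[0]);
        System.out.println("ready while suspended:  " + counts[1]);
        System.out.println("ready after restoring:  " + counts[2]);
    }
}
```

In a real server the worker would also call selector.wakeup() after restoring the interest set, so that a select() already blocked on the main thread notices the change.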



Reactor model

Anyone who has worked with NIO has probably heard the term reactor. It is so often mentioned together with NIO that many people confuse the two concepts. Here is how Wikipedia explains what a reactor is:

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

Four core points:

  1. Reactor design pattern: the reactor is a design pattern, not something specific to a language or framework.

  2. Event handling pattern: it is about handling events.

  3. Requests are delivered concurrently to a service handler by one or more inputs.

  4. The service handler demultiplexes the incoming requests and dispatches them to the associated request handlers.
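To make these four points concrete independently of NIO, here is a deliberately tiny sketch of the pattern with no sockets at all. Everything in it (MiniReactor, deliver, runOnce, the string event types) is invented for illustration, and it is single-threaded and not thread-safe; a queue of events stands in for inputs becoming ready.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

class MiniReactor {
    /** A request handler for one kind of event. */
    interface Handler {
        void handle(String payload);
    }

    private final Map<String, Handler> handlers = new HashMap<>();
    private final Queue<String[]> pending = new ArrayDeque<>();

    /** Associates an event type with its handler. */
    void register(String eventType, Handler handler) {
        handlers.put(eventType, handler);
    }

    /** Inputs deliver requests here (a stand-in for sockets becoming ready). */
    void deliver(String eventType, String payload) {
        pending.add(new String[] {eventType, payload});
    }

    /** One pass of the event loop: demultiplex each event and dispatch it synchronously. */
    void runOnce() {
        String[] event;
        while ((event = pending.poll()) != null) {
            Handler handler = handlers.get(event[0]);
            if (handler != null) {
                handler.handle(event[1]);
            }
        }
    }

    /** Wires up two handlers, delivers two events, and returns what the handlers logged. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniReactor reactor = new MiniReactor();
        reactor.register("accept", payload -> log.add("accepted " + payload));
        reactor.register("read", payload -> log.add("read " + payload));
        reactor.deliver("accept", "client-1");
        reactor.deliver("read", "client-1");
        reactor.runOnce();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the NIO versions of this idea, the Selector plays the role of the pending queue and the SelectionKey carries the event type.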

Let’s make some changes to the single-threaded example:

public static void main(String[] args) throws IOException {
    Selector selector = initServer();
    while (selector.select() > 0) {
        Set<SelectionKey> set = selector.selectedKeys();
        Iterator<SelectionKey> iterator = set.iterator();
        while (iterator.hasNext()) {
            SelectionKey selectionKey = iterator.next();
            dispatcher(selectionKey);
            iterator.remove();
        }
    }
}

initServer implementation:

private static Selector initServer() throws IOException {
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);
    serverChannel.bind(new InetSocketAddress(9999));

    Selector selector = Selector.open();
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    System.out.println("Server startup");
    return selector;
}

Implementation of dispatcher:

private static void dispatcher(SelectionKey selectionKey) throws IOException {
    // The selector is a local variable in main, so recover it from the key
    Selector selector = selectionKey.selector();
    try {
        if (selectionKey.isAcceptable()) {
            // Only responsible for accepting connections
            acceptHandler(selector, selectionKey);
        } else if (selectionKey.isReadable()) {
            // Only responsible for reading and writing data
            readHandler(selector, selectionKey);
        }
    } catch (IOException | InterruptedException e) {
        selectionKey.cancel();
        selectionKey.channel().close();
        System.err.println(e.getMessage());
    }
}

acceptHandler implementation:

ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel channel = serverSocketChannel.accept();
System.out.println("Create a link:" + channel.getRemoteAddress());
channel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.register(selector, SelectionKey.OP_READ, buffer);

Implementation of readHandler:

SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
buffer.clear();
StringBuilder sb = new StringBuilder();
int read = 0;
while ((read = channel.read(buffer)) > 0) {
    buffer.flip();
    sb.append(Charset.forName("UTF-8").newDecoder().decode(buffer));
    buffer.clear();
}
System.out.printf("Received from %s: %s\n", channel.getRemoteAddress(), sb);
buffer.clear();
Thread.sleep((int) (Math.random() * 1000));
buffer.put(("Roger that. You sent:" + sb + "\r\n").getBytes("utf-8"));
buffer.flip();
channel.write(buffer);
System.out.printf("Reply %s: %s\n", channel.getRemoteAddress(), sb);
channel.register(selector, SelectionKey.OP_READ, buffer.clear());

Single thread Reactor

The code after transformation has the following characteristics:

  1. Event-driven (the NIO selector; on JDK 1.8 for Linux the underlying event-driven implementation is epoll)

  2. A unified dispatch center (the dispatcher method)

  3. Different handling for different events (accept and read/write are split)

Having basically implemented the single-threaded Reactor model, let's make a few more changes to the sample code:

public class ReactorDemo {
    private Selector selector;

    public ReactorDemo() throws IOException {
        initServer();
    }
    private void initServer() throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(9999));

        selector = Selector.open();
        SelectionKey selectionKey = serverChannel.
            register(selector, SelectionKey.OP_ACCEPT);
        selectionKey.attach(new Acceptor());
        System.out.println("Server startup");
    }

    public void start() throws IOException {
        while (selector.select() > 0) {
            Set<SelectionKey> set = selector.selectedKeys();
            Iterator<SelectionKey> iterator = set.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                dispater(selectionKey);
                iterator.remove();
            }

        }
    }

    public void dispater(SelectionKey selectionKey) {
        Hander hander = (Hander) selectionKey.attachment();
        if (hander != null) {
            hander.process(selectionKey);
        }
    }

    private interface Hander {
        void process(SelectionKey selectionKey);
    }

    private class Acceptor implements Hander {
        @Override
        public void process(SelectionKey selectionKey) {
            try {
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
                SocketChannel channel = serverSocketChannel.accept();
                System.out.println("Create a link:" + channel.getRemoteAddress());
                channel.configureBlocking(false);
                // Attach the I/O handler to the new connection's key, not to the listening key
                channel.register(selector, SelectionKey.OP_READ, new ProcessHandler());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private class ProcessHandler implements Hander {
        @Override
        public void process(SelectionKey selectionKey) {
            // Read/write logic goes here (see readHandler above)
        }
    }

    public static void main(String[] args) throws IOException {
        new ReactorDemo().start();
    }
}

We have now implemented the most basic model: a single Reactor on a single thread. After the program starts, the selector obtains and demultiplexes the ready sockets and hands them to the dispatcher, which passes them to different handlers. The Acceptor only handles new socket connections, and ProcessHandler handles I/O.

I redrew the Reactor diagram that was circulating on the Internet according to my own interpretation



Multithreaded Reactor

In the example code above, a single thread does everything from establishing the socket until the I/O completes. When we modified the NIO single-threaded example, we added a thread pool to speed up I/O task processing. How does that translate to the Reactor model?

As in the NIO multithreading change, we add a thread pool that runs all ProcessHandler work, so that multiple CPU cores can be used to speed up business processing.
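No code accompanies this step in the article, so the following is only one possible shape for it, sketched under assumptions. It reuses the Hander interface name from ReactorDemo (spelling kept to match the demo), while PooledHander, PooledHandlerSketch, and workerThreadName are hypothetical names. The idea is a wrapper that submits the wrapped handler's work to a pool, so that the selector thread returns to select() immediately.

```java
import java.nio.channels.SelectionKey;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PooledHandlerSketch {
    /** Same shape as the Hander interface in ReactorDemo. */
    interface Hander {
        void process(SelectionKey selectionKey);
    }

    /** Wraps another handler and runs it on a worker pool instead of the selector thread. */
    static class PooledHander implements Hander {
        private final Hander delegate;
        private final ExecutorService workers;

        PooledHander(Hander delegate, ExecutorService workers) {
            this.delegate = delegate;
            this.workers = workers;
        }

        @Override
        public void process(SelectionKey selectionKey) {
            // Returns immediately; the business logic runs on a pool thread
            workers.execute(() -> delegate.process(selectionKey));
        }
    }

    /** Returns the name of the thread that ended up running the wrapped handler. */
    static String workerThreadName() throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> ranOn = new CompletableFuture<>();
            Hander business = key -> ranOn.complete(Thread.currentThread().getName());
            // No real key is needed for this demo, so null is passed
            new PooledHander(business, workers).process(null);
            return ranOn.get(5, TimeUnit.SECONDS);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("caller:  " + Thread.currentThread().getName());
        System.out.println("handler: " + workerThreadName());
    }
}
```

In ReactorDemo, the Acceptor could attach new PooledHander(new ProcessHandler(), pool) to each accepted channel's key, leaving the dispatcher unchanged.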



Multiple Reactor model

In ReactorDemo, the Acceptor uses the same selector to handle socket connections as the handlers use for I/O. If, on top of multithreading, we had two selectors, one that handles only socket connections and one that handles only network I/O, that could increase the throughput of the system. How would we do that?

public class ReactorDemo {
    private Selector selector;
    private Selector ioSelector;


    public ReactorDemo() throws IOException {
        initServer();
    }

    private void initServer() throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(9999));

        selector = Selector.open();
        ioSelector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("Server startup");
    }

    public void startServer() {
        Executors.newFixedThreadPool(1).execute(new Runnable() {
            @Override
            public void run() {
                try {
                    majorListen();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
        Executors.newFixedThreadPool(1).execute(new Runnable() {
            @Override
            public void run() {
                try {
                    subListen();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public void majorListen() throws IOException {
        System.out.println("Master selector starts");
        while (selector.select() > 0) {
            System.out.println("Master selector has an event");
            Set<SelectionKey> set = selector.selectedKeys();
            Iterator<SelectionKey> iterator = set.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isAcceptable()) {
                    new Acceptor().process(selectionKey);
                }
                iterator.remove();
            }

        }
    }

    public void subListen() throws IOException {
        System.out.println("Selector starts");
        while (true) {
            if (ioSelector.select(100) <= 0) {
                continue;
            }
            System.out.println("Selector has an event");
            Set<SelectionKey> set = ioSelector.selectedKeys();
            Iterator<SelectionKey> iterator = set.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                selectionKey.attach(new ProcessHander());
                dispater(selectionKey, true);
                iterator.remove();
            }

        }
    }

    public void dispater(SelectionKey selectionKey, boolean isSub) {
        Hander hander = (Hander) selectionKey.attachment();
        if (hander != null) {
            hander.process(selectionKey);
        }
    }

    private interface Hander {
        void process(SelectionKey selectionKey);
    }

    private class Acceptor implements Hander {
        @Override
        public void process(SelectionKey selectionKey) {
            try {
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
                SocketChannel channel = serverSocketChannel.accept();
                System.out.println("Get a link:" + channel.getRemoteAddress());
                channel.configureBlocking(false);
                channel.register(ioSelector, 
                                 SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                ioSelector.wakeup();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private class ProcessHander implements Hander {

        @Override
        public void process(SelectionKey selectionKey) {
            try {
                SocketChannel channel = (SocketChannel) selectionKey.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                buffer.clear();
                StringBuilder sb = new StringBuilder();
                int read = 0;
                if ((read = channel.read(buffer)) > 0) {
                    buffer.flip();
                    sb.append(Charset.forName("UTF-8").newDecoder().decode(buffer));
                    buffer.clear();
                } else if (read == 0) {
                    return;
                } else if (read == -1) {
                    if (selectionKey.isValid()) {
                        selectionKey.cancel();
                        channel.close();
                    }
                    // The peer closed the connection, so there is nothing to reply to
                    return;
                }
                System.out.printf("Received from %s: %s\n", 
                                  channel.getRemoteAddress(), sb);
                buffer.clear();
                buffer.put(("Roger that. You sent:" + sb + "\r\n").getBytes("utf-8"));
                buffer.flip();
                channel.write(buffer);
                System.out.printf("Reply %s: %s\n", channel.getRemoteAddress(), sb);
                channel.register(ioSelector, SelectionKey.OP_READ, buffer.clear());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new ReactorDemo().startServer();
    }
}

The example creates a selector and an ioSelector. The selector handles only socket establishment; in the Acceptor.process method the new socket is registered with ioSelector, and ioSelector handles only I/O events, which are processed in ProcessHander.process. In this way we split up the selectors. Combined with multithreading, we can create N threads to handle the I/O events of the I/O selectors.
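The step from one ioSelector to N is not shown in the article. One simple way to spread accepted connections over several I/O selectors is round-robin assignment, sketched below as an assumption and not as the author's design; the names SubReactorGroup, nextSelector, and selectorAt are hypothetical.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicInteger;

class SubReactorGroup implements AutoCloseable {
    private final Selector[] ioSelectors;
    private final AtomicInteger next = new AtomicInteger();

    SubReactorGroup(int n) throws IOException {
        ioSelectors = new Selector[n];
        for (int i = 0; i < n; i++) {
            ioSelectors[i] = Selector.open();
        }
    }

    /** Round-robin index of the selector that should own the next accepted connection. */
    int nextIndex() {
        // floorMod keeps the index valid even if the counter overflows
        return Math.floorMod(next.getAndIncrement(), ioSelectors.length);
    }

    Selector nextSelector() {
        return ioSelectors[nextIndex()];
    }

    Selector selectorAt(int index) {
        return ioSelectors[index];
    }

    @Override
    public void close() throws IOException {
        for (Selector selector : ioSelectors) {
            selector.close();
        }
    }

    public static void main(String[] args) throws IOException {
        try (SubReactorGroup group = new SubReactorGroup(3)) {
            for (int connection = 0; connection < 4; connection++) {
                System.out.println("connection " + connection
                        + " -> I/O selector " + group.nextIndex());
            }
        }
    }
}
```

In the demo above, the Acceptor would register each accepted channel with group.nextSelector() in place of the single ioSelector, and each selector in the group would get its own thread running a loop like subListen.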



Conclusion

So far we have seen the three forms of the Reactor model: single Reactor single thread, single Reactor multithread, and multiple Reactor multithread. None of the code here is rigorous; it only illustrates how multiple threads and multiple selectors can be combined. The key points in summary:

  1. Reactor is a design pattern

  2. Event-driven processing

  3. Use the strategy of multiplexing to let different business processes do their jobs

  4. There are three implementation modes: single Reactor single thread, single Reactor multithread, and multiple Reactor multithread

Reference:

Gee.cs.oswego.edu/dl/cpjslide…



