“This is the second day of my participation in the August More Text Challenge.


preface

NIO is the foundation for understanding NetTY. The first article in the NetTY series starts with NIO. It is not the most comprehensive introduction to NIO, but it is definitely a heart to heart article.


What is NIO?

NIO (New IO) is a synchronous non-blocking I/O model, which is also the basis of I/O multiplexing. It has obvious advantages in high concurrency and large number of connections.

NIO is a Channel – and buffer-based I/O approach with three core components: channels, buffers, and selectors. When a thread operates on data, it either reads data from a Channel to a Buffer or writes data from a Buffer to a Channel. Selector is used to listen for events on multiple channels, to tell a connection to open, data to arrive, and so on, and a thread can listen for multiple data channels.

How is it different from traditional IO?

While traditional IO is stream-oriented, NIO is buffer-oriented.

  • Java IO stream-oriented means that one or more bytes are read from the stream at a time until all bytes are read without being cached anywhere. In addition, it cannot move data back and forth in a stream. If you need to move data read from the stream back and forth, you need to cache it into a buffer first.
  • NIO reads data directly from the channel into the buffer, so it can be moved back and forth in the buffer.
  • The various streams of IO are blocked, and while reading and writing, the thread is blocked, or data is written completely, NIO does not.

First, explain synchronous, asynchronous, blocking, and non-blocking

1. Synchronous and asynchronous

Synchronous and asynchronous focus on the mechanism by which messages are communicated.

Synchronization is when you make a call that doesn’t return until you get the result. That is, the caller actively waits for the result to return.

Asynchrony, on the other hand, returns immediately after the call is made, with no result. The result of the call is not immediately available, but is notified by the caller through the status or handled by a callback function.

Here’s an example from life to help you understand:

When you think about his girlfriend on valentine’s day to her favorite restaurant for dinner, but may need to book hotel, so you call your boss, if it is a synchronous mechanism, the boss will say, you wait a moment, I check, and then check ah, such as check good (may be a few seconds, can also be a century, haha) tell you the result, the process you are waiting for the boss to give you a reply.

With asynchronous communication, your boss will tell you I’ll check it, call you when it’s done, and then hang up (no results returned). I’ll call you when I’m done. In this scenario the boss calls back by “calling back”.

2. Blocking and non-blocking

Blocking and non-blocking are concerned with a state in which a program is waiting for the result of a call.

Block calls. When a thread makes a read() or write() call, the thread blocks and cannot do anything else until the result is returned.

A non-blocking call is when a thread reads or writes data without waiting for results, and it can do other things. Threads typically spend idle time of non-blocking IO performing IO operations on other channels, so a single thread can manage multiple input and output channels. Thus, NIO allows the server side to use one or a limited number of threads to simultaneously process all the clients connected to the server side.


2. Channel

A Channel can be understood as a road. If you want to transport fruit to a place, you need to transport it through the road, and data is equivalent to what you need to transport. The road is the Channel, and the car carrying fruit is the buffer zone. A Channel is very similar to a Stream in traditional IO. The main difference lies in: channel it is bidirectional, it can be read, also can be written. For example, InputStream can only read, and OutputStream can only write.

The main channel implementation class

  • FileChannel: Channel for reading, writing, mapping, and manipulating files
// The first creation method
RandomAccessFile randomAccessFile=new RandomAccessFile("/Users/channel/Logs/2021-06-19.log"."rw");
       FileChannel fileChannel=randomAccessFile.getChannel();
// The second creation method
        FileChannel fileChannel = FileChannel.open(Paths.get("/Users/channel/Logs/2021-08-06.log"),     
        StandardOpenOption.READ,StandardOpenOption.WRITE);
Copy the code
  • DatagramChannel: Reads and writes data channels on the network through UDP
 //1. Create a DatagramChannel object using the open() method of DatagramChannel
        DatagramChannel datagramChannel = DatagramChannel.open();
        // Bind a port.
        datagramChannel.bind(new InetSocketAddress(1234));
Copy the code
  • SocketChannel: Reads and writes data on the network through TCP
  SocketChannel socketChannel=SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1".8080));
Copy the code
  • ServerSocketChannel: Listens for new TCP connections and creates a SocketChannel for each connection
   ServerSocketChannel serverSocketChannel= ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress("127.0.0.1".3333));
           // Create a socketChannel method with the Accept method for read and write operations
       SocketChannel socketChannel= serverSocketChannel.accept();
Copy the code

Main methods of Channel

  • Filechannel. open(file path, operation permission): Gets a channel
  • Read (buffer): Writes data from a channel to a buffer
  • Read (buffers): Writes data sequentially to a buffer array. When a buffer is full, it is written to the next buffer
  • Write (buffer): Writes the contents of the buffer to a file
  • Write (buffers): Writes the buffer array data sequentially to a channel
  • TransferFrom (inChannel, 0, inchannel.size ()) : Transfers data from a source inChannel to the local channel
  • TransferTo (): transfers the data of the local channel to the destination channel
  • Socketchannel.connect (new InetSocketAddress(hostname,port)): Remote channel connection
  • ServerSocketChannel. Socket (). The bind (new InetSocketAddress (hostname, port) : listening
  • Create a socketChannel serverSocketChannel. The accept () :
  • Close (): closes the channel

The code examples

 public static void fileChannelTest(a) throws IOException {
        /** * Two ways to get FileChannel * 1. Via RandomAccessFile * 2. Through FileChannel * /
        // RW stands for read and write support
// RandomAccessFile randomAccessFile=new RandomAccessFile("/Users/channel/Logs/2021-08-19.log","rw");
// FileChannel fileChannel=randomAccessFile.getChannel();

        FileChannel fileChannel = FileChannel
        .open(Paths.get("/Users/channel/Logs/2021-08-19.log"), 
                StandardOpenOption.READ,StandardOpenOption.WRITE);
       
        // Read the contents of the file:
        ByteBuffer buffer=ByteBuffer.allocate(1024);
        // Write the data in the channel to buffer
        int num=fileChannel.read(buffer);
        System.out.println("Amount of data read :"+num+". Contents :\r\n"+new String(buffer.array()));
        buffer.clear();
        // After writing, you can continue to append to the file
        buffer.put("\r\n".getBytes());
        buffer.put("Hi, I'm good, everybody's good.".getBytes());
        // Switching buffer from write mode to read mode requires calling flip
        buffer.flip();
        while (buffer.hasRemaining()){
            // Write the contents of Buffer to a file
            fileChannel.write(buffer);
        }
        // Close the channel
        fileChannel.close();
    }
Copy the code

N Buffer processing

/** * Scatter information from a Channel to N buffers. ** Gather information from N buffers to a Channel in sequence. ** /
    public static void scatterGatherTest(a) throws IOException {
        FileChannel fileChannel = FileChannel.open(Paths.get("/Users/channel/Logs/2021-08-19.log"), StandardOpenOption.READ,StandardOpenOption.WRITE);
        ByteBuffer oneBuffer=  ByteBuffer.allocate(1024);
        ByteBuffer twoBuffer=  ByteBuffer.allocate(1024);
        ByteBuffer[] byteBuffers={oneBuffer,twoBuffer};
        // The read method writes data sequentially to the buffer array, and when one buffer is full, it writes to the next
        fileChannel.read(byteBuffers);
        oneBuffer.flip();
        twoBuffer.flip();
        // The write() method takes care of writing data sequentially to channels.
        fileChannel.write(byteBuffers);
    }
Copy the code

Direct transmission between channels

    public static void transfer(a) throws IOException {
        FileChannel fileOneChannel = FileChannel.open(Paths.get("/Users/channel/Logs/2021-08-19.log"), StandardOpenOption.READ,StandardOpenOption.WRITE);
        FileChannel fileTwoChannel = FileChannel.open(Paths.get("/Users/channel/Logs/2021-08-20.log"), StandardOpenOption.READ,StandardOpenOption.WRITE);
        // Transfer method internal is a direct buffer implementation
        // Transfer fileTwoChannel data to fileOneChannel
        fileOneChannel.transferFrom(fileTwoChannel, 0, fileTwoChannel.size());
        // Transfer fileOneChannel data to fileTwoChannel
        fileOneChannel.transferTo(0,fileTwoChannel.size(),fileTwoChannel);
        fileTwoChannel.close();
        fileOneChannel.close();
    }
Copy the code

3. Buffer

concept

A Buffer is essentially a container, a contiguous array. A Channel provides a Channel for reading data from a file or network, but all data read or written must pass through Buffer, as shown in the following figure:When a client sends data, it must be stored in Buffer and then transported through Channe. In order to receive data, the server must read the data into the Buffer through a Channel, and then read the data from the Buffer for processing.

Three properties of Buffer

A Buffer has three attributes: Capacity (a fixed size value), position(pointing to the next position), and limit (limiting the amount).

In addition to these three attributes, there is another tag attribute :mark, which temporarily holds a specific position that can be restored when needed.

capacity

As a block of memory, a Buffer has a fixed size, called “capacity”. You can only write capacity into it, and once the Buffer is full, you can’t write any more. Capacity, once initialized, cannot be changed. When a Buffer object is initialized, it allocates internal memory based on capacity. Once the memory is allocated, the size cannot be changed. Memory is typically allocated using an abstract subclass of Buffer, bytebuffer.allocate ().

position

Position represents the current position, and the value of position is different in the two Buffer modes. Read mode When reading data, the data is read from position. When you switch Buffer from write to read mode, position is reset to 0. When data is read from position in the Buffer, position moves forward to the next readable position. Write Mode When writing data to a Buffer, position indicates the current write position. The initial position value is 0. The maximum value of position can be Capacity -1. Each time a data is written to buffer, position is moved backwards to the next writable position. This one moves backwards to the next position, and maybe it’s a little confusing, but let me give you an example: Let’s say five pieces of data have been written, because time starts at position 0, so when five pieces of data have been written, the fifth piece of data is at position 4. Some readers might think position is 4, but position always moves to the next writable position, so the next writable position is 5, So the true value of position is 5, not 4.

limit

Limit indicates the maximum limit, and like position, the read and write mode values are different. Read mode In read mode, the Buffer limit indicates how much data can be read from the Buffer. When Buffer switches from write mode to read mode, the value of limit is set to the value of position in write mode. Write mode limit Indicates the maximum amount of data that can be written. When switching from read mode to write mode, the limit value is changed to the capicity of Buffer.

Buffer type

Let me draw a pictureThese buffers cover almost all of the Basic Java types that can be transferred from IO, so we won’t cover them here.

Methods in Buffer

1. Allocate () method allocates a buffer and creates a buffer size.

 // Each buffer object provides a static method allocate, which is used to create a buffer size
        // Help us implement a Buffer
        ByteBuffer byteBuffer=ByteBuffer.allocate(9);
Copy the code

2. Put () method: Writes data

    ByteBuffer byteBuffer=ByteBuffer.allocate(9);
        byte[] bytes=new byte[] {0.1.2.3.4.5};
        byteBuffer.put(bytes);
Copy the code

Flip () method: Switch to read mode

byteBuffer.flip();
Copy the code

4. Get () method: read data

       Postion +1; postion+1; postion+1
        byte val1=byteBuffer.get();

        //2. Obtain the specified location data
        byte val2=byteBuffer.get(3);

        // Copy the data in the buffer to a specified array
        byte[] bytes=new byte[3];

        //3. Copy the buffer data to the specified array
        byteBuffer.get(bytes);
         // 4. Return the byte[] array inside Buffer directly
        String str=new String(byteBuffer.array());
Copy the code

There are four ways to read, and I’ll show you the third way to help you understand

5. Compact () method

The Compact method processes data that has not yet been read, that is, data between position and limit (data that has not yet been read) by moving it to the left and then writing it later.

  public static void compactBuffer(a){
        ByteBuffer byteBuffer=ByteBuffer.allocate(9);
        byte[] bytes=new byte[] {0.1.2.3.4.5};
        byteBuffer.put(bytes);

        // The flip method needs to be called before reading
        byteBuffer.flip();

        for (int i=0; i<4; i++){ System.out.println(byteBuffer.get()); } System.out.println("Position prior to Compact operation"+byteBuffer.position());
        // Write some data into the database, so as to increase the writable space
        byteBuffer.compact();
        byte b=9;
        byteBuffer.put(b);
        System.out.println("Position position after data filling after compact operation"+byteBuffer.position()); } **** Result value **** Position before compact operation4The location of the limit6Position position after filling data after compact operation2The location of the limit9Position position after filling data after compact operation3The location of the limit9
Copy the code

6. Clear () : clear Buffer, position set to 0, limit set to Capacity Position is set to 0, limit remains unchanged, and read data can be re-read due to position resetting


        ByteBuffer byteBuffer=ByteBuffer.allocate(9);
        byte[] bytes=new byte[] {0.1.2.3.4.5};
        byteBuffer.put(bytes);

        // The flip method needs to be called before reading
        byteBuffer.flip();
        byte[] readbyte=new byte[3];

        //4. Copy the buffer data to the specified array
        byteBuffer.get(readbyte);
        System.out.println("Byte array position before rewind operation"+byteBuffer.position()+"Position of limit"+byteBuffer.limit());
        byteBuffer.rewind();
        System.out.println("Byte array position after rewind operation"+byteBuffer.position()+"Position of limit"+byteBuffer.limit());
        System.out.println(byteBuffer.get());
   /* * Console print value // / byte array position before rewind 4 limit 6 // byte array position after rewind 0 limit 6 // 0Copy the code

The code examples

Allocate a buffer in THE JVM memory by using the allocate() method. The indirect cache first reads data from the physical hard disk into physical memory, then copies the data into THE JVM memory, and finally reads the final data from the JVM. The code is as follows (example) :

data = pd.read_csv(
    'https://labfile.oss.aliyuncs.com/courses/1283/adult.data.csv')
print(data.head())
Copy the code

4. Selector

A Selector can be called a Selector or a multiplexer. It is used to check whether one or more NIO channels are readable or writable. In this way, a single thread can manage multiple channels, that is, multiple network connections can be managed. The selectoe underlying function defaults to select on Windows and epoll on Linux. Why do WE use Selector?

Using Selector requires fewer threads to process multiple channels, and can significantly reduce the overhead of thread context switching compared to using multiple threads.

Selector use method

1. Creation of Selector

Create a Selector object by calling the open method:

  Selector selector = Selector.open();
Copy the code

2. Register a Channel with a Selector

   // Set it to non-blocking
        socketChannel.configureBlocking(false);
        / / read-only
        socketChannel.register(selector, SelectionKey.OP_READ);
Copy the code

Channels are blocked by default, so they must be set to non-blocking here. The FileChannel is not non-blocking, so the FileChannel cannot be switched to non-blocking mode. The second argument to the registor() method is a collection. It means what event you’re interested in when you listen through Selector. You can listen for four different types of events:

SelectionKey.OP_CONNECT  // Connection ready
SelectionKey.OP_ACCEPT   // Ready to receive
SelectionKey.OP_READ    / / read in place
SelectionKey.OP_WRITE    / / write ready
Copy the code

The channel fires an event to indicate that the event is ready. When a Channel successfully connects to another server, it is called connection ready. A Server Socket Channel is ready to receive incoming connections. A channel with data to read represents read readiness. The channel waiting to write data represents write ready.

3. SelectionKey is introduced

A SelectionKey represents the registration relationship between a particular channel object and a particular selector object.

   key.attachment(); 
   key.channel();
   key.selector();
   key.interestOps();
   key.readyOps();
Copy the code

InterestOps () can determine whether a Selector is interested in a certain event for a Channel by using the following method

int interestSet = selectionKey.interestOps(); 
booleanIsInterestedInAccept = (interestSet & selectionkey.op_accept) == selectionkey.op_accept;boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
Copy the code

The key.readyops () ready collection is a collection of operations for which channels are ready, and the following methods are defined in JAVA to check that these operations are ready.

// The method to create the ready collection
int readySet = selectionKey.readyOps();
// a method to check that these operations are ready
key.isAcceptable();// Whether it is readable or not returns true
boolean isWritable(a):// Writable or not, returns true
boolean isConnectable(a):// Whether it is connectable or not returns true
boolean isAcceptable(a):// Whether it can be received, if true
Copy the code

4. Select () method

    Select (): blocks the current thread until a channel managed by selector is ready to wake up the main thread.
        int readyChannels=  selector.select();
        //2. Select (long timeout) this interface will block the calling thread with timeout.
        // If there are channels ready within the timeout period, return early. Otherwise, wait until the timeout period.
        int readyChannel=  selector.select(2000);
        //selectNow () is a non-blocking select interface that returns immediately after being called, or the number of ready channels if there are at the bottom
        int readyChannelNow=selector.selectNow();
Copy the code

The code examples

With all of these concepts and methods in mind, here’s a simple code example:

        // Get a Selector multiplexer
        Selector selector = Selector.open();
        // Create ServerSocketChannel and listen on port 8866.
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(8866));
        serverSocketChannel.configureBlocking(false);
        // Register serverSocketChannel with selector and set the event of interest to accept
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            // Use the blocking select method, which only returns ready channels registered with selector.
            selector.select();
            // Get the ready list
            Set<SelectionKey> keys = selector.selectedKeys();
            // Get the iterator
            Iterator<SelectionKey> it = keys.iterator();

            while (it.hasNext()) {
                SelectionKey key = it.next();
                key.attachment();
                key.channel();
                key.selector();
                key.interestOps();
                key.readyOps();
                key.isAcceptable();
                try {
                    key.cancel();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // The current ready key means that an ACCEPT event is ready and a client wants to connect to the current server
                if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
                    ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                    // The current ServerSocketChannel is ACCEPT ready, so the current ACCEPT method returns a SocketChannel instance
                    SocketChannel socketChannel = serverChannel.accept();
                    // Set the client socket to non-blocking mode
                    socketChannel.configureBlocking(false);
                    // The client channel registers with the current selector
                    socketChannel.register(selector, SelectionKey.OP_READ);
                    it.remove();
                } else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    try {
                        // The client socket read buffer already has data waiting to be read, so the read method takes the data directly and writes it to byteBuffer
                        while (true) {
                            byteBuffer.clear();
                            int reads = socketChannel.read(byteBuffer);
                            if (reads == -1) break; byteBuffer.flip(); socketChannel.write(byteBuffer); }}catch (IOException e) {
                        e.printStackTrace();
                        // After the peer is closed, the server must be closed, otherwise the selector will always check the socketchannel
                        socketChannel.close();
                    } finally{ it.remove(); }}}}Copy the code

conclusion