Secular wanderer: a programmer who focuses on technical research

Before we begin

Having covered Socket programming, we now turn to NIO. This section only introduces the most widely used classes and methods. The rest are rarely needed, so I won't cover them; check the API documentation when you need them.

This section gives a general idea of how to use NIO. Topics skipped here will be covered in detail when we get to Netty. For example:

  • Four network models
  • Reactor model
  • ...

Channel and Buffer

Socket programming, which we covered earlier, essentially operates on I/O streams once the connection is established. This is the synchronous blocking model, also known as BIO.

Its disadvantage is obvious: every read or write blocks a thread until it completes. To improve efficiency, NIO was introduced in JDK 1.4:

  • It uses a Buffer as the data carrier and transfers data through an established Channel

NIO supports a non-blocking model, which means:

  • When we call read/write and data is available, the data is read and the call returns
  • If no data is available, the call returns immediately and does not block the calling thread

All operations are driven by event listening.
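
To make the "returns immediately" behavior concrete, here is a minimal sketch. It uses java.nio.channels.Pipe as a stand-in for a network connection (my assumption, chosen so the example needs no server), and the class name NonBlockingReadDemo is hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

public class NonBlockingReadDemo {

    /** Reads from a non-blocking pipe before and after data is written. */
    public static int[] readBeforeAndAfterWrite() throws IOException {
        Pipe pipe = Pipe.open();
        try (Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            source.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(16);

            // Nothing has been written yet, so read() returns 0 instead of blocking
            int before = source.read(buffer);

            sink.write(ByteBuffer.wrap("hi".getBytes(StandardCharsets.UTF_8)));

            // Spin until the bytes arrive; a real program would wait on a Selector instead
            int after;
            do {
                after = source.read(buffer);
            } while (after == 0);

            return new int[] {before, after};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] result = readBeforeAndAfterWrite();
        System.out.println("read before write: " + result[0]);
        System.out.println("read after write: " + result[1]);
    }
}
```

The busy loop is only there to keep the sketch short. Waiting for readiness without spinning is exactly what event listening is for.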

First let’s talk about buffers:

Buffer

A Buffer is a typed container that holds the data to be transferred. Buffer itself is an abstract class.

Its subclasses are named after the primitive type they hold (ByteBuffer, CharBuffer, IntBuffer, and so on), so I won't go through them here.

The examples below use ByteBuffer.

Important attributes

A Buffer has three very important properties:

  • capacity

This property is the capacity of the Buffer. It is fixed at creation and cannot be changed afterwards.

// Option 1: allocate a heap buffer with the given capacity
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// Option 2: create a buffer directly from existing content
ByteBuffer wrapped = ByteBuffer.wrap("123".getBytes());
// Option 3: allocate a direct memory buffer
ByteBuffer direct = ByteBuffer.allocateDirect(1024);

The three differ as follows:

  • allocate() simply allocates a byte array on the heap
  • wrap() wraps an existing array in a buffer. For a buffer created this way, limit and capacity both equal the array length and position is 0, so there is no need to call flip() before reading
  • allocateDirect() is used the same way as allocate(), but the underlying implementation differs: allocateDirect() creates the buffer in direct memory

Direct memory is native memory outside the JVM heap. The OS can perform I/O on it directly, which avoids an extra copy between the heap and a native buffer.
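
The differences between the three factory methods can be checked with a small sketch that prints each buffer's properties. The class name BufferCreationDemo and the helper describe() are hypothetical names used for illustration.

```java
import java.nio.ByteBuffer;

public class BufferCreationDemo {

    /** Formats the three core properties plus the direct flag of a buffer. */
    public static String describe(ByteBuffer buffer) {
        return "position=" + buffer.position()
                + " limit=" + buffer.limit()
                + " capacity=" + buffer.capacity()
                + " direct=" + buffer.isDirect();
    }

    public static void main(String[] args) {
        // Heap buffer: empty, limit equals capacity
        System.out.println("allocate:       " + describe(ByteBuffer.allocate(1024)));
        // Wrapped array: limit and capacity equal the array length
        System.out.println("wrap:           " + describe(ByteBuffer.wrap("123".getBytes())));
        // Direct buffer: same properties, but isDirect() is true
        System.out.println("allocateDirect: " + describe(ByteBuffer.allocateDirect(1024)));
    }
}
```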

Stepping through the source code into Buffer shows where the capacity is stored:

// cap is the capacity passed in at creation
this.capacity = cap;

  • limit

The limit is the index of the first element that must not be read or written. It can never exceed the capacity, and for a newly allocated buffer it equals the capacity.

// 1024
System.out.println("limit:" + byteBuffer.limit());

  • position

The position is the index of the next element to be read or written. It can never exceed the limit: a put() beyond the limit throws BufferOverflowException, and a get() beyond it throws BufferUnderflowException.

byteBuffer.put("11".getBytes());
// 2: two bytes have been written
System.out.println("position:" + byteBuffer.position());
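
As a quick check of how position and limit bound reads and writes, the sketch below deliberately runs past the limit in both directions. BufferBoundsDemo is a hypothetical class name.

```java
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class BufferBoundsDemo {

    /** Returns true if writing past the limit throws BufferOverflowException. */
    public static boolean overflowsWhenFull() {
        ByteBuffer buffer = ByteBuffer.allocate(2);
        buffer.put((byte) 1).put((byte) 2); // position == limit == 2
        try {
            buffer.put((byte) 3);
            return false;
        } catch (BufferOverflowException e) {
            return true;
        }
    }

    /** Returns true if reading past the limit throws BufferUnderflowException. */
    public static boolean underflowsWhenDrained() {
        ByteBuffer buffer = ByteBuffer.wrap(new byte[] {1});
        buffer.get(); // position == limit == 1
        try {
            buffer.get();
            return false;
        } catch (BufferUnderflowException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("put past limit overflows: " + overflowsWhenFull());
        System.out.println("get past limit underflows: " + underflowsWhenDrained());
    }
}
```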

Read/write mode

This is the most troublesome part of using a Buffer. As mentioned earlier, a Channel transfers data through a Buffer, so we can treat the Buffer as the central point:

  • Calling put() on the Buffer is called write mode. It is the default mode, and clear() switches back to it
byteBuffer.clear();
  • Calling get() on the Buffer is called read mode. flip() switches to it
byteBuffer.flip();

Here’s an experiment to check:

public static void main(String[] args) {
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

    byteBuffer.put("123456".getBytes());

    print("Original", byteBuffer);
    byteBuffer.flip();
    print("After the flip", byteBuffer);
    byteBuffer.clear();
    print("After the clear", byteBuffer);
}

private static void print(String tag, ByteBuffer byteBuffer) {
    System.out.println(tag + ":position:" + byteBuffer.position());
    System.out.println(tag + ":limit:" + byteBuffer.limit());
    System.out.println("==============================");
}

The output:

Original:position:6
Original:limit:1024
==============================
After the flip:position:0
After the flip:limit:6
==============================
After the clear:position:0
After the clear:limit:1024
==============================

Important conclusions

Analyzing the output above, we can conclude:

  • By default, a Buffer is in write mode: position indicates where the next byte will be written, and limit is the upper bound

  • flip() sets limit to the old position and position to 0, so reads start at index 0 and cannot go past the end of the last write. A put() at this point overwrites the existing data

  • clear() returns the Buffer to write mode: position is reset to 0 and limit to capacity. The data already written does not disappear; it stays until a later put() overwrites it

  • clear() and flip() only modify position and limit, not the underlying array

I recommend trying this yourself and watching how the two properties change at every step.
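
The claim that clear() leaves the stored bytes untouched is easy to verify. The following is a small sketch under that assumption only; ClearKeepsDataDemo and its method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ClearKeepsDataDemo {

    /** Writes text, flips and clears the buffer, then reads what is still stored. */
    public static String dataAfterClear(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.put(bytes);
        buffer.flip();  // position=0, limit=bytes.length
        buffer.clear(); // position=0, limit=capacity, contents untouched

        byte[] stillThere = new byte[bytes.length];
        buffer.get(stillThere); // reads from position 0
        return new String(stillThere, StandardCharsets.UTF_8);
    }

    /** Shows that a put() after clear() overwrites only the bytes it touches. */
    public static String overwriteAfterClear() {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.put("123456".getBytes(StandardCharsets.UTF_8));
        buffer.clear();
        buffer.put("ab".getBytes(StandardCharsets.UTF_8));
        return new String(buffer.array(), 0, 6, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(dataAfterClear("123456"));
        System.out.println(overwriteAfterClear());
    }
}
```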

Common methods

Here are a few methods that we use a lot:

array()
byteBuffer.flip();
final byte[] array = byteBuffer.array();
System.out.println(new String(array, 0, byteBuffer.limit()));
byteBuffer.clear();

array() returns the whole backing array, whose length is the capacity, so switch to read mode first and use limit to bound what you read.
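
Here is a minimal sketch of two ways to extract the readable bytes. viaArray() relies on the backing array, while viaGet() copies the bytes out with get(byte[]) and so does not need a backing array at all. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ArrayAccessDemo {

    /** Decodes the readable bytes through the backing array (heap buffers only). */
    public static String viaArray(ByteBuffer buffer) {
        return new String(buffer.array(),
                buffer.arrayOffset() + buffer.position(),
                buffer.remaining(),
                StandardCharsets.UTF_8);
    }

    /** Decodes the readable bytes by copying them out; works for any ByteBuffer. */
    public static String viaGet(ByteBuffer buffer) {
        byte[] out = new byte[buffer.remaining()];
        // duplicate() shares the content but has its own position, so the original is not advanced
        buffer.duplicate().get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put("hello".getBytes(StandardCharsets.UTF_8));
        buffer.flip();
        System.out.println("backing array length: " + buffer.array().length);
        System.out.println("via array(): " + viaArray(buffer));
        System.out.println("via get():   " + viaGet(buffer));
    }
}
```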

putXXX()

Besides byte arrays, a ByteBuffer can also write other types of data, including putChar(), putShort(), putInt(), putLong(), putFloat(), and putDouble().

The same applies to the getXXX() methods, so I won't list them here.
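
As an illustration of the typed put and get methods, this sketch writes three values and reads them back in the same order. TypedPutGetDemo is a hypothetical class name.

```java
import java.nio.ByteBuffer;

public class TypedPutGetDemo {

    /** Writes an int, a long and a double, then reads them back in the same order. */
    public static String roundTrip() {
        ByteBuffer buffer = ByteBuffer.allocate(32);
        buffer.putInt(42);              // advances position by 4
        buffer.putLong(7_000_000_000L); // advances position by 8
        buffer.putDouble(3.5);          // advances position by 8
        int written = buffer.position();

        buffer.flip(); // switch to read mode before the typed gets
        int i = buffer.getInt();
        long l = buffer.getLong();
        double d = buffer.getDouble();
        return written + ":" + i + ":" + l + ":" + d;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```

The values must be read back in the order and with the types they were written, because the buffer stores raw bytes and keeps no type information.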

Channel

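A Channel is used together with a Buffer and can be understood as a connection. Different transports have different Channel implementations: FileChannel for files, ServerSocketChannel and SocketChannel for TCP, and DatagramChannel for UDP.

FileChannel

This is the channel for operating on files. Note that a FileChannel always works in blocking mode: it cannot be switched to non-blocking mode or registered with a Selector.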

Open channel
// Option 1: open by path (read-only by default)
Path path = Paths.get("File path");
final FileChannel fileChannel = FileChannel.open(path);

// Option 2: obtain the channel from a stream
final FileChannel streamChannel = new FileInputStream("File path").getChannel();
force()

Writes to a channel are handed to the operating system, which for performance reasons may cache them instead of writing to disk immediately. To make sure the data has actually reached the disk, force a flush:

fileChannel.force(true);
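
Below is a minimal sketch of a write, force, read-back cycle. It assumes a temporary file created with Files.createTempFile so that it can run anywhere; the class name FileChannelForceDemo is hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FileChannelForceDemo {

    /** Writes text to a temp file, forces it to disk, then reads it back. */
    public static String writeThenRead(String text) throws IOException {
        Path file = Files.createTempFile("nio-demo", ".txt");
        try {
            try (FileChannel out = FileChannel.open(file, StandardOpenOption.WRITE)) {
                ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
                while (buffer.hasRemaining()) {
                    out.write(buffer);
                }
                // true also flushes file metadata, not just the content
                out.force(true);
            }
            try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
                ByteBuffer buffer = ByteBuffer.allocate((int) in.size());
                while (buffer.hasRemaining() && in.read(buffer) != -1) {
                    // keep reading until the buffer is full or end of file
                }
                buffer.flip();
                return StandardCharsets.UTF_8.decode(buffer).toString();
            }
        } finally {
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(writeThenRead("hello nio"));
    }
}
```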

The remaining methods work like their stream I/O counterparts, so I won't cover them. Here is a complete example instead.

NIO-based file transfer

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;

public class CopyFile {
    public static void main(String[] args) throws IOException {
        // Get the source file
        final Path source = Paths.get("D:\\Working Directory\\project\\study\\study-java\\src\\main\\java\\zopx\\top\\study\\jav\\_nio\\CopyFile.java");

        // Read channel: opened with FileChannel.open
        final FileChannel sourceChannel = FileChannel.open(source);
        // Write channel: obtained from a stream
        final FileChannel targetChannel = new FileOutputStream(new File("D:\\Working Directory\\project\\study\\study-java\\src\\main\\java\\zopx\\top\\study\\jav\\_nio\\_CopyFile.txt")).getChannel();

        // --------------------
        // Define the ByteBuffer
        ByteBuffer bytebuffer = ByteBuffer.allocate(1024);
        // read() writes data into the ByteBuffer, which is its default mode, so no conversion is needed
        while ((sourceChannel.read(bytebuffer)) != -1) {
            // write() reads data from the ByteBuffer and sends it through the channel, so switch to read mode first
            bytebuffer.flip();
            // Write the buffered data to the channel; write() may not drain the buffer in one call
            while (bytebuffer.hasRemaining()) {
                targetChannel.write(bytebuffer);
            }
            // Switch back to write mode, otherwise the next read() has no room to fill
            bytebuffer.clear();
        }

        // Force a flush to disk
        targetChannel.force(true);

        // Close the channels
        sourceChannel.close();
        targetChannel.close();
    }
}

There is another way to do this that avoids ByteBuffer mode conversions entirely:

long size = sourceChannel.size();
long pos = 0, count = 0;
while (pos < size) {
    // Transfer at most 1024 bytes per call, or whatever is left
    count = size - pos < 1024 ? size - pos : 1024;
    pos += targetChannel.transferFrom(sourceChannel, pos, count);
}
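
The mirror-image method transferTo() can be used the same way. The sketch below is one possible self-contained version that copies between two temporary files; the class name TransferCopyDemo and the temp-file setup are assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TransferCopyDemo {

    /** Copies source to target with transferTo(), looping because one call may transfer fewer bytes than asked. */
    public static void copy(Path source, Path target) throws IOException {
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(target,
                     StandardOpenOption.CREATE,
                     StandardOpenOption.WRITE,
                     StandardOpenOption.TRUNCATE_EXISTING)) {
            long size = in.size();
            long pos = 0;
            while (pos < size) {
                pos += in.transferTo(pos, size - pos, out);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path source = Files.createTempFile("copy-src", ".txt");
        Path target = Files.createTempFile("copy-dst", ".txt");
        try {
            Files.write(source, "copied with transferTo".getBytes(StandardCharsets.UTF_8));
            copy(source, target);
            System.out.println(new String(Files.readAllBytes(target), StandardCharsets.UTF_8));
        } finally {
            Files.deleteIfExists(source);
            Files.deleteIfExists(target);
        }
    }
}
```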

Channels over TCP/IP

These come as a pair, so I'll cover them together.

The channels for network transport are:

  • ServerSocketChannel
  • SocketChannel

They correspond to the ServerSocket/Socket we used before.

Opening the channel on the server
// Open the channel
ServerSocketChannel ssc = ServerSocketChannel.open();
// Bind the port
ssc.bind(new InetSocketAddress(8888));

Opening the channel on the client
// Connect to the server
SocketChannel sc =
    SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888));

// Alternatively, open the channel first and then call connect()
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));

The following methods are the same on both sides; the server is used as the example.

Main configuration items
ssc.configureBlocking(false);

This setting controls the blocking mode of the channel:

  • true means blocking mode: subsequent read/write calls block, and efficiency is the same as stream-oriented I/O
  • false means non-blocking mode
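
To illustrate what non-blocking mode changes in practice, the sketch below calls accept() on a server channel that has no pending connection. It assumes binding to port 0 on the loopback address, which asks the OS for any free port; the class name NonBlockingAcceptDemo is hypothetical.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NonBlockingAcceptDemo {

    /** Returns true if accept() on an idle non-blocking server channel returns null at once. */
    public static boolean acceptReturnsNullWhenIdle() throws IOException {
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            // Port 0 lets the OS pick a free port (demo assumption)
            ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            ssc.configureBlocking(false);

            // In blocking mode this call would wait for a client; here it returns immediately
            SocketChannel client = ssc.accept();
            return client == null;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("accept() returned null: " + acceptReturnsNullWhenIdle());
    }
}
```

This is why code that calls accept() on a non-blocking channel should check the result for null before using it.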
Register selector
Selector selector = Selector.open();
// Register interest in accept events
ssc.register(selector, SelectionKey.OP_ACCEPT);

If a channel represents a connection, then in non-blocking mode a selector can monitor the I/O status of many connections at once. In other words, the selector monitors and the channels are monitored. A channel must be in non-blocking mode before it is registered.
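
One detail worth seeing in code is that registration only works on a non-blocking channel. The following sketch shows both outcomes; the class name RegisterDemo and its method names are hypothetical.

```java
import java.io.IOException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class RegisterDemo {

    /** Returns true if registering a blocking channel is rejected. */
    public static boolean registerFailsInBlockingMode() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel ssc = ServerSocketChannel.open()) {
            try {
                // Channels start in blocking mode, so this call must fail
                ssc.register(selector, SelectionKey.OP_ACCEPT);
                return false;
            } catch (IllegalBlockingModeException e) {
                return true;
            }
        }
    }

    /** Returns true if registering a non-blocking channel yields a valid key for OP_ACCEPT. */
    public static boolean registerWorksInNonBlockingMode() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.configureBlocking(false);
            SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT);
            return key.isValid() && key.interestOps() == SelectionKey.OP_ACCEPT;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("blocking channel rejected: " + registerFailsInBlockingMode());
        System.out.println("non-blocking channel registered: " + registerWorksInNonBlockingMode());
    }
}
```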

Channel I/O events

An event indicates that the channel is ready to complete a particular I/O operation. The events are:

  • SelectionKey.OP_ACCEPT

A new connection is ready to be accepted

  • SelectionKey.OP_CONNECT

A connection is ready to be completed

  • SelectionKey.OP_READ

Data is ready to be read

  • SelectionKey.OP_WRITE

The channel is ready to be written
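
The four events are bit flags, so several can be combined with | and tested with &. The sketch below decodes such a bitmask and prints which events each channel type supports via validOps(). InterestOpsDemo and describe() are hypothetical names.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class InterestOpsDemo {

    /** Lists the event names contained in an operation-set bitmask. */
    public static String describe(int ops) {
        StringBuilder sb = new StringBuilder();
        if ((ops & SelectionKey.OP_ACCEPT) != 0) sb.append("ACCEPT ");
        if ((ops & SelectionKey.OP_CONNECT) != 0) sb.append("CONNECT ");
        if ((ops & SelectionKey.OP_READ) != 0) sb.append("READ ");
        if ((ops & SelectionKey.OP_WRITE) != 0) sb.append("WRITE ");
        return sb.toString().trim();
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            // A server channel only accepts; a socket channel connects, reads and writes
            System.out.println("server supports: " + describe(server.validOps()));
            System.out.println("client supports: " + describe(client.validOps()));
        }
        // Interest in several events is expressed by OR-ing the flags
        System.out.println("combined: " + describe(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
    }
}
```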

Polling the I/O events you are interested in
  • Ready-state query: the selector's select() method queries the readiness of all registered channels. Any registered channel that is ready is added to the selected-key set
  • We then iterate over that set in a loop
while (selector.select() > 0) {
    final Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    while (iterator.hasNext()) {
        final SelectionKey selectionKey = iterator.next();
        // Remove the key to prevent repeated processing
        iterator.remove();

        if (selectionKey.isAcceptable()) {
            // accept
        }
        if (selectionKey.isConnectable()) {
            // connect
        }
        if (selectionKey.isReadable()) {
            // read
        }
        if (selectionKey.isWritable()) {
            // write
        }
    }
}
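
To see the polling loop handle real events end to end, here is a self-contained sketch that runs a server channel and a client in the same thread over the loopback interface. Port 0, the 5-second select timeout, and the class name SelectorLoopDemo are assumptions made for the demo.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class SelectorLoopDemo {

    /** Sends one message over loopback and returns what the selector loop received. */
    public static String roundTrip(String message) throws IOException {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            // Blocking client: connects and sends the whole payload
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                ByteBuffer out = ByteBuffer.wrap(payload);
                while (out.hasRemaining()) {
                    client.write(out);
                }

                ByteBuffer received = ByteBuffer.allocate(payload.length);
                while (received.hasRemaining()) {
                    if (selector.select(5000) == 0) {
                        throw new IOException("timed out waiting for I/O events");
                    }
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isAcceptable()) {
                            SocketChannel accepted = server.accept();
                            if (accepted != null) {
                                accepted.configureBlocking(false);
                                accepted.register(selector, SelectionKey.OP_READ);
                            }
                        } else if (key.isReadable()) {
                            SocketChannel channel = (SocketChannel) key.channel();
                            int n = channel.read(received);
                            // Close once the peer is gone or the whole message has arrived
                            if (n == -1 || !received.hasRemaining()) {
                                channel.close();
                            }
                        }
                    }
                }
                received.flip();
                return StandardCharsets.UTF_8.decode(received).toString();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("server received: " + roundTrip("ping"));
    }
}
```

A real server would run this loop indefinitely and keep per-connection buffers instead of one shared buffer sized to a known message.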

That is the basic operation. The next step is to work on a Buffer with read() and write(), again shown through a practical example.

Pay attention to the transitions between Buffer modes.

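NIO-based peer-to-peer chat

  • The server
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;

public class ChatServer {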

    /** Port the server binds to. */
    public static int PORT = 8888;

    private static Map<String, SocketChannel> TOKEN_SOCKET_MAP = new HashMap<>();
    private static Map<SocketChannel, String> SOCKET_TOKEN_MAP = new HashMap<>();


    public static void main(String[] args) {
        try {
            new ChatServer().start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    ServerSocketChannel ssc = null;

    private void start() throws IOException {
        // Enable NIO ServerSocket channel
        ssc = ServerSocketChannel.open();
        // Bind ports
        ssc.bind(new InetSocketAddress(PORT));

        // Set it to non-blocking, which is the key point of NIO
        ssc.configureBlocking(false);

        // Selector
        Selector selector = Selector.open();
        // Register to receive events
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        // Poll the registration event
        while (selector.select() > 0) {
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // Remove to avoid repeated processing
                iterator.remove();

                if (selectionKey.isAcceptable()) {
                    // Get the connected client
                    SocketChannel accept = ssc.accept();

                    if (null == accept)
                        continue;

                    System.out.println(accept);

                    // Record the token for this client
                    String token = UUID.randomUUID().toString();
                    TOKEN_SOCKET_MAP.put(token, accept);
                    SOCKET_TOKEN_MAP.put(accept, token);

                    // Set the client to non-blocking and register writable events
                    accept.configureBlocking(false);
                    accept.register(selector, SelectionKey.OP_WRITE);
                }
                // If it is a read event
                if (selectionKey.isReadable()) {
                    SocketChannel channel = (SocketChannel) selectionKey.channel();

                    ByteBuffer buffer = ByteBuffer.allocate(1024);

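                    // Read data; read() returns -1 once the client has disconnected
                    StringBuilder sb = new StringBuilder();
                    int read;
                    while ((read = channel.read(buffer)) > 0) {
                        buffer.flip();
                        String msg = new String(buffer.array(), 0, buffer.limit());
                        sb.append(msg);
                        buffer.clear();
                    }

                    if (read == -1) {
                        // Client closed the connection: drop its token and close the channel
                        TOKEN_SOCKET_MAP.remove(SOCKET_TOKEN_MAP.remove(channel));
                        channel.close();
                        continue;
                    }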

                    if (sb.length() > 0) {
                        System.out.println("----- Read data:" + sb.toString());
                        writeMsg(channel, sb.toString());
                    }
                }
                // Write event
                if (selectionKey.isWritable()) {

                    // Return the generated token to the client
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    String content = getTokenBySocket(channel) + ":" + getTokenBySocket(channel);

                    if (null != channel) {
                        send(channel, content);
                        channel.register(selector, SelectionKey.OP_READ);
                    }
                }
            }
        }
    }

    /**
     * Process the data and send it.
     *
     * @param channel current channel
     * @param msg     content to send
     * @throws IOException IOException
     */
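    private void writeMsg(SocketChannel channel, String msg) throws IOException {
        if (null == msg || "".equals(msg)) return;

        // Expected format is "targetToken:content"; split only on the first colon
        String[] split = msg.split(":", 2);
        if (split.length < 2) return;

        SocketChannel targetChannel = getSocketByToken(split[0]);
        // Ignore messages addressed to an unknown token
        if (null == targetChannel) return;

        String content = getTokenBySocket(channel) + ":" + split[1];

        send(targetChannel, content);
    }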

    /**
     * Find the SocketChannel for a token.
     *
     * @param token token
     * @return SocketChannel
     */
    public SocketChannel getSocketByToken(String token) {
        return TOKEN_SOCKET_MAP.get(token);
    }

    /**
     * Find the token for a channel.
     *
     * @param channel channel
     * @return String
     */
    public String getTokenBySocket(SocketChannel channel) {
        return SOCKET_TOKEN_MAP.get(channel);
    }

    /**
     * Send a message in the form "sender token:content".
     *
     * @param channel target channel
     * @param msg     message
     * @throws IOException I/O exception
     */
    public void send(SocketChannel channel, String msg) throws IOException {
        System.out.println("---- Message sent: " + msg);
        ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
        channel.write(buffer);
    }
}
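  • The client
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;

public class ChatClient {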

    private static Charset charset = StandardCharsets.UTF_8;

    public static void main(String[] args) throws IOException {
        // Open a SocketChannel
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", ChatServer.PORT));
        // Configure non-blocking
        socketChannel.configureBlocking(false);

        new Thread(() -> {
            try {
                Selector selector = Selector.open();
                socketChannel.register(selector, SelectionKey.OP_READ);

                while (selector.select() > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();

                        if (selectionKey.isReadable()) {
                            SocketChannel channel = (SocketChannel) selectionKey.channel();

                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            StringBuilder sb = new StringBuilder();
                            while ((channel.read(byteBuffer)) > 0) {
                                byteBuffer.flip();
                                String msg = new String(byteBuffer.array(), 0, byteBuffer.limit());
                                sb.append(msg);
                                byteBuffer.clear();
                            }
                            if (!"".equals(sb.toString())) {
                                System.out.println(sb.toString());
                            }
                            // Keep listening for readable events
                            selectionKey.interestOps(SelectionKey.OP_READ);
                        }
                    }
                }
            } catch (Exception e) {
                // Report the failure instead of silently swallowing it
                e.printStackTrace();
            }
        }).start();

        Scanner scanner = new Scanner(System.in);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while (scanner.hasNext()) {
            String next = scanner.next();
            System.out.println("---- Data to send: " + next);
            buffer.put(next.getBytes());
            buffer.flip();
            // A non-blocking write may not send everything in one call
            while (buffer.hasRemaining()) {
                socketChannel.write(buffer);
            }
            buffer.clear();
        }
    }
}

The code has been tested. Messages are sent in the format token:message body.

This is demo code and many parts are not optimized. If you are interested, try improving it on this basis.
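
One possible improvement is to parse the token:message body format in a single place and reject malformed input. The sketch below is a hypothetical helper, not part of the demo above; the class name ChatMessage and its fields are my own.

```java
public class ChatMessage {

    public final String token;
    public final String body;

    private ChatMessage(String token, String body) {
        this.token = token;
        this.body = body;
    }

    /**
     * Parses "token:message body". Only the first colon separates the two parts,
     * so the body itself may contain colons. Returns null for malformed input.
     */
    public static ChatMessage parse(String raw) {
        if (raw == null) {
            return null;
        }
        int idx = raw.indexOf(':');
        // Reject a missing colon, an empty token, or an empty body
        if (idx <= 0 || idx == raw.length() - 1) {
            return null;
        }
        return new ChatMessage(raw.substring(0, idx), raw.substring(idx + 1));
    }

    public static void main(String[] args) {
        ChatMessage message = parse("abc-123:hello: world");
        System.out.println("token=" + message.token + " body=" + message.body);
        System.out.println("malformed -> " + parse("no separator"));
    }
}
```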

Final words

DatagramChannel, which is used for UDP, is not covered here.

Many other API methods were not discussed either. The documentation is linked below so you can look them up when you need them:

NIO Buffer

NIO Channel

NIO File