1 Selector and registration

1.1 Relationship between selector and channel

The mission of selector is to complete IO multiplexing, its main work is channel registration, monitoring, event query.

  1. A channel represents a connection channel, through the selector can monitor IO (input and output) of multiple channels at the same time.
  2. The relationship between a selector and a channel is one of monitoring and being monitored.

In NIO programming, it is common for a single thread to process a selector, and a selector can monitor many channels. So, with selectors, a single thread can handle hundreds, thousands, tens of thousands, or even more channels. In extreme cases (tens of thousands of connections), a single thread can handle all channels, dramatically reducing the overhead of context switching between threads.

1.2 Registering selectors

The association between channels and selectors is done through a register. A Channel instance can be registered with a Selector by calling the Channel’s channel. register(Selector sel, int OPS) method.

// The first argument specifies the selector instance to which the channel is registered;
// The second parameter specifies the type of I/O events that the selector will monitor.
public final SelectionKey register(Selector sel, int ops)
    throws ClosedChannelException;
Copy the code

The following four channel I/O event types can be monitored by the selector:

(1) readable: selectionkey.op_read. (2) selectionkey.op_write. (3) connection: selectionkey.op_connect (4) accept: selectionkey.op_acceptCopy the code

The above event type constants are defined in the SelectionKey class. If the selector to monitor channel of a variety of events, you can use “|” operators to implement

int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE ;
Copy the code

1.3 the IO events

An I/O event is not an I/O operation on a channel, but a channel is in the ready state of an I/O operation, indicating that the channel is ready to perform an I/O operation.

For example,

  1. An OP_CONNECT event occurs when a SocketChannel transport completes a three-way handshake with its peer.
  2. A ServerSocketChannel server connects to a listening channel, and when it listens for a new connection, an OP_ACCEPT event occurs.
  3. When a SocketChannel has data to read, an OP_READ event occurs.
  4. When a SocketChannel waits for data to be written, an OP_WRITE event occurs.

2 SelectableChannel and SelectionKey

Not all channels can be monitored or selected by selectors. For example, FileChannel cannot be reused by selectors. One prerequisite for determining whether a channel can be monitored or selected by a selector is whether it inherits the abstract class SelectableChannel. If so, it can be selected; otherwise, it cannot be selected.

The SelectableChannel class provides the public methods needed to implement channel selectivity. All network connection socket channels in Java NIO inherit the SelectableChannel class and are optional. FileChannel does not inherit from SelectableChannel and is therefore not an optional channel.

Once the channel and Selector monitoring relationship has been registered, ready events can be selected, which can be done by calling Selector’s Select () method. With the select() method, the selector can continuously select the ready state of the operation occurring in the channel, returning those IO events of interest that have been registered.

The select() method is a blocking method that returns when an event is ready

Selectionkeys are the IO events that are selected by the selectors. When an IO event occurs, it is selected by the SelectionKey if it was previously registered in the SelectionKey. If the IO event occurs, it will not be selected by the selector if it has not been registered before. The relationship between SelectionKey and IO can be simply understood as the SelectionKey is the selected IO event.

SelectionKey is very powerful when it comes to actual programming. By using the SelectionKey, you can obtain the IO event type of the channel (such as selectionkey.op_read) and the channel where the IO event occurred. In addition, you can obtain a selector instance.

3. Selector usage flow

  1. Gets the selector instance. The selector instance is obtained by calling the static factory method open()
// Call the static factory method open() to get the Selector instance
Selector selector = Selector.open();
Copy the code

Inside the Selector class method open() is a request to the Selector SPI to get a new Selector instance through the default SelectorProvider object. Service Provider Interface (SPI) in Java is an extensible Service provision and discovery mechanism. Java provides a default implementation version of the selector via SPI. That is, other service providers can provide dynamic replacements or extensions to customized versions of selectors via SPI.

  1. Registers channels with selector instances. To implement a selector management channel, you need to register the channel with the appropriate selector
// Get the channel
ServerSocketChannelserverSocketChannel = ServerSocketChannel.open();
// Set it to non-blocking
serverSocketChannel.configureBlocking(false);
// Bind the connection
serverSocketChannel.bind(new InetSocketAddress(18899));
// Register the channel with the selector and specify the listening event as "receive connection"ServerSocketChannel. Register (selector, SelectionKey OP_ACCEPT);Copy the code

Note:

1. Registered with the selector channel must be in non-blocking mode, otherwise it will be thrown IllegalBlockingModeException anomalies. This means that a FileChannel cannot be used with a selector because it only has blocking mode and cannot be switched to non-blocking mode; All channels associated with sockets can be used. 2. Second, a channel does not necessarily support all four IO events. For example, ServerSocketChannel, the server listening channel, supports only Accept (receiving a new connection) I/O events, whereas SocketChannel, the transport channel, does not support Accept type I/O events.Copy the code
  1. Selects the IO ready events of interest (select the key set). The registered and ready IO events are selected by the Selector select() method and stored in the SelectionKey collection. The SelectionKey collection is stored inside the selector instance, and its elements are instances of the SelectionKey type. Call the selectedKeys() method of the selector to get the collection of selection keys. Iterate over each selection key in the set to perform the corresponding business operation according to the specific IO event type
// Poll, select IO ready events of interest (select key set)
while (selector.select() > 0) {
        Set selectedKeys = selector.selectedKeys();
        Iterator keyIterator = selectedKeys.iterator();
        while(keyIterator.hasNext()) {
               SelectionKey key = keyIterator.next();
// Perform service operations based on the SPECIFIC I/O event type
                if(key.isAcceptable()) {
                  //IO event: ServerSocketChannel The server listens for a new connection on the channel
                } else if (key.isConnectable()) {
                  //IO event: The transmission channel connection is successful
                } else if (key.isReadable()) {
                  //IO event: the transport channel is readable
                } else if (key.isWritable()) {
                  //IO event: the transport channel is writable
                }
                // When the process is complete, remove the select keykeyIterator.remove(); }}Copy the code

Once the processing is complete, the SelectionKey needs to be removed from the SelectionKey collection to prevent it from being processed again the next time through the loop. SelectionKey collection can’t add elements, if trying to add elements to SelectionKey, would throw the Java. Lang. UnsupportedOperationException anomalies.

The select() method used to select ready IO events has multiple overloaded implementations as follows:

(1) SELECT () : blocks the call until a registered IO event has occurred on at least one channel. (2) select (longTimeout) : Same as select(), but the maximum blocking time is the number of milliseconds specified by timeout. (3SelectNow () : non-blocking, returns immediately with or without IO events.Copy the code

The return value of the select() method is an integer (int) that represents the number of I/O events that occurred, that is, the number of channels that I/O events occurred between the last select and this select, or more precisely, the number of I/O events that the selector is interested in (registered with).

NIO implements Discard server

The function of the Discard server is simple: it only reads the input data from the client channel, closes the client channel after reading, and Discard the read data directly.

1. The service side

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

/ * * *@author wyaoyao
 * @date2021/6/21 18:04 * /
@Slf4j
public class NioDiscardServer {

    private final int port;

    private Selector selector;

    private ServerSocketChannel serverChannel;

    private final SocketAddress socketAddress;

    private final Charset charset = Charset.forName("utf-8");

    public NioDiscardServer(int port) throws IOException {
        this.port = port;
        this.socketAddress = new InetSocketAddress("localhost", port);
    }


    public void startServer(a) throws IOException {
        // Create a selector
        this.selector = Selector.open();
        // Get a channel
        this.serverChannel = ServerSocketChannel.open();
        // Set it to non-blocking
        serverChannel.configureBlocking(false);
        // Bind ports
        this.serverChannel.bind(this.socketAddress);
        log.info("NIO discard server start success; the port is [{}]. ".this.port);
        // Register the connection event
        this.serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);

        // Poll the time of interest
        while (this.selector.select() > 0) {
            // Get the selection key set
            Set<SelectionKey> selectionKeys = this.selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // What is the event
                if (selectionKey.isAcceptable()) {
                    // Connect ready event, then get the connection
                    handleAccept(selectionKey);
                } else if (selectionKey.isReadable()) {
                    // If the IO event is readable, data is read
                    handleRead(selectionKey);
                }
                / / remove the keyselectionKeys.remove(selectionKey); }}}public void close(a) throws IOException {
        if (this.selector ! =null) {
            this.selector.close();
        }
        if (this.serverChannel ! =null) {
            this.serverChannel.close(); }}private void handleRead(SelectionKey selectionKey) throws IOException {
        // Get the current channel
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        // Read data
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int len;
        StringBuilder content = new StringBuilder();
        while((len = channel.read(buffer)) ! = -1) {
            // Switch buffer to read mode
            buffer.flip();
            content.append(new String(buffer.array(), 0, len, charset));
            buffer.clear();
        }
        channel.close();
        // Prints data
        log.info("read data is [{}]", content.toString());
    }

    private void handleAccept(SelectionKey selectionKey) throws IOException {
        // Get the connection
        SocketChannel socketChannel = this.serverChannel.accept();
        // Set it to non-blocking
        socketChannel.configureBlocking(false);
        // Register readable events for the newly connected channel with the selector
        // The new socketChannel client transfer channel must be registered with the same selector.
        // The same thread can be used to continuously select key queries for all registration channels.
        socketChannel.register(this.selector, SelectionKey.OP_READ);
    }

    public static void main(String[] args) throws IOException {
        NioDiscardServer server = new NioDiscardServer(10010); server.startServer(); }}Copy the code

During event processing, new socketChannel client transport channels are registered with the same selector, so that the same selector thread can be used to continuously query all registered channels for the select key.

  1. Write a simple client test

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.TimeoutException;

/ * * *@author wyaoyao
 * @date 2021/6/22 13:36
 */
@Slf4j
public class NioDiscardClient {

    private final String serverHost;

    private final int serverPort;

    private final SocketAddress serverSocketAddress;

    private final SocketChannel socketChannel;

    private final Charset charset = Charset.forName("utf-8");

    public NioDiscardClient(String serverHost, int serverPort) throws IOException {
        this.serverHost = serverHost;
        this.serverPort = serverPort;
        this.serverSocketAddress = new InetSocketAddress(serverHost, serverPort);
        this.socketChannel = SocketChannel.open();
        // Set it to non-blocking
        this.socketChannel.configureBlocking(false);
    }

    public void connect(long timeout) throws IOException, TimeoutException {
        this.socketChannel.connect(this.serverSocketAddress);
        long end = System.currentTimeMillis() + timeout;
        while (!this.socketChannel.finishConnect()) {
            if (System.currentTimeMillis() >= end) {
                throw new TimeoutException("connect.time.out");
            }
        }
        log.info("client connect server success");
    }

    public void sendMessage(String message) throws IOException {
        ByteBuffer encode = this.charset.encode(message);
        this.socketChannel.write(encode);
        this.socketChannel.shutdownOutput();
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        NioDiscardClient client = new NioDiscardClient("localhost".10010);
        client.connect(1000);
        client.sendMessage("hello world"); }}Copy the code