Art is long, life is long

1. Synchronous, asynchronous, blocking, non-blocking, and IO

  • Synchronization: coordinated pacing. Since it is about coordination, at least two things must be involved. The result of synchronization is that those things cannot proceed at the same time. They must proceed one after another, and the next one starts only after the previous one ends.

There are two points to understand about synchronization:

  1. Scope: synchronization is not needed globally, only at certain critical points. For example, if the canteen has only one window selling rice, buying rice must be synchronized: one person buys, then the next. But when eating, does one person have to finish before the next can start? Of course not.

  2. Granularity: synchronization applies not only to coarse-grained things but also to fine-grained ones. Fine-grained synchronization is usually provided naturally, while coarse-grained synchronization often has to be handled manually. For example, synchronizing two threads requires manual handling. Two statements in one thread, by contrast, are naturally synchronized, because the second runs after the first.
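The thread case in point 2 can be sketched in Java as follows. This is a minimal illustration and not code from the original article. The class and method names are hypothetical, and `synchronized` is used as one common way to do the manual synchronization. Without it, `count++` is a read, an add and a write, so the two threads could interleave those steps and lose updates.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: two threads share one counter and must be synchronized manually
class SyncCounterDemo {
    private int count = 0;

    // synchronized: only one thread at a time may run this method on this object
    synchronized void increment() {
        count++;
    }

    synchronized int get() {
        return count;
    }

    static int run(int threads, int perThread) {
        SyncCounterDemo counter = new SyncCounterDemo();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                // Statements inside one thread already run in order, no extra work needed
                for (int i = 0; i < perThread; i++) {
                    counter.increment();
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait until this worker has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(run(2, 100_000)); // prints 200000
    }
}
```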

  • Asynchrony: different paces. Since the paces differ, each thing proceeds on its own: you go your way and I go mine. Neither cares about the other, and everything can happen at the same time.

  • In short, synchronous means that multiple things cannot proceed at the same time, and asynchronous means that they can.

    Note: pay close attention to the phrase “multiple things.” Multiple threads are multiple things. So are multiple methods, multiple statements, and multiple CPU instructions.
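Here is a small sketch of the difference. It assumes a hypothetical `slowTask` helper that only sleeps to simulate work.

- In `runSync`, the second task starts only after the first one ends.
- In `runAsync`, both tasks are started with `CompletableFuture.supplyAsync` and proceed at the same time.

Timings vary by machine, so `main` prints them rather than asserting a value.

```java
import java.util.concurrent.CompletableFuture;

class SyncAsyncDemo {
    // Hypothetical stand-in for a slow task: sleeps, then returns its name
    static String slowTask(String name, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name;
    }

    // Synchronous: task B starts only after task A has ended
    static String runSync() {
        return slowTask("A", 200) + slowTask("B", 200);
    }

    // Asynchronous: both tasks are started and run at the same time
    static String runAsync() {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> slowTask("A", 200));
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> slowTask("B", 200));
        return a.join() + b.join(); // wait for both results
    }

    public static void main(String[] args) {
        long t0 = System.nanoTime();
        System.out.println(runSync());  // prints AB
        long t1 = System.nanoTime();
        System.out.println(runAsync()); // prints AB
        long t2 = System.nanoTime();
        System.out.printf("sync: %d ms, async: %d ms%n", (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000);
    }
}
```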

  • Blocking: literally, being blocked. The original image is being unable to move because an obstacle is in the way.

  • Non-blocking: the opposite of blocking. Since there is no obstacle, execution continues unimpeded.

  • IO: the process of reading/writing data, together with the process of waiting to read/write it. Once the data has been obtained, working on it is a data operation, not IO. For network IO, the waiting phase is the data traveling from the network to the NIC and into kernel space. The reading/writing phase is the copy between kernel space and user space.

    IO therefore consists of two phases: waiting for the data, and reading/writing (copying) the data. Note that processing the data is not part of IO.
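The sketch below shows blocking and non-blocking side by side. It uses a `java.nio.channels.Pipe` as a stand-in for a network connection, an assumption made so that the example needs no server. The class name is hypothetical.

- In non-blocking mode, `read()` returns 0 at once when no data is waiting.
- In blocking mode, the same call waits until another thread writes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

class BlockingDemo {
    // Returns {bytes from the non-blocking read, bytes from the blocking read}
    static int[] run() {
        try {
            Pipe pipe = Pipe.open();
            Pipe.SourceChannel source = pipe.source();
            ByteBuffer buf = ByteBuffer.allocate(16);

            // Non-blocking: nothing has been written yet, so read() returns 0 immediately
            source.configureBlocking(false);
            int nonBlocking = source.read(buf);

            // Another thread writes "hello" after a short delay
            Thread writer = new Thread(() -> {
                try {
                    Thread.sleep(100);
                    pipe.sink().write(ByteBuffer.wrap("hello".getBytes()));
                } catch (IOException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
            writer.start();

            // Blocking: read() does not return until the data has arrived
            source.configureBlocking(true);
            int blocking = source.read(buf);
            writer.join();
            source.close();
            pipe.sink().close();
            return new int[] {nonBlocking, blocking};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```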

2. BIO

BIO stands for blocking IO. Consider the following example:

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BIOServer {

    public static void main(String[] args) throws IOException {
        // Create a thread pool: every accepted connection is handled by its own thread
        ExecutorService executorService = Executors.newCachedThreadPool();
        // Create a ServerSocket listening on port 8888
        ServerSocket serverSocket = new ServerSocket(8888);
        System.out.println("Server started, port 8888...");
        while (true) {
            System.out.println("Thread information ID = " + Thread.currentThread().getId() + ", Name = " + Thread.currentThread().getName());
            System.out.println("Waiting for connection...");
            // Blocks until a client connects
            final Socket accept = serverSocket.accept();
            System.out.println("There is a client connection...");
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    handler(accept);
                }
            });
        }
    }

    public static void handler(Socket socket) {
        try {
            byte[] bytes = new byte[1024];
            // Get the input stream
            InputStream inputStream = socket.getInputStream();
            while (true) {
                System.out.println("Thread information ID = " + Thread.currentThread().getId() + ", Name = " + Thread.currentThread().getName());
                System.out.println("read...");
                // Blocks until data arrives; returns -1 when the client closes the connection
                int read = inputStream.read(bytes);
                if (read != -1) {
                    // Output the data sent by the client
                    System.out.println(new String(bytes, 0, read));
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Start the server. The console prints:

Server started, port 8888...
Thread information ID = 1, Name = main
Waiting for connection...

In a terminal, run telnet 127.0.0.1 8888 and press Enter. The console now shows:

Server started, port 8888...
Thread information ID = 1, Name = main
Waiting for connection...
There is a client connection...
Thread information ID = 1, Name = main
Waiting for connection...
Thread information ID = 13, Name = pool-1-thread-1
read...

Press Ctrl+] and then Enter, and type some data, for example 100OK.

Console output:

Server started, port 8888...
Thread information ID = 1, Name = main
Waiting for connection...
There is a client connection...
Thread information ID = 1, Name = main
Waiting for connection...
Thread information ID = 13, Name = pool-1-thread-1
read...
100OK
Thread information ID = 13, Name = pool-1-thread-1
read...

In a second terminal, run telnet 127.0.0.1 8888 again and press Enter. The console shows:

Server started, port 8888...
Thread information ID = 1, Name = main
Waiting for connection...
There is a client connection...
Thread information ID = 1, Name = main
Waiting for connection...
Thread information ID = 13, Name = pool-1-thread-1
read...
100OK
Thread information ID = 13, Name = pool-1-thread-1
read...
There is a client connection...
Thread information ID = 1, Name = main
Waiting for connection...
Thread information ID = 14, Name = pool-1-thread-2
read...

Press Ctrl+] and then Enter, and type 200OK. The console shows:

Server started, port 8888...
Thread information ID = 1, Name = main
Waiting for connection...
There is a client connection...
Thread information ID = 1, Name = main
Waiting for connection...
Thread information ID = 13, Name = pool-1-thread-1
read...
100OK
Thread information ID = 13, Name = pool-1-thread-1
read...
There is a client connection...
Thread information ID = 1, Name = main
Waiting for connection...
Thread information ID = 14, Name = pool-1-thread-2
read...
200OK
Thread information ID = 14, Name = pool-1-thread-2
read...
  • As long as no client connects to the server, accept() cannot return: this is blocking. The same is true of read and write operations. To read data, read() must wait until the data arrives before it returns, which is also blocking.

  • The server creates a separate thread for each connection.

  • When there are many concurrent connections, a large number of threads must be created, which consumes significant system resources.

  • After a connection is established, if there is no data to read, the thread blocks in read(). It sits idle and does no useful work, wasting thread resources.
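The blocking of `accept()` described in the first bullet can be observed without a client. This sketch has a hypothetical class name and uses `ServerSocket.setSoTimeout` so the blocked `accept()` gives up with a `SocketTimeoutException` instead of waiting forever. It then measures how long the call was stuck.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;

class AcceptBlocksDemo {
    // Returns how many milliseconds accept() stayed blocked while no client connected
    static long blockedMillis(int timeoutMillis) {
        try (ServerSocket server = new ServerSocket(0)) { // port 0: any free port
            server.setSoTimeout(timeoutMillis);            // stop waiting after timeoutMillis
            long start = System.nanoTime();
            try {
                server.accept().close(); // blocks here: nobody connects in this demo
                throw new IllegalStateException("a client connected unexpectedly");
            } catch (SocketTimeoutException e) {
                return (System.nanoTime() - start) / 1_000_000;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accept() was blocked for about " + blockedMillis(300) + " ms");
    }
}
```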

3. NIO

NIO server code:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOServer {

    // Selector
    private static Selector selector;

    public static void main(String[] args) throws IOException {
        // Create a channel manager (selector)
        selector = Selector.open();
        // Create a ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // Set the channel to non-blocking
        serverSocketChannel.configureBlocking(false);
        // Bind the ServerSocket behind the ServerSocketChannel to port 9090
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(9090));
        /*
         * Register the channel with the selector for SelectionKey.OP_ACCEPT events.
         * Until such an event arrives, selector.select() blocks.
         */
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // Event loop
        while (true) {
            // Returns when a registered event arrives; otherwise it blocks
            selector.select();
            // Get the keys of the ready channels
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            // Process them one by one
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // Remove the key to avoid processing it again
                iterator.remove();
                if (key.isAcceptable()) {
                    // A client connection is ready to be accepted
                    handleAccept(key);
                } else if (key.isReadable()) {
                    // The socket is ready for reading
                    handleRead(key);
                }
            }
        }
    }

    /** Handle a successful client connection. */
    private static void handleAccept(SelectionKey key) throws IOException {
        // Get the channel for the client connection
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = server.accept();
        if (socketChannel == null) {
            return; // in non-blocking mode there may be nothing left to accept
        }
        socketChannel.configureBlocking(false);
        // Send a greeting to the client through the channel
        String msg = "Hello Client!";
        socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
        // Register for read events so the server is notified when the client sends data
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    /** Handle a read event: read the message from the client. */
    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        // Read data from the channel into the buffer
        ByteBuffer buffer = ByteBuffer.allocate(128);
        if (channel.read(buffer) == -1) {
            // The client closed the connection: stop watching this channel
            key.cancel();
            channel.close();
            return;
        }
        // Print the message sent by the client
        buffer.flip();
        String msg = new String(buffer.array(), 0, buffer.limit()).trim();
        System.out.println("Server received MSG from client: " + msg);
    }
}

NIO client code:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOClient {

    // Selector
    private static Selector selector;

    public static void main(String[] args) throws IOException {
        // Create a channel manager (selector)
        selector = Selector.open();
        // Create a SocketChannel
        SocketChannel channel = SocketChannel.open();
        // Set the channel to non-blocking
        channel.configureBlocking(false);
        // Start connecting to the server. In non-blocking mode this call does not complete the
        // connection; channel.finishConnect() is called in handleConnect to complete it
        channel.connect(new InetSocketAddress("localhost", 9090));
        /*
         * Register the channel with the selector for SelectionKey.OP_CONNECT. When the event
         * arrives, selector.select() returns; until then it blocks.
         */
        channel.register(selector, SelectionKey.OP_CONNECT);
        // Event loop
        while (true) {
            /*
             * Select the channels that are ready for I/O. select() blocks until at least one
             * channel is selected, wakeup() is called, or the thread is interrupted.
             */
            selector.select();
            // Get the keys of the ready channels
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            // Process them one by one
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // Remove the key to avoid processing it again
                iterator.remove();
                if (key.isConnectable()) {
                    // The connection to the server can now be completed
                    handleConnect(key);
                } else if (key.isReadable()) {
                    // The socket is ready for reading
                    handleRead(key);
                }
            }
        }
    }

    /** Handle the client's successful connection to the server. */
    private static void handleConnect(SelectionKey key) throws IOException {
        // Get the channel connected to the server
        SocketChannel channel = (SocketChannel) key.channel();
        if (channel.isConnectionPending()) {
            // Complete the connection
            channel.finishConnect();
        }
        channel.configureBlocking(false);
        // Write data to the channel
        String msg = "Hello Server!";
        channel.write(ByteBuffer.wrap(msg.getBytes()));
        // From now on the channel is only interested in read events
        channel.register(selector, SelectionKey.OP_READ);
    }

    /** Handle a read event: read the message from the server. */
    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        // Read data from the channel into the buffer
        ByteBuffer buffer = ByteBuffer.allocate(128);
        if (channel.read(buffer) == -1) {
            // The server closed the connection
            key.cancel();
            channel.close();
            return;
        }
        // Print the server's response
        buffer.flip();
        String msg = new String(buffer.array(), 0, buffer.limit()).trim();
        System.out.println("Message from the server: " + msg);
    }
}

3.1 Channel

As mentioned, the core of NIO is channels and buffers. They work as follows.

Channels are similar to streams in classic IO. The difference is that one channel can be both read and written, whereas a stream is either an input stream or an output stream.

However, channels, like streams, are built on an underlying resource such as a file. File-based streams and channels operate on the file through a file pointer. The statement “channels are bidirectional” also has a precondition: a channel is only bidirectional if its source is both readable and writable. A RandomAccessFile opened in "rw" mode is one example.

RandomAccessFile is both readable and writable, so a channel obtained from it is bidirectional. The statement “channels are bidirectional” holds in that context and should not be taken out of it.
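The precondition can be checked with a short sketch. The class name and temp-file prefix are hypothetical. A channel from a "rw" RandomAccessFile is written and read back through the same channel. A channel from a FileInputStream is read-only, and writing to it throws `NonWritableChannelException`.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.NonWritableChannelException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class BidirectionalChannelDemo {
    // Writes text, then reads it back through ONE channel from a "rw" RandomAccessFile
    static String writeThenRead(String text) {
        try {
            Path file = Files.createTempFile("channel-demo", ".txt");
            try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
                 FileChannel channel = raf.getChannel()) {
                channel.write(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8))); // buffer -> channel -> file
                channel.position(0);                                                   // rewind the file pointer
                ByteBuffer buf = ByteBuffer.allocate((int) channel.size());
                while (buf.hasRemaining() && channel.read(buf) != -1) {
                    // file -> channel -> buffer, until the buffer is full or the file ends
                }
                buf.flip();
                return StandardCharsets.UTF_8.decode(buf).toString();
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // A channel obtained from an input stream is read-only: writing to it throws
    static boolean readOnlyChannelRejectsWrite() {
        try {
            Path file = Files.createTempFile("channel-demo", ".txt");
            try (FileInputStream in = new FileInputStream(file.toFile());
                 FileChannel channel = in.getChannel()) {
                channel.write(ByteBuffer.wrap(new byte[] {1}));
                return false;
            } catch (NonWritableChannelException e) {
                return true;
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```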

The basic channel types are as follows:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

FileChannel is a file-based channel. SocketChannel and ServerSocketChannel read and write TCP network sockets, and DatagramChannel reads and writes UDP datagrams.

A channel does not hold data on its own. It always works together with a buffer, and all data lives in the buffer. When writing, data flows from the buffer through the channel to the file. When reading, data flows from the file through the channel into the buffer. In other words, the buffer is the starting point and the end point of the data.

3.2 Buffer

Overview

  • Buffer: a container for data of one specific primitive type. Buffers are defined in the java.nio package, and all of them are subclasses of the abstract class Buffer.
  • In Java NIO, buffers are used mainly to interact with channels: data is read from a channel into a buffer, and written from a buffer into a channel.
  • A Buffer is like an array that holds multiple values of the same type. There is a subclass for each primitive type except boolean: ByteBuffer, CharBuffer, ShortBuffer, IntBuffer, LongBuffer, FloatBuffer, and DoubleBuffer. They all manage data in a similar way, and only the element type differs. A buffer object is obtained with the static factory method of the corresponding class:
public static XxxBuffer allocate(int capacity)
// for example: ByteBuffer buffer = ByteBuffer.allocate(1024);

Basic properties of buffers

  • Capacity: Indicates the maximum capacity of the Buffer. The capacity of the Buffer cannot be negative and cannot be changed once created.
  • Limit: The index of the first data that should not be read or written, that is, data after the limit cannot be read or written. Buffer limits cannot be negative and cannot exceed capacity.
  • Position: The index of the next data to read or write. The position of the buffer cannot be negative and cannot exceed its limit.
  • Mark and reset: mark() records the current position as the mark, and reset() later restores the position to that mark.
  • 0 <= mark <= position <= limit <= capacity
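The sketch below walks a `ByteBuffer` through the properties listed above. The helper names are illustrative. `flip()` is the usual way to switch from writing into a buffer to reading from it: it sets `limit` to the current `position` and moves `position` back to 0.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class BufferStateDemo {
    static String state(ByteBuffer b) {
        return "position=" + b.position() + " limit=" + b.limit() + " capacity=" + b.capacity();
    }

    // Records the buffer state after each step
    static List<String> trace() {
        List<String> steps = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.allocate(8);             // new buffer, ready to be written
        steps.add(state(buf));

        buf.put((byte) 'a').put((byte) 'b').put((byte) 'c'); // each put advances position
        steps.add(state(buf));

        buf.flip();                                          // limit = old position, position = 0
        steps.add(state(buf));

        buf.get();                                           // reads 'a', position becomes 1
        buf.mark();                                          // mark = 1
        buf.get();                                           // reads 'b', position becomes 2
        buf.reset();                                         // position goes back to the mark (1)
        steps.add(state(buf));
        return steps;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
        // position=0 limit=8 capacity=8
        // position=3 limit=8 capacity=8
        // position=0 limit=3 capacity=8
        // position=1 limit=3 capacity=8
    }
}
```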

3.3 Selector

Selector is the Java NIO component that monitors the states of multiple channels, so that one thread can manage many channels. FileChannel cannot be registered with a selector, because it cannot be switched to non-blocking mode. Selectors are therefore generally used with network socket channels.

A Selector is usually created with the factory method Selector.open():

Selector selector = Selector.open();

A channel must be switched to non-blocking mode before it can be registered with a selector, for example:

// Create a TCP socket channel
SocketChannel channel = SocketChannel.open();
// Adjust the channel to non-blocking mode
channel.configureBlocking(false);
// Register a channel with the selector
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

The code above is the simplest way to register a channel with a selector. Every channel that can be registered with a selector (every SelectableChannel) has a register method, which registers that channel with the given selector.

The first argument of this method is the target selector. The second is a bit mask (the interest set) indicating which events on this channel the selector should watch. The possible values are int constants defined in SelectionKey:

  • int OP_READ = 1 << 0;
  • int OP_WRITE = 1 << 2;
  • int OP_CONNECT = 1 << 3;
  • int OP_ACCEPT = 1 << 4;

We met the same mechanism of using individual bits to represent states when discussing the JVM class file structure, where access flags are stored as bits.

The register method returns a SelectionKey instance that represents the association between the selector and the channel. Its selector() method returns the associated selector, and its channel() method returns the associated channel.

In addition, SelectionKey's readyOps() method returns the ready set. This is the set of operations in the interest set for which the channel is currently ready, again returned as an int bit mask.

For example:

int readySet = selectionKey.readyOps();
If readySet is 13, i.e. binary 0000 1101, then counting from the right (lowest bit first), bits 1, 3 and 4 are set. They correspond to OP_READ (1), OP_WRITE (4) and OP_CONNECT (8). The channel associated with the selector is therefore ready for reading, writing, and connecting.
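The decoding of 13 can be checked with a few lines of Java. `decode` is a hypothetical helper that tests each `SelectionKey` constant with a bitwise AND. In real code, `key.isReadable()`, `key.isWritable()` and similar methods do these checks for you.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

class ReadyOpsDemo {
    // Decodes an interest/ready bit mask into the names of the operations it contains
    static List<String> decode(int ops) {
        List<String> names = new ArrayList<>();
        if ((ops & SelectionKey.OP_READ) != 0) names.add("OP_READ");       // bit value 1
        if ((ops & SelectionKey.OP_WRITE) != 0) names.add("OP_WRITE");     // bit value 4
        if ((ops & SelectionKey.OP_CONNECT) != 0) names.add("OP_CONNECT"); // bit value 8
        if ((ops & SelectionKey.OP_ACCEPT) != 0) names.add("OP_ACCEPT");   // bit value 16
        return names;
    }

    public static void main(String[] args) {
        System.out.println(Integer.toBinaryString(13)); // prints 1101
        System.out.println(decode(13));                 // prints [OP_READ, OP_WRITE, OP_CONNECT]
        // Interests are combined with bitwise OR when registering
        System.out.println(decode(SelectionKey.OP_READ | SelectionKey.OP_WRITE)); // prints [OP_READ, OP_WRITE]
    }
}
```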

So, after registering a channel with a selector, we can watch the events on that channel through the returned SelectionKey.

Of course, once many channels are registered with one selector, it is impractical to keep every SelectionKey returned at registration time. Instead, the selector itself provides the keys. keys() returns the keys of all registered channels. selectedKeys() returns only the keys whose channels were found ready by the most recent select operation.

Set<SelectionKey> allKeys = selector.keys();           // every registered channel
Set<SelectionKey> readyKeys = selector.selectedKeys(); // channels found ready by the last select()

In an event loop we normally iterate over selectedKeys(). Each SelectionKey in it tells us which events are ready on its channel so we can handle them, and we remove the key from the set once it has been handled.
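The difference between the two sets can be seen in a small sketch:

- the key set (`keys()`) holds every registered channel;
- the selected-key set (`selectedKeys()`) holds only the channels found ready by the last select.

Two pipes stand in for socket connections so that no server is needed; this is an assumption of the sketch, and the class name is hypothetical. Only one pipe receives data.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class SelectedKeysDemo {
    // Returns {registered keys, select() result, selected keys, 1 if the ready key is the busy pipe}
    static int[] run() {
        try (Selector selector = Selector.open()) {
            Pipe quiet = Pipe.open();
            Pipe busy = Pipe.open();
            for (Pipe p : new Pipe[] {quiet, busy}) {
                p.source().configureBlocking(false);                 // required before register()
                p.source().register(selector, SelectionKey.OP_READ); // interested in reads only
            }
            busy.sink().write(ByteBuffer.wrap("ping".getBytes()));  // only one channel gets data

            int updated = selector.select(1000);                    // wait at most 1 s
            int registered = selector.keys().size();                // every registered channel
            int selected = selector.selectedKeys().size();          // only the ready channels
            boolean busyReady = false;
            for (SelectionKey key : selector.selectedKeys()) {
                busyReady |= key.isReadable() && key.channel() == busy.source();
            }
            for (Pipe p : new Pipe[] {quiet, busy}) {
                p.source().close(); // closing a channel also cancels its key
                p.sink().close();
            }
            return new int[] {registered, updated, selected, busyReady ? 1 : 0};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```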

3.4 Advantages, disadvantages, and bottlenecks

  • Advantages:
    • All connections can be handled by one thread that loops over them again and again.
  • Disadvantages:
    • A single thread making system calls in a tight loop can also use up CPU resources.
  • The bottleneck of NIO:
    • We have to keep making system calls: for every connection, we call into the kernel to ask whether data has arrived. If we knew which connections had incoming packets, we would not need to go through all of them to find out.
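The per-connection polling behind this bottleneck can be sketched as below. Pipes stand in for client connections, an assumption made so the example runs without a network; the class and method names are hypothetical. Each pass calls `read()` once per connection even though only one connection has data. With thousands of idle connections, most of those calls into the operating system are wasted.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.util.ArrayList;
import java.util.List;

class PollingLoopDemo {
    // One pass of the "ask every connection" loop: one non-blocking read() per channel.
    // Returns {read() calls made, channels that actually had data}.
    static int[] pollOnce(List<Pipe.SourceChannel> channels) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(64);
        int calls = 0;
        int withData = 0;
        for (Pipe.SourceChannel ch : channels) {
            buf.clear();
            calls++;                // every read() goes down to the OS, even with nothing to read
            if (ch.read(buf) > 0) { // non-blocking: returns 0 at once when there is no data
                withData++;
            }
        }
        return new int[] {calls, withData};
    }

    static int[] demo(int connections) {
        try {
            List<Pipe> pipes = new ArrayList<>();
            List<Pipe.SourceChannel> sources = new ArrayList<>();
            for (int i = 0; i < connections; i++) {
                Pipe p = Pipe.open();
                p.source().configureBlocking(false);
                pipes.add(p);
                sources.add(p.source());
            }
            pipes.get(0).sink().write(ByteBuffer.wrap("data".getBytes())); // only one has data
            int[] result = pollOnce(sources);
            for (Pipe p : pipes) {
                p.source().close();
                p.sink().close();
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```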

4. AIO

Asynchronous IO, also known as NIO.2, was introduced in Java 7 as an improved version of NIO. It provides the asynchronous non-blocking IO model known today as AIO.

Asynchronous IO is based on events and callbacks. A call that starts an operation returns immediately instead of blocking. When the background processing is complete, the operating system notifies the corresponding thread to carry on with the follow-up actions.
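The callback style can also be seen in a smaller, self-contained sketch that needs no network. It reads a temporary file with `AsynchronousFileChannel` and a `CompletionHandler`. The class name and file prefix are hypothetical. The `CompletableFuture` is used only so the demo can hand the result back to its caller; the calling thread is otherwise free while the read is in progress.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class CallbackReadDemo {
    static String readWithCallback(String text) {
        try {
            Path file = Files.createTempFile("aio-demo", ".txt");
            Files.write(file, text.getBytes(StandardCharsets.UTF_8));
            CompletableFuture<String> done = new CompletableFuture<>();
            try (AsynchronousFileChannel channel =
                         AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
                ByteBuffer buf = ByteBuffer.allocate(1024);
                // read() returns immediately; the handler runs later, when the read completes
                channel.read(buf, 0, buf, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer bytesRead, ByteBuffer attachment) {
                        attachment.flip();
                        done.complete(StandardCharsets.UTF_8.decode(attachment).toString());
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        done.completeExceptionally(exc);
                    }
                });
                // Only wait here so the demo can return the result
                return done.get(5, TimeUnit.SECONDS);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```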

Client example program:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;

public class AioClient {

    public static void main(String[] args) throws IOException, InterruptedException {
        // Open a client channel
        try (AsynchronousSocketChannel channel = AsynchronousSocketChannel.open()) {
            try {
                // Connect to the server; get() waits until the connection is established
                channel.connect(new InetSocketAddress("127.0.0.1", 9988)).get();
                // Send data to the server
                channel.write(ByteBuffer.wrap("Hello, this is the client.".getBytes())).get();
                // Read the data returned by the server into the buffer
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                channel.read(byteBuffer).get();
                byteBuffer.flip();
                String result = Charset.defaultCharset().newDecoder().decode(byteBuffer).toString();
                // Data returned by the server
                System.out.println("The client receives the message from the server: " + result);
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

Server example program:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;

public class AioServer {

    public AsynchronousServerSocketChannel serverSocketChannel;

    public void listen() throws Exception {
        // Open a server channel and listen on port 9988
        serverSocketChannel = AsynchronousServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(9988));
        // Accept connections asynchronously; completed() is called for each new client
        serverSocketChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AioServer>() {
            @Override
            public void completed(AsynchronousSocketChannel client, AioServer attachment) {
                try {
                    if (client.isOpen()) {
                        System.out.println("Received new client connection, address: " + client.getRemoteAddress());
                        final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        // Read the data sent by the client
                        client.read(byteBuffer, client, new CompletionHandler<Integer, AsynchronousSocketChannel>() {
                            @Override
                            public void completed(Integer result, AsynchronousSocketChannel attachment) {
                                try {
                                    // Process the data sent by the client
                                    byteBuffer.flip();
                                    String content = Charset.defaultCharset().newDecoder().decode(byteBuffer).toString();
                                    System.out.println("The server receives data from the client: " + content);
                                    // Send data back to the client
                                    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                                    writeBuffer.put("Server send".getBytes());
                                    writeBuffer.flip();
                                    attachment.write(writeBuffer).get();
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }

                            @Override
                            public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
                                try {
                                    exc.printStackTrace();
                                    attachment.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // Call accept again for the next client. Each completed accept re-arms the
                    // listener in this way, so the server keeps accepting multiple clients
                    attachment.serverSocketChannel.accept(attachment, this);
                }
            }

            @Override
            public void failed(Throwable exc, AioServer attachment) {
                exc.printStackTrace();
            }
        });
    }

    public static void main(String[] args) throws Exception {
        // Start the server and listen for clients
        new AioServer().listen();
        // I/O runs asynchronously, so keep the main thread alive instead of letting it exit
        Thread.sleep(Integer.MAX_VALUE);
    }
}

This model is complex to program and is mainly used in fairly complex distributed scenarios, such as message synchronization within a cluster. For example, Cassandra's Gossip communication mechanism is asynchronous and non-blocking.

5. Multiplexers

With a multiplexer, no matter how many connections there are, we can ask the kernel about all of them with one system call. The kernel tells us which connections have changed, and we then perform IO only on those. The multiplexer solves the problem of finding out the state, but it does not do the reading and writing for us. One system call asks about the state of all the IO, instead of one call per connection, so there are fewer switches from user mode to kernel mode.

Multiplexer implementations:

  • select
  • poll
  • epoll

First, the multiplexer is implemented at the kernel level and has become a standard IO facility. Linux, Unix, Windows and the other major operating systems all support it, though their implementations differ. select and poll are the older implementations, and epoll is the newer one on Linux. Their internal approaches differ, but they solve the same problem.

Second, Java introduced a new IO package for this multiplexer-based IO model, java.nio.*, which contains everything discussed above.

A multiplexer lets a program monitor many file descriptors. It does not make IO reads and writes faster. It tells you which file descriptors have changed.

select and poll

select supports at most 1024 connections (the default FD_SETSIZE limit on Linux), while poll does not have this limit.

Implementation logic:

  1. The user process maintains the set of file descriptors for all its connections.
  2. Through the system call select(fds), the process asks the kernel which connections have changed. The call blocks, but a timeout can be set.
  3. The kernel checks all the connections one by one for state changes and returns the changed ones to the user process.

OK, it’s that simple

  • Advantages:
    • A single system call is enough. This saves many user-to-kernel mode switches, so performance is much better.
  • Disadvantages:
    • The kernel itself still has to traverse all the connections, and the descriptor set is passed in again on every call. This work in the kernel still costs performance, and the cost can be significant under high concurrency.
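To make the cost in the disadvantage concrete, here is a toy model in plain Java. It is not a kernel or JDK API, and the class `ToySelect` and its methods are hypothetical. It only counts how many descriptors the "kernel" has to look at when one connection out of 10,000 has data.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: mimics how select/poll make the "kernel" look at every
// descriptor the caller passes in, on every call.
class ToySelect {
    private final boolean[] hasData; // simulated per-descriptor state
    int inspected;                   // descriptors examined by the last call

    ToySelect(int descriptors) {
        hasData = new boolean[descriptors];
    }

    // Simulates data arriving on a connection
    void dataArrives(int fd) {
        hasData[fd] = true;
    }

    // Like select(fds): the whole set is handed over and scanned one by one
    List<Integer> select(int[] fds) {
        inspected = 0;
        List<Integer> ready = new ArrayList<>();
        for (int fd : fds) {
            inspected++;
            if (hasData[fd]) {
                ready.add(fd);
            }
        }
        return ready;
    }
}
```

With one busy descriptor out of 10,000, `select` returns `[42]` but still has to inspect all 10,000 entries.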

epoll

With epoll, the kernel sets aside space to record all the registered connections. Each query then only has to ask the kernel, instead of passing in the whole set of file descriptors every time.

[Figure: how epoll works]

The three epoll system calls

  • epoll_create(): creates an epoll instance, i.e. kernel space that holds the monitored objects
  • epoll_ctl(): adds a file descriptor to the epoll instance, together with the type of event to monitor
  • epoll_wait(): lets the user query the results. It blocks, but a timeout can be set

Put together with the socket system calls, a server's call sequence looks like this:

socket() --> 3                  // create the listening socket, file descriptor 3
bind(3, 8090)                   // bind it to port 8090
listen(3)
epoll_create() --> 7            // create the epoll space (fd 7) that stores what we listen for
epoll_ctl(7, ADD, 3, accept)    // watch the listening socket (fd 3) for accept events
accept(3) --> 8                 // a client connects; the connection is fd 8
epoll_ctl(7, ADD, 8, read)      // watch the connection (fd 8) for read events
epoll_wait()                    // wait for events on everything registered
  • Advantages:
    • There is no need to pass in a large set of file descriptors on every query, so the memory operations needed to copy them are avoided.
    • It can make effective use of multiple cores. Monitoring in the kernel is no longer a sequential traversal that must be done in one go. State changes on different connections are independent tasks, so whichever core is free can handle the next one.
  • Disadvantages:
    • Under high concurrency, a single thread manages all connections, and one round of wait() and handling its results can still take a long time. The interval between wait() calls may then be long, which delays responses to connections.
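For comparison with the select model, here is a toy model of epoll's approach. The names are hypothetical and this is not a real API. Descriptors are registered once, and a descriptor is put on a ready list when its state changes. Waiting then just drains that list instead of scanning everything. The Linux kernel keeps the interest set in a red-black tree and ready descriptors on a linked list. This model ignores details such as level-triggered versus edge-triggered notification.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model only: registration happens once, readiness is pushed onto a ready list.
class ToyEpoll {
    private final Set<Integer> interest = new HashSet<>();        // filled by ctlAdd, like epoll_ctl(ADD)
    private final Set<Integer> readyList = new LinkedHashSet<>(); // ready descriptors, in arrival order
    int inspected;                                                // descriptors examined by the last wait

    void ctlAdd(int fd) {
        interest.add(fd);
    }

    // In this model, the "kernel" calls this when data arrives on fd
    void dataArrives(int fd) {
        if (interest.contains(fd)) {
            readyList.add(fd);
        }
    }

    // Like epoll_wait(): no descriptor set is passed in and nothing is scanned
    List<Integer> epollWait() {
        List<Integer> ready = new ArrayList<>(readyList);
        inspected = ready.size();
        readyList.clear();
        return ready;
    }
}
```

With 10,000 registered descriptors and one busy one, `epollWait` returns `[42]` after inspecting a single entry.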

Multiplexing + multithreading

The bottleneck of epoll is that, under high concurrency, a single thread cannot keep up. One round of wait() takes too long, which delays responses to connections.

So the idea is to combine multiple threads with multiple selectors:

  1. Each selector runs in its own thread.
  2. One selector acts as the controller (boss). After accepting a connection, it assigns the connection to one of the other selectors for registration.
  3. The remaining selectors are responsible for reading and writing on their connections.
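import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

class Test {

    // Controller (boss) loop only: selector_work1 and selector_work2 are expected to run
    // their own select loops in separate threads, which are not shown here
    void main3() throws IOException {

        ServerSocketChannel socketChannel = ServerSocketChannel.open();
        socketChannel.bind(new InetSocketAddress(8090));
        socketChannel.configureBlocking(false);

        Selector selector_root = Selector.open();
        Selector selector_work1 = Selector.open();
        Selector selector_work2 = Selector.open();
        Selector[] works = {selector_work1, selector_work2};
        AtomicInteger index = new AtomicInteger(0);

        socketChannel.register(selector_root, SelectionKey.OP_ACCEPT);

        while (true) {
            // Wait up to 200 ms; on a timeout, loop again instead of leaving the loop
            if (selector_root.select(200) == 0) {
                continue;
            }
            Set<SelectionKey> keys = selector_root.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                iterator.remove();

                // When new connections come in, hand them to the worker selectors in turn (load balancing)
                if (selectionKey.isAcceptable()) {
                    SocketChannel channel = ((ServerSocketChannel) selectionKey.channel()).accept();
                    if (channel == null) {
                        continue; // nothing left to accept
                    }
                    channel.configureBlocking(false);
                    Selector worker = works[index.getAndIncrement() % works.length];
                    // On older JDKs, register() can block while the worker thread is inside select()
                    channel.register(worker, SelectionKey.OP_READ, ByteBuffer.allocateDirect(4096));
                    // Wake the worker so that its select() picks up the new registration
                    worker.wakeup();
                }
            }
        }
    }
}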
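The snippet above covers only the controller (boss) loop; the worker selectors' own loops are not shown. A fuller, self-contained sketch of the same idea follows. The class and method names are hypothetical, and the workers simply echo bytes back. New channels are handed to a worker through a queue followed by `Selector.wakeup()`, so registration happens on the worker's own thread. This is a common way to avoid `register()` contending with a `select()` in progress.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class MultiReactorEcho {

    static final class Worker implements Runnable {
        final Selector selector;
        final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();
        Worker() throws IOException { selector = Selector.open(); }

        void assign(SocketChannel ch) { pending.add(ch); selector.wakeup(); } // called by the boss thread

        @Override
        public void run() {
            try {
                while (true) {
                    selector.select();
                    SocketChannel ch;
                    while ((ch = pending.poll()) != null) { // register on the worker's own thread
                        ch.configureBlocking(false);
                        ch.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(4096));
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer buf = (ByteBuffer) key.attachment();
                        buf.clear();
                        if (client.read(buf) == -1) { // peer closed the connection
                            client.close();           // closing also cancels the key
                            continue;
                        }
                        buf.flip();
                        while (buf.hasRemaining()) client.write(buf); // echo (simplified: may spin if the peer is slow)
                    }
                }
            } catch (IOException e) { e.printStackTrace(); }
        }
    }

    static int start(int workerCount) { // starts boss + workers on daemon threads, returns the port
        try {
            ServerSocketChannel server = ServerSocketChannel.open();
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            server.configureBlocking(false);
            Selector boss = Selector.open();
            server.register(boss, SelectionKey.OP_ACCEPT);
            Worker[] workers = new Worker[workerCount];
            for (int i = 0; i < workerCount; i++) {
                workers[i] = new Worker();
                startDaemon(workers[i], "worker-" + i);
            }
            startDaemon(() -> {
                int next = 0;
                try {
                    while (true) {
                        boss.select();
                        boss.selectedKeys().clear(); // the only key is the server channel's
                        SocketChannel ch;
                        while ((ch = server.accept()) != null) { // null: nothing left to accept
                            workers[next].assign(ch);            // round-robin hand-off
                            next = (next + 1) % workers.length;
                        }
                    }
                } catch (IOException e) { e.printStackTrace(); }
            }, "boss");
            return ((InetSocketAddress) server.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static void startDaemon(Runnable task, String name) {
        Thread t = new Thread(task, name);
        t.setDaemon(true); // let the JVM exit without stopping these loops
        t.start();
    }

    static String echo(int port, String msg) { // blocking test client: send msg, read it back
        try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
            byte[] out = msg.getBytes(StandardCharsets.UTF_8);
            client.write(ByteBuffer.wrap(out));
            ByteBuffer in = ByteBuffer.allocate(out.length);
            while (in.hasRemaining() && client.read(in) != -1) { /* wait for the full echo */ }
            return new String(in.array(), 0, in.position(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

As a usage example, `int port = MultiReactorEcho.start(2);` starts the server. `MultiReactorEcho.echo(port, "hello")` then returns `"hello"` from the first worker, and a second `echo` call is served by the second worker.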

Tomcat, Netty, Nginx, and Redis all use this idea. The difference is that some of them use multiple threads and some use a single thread.