This is the 9th day of my participation in the November Gwen Challenge. See details: The Last Gwen Challenge 2021.

1. Blocking and non-blocking

1.1 the blocked

1.1.1 What are the problems of blocking mode?

1) In blocking mode, each of the following methods causes the thread to pause

  • ServerSocketChannel. Will accept without the thread is stopped when the connection is established
  • Socketchannel. read suspends the thread when there is no data to read
  • When a thread is blocked, it is suspended. The pause does not consume CPU, but the thread is idle

2) Under single thread, blocking methods affect each other and can hardly work normally, requiring multithreading support

3) Under multi-threading, there are new problems, which are reflected in the following aspects

  • The maximum stack for a thread on a 32-bit JVM is 320K, and the maximum stack for a thread on a 64-bit JVM is 1024K. If there are too many connections, it will result in OOM, and there are too many threads, which will cause performance degradation due to frequent context switching.

  • A thread pool technique can be used to reduce the number of threads and thread context switches, but it does not address the symptoms. If you have many connections set up but inactive for a long time, all threads in the thread pool will be blocked. Therefore, it is not suitable for long connections, but only for short connections

1.1.2 Test code:

Server code:

Public class BioServerTest {public static void main(String[] args) throws IOException { Single thread // 0. ByteBuffer = ByteBuffer. Allocate (16); ServerSocketChannel SSC = serverSocketChannel.open (); // 2. Bind the listening port ssc.bind(new InetSocketAddress(8080)); List<SocketChannel> channels = new ArrayList<>(); Println ("connecting...") {// 4. Accept ("connecting...") {// 4. ); SocketChannel sc = ssc.accept(); SocketChannel sc = ssc.accept(); System.out.println("connected... " + sc); channels.add(sc); for (SocketChannel channel : channels) { // 5. Println (" Before read...") +channel); Try {// block the method and the thread stops running channel.read(buffer); } catch (IOException e) { } buffer.flip(); System.out.println(print(buffer)); buffer.clear(); System.out.println("after read..." +channel); } } } static String print(ByteBuffer b) { StringBuilder stringBuilder = new StringBuilder(); for (int i = 0; i < b.limit(); i++) { stringBuilder.append((char) b.get(i)); } return stringBuilder.toString(); }}Copy the code

Client code:

public class SocketClientTest{ public static void main(String[] args) throws Exception { SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress("localhost", 8080)); System.out.println("waiting..." ); While (true) {}}}Copy the code

In connecting connection, a thread is blocked:

connecting...
Copy the code

Start the client and see the result, then the connection is successful, and then block until the message is received:

connecting... connected... Java nio. Channels. A SocketChannel [connected local = / 127.0.0.1: remote = 8080/127.0.0.1:64800] before the read... Java nio. Channels. A SocketChannel [connected local = / 127.0.0.1: remote = 8080/127.0.0.1:64800]Copy the code

1.2 a non-blocking

What has 1.2.1 changed compared to blocking?

In non-blocking mode, the methods do not suspend the thread

  • When the ServerSocketChannel. Accept no connection is established, returns null, continue to run
  • SocketChannel. When there is no data to read read, returns 0, but don’t have to be blocked, thread can read or to perform other SocketChannel to execute the ServerSocketChannel. Accept
  • When writing data, the thread simply waits for the data to be written to a Channel, rather than waiting for the Channel to send the data across the network

1.2.2 What are the problems with the non-blocking model?

1) But in non-blocking mode, even if no connection is established and no data is readable, the thread is still running, wasting CPU

2) Threads are actually blocked during data replication (AIO improvement)

1.2.3 Test the code

Server code:

Public class NonIoServerTest {public static void main(String[] args) throws IOException {// Use niO to understand non-blocking mode, Single thread // 0. ByteBuffer = ByteBuffer. Allocate (16); ServerSocketChannel SSC = serverSocketChannel.open (); // Non-blocking mode ssc.configureBlocking(false); // 2. Bind the listening port ssc.bind(new InetSocketAddress(8080)); List<SocketChannel> channels = new ArrayList<>(); While (true) {// 4. Accept establishes a connection with the client. SocketChannel is used to communicate with the client. But sc is null SocketChannel sc = ssc.accept(); if (sc ! = null) { System.out.println("connected... " + sc); // Sc.configureBlocking (false); // Sc.configureblocking (false); channels.add(sc); } for (SocketChannel channel : channels) { // 5. Int read = channel.read(buffer); // The thread will continue to run if no data is read. if (read > 0) { buffer.flip(); System.out.println(print(buffer)); buffer.clear(); System.out.println("after read..." +channel); System.out.println("wait connecting...") ); } } static String print(ByteBuffer b) { StringBuilder stringBuilder = new StringBuilder(); for (int i = 0; i < b.limit(); i++) { stringBuilder.append((char) b.get(i)); } return stringBuilder.toString(); }} Start the server as follows:Copy the code
wait connecting... wait connecting... wait connecting... wait connecting... wait connecting... wait connecting... wait connecting... wait connecting... wait connecting... . .Copy the code

Println (” Wait connecting…”) ); This line of code is commented out so you can see the result.

Start the client and see the server result:

connected... Java nio. Channels. A SocketChannel [connected local = / 127.0.0.1: remote = 8080/127.0.0.1:61254]Copy the code

Second, multiplexing

A single thread can work with a Selector to monitor read and write events for multiple channels, which is called multiplexing.

  • Multiplexing is only for network IO, common file IO cannot use multiplexing

  • If you use non-blocking mode instead of using selector, the thread will spend most of its time doing nothing, and using selector guarantees three things:

    • Connect only when there are connectable events
    • Read only when there are readable events
    • Write only when there is a writable event (limited by network transmission capacity, a Channel may not always be writable, once a Channel is writable, it will trigger a writable event of the Selector)

2.1 the Selector

Benefits of the above scheme:

  • A thread, in conjunction with a selector, can monitor events for multiple channels before they are processed by the thread. Avoid useless work done in non-blocking mode.
  • Let the thread be fully utilized
  • Saves the number of threads
  • Reduced thread context switching

2.1.1 How to Use Selector?

The following code and comment description:

Public class SelectorTest {public static void main(String[] args) throws IOException {// Create Selector Selector Selector = Selector.open(); // Set channel events (SelectionKey); OP_ACCEPT, OP_CONNECT, OP_READ, OP_WRITE) SocketChannel Channel = socketchannel.open (); channel.configureBlocking(false); SelectionKey selectionKey = channel.register(selector, SelectionKey.OP_ACCEPT); Int count1 = select.select (); int count1 = selector (); Int count2 = selector. Select (1000); // Listen for method 2 and block until a binding event occurs or a timeout occurs. Int count3 = selectNow(); int count3 = selectNow(); }}Copy the code

In this code, when a selector listens for a channel time, it blocks until that time happens. So what happens when a thread becomes unblocked? As follows:

1) When an event occurs (OP_ACCEPT, OP_CONNECT, OP_READ, OP_WRITE)

  • When a client initiates a connection request, the Accept event is triggered
  • Read events are triggered when the client sends data, both normally and abnormally, and multiple read events are triggered if the sent data is larger than the buffer buffer
  • A channel is writable and triggers a write event
  • When niO bugs occur under Linux

Call selector. Wakeup (). Call selector. Close ()

2.2 Handling the Accept Event

The server is as follows:

public class AcceptEventServerTest { public static void main(String[] args) { try { ServerSocketChannel channel = ServerSocketChannel.open(); channel.bind(new InetSocketAddress(8080)); System.out.println(channel); Selector selector = Selector.open(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_ACCEPT); while (true) { int count = selector.select(); // int count = selector.selectNow(); System.out.println("select count: " + count); // if(count <= 0) { // continue; Set<SelectionKey> keys = selection.selectedKeys (); Iterator<SelectionKey> iter = keys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); If (key.isacceptable ()) {ServerSocketChannel c = (ServerSocketChannel) key.channel(); // Must handle SocketChannel sc = c.acept (); System.out.println(sc); } // The event must be removed from ite.remove (); } } } catch (IOException e) { e.printStackTrace(); }}}Copy the code

The clients are as follows:

Public class ClientTest {public static void main(String[] args) {// Accept event try (Socket Socket = new Socket("localhost", 8080)) { System.out.println(socket); // read the event socket.getOutputStream().write("world".getbytes ()); System.in.read(); } catch (IOException e) { e.printStackTrace(); }}}Copy the code

Server print result:

sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8080] select count: 1 Java nio. Channels. A SocketChannel [connected local = / 127.0.0.1: remote = 8080/127.0.0.1:60469]Copy the code

The server comments out the method that uses selectNow() in the code above, and needs to determine for itself whether the return value is 0 if you use it.

After an event occurs, either process it or cancel it. You can’t do nothing, otherwise the event will still fire the next time, because niO’s underlying use of horizontal firing.

2.3 Handling read Events

The client is the same as the previous client, except that two clients are started at the same time, sending “Hello” and “world” respectively.

The server code looks like this:

public class ReadEventServerTest { public static void main(String[] args) { try { ServerSocketChannel channel = ServerSocketChannel.open(); channel.bind(new InetSocketAddress(8080)); System.out.println(channel); Selector selector = Selector.open(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_ACCEPT); while (true) { int count = selector.select(); System.out.println("select count:" + count); Set<SelectionKey> keys = selection.selectedKeys (); Iterator<SelectionKey> iter = keys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); If (key.isacceptable ()) {ServerSocketChannel c = (ServerSocketChannel) key.channel(); // Must handle SocketChannel sc = c.acept (); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); System.out.println(" connection established :" + sc); } else if (key.isReadable()) { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(128); int read = sc.read(buffer); if (read == -1) { key.cancel(); sc.close(); } else { buffer.flip(); System.out.println(print(buffer)); }} // The event must be removed iter. Remove (); } } } catch (IOException e) { e.printStackTrace(); } } static String print(ByteBuffer b) { StringBuilder stringBuilder = new StringBuilder(); for (int i = 0; i < b.limit(); i++) { stringBuilder.append((char) b.get(i)); } return stringBuilder.toString(); }}Copy the code

First, the server channel registers itself with the selector. Client 1 sends the Accept event. After receiving it, the server continues the while loop, listening for the read event, printing “Hello”.

Sun. Nio. Ch. ServerSocketChannelImpl [080] / 0:0:0:0:0:0:0:0:8 select count: 1 connection has been established: Java nio. Channels. A SocketChannel [connected Local =/127.0.0.1:8080 remote=/127.0.0.1:61693] select count:1 hello select count:1 Connection has been established: Java nio. Channels. A SocketChannel [connected local = / 127.0.0.1: remote = 8080/127.0.0.1:51495] select count: 1 worldCopy the code

Note: the final iter. Remove (), why is it removed? Because select puts the associated key into the selectedKeys collection after the event, it will not be removed from the selectedKeys collection after processing, so we need to code the deletion ourselves. For example,

  • The accept event on ssckey was triggered for the first time and ssckey was not removed
  • The second read event on ssckey is raised, but the ssckey is still in selectedKeys. This will cause a null pointer exception because no real serverSocket is connected

What does Cancel do in the above code? Cancel unregisters a channel on the selector, and removes the key from the keys set and no further events are listened for

2.3.1 Focus on message boundaries

First, see if there is a problem with the following code, the client code is as follows:

public class ServerTest { public static void main(String[] args) throws IOException { ServerSocket ss = new ServerSocket(9000); while (true) { Socket s = ss.accept(); InputStream in = s.getInputStream(); Byte [] arr = new byte[4]; while (true) { int read = in.read(arr); If (read == -1) {break; } System.out.println(new String(arr, 0, read)); }}}}Copy the code

The server code is as follows:

public class ClientTest { public static void main(String[] args) throws IOException { Socket max = new Socket("localhost", 9000); OutputStream out = max.getOutputStream(); out.write("hello".getBytes()); out.write("world".getBytes()); Out. Write (" hello ". GetBytes ()); Out. Write (" world ". GetBytes ()); max.close(); }}Copy the code

Results:

Hell owor ldCopy the code

Why do these problems arise? This involves the question of message boundaries. The length of the message is different. When we specify the same length of ByteBuffer to receive the message, there must be many situations in different time periods, as shown below:

Since the buffer length is fixed, messages will inevitably be truncated. How to solve these problems?

1) One idea is to fix the message length, the packet size is the same, the server read according to the predetermined length, the disadvantage is a waste of bandwidth

2) Another idea is to split by separator, the disadvantage is low efficiency

3) TLV format, that is, Type Type, Length Length and Value data. When the Type and Length are known, it is convenient to obtain message size and allocate appropriate buffer. The disadvantage is that buffer needs to be allocated in advance, if the content is too large, it will affect server throughput

  • Http 1.1 is in TLV format
  • Http 2.0 is the LTV format

None of the answers given by the surface is the best solution. The important question is how to allocate the size of the Bytebuffer?

A buffer is reserved for a channel and cannot be used by multiple channels because of sticky packets and half packets.

The buffer size should not be too large. If you want to support a large number of connections and set a large buffer at the same time, you will need a large amount of memory.

So we need to set a variable size ByteBuffer.

At present, there are two relatively simple implementation schemes, both of which have their advantages and disadvantages:

1) Pre-allocate a smaller buffer, such as 4K. If it is found that it cannot hold all the contents, create a larger buffer, such as 8K, copy the written 4K to the newly allocated 8kbuffer, and continue writing the remaining contents.

The advantage is that the message must be continuous, but the continuous allocation and copy will have a great impact on performance.

2) Use the form of multiple arrays to form a buffer. When one array cannot hold the data content, put the remaining data into the next array. The CompositeByteBuf class in Netty does this.

Its disadvantage is that the data is discontinuous, which needs to be analyzed and integrated again. Its advantage is that it solves the problem of performance loss caused by the previous scheme.

2.4 Handling write Events

What is the two-stage strategy?

There are two reasons for its occurrence:

1) In non-blocking mode, we cannot guarantee that all the data in the buffer will be written to the channel, so we need to track the return value after writing, that is, the value of the bytes actually written.

int write = channel.write(buffer);
Copy the code

2) We can make all the selectors listen for writable events of the channel, and each channel will have a key to track the buffer, which will take up too much memory. (If you don’t understand this point, you can use the following code to understand)

In view of the above problems, a two-stage strategy emerges:

1) We register a channel with a selector when the message is first written

2) If the first write is not complete, add the write event again, check the writable event on the channel, if all data is written, cancel the channel registration (do not cancel the write event will occur every time).

The following code is used to demonstrate: server:

Public class ServerTest {public static void main(String[] args) throws IOException {// Start a service channel ServerSocketChannel ssc = ServerSocketChannel.open(); // Set non-blocking ssc.configureblocking (false); // Bind port Ssc. bind(new InetSocketAddress(8080)); // initialize selector selector selector = selector. Open (); Register (selector, selectionkey.op_accept); While (true) {// listen on the event, where selector. Select () is blocked; Iterator<SelectionKey> iter = selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); // Remove key iter. Remove (); If (key.isacceptable ()) {SocketChannel sc = ssc.accept(); // Set sc.configureblocking (false); SelectionKey sckey = sc.register(selector, selectionkey.op_read); StringBuilder sb = new StringBuilder(); for (int i = 0; i < 30000000; i++) { sb.append("a"); ByteBuffer = charset.defaultCharset ().encode(sb.tostring ()); //2. Write data to client channel int write = sc.write(buffer); System.out.println(" Bytes actually written :" + write); // 3. If (buffer.hasRemaining()) {// If (buffer.hasRemaining()) { InterestOps (sckey.interestops () + selectionkey.op_write); Sckey. attach(buffer); }} else if (key.iswritable ()) {ByteBuffer = (ByteBuffer) key.attachment(); SocketChannel sc = (SocketChannel) key.channel(); Int write = sc.write(buffer); System.out.println(" actually write bytes :" + write); // If the write is complete, the attached buffer should be removed, and the write event should be removed. buffer.hasRemaining()) { key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); key.attach(null); } } } } } }Copy the code

Client:

Public class ClientTest {public static void main(String[] args) throws IOException {// Enable selector selector selector = Selector.open(); SocketChannel sc = socketchannel.open (); // Set sc.configureblocking (false); / / register a connection and read events sc. Register (the selector, SelectionKey OP_CONNECT | SelectionKey. OP_READ); // Connect to server sc.connect(new InetSocketAddress("localhost", 8080)); int count = 0; While (true) {// Monitor the event here, blocking selector. Select (); Iterator<SelectionKey> iter = selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); // Remove iter. Remove (); If (key.isConnecTable ()) {system.out.println (sc.finishConnect()); } else if (key.isreadable ()) {allocate memory ByteBuffer = ByteBuffer. Allocate (1024 * 1024); // Read data to buffer count += sc.read(buffer); Buffer.clear (); // Print the total number of bytes system.out.println (count); } } } } }Copy the code

Start the server and client respectively, and the result is as follows:

Actual written bytes :3801059 Actual written bytes :3014633 Actual written bytes :4063201 Actual written bytes :4718556 Actual written bytes :2490349 Actual written bytes :2621420 Actual written bytes :2621420 Actual written bytes :2621420 Actual written bytes :1426522Copy the code
true 131071 262142 393213 524284 655355 ... . 29753117 29884188 30000000Copy the code

Guys, if you see this, give it a thumbs up. Thanks