NIO
What is NIO
NIO is Java's non-blocking I/O framework. The server implementation pattern is that a single thread can handle multiple requests (connections): all connection requests sent by clients are registered with a multiplexer (Selector), and the multiplexer polls the connections and dispatches the I/O requests that are ready.
Why NIO
In a traditional blocking network service, each client keeps its own connection to the server, so the server must maintain one connection (and typically one thread) per client to handle its connect, read, and write operations. This is especially costly for long-lived connections: however many clients there are, the server must keep the same number of I/O connections alive, which is a huge overhead (the famous C10K problem).
How to use NIO
An NIO server in the Reactor pattern:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NioServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel socketChannel = ServerSocketChannel.open();
        // Bind the port
        socketChannel.bind(new InetSocketAddress(8080));
        // Switch to non-blocking mode
        socketChannel.configureBlocking(false);
        // Create the multiplexer; on Linux this creates an epoll file descriptor
        Selector selector = Selector.open();
        // Register for accept (connection) events
        socketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            // Block until the OS (epoll) adds the SelectionKeys of triggered events to the selected set
            selector.select();
            final Set<SelectionKey> selectionKeys = selector.selectedKeys();
            final Iterator<SelectionKey> iterator = selectionKeys.iterator();
            // Traverse the SelectionKeys whose events fired; each key wraps its channel
            while (iterator.hasNext()) {
                final SelectionKey next = iterator.next();
                // Remove the key from the selected set to avoid processing it twice
                iterator.remove();
                // Determine the event type
                if (next.isAcceptable()) {
                    // Accept event: register the accepted SocketChannel for read events
                    final ServerSocketChannel channel = (ServerSocketChannel) next.channel();
                    final SocketChannel accept = channel.accept();
                    accept.configureBlocking(false);
                    accept.register(selector, SelectionKey.OP_READ);
                } else if (next.isReadable()) {
                    // Read event: read from the channel directly
                    final SocketChannel channel = (SocketChannel) next.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    final int read = channel.read(byteBuffer);
                    if (read > 0) {
                        System.out.println(new String(byteBuffer.array(), 0, read));
                    } else if (read == -1) {
                        // The client closed the connection
                        channel.close();
                    }
                }
            }
        }
    }
}
How do you run and test this NIO server?
On Windows you can use the telnet command from the command line to connect to the server and send data: telnet localhost 8080 connects to the server, Ctrl+] opens the telnet prompt, and the send command sends a message to the server.
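If telnet is not available, a minimal plain-socket test client along these lines should also work (the class name and message text are illustrative assumptions, not part of the original):
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class NioTestClient {
    public static void main(String[] args) throws Exception {
        // Connect to the NIO server started above on localhost:8080
        try (Socket socket = new Socket("127.0.0.1", 8080)) {
            OutputStream out = socket.getOutputStream();
            // Send a test message; the server should print it to its console
            out.write("hello nio server".getBytes(StandardCharsets.UTF_8));
            out.flush();
            // Give the server a moment to read before the socket closes
            Thread.sleep(500);
        }
    }
}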
How does NIO accept socket requests and read data?
When the NIO server starts, Selector selector = Selector.open() creates a multiplexer instance (backed by epoll on Linux, select on Windows). Registering a channel registers its interest events with the multiplexer, and selector.select() blocks waiting for events to occur. When an event occurs, the operating system moves it from its internal ready list (rdlist) into the selector's selected-key set as a SelectionKey, which the application then iterates over and processes.
What pain points of BIO does NIO address, and what drawbacks remain?
- What NIO solves:
  - In BIO, accept/read/write block the calling thread, so threads spend most of their time waiting
  - Too many client connections waste server threads and CPU (the C10K problem)
- Drawbacks that remain in NIO:
  - It does not fundamentally solve the C10K problem: with 100,000 connections reading and writing at the same time, a single selector's event loop takes a long time per pass
  - The Selector.select() empty-polling bug: select() may return without any ready event instead of blocking, the loop keeps running, and once the bug appears it keeps appearing (spinning the CPU)
  - The code is complex and requires a lot of I/O exception handling
Netty
Netty is a high-performance, asynchronous, event-driven network application framework built on NIO and designed around the Reactor model.
What pain points of NIO does Netty address?
- It works around the Selector.select() empty-polling bug
- It simplifies development: Netty internally handles disconnection, intermittent network failures, heartbeats, half-packet reads and writes, network congestion, and abnormal traffic
- Netty initializes multiple selectors internally (one per event-loop thread) to mitigate the C10K problem
How to use Netty
Netty server
public class NettyService {
public static int port = 8080;
public static void main(String[] args) {
// Create a selector thread group that accepts the Accept event
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// Create a selector thread group that handles business processing events such as read/write
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// Service processing handler
socketChannel.pipeline().addLast(new DefaultChannelInboundHandlerAdapter());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
Service processing handler
public class DefaultChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter {
protected static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client online:"+ ctx.channel().remoteAddress());
channels.add(ctx.channel());
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client offline:"+ ctx.channel().remoteAddress());
channels.remove(ctx.channel());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
String rtn = "Client:" + ctx.channel().remoteAddress() + ":" + byteBuf.toString(CharsetUtil.UTF_8);
// Broadcast the message to every connected client
channels.forEach(channel -> channel.writeAndFlush(Unpooled.wrappedBuffer(rtn.getBytes(StandardCharsets.UTF_8))));
}
}
Netty client
public class NettyClient {
public static void main(String[] args) {
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder())
        .addLast(new StringDecoder())
        .addLast(new DefaultClientHandlerAdapter());
}
});
try {
ChannelFuture connect = bootstrap.connect("127.0.0.1", 8080).sync();
Channel channel = connect.channel();
System.out.println("====" + channel.localAddress() + "====");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String next = scanner.next();
channel.writeAndFlush(next);
}
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}
}
Client service processing handler
public class DefaultClientHandlerAdapter extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Client receives message");
System.out.println(msg.toString());
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client write message"); }}Copy the code
How to run and test the above Netty code
Run the server's main method first, then the client's main method; anything typed into the client console is sent to the server.
Netty’s threading model
A widely shared diagram sums up the server-side threading model very well; the key points:
- Looking at the source of new NioEventLoopGroup(), it is easy to see that a NioEventLoopGroup is essentially a group of event-loop threads; each NioEventLoop in the group holds its own Selector and task queue.
- The bossGroup is responsible for accept events, and the workerGroup is responsible for read/write events
- A NioEventLoop is essentially a single thread
- A pipeline is the chain of handlers through which a channel's events are processed
- A ChannelHandler is a concrete handler in the pipeline. Data flowing from the application out to the network (for example, a client writing to the server) is called outbound and passes through the pipeline's ChannelOutboundHandlers, which are invoked one by one from tail to head. Data flowing in from the network is called inbound and passes only through the ChannelInboundHandlers, which are invoked one by one from head to tail. A minimal sketch of this ordering follows the list.
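A minimal sketch of the invocation order using Netty's EmbeddedChannel (the handler bodies and printed labels are illustrative assumptions, not from the original):
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;

public class PipelineOrderDemo {
    public static void main(String[] args) {
        // Inbound handlers fire head -> tail, outbound handlers fire tail -> head
        EmbeddedChannel channel = new EmbeddedChannel(
                new ChannelOutboundHandlerAdapter() {          // closest to the head
                    @Override
                    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                        System.out.println("outbound A (head side)");
                        ctx.write(msg, promise);
                    }
                },
                new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                        System.out.println("inbound B");
                        ctx.fireChannelRead(msg);
                    }
                },
                new ChannelInboundHandlerAdapter() {           // closest to the tail
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                        System.out.println("inbound C, writing back");
                        // Writing from here travels outbound, so handlers between C and the head run tail -> head
                        ctx.writeAndFlush(msg);
                    }
                });
        channel.writeInbound(Unpooled.copiedBuffer("ping", CharsetUtil.UTF_8));
        // Expected console order: inbound B, inbound C, outbound A (head side)
    }
}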
Netty sticky packets and unpacking
As a transport-layer protocol, TCP knows nothing about the meaning of the application data above it; it splits the byte stream into segments according to buffer size. For example, with a 10K buffer, sending 12K of data is split into 10K + 2K (unpacking); TCP sends the 10K first, then waits for more data, and the leftover 2K may be combined with 8K of the next message into a single 10K segment (sticky packet) before being sent.
How to solve it
- Fixed-length messages: every message has the same size, for example 100 bytes, padded with blanks when it is shorter
- A special delimiter is appended to the end of each message, and the receiver splits on it
- Length-prefixed messages: send the length first and then the body; the receiver reads the length, waits for enough bytes to arrive, and then reads exactly that many
Netty provides several frame decoders for this kind of packet splitting, for example (a minimal sketch follows the list):
- LineBasedFrameDecoder (splits frames on line breaks)
- DelimiterBasedFrameDecoder (splits frames on a special delimiter)
- FixedLengthFrameDecoder (fixed-length frames)
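As an illustration, here is a minimal sketch of wiring DelimiterBasedFrameDecoder in front of the business handler from the server example above (the initializer class name and the '$' delimiter are assumptions):
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.util.CharsetUtil;

public class FrameDecoderInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        // Messages are terminated by '$'; frames longer than 1024 bytes are rejected
        ByteBuf delimiter = Unpooled.copiedBuffer("$", CharsetUtil.UTF_8);
        ch.pipeline()
          // Splits the byte stream on the delimiter, so sticky/half packets never reach the business handler
          .addLast(new DelimiterBasedFrameDecoder(1024, delimiter))
          // The business handler from the server example above now receives one complete frame (ByteBuf) per read
          .addLast(new DefaultChannelInboundHandlerAdapter());
    }
}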
Netty automatic reconnection after disconnect
When the client connects to the server, add a listener that watches the connection result; if the connection fails, schedule a reconnect. The code:
ChannelFuture cf = bootstrap.connect(host, port);
cf.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (!future.isSuccess()) {
            // Connection failed: retry on the channel's event loop after 3 seconds
            future.channel().eventLoop().schedule(() -> {
                try {
                    connect();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, 3000, TimeUnit.MILLISECONDS);
        }
    }
});
// Block until the channel is closed
cf.channel().closeFuture().sync();
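For context, here is one possible shape of the connect() method referenced above, wrapped in a hypothetical ReconnectingClient class (the class and field layout are assumptions, not from the original):
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.util.concurrent.TimeUnit;

public class ReconnectingClient {
    private final String host;
    private final int port;
    private final Bootstrap bootstrap;

    public ReconnectingClient(String host, int port, Bootstrap bootstrap) {
        this.host = host;
        this.port = port;
        this.bootstrap = bootstrap;
    }

    // Each failed attempt schedules the next one, so the client keeps retrying until it connects
    public void connect() {
        ChannelFuture cf = bootstrap.connect(host, port);
        cf.addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                future.channel().eventLoop().schedule(this::connect, 3000, TimeUnit.MILLISECONDS);
            } else {
                System.out.println("Connected to " + host + ":" + port);
            }
        });
    }
}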
How does Netty solve the Selector.select() empty-polling bug?
In NioEventLoop's run() loop, Netty keeps a counter selectCnt that is incremented on every wake-up of select() and reset to 0 whenever the wake-up actually did work (ready I/O events or tasks were processed). When selectCnt reaches the threshold (512 by default, configurable via io.netty.selectorAutoRebuildThreshold), Netty assumes the selector has hit the empty-polling bug: it creates a new selector, re-registers everything that was registered on the buggy selector onto the new one, and closes the buggy selector.
Source:
protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                case SelectStrategy.SELECT:
                    // Part of the select/blocking logic omitted
                default:
                }
            } catch (IOException e) {
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            // Some unrelated code omitted

            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // This line of code is key!
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Error e) {
            // Exception handling code omitted
        }
    }
}
The unexpectedSelectorWakeup method:
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD
= SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
private boolean unexpectedSelectorWakeup(int selectCnt) {
if (Thread.interrupted()) {
return true;
}
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
rebuildSelector();
return true;
}
return false;
}
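Since the threshold is read from a system property, it can presumably be tuned at JVM startup, for example with -Dio.netty.selectorAutoRebuildThreshold=1024; and as the check above shows, a value of 0 or less disables the automatic rebuild entirely.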