“This is the 19th day of my participation in the First Challenge 2022. For details: First Challenge 2022”

SpringBoot integration Netty

Introducing Maven dependencies

Ty < dependency > < groupId > io.net < / groupId > < artifactId > netty -all < / artifactId > < version > 4.1.35. The Final < / version > </dependency>Copy the code

Server code

NettyServer.java

package com.jony.netty.base; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; Public class NettyServer {public static void main(String[] args) throws Exception {// Create two thread groups. // bossGroup only handles connection requests, real and client business processes, BossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(8); ServerBootstrap bootstrap = new ServerBootstrap(); Bootstrap. group(bossGroup, WorkerGroup). / / set two thread group channel (NioServerSocketChannel. Class) / / using NioServerSocketChannel as server channel to achieve / / initializes the server connection queue size, The server processes client connection requests sequentially, so only one client connection can be processed at a time. // When multiple clients come at the same time, the server queues unprocessed client connection requests for processing. Option (channeloption.so_backlog, 1024).childHandler(new ChannelInitializer<SocketChannel>() {// Create channel initialization object, @Override protected void initChannel(SocketChannel CH) throws Exception {// Sets the handler for the workerGroup's SocketChannel ch.pipeline().addLast(new NettyServerHandler()); }}); System.out.println("netty server start." ); // Start the server (and bind the port). Bind is an asynchronous operation. ChannelFuture cf = bootstrap.bind(9000).sync(); If closeFuture is an asynchronous operation, the channel is closed. If closeFuture is an asynchronous operation, the channel is closed. If closeFuture is an asynchronous operation, the channel is closed. Cf.channel ().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }}}Copy the code

The server code is pretty much fixed and will not change much later in the actual project

Create thread groups (main thread and child thread)

// Create two thread groups: bossGroup and workerGroup. The number of nioEventloops is twice the number of CPU cores by default. BossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(8);Copy the code

The only thing to note is that bossGroup and workerGroup can define the number of threads. BossGroup only connects to clients, but does not handle read and write operations. A workerGroup is only responsible for reading and writing operations and connecting with clients. For example, the boss of a company (bossGroup) is only responsible for business negotiation, not specific business, while the employees (workGroup) do their work and do not need to communicate with others.

Deal with business code

@override protected void initChannel(SocketChannel CH) throws Exception {// Sets the handler for the workerGroup's SocketChannel ch.pipeline().addLast(new NettyServerHandler()); }Copy the code

Including NettyServerHandler for creating our own classes, we need to inherit ChannelInboundHandlerAdapter so the client connection and read and write data will be handled in this class.

NettyServerHandler.java

package com.jony.netty.base; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; Public class NettyServerHandler extends ChannelInboundHandlerAdapter {/ * * * * * read the client sends data @ param CTX context object, Contains channels, channels, @override public void channelRead(ChannelHandlerContext CTX, Object MSG) throws Exception {system.out.println (" Server reading Thread "+ thread.currentThread ().getName()); //Channel channel = ctx.channel(); //ChannelPipeline pipeline = ctx.pipeline(); // Convert MSG to a ByteBuf, similar to NIO ByteBuffer ByteBuf buf = (ByteBuf) MSG; System.out.println(" The client sends the message :" + buf.toString(charsetutil.utf_8)); ** @param CTX * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext)  ctx) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8)); ctx.writeAndFlush(buf); } /** * handle exceptions, * * @param CTX * @param Cause * @throws Exception */ @override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); }}Copy the code

The above code is the core code for processing the data sent and received by the server.

Client code

NettyClient.java

package com.jony.netty.base; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; Public class NettyClient {public static void main(String[] args) throws Exception {// the client requires an EventLoopGroup EventLoopGroup  = new NioEventLoopGroup(); Try {// Create the client startup object // Notice that the client is not using ServerBootstrap but Bootstrap Bootstrap = new Bootstrap(); // Set the parameters bootstrap.group(group) // set the thread group.channel (nioSocketChannel.class) // Use NioSocketChannel as the client channel implementation ChannelInitializer<SocketChannel>() {@override protected void initChannel(SocketChannel CH) throws Exception {// Joins the processor ch.pipeline().addLast(new NettyClientHandler()); }}); System.out.println("netty client start." ); ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync(); Cf.channel ().closeFuture().sync(); } finally { group.shutdownGracefully(); }}}Copy the code

Creating a thread group

EventLoopGroup = new NioEventLoopGroup();Copy the code

Here the client only needs to create a thread group.

ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();Copy the code

Note that the IP address and port number of the connected server are required.

Processing business code (sending and receiving messages)

NettyClientHandler.java

package com.jony.netty.base; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; Public class NettyClientHandler extends ChannelInboundHandlerAdapter {/ * * * when a client connect to the server to complete will trigger the method * * @ param CTX * @ throws  Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8)); ctx.writeAndFlush(buf); } // Emitted when a channel has read events, @override public void channelRead(ChannelHandlerContext CTX, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println(" Received message from server :" + buf.toString(charsetutil.utf_8)); System.out.println(" server address: "+ ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}Copy the code

Looking at the code, we found that the goal of the Netty shelf is to separate your business logic from the network base application code, so that you can focus on business development without having to write a bunch of niO-like network processing operations. Client-side code, we can also see processing messages through ByteBuf, which is a core component of Netty.

ByteBuf,

Structurally, ByteBuf consists of an array of bytes. Each byte in the array holds information.

ByteBuf provides two indexes, one for reading data and one for writing data. These indexes move through byte arrays to locate where information needs to be read or written.

When read from ByteBuf, its readerIndex is incremented by the number of bytes read.

Similarly, when writing to ByteBuf, its writerIndex is incremented by the number of bytes written.

Note that the limit is when readerIndex reads exactly where writerIndex writes.

If readerIndex exceeds writerIndex, Netty throws an IndexOutOf-BoundsException exception. \

The sample code

package com.jony.netty.base; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.CharsetUtil; Public class NettyByteBuf {public static void main(String[] args) { Byte [10] // Divides buffer into three regions by readerIndex and writerIndex and capacity // Read regions: [0, readerIndex) // Read regions: [readerIndex,writerIndex] // Writable area: [writerIndex,capacity) ByteBuf byteBuf = Unpooled.buffer(10); System.out.println("byteBuf=" + byteBuf); for (int i = 0;  i < 8; i++) { byteBuf.writeByte(i); } System.out.println("byteBuf=" + byteBuf); for (int i = 0; i < 5;  i++) { System.out.println(byteBuf.getByte(i)); } System.out.println("byteBuf=" + byteBuf); for (int i = 0; i < 5;  i++) { System.out.println(byteBuf.readByte()); } System.out.println("byteBuf=" + byteBuf); ByteBuf byteBuf2 = Unpooled. CopiedBuffer (" Hello,jony! ", charsetutil.utf_8); // Use the relevant method if (bytebuf2.hasarray ()) {byte[] content = bytebuf2.array (); Println (new String(Content, charsetutil.utf_8)); // Convert content to String system.out.println (new String(Content, charsetutil.utf_8));  System.out.println("byteBuf=" + byteBuf2); System.out.println(byteBuf2.readerIndex());  // 0 System.out.println(byteBuf2.writerIndex()); // 11 System.out.println(byteBuf2.capacity()); Println (bytebuf2.getByte (0)); Println ("len=" + len); for (int I = 0; I < len;  i++) { System.out.println((char) byteBuf2.getByte(i)); Println (bytebuf2.getCharSequence (0, 5, charsetutil.utf_8));} system.out.println (bytebuf2.getCharSequence (0, 5, charsetutil.utf_8));  System.out.println(byteBuf2.getCharSequence(6, 5, CharsetUtil.UTF_8)); } } }Copy the code

Code analysis based on the results

ByteBuf initialization

ByteBuf byteBuf = Unpooled.buffer(10);
System.out.println("byteBuf=" + byteBuf);
Copy the code

Running results:

byteBuf=UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
Copy the code

Ridx: 0, widx: 0, cap: 10, read and write indexes 0.

Write data

for (int i = 0; i < 8; i++) {
    byteBuf.writeByte(i);
}
System.out.println("byteBuf=" + byteBuf);
Copy the code

Running results:

byteBuf=UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 8, cap: 10)
Copy the code

You can see that the write/read index is still 0, but the write index is 8 (0-7 has been written).

Read data (RIDx does not shift)

for (int i = 0; i < 5; i++) {
    System.out.println(byteBuf.getByte(i));
}
System.out.println("byteBuf=" + byteBuf);
Copy the code

Running results:

0
1
2
3
4
byteBuf=UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 8, cap: 10)
Copy the code

As you can see, when reading bytebuf.getByte (I), ridx does not shift.

Read data (RIDx shift)

for (int i = 0; i < 5; i++) {
    System.out.println(byteBuf.readByte());
}
System.out.println("byteBuf=" + byteBuf);
Copy the code

Running results:

0
1
2
3
4
byteBuf=UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 5, widx: 8, cap: 10)
Copy the code

You can see that the ridx has become 5 (0-4 has been read).

Netty chat room

Server code

Chatserver. Java Starts the service

package com.jony.netty.base.chat; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class ChatServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // Add decoder to pipeline. AddLast ("decoder", new StringDecoder()); Pipeline. AddLast ("encoder", new StringEncoder()); pipeline.addLast(new ChatServerHandler()); // add your own business handler}}); System.out.println(" Chatroom server started... ") ); ChannelFuture channelFuture = bootstrap.bind(9000).sync(); // Close channelfuture.channel ().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }}}Copy the code

Chatserverhandler. Java Monitors client login and logout and message sending and receiving

package com.jony.netty.base.chat; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.text.SimpleDateFormat; Public class ChatServerHandler extends SimpleChannelInboundHandler < String > {/ / GlobalEventExecutor. The INSTANCE is a global event actuators, Is a singleton private static ChannelGroup ChannelGroup = new DefaultChannelGroup (GlobalEventExecutor. INSTANCE); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // Indicates that the channel is ready, @override public void channelActive(ChannelHandlerContext CTX) throws Exception {Channel Channel = ctx.channel(); // This method will iterate through all channels in the channelGroup, And send the message channelGroup. WriteAndFlush (" [client] "+ channel. RemoteAddress () +" launched "+ SDF. The format (new Java. Util. The Date ()) + "\n"); // Add the current channel to channelGroup channelgroup.add (channel); System.out.println(ctx.channel().remoteAddress() + "live "+ "\n"); } // Indicates that the channel is inactive, @Override public void channelInactive(ChannelHandlerContext CTX) throws Exception {Channel Channel = ctx.channel(); / / to the current online customers will leave the customer information push channelGroup. WriteAndFlush (" [client] "+ channel. RemoteAddress () +" offline "+" \ n "); System.out.println(ctx.channel().remoteAddress() + "offline "+ "\n"); System.out.println("channelGroup size=" + channelGroup.size()); } @override protected void channelRead0(ChannelHandlerContext CTX, String MSG) throws Exception {// Obtain the current channel channel channel = ctx.channel(); ForEach (ch -> {if (channel! WriteAndFlush ("[client]" + channel.remoteAddress() + "send message:" + MSG + "\n"); WriteAndFlush (" + MSG + "\n");} else {writeAndFlush(" + MSG + "\n"); }}); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// Close the channel ctx.close(); }}Copy the code

Client code

Chatclient. Java Starts the client

package com.jony.netty.base.chat; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.util.Scanner; public class ChatClient { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringEncoder()); pipeline.addLast(new StringDecoder()); pipeline.addLast(new ChatClientHandler()); }}); ChannelFuture = bootstrap.connect("127.0.0.1", 9000).sync(); ChannelFuture = bootstrap.connect("127.0.0.1", 9000).sync(); Channel = channelfuture.channel (); System.out.println("========" + channel.localAddress() + "========"); Scanner Scanner = new Scanner(system.in); // The client needs to enter information to create a Scanner. while (scanner.hasNextLine()) { String msg = scanner.nextLine(); WriteAndFlush (MSG); // Send to the server via channel channel.writeAndFlush(MSG); } // for (int i = 0; i < 200; I++) {// channel.writeandflush ("hello, jony!" ); // } } finally { group.shutdownGracefully(); }}}Copy the code

Chatclienthandler. Java Prints messages

package com.jony.netty.base.chat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class ChatClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg.trim()); }}Copy the code

Start the server first, and then start multiple clients again. You can use the Scanner to receive information from the console and interact with each other. The running results are as follows:

127.0.0.1:57221 is online /127.0.0.1:60464 is onlineCopy the code

The client 1:

I am 89757 [myself] sent a message: I am 89757 [client]/127.0.0.1:57221 sent a message: Hello, I am 666Copy the code

Client 2:

[client]/127.0.0.1:60464 sent a message: I'm 89757 Hello, I'm 666 [self] sent a message: Hello, I'm 666Copy the code

The interaction sequence is as follows: 1. Client 1 first releases “I am 89757”, and the message is released to the server. Then the server determines the message by the following code: The channelGroup is first iterated (this is all connected client channels), and while iterating, the channel determines whether the current channel is itself, if it is, then output [itself] sent XXX, if not, output [client] sent XXX.

@Override protected void channelRead0(ChannelHandlerContext ctx, String MSG) throws Exception {// Obtain the current channel channel channel = ctx.channel(); ForEach (ch -> {if (channel! WriteAndFlush ("[client]" + channel.remoteAddress() + "send message:" + MSG + "\n"); WriteAndFlush (" + MSG + "\n");} else {writeAndFlush(" + MSG + "\n"); }}); }Copy the code

2. Client 2 sees the information released by client 1, and also publishes its own message, still judging by the above code, the message is distributed in different ways of pipeline processing.