This article is by taro. Its original title was “Implementing IM chat with Netty is ridiculously simple”, and this version has been revised and edited.

1. Introduction to this article

In the previous article, “Follow the Source Code to Learn IM (7): A Hands-on Guide to Building Web IM Chat with WebSocket”, we used WebSocket to implement simple IM functionality supporting identity authentication, private chat messages, and group chat messages.

Afterwards, a reader sent a private message asking for similar functionality built on plain Netty, hence this article.

Note: the source code is available as an attachment at the synchronized link; please download it from http://www.52im.net/thread-34…

Learning and communication:

  • IM/Push Technology Development Exchange Group 5: 215477170 [Recommended]
  • “Getting Started with Mobile IM Development: One Article Is Enough, Developing Mobile IM from Scratch”
  • Open-source IM framework source code: https://github.com/JackJiang2…

(This article is also published at: http://www.52im.net/thread-34…)

2. Knowledge preparation

For those of you who don’t know what Netty is, here’s a brief introduction:

Netty is an open source Java framework. Netty provides an asynchronous, event-driven network application framework and tools for the rapid development of high-performance, reliable network servers and clients.

In other words, Netty is an NIO-based framework for programming both clients and servers. With Netty you can quickly and easily develop network applications, such as a client and server that implement a particular protocol.

Netty simplifies and streamlines the programming and development of network applications, such as TCP and UDP Socket services.

Here are a few introductory articles about Netty that are worth reading:

Getting Started: “The Most Thorough Analysis of Netty’s High-performance Principles and Framework Architecture to Date”


For Beginners: “Learning Methods and Advanced Strategies for Netty, a High-performance NIO Framework for Java”


“The Most Accessible Long-form Introduction to the Netty Framework Ever: Basic Introduction, Environment Setup, Hands-on Practice”

If you don’t yet know what Java NIO is, read the following articles first:

“Understand the Difference Between Java NIO and Classic IO in One Minute”


“The Best Java NIO Primer Ever: If You’re Worried About Giving Up Before You Get Started, Read This!”


“Finding Java BIO and NIO Hard to Understand? Let Code Show You, and If You Still Don’t Get It, I’ll Change Careers!”

Netty source code and API

1) Netty-4.1.x complete source code (online reading version) (* recommended)


2) Netty-4.0.x


3) Netty-4.1.x API Documentation (online version) (* recommended)


4) Netty-4.0.x API Documentation (online)

3. The source code for this article

The complete source code for this article is available as an attachment at the synchronized link: http://www.52im.net/thread-34…

The directory structure of the source code is shown below:

As the picture above shows:

1) lab-67-netty-demo-server project: sets up the Netty server.
2) lab-67-netty-demo-client project: sets up the Netty client.
3) lab-67-netty-demo-common project: provides the basic Netty encapsulation, including message encoding/decoding and dispatching.

In addition, the source code includes examples of common Netty features:

1) A heartbeat mechanism, which lets the server detect whether the client is still alive.
2) Reconnection after disconnection, which lets the client reconnect to the server.

Enough talk, let’s get to it.

5. Communication protocol

In the previous chapter, we implemented the client-server connection. In this section, we want them to be able to talk, that is, to read and write data.

In everyday projects, the front end and back end usually communicate over HTTP, exchanging text content, generally in JSON format. In the world of raw TCP, however, we have to design our own binary-based protocol for the client and server to communicate.

Let’s take the example of a client sending a message to a server. Suppose the client wants to send a login request.

The corresponding class is as follows:

public class AuthRequest {

    /** Username **/
    private String username;
    /** Password **/
    private String password;

}

Obviously: You can’t throw a Java object directly into a TCP Socket. Instead, you need to convert it into an array of bytes before you can write it to the TCP Socket. That is, the message object needs to be serialized into an array of bytes.

At the same time, when the server receives a byte array, it needs to convert it back into a Java object, that is, deserialize it. Otherwise, how would the server make sense of a raw string of bytes?!

Friendly tip: the same process applies when the server sends a message to the client!

There are many serialization tools, such as Google’s Protobuf, which is very efficient and produces compact binary data. Netty integrates Protobuf and provides the corresponding codecs.

As shown in the figure below:

However, many readers may not be familiar with Protobuf, and using it for serialization would add an extra learning cost. So, after careful consideration, we use JSON for serialization instead. Some of you may wonder: doesn’t JSON convert objects into strings? It does, and we then simply convert that string into a byte array.

Next, let’s create a new lab-67-netty-demo-common project and implement our custom communication protocol in its codec package.

As shown in the figure below:

Create the Invocation class, which is the message body of the communication protocol.

The code is as follows:

import com.alibaba.fastjson.JSON;

/**
 * The message body of the communication protocol
 */
public class Invocation {
    /** Type, used to route the message to its MessageHandler */
    private String type;
    /** Message content, in JSON format */
    private String message;

    public Invocation() {} // empty constructor, needed for JSON deserialization

    public Invocation(String type, String message) {
        this.type = type;
        this.message = message;
    }
    public Invocation(String type, Message message) {
        this.type = type;
        this.message = JSON.toJSONString(message);
    }
    // ... setter, getter, and toString methods omitted
}

① The type field is used to match the corresponding message handler. By analogy with HTTP, type plays the role of the request path.

② The message field holds the message content, in JSON format.

In addition, Message is a marker interface we define, as follows:

public interface Message {

    // ... empty, serves as a marker interface

}

Before we look at the Invocation codecs, let’s first understand TCP packet sticking and unpacking.

5.2.1 Causes

The main cause of packet sticking and unpacking is that when the operating system sends TCP data, it goes through an underlying buffer of, for example, 1024 bytes.

If the amount of data sent in one request is too small to fill the buffer, TCP may combine multiple requests into a single packet, which causes the sticky packet problem.

For example, the article “Socket Programming Explained: The TCP_NODELAY Option” shows that when Nagle’s algorithm is disabled, requests no longer wait for the buffer to fill but are sent as soon as possible, which reduces latency.
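For reference, disabling Nagle’s algorithm on a plain JDK socket takes a single call. In Netty the same switch is ChannelOption.TCP_NODELAY, set with option(...) on a client Bootstrap or childOption(...) on a ServerBootstrap. The helper class below is a hypothetical illustration, not part of the article’s project.

```java
import java.io.IOException;
import java.net.Socket;

final class NoDelaySockets {

    private NoDelaySockets() {}

    /**
     * Creates an unconnected socket with Nagle's algorithm disabled, so small
     * writes are sent immediately instead of being coalesced with later writes.
     */
    static Socket newNoDelaySocket() throws IOException {
        Socket socket = new Socket();   // not connected yet; options can be set before connect()
        socket.setTcpNoDelay(true);     // TCP_NODELAY = true turns Nagle's algorithm off
        return socket;
    }
}
```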

If the amount of data sent in one request exceeds the buffer size, TCP splits it into multiple packets. This is called unpacking, which means splitting a large packet into several smaller packets for sending.

The following figure illustrates the three cases of sticking and unpacking:

As the picture above shows:

1) Packets A and B each happen to fill the TCP buffer, or the TCP wait time has elapsed, so they are sent as two independent packets.
2) A and B are sent in quick succession and are both small, so they are combined into one packet and sent to the server.
3) Packet B is relatively large, so it is split into two packets, B_1 and B_2. Because the split-off B_2 is relatively small, it is combined with packet A for sending.

5.2.2 Solutions

There are three common solutions to the sticking and unpacking problem:

① Every packet the client sends has a fixed length, for example 1024 bytes. If the data is shorter than 1024 bytes, it is padded with spaces up to that length.

I have not yet come across a real-world protocol that uses this approach.

② The client appends a fixed delimiter, such as \r\n, to the end of each packet. If a packet is split, the receiver waits for the next packet, finds the \r\n in it, and joins the part before it with the remainder of the previous packet to obtain a complete packet. Examples include HTTP (header lines) and Redis (the RESP protocol).

③ Each message is split into a header and a body, and the header holds the length of the message. The receiver treats a message as complete only once it has read that many bytes.

Friendly tip: scheme ③ is an upgraded version of ①, with a dynamic rather than fixed length.
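Schemes ① and ② can be sketched with the JDK alone, independent of Netty. The two classes below are hypothetical illustrations; Netty itself ships FixedLengthFrameDecoder, LineBasedFrameDecoder and DelimiterBasedFrameDecoder for these schemes, and LengthFieldBasedFrameDecoder for scheme ③.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Scheme ①: every frame is exactly frameLength bytes, padded with spaces. */
final class FixedLengthFraming {

    private FixedLengthFraming() {}

    static byte[] encode(String text, int frameLength) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        if (data.length > frameLength) {
            throw new IllegalArgumentException("Message longer than frame length");
        }
        byte[] frame = new byte[frameLength];
        Arrays.fill(frame, (byte) ' ');                      // pad with spaces
        System.arraycopy(data, 0, frame, 0, data.length);
        return frame;
    }

    /** Cuts complete frames out of the stream; an incomplete tail is simply ignored here. */
    static List<String> decode(byte[] stream, int frameLength) {
        List<String> messages = new ArrayList<>();
        for (int offset = 0; offset + frameLength <= stream.length; offset += frameLength) {
            String frame = new String(stream, offset, frameLength, StandardCharsets.UTF_8);
            messages.add(frame.replaceAll(" +$", ""));       // strip the padding
        }
        return messages;
    }
}

/** Scheme ②: messages end with "\r\n"; partial data is kept until its delimiter arrives. */
final class DelimiterFraming {

    private final StringBuilder pending = new StringBuilder();

    /** Feeds one chunk as read from the socket; returns every message it completes. */
    List<String> onRead(String chunk) {
        pending.append(chunk);
        List<String> messages = new ArrayList<>();
        int end;
        while ((end = pending.indexOf("\r\n")) >= 0) {
            messages.add(pending.substring(0, end));
            pending.delete(0, end + 2);                      // drop the message and its delimiter
        }
        return messages;
    }
}
```

The fixed-length version shows why scheme ① is rare: every short message wastes space, and a message that genuinely ends in spaces loses them. The delimiter version works on already-decoded text for brevity; a byte-level decoder must additionally avoid splitting multi-byte UTF-8 characters.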

This article uses scheme ③: each time an Invocation is serialized into a byte array, the array’s length is written to the TCP Socket first, followed by the array itself.

As shown in the figure below:

Create the InvocationEncoder class, which serializes an Invocation and writes it to the TCP Socket.

The code is as follows:

public class InvocationEncoder extends MessageToByteEncoder<Invocation> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    protected void encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) {
        // <2.1> Convert the Invocation to a byte[] array
        byte[] content = JSON.toJSONBytes(invocation);
        // <2.2> Write the length
        out.writeInt(content.length);
        // <2.3> Write the content
        out.writeBytes(content);
        logger.info("[encode][connection({}) encoded a message({})]", ctx.channel().id(), invocation.toString());
    }

}

MessageToByteEncoder is an abstract ChannelHandler class defined by Netty that encodes messages of a generic type into bytes.

The #encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) method implements the encoding logic.

At <2.1>, call JSON’s #toJSONBytes(Object object, SerializerFeature... features) method to convert the Invocation into a byte array.

At <2.2>, write the length of the byte array. This lets the InvocationDecoder in “5.4” parse each message by its length, which solves the sticking and unpacking problem.

At <2.3>, write the byte array itself.

Finally, MessageToByteEncoder writes the ByteBuf out to the TCP Socket.
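To make steps <2.1> to <2.3> concrete without Netty or FastJSON on the classpath, here is a hypothetical JDK-only sketch of the same frame layout. It assumes the JSON text has already been produced (for example by FastJSON) and uses ByteBuffer, which, like Netty’s ByteBuf#writeInt, writes the int in big-endian order.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Frame layout: a 4-byte big-endian length header followed by the payload bytes. */
final class LengthPrefixEncoder {

    private LengthPrefixEncoder() {}

    static byte[] encode(String json) {
        // <2.1> "serialize": the JSON text is turned into UTF-8 bytes
        byte[] content = json.getBytes(StandardCharsets.UTF_8);
        ByteBuffer out = ByteBuffer.allocate(4 + content.length);
        // <2.2> write the length header
        out.putInt(content.length);
        // <2.3> write the content itself
        out.put(content);
        return out.array();
    }
}
```

For example, encoding the 12-character JSON text {"type":"X"} yields a 16-byte frame whose first four bytes hold the value 12.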

Create the InvocationDecoder class to read the byte array from the TCP Socket and deserialize it.

The code is as follows:

① Whenever new data is read from the TCP Socket, decoding is triggered.

② Read the length from the TCP Socket at <2.1>, <2.2>, and <2.3>.

③ At <3.1>, <3.2>, <3.3>, read the byte array from the TCP Socket and deserialize it into the Invocation object.

Finally, the decoded Invocation is added to List<Object> out and passed on to the next ChannelHandler. In “6. Message distribution” you will see how MessageDispatcher hands each Invocation to the corresponding MessageHandler to execute the business logic.
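Since the InvocationDecoder code is not reproduced here, the following is a hypothetical JDK-only sketch of the decoding steps just described, not the author’s code. It keeps leftover bytes between reads, so it copes with both sticky and split packets; ByteBuffer#mark()/reset() play the role that ByteBuf#markReaderIndex()/resetReaderIndex() play in Netty.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Emits only complete [4-byte length][content] frames, however the bytes are chunked. */
final class LengthPrefixDecoder {

    private ByteBuffer cumulation = ByteBuffer.allocate(0);   // unread bytes from earlier reads

    List<String> onRead(byte[] chunk) {
        // Append the new chunk to whatever was left over last time
        ByteBuffer merged = ByteBuffer.allocate(cumulation.remaining() + chunk.length);
        merged.put(cumulation).put(chunk);
        merged.flip();

        List<String> out = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark();                    // remember where this frame starts
            int length = merged.getInt();     // read the length header
            if (merged.remaining() < length) {
                merged.reset();               // body incomplete: rewind and wait for more bytes
                break;
            }
            byte[] content = new byte[length];
            merged.get(content);              // read the body and deserialize it (here: as text)
            out.add(new String(content, StandardCharsets.UTF_8));
        }
        cumulation = merged;                  // keep the unread remainder for the next read
        return out;
    }
}
```

A production decoder should also reject negative or oversized length values; Netty’s LengthFieldBasedFrameDecoder takes a maxFrameLength parameter for exactly that purpose.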

Create the pom.xml file and add Netty, FastJSON, and the other dependencies.

5.6 Summary

By the end of this chapter, we have completed the definition of the communication protocol and its codec logic. Isn’t that interesting?!

In addition, we add the codecs to the initialization code of NettyServerHandlerInitializer and NettyClientHandlerInitializer.

As shown in the figure below:

6. Message distribution

In Spring MVC, DispatcherServlet dispatches each request to the matching controller method based on the request path, HTTP method, and so on.

In the dispatcher package of the lab-67-netty-demo-common project, we create the MessageDispatcher class to provide functionality similar to DispatcherServlet: it distributes each Invocation to the corresponding MessageHandler to execute the business logic.

Now, let’s look at the code implementation.

Create the Message interface, a marker interface for messages.

The code is as follows:

public interface Message {

}

Below are the Message implementation classes used in this article.

As shown in the figure below:

Create the MessageHandler interface, which defines a message handler.

The code is as follows:

public interface MessageHandler<T extends Message> {

    /**
     * Executes the logic for handling a message
     *
     * @param channel the channel
     * @param message the message
     */
    void execute(Channel channel, T message);

    /**
     * @return the message type, i.e. the TYPE static field on each Message implementation class
     */
    String getType();

}

As shown in the above code:


1) The interface is generic in <T>, which must be a Message implementation class.
2) It defines two interface methods; see the comments.

Below are the MessageHandler implementation classes used in this article.

As shown in the figure below:

Create a MessageHandlerContainer class to serve as a container for MessageHandler.

The code is as follows:

In the #afterPropertiesSet() method, scan all MessageHandler beans and add them to the MessageHandler collection.

The #getMessageHandler(String type) method returns the MessageHandler for the given type. We will call this method from MessageDispatcher later.

The #getMessageClass(MessageHandler handler) method obtains the Message class that a MessageHandler processes. It is adapted, with slight modifications, from the DefaultRocketMQListenerContainer#getMessageType() method in the rocketmq-spring project.
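The container’s two lookups can be sketched without Spring. Everything below is a simplified, hypothetical re-declaration (no Channel parameter, and handlers are passed in a List instead of being discovered as beans). getMessageClass resolves T from the handler’s implements MessageHandler<T> clause, which is the general idea behind rocketmq-spring’s getMessageType(), though not its exact code; the HEARTBEAT_REQUEST value is made up.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified re-declarations so the sketch compiles on its own.
interface Message {}

interface MessageHandler<T extends Message> {
    void execute(T message);
    String getType();
}

final class HeartbeatRequest implements Message {
    static final String TYPE = "HEARTBEAT_REQUEST";
}

final class HeartbeatRequestHandler implements MessageHandler<HeartbeatRequest> {
    @Override
    public void execute(HeartbeatRequest message) {
        // The real handler would reply with a HeartbeatResponse here
    }

    @Override
    public String getType() {
        return HeartbeatRequest.TYPE;
    }
}

/** Spring-free stand-in for MessageHandlerContainer. */
final class HandlerRegistry {

    private final Map<String, MessageHandler<?>> handlers = new HashMap<>();

    /** Plays the role of #afterPropertiesSet(): index every handler by its type. */
    HandlerRegistry(List<? extends MessageHandler<?>> all) {
        for (MessageHandler<?> handler : all) {
            handlers.put(handler.getType(), handler);
        }
    }

    MessageHandler<?> getMessageHandler(String type) {
        MessageHandler<?> handler = handlers.get(type);
        if (handler == null) {
            throw new IllegalArgumentException("No MessageHandler registered for type " + type);
        }
        return handler;
    }

    /** Reads T from the handler class's "implements MessageHandler<T>" clause. */
    @SuppressWarnings("unchecked")
    static Class<? extends Message> getMessageClass(MessageHandler<?> handler) {
        for (Type type : handler.getClass().getGenericInterfaces()) {
            if (type instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) type;
                if (parameterized.getRawType() == MessageHandler.class) {
                    return (Class<? extends Message>) parameterized.getActualTypeArguments()[0];
                }
            }
        }
        throw new IllegalStateException("Cannot resolve message class of " + handler.getClass());
    }
}
```

This reflection only works when the handler class directly implements MessageHandler with a concrete type argument; a lambda, or a subclass of a generic base handler, would need extra resolution logic.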

Create the MessageDispatcher class, which dispatches each Invocation to the corresponding MessageHandler to execute the business logic.

The code is as follows:

@ChannelHandler.Sharable
public class MessageDispatcher extends SimpleChannelInboundHandler<Invocation> {

    @Autowired
    private MessageHandlerContainer messageHandlerContainer;

    private final ExecutorService executor = Executors.newFixedThreadPool(200);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Invocation invocation) {
        // <3.1> Get the MessageHandler for the type
        MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType());
        // Get the Message class handled by the MessageHandler
        Class<? extends Message> messageClass = MessageHandlerContainer.getMessageClass(messageHandler);
        // <3.2> Parse the message
        Message message = JSON.parseObject(invocation.getMessage(), messageClass);
        // <3.3> Execute the logic
        executor.submit(new Runnable() {
            @Override
            public void run() {
                // noinspection unchecked
                messageHandler.execute(ctx.channel(), message);
            }
        });
    }

}

① The @ChannelHandler.Sharable annotation marks this ChannelHandler as shareable, so a single instance can be used by multiple Channels.

② SimpleChannelInboundHandler is an abstract ChannelHandler class defined by Netty for processing messages; the type of message it handles is a generic parameter.

③ The #channelRead0(ChannelHandlerContext ctx, Invocation invocation) method processes and dispatches the message.

At <3.1>, call MessageHandlerContainer’s #getMessageHandler(String type) method to get the MessageHandler for the Invocation’s type.

Then, call MessageHandlerContainer’s #getMessageClass(messageHandler) method to get the message class that this handler processes.

At <3.2>, call JSON’s #parseObject(String text, Class<T> clazz) method to parse the message into the message object expected by the MessageHandler.

At <3.3>, call the MessageHandler’s #execute(Channel channel, T message) method to execute the business logic.

Note: why submit the work to an Executor thread pool? Let’s look at the threading model of the EventLoopGroup.

When we start a Netty server or client, we configure its EventLoopGroup.

An EventLoopGroup is essentially a thread pool, whose default size is the number of CPU cores × 2. Each Channel is assigned to exactly one of these threads, which performs all of its reads and writes. Conversely, many Channels share a single thread, so one thread reads and writes data for all of them.

Now consider the logic inside a MessageHandler: it often involves blocking I/O, such as reading from a database. While one Channel’s MessageHandler runs on the event-loop thread, it blocks data reading for every other Channel that shares that thread.

Therefore, we create an Executor thread pool here to run the MessageHandler logic, so that Channels are not blocked from reading data.

One might ask: can we simply make the EventLoopGroup thread pool larger, say 200 threads? A Netty server that holds persistent connections may have anywhere from 1,000 to 100,000 clients connected to it, so data reading will still be blocked no matter how large the pool is.

Friendly tip: the Executor thread pool, commonly called a business thread pool or logic thread pool, only executes business logic. RPC frameworks such as Dubbo use the same design. For a deeper understanding, read the article “[NIO Series] Reactor Model”.
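The threading argument above can be demonstrated with plain java.util.concurrent. In this hypothetical sketch, a single-thread executor stands in for one EventLoop shared by two Channels, and a slow handler for Channel #1 either runs inline or is handed to a business pool. Only in the second case is Channel #2 served promptly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class BusinessPoolDemo {

    private BusinessPoolDemo() {}

    /** Returns true if Channel #2's read was handled within 500 ms while Channel #1's handler was still blocked. */
    static boolean secondChannelServedPromptly(boolean offloadToBusinessPool) throws InterruptedException {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();  // one I/O thread shared by both Channels
        ExecutorService businessPool = Executors.newFixedThreadPool(4);   // business (logic) thread pool
        CountDownLatch releaseSlowHandler = new CountDownLatch(1);
        CountDownLatch secondReadHandled = new CountDownLatch(1);

        // Channel #1's MessageHandler blocks, e.g. on a slow database query
        Runnable slowHandler = () -> awaitQuietly(releaseSlowHandler);

        // channelRead for Channel #1
        eventLoop.execute(() -> {
            if (offloadToBusinessPool) {
                businessPool.execute(slowHandler);  // hand off and return immediately
            } else {
                slowHandler.run();                  // runs on, and blocks, the shared I/O thread
            }
        });
        // channelRead for Channel #2, queued on the same I/O thread
        eventLoop.execute(secondReadHandled::countDown);

        boolean served = secondReadHandled.await(500, TimeUnit.MILLISECONDS);
        releaseSlowHandler.countDown();             // let the slow handler finish in both cases
        eventLoop.shutdown();
        businessPool.shutdown();
        return served;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```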

Create the NettyServerConfig configuration class to create the MessageDispatcher and MessageHandlerContainer beans.

The code is as follows:

@Configuration
public class NettyServerConfig {

    @Bean
    public MessageDispatcher messageDispatcher() {
        return new MessageDispatcher();
    }

    @Bean
    public MessageHandlerContainer messageHandlerContainer() {
        return new MessageHandlerContainer();
    }

}

6.6 NettyClientConfig

Create the NettyClientConfig configuration class to create the MessageDispatcher and MessageHandlerContainer beans.

The code is as follows:

@Configuration
public class NettyClientConfig {

    @Bean
    public MessageDispatcher messageDispatcher() {
        return new MessageDispatcher();
    }

    @Bean
    public MessageHandlerContainer messageHandlerContainer() {
        return new MessageHandlerContainer();
    }

}

That concludes this chapter; the following sections demonstrate message distribution in use.

7. Disconnection and reconnection

The Netty client needs a reconnection mechanism to recover from disconnections in various situations.

For example:

1) When the Netty client starts, the Netty server is down and the connection fails.
2) While running, the Netty server goes down and the connection is lost.
3) Network jitter at either end causes the connection to drop abnormally.

The actual implementation is fairly simple; we only need to add reconnection logic in two places:

1) When the Netty client fails to connect to the Netty server during startup, it initiates a reconnection.
2) When the running Netty client is disconnected from the Netty server, it initiates a reconnection.

Since a reconnection attempt may itself fail, we retry periodically on a timer to avoid consuming too many resources.

① In NettyClient, provide a #reconnect() method that implements scheduled reconnection.

The code is as follows:

// NettyClient.java

public void reconnect() {
    eventGroup.schedule(new Runnable() {
        @Override
        public void run() {
            logger.info("[reconnect][starting reconnection]");
            try {
                start();
            } catch (InterruptedException e) {
                logger.error("[reconnect][reconnection failed]", e);
            }
        }
    }, RECONNECT_SECONDS, TimeUnit.SECONDS);
    logger.info("[reconnect][will reconnect in {} seconds]", RECONNECT_SECONDS);
}

The scheduling is implemented with the #schedule(Runnable command, long delay, TimeUnit unit) method provided by the EventLoop. Inside the task, NettyClient’s #start() method is called to initiate a connection to the Netty server.

When #start() fails to connect to the Netty server, the client invokes #reconnect() again, scheduling another attempt. This repeats until the Netty client connects to the Netty server.

As shown in the figure below:

② In NettyClientHandler, implement the #channelInactive(ChannelHandlerContext ctx) method so that it calls NettyClient’s #reconnect() method to initiate a reconnection.

The code is as follows:

// NettyClientHandler.java

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    // Initiate a reconnection
    nettyClient.reconnect();
    // Continue to propagate the event
    super.channelInactive(ctx);
}
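The reconnect loop (#reconnect() schedules #start(); a failed #start() calls #reconnect() again) can be sketched with a ScheduledExecutorService in place of the EventLoop. In this hypothetical sketch, connectAttempt stands in for NettyClient#start() and returns true once the server is reachable; the delay is in milliseconds only so the example runs quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

final class Reconnector {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final BooleanSupplier connectAttempt;   // stand-in for NettyClient#start()
    private final long delayMillis;
    final AtomicInteger attempts = new AtomicInteger();
    final CountDownLatch connected = new CountDownLatch(1);

    Reconnector(BooleanSupplier connectAttempt, long delayMillis) {
        this.connectAttempt = connectAttempt;
        this.delayMillis = delayMillis;
    }

    /** Schedules one attempt; a failed attempt schedules the next one, so retries never pile up. */
    void reconnect() {
        scheduler.schedule(() -> {
            attempts.incrementAndGet();
            if (connectAttempt.getAsBoolean()) {
                connected.countDown();
                scheduler.shutdown();               // connected: stop scheduling
            } else {
                reconnect();                        // failed: try again after another delay
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }
}
```

Because each attempt is only scheduled after the previous one has finished, at most one connection attempt is ever in flight, which is what keeps periodic reconnection cheap.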

7.2 Simple test

① Start the Netty Client without starting the Netty Server. The console prints logs like those in the following figure:

You can see that the Netty Client repeatedly initiates periodic reconnections when the connection fails.

② Start the Netty Server. The console prints the following:

The Netty Client successfully connects to the Netty Server.

8. Heartbeat mechanism and idle detection

TCP has its own idle detection mechanism (keepalive), which by default kicks in after 2 hours. From the perspective of system resources, such a detection mechanism is acceptable.

At the business level, however, if a broken connection between client and server is only discovered two hours later, many messages sent in the meantime are lost, which hurts the user experience.

Therefore, we need to implement idle detection at the business level so that a real disconnection between client and server is discovered as soon as possible.

The implementation logic is as follows:

1) If the server has not read any message from the client within 180 seconds, it actively closes the connection.
2) If the client has not read any message from the server within 180 seconds, it actively closes the connection.

Since the client and server do not always have messages to exchange, we also need a heartbeat mechanism.

The logic is as follows:

1) The client sends a heartbeat message to the server every 60 seconds, so that the server always has something to read.
2) On receiving a heartbeat message, the server replies with a confirmation message, so that the client always has something to read.

Tips:

Why 180 seconds? You can increase or decrease it depending on how quickly you want to detect an abnormal connection. Too short a timeout forces more frequent heartbeats, which consume more resources.

Why 60 seconds? It gives three heartbeat opportunities within each 180-second timeout window before a connection is considered dead.

It sounds a little complicated, but it’s not that complicated to implement.
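The 60-second and 180-second rules boil down to simple timestamp arithmetic. The hypothetical class below models only that arithmetic with an explicit clock in milliseconds, so it can be checked without waiting; in the real project, Netty’s ReadTimeoutHandler and IdleStateHandler track these timers on the EventLoop for you.

```java
final class IdlePolicy {

    static final long HEARTBEAT_INTERVAL_MS = 60_000;  // client sends a heartbeat every 60 s
    static final long READ_TIMEOUT_MS = 180_000;       // either side closes after 180 s without reads

    private long lastReadAt;
    private long lastHeartbeatAt;

    IdlePolicy(long now) {
        this.lastReadAt = now;
        this.lastHeartbeatAt = now;
    }

    /** Call whenever any message, including a heartbeat or its reply, is read. */
    void onRead(long now) {
        lastReadAt = now;
    }

    /** Client side: is it time to send the next heartbeat? */
    boolean shouldSendHeartbeat(long now) {
        if (now - lastHeartbeatAt >= HEARTBEAT_INTERVAL_MS) {
            lastHeartbeatAt = now;
            return true;
        }
        return false;
    }

    /** Both sides: has the peer been silent long enough to close the connection? */
    boolean shouldClose(long now) {
        return now - lastReadAt >= READ_TIMEOUT_MS;
    }
}
```

The key point is that every successful read, heartbeat replies included, pushes the 180-second deadline forward, so a healthy but quiet connection is never closed.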

8.1 Server-side idle detection

In NettyServerHandlerInitializer, we add a ReadTimeoutHandler, which throws a ReadTimeoutException when no data has been read from the peer within the specified time.

As shown in the figure below:

This way, when the server has not read any message from the client for 180 seconds, it actively disconnects the client.

8.2 Client-side idle detection

In NettyClientHandlerInitializer, we add a ReadTimeoutHandler, which throws a ReadTimeoutException when no data has been read from the peer within the specified time.

As shown in the figure below:

This way, when the client has not read any message from the server for 180 seconds, it actively disconnects from the server.

8.3 Heartbeat mechanism

Netty provides the IdleStateHandler to detect idleness: if a Channel has had no reads or writes for a specified time, an IdleStateEvent is triggered.

So in NettyClientHandler, we only need to send a heartbeat message from the client to the server whenever an IdleStateEvent is received.

As shown in the figure below:

HeartbeatRequest is the heartbeat request message.

At the same time, in the server project we create a HeartbeatRequestHandler message handler that replies to the client with a confirmation message upon receiving its heartbeat request.

The code is as follows:

@Component
public class HeartbeatRequestHandler implements MessageHandler<HeartbeatRequest> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void execute(Channel channel, HeartbeatRequest message) {
        logger.info("[execute][received a HeartbeatRequest from connection({})]", channel.id());
        // Reply with a heartbeat acknowledgement
        HeartbeatResponse response = new HeartbeatResponse();
        channel.writeAndFlush(new Invocation(HeartbeatResponse.TYPE, response));
    }

    @Override
    public String getType() {
        return HeartbeatRequest.TYPE;
    }

}

HeartbeatResponse is the heartbeat acknowledgement response.

8.4 Simple test

Start the Netty Server and then the Netty Client. After 60 seconds, you can see the following heartbeat logs:

9. Authentication logic

Starting with this section, let’s look at an example of processing business logic in detail.

The authentication process is shown in the following figure:

9.1 AuthRequest

Create the AuthRequest class to define the user authentication request.

The code is as follows:

public class AuthRequest implements Message {

    public static final String TYPE = "AUTH_REQUEST";

    /**
     * Authentication token
     */
    private String accessToken;

    // ... setter, getter, and toString methods omitted
}

Here we use the accessToken authentication token for authentication.

Typically, the user first logs in to the system over HTTP, and the client then uses the credential obtained at login (such as the accessToken) to authenticate its connection as that user.

Create the AuthResponse class to define the user authentication response.

The code is as follows:

public class AuthResponse implements Message {

    public static final String TYPE = "AUTH_RESPONSE";

    /**
     * Response status code
     */
    private Integer code;
    /**
     * Response message
     */
    private String message;

    // ... setter, getter, and toString methods omitted
}

9.3 AuthRequestHandler (server side)

On the server side, create the AuthRequestHandler class to process authentication requests from clients.

The code is as follows:

The code is simple; see the comments at <1>, <2>, <3>, and <4>.
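The AuthRequestHandler code itself is not reproduced here. As a purely hypothetical illustration of the usual steps (check the token, look up its user, bind the connection to that user, answer with a code), here is a JDK-only sketch. The String connection id stands in for Netty’s Channel, the Map stands in for a real token store, and the codes 0/1/2 are invented for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class AuthSketch {

    /** Mirrors AuthResponse's code/message fields. */
    static final class AuthResult {
        final int code;
        final String message;

        AuthResult(int code, String message) {
            this.code = code;
            this.message = message;
        }
    }

    private final Map<String, String> userByToken;                          // accessToken -> user
    private final Map<String, String> userByConnection = new ConcurrentHashMap<>();

    AuthSketch(Map<String, String> userByToken) {
        this.userByToken = userByToken;
    }

    AuthResult authenticate(String connectionId, String accessToken) {
        // Reject a missing token
        if (accessToken == null || accessToken.isEmpty()) {
            return new AuthResult(1, "accessToken is empty");
        }
        // Look up the user the token was issued to (e.g. at HTTP login)
        String user = userByToken.get(accessToken);
        if (user == null) {
            return new AuthResult(2, "invalid accessToken");
        }
        // Bind the connection to the user so later messages can be routed to it
        userByConnection.put(connectionId, user);
        return new AuthResult(0, "ok");
    }

    String userOf(String connectionId) {
        return userByConnection.get(connectionId);
    }
}
```

Keeping the connection-to-user mapping on the server is what later lets chat messages be delivered to a specific user’s connection.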

9.4 AuthResponseHandler (client side)

On the client side, create the AuthResponseHandler class to handle the authentication response from the server.

The code is as follows:

@Component
public class AuthResponseHandler implements MessageHandler<AuthResponse> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void execute(Channel channel, AuthResponse message) {
        logger.info("[execute][authentication result: {}]", message);
    }

    @Override
    public String getType() {
        return AuthResponse.TYPE;
    }

}

It simply prints the authentication result, for debugging convenience.

9.5 TestController (client side)

Create the TestController class and provide the /test/mock interface to simulate the client sending requests to the server.

The code is as follows:

@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private NettyClient nettyClient;

    @PostMapping("/mock")
    public String mock(String type, String message) {
        // Create the Invocation object
        Invocation invocation = new Invocation(type, message);
        // Send the message
        nettyClient.send(invocation);
        return "success";
    }

}

9.6 Simple test

Start the Netty Server, then the Netty Client, and use Postman to simulate an authentication request.

As shown in the figure below:

You can also see the following logs showing that authentication succeeded:

11. Group chat logic

The process of group chat is shown in the picture below:

The server forwards the group chat message sent by client A to clients A, B, and C.

Note: to keep the logic concise, the example in this section does not support separate groups; all users are in one big group chat.

Create the ChatSendToAllRequest class, the request for sending a group chat message to everyone.

The code is as follows:

public class ChatSendToAllRequest implements Message {

    public static final String TYPE = "CHAT_SEND_TO_ALL_REQUEST";

    /**
     * Message ID
     */
    private String msgId;
    /**
     * Content
     */
    private String content;

    // ... setter, getter, and toString methods omitted
}

PS: A real group chat would also have a groupId field identifying the group.

11.2 ChatSendToAllHandler (server side)

On the server side, create the ChatSendToAllHandler class to handle group chat requests from clients.

The code is as follows:

The code is simple; see the comments at <1> and <2>.
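The ChatSendToAllHandler code is not reproduced here either. As a hypothetical JDK-only sketch of the broadcast idea, each authenticated user’s connection below is a Consumer<String> standing in for Channel#writeAndFlush, and a message goes to every registered connection, the sender included, as in the figure. In Netty, io.netty.channel.group.DefaultChannelGroup offers a writeAndFlush(Object) that writes to every channel in the group.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

final class BroadcastSketch {

    private final Map<String, Consumer<String>> connectionsByUser = new ConcurrentHashMap<>();

    /** Called once a connection has been authenticated as the given user. */
    void register(String user, Consumer<String> connection) {
        connectionsByUser.put(user, connection);
    }

    /** Forwards the message to every connected user, sender included; returns the recipient count. */
    int sendToAll(String fromUser, String content) {
        String payload = fromUser + ": " + content;
        for (Consumer<String> connection : connectionsByUser.values()) {
            connection.accept(payload);
        }
        return connectionsByUser.size();
    }
}
```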

11.3 Simple test

① Start the Netty Server.

② Start Netty Client A. Then use Postman to simulate an authentication request (the user is Yunai).

As shown in the figure below:

③ Start Netty Client B. Note that you need to set --server.port to 8081 to avoid a port conflict.

④ Start Netty Client C. Note that you need to set --server.port to 8082 to avoid a port conflict.

Finally, use Postman to simulate sending a group chat message.

As shown in the figure below:

You can also see the following logs showing client A’s message being delivered to all clients:

Finally, to learn IM development systematically, read: “Getting Started with Mobile IM Development: One Article Is Enough, Developing Mobile IM from Scratch”

Appendix: series of articles

“Follow the Source Code to Learn IM (1): A Hands-on Guide to Netty’s Heartbeat and Reconnection Mechanisms”

“Follow the Source Code to Learn IM (2): Is Building Your Own IM Hard? A Hands-on Guide to an Android IM”

“Follow the Source Code to Learn IM (3): Developing an IM Server from Scratch Based on Netty”

“Follow the Source Code to Learn IM (4): Just Pick Up the Keyboard and Go, a Guide to Developing a Distributed IM System”

“Follow the Source Code to Learn IM (5): Correctly Understanding IM Long Connections, Heartbeats and Reconnection, and Implementing Them”

“Follow the Source Code to Learn IM (6): A Hands-on Guide to Quickly Building a High-performance, Scalable IM System with Go”

“Follow the Source Code to Learn IM (7): A Hands-on Guide to Building Web IM Chat with WebSocket”

“Follow the Source Code to Learn IM (8): Ten Thousand Words, a Hands-on Guide to Building IM Chat with Netty” (* this article)

This article has also been published on the “IM Technosphere” official account.



▲ The synchronized publication link is: http://www.52im.net/thread-34…