This article was written by Taro. Its original title is “Using Netty to achieve IM chat thief simple” (original link: juejin.cn/post/694066…

1. Introduction

In the previous article, “Follow the source code to learn IM (7): Hand in hand to teach you to use WebSocket to create Web IM chat”, we used WebSocket to implement simple IM features: identity authentication, private chat messages, and group chat messages.

Afterwards, a reader sent a private message hoping to see similar features implemented with pure Netty, hence this article.

**Note:** please download the source code from the attachment at the synced link: www.52im.net/thread-3489…

2. Knowledge preparation

For those of you who don’t know what Netty is, here’s a quick introduction:

Netty is a Java open source framework. Netty provides an asynchronous, event-driven network application framework and tools to rapidly develop high-performance and reliable network server and client programs.

In other words, Netty is an NIO-based client/server programming framework that lets you quickly and easily develop a network application, such as a client/server application that implements a particular protocol.

Netty simplifies and streamlines the programming and development of network applications, such as TCP and UDP Socket services.

Here are a few introductory articles about Netty that are worth reading:

  • Getting Started: The most Thorough Analysis of Netty high-performance Principles and Frameworks so far
  • For Beginners: Learning Methods and Advanced Strategies for Netty, the Java High-performance NIO Framework
  • The most popular Netty framework in the history of long article: Basic introduction, environment building, hands-on combat

If you don't even know what Java NIO is, the following articles are recommended reading first:

  • The Difference between Java NIO and Classic IO in one Minute
  • The greatest Introduction to Java NIO ever: For those worried about getting started and giving up, read this!
  • Java BIO and NIO are difficult to understand?

Netty source code and API documentation:

  • 1) Netty-4.1.x
  • 2) Netty-4.0.x
  • 3) Netty-4.1.x API Documentation (online version) (* recommended)
  • 4) Netty-4.0.x API Documentation (online version)

3. Source code for this article

Download the complete code from the sync link attachment: www.52im.net/thread-3489…

The directory structure of the source code is shown below:

As shown above:

  • 1) lab-67-netty-demo-server project: builds the Netty server;
  • 2) lab-67-netty-demo-client project: builds the Netty client;
  • 3) lab-67-netty-demo-common project: provides the basic Netty wrappers, message encoding and decoding, and message distribution.

Examples of common Netty features are also provided in the source code:

  • 1) Heartbeat mechanism to achieve server-side survival detection of clients;
  • 2) Disconnection and reconnection, so that the client can reconnect to the server.

Don’t beep. Just do it.

5. Communication protocol

In the previous chapter, we implemented the connection between the client and the server. In this section, we’re going to get them to talk to each other, reading and writing data.

In the development of daily projects, HTTP is used as the communication protocol between the front end and the back end, and text content is used for interaction. The data format is generally JSON. But in the TCP world, we need to build our own protocols for client and server communication based on binary.

Let’s take the example of a client sending a message to a server. Suppose the client wants to send a login request.

The corresponding class is as follows:

public class AuthRequest {

    /** User name */
    private String username;

    /** Password */
    private String password;

}

**Obviously:** we can't drop a Java object straight into a TCP Socket. We first need to convert it to a byte array, and only then can we write it to the TCP Socket. That is, the message object needs to be serialized into a byte array.

**Also:** when the server receives the byte array, it needs to convert it back into a Java object, i.e., deserialize it. Otherwise, how would the server make sense of a raw run of bytes?

**Likewise:** the same process applies when the server sends messages to the client.

There are many serialization tools, such as Google's Protobuf, which performs efficiently and produces compact binary output. Netty integrates Protobuf and provides the corresponding codec.

As shown below:

However, Protobuf may be unfamiliar to many readers, and using it for serialization carries an extra learning cost. So, after careful consideration, we use JSON for serialization. Some of you might wonder: doesn't JSON convert objects to strings? It does, but we can then convert the string into a byte array.
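The step from JSON text to bytes can be sketched with the JDK alone. This is a minimal illustration, not the article's code: the class name JsonBytesSketch is made up, and the JSON string is written by hand here, whereas the article produces it with FastJSON.

```java
import java.nio.charset.StandardCharsets;

/** Sketch: JSON text is turned into bytes (and back) with an explicit charset. */
final class JsonBytesSketch {

    /** Serialize: JSON string to UTF-8 bytes, ready to be written to a socket. */
    static byte[] toBytes(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** Deserialize: UTF-8 bytes back to the JSON string, ready for a JSON parser. */
    static String fromBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hand-written JSON for illustration only (assumption, see lead-in).
        String json = "{\"username\":\"yunai\",\"password\":\"secret\"}";
        byte[] wire = toBytes(json);
        System.out.println(wire.length + " bytes on the wire");
        System.out.println(fromBytes(wire));
    }
}
```

Naming the charset explicitly on both sides matters: relying on the platform default could make client and server disagree on how non-ASCII characters are encoded.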

Next, we create the lab-67-netty-demo-common project and implement our custom communication protocol under the codec package.

As shown below:

5.1 Invocation

Create the Invocation class, the message body for the communication protocol.

The code is as follows:

import com.alibaba.fastjson.JSON;

/**
 * The body of a communication protocol message
 */
public class Invocation {

    /**
     * Type
     */
    private String type;

    /**
     * Message, in JSON format
     */
    private String message;

    // Empty constructor
    public Invocation() {
    }

    public Invocation(String type, String message) {
        this.type = type;
        this.message = message;
    }

    public Invocation(String type, Message message) {
        this.type = type;
        this.message = JSON.toJSONString(message);
    }

    // ... Omit setter, getter, toString methods

}

  • 1) The type property is used to match the corresponding message handler. By analogy with HTTP, type is equivalent to the request address.
  • 2) The message property holds the message content, in JSON format.

In addition, Message is the message interface we define, as follows:

public interface Message {

    // ... Empty, used as a marker interface

}

5.2 Sticky packets and unpacking

Before we look at the Invocation codec, let's first understand the concepts of sticky packets and unpacking.

5.2.1 Possible causes

The main cause of the sticky packet and unpacking problems is that the operating system sends TCP data through an underlying buffer, for example 1024 bytes in size.

If the data sent in one request is too small to fill the buffer, TCP combines several requests into a single packet and sends them together. This produces sticky packets.

**For example:** as articles on the TCP_NODELAY option explain, when the Nagle algorithm is turned off, data is not held back until the buffer fills up but is sent as soon as possible to reduce latency.

If the data sent in one request is larger than the buffer, TCP splits it into several packets. This is called unpacking: one large packet is split into multiple packets for sending.

The following figure shows a schematic diagram of sticking and unpacking, demonstrating the three situations of sticking and unpacking:

As shown above:

  • 1) Packets A and B each fill the TCP buffer, or their waiting time has reached the TCP wait limit, so they are sent as two independent packets;
  • 2) The two requests A and B arrive close together and both are small, so they are combined into a single packet and sent to the server;
  • 3) Packet B is relatively large, so it is split into two packets, B_1 and B_2. Because the split-off B_2 is small, it is combined with packet A for sending.

5.2.2 Solution

There are three common solutions to the sticky packet and unpacking problems:

  • 1) The client sends every packet with a fixed length, for example 1024 bytes. If the data to send is shorter than 1024 bytes, it is padded with spaces up to the specified length. So far, no real-world case using this approach has been found.
  • 2) The client appends a fixed delimiter, for example \r\n, to the end of each packet. If a packet is split, the receiver waits for the next packet until it finds \r\n, then joins the head of that packet with the remainder of the previous one to obtain a complete packet. Concrete cases include HTTP, WebSocket, and Redis.
  • 3) A message is divided into a header and a body, and the header stores the length of the whole message. A complete message is considered read only once enough bytes have arrived.

Note: scheme 3) is an upgraded version of scheme 1), with a dynamic length.

In this article, each Invocation is serialized to a byte array, and the length of that array is written to the TCP Socket before the array itself.
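The length-prefix scheme can be sketched without Netty, using java.nio.ByteBuffer. This is only an illustration under stated assumptions: the class and method names (LengthPrefixSketch, frame, unframe) are invented for this example, and the 4-byte big-endian length mirrors what writeInt produces.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Sketch of length-prefixed framing: [4-byte length][payload bytes]. */
final class LengthPrefixSketch {

    /** Builds one frame: a 4-byte big-endian length followed by the payload. */
    static byte[] frame(String payload) {
        byte[] content = payload.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + content.length);
        buf.putInt(content.length);
        buf.put(content);
        return buf.array();
    }

    /** Splits a byte stream that may hold several glued frames back into payloads. */
    static List<String> unframe(byte[] stream) {
        List<String> payloads = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(stream);
        while (buf.remaining() >= 4) {
            int length = buf.getInt(buf.position()); // peek at the length, do not consume it yet
            if (length < 0 || buf.remaining() - 4 < length) {
                break; // incomplete frame: a real decoder would wait for more bytes
            }
            buf.getInt(); // now consume the length field
            byte[] content = new byte[length];
            buf.get(content);
            payloads.add(new String(content, StandardCharsets.UTF_8));
        }
        return payloads;
    }
}
```

Because every payload announces its own size, two frames that arrive glued together can still be separated, and a frame whose tail has not arrived yet is simply left alone.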

As shown below:

5.3 InvocationEncoder

Create the InvocationEncoder class to serialize the Invocation and write it to the TCP Socket.

The code is as follows:

import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InvocationEncoder extends MessageToByteEncoder<Invocation> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    protected void encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) {
        // <2.1> Convert the Invocation to a byte[] array
        byte[] content = JSON.toJSONBytes(invocation);
        // <2.2> Write the length
        out.writeInt(content.length);
        // <2.3> Write the content
        out.writeBytes(content);
        logger.info("[encode][connection({}) encoded a message({})]", ctx.channel().id(), invocation.toString());
    }

}

  • 1) MessageToByteEncoder is the abstract encoding ChannelHandler class defined by Netty. It converts messages of its generic type into byte arrays.
  • 2) The #encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) method implements the encoding logic.

<2.1> calls the JSON#toJSONBytes(Object object, SerializerFeature... features) method to convert the Invocation to a byte array.

<2.2> writes the length of the byte array to the TCP Socket. This way, the “5.4 InvocationDecoder” that follows can parse the message according to this length, which solves the sticky packet and unpacking problems.

<2.3> writes the byte array to the TCP Socket.

Note: the code itself writes to ByteBuf out; MessageToByteEncoder eventually writes ByteBuf out to the TCP Socket.

5.4 InvocationDecoder

Create the InvocationDecoder class, which reads the byte array from the TCP Socket and deserializes it into an Invocation.

The code is as follows:

  • 1) ByteToMessageDecoder is the abstract decoding ChannelHandler class defined by Netty. Decoding is triggered whenever new data is read from the TCP Socket.
  • 2) At <2.1>, <2.2>, <2.3>, the length is read from the TCP Socket.
  • 3) At <3.1>, <3.2>, <3.3>, the byte array is read from the TCP Socket and deserialized into an Invocation object.

Finally, the Invocation is added to List out, which is handed to the subsequent ChannelHandlers for processing. Later, in section “6. Message distribution”, we will see that the MessageDispatcher dispatches the Invocation to the corresponding MessageHandler, which executes the business logic.
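Since the decoder listing is not reproduced here, the following is a Netty-free sketch of the same idea: read the length, check whether the body has fully arrived, and rewind if it has not. It is not the article's InvocationDecoder. The class FrameAccumulatorSketch and its feed method are invented for illustration, and ByteBuffer's mark()/reset() stand in for the reader-index bookkeeping a Netty decoder would do on a ByteBuf.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Sketch of an incremental length-prefix decoder that tolerates partial reads. */
final class FrameAccumulatorSketch {

    // Bytes received so far that have not yet formed a complete frame.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    /** Feed one chunk as read from the socket; returns every message it completes. */
    List<String> feed(byte[] chunk) {
        // Append the new chunk to whatever was left over from earlier reads.
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();

        List<String> out = new ArrayList<>();
        while (true) {
            merged.mark();                      // remember where this frame starts
            if (merged.remaining() < 4) {       // length field has not fully arrived
                break;
            }
            int length = merged.getInt();       // read the length
            if (length < 0) {
                throw new IllegalStateException("negative frame length: " + length);
            }
            if (merged.remaining() < length) {  // body has not fully arrived
                merged.reset();                 // rewind to the start of the frame
                break;
            }
            byte[] content = new byte[length];  // read the body
            merged.get(content);
            out.add(new String(content, StandardCharsets.UTF_8));
        }
        pending = merged; // keep the unread tail for the next call
        return out;
    }
}
```

A chunk may therefore yield zero, one, or several messages, which is why a decoder of this kind reports its results through a list rather than a single return value.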

5.5 Introduce dependencies

Create a pom.xml file and introduce Netty, FastJSON, and other dependencies.

5.6 Summary of this chapter

So far, we have completed the definition of the communication protocol and its codec logic. Isn't it interesting?

In addition, we add the codec in the initialization code of NettyServerHandlerInitializer and NettyClientHandlerInitializer.

As shown below:

6. Message distribution

In SpringMVC, the DispatcherServlet dispatches each request to the matching Controller method based on the request address, HTTP method, and so on.

In the dispatcher package of the lab-67-netty-demo-common project, we create the MessageDispatcher class, which plays a role similar to DispatcherServlet: it dispatches each Invocation to the corresponding MessageHandler, which executes the business logic.

Now, let’s look at the actual code implementation.

6.1 Message

Create the Message interface, a marker interface for messages.

The code is as follows:

public interface Message {

}

The figure below shows the Message implementation classes involved.

As shown below:

6.2 MessageHandler

Create the MessageHandler interface, the interface for message handlers.

The code is as follows:

import io.netty.channel.Channel;

public interface MessageHandler<T extends Message> {

    /**
     * Process the message
     *
     * @param channel the channel the message arrived on
     * @param message the message
     */
    void execute(Channel channel, T message);

    /**
     * @return the message type, i.e. the TYPE static field on each Message implementation class
     */
    String getType();

}

As the code above shows:

  • 1) It declares a generic type parameter, which must be an implementation class of Message;
  • 2) It defines two interface methods; see the comments for details.

The figure below shows the MessageHandler implementation classes involved.

As shown below:

6.3 MessageHandlerContainer

Create the MessageHandlerContainer class as a container for MessageHandler.

The code is as follows:

  • 1) It implements the InitializingBean interface. In the #afterPropertiesSet() method, it scans all the MessageHandler beans and adds them to the MessageHandler collection.
  • 2) The #getMessageHandler(String type) method returns the MessageHandler object corresponding to the type. We will call this method later in the MessageDispatcher.
  • 3) The #getMessageClass(MessageHandler handler) method obtains the Class of the message type by parsing the generic type declared on the MessageHandler. It is adapted, with slight modifications, from the DefaultRocketMQListenerContainer#getMessageType() method of the rocketmq-spring project.
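The container listing is not reproduced here, so the following is a plain-Java sketch of the two ideas described above: a map from type to handler, and recovering the handler's message class through reflection. It is not the article's MessageHandlerContainer. It drops Spring and the Channel parameter, the TYPE value "HEARTBEAT_REQUEST" is a placeholder, and it only handles handlers that implement MessageHandler<T> directly (no proxies or superclasses).

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

/** Sketch of a handler registry keyed by message type, with generic type resolution. */
final class HandlerRegistrySketch {

    interface Message {}

    /** Simplified: the article's interface also receives the Netty Channel. */
    interface MessageHandler<T extends Message> {
        void execute(T message);
        String getType();
    }

    static final class HeartbeatRequest implements Message {
        static final String TYPE = "HEARTBEAT_REQUEST"; // placeholder value
    }

    static final class HeartbeatRequestHandler implements MessageHandler<HeartbeatRequest> {
        @Override public void execute(HeartbeatRequest message) { /* business logic */ }
        @Override public String getType() { return HeartbeatRequest.TYPE; }
    }

    private final Map<String, MessageHandler<?>> handlers = new HashMap<>();

    /** Stands in for the bean scan: each handler is stored under its own type. */
    void register(MessageHandler<?> handler) {
        handlers.put(handler.getType(), handler);
    }

    MessageHandler<?> getMessageHandler(String type) {
        MessageHandler<?> handler = handlers.get(type);
        if (handler == null) {
            throw new IllegalArgumentException("no MessageHandler for type " + type);
        }
        return handler;
    }

    /** Reads T out of "implements MessageHandler<T>" declared on the handler class. */
    static Class<? extends Message> getMessageClass(MessageHandler<?> handler) {
        for (Type type : handler.getClass().getGenericInterfaces()) {
            if (type instanceof ParameterizedType) {
                ParameterizedType pt = (ParameterizedType) type;
                if (pt.getRawType() == MessageHandler.class) {
                    Type arg = pt.getActualTypeArguments()[0];
                    if (arg instanceof Class) {
                        return ((Class<?>) arg).asSubclass(Message.class);
                    }
                }
            }
        }
        throw new IllegalStateException("cannot resolve message type of " + handler.getClass());
    }
}
```

Resolving the class once from the handler's declaration is what lets the dispatcher deserialize the JSON into the right Message subclass without each handler having to name its class a second time.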

6.4 MessageDispatcher

Create the MessageDispatcher class, which dispatches each Invocation to the corresponding MessageHandler to execute the business logic.

The code is as follows:

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@ChannelHandler.Sharable
public class MessageDispatcher extends SimpleChannelInboundHandler<Invocation> {

    @Autowired
    private MessageHandlerContainer messageHandlerContainer;

    private final ExecutorService executor = Executors.newFixedThreadPool(200);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Invocation invocation) {
        // <3.1> Get the MessageHandler corresponding to type
        MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType());
        // Get the message class of the MessageHandler
        Class<? extends Message> messageClass = MessageHandlerContainer.getMessageClass(messageHandler);
        // <3.2> Parse the message
        Message message = JSON.parseObject(invocation.getMessage(), messageClass);
        // <3.3> Execute the logic
        executor.submit(new Runnable() {

            @Override
            public void run() {
                // noinspection unchecked
                messageHandler.execute(ctx.channel(), message);
            }

        });
    }

}

  • 1) The @ChannelHandler.Sharable annotation on the class marks this ChannelHandler as usable by multiple Channels.
  • 2) SimpleChannelInboundHandler is the abstract message-processing ChannelHandler class defined by Netty. The type of message it handles is given by its generic parameter.
  • 3) The #channelRead0(ChannelHandlerContext ctx, Invocation invocation) method handles the message and dispatches it.

<3.1> calls the #getMessageHandler(String type) method of MessageHandlerContainer to obtain the MessageHandler for the type of the Invocation.

Then, it calls the #getMessageClass(messageHandler) method of MessageHandlerContainer to get the message class of that MessageHandler.

<3.2> calls the JSON#parseObject(String text, Class<T> clazz) method to parse the message of the Invocation into the message object of the MessageHandler.

<3.3> hands the work to the thread pool, which then calls the #execute(Channel channel, T message) method of MessageHandler to execute the business logic.

**Note:** why hand the work to the executor thread pool? Let's first go over the threading model of EventLoopGroup.

When we start the Netty server or client, we set its EventLoopGroup.

An EventLoopGroup can be thought of simply as a thread pool whose size is only the number of CPUs * 2. Each Channel is assigned to exactly one of these threads for reading and writing data, and multiple Channels share the same thread; that is, one thread reads and writes the data of several Channels.

So think about it: the logic of a MessageHandler often involves blocking IO, such as database reads. While one Channel is executing its MessageHandler, it would block data reading for the other Channels that share the same thread.

Therefore, we create an executor thread pool here to run the MessageHandler logic and avoid blocking Channel reads.

One might ask: can't we just make the EventLoopGroup thread pool bigger, say 200 threads? A Netty server that holds long-lived connections typically has 1000 to 100000 Netty clients connected to it, so no matter how large the pool is, many Channels still share each thread and blocking logic would still hold up data reading.

Note: the executor thread pool is commonly called the business thread pool or logic thread pool; as the name suggests, it executes business logic. Dubbo and other RPC frameworks adopt this design. Later, you can read “Reactor Model of NIO Series” carefully for further understanding.
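The hand-off described above can be sketched with plain java.util.concurrent, without Netty. In this illustration a single-thread executor named "io-0" stands in for one EventLoop thread, and a small fixed pool stands in for the business thread pool. The class name BusinessPoolSketch, the thread name, the pool size of 4, and the 50 ms sleep are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch: the I/O thread only hands messages over; a business pool runs the handlers. */
final class BusinessPoolSketch {

    /** Returns one "<thread name> handled <message>" entry per message. */
    static List<String> dispatchAll(List<String> messages) {
        // Stand-in for one EventLoop thread shared by several Channels.
        ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-0"));
        // Stand-in for the business thread pool (the article uses 200 threads).
        ExecutorService businessPool = Executors.newFixedThreadPool(4);
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.size());
        try {
            for (String message : messages) {
                ioThread.execute(() -> {
                    // The I/O thread submits the work and is immediately free to read again.
                    businessPool.execute(() -> {
                        sleepQuietly(50); // simulated blocking call, e.g. a database read
                        handled.add(Thread.currentThread().getName() + " handled " + message);
                        done.countDown();
                    });
                });
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handlers did not finish in time");
            }
            return new ArrayList<>(handled);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for handlers", e);
        } finally {
            ioThread.shutdown();
            businessPool.shutdown();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Every entry in the result names a pool thread rather than "io-0", which is the point: the blocking work never runs on the thread responsible for reading.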

6.5 NettyServerConfig

Create the NettyServerConfig configuration class, which creates the MessageDispatcher and MessageHandlerContainer beans.

The code is as follows:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NettyServerConfig {

    @Bean
    public MessageDispatcher messageDispatcher() {
        return new MessageDispatcher();
    }

    @Bean
    public MessageHandlerContainer messageHandlerContainer() {
        return new MessageHandlerContainer();
    }

}

6.6 NettyClientConfig

Create the NettyClientConfig configuration class, which creates the MessageDispatcher and MessageHandlerContainer beans.

The code is as follows:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NettyClientConfig {

    @Bean
    public MessageDispatcher messageDispatcher() {
        return new MessageDispatcher();
    }

    @Bean
    public MessageHandlerContainer messageHandlerContainer() {
        return new MessageHandlerContainer();
    }

}

6.7 Summary of this chapter

We will demonstrate the use of message distribution in detail in the following sections.

7. Disconnect and reconnect

The Netty client needs to implement the disconnection and reconnection mechanism to resolve the disconnection in various scenarios.

For example:

  • 1) When the Netty client starts, the Netty server is down, so the connection fails.
  • 2) While the client is running, the Netty server goes down, so the connection is dropped.
  • 3) Network jitter on either end causes the connection to be dropped abnormally.

The implementation is relatively simple: a reconnection mechanism is added in two places:

  • 1) When the Netty client fails to connect to the Netty server at startup, the client initiates a reconnection.
  • 2) When the Netty client is running and gets disconnected from the Netty server, the client initiates a reconnection.

Considering that reconnection may fail, we use timed reconnection to avoid occupying too many resources.

7.1 Specific code

1) In NettyClient, the #reconnect() method implements the logic of timed reconnection.

The code is as follows:

// NettyClient.java

public void reconnect() {
    eventGroup.schedule(new Runnable() {

        @Override
        public void run() {
            logger.info("[reconnect][start reconnecting]");
            try {
                start();
            } catch (InterruptedException e) {
                logger.error("[reconnect][reconnect failed]", e);
            }
        }

    }, RECONNECT_SECONDS, TimeUnit.SECONDS);
    logger.info("[reconnect][reconnecting in {} seconds]", RECONNECT_SECONDS);
}

The timing logic is implemented by calling the #schedule(Runnable command, long delay, TimeUnit unit) method on eventGroup. The scheduled task calls the #start() method of NettyClient to initiate a connection to the Netty server.

If #start() fails to connect to the Netty server, NettyClient calls #reconnect() again. This loop repeats until the Netty client connects to the Netty server.

As shown below:

2) In NettyClientHandler, implement the #channelInactive(ChannelHandlerContext ctx) method. When it detects a disconnection from the Netty server, it calls the #reconnect() method of NettyClient to initiate a reconnection.

The code is as follows:

// NettyClientHandler.java

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    // Initiate a reconnection
    nettyClient.reconnect();
    // Continue to fire the event
    super.channelInactive(ctx);
}
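The retry loop formed by #start() and #reconnect() can be sketched with a ScheduledExecutorService from the JDK. This is an illustration only, not the article's NettyClient: the class ReconnectSketch is invented, the connectAttempt supplier is a hypothetical stand-in for a real connection attempt, and the delay is given in milliseconds so the example runs quickly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Sketch of timed reconnection: on failure, schedule another attempt after a delay. */
final class ReconnectSketch {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final BooleanSupplier connectAttempt; // hypothetical: returns true once connected
    private final long retryDelayMillis;
    private final AtomicInteger attempts = new AtomicInteger();
    private final CompletableFuture<Integer> connected = new CompletableFuture<>();

    ReconnectSketch(BooleanSupplier connectAttempt, long retryDelayMillis) {
        this.connectAttempt = connectAttempt;
        this.retryDelayMillis = retryDelayMillis;
    }

    /** Starts connecting; the future completes with the number of attempts it took. */
    CompletableFuture<Integer> start() {
        attemptOnce();
        return connected;
    }

    private void attemptOnce() {
        int attempt = attempts.incrementAndGet();
        if (connectAttempt.getAsBoolean()) {
            connected.complete(attempt);
            scheduler.shutdown();
        } else {
            // Failed: try again later instead of retrying in a tight loop.
            scheduler.schedule(this::attemptOnce, retryDelayMillis, TimeUnit.MILLISECONDS);
        }
    }
}
```

For example, `new ReconnectSketch(() -> tryConnect(), 5000).start()` would retry every five seconds, where tryConnect is whatever connection routine the application provides. Waiting between attempts keeps a client from burning CPU and flooding the server while it is down.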

7.2 Simple test

① Start Netty Client. Do not start Netty Server. The console logs are as follows:

When the connection fails, the Netty Client repeatedly initiates scheduled reconnection.

② Start Netty Server. The following figure is displayed on the console:

The Netty Client successfully reconnects to the Netty Server.

8. Heartbeat mechanism and idle detection

TCP has its own built-in idle detection mechanism, whose default interval is 2 hours. At the level of system resources, such a detection mechanism is acceptable.

At the business level, however, if it takes two hours to discover that the client and the server are disconnected, many messages will be lost, hurting the user experience.

Therefore, at the business level, we need to implement our own idle detection so that a broken connection between client and server is discovered as soon as possible.

The implementation logic is as follows:

  • 1) The server disconnects from the client when it finds that no message is read from the client for 180 seconds.
  • 2) The client disconnects from the server when no message is read from the server for 180 seconds.

Considering that the client and server do not always interact with each other with messages, we need to add a heartbeat mechanism.

The logic is as follows:

  • 1) The client sends a heartbeat message to the server every 60 seconds to ensure that the server can read the message;
  • 2) On receiving a heartbeat message, the server replies to the client with a confirmation message, so that the client also has something to read.

Friendly tips:

Why 180 seconds? You can raise or lower this value depending on how quickly you want to detect a broken connection. If the timeout is too short, heartbeats must be sent too frequently, which occupies too many resources.

Why 60 seconds? It gives three chances within the 180-second window to check whether the heartbeat has timed out.

It sounds complicated, but it’s not complicated to implement.
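The relationship between the two intervals can be written down as a few pure functions. This is a minimal sketch under assumptions: the class IdleTimingSketch and its method names are invented, times are plain second counters, and "idle" is measured from the last recorded activity rather than by Netty's handlers.

```java
/** Sketch of the timing rules: heartbeat every 60 s of idleness, give up after 180 s. */
final class IdleTimingSketch {

    static final long HEARTBEAT_INTERVAL_SECONDS = 60;
    static final long READ_TIMEOUT_SECONDS = 180;

    /** Client side: send a heartbeat once the connection has been idle for the interval. */
    static boolean shouldSendHeartbeat(long lastActivitySeconds, long nowSeconds) {
        return nowSeconds - lastActivitySeconds >= HEARTBEAT_INTERVAL_SECONDS;
    }

    /** Either side: treat the peer as gone when nothing was read for the whole timeout. */
    static boolean isReadTimedOut(long lastReadSeconds, long nowSeconds) {
        return nowSeconds - lastReadSeconds >= READ_TIMEOUT_SECONDS;
    }

    /** How many heartbeats fit into one read-timeout window. */
    static long heartbeatsPerTimeout() {
        return READ_TIMEOUT_SECONDS / HEARTBEAT_INTERVAL_SECONDS;
    }
}
```

With these values, a peer is only declared dead after three consecutive heartbeats have gone unanswered, so a single lost or delayed heartbeat does not tear the connection down.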

8.1 Idle detection on the server

In NettyServerHandlerInitializer, we add a ReadTimeoutHandler. If no data is read from the peer within the specified time, it throws a ReadTimeoutException.

As shown below:

In this way, the server disconnects from the client if no message is read from the client for 180 seconds.

8.2 Idle detection of the client

In NettyClientHandlerInitializer, we add a ReadTimeoutHandler. If no data is read from the peer within the specified time, it throws a ReadTimeoutException.

As shown below:

In this way, the client disconnects from the server if no message is read from the server for 180 seconds.

8.3 Heartbeat mechanism

Netty provides the IdleStateHandler, which performs idle detection. If a Channel's reads or writes have been idle for too long, it fires an IdleStateEvent.

Thus, we only need to have the client send a heartbeat message to the server whenever the NettyClientHandler receives an IdleStateEvent.

As shown below:

HeartbeatRequest is the heartbeat request message.

At the same time, we create a HeartbeatRequestHandler message handler in the server project. On receiving the client's heartbeat request, it replies to the client with a confirmation message.

The code is as follows:

import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class HeartbeatRequestHandler implements MessageHandler<HeartbeatRequest> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void execute(Channel channel, HeartbeatRequest message) {
        logger.info("[execute][received heartbeat request from connection({})]", channel.id());
        // Respond to the heartbeat
        HeartbeatResponse response = new HeartbeatResponse();
        channel.writeAndFlush(new Invocation(HeartbeatResponse.TYPE, response));
    }

    @Override
    public String getType() {
        return HeartbeatRequest.TYPE;
    }

}

HeartbeatResponse is a heartbeat acknowledgement response.

8.4 Simple test

Start the Netty Server and then the Netty Client. After 60 seconds, the following heartbeat logs are displayed:

9. Authentication logic

Starting with this section, we’ll look at a concrete example of processing business logic.

The authentication process is shown below:

9.1 AuthRequest

Create the AuthRequest class to define the user authentication request.

The code is as follows:

public class AuthRequest implements Message {

    public static final String TYPE = "AUTH_REQUEST";

    /**
     * The authentication token
     */
    private String accessToken;

    // ... Omit setter, getter, toString methods

}

Here we use the accessToken authentication token for authentication.

Normally, we log in to the system using HTTP, and then bind the client to the current user using the login identity (say accessToken authentication token).

9.2 AuthResponse

Create the AuthResponse class to define the user authentication response.

The code is as follows:

public class AuthResponse implements Message {

    public static final String TYPE = "AUTH_RESPONSE";

    /**
     * Response status code
     */
    private Integer code;

    /**
     * Response prompt
     */
    private String message;

    // ... Omit setter, getter, toString methods

}

9.3 AuthRequestHandler

Server side.

Create the AuthRequestHandler class, with which the server handles client authentication requests.

The code is as follows:

The code is fairly simple; see the comments at <1>, <2>, <3>, and <4>.
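Because the handler listing is not reproduced here, the following is only a guess at the general shape of such logic, written in plain Java: reject an empty token, otherwise bind the user to the connection and answer with success. It is not the article's AuthRequestHandler. The class AuthSketch, the status codes 0 and 1, the use of the token itself as the user name, and the string channel id are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of token-based authentication that binds a user to a connection. */
final class AuthSketch {

    /** Simplified response: a status code plus an optional prompt. */
    static final class AuthResponse {
        final int code;
        final String message;

        AuthResponse(int code, String message) {
            this.code = code;
            this.message = message;
        }
    }

    // Channel id -> authenticated user.
    private final Map<String, String> userByChannel = new ConcurrentHashMap<>();

    AuthResponse authenticate(String channelId, String accessToken) {
        // Reject a missing or blank token (code 1 is a made-up error code).
        if (accessToken == null || accessToken.trim().isEmpty()) {
            return new AuthResponse(1, "accessToken is missing");
        }
        // Assumption: the token is used directly as the user name; a real system
        // would validate it against a token store first.
        userByChannel.put(channelId, accessToken);
        return new AuthResponse(0, null);
    }

    /** Returns the user bound to the channel, or null if it is not authenticated. */
    String userOf(String channelId) {
        return userByChannel.get(channelId);
    }
}
```

Keeping the channel-to-user binding on the server is what later allows chat messages to be routed to the connections of specific users.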

9.4 AuthResponseHandler

Client side.

Create the AuthResponseHandler class, with which the client handles the server's authentication response.

The code is as follows:

import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class AuthResponseHandler implements MessageHandler<AuthResponse> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void execute(Channel channel, AuthResponse message) {
        logger.info("[execute][authentication result: {}]", message);
    }

    @Override
    public String getType() {
        return AuthResponse.TYPE;
    }

}

It simply logs the authentication result, for easy debugging.

9.5 TestController

Client side.

Create the TestController class, which provides a /test/mock endpoint to simulate the client sending requests to the server.

The code is as follows:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private NettyClient nettyClient;

    @PostMapping("/mock")
    public String mock(String type, String message) {
        // Create the Invocation object
        Invocation invocation = new Invocation(type, message);
        // Send the message
        nettyClient.send(invocation);
        return "success";
    }

}

9.6 Simple test

Start the Netty Server, then the Netty Client, and use Postman to simulate an authentication request.

As shown below:

In addition, the following logs are displayed:

11. Group chat logic

The process of group chat is shown in the picture below:

The server forwards the group chat messages sent by client A to clients A, B, and C.

**Note:** to keep the logic simple, the example in this section does not use separate groups; instead, all users are in one big group chat.

11.1 ChatSendToAllRequest

Create the ChatSendToAllRequest class, the request for sending a group chat message to everyone.

The code is as follows:

public class ChatSendToAllRequest implements Message {

    public static final String TYPE = "CHAT_SEND_TO_ALL_REQUEST";

    /**
     * Message number
     */
    private String msgId;

    /**
     * Content
     */
    private String content;

    // ... Omit setter, getter, toString methods

}

**PS:** in a real group chat, there would also be a groupId field indicating the group number.

11.2 ChatSendToAllHandler

Server side.

Create the ChatSendToAllHandler class, with which the server handles group chat requests from clients.

The code is as follows:

The code is fairly simple; see the comments at <1> and <2>.
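As the handler listing is not reproduced here, the forwarding step can be sketched in plain Java: the server keeps every connected client in a map and writes the message to each of them, the sender included. This is not the article's ChatSendToAllHandler. The class BroadcastSketch and the Connection interface are hypothetical stand-ins; in a Netty server the send call would be a write on the client's Channel.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of "send to all": forward one message to every registered connection. */
final class BroadcastSketch {

    /** Hypothetical stand-in for a connected client. */
    interface Connection {
        void send(String message);
    }

    // User -> connection, filled in when a client authenticates.
    private final Map<String, Connection> connections = new ConcurrentHashMap<>();

    void join(String user, Connection connection) {
        connections.put(user, connection);
    }

    void leave(String user) {
        connections.remove(user);
    }

    /** Forwards the message to everyone, the sender included; returns the delivery count. */
    int sendToAll(String fromUser, String content) {
        String payload = fromUser + ": " + content;
        int delivered = 0;
        for (Connection connection : connections.values()) {
            connection.send(payload);
            delivered++;
        }
        return delivered;
    }
}
```

A real group chat would first look up the members of the given group and forward only to their connections, instead of iterating over every connection.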

11.3 Simple test

① Start the Netty Server.

② Start Netty Client A. Then use Postman to simulate an authentication request (user yunai).

As shown below:

③ Start Netty Client B. Note that --server.port needs to be set to 8081 to avoid port conflicts.

④ Start Netty Client C. Note that --server.port needs to be set to 8082 to avoid port conflicts.

⑤ Finally, use Postman to simulate sending a group chat message.

As shown below:

In addition, you can see that the logs sent by client A to all clients are as follows:

Finally, for a systematic overview of all aspects of IM development, read on: Just getting Started: Developing Mobile IM from Scratch.

Appendix, series of articles

Follow the source code to learn IM (1): Hand in hand to teach you to use Netty to implement the heartbeat mechanism and the disconnection and reconnection mechanism

Follow the source code to learn IM (2): Is developing your own IM difficult? Hand in hand to teach you to build an Android version of IM

Follow the source code to learn IM (3): Developing an IM Server from Scratch based on Netty

Follow the source code to learn IM (4): Just pick up the keyboard and do it, teach you to hand-build a distributed IM system

Follow the source code to learn IM (5): Understand IM long connection, heartbeat and reconnection mechanism correctly, and implement it

Follow the source code to learn IM (6): Hand in hand to teach you to use Go to quickly build a high performance, extensible IM system

Follow the source code to learn IM (7): Hand in hand to teach you to use WebSocket to create Web IM chat

Follow the source code to learn IM (8): A ten-thousand-word long read, hand in hand to teach you to use Netty to create IM chat (* this article)

This article has been simultaneously published at: www.52im.net/thread-3489…