Bitchat is an INSTANT messaging framework based on Netty

Project address: github.com/all4you/bit…

Quick start

The bitchat-example module provides a server and client implementation example that you can follow for your own business implementation.

Starting the server

To start the Server, you need to get an instance of the Server, which can be obtained from ServerFactory.

At present, only Server in single-machine mode is implemented. SimpleServerFactory only needs to define a port to obtain a single-machine Server instance, as shown below:

public class StandaloneServerApplication { public static void main(String[] args) { Server server = SimpleServerFactory.getInstance() .newServer(8864); server.start(); }}Copy the code

After the server is successfully started, the following information is displayed:

Start the client

At present, only the client directly connected to the server is implemented. Using SimpleClientFactory, you only need to specify a ServerAttr to obtain a client and then connect the client to the server, as shown below:

public class DirectConnectServerClientApplication {

    public static void main(String[] args) {
        Client client = SimpleClientFactory.getInstance()
            .newClient(ServerAttr.getLocalServer(8864));
        client.connect();

        doClientBiz(client); }}Copy the code

After the client is connected to the server, the following information is displayed:

Experience client functions

Currently, the client provides three types of Func: login, view the list of online users, and send single chat messages. Each Func has a different command format.

The login

To log in, run -lo houyi 123456 on the client. Currently, the user center is not implemented. Mock a Mock user service, so any user name or password will be used to log in and a user ID will be created for the user.

If the login is successful, the following information is displayed:

login.jpg

Viewing Online Users

After the successful login, you can run the -lu command to obtain the list of online users. The current users are saved in memory. The command output is as follows:

list-user.jpg

Send single chat messages

To send chat messages to user houyi as gris, run -pc 1 hello,houyi

The second parameter is the user ID to which the message is sent, and the third parameter is the message content

The sender of the message, after sending the message:

send-p2p-msg.jpg

Message receiver, receiving a message:

received-p2p-msg.jpg

The client is disconnected and reconnected

The heartbeat is maintained between the client and the server, and both sides check whether the connection is available. The client sends a PingPacket to the server every 5s, and the server replies with a PongPacket after receiving the PingPacket, indicating that both sides are healthy.

When the server does not receive the message from the client for some reason, the server disconnects the client, and the same client also performs this check.

When the connection between the client and the server is disconnected, the channelInactive method of the client HealthyChecker is triggered to reconnect the client.

client-reconnect.jpg

The overall architecture

Stand-alone version

The architecture of the standalone version only involves the server and client, as well as the protocol layer between them, as shown in the figure below:

stand-alone-arch.jpg

In addition to the server and client, there are three major centers: the message center, the user center, and the link center.

  • Message center: mainly responsible for message storage and history, offline message query

  • User center: Mainly responsible for user and group-related services

  • Link center: saves client links. The server obtains client links from the link center and pushes messages to the client

Cluster version

The standalone version cannot achieve high availability and has certain limitations in performance and number of users that can be served, so the scalable cluster version is required. The cluster version adds a routing layer on the basis of the standalone version, through which the client obtains the available server address and then communicates with the server, as shown in the following figure:

cluster-arch.jpg

The client sends a message to another user. After receiving the request, the server obtains from the Connection center which server the target user “hangs” in. If it is under its own name, it is the simplest to push the message directly to the target user. Have the target server push the message to the target user.

Custom protocol

A custom protocol is used to implement communication between the server and the client. The protocol has the following fields:

*
* <p>
* The structure of a Packet is like blow:
* +----------+----------+----------------------------+
* |  size    |  value   |  intro                     |
* +----------+----------+----------------------------+
* | 1 bytes  | 0xBC     |  magic number              |
* | 1 bytes  |          |  serialize algorithm       |
* | 4 bytes  |          |  packet symbol             |
* | 4 bytes  |          |  content length            |
* | ? bytes  |          |  the content               |
* +----------+----------+----------------------------+
* </p>
*
Copy the code

The meaning of each field

Bytes of use
1 Magic number, default is 0xBC
1 Serialization algorithm
4 The type of Packet
4 The content length of Packet
? The contents of the Packet

The serialization algorithm will determine which serialization method is used in the Packet encoding and decoding.

The Packet type determines which Packet the byte stream arriving at the server will be deserialized into and which PacketHandler will process the Packet.

The content length will solve the problem of Packet unpacking and sticky Packet. When the server parses the byte stream, it will wait until the byte length reaches the content length before reading the byte.

In addition, a SYNC field is stored in the Packet, which specifies whether the server needs to use an asynchronous business thread pool to process the Packet’s data.

Health check

The server and client respectively maintain a health check service, namely the IdleStateHandler provided by Netty. By inheriting this class and implementing the channelIdle method, we can realize the logical processing when the connection is “idle”. When there is idle, we only care about reading idle. We can assume that there is something wrong with this link.

Just close the link if there is a problem, as shown below:

public class IdleStateChecker extends IdleStateHandler {

    private static final int DEFAULT_READER_IDLE_TIME = 15;

    private int readerTime;

    public IdleStateChecker(int readerIdleTime) {
        super(readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime, 0, 0, TimeUnit.SECONDS);
        readerTime = readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime;
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
        log.warn("[{}] Hasn't read data after {} seconds, will close the channel:{}", IdleStateChecker.class.getSimpleName(), readerTime, ctx.channel()); ctx.channel().close(); }}Copy the code

In addition, the client needs to maintain an additional health checker that normally sends heartbeat to the server on a regular basis and reconnects when the link status becomes inActive, as shown below:

public class HealthyChecker extends ChannelInboundHandlerAdapter {

    private static final int DEFAULT_PING_INTERVAL = 5;

    private Client client;

    private int pingInterval;

    public HealthyChecker(Client client, int pingInterval) {
        Assert.notNull(client, "client can not be null");
        this.client = client;
        this.pingInterval = pingInterval <= 0 ? DEFAULT_PING_INTERVAL : pingInterval;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        schedulePing(ctx);
    }

    private void schedulePing(ChannelHandlerContext ctx) {
        ctx.executor().schedule(() -> {
            Channel channel = ctx.channel();
            if (channel.isActive()) {
                log.debug("[{}] Send a PingPacket", HealthyChecker.class.getSimpleName());
                channel.writeAndFlush(new PingPacket());
                schedulePing(ctx);
            }
        }, pingInterval, TimeUnit.SECONDS);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.executor().schedule(() -> {
            log.info("[{}] Try to reconnecting...", HealthyChecker.class.getSimpleName()); client.connect(); }, 5, TimeUnit.SECONDS); ctx.fireChannelInactive(); }}Copy the code

Business thread pool

As we know, Netty maintains two IO thread pools, one boss is mainly responsible for establishing links, and the other worker is mainly responsible for reading and writing data on links. We should not use IO threads to deal with our business, because it may cause blocking to IO threads. New links cannot be created or data cannot be read or written in a timely manner.

To solve this problem, we need to process our business logic in the business thread pool, but this is not absolute. If the logic we want to perform is simple and does not cause too much blocking, we can do it directly in the IO thread. For example, the client sends a Ping and the server replies with a Pong, There is no need to handle this in the business thread pool, because the IO thread will eventually write the data. But if a business logic needs to query a database or read a file, these operations tend to be time consuming, so these operations need to be wrapped up in a business thread pool.

The server allows the client to specify the mode of service processing in the transmitted Packet. After decoding the byte stream into Packet, the server determines how to process the Packet according to the value of sync field in Packet, as shown below:

public class ServerPacketDispatcher extends 
    SimpleChannelInboundHandler<Packet> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Packet request) {
        // if the packet should be handled async
        if (request.getAsync() == AsyncHandle.ASYNC) {
            EventExecutor channelExecutor = ctx.executor();
            // create a promise
            Promise<Packet> promise = new DefaultPromise<>(channelExecutor);
            // async execute and get a future
            Future<Packet> future = executor.asyncExecute(promise, ctx, request);
            future.addListener(new GenericFutureListener<Future<Packet>>() {
                @Override
                public void operationComplete(Future<Packet> f) throws Exception {
                    if(f.isSuccess()) { Packet response = f.get(); writeResponse(ctx, response); }}}); }else{ // sync execute and get the response packet Packet response = executor.execute(ctx, request); writeResponse(ctx, response); }}}Copy the code

It’s not just IM frameworks

In addition to serving as an IM framework, BitChat can also serve as a general purpose communication framework.

Packet, as the carrier of communication, can quickly realize its own business by inheriting AbstractPacket, and with PacketHandler as the data processor, can realize the communication between client and server.