1. The purpose of writing this article

I have been using Netty for some time and have a general understanding of Netty. Thinking back to the early days of Netty, many Netty components were not used, or not used well enough, to qualify as “best practices.” The purpose of this article is to guide you to use Netty construct a complete project, to sort out in the practical development experience some of the best practice sharing, of course, these best practices is not necessarily true best practice, only in the development of, or refer to other excellent code compiled together, If you have any different ideas or better practices, welcome to share in the comments section, we learn together and make progress!

2. Project preparation

First, the full version of the code ZPSW/JT808-netty

Development environment :IDEA+JDK1.8+Maven

Use framework: Netty + Spring Boot + Spring Data JPA

Other tools: Lombok (handy if you haven’t used it)

3. Development process

3.1. Understanding JT808 protocol 3.2. Building codec 3.3. Construct business Handler 3.4. Efficient management of Channel 3.5. Some improvementsCopy the code

3.1 Understanding JT808

The following is a brief introduction to the JT808 protocol format description, the full version of the JT808 protocol technical specification. PDF

In the message body attribute, we first pay attention to the length of the message body, not other things, and the subcontracting situation is not considered.

A basic data structure can be abstracted from the header and body of the message

@Data public class DataPacket { protected Header header = new Header(); // Header protected ByteBuf ByteBuf; @data public static class Header {private short msgId; // Message ID 2 bytes private short msgBodyProps; // Message body attribute 2 bytes private String terminalPhone; // Terminal mobile phone number 6 bytes private short flowId; // Get the packet length public shortgetMsgBodyLength() {
            return(short) (msgBodyProps & 0x3ff); } // Obtain the encryption type 3bits public bytegetEncryptionType() {
            return(byte) ((msgBodyProps & 0x1c00) >> 10); } // Whether to subcontract public BooleanhasSubPackage() {
            return((msgBodyProps & 0x2000) >> 13) == 1; }}}Copy the code

We can parse the Header first, and then let the subclass parse the package itself

 public void parse() { try{ this.parseHead(); // Verify the package lengthif(this.header.getMsgBodyLength() ! = this.byteBuf.readableBytes()) { throw new RuntimeException("Inclusion length error"); } this.parseBody(); / / by subclasses override} finally {ReferenceCountUtil. SafeRelease (enclosing byteBuf); }} protected voidparseHead() {
        header.setMsgId(byteBuf.readShort());
        header.setMsgBodyProps(byteBuf.readShort());
        header.setTerminalPhone(BCD.BCDtoString(readBytes(6)));
        header.setFlowId(byteBuf.readShort());
    }
    protected void parseBody() {}Copy the code

The readByte(int Length) method is a simple encapsulation of bytebuf.readBytes (byte[] DST)

public byte[] readBytes(int length) {
        byte[] bytes = new byte[length];
        this.byteBuf.readBytes(bytes);
        return bytes;
}
Copy the code

I didn’t find a similar method in Netty’s official Api, so I defined one myself

Another method is defined for response overwriting.

Response override:

 public ByteBuf toByteBufMsg() { ByteBuf bb = ByteBufAllocator.DEFAULT.heapBuffer(); bb.writeInt(0); // Take up 4 bytes to write msgId and msgBodyProps bb.writeBytes(bcd.tob)cdBytes(StringUtils.leftPad(this.header.getTerminalPhone(), 12, "0")));
        bb.writeShort(this.header.getFlowId());
        returnbb; } * *"Best practices"Buffer () is much more efficient than Unpooled Unpooled buffer(). In ChannelPipeLine we can use ctx.alloc() or channel.alloc() to get the Netty default memory allocator. It is not necessary to set up a unique memory allocator elsewhere. The value can be obtained by ByteBufAllocator.DEFAULT, which is the same as the previous value (unless specially configured). **Copy the code

Here, when we convert the response to ByteBuf and write it out, we don’t know the length of the message body, so we’ll just hold the space and write it later.

All messages are inherited from the DataPacket, and we pick a position with a relatively large number of fields to report the message

Then we set up the data structure of the location reporting message, looking at the format of the location message first

The establishment structure is as follows:

@Data public class LocationMsg extends DataPacket { private int alarm; 4 bytes private int statusField; // State 4 bytes privatefloatlatitude; // latitude 4 bytes privatefloatlongitude; Private short elevation; // Altitude 2 bytes private short speed; // speed 2 bytes private short direction; Private String time; BCD public LocationMsg(ByteBuf ByteBuf) {super(ByteBuf); } @Override public voidparseBody() { ByteBuf bb = this.byteBuf; this.setAlarm(bb.readInt()); this.setStatusField(bb.readInt()); This.setlatitude (bb.readunsignedint () * 1.0f / 1000000); This.setlongitude (bb.readunsignedint () * 1.0f / 1000000); this.setElevation(bb.readShort()); this.setSpeed(bb.readShort()); this.setDirection(bb.readShort()); this.setTime(BCD.toBcdTimeString(readBytes(6))); }}Copy the code

All messages that do not have their own reply require a default reply in the following format

@Data public class CommonResp extends DataPacket { private short replyFlowId; // Reply sequence number 2 bytes private short replyId; // Reply ID 2 bytes private byte result; // Result 1 byte publicCommonResp() {
        this.getHeader().setMsgId(JT808Const.SERVER_RESP_COMMON);
    }

    @Override
    public ByteBuf toByteBufMsg() {
        ByteBuf bb = super.toByteBufMsg();
        bb.writeShort(replyFlowId);
        bb.writeShort(replyId);
        bb.writeByte(result);
        returnbb; }}Copy the code

3.2 Building codecs

decoder

The previous agreement as you can see, the identity of 0 x7e, so our first decoder can use Netty own DelimiterBasedFrameDecoder, one of the natural is 0 x7e delimiters. (Netty provides many built-in codecs. You are advised to customize them after ensuring that Netty’s built-in codecs do not meet your requirements.)

After DelimiterBasedFrameDecoder help us truncation, information to our own decoder, our aim is to convert ByteBuf into our previously defined data structure. Define decoder

public class JT808Decoder extends ByteToMessageDecoder {
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    
    }
}
Copy the code

Step 1: Escape restore, escape rules are as follows

0x7d 0x01 -> 0x7d

0x7d 0x02 -> 0x7e

public ByteBuf revert(byte[] raw) { int len = raw.length; ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(len); // The DataPacket parse method is recycledfor (int i = 0; i < len; i++) {
            if (raw[i] == 0x7d && raw[i + 1] == 0x01) {
                buf.writeByte(0x7d);
                i++;
            } else if (raw[i] == 0x7d && raw[i + 1] == 0x02) {
                buf.writeByte(0x7e);
                i++;
            } else{ buf.writeByte(raw[i]); }}return buf;
    }
Copy the code

Step 2: Check

byte pkgCheckSum = escape.getByte(escape.writerIndex() - 1); escape.writerIndex(escape.writerIndex() - 1); Byte calCheckSum = jt808Util. XorSumBytes(escape);if(pkgCheckSum ! = calCheckSum) { log.warn(PkgCheckSum :{},calCheckSum:{}", pkgCheckSum, calCheckSum); ReferenceCountUtil.safeRelease(escape); // Be sure not to leak the releasereturn null;
    }
Copy the code

Step 3: Decode

 public DataPacket parse(ByteBuf bb) {
        DataPacket packet = null;
        short msgId = bb.getShort(bb.readerIndex());
        switch (msgId) {
            case TERNIMAL_MSG_HEARTBEAT:
                packet = new HeartBeatMsg(bb);
                break;
            case TERNIMAL_MSG_LOCATION:
                packet = new LocationMsg(bb);
                break;
            case TERNIMAL_MSG_REGISTER:
                packet = new RegisterMsg(bb);
                break;
            case TERNIMAL_MSG_AUTH:
                packet = new AuthMsg(bb);
                break;
            case TERNIMAL_MSG_LOGOUT:
                packet = new LogOutMsg(bb);
                break;
            default:
                packet = new DataPacket(bb);
                break;
        }
        packet.parse();
        return packet;
    }
Copy the code

In switch, we try to put the ones with high frequency in the front to avoid too much if judgment

Then we add the message out.add(MSG) to get the message to our business Handler.

The encoder

The encoder needs to convert our DataPacket to ByteBuf and then escape it. Define encoder


public class JT808Encoder extends MessageToByteEncoder<DataPacket> {
    protected void encode(ChannelHandlerContext ctx, DataPacket msg, ByteBuf out) throws Exception {
    
    }
}
Copy the code

Step 1: Switch

ByteBuf bb = msg.toByteBufMsg();
Copy the code

Remember when we converted the header from a DataPacket to a DataPacket that took up 4 bytes to be overwritten later

bb.markWriterIndex(); Short bodyLen = (short) (bb.readableBytes() -12); short bodyProps = createDefaultMsgBodyProperty(bodyLen); // Overwrite the occupied 4 bytes bb.writerIndex(0); bb.writeShort(msg.getHeader().getMsgId()); bb.writeShort(bodyProps); bb.resetWriterIndex(); bb.writeByte(JT808Util.XorSumBytes(bb));Copy the code

Step 2: Escape

public ByteBuf escape(ByteBuf raw) { int len = raw.readableBytes(); ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer(len + 12); // Assume that up to 12 need to be escaped buf.writeByte(jt808conconst.PKG_DELIMITER);while (len > 0) {
            byte b = raw.readByte();
            if (b == 0x7e) {
                buf.writeByte(0x7d);
                buf.writeByte(0x02);
            } else if (b == 0x7d) {
                buf.writeByte(0x7d);
                buf.writeByte(0x01);
            } else {
                buf.writeByte(b);
            }
            len--;
        }
        ReferenceCountUtil.safeRelease(raw);
        buf.writeByte(JT808Const.PKG_DELIMITER);
        returnbuf; } * *"Best practices"We return that ByteBuf is written out, so using directBuffer is more efficientCopy the code

When the escape is complete, it is sent directly, without forgetting to release it.

        ByteBuf escape = escape(bb);
        out.writeBytes(escape);
        ReferenceCountUtil.safeRelease(escape);
Copy the code

3.3 Building a Service Handler

In the decoder we return a DataPacket object, so we have two options for writing this Handler:

One is to define a Handler to receive the DataPacket and determine the type, as shown below

@Component
@ChannelHandler.Sharable
public class JT808ServerHandler extends SimpleChannelInboundHandler<DataPacket> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DataPacket msg) throws Exception {
        log.debug(msg.toString());
        if (msg instanceof AuthMsg || msg instanceof HeartBeatMsg || msg instanceof LocationMsg || msg instanceof LogOutMsg) {
            CommonResp resp = CommonResp.success(msg, getFlowId(ctx));
            ctx.writeAndFlush(resp);
        } else if(msg instanceof RegisterMsg) { RegisterResp resp = RegisterResp.success(msg, getFlowId(ctx)); ctx.writeAndFlush(resp); }}}Copy the code

Alternatively, each subtype of a DataPacket defines a Handler, as shown below

public class LocationMsgHandler extends SimpleChannelInboundHandler<LocationMsg> 
public class HeartBeatMsgHandler extends SimpleChannelInboundHandler<HeartBeatMsg> 
public class RegisterMsgHandler extends SimpleChannelInboundHandler<LogOutMsg> 
Copy the code

I chose the second option here, one for good code style and the other for more on that later.

Here is the detailed code for a LocationMsgHandler, which saves the location to the database and then replies to the device


@Slf4j
@Component
@ChannelHandler.Sharable
public class LocationMsgHandler extends BaseHandler<LocationMsg> {

    @Autowired
    private LocationRepository locationRespository;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LocationMsg msg) throws Exception {
        log.debug(msg.toString());
        locationRespository.save(LocationEntity.parseFromLocationMsg(msg));
        CommonResp resp = CommonResp.success(msg, getSerialNumber(ctx.channel()));
        write(ctx, resp);
    }
}
Copy the code

SimpleChannelInboundHandler BaseHandler inheritance, it defines some common methods, such as getSerialNumber () to obtain response serial number

    private static final AttributeKey<Short> SERIAL_NUMBER = AttributeKey.newInstance("serialNumber");

    public short getSerialNumber(Channel channel){
        Attribute<Short> flowIdAttr = channel.attr(SERIAL_NUMBER);
        Short flowId = flowIdAttr.get();
        if (flowId == null) {
            flowId = 0;
        } else {
            flowId++;
        }
        flowIdAttr.set(flowId);
        return flowId;
    }

Copy the code

We store the serial number in Channel for easy maintenance.

3.4. Efficient management of Channel

Suppose we now have a requirement that we need to find a specific connection to send a message. In our case, specificity means finding a connection based on the phone number in the header and sending a message. We could maintain a Map of all channels, but that would waste the set of methods provided by Netty’s DefaultChannelGroup. Therefore, we can define a ChannelManager, which internally uses DefaultChannelGroup to maintain Channel, and maintains the mapping relationship between mobile phone number ->ChannelId.

@Component
public class ChannelManager {

    private static final AttributeKey<String> TERMINAL_PHONE = AttributeKey.newInstance("terminalPhone");
    
    private ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    private Map<String, ChannelId> channelIdMap = new ConcurrentHashMap<>();

    private ChannelFutureListener remover = future ->
            channelIdMap.remove(future.channel().attr(TERMINAL_PHONE).get());


    public boolean add(String terminalPhone, Channel channel) {
        boolean added = channelGroup.add(channel);
        if (added) {
            channel.attr(TERMINAL_PHONE).set(terminalPhone);
            channel.closeFuture().addListener(remover);
            channelIdMap.put(terminalPhone, channel.id());
        }
        return added;
    }

    public boolean remove(String terminalPhone) {
        return channelGroup.remove(channelIdMap.remove(terminalPhone));
    }

    public Channel get(String terminalPhone) {
        return channelGroup.find(channelIdMap.get(terminalPhone));
    }

    public ChannelGroup getChannelGroup() {
        returnchannelGroup; }}Copy the code

We define a ChannelFutureListener that executes this callback when a channel is closed to help us keep our channelIdMap from being too bloated and efficient. The same is true for DefaultChannelGroup. So you don’t have to worry about channels that don’t exist and use up memory. In addition, we can provide DefaultChannelGroup to broadcast at some time.

3.5. Some improvements

1. Database operations have occurred in our LocationMsgHandler

        locationRespository.save(LocationEntity.parseFromLocationMsg(msg));
Copy the code

In Netty, however, handlers are driven by the Reactor thread by default, and blocking greatly reduces concurrency, so we defined a dedicated EventExecutorGroup to drive time-consuming handlers. Just specify it when the Channel is initialized. Another benefit of defining a Handler for each DataPacket subtype is that we can have time-consuming handlers driven by a dedicated business thread pool, while non-time-consuming handlers are driven by the default Reactor thread, adding flexibility.

pipeline.addLast(heartBeatMsgHandler); pipeline.addLast(businessGroup,locationMsgHandler); // Add businessGroup Pipeline.addLast (authMsgHandler) to businessGroup pipeline.addLast(authMsgHandler); pipeline.addLast(registerMsgHandler); pipeline.addLast(logOutMsgHandler);
Copy the code

In addition, like the case order in the switch in the decoder parse(), we can also save some if judgments by increasing the order of the Handler.

Our LocationMsgHandler is now driven by businessGroup, but the Reactor thread is still used to write the response, so we can write(CTX, RESP) to reduce some judgment and improve performance. Instead of

workerGroup.execute(() -> write(ctx, resp));
Copy the code

One of the workergroups is in the boot, we use Spring to define it as a separate bean, when the use of direct annotations can be introduced

serverBootstrap.group(bossGroup, workerGroup)
Copy the code

3. With the help of Spring, we can define almost all components as singletons, which improves performance slightly, except encoders and decoders, which cannot be defined as singletons because they have some properties to maintain.

conclusion

Thank you for your patience. This is my first time to write an article. Please bear with me if there are any mistakes.

Also send the full version code to ZPSW/JT808-NETTY

This is also the first project of personal open source, if it is helpful to you, give a Star will be appreciated.

Here are some other Netty best practices (paraphrased from Best Practice in Netty) :

  • Write andFlush should not be called all the time. Should we call write and flush at appropriate times, because every system flush is a system call, and if possible, the number of write calls should be reduced. Because it will go through the entire pipeline(github.com/netty/netty…)
  • If you don’t care about the results of write, use channel.voidPromise() to reduce object creation
  • Has been written for processing capacity weaker recipients, may cause OutMemoryError, focus on the channel. The isWritable () and the channelhandler cahnnelWritabilityChanged () will be very helpful, Channel. BytesBeforeUnwritable and channel. BytesBeforeWritable () is also worth attention
  • Note that write_buffer_high_water_mark and write_buffer_low_water_mark are configured, for example, high:32kb(default 64kb), low:8kb(default 32kb).
  • Can be triggered by channelpipeline custome events (pipeline fireUserEventTriggered (MyCustomEvent)), to deal with the corresponding in DuplexChannelHandler events

Some English ones will not be posted here

In addition, amway offers a network debugging tool for beginners, NetAssist

goodbye