• Scotland team
  • Author: Jonny

As we know, TCP is a connection-oriented, stream-based transport protocol. Small writes may be merged before they are sent (for example, by the Nagle algorithm and by buffering on the sending side), and large writes may be split into several segments. As a result, the receiver can see "sticky packets" (several small messages merged into one read) and "half packets" (one large message split across several reads). The byte stream carries no message boundaries, so the receiving end cannot tell whether a single read contains exactly one complete message. So what's the solution to this problem?
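Before looking at solutions, the problem itself can be reproduced without a network. The following is a minimal simulation, not real TCP behavior: the chunk sizes are picked arbitrarily for illustration, whereas real segmentation depends on the network and on both hosts.

```javascript
// Two application-level messages as the sender writes them.
const messages = [Buffer.from('hello'), Buffer.from('world!')];

// TCP only sees a byte stream: the messages are simply concatenated.
const stream = Buffer.concat(messages);

// Hypothetical chunking at the receiver (sizes picked for illustration).
// The first chunk sticks message 1 to the start of message 2,
// and the second chunk carries only the remaining part of message 2.
const chunks = [stream.subarray(0, 8), stream.subarray(8)];

for (const chunk of chunks) {
    console.log(JSON.stringify(chunk.toString()));
}
```

Neither chunk corresponds to a message the sender wrote, which is exactly why the receiver needs extra boundary information.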


1. Solutions to sticky packets and their comparison

Simply put, since the stream has no message boundaries, we must add boundary information to each message before handing it to the transport layer. There are three common ways to do this:

  1. Send fixed-length messages
  2. Use a special delimiter to separate messages
  3. Send the size of the message along with the message

The first option is not flexible enough. The second is risky: if the data happens to contain the delimiter, messages will be split incorrectly. The third option requires parsing a header, but it is relatively safe.
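The three options can be compared in a few lines of Node. This is only a sketch: the function names, the 16-byte fixed size and the `'\n'` delimiter are assumptions made for illustration, not part of any protocol described here.

```javascript
// Option 1: fixed-length messages, padded with zero bytes (size is an assumption).
const FIXED_SIZE = 16;
function frameFixed(text) {
    const out = Buffer.alloc(FIXED_SIZE); // zero-filled
    out.write(text, 0, 'utf8');           // longer text is silently truncated
    return out;
}

// Option 2: a delimiter ('\n' here) after every message.
function frameDelimited(text) {
    return Buffer.from(text + '\n', 'utf8');
}

// Option 3: a 4-byte big-endian length in front of the message.
function frameLengthPrefixed(text) {
    const body = Buffer.from(text, 'utf8');
    const head = Buffer.alloc(4);
    head.writeUInt32BE(body.length, 0);
    return Buffer.concat([head, body]);
}

// The weakness of option 2: a payload containing the delimiter is split in two.
const received = frameDelimited('line1\nline2').toString('utf8');
const parts = received.split('\n').filter((s) => s.length > 0);
console.log(parts.length); // 2, although only one message was sent
```

With the length prefix, the payload may contain any bytes, because the receiver never inspects the body to find the boundary.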

2. Packing and unpacking

Since we use the third scheme, we have to deal with packing and unpacking.

You must first define the structure of the packet. Like an HTTP message, it has a header and a body. The header is a fixed-size structure in which one field holds the length of the packet body; other fields can be defined as needed. Given the fixed header length and the body length stored in the header, a complete packet can be split off correctly. The body contains the data content.

At the sending end, the data needs to be packed. Packing means prepending a header to a piece of data, so that the packet consists of a header and a body.

At the receiving end, the packet needs to be unpacked. The main process is as follows:

  1. Dynamically allocate a buffer for each connection and associate the buffer with the socket.
  2. When data is received, store it in the buffer first.
  3. Check whether the buffer holds enough data for a packet header. If not, do not unpack yet.
  4. Parse the header to obtain the length of the packet body.
  5. Check whether the data in the buffer after the header is enough for the packet body. If not, do not unpack yet.
  6. Fetch the entire packet. “Fetch” here means not only copying the packet out of the buffer but also deleting it from the buffer, which is done by moving the data that follows the packet to the start of the buffer.
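The six steps above can be sketched with a simple growing buffer. This is a minimal illustration under stated assumptions: the header is a single 4-byte big-endian body length, `createUnpacker` and `onPacket` are hypothetical names, and instead of moving the remaining bytes to the start of the buffer (step 6) the sketch just keeps a view of them, which is simpler but not the memory layout described above.

```javascript
const HEAD_LEN = 4; // assumed header: one 32-bit big-endian body length

// Step 1: per-connection state. In real code this would be tied to a socket.
function createUnpacker(onPacket) {
    let pending = Buffer.alloc(0);

    return function onData(chunk) {
        // Step 2: store the new data first.
        pending = Buffer.concat([pending, chunk]);

        // Steps 3-6, repeated while complete packets are available.
        while (pending.length >= HEAD_LEN) {          // step 3
            const bodyLen = pending.readUInt32BE(0);  // step 4
            const packetLen = HEAD_LEN + bodyLen;
            if (pending.length < packetLen) break;    // step 5

            // Step 6: copy the body out and drop the packet from the buffer.
            const body = Buffer.from(pending.subarray(HEAD_LEN, packetLen));
            pending = pending.subarray(packetLen);
            onPacket(body);
        }
    };
}
```

A possible use is `socket.on('data', createUnpacker((body) => { /* handle one message */ }))`, with one unpacker per connection so that leftover bytes of different connections never mix.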

There are two common buffer designs:

  1. A dynamic buffer, whose size is adjusted according to the amount of data. The disadvantage of this solution is that, to keep the buffer from growing, the remaining data must be copied to the head of the buffer each time a complete packet is parsed, which increases system load.
  2. A circular buffer, which keeps two pointers to the head and tail of the valid data. Only the head and tail pointers move when data is stored or removed.
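To illustrate the second design, here is a minimal fixed-capacity circular buffer. It is a sketch, not the implementation used later in this article: the class name `RingBuffer` and its fields are hypothetical, the capacity is assumed to be greater than zero, and it does not grow when full.

```javascript
class RingBuffer {
    constructor(capacity) {
        this.buf = Buffer.alloc(capacity);
        this.head = 0; // read position (start of valid data)
        this.tail = 0; // write position (end of valid data)
        this.size = 0; // number of valid bytes
    }

    // Append a chunk; returns false if there is not enough free space.
    write(chunk) {
        if (chunk.length > this.buf.length - this.size) return false;
        const first = Math.min(chunk.length, this.buf.length - this.tail);
        chunk.copy(this.buf, this.tail, 0, first);    // part up to the end
        chunk.copy(this.buf, 0, first, chunk.length); // wrapped remainder
        this.tail = (this.tail + chunk.length) % this.buf.length;
        this.size += chunk.length;
        return true;
    }

    // Remove and return n bytes, or null if fewer than n are stored.
    read(n) {
        if (n > this.size) return null;
        const out = Buffer.alloc(n);
        const first = Math.min(n, this.buf.length - this.head);
        this.buf.copy(out, 0, this.head, this.head + first); // part up to the end
        this.buf.copy(out, first, 0, n - first);             // wrapped remainder
        this.head = (this.head + n) % this.buf.length;
        this.size -= n;
        return out;
    }
}
```

Removing data only advances `head`, so no bytes are shifted inside the buffer; the price is that a read or write may have to be done in two parts when it crosses the end of the buffer.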

3. Network and native byte order

Once the message structure is defined, the sender and receiver also need to agree on a byte order. As we know, native endianness varies from machine to machine: x86 machines are little-endian, while some other architectures are big-endian. Therefore, the byte order must be agreed on before the data stream is transmitted. In general, network byte order (big-endian) is used for transmission, and Unicode is used for text encoding.
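The difference between the two byte orders is easy to see with Node's `Buffer`. The value `0x01020304` below is an arbitrary example; the last lines detect the host byte order by viewing a 32-bit integer as raw bytes.

```javascript
const value = 0x01020304;

const be = Buffer.alloc(4);
be.writeUInt32BE(value, 0); // network byte order: most significant byte first

const le = Buffer.alloc(4);
le.writeUInt32LE(value, 0); // little-endian: least significant byte first

console.log(be.toString('hex')); // 01020304
console.log(le.toString('hex')); // 04030201

// Reading with the wrong byte order yields a different number.
console.log(be.readUInt32LE(0) === value); // false

// Detect the host byte order by viewing a 32-bit integer as raw bytes.
const hostIsLittleEndian = new Uint8Array(new Uint32Array([1]).buffer)[0] === 1;
console.log(hostIsLittleEndian ? 'little-endian host' : 'big-endian host');
```

Because the `BE` and `LE` methods state the byte order explicitly, the bytes on the wire are the same regardless of the machine the code runs on.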

4. Code implementation

With that in mind, what do we do now? The sender packs the data according to the defined protocol rules; the receiver stores each received chunk in its buffer and starts unpacking once the buffer holds a complete packet. Note that any value longer than one byte must be read and written in big-endian byte order. Here is the code implementation for Node (only the core snippets are provided):

1) Packing at the sender:

    let head = Buffer.alloc(4);
    let jsonStr = JSON.stringify(json);
    let body = Buffer.from(jsonStr, 'utf8');
    // Values longer than one byte are written in big-endian order.
    // The length is unsigned, matching readUInt32BE on the receiving side
    head.writeUInt32BE(body.byteLength, 0);
    let buffer = Buffer.concat([head, body]);

2) The receiving end stores the received data in the buffer:

    // Per-connection state used below: _buffer, _bufferLength (its size),
    // _dataLen (number of valid bytes), _readPointer and _writePointer
    let dataReadStart = 0; // Start position of the new data
    let dataLength = buffer.length; // Length of the data to copy
    let availableLen = _bufferLength - _dataLen; // Free space in the buffer

    // The remaining buffer space is not enough to store the data
    if (availableLen < dataLength) {
        // Grow the buffer to a multiple of its current size
        let newLength = Math.ceil((_dataLen + dataLength) / _bufferLength) * _bufferLength;
        let _tempBuffer = Buffer.alloc(newLength);

        // Copy the old data to the start of the new buffer. The old data may wrap around,
        // so copy the part up to the end of the old buffer first, then the part at its head.
        // This also covers a completely full buffer, where both pointers are equal
        let dataTailLen = Math.min(_dataLen, _bufferLength - _readPointer);
        _buffer.copy(_tempBuffer, 0, _readPointer, _readPointer + dataTailLen);
        _buffer.copy(_tempBuffer, dataTailLen, 0, _dataLen - dataTailLen);

        _bufferLength = newLength;
        _buffer = _tempBuffer;
        _tempBuffer = null;
        _readPointer = 0;
        _writePointer = _dataLen;

        // Store the newly arrived data
        buffer.copy(_buffer, _writePointer, dataReadStart, dataReadStart + dataLength);
        _dataLen += dataLength;
        _writePointer += dataLength;

    } else if (_writePointer + dataLength > _bufferLength) {
        // There is enough free space, but the data would run past the end of the buffer:
        // store the first part after the old data and the rest at the head of the buffer
        // Length of the free space at the tail of the buffer
        let bufferTailLength = _bufferLength - _writePointer;

        // End position (within the new data) of the part that fits at the tail
        let dataEndPosition = dataReadStart + bufferTailLength;
        buffer.copy(_buffer, _writePointer, dataReadStart, dataEndPosition);

        // Number of bytes that still have to be stored at the head of the buffer
        let restDataLen = dataLength - bufferTailLength;
        buffer.copy(_buffer, 0, dataEndPosition, dataReadStart + dataLength);

        _dataLen += dataLength;
        _writePointer = restDataLen;

    } else { // Enough contiguous space: copy the data directly into the buffer
        buffer.copy(_buffer, _writePointer, dataReadStart, dataReadStart + dataLength);
        _dataLen += dataLength;
        _writePointer += dataLength;
    }

3) Fetch all complete packets from the buffer (after the received data has been stored):

    let _dataHeadLen = 4;
    timer && clearInterval(timer);
    timer = setInterval(() => {
        // The buffered data is not enough to parse a packet header
        if (_dataLen < _dataHeadLen) {
            console.log('Buffered data is shorter than a packet header, waiting for more data...');
            clearInterval(timer);
            return;
        }
        // Read the body length from the header
        // Number of readable bytes between the read pointer and the end of the buffer
        let restDataLen = _bufferLength - _readPointer;
        let dataLen = 0;
        let headBuffer = Buffer.alloc(_dataHeadLen);
        // The header wraps around the end of the buffer and cannot be parsed directly
        if (restDataLen < _dataHeadLen) {
            // Fetch the first part of the header (up to the end of the buffer)
            _buffer.copy(headBuffer, 0, _readPointer, _bufferLength);
            // Fetch the second part of the header (from the start of the buffer)
            let unReadHeadLen = _dataHeadLen - restDataLen;
            _buffer.copy(headBuffer, restDataLen, 0, unReadHeadLen);
        } else {
            _buffer.copy(headBuffer, 0, _readPointer, _readPointer + _dataHeadLen);
        }
        dataLen = headBuffer.readUInt32BE(0);

        // The body has not fully arrived yet
        if (_dataLen - _dataHeadLen < dataLen) {
            log.info('Buffered body data is shorter than the length given in the header, waiting for more data...');
            clearInterval(timer);

        } else { // A complete packet is available
            // The packet passed on includes the header, which is parsed again in step 4
            let packetLen = _dataHeadLen + dataLen;
            let packet = Buffer.alloc(packetLen);
            // The packet wraps around the end of the buffer and requires two reads
            if (_bufferLength - _readPointer < packetLen) {
                let firstPartLen = _bufferLength - _readPointer;
                // Read the first part, up to the end of the buffer
                _buffer.copy(packet, 0, _readPointer, firstPartLen + _readPointer);
                // Read the second part, stored at the start of the buffer
                let secondPartLen = packetLen - firstPartLen;
                _buffer.copy(packet, firstPartLen, 0, secondPartLen);
                _readPointer = secondPartLen; // Update the read position

            } else { // Read the packet directly
                _buffer.copy(packet, 0, _readPointer, _readPointer + packetLen);
                _readPointer += packetLen; // Update the read position
            }

            _dataLen -= packetLen; // Update the data length
            // All data has been read
            if (_dataLen === 0) {
                clearInterval(timer);
            }

            // Start unpacking
            callback(packet);
        }
    }, 50);

4) Unpack the packet and get the data:

    let headBytes = 4;
    let head = Buffer.alloc(headBytes);
    buffer.copy(head, 0, 0, headBytes);
    let dataLen = head.readUInt32BE(0);
    const body = Buffer.alloc(dataLen);
    buffer.copy(body, 0, headBytes, headBytes + dataLen);

    let content = null;
    try {
        const str = body.toString('utf-8');
        if (str === '') {
            content = null;
        } else {
            content = JSON.parse(str);
        }
    } catch (e) {
        log.error('Failed to parse the body; the body length specified by the head may be wrong.');
    }
    // Pass to the business layer
    callback(content);

5. Summary

The sections above walked through the process of packing and unpacking. TCP is reliable: lost segments are retransmitted and the data is delivered to the application in order, so there is no need to worry about packet loss or reordering. UDP preserves message boundaries and does not need unpacking, but it is an unreliable transport, so other problems such as packet loss and packet ordering have to be solved instead.

When designing the packet structure above, we only added a length field. In practice, you can add whatever fields the business scenario requires, such as a protocol version and a protocol type.
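As an illustration of such an extended header, here is a sketch with a version and a type in front of the length. The 6-byte layout, the field sizes and the names `encodePacket` and `decodePacket` are assumptions made for this example only.

```javascript
// Hypothetical header layout (6 bytes), chosen for illustration:
//   byte 0     protocol version (uint8)
//   byte 1     message type     (uint8)
//   bytes 2-5  body length      (uint32, big-endian)
const HEADER_LEN = 6;

function encodePacket(version, type, payload) {
    const body = Buffer.from(JSON.stringify(payload), 'utf8');
    const head = Buffer.alloc(HEADER_LEN);
    head.writeUInt8(version, 0);
    head.writeUInt8(type, 1);
    head.writeUInt32BE(body.length, 2);
    return Buffer.concat([head, body]);
}

// Expects exactly one complete packet, for example one produced by the unpacking step.
function decodePacket(packet) {
    const version = packet.readUInt8(0);
    const type = packet.readUInt8(1);
    const bodyLen = packet.readUInt32BE(2);
    const body = packet.subarray(HEADER_LEN, HEADER_LEN + bodyLen);
    return { version, type, payload: JSON.parse(body.toString('utf8')) };
}
```

Keeping the header a fixed size means the receiving logic stays the same as before: only the header length and the offset of the length field change.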