
1. Foreword

Node.js has been evolving at Ant and Alibaba for four or five years, growing from a "front-end engineer's toy" into Web and BFF scenarios, and gradually into online and even some core businesses. That was not easy. Why did Node.js survive here? It is not just its non-blocking I/O, event-driven model, and lightweight footprint. More importantly, we bridged the gap with Java and achieved interoperability between the two, which is what truly integrated Node.js into Alibaba's technology stack.

Along with the open sourcing of Ant's SOFA technology stack, we have also open sourced two Node.js RPC modules, hoping to fill this gap in the Node.js community and to summarize and share our experience with Node.js infrastructure over the past few years.

  • sofa-bolt-node: the Node.js implementation of Ant's communication protocol Bolt
  • sofa-rpc-node: a general-purpose Node.js RPC module

2. What is RPC

RPC stands for Remote Procedure Call; its common Chinese rendering, "远程过程调用", is a rather stiff literal translation. "Remote" means the procedure does not live in the current process, but in another process or on another machine; "procedure call" means calling a function. Put together, RPC means calling a function that lives in another process or on another machine.

In the pre-Internet era, programs were standalone and all logic had to live in the same process. Processes were like strangers in the same high-rise: they could not share anything, so the same functionality had to be implemented again in every process. Such barriers obviously run against the direction of productivity, and so the need for "inter-process communication" emerged: processes should be able to talk to each other and invoke each other's existing functionality instead of re-implementing it. With the advent of the Internet, the barriers between processes were broken down further. Not only can neighbors in the same building share resources; residents of other communities, even other cities, can call each other over the network. This is RPC. The concept is easy to understand, but a remote call differs greatly from a local one in implementation. It is the architect's job to design a mechanism that makes calling a remote service as easy as calling a local one, and that mechanism is the RPC framework.

3. Basic principles

RPC mainly solves the problem of communication. Mainstream RPC frameworks fall into two camps: HTTP-based and TCP-based. An HTTP-based RPC call is as simple as visiting a web page, except that it returns a single result (JSON or XML). It is simple to implement, standardized, and cross-language, making it a good fit for exposing an OpenAPI to the outside world. Its drawbacks are the relatively low transmission efficiency of the HTTP protocol and the overhead of short-lived connections (greatly improved with HTTP/2). TCP-based RPC, because TCP sits lower in the protocol stack, can customize protocol fields more flexibly, reduce network overhead, improve performance, and achieve higher throughput and concurrency. However, it must deal with complex low-level details, and making it cross-language and cross-platform is hard, so the implementation cost is higher. It is therefore better suited for internal systems that pursue extreme performance.

The RPC discussed next is TCP-based, since that is what mainstream RPC frameworks in the industry support, and also what Alibaba's HSF and Ant's Bolt adopt. Let's first walk through the basic flow of an RPC call to get the big picture, and then discuss the details one by one.


  1. The Client invokes the corresponding interface through the local RPC Proxy
  2. The local proxy translates the RPC service name, method name, and parameters into a standard RPC Request object and hands it to the RPC framework (a minimal sketch of these two steps follows the list)
  3. The RPC framework serializes the RPC Request object into binary form using the RPC Protocol, then transmits it to the Server over the TCP channel
  4. The Server receives the binary data and deserializes it into an RPC Request object
  5. The Server finds the corresponding local method from the information in the RPC Request, invokes it with the given parameters, wraps the result into an RPC Response, and hands it to the RPC framework
  6. The RPC framework serializes the RPC Response object into binary form using the RPC Protocol, then transmits it back to the caller (Client) over the TCP channel
  7. The caller (Client) receives the binary data, deserializes it into an RPC Response object, and returns the result to the business code through the local Proxy
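
To make steps 1 and 2 concrete, here is a minimal sketch of what the local proxy could look like in Node.js. The names createProxy and rpcFramework.invoke are hypothetical, for illustration only:

// A minimal sketch of a local RPC proxy (hypothetical names)
function createProxy(serviceName, rpcFramework) {
  return new Proxy({}, {
    get(target, methodName) {
      // Every method access becomes a function that builds a standard RPC Request
      return (...args) => rpcFramework.invoke({ serviceName, methodName, args });
    },
  });
}

// const helloService = createProxy('com.alipay.nodejs.HelloService:1.0', rpcClient);
// helloService.plus(1, 2); // => invoke({ serviceName, methodName: 'plus', args: [ 1, 2 ] })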

4. Communication layer protocol design

Since the data transmitted over the TCP channel can only be in binary form, we must convert data structures or objects to binary strings to pass to each other, a process called “serialization.” In contrast, the process by which we receive a binary string from the other party and convert it into a data structure or object is called deserialization. The rules for serialization and deserialization are called protocols.
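
For example, with JSON as the protocol, serialization and deserialization in Node.js are simply:

const obj = { methodName: 'plus', args: [ 1, 2 ] };
const bin = Buffer.from(JSON.stringify(obj)); // serialization: object -> binary
const back = JSON.parse(bin.toString());      // deserialization: binary -> object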

I divide RPC protocols into two categories: communication layer protocols and application layer protocols. A communication layer protocol is generally business-agnostic; it is responsible for packaging the business data and delivering it to the receiver safely and completely. The protocols of HSF, Dubbo, and gRPC are all communication layer protocols. An application layer protocol defines the conversion rules between business data and binary strings; common ones include Hessian, Protobuf, and JSON. The two kinds of protocols have different priorities. For an RPC framework, once the communication layer protocol is settled it rarely changes, so it must be general and extensible enough. The application layer protocol can, in theory, be chosen by the business, and it cares more about encoding efficiency and cross-language support. In my opinion, the communication layer protocol is the core of RPC framework design: once you understand the meaning of each field in the communication layer protocol, you basically understand how RPC works.

Let's try to design an RPC communication protocol. A protocol unit usually consists of a Header and a Payload, together called a Packet. We need the notion of a packet because TCP only provides a byte stream; by itself the stream does not tell us where a request or a response begins and ends. We must define the packet structure in advance so the receiver can parse it.

Protocol design is like dividing a packet into a sequence of fixed-size "cells" and deciding what information each cell stores. A cell is a Byte, the smallest unit in protocol design; a Byte is 8 bits and can represent 2^8 = 256 distinct values (0 to 255). How many Bytes a field takes depends on the information it stores.

When we receive a packet, we first need to know whether it is a request or a response, so we use one Byte to mark the packet type: for example, 0 for a request, 1 for a response. Knowing the type, we also need to associate each request with its response. This is usually done by generating a "unique" ID before sending the request, carrying it in the request Header, and having the server include the same ID in the response Header. We choose an auto-incrementing Int32 (4 Bytes) to represent this ID.

The Header length is fixed, while the Payload length varies, so we reserve four Bytes (Int32) in the Header to record the Payload length. Once the total packet length is known, we can cut the stream into individual packets. The encoding of the Payload is decided by the application layer, and different scenarios may use different protocols; how does the receiver know which protocol to use to decode the Payload? So we need one more Byte in the Header to mark the application layer protocol type, which we call Codec. Now let's see what our protocol looks like:

0      1      2      3      4      5      6      7      8      9     10
+------+------+------+------+------+------+------+------+------+------+
| type |          requestId        | codec|         bodyLength        |
+------+---------------------------+------+---------------------------+
|                  ...          payload                               |
|                                                     ...             |
+---------------------------------------------------------------------+

This is already a working RPC communication protocol, but as RPC capabilities grow we may need to record more information. For example: a timeout in the request header, telling the server not to bother responding once the handling time exceeds it; or a success/failure status in the response header. In addition, although the communication layer protocol rarely changes, for smooth upgrades and backward compatibility we usually record the protocol version in the first Byte.
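
For instance, the request header of our toy protocol could grow like this once we add a version byte and a timeout (a sketch for illustration only, not Bolt's actual layout):

0      1      2      3      4      5      6      7      8      9     10     11     12     13     14     15
+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+
| ver  | type |          requestId        | codec|         bodyLength        |          timeout          |
+------+------+---------------------------+------+---------------------------+---------------------------+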

Below is Bolt, Ant's latest RPC communication protocol. What does it contain beyond our minimal protocol?

//// Ant Bolt protocol ////

// Request Packet
 0     1     2           4           6           8          10          12          14          16
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|proto| type|  cmdcode  |ver2 |       requestId       |codec|        timeout        |  classLen |
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
| headerLen | contentLen            |                         ...                               |
+-----------+-----------+-----------+                                                           +
|                      className + header + content bytes                                       |
+                                                                                               +
|                                        ...                                                    |
+-----------------------------------------------------------------------------------------------+

// Response Packet
 0     1     2     3     4           6           8          10          12          14          16
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|proto| type|  cmdcode  |ver2 |       requestId       |codec|respstatus |  classLen | headerLen |
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
| contentLen            |                              ...                                      |
+-----------------------+                                                                       +
|                              header + content bytes                                           |
+                                                                                               +
|                                        ...                                                    |
+-----------------------------------------------------------------------------------------------+

5. How Node.js implements an RPC communication protocol

All of the above concerns protocol design principles. Next we move to the implementation level, which splits into two parts: encoding and decoding. The following two snippets implement, respectively, the encoding and decoding of the communication protocol designed above.

// encode
const payload = {
  service: 'com.alipay.nodejs.HelloService:1.0',
  methodName: 'plus',
  args: [ 1, 2 ],
};
const body = new Buffer(JSON.stringify(payload));
const header = new Buffer(10);
header[0] = 0;                       // type => 0 represents request
header.writeInt32BE(1000, 1);        // requestId
header[5] = 1;                       // codec => 1 represents JSON serialization
header.writeInt32BE(body.length, 6); // bodyLength
const packet = Buffer.concat([ header, body ], 10 + body.length);
// decode
const type = buf[0];                  // => 0 (request)
const requestId = buf.readInt32BE(1); // => 1000
const codec = buf[5];
const bodyLength = buf.readInt32BE(6);
const body = buf.slice(10, 10 + bodyLength);
const payload = JSON.parse(body);

As you can see, the core of all this is Buffer manipulation. For the detailed API, please refer to the official documentation; I will only expand on a few interesting points.

Big endian vs. little endian

When you look through the Buffer API documentation, you will find that many APIs come in BE/LE pairs, for example readInt32BE and readInt32LE, writeDoubleBE and writeDoubleLE, and so on. What do BE and LE stand for? What is the difference? Which should we use? And why does writeInt8 have no BE or LE variant?

The answer: an Int8 fits in a single byte, while Short, Int32, and Double do not; they must be represented with multiple bytes. This introduces the concept of "byte order", the order in which a value's bytes are stored. Whether the low-order byte of a value is stored at the low address or the high-order byte is stored at the low address is what distinguishes Little Endian from Big Endian. Each has its pros and cons, and CPU vendors never agreed on one, but network communication needs a common convention, otherwise the two sides cannot understand each other. RFC 1700 of the TCP/IP protocol suite specifies "big-endian" as the network byte order. Therefore, when implementing a network protocol, we should use the big-endian (BE) Buffer APIs.
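
A quick demonstration of the difference (1000 is 0x3e8 in hexadecimal):

const buf = Buffer.alloc(4);
buf.writeInt32BE(1000, 0);
console.log(buf); // <Buffer 00 00 03 e8>, high-order byte first (network byte order)
buf.writeInt32LE(1000, 0);
console.log(buf); // <Buffer e8 03 00 00>, low-order byte first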

Avoid Buffer fragmentation

In the code above, to serialize one RPC request I created three Buffer objects: a Header, a Body, and finally the two concatenated into a complete Packet. The code is logically clear and correct, but Buffer creation and concatenation are relatively expensive, and to squeeze out maximum performance we should avoid frequently creating small Buffers. The optimization idea is simple: allocate a large chunk of memory in advance, maintain an offset marking the current write position, advance the offset by the written length after each write, and once the packet is fully written, slice from the start position to the end position.

const buf = Buffer.alloc(1024 * 1024); // Allocate a 1MB block of memory up front
let offset = 0;

// start encoding
offset = 0; // reset the offset
buf[0] = 0;                // type
buf.writeInt32BE(1000, 1); // requestId
buf[5] = 1;                // codec => 1 represents JSON serialization
offset += 10;

const payload = {
  service: 'com.alipay.nodejs.HelloService:1.0',
  methodName: 'plus',
  args: [ 1, 2 ],
};
const bodyLength = buf.write(JSON.stringify(payload), offset);
buf.writeInt32BE(bodyLength, 6);
offset += bodyLength;
buf.slice(0, offset); // return the packet slice

Here is a benchmark before and after the optimization. The improvement is not dramatic, but for low-level code that is called at high frequency, it is worth it.

node version: v8.0.0, date: Wed May 31 2017 15:39:46 GMT+0800 (CST)
Starting...
2 tests completed.

small buffer concat x 349,567 ops/sec ±2.69% (84 runs sampled)
big buffer slice    x 509,161 ops/sec ±1.79% (84 runs sampled)

The native Buffer API is low-level and not very friendly. To make it easier to use, we encapsulated a byte module whose API is modeled on Java's ByteBuffer. Rewritten with the byte module, the code above becomes:

const ByteBuffer = require('byte');
const bb = ByteBuffer.allocate(1024 * 1024);

bb.put(0);       // type
bb.putInt(1000); // requestId
bb.put(1);       // codec => 1 represents JSON serialization
bb.skip(4);      // reserve 4 bytes for bodyLength, filled in later

const payload = {
  service: 'com.alipay.nodejs.HelloService:1.0',
  methodName: 'plus',
  args: [ 1, 2 ],
};
bb.putRawString(JSON.stringify(payload));
const bodyLength = bb.position() - 10;
bb.putInt(6, bodyLength); // write bodyLength back at offset 6
bb.array(); // return the packet

Handling the Long type

In JavaScript, the only type for representing numbers is Number, which can safely represent integers only in the range -(2^53 - 1) to 2^53 - 1, whereas Java's Long covers -2^63 to 2^63 - 1. So what do we do when an RPC call involves a Long?

Math.pow(2, 64) - 1; // expected 18446744073709551615, actually returns 18446744073709552000

First, let's figure out how a Long is stored. A Long occupies 8 Bytes, which we can split into two 4-Byte (32-bit) integers, called the "high" part and the "low" part. The high part stores the result of dividing the value by 2^32 (integer division), and the low part stores the remainder. Here are a few Long values in binary form:

Long: 4294967296

High: 1     Low: 0
+-----------+-----------+
|00 00 00 01|00 00 00 00|
+-----------+-----------+
  
Long: 1000

High: 0     Low: 1000
+-----------+-----------+
|00 00 00 00|00 00 03 e8|
+-----------+-----------+
  
Long: 45565600000000

High: 10609 Low: 291956736
+-----------+-----------+
|00 00 29 71|11 66 e8 00|
+-----------+-----------+
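
For values still within Number's safe integer range, the high/low split can be verified directly in JS (a quick sketch):

const value = 45565600000000;
const high = Math.floor(value / Math.pow(2, 32)); // => 10609
const low = value % Math.pow(2, 32);              // => 291956736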

So how do we represent a Long in JS? Here we use the community long module, which can construct a Long from a Number or a string and supports the usual arithmetic operations:

const Long = require('long');
const buf = new Buffer([ 0x00, 0x00, 0x29, 0x71, 0x11, 0x66, 0xe8, 0x00 ]);
let long = new Long(
  buf.readInt32BE(4),
  buf.readInt32BE(0)
);
long = long.add(1);
long.toString(); // 45565600000001
const longBuf = new Buffer(long.toBytes()); // <Buffer 00 00 29 71 11 66 e8 01>

Long.fromString('18446744073709551615'); // Long { low: -1, high: -1, unsigned: false }

Splitting protocol packets

Since network data is not transmitted in units of the protocol packets we defined, a single receive may contain several packets, or only a fragment of one. So the first thing to do upon receiving data is to split it into complete packets. This sounds simple, but our implementation went through several iterations.

Version based on the data event

The data event is a form of passive consumption: as soon as data arrives, it is pushed to the listener, which must process or buffer it immediately, otherwise the data is lost. Below is an implementation of packet splitting based on the data event:

const net = require('net');
const socket = net.connect(12200, '127.0.0.1');
const HEADER_LEN = 10; // Header length
let buf;
socket.on('data', data => {
  if (!buf) {
    buf = data;
  } else {
    buf = Buffer.concat([ buf, data ]);
  }
  // Keep cutting as long as at least one complete packet is buffered
  while (buf.length >= HEADER_LEN) {
    const packetLength = HEADER_LEN + buf.readInt32BE(6);
    if (buf.length >= packetLength) {
      const packet = buf.slice(0, packetLength);
      // process the packet
      // ...
      buf = buf.slice(packetLength);
    } else {
      break;
    }
  }
});

For convenience we encapsulated a cutter module, with which the code above simplifies to:

const Cutter = require('cutter');
const net = require('net');
const socket = net.connect(12200, '127.0.0.1');
const HEADER_LEN = 10; // Header length
const cutter = new Cutter(HEADER_LEN, data => {
  return HEADER_LEN + data.readInt32BE(6);
});
cutter.on('packet', packet => {
  // process the packet
});
socket.on('data', data => {
  cutter.handleData(data);
});

Version based on the readable event

We used the data event version for quite a while, but it has the drawback of frequent Buffer concatenation and splitting, because when data arrives and how much arrives is unknown. We later discovered that sockets also support an active consumption mode based on the readable event. Unlike data, a readable event does not hand the data to you directly; it merely notifies you that new data has arrived, and the business code calls socket.read to pull data on demand. read accepts an optional size argument: if the bytes currently buffered are fewer than size, it returns null; if size is omitted, it returns all buffered data. Based on the readable event, our code becomes:

const net = require('net');
const socket = net.connect(12200, '127.0.0.1');
const HEADER_LEN = 10; // Header length
let header;
socket.on('readable', () => {
  if (!header) {
    header = socket.read(HEADER_LEN);
  }
  if (!header) {
    return; // not enough data for a header yet
  }
  const bodyLength = header.readInt32BE(6);
  const body = socket.read(bodyLength);
  if (!body) {
    return; // the body has not fully arrived yet
  }
  const packet = Buffer.concat([ header, body ], HEADER_LEN + bodyLength);
  header = null; // reset so the next packet starts with a fresh header
  // packet processing logic
  // ...
});

Note that the data and readable events are mutually exclusive; you can only use one of the two.

Version based on streams

The readable version served us for a while, but it still was not good enough. The main problem was that the packet-splitting logic is really part of the protocol: it should be encapsulated with the protocol rather than scattered across socket event listeners. That way, when the protocol changes we only need to swap the protocol implementation, without touching the rest of the RPC framework. net.Socket is itself a duplex stream, so can we wrap serialization and deserialization in a stream style as well? The effect I want looks like this:

const net = require('net');
const socket = net.connect(12200, '127.0.0.1');
const encoder = protocol.encode();
const decoder = protocol.decode();

encoder.pipe(socket).pipe(decoder);

decoder.on('request', req => {});
decoder.on('response', res => {});

encoder.writeRequest({ ... });
encoder.writeResponse({ ... });

This is elegant, simple, and very much in the Node.js style. As for the implementation details: the Encoder is a Transform stream, converting JS objects into binary and piping them into net.Socket.

// encoder
const ByteBuffer = require('byte');
const Transform = require('stream').Transform;

const bb = ByteBuffer.allocate(1024 * 1024);
let id = 0;

class ProtocolEncoder extends Transform {
  constructor(options) {
    super({ writableObjectMode: true });
  }

  writeRequest(req) {
    return this._doTransform('request', req);
  }

  writeResponse(res) {
    return this._doTransform('response', res);
  }

  _doTransform(packetType, data) {
    const packetId = id++;
    return new Promise(resolve => {
      this.once(`transform_${packetType}_${packetId}`, resolve);
      this.write({ packetId, packetType, data });
    }).then(result => {
      if (result.error) {
        throw result.error;
      }
      return result;
    });
  }

  _transform(packet, encoding, callback) {
    const { packetId, packetType, data } = packet;
    const result = {
      packetId,
      packetType,
      data: null,
      error: null,
    };
    try {
      bb.reset();
      bb.put(packetType === 'request' ? 0 : 1); // type
      bb.putInt(packetId);                      // requestId
      bb.put(1);                                // codec => 1 represents JSON serialization
      bb.skip(4);                               // reserve 4 bytes for bodyLength
      bb.putRawString(JSON.stringify(data));
      const bodyLength = bb.position() - 10;
      bb.putInt(6, bodyLength);
      result.data = bb.array();
      callback(null, result.data);
    } catch (err) {
      result.error = err;
      callback(); // keep the stream alive; the error is delivered through the promise above
    }
    this.emit(`transform_${packetType}_${packetId}`, result);
  }
}

The Decoder is a Writable stream; it receives data from net.Socket, decodes it, and emits the result for the business code to handle.

// decoder
const Writable = require('stream').Writable;
const HEADER_LEN = 10;

class ProtocolDecoder extends Writable {
  constructor(options) {
    super(options);
    this._buf = null;
    this._bufLength = 0;
  }

  _write(chunk, encoding, callback) {
    if (this._bufLength > 0) {
      const total = this._bufLength + chunk.length;
      this._buf = Buffer.concat([ this._buf, chunk ], total);
      this._bufLength = total;
    } else {
      this._buf = chunk;
      this._bufLength = chunk.length;
    }
    try {
      let unfinish = false;
      do {
        unfinish = this._decode();
      } while (unfinish);
      callback();
    } catch (err) {
      callback(err);
    }
  }

  _decode() {
    if (this._bufLength < HEADER_LEN) {
      return false;
    }
    const bodyLength = this._buf.readInt32BE(6);
    const packetLength = HEADER_LEN + bodyLength;
    if (this._bufLength < packetLength) {
      return false;
    }
    const packet = {
      packetId: this._buf.readInt32BE(1),
      packetType: this._buf[0] === 0 ? 'request' : 'response',
      codec: this._buf[5],
      data: JSON.parse(this._buf.toString('utf8', HEADER_LEN, packetLength)),
    };
    process.nextTick(() => {
      this.emit(packet.packetType, packet);
    });
    const restLen = this._bufLength - packetLength;
    this._bufLength = restLen;
    if (restLen) {
      this._buf = this._buf.slice(packetLength);
      return true;
    }
    this._buf = null;
    return false;
  }
}

It is worth noting that although we use a "stream"-style API, the RPC protocol prevents this from being a true stream: encoding requires serializing the whole object into binary at once, and decoding can only deserialize once the complete packet has arrived. I emphasize this because some developers try to implement file upload and download over the RPC interface, which is not appropriate. The correct approach is to upload the file to OSS first, return the OSS address through the RPC interface, and then download it from OSS.

6. To be continued

The above is my understanding of the RPC communication layer protocol. A complete RPC framework contains much more, which I plan to cover in a series of follow-up posts:

  • Service publishing and addressing
  • Load balancing strategies
  • Distributed tracing
  • Monitoring and metrics
  • High availability features
    • Rate limiting and circuit breaking
    • Removing faulty instances
  • Security
    • Service authentication
    • Packet verification