Preface

Remote Procedure Call (RPC) is an effective communication mechanism between services or nodes in a distributed system. With RPC, a node (the client) can call a method or service on a remote node (the server) as easily as if it were calling it locally. Most existing RPC frameworks require the server address to be exposed, that is, you need to know the server's IP address and RPC port. This article describes an RPC communication method that does not require exposing IP addresses or ports. The method is based on a delay queue implemented with the Redis BRPOP/BLPOP operations, combined with Golang's goroutine-based asynchrony; the framework is very simple and easy to understand, and also efficient, stable and safe. It has been applied to node communication in Crawlab and has become the main way for nodes to transmit information in real time. Starting with PubSub, Crawlab's early node communication solution, we will describe the problems we encountered and how we solved them, how we transitioned to the current RPC solution, and how it works in Crawlab.

PubSub-based scheme

PubSub

Early versions of Crawlab were based on Redis PubSub, a publish-subscribe model mainly used in Redis for one-to-many unidirectional communication. The usage is very simple (a minimal code sketch follows the list):

  1. Subscribers use SUBSCRIBE channel1 channel2 ... to subscribe to one or more channels;
  2. Publishers use PUBLISH channelx message to send a message to that channel's subscribers.
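
To make this concrete, here is a minimal, self-contained Go sketch of SUBSCRIBE/PUBLISH using the go-redis library. Crawlab wraps its own Redis client, so this only illustrates the two commands, not Crawlab's code; the channel name and message are placeholders.

package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Subscriber side: SUBSCRIBE nodes:master
	pubsub := rdb.Subscribe(ctx, "nodes:master")
	defer pubsub.Close()
	// Wait until the subscription is confirmed before publishing
	if _, err := pubsub.Receive(ctx); err != nil {
		panic(err)
	}

	// Publisher side: PUBLISH nodes:master "..."
	if err := rdb.Publish(ctx, "nodes:master", "hello from a worker node").Err(); err != nil {
		panic(err)
	}

	// The subscriber receives the published message
	msg, err := pubsub.ReceiveMessage(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Channel, msg.Payload)
}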

Redis PubSub can be used in broadcast mode, where one publisher corresponds to multiple subscribers. In Crawlab, we have both one subscriber to one publisher (master -> worker node, on the channel nodes:<node_id>) and one subscriber to multiple publishers (worker nodes -> master, on the channel nodes:master). This is to facilitate two-way communication.

The following is a schematic diagram of the node communication principle.

The nodes communicate with each other through Redis’ PubSub function.

PubSub is simply a publish-subscribe model. A Subscriber subscribes to a channel on Redis, and any other node can Publish messages on that channel as a Publisher.

Communication architecture

In Crawlab, the master node subscribes to the nodes:master channel. If other nodes need to send messages to the master node, they only need to publish them to nodes:master. Similarly, each worker node subscribes to its own channel nodes:<node_id> (node_id is the node's ID in MongoDB, a MongoDB ObjectId). If you need to send messages to a worker node, you only need to publish them to this channel.

A simple process for a network request is as follows:

  1. The client (front-end application) sends a request to the master node (API);
  2. The master node publishes a message to the corresponding worker node via the Redis PubSub channel nodes:<node_id>;
  3. After the worker node receives the message, it performs some operations and publishes the corresponding result to the nodes:master channel;
  4. The master node receives the message and returns it to the client.

Not all node communication is bidirectional. In some cases the master node only sends messages to a worker node and the worker node does not return a response; this is called unidirectional communication. The following are the types of communication in Crawlab.

chan and goroutine

If you read the Crawlab source code, you'll see a lot of chan syntax in the node communication code; chan is a concurrency feature of Golang.

chan stands for channel, and channels in Golang are divided into unbuffered and buffered channels. We use an unbuffered channel to block a coroutine: the block is only released when the channel receives a signal (chan <- "some signal"), and the coroutine then moves on to the next step. In request-response mode, if the communication is two-way, the master node generates an unbuffered channel to block the request when it receives it. When it later receives the reply message from the worker node, the master node sends a value into that unbuffered channel, releasing the block, and returns the response to the client.

The go keyword starts a goroutine (coroutine) to achieve concurrency; combined with chan, a coroutine can block on an unbuffered channel and wait for a signal before performing the next operation. A simplified sketch of this pattern follows.
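
Below is a simplified, self-contained sketch of this request-response pattern. It is not Crawlab's actual code: callWorker stands in for the master node's request handler, and the goroutine simulates the worker node's reply arriving (in Crawlab the reply actually arrives via Redis).

package main

import (
	"fmt"
	"time"
)

// callWorker simulates the master node sending a request and then blocking
// on an unbuffered channel until the reply arrives.
func callWorker(request string) string {
	replyChan := make(chan string) // unbuffered: the receive below blocks

	// Simulate the worker node processing the request and replying.
	go func() {
		time.Sleep(100 * time.Millisecond)
		replyChan <- "reply to: " + request // sending releases the blocked receive
	}()

	return <-replyChan // blocked here until the goroutine sends the reply
}

func main() {
	fmt.Println(callWorker("get installed deps"))
}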

Scheme based on delay queue

Problems with PubSub

PubSub, as a publish-subscribe design pattern, is an efficient way to implement node communication, but it has two problems:

  1. PubSub data is transient and is lost when Redis goes down;
  2. Writing PubSub-based communication services requires working with goroutine and channel, which increases development difficulty and reduces maintainability.

The second problem is the trickier one. If we want to add more functionality, we need to write a lot of asynchronous code, which increases the coupling between system modules, hurts scalability, and makes the code painful to read.

Therefore, in order to solve this problem, we adopted an RPC service based on a Redis delayed message queue.

Delay queue architecture

The following figure is a schematic of the RPC implementation based on the delay queue architecture.

Each node has a Client and a Server. The client sends a message to the target node and receives the message it returns; the server receives and processes the message from the source node and returns a message to the source node's client.

The entire RPC communication process is as follows:

  1. The client of the source node pushes the message to the Redis queue nodes:<node_id> via LPUSH, then executes BRPOP nodes:<node_id>:<msg_id> to block and listen on that reply queue;
  2. The server of the target node, which has been listening on nodes:<node_id> via BRPOP, receives the message and executes the corresponding program according to the Method field;
  3. After the target node finishes execution, its server pushes the reply message to the Redis queue nodes:<node_id>:<msg_id> via LPUSH;
  4. Because the client of the source node is listening on the nodes:<node_id>:<msg_id> queue, it receives the returned message immediately after the target node's server pushes to it, and then does the subsequent processing. (A rough sketch of steps 1 and 4 follows this list.)
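
Here is a rough sketch of steps 1 and 4 from the source node's point of view. It uses the go-redis library rather than Crawlab's own Redis wrapper, and the node ID, message ID, queue names and payload are placeholders mirroring the steps above; in a real run the target node's server would push the reply before the BRPOP times out.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	nodeId, msgId := "node-1", "msg-1" // placeholders for real node/message IDs

	// Step 1: push the serialized request onto the target node's queue ...
	payload := `{"id":"msg-1","method":"GetInstalledDepList","node_id":"node-1"}`
	if err := rdb.LPush(ctx, "nodes:"+nodeId, payload).Err(); err != nil {
		panic(err)
	}

	// ... then block on the per-message reply queue (step 4).
	// BRPOP returns [key, value]; redis.Nil means the wait timed out.
	res, err := rdb.BRPop(ctx, 60*time.Second, fmt.Sprintf("nodes:%s:%s", nodeId, msgId)).Result()
	if err != nil {
		panic(err)
	}
	fmt.Println("reply:", res[1])
}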

In this way, the whole node communication process is completed through Redis. The advantage of doing this is that you do not need to expose each node's IP address or port; you only need to know the node ID to complete the RPC communication.

RPC code designed this way is easier to understand and maintain. Whenever you need to extend a new communication class, you simply implement the Service interface's ClientHandle (client handling) and ServerHandle (server handling) methods.

A little more about BRPOP: it removes and returns the last element of a list, and if the list has no elements it blocks until the wait times out or an element becomes available to pop. Therefore, compared with polling or other approaches, the BRPOP command avoids sending constant requests to Redis and wasting network and computing resources.

If you are not familiar with Redis commands, you can refer to the Juejin booklet Redis Deep Adventure: Core Principles and Application Practices. This booklet provides an in-depth introduction to Redis principles and engineering practices, which is very useful when applying Redis in real development.

Code practice

With all this theory covered, we still need to look at the code. As teachers often tell us: "Talk is cheap. Show me the code."

Since the Crawlab backend is developed in Golang, understanding the following code requires some basic knowledge of Golang.

Message data structure

First we need to define a data structure to transmit the message. The code is as follows.

package entity

type RpcMessage struct {
	Id      string            `json:"id"`      // Message ID
	Method  string            `json:"method"`  // Message method
	NodeId  string            `json:"node_id"` // Node ID
	Params  map[string]string `json:"params"`  // Parameters
	Timeout int               `json:"timeout"` // Timeout
	Result  string            `json:"result"`  // Result
	Error   string            `json:"error"`   // Error message
}

Here, we define fields such as message ID, method, node ID, and parameters. The message ID is a UUID, which ensures that each message is uniquely identified.
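
For illustration, such a message could be built and serialized roughly as follows. The google/uuid package stands in here for whichever UUID library is actually used, and the field values are made up.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/google/uuid"
)

// RpcMessage mirrors the struct defined above.
type RpcMessage struct {
	Id      string            `json:"id"`
	Method  string            `json:"method"`
	NodeId  string            `json:"node_id"`
	Params  map[string]string `json:"params"`
	Timeout int               `json:"timeout"`
	Result  string            `json:"result"`
	Error   string            `json:"error"`
}

func main() {
	msg := RpcMessage{
		Id:      uuid.New().String(), // unique message ID
		Method:  "GetInstalledDepList",
		NodeId:  "5d429e6c19f7abede924fee2", // hypothetical MongoDB ObjectId
		Params:  map[string]string{"lang": "python"},
		Timeout: 60,
	}
	b, _ := json.Marshal(msg)
	fmt.Println(string(b)) // this JSON string is what gets pushed into the Redis queue
}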

Base interface

Next, we define an abstract base interface that concrete business logic modules can implement. The server-side logic lives in ServerHandle, which returns an entity.RpcMessage, and the client-side logic lives in ClientHandle.

// Service is the base interface for RPC services
type Service interface {
	ServerHandle() (entity.RpcMessage, error)
	ClientHandle() (interface{}, error)
}

Client-side generic methods

When we call a generic method on the client, we need to implement two pieces of logic:

  1. Send the message: generate a message ID, serialize the message to JSON, and LPUSH it onto the Redis queue;
  2. Receive the reply: block with BRPOP until the returned message arrives, then return it to the caller.

Here is the code for the implementation.

// The client-side message handling function
func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) {
	return func() (replyMsg entity.RpcMessage, err error) {
		// Generate the request ID
		msg.Id = uuid.NewV4().String()

		// Send the RPC message
		msgStr := utils.ObjectToString(msg)
		if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", msg.NodeId), msgStr); err != nil {
			log.Errorf("RpcClientFunc error: " + err.Error())
			debug.PrintStack()
			return replyMsg, err
		}

		// Get the RPC reply message
		dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id), msg.Timeout)
		if err != nil {
			log.Errorf("RpcClientFunc error: " + err.Error())
			debug.PrintStack()
			return replyMsg, err
		}

		// Deserialize the reply message
		if err := json.Unmarshal([]byte(dataStr), &replyMsg); err != nil {
			log.Errorf("RpcClientFunc error: " + err.Error())
			debug.PrintStack()
			return replyMsg, err
		}

		// If the reply message contains an error, return it
		if replyMsg.Error != "" {
			return replyMsg, errors.New(replyMsg.Error)
		}

		return
	}
}

Server-side processing

The server-side processing logic is as follows:

  1. In a loop, obtain the message addressed to the current node via BRPOP;
  2. When a message is obtained, spawn a goroutine to process it asynchronously;
  3. Keep waiting for the next message.

You can see this logic in the InitRpcService method. The private handleMsg method implements the logic of deserializing the message, calling the server-side RPC service method, and sending back the reply message. If you need to extend the RPC method types, you can do so in the GetService factory method.

// Obtain the RPC service
func GetService(msg entity.RpcMessage) Service {
	switch msg.Method {
	case constants.RpcInstallLang:
		return &InstallLangService{msg: msg}
	case constants.RpcInstallDep:
		return &InstallDepService{msg: msg}
	case constants.RpcUninstallDep:
		return &UninstallDepService{msg: msg}
	case constants.RpcGetLang:
		return &GetLangService{msg: msg}
	case constants.RpcGetInstalledDepList:
		return &GetInstalledDepsService{msg: msg}
	}
	return nil
}

// Process RPC messages
func handleMsg(msgStr string, node model.Node) {
	// Deserialize the message
	var msg entity.RpcMessage
	if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
		log.Errorf(err.Error())
		debug.PrintStack()
	}

	// Get the corresponding service
	service := GetService(msg)

	// Call the local method according to Method
	replyMsg, err := service.ServerHandle()
	if err != nil {
		log.Errorf(err.Error())
		debug.PrintStack()
	}

	// Send the reply message
	if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil {
		log.Errorf(err.Error())
		debug.PrintStack()
	}
}

// Initialize the server-side RPC service
func InitRpcService() error {
	go func() {
		for {
			// Get the current node
			node, err := model.GetCurrentNode()
			if err != nil {
				log.Errorf(err.Error())
				debug.PrintStack()
				continue
			}

			// Get a message from the node's message queue
			msgStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
			if err != nil {
				if err != redis.ErrNil {
					log.Errorf(err.Error())
					debug.PrintStack()
				}
				continue
			}

			// Process the message asynchronously
			go handleMsg(msgStr, node)
		}
	}()
	return nil
}

Call method example

Crawlab nodes often need to install third-party dependencies for crawlers, such as pymongo, requests and so on. We also need to know whether a given dependency is already installed on a given node, which requires cross-server communication, that is, two-way communication in a distributed network. This logic is implemented through RPC: the master node makes an RPC call to the target node, the target node runs the invoked method and returns the result, namely the list of installed dependencies, to the master node's client, which in turn returns it to the caller.

The following code implements the RPC method to get the dependencies installed on the target node.

// Obtain the installed dependency services
// Implements the base Service interface
type GetInstalledDepsService struct {
	msg entity.RpcMessage
}

// Server handling method
// Implements ServerHandle
func (s *GetInstalledDepsService) ServerHandle() (entity.RpcMessage, error) {
	lang := utils.GetRpcParam("lang", s.msg.Params)
	deps, err := GetInstalledDepsLocal(lang)
	if err != nil {
		s.msg.Error = err.Error()
		return s.msg, err
	}
	resultStr, _ := json.Marshal(deps)
	s.msg.Result = string(resultStr)
	return s.msg, nil
}

// Client handling method
// Implements ClientHandle
func (s *GetInstalledDepsService) ClientHandle() (o interface{}, err error) {
	// Initiate an RPC request to obtain server data
	s.msg, err = ClientFunc(s.msg)()
	if err != nil {
		return o, err
	}

	// Deserialize
	var output []entity.Dependency
	if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
		return o, err
	}
	o = output

	return
}

Making the call

With the RPC server-side and client-side handling methods written, it is easy to write the invocation logic. Below is the method that gets the list of dependencies installed on a remote node. The GetService factory method first returns the previously defined GetInstalledDepsService, then its client handler ClientHandle is called and the result returned. It is just like calling a local method. Simple, isn't it?

// Obtain the remote installed dependencies
func GetInstalledDepsRemote(nodeId string, lang string) (deps []entity.Dependency, err error) {
	params := make(map[string]string)
	params["lang"] = lang
	s := GetService(entity.RpcMessage{
		NodeId:  nodeId,
		Method:  constants.RpcGetInstalledDepList,
		Params:  params,
		Timeout: 60,
	})
	o, err := s.ClientHandle()
	if err != nil {
		return
	}
	deps = o.([]entity.Dependency)
	return
}

Conclusion

This article mainly introduced an RPC communication method based on a Redis delay queue. This method does not expose the IP address or port of any node or service, which makes it very safe. Moreover, it has already been implemented in Crawlab with Golang for two-way communication; goroutines in particular give Golang natural support for asynchrony, making the approach easy to implement. In theory it is also very efficient and can support highly concurrent data transmission.

However, there is a catch in the Crawlab implementation: it does not limit the amount of concurrency the server can handle. If too many messages arrive at once, server resources may be exhausted, with the risk of slow processing or even downtime. The fix is to limit the concurrency on the server (a possible sketch follows). In addition, due to limited time, the author has not yet measured the actual transmission efficiency of this RPC communication method, and no fault-tolerance mechanism has been added, so overall there is still plenty of room for improvement and optimization.
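
For example, a buffered channel can be used as a semaphore to cap the number of concurrently running handler goroutines. The sketch below uses assumed names and a made-up limit; it is not what Crawlab currently does.

package main

import (
	"fmt"
	"sync"
	"time"
)

// maxConcurrency caps how many messages are handled at the same time;
// the name and value are illustrative.
const maxConcurrency = 4

func main() {
	sem := make(chan struct{}, maxConcurrency) // buffered channel acting as a semaphore
	var wg sync.WaitGroup

	for i := 0; i < 20; i++ {
		msg := fmt.Sprintf("msg-%d", i)
		sem <- struct{}{} // blocks once maxConcurrency handlers are already running
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when this handler finishes
			handleMsg(m)
		}(msg)
	}
	wg.Wait()
}

// handleMsg stands in for the real message handler.
func handleMsg(m string) {
	time.Sleep(100 * time.Millisecond)
	fmt.Println("handled", m)
}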

Nevertheless, this approach is sufficient for Crawlab's low-concurrency remote communication, and it has been very stable in practice, with no problems so far. We also recommend that developers who want to keep their nodes' address information private try this approach in practice.

References

  • Crawlab homepage
  • Crawlab Demo
  • Crawlab documentation
  • Implementing RPC remote method calls through Redis