
1 Introduction

A distributed network communication framework is one of the essential building blocks of a distributed system. Spark is a general-purpose distributed computing system, and because it is distributed, its many nodes must communicate with each other. Spark components do so through Remote Procedure Calls (RPC).

01. Communication between the driver and the master. For example, the driver sends a RegisterApplication message to the master.

02. Communication between the worker and the master. For example, the worker reports to the master the information about the Executors running on it.

03. Communication between the Executor and the driver. Executors run on workers, and Spark's tasks are distributed to the Executors. Each Executor needs to send the results of its tasks back to the driver.

04. Communication between workers. While a task is running, it may need data produced by tasks running on Executors of other workers, so it has to fetch that data from those workers.

Before Spark 1.6, Spark's RPC was implemented on top of Akka, an asynchronous messaging framework written in Scala. Starting with Spark 1.6, Spark uses its own Netty-based RPC framework, modeled on Akka's design.

1. Akka library (Scala)

The network communication mechanism of Spark 1.x is implemented on top of it.

2. Netty library (Java)

The network communication mechanism of Spark 2.x is implemented on top of it.

In early versions of Spark, the Netty communication framework was used for bulk data transfer and Akka was used for RPC communication.

Spark RPC component structure diagram:

Text explanation:

1. TransportContext contains TransportConf and RpcHandler

2. TransportConf is required to create both the TransportClientFactory and the TransportServer

3. RpcHandler is only used to create the TransportServer

4. TransportClientFactory is the client factory class of Spark RPC

5. TransportServer is the server implementation of Spark RPC

6. TransportClient is the client implementation of Spark RPC

7. ClientPool is an internal component of the TransportClientFactory and maintains the pool of TransportClient instances

8. TransportServerBootstrap is the bootstrap hook of the Spark RPC server

9. TransportClientBootstrap is the bootstrap hook of the Spark RPC client

10. MessageEncoder and MessageDecoder are the message encoder and decoder

2 Overall Architecture

The following describes the important components of Spark RPC, each illustrated with a snippet of Spark code.

Creating the Spark Environment

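import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.sql.SparkSession
// Build a local SparkSession and take the SparkEnv from its SparkContext
val conf: SparkConf = new SparkConf()
val sparkSession = SparkSession.builder().config(conf).master("local[*]").appName("NX RPC").getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val sparkEnv: SparkEnv = sparkContext.env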

RpcEnv

RpcEnv provides an environment for RpcEndpoint to process messages. RpcEnv manages the entire lifecycle of RpcEndpoint, including registering endpoints, routing messages between endpoints, and stopping endpoints.

// Factory method in the RpcEnv companion object; it builds the Netty-based implementation
def create(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      numUsableCores: Int,
      clientMode: Boolean): RpcEnv = {
    val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
      numUsableCores, clientMode)
    new NettyRpcEnvFactory().create(config)
  }
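To make the three responsibilities of RpcEnv more concrete (registering endpoints, routing messages, stopping endpoints), here is a minimal single-process sketch. It does not use Spark's classes: `MiniRpcEnv`, `MiniEndpoint` and `Recorder` are hypothetical names, and delivery is a direct method call instead of a network hop.

```scala
import scala.collection.mutable

object MiniRpcEnvSketch {
  // Hypothetical stand-in for RpcEndpoint: lifecycle hooks plus a message handler.
  trait MiniEndpoint {
    def onStart(): Unit = ()
    def receive: PartialFunction[Any, Unit]
    def onStop(): Unit = ()
  }

  // Hypothetical stand-in for RpcEnv: owns the endpoints and routes messages by name.
  class MiniRpcEnv {
    private val endpoints = mutable.Map.empty[String, MiniEndpoint]

    // Register an endpoint under a unique name and start it.
    def setupEndpoint(name: String, endpoint: MiniEndpoint): Unit = {
      require(!endpoints.contains(name), s"Endpoint $name is already registered")
      endpoints.update(name, endpoint)
      endpoint.onStart()
    }

    // Route a message by name; false if the endpoint is unknown or does not handle it.
    def send(name: String, message: Any): Boolean =
      endpoints.get(name) match {
        case Some(ep) if ep.receive.isDefinedAt(message) =>
          ep.receive(message)
          true
        case _ => false
      }

    // Stop and unregister an endpoint.
    def stop(name: String): Unit = endpoints.remove(name).foreach(_.onStop())
  }

  // Example endpoint that records everything that happens to it.
  class Recorder extends MiniEndpoint {
    val events = mutable.ArrayBuffer.empty[String]
    override def onStart(): Unit = events += "started"
    override def receive: PartialFunction[Any, Unit] = { case s: String => events += s"got $s" }
    override def onStop(): Unit = events += "stopped"
  }
}
```

Registering a `Recorder`, sending it a string, and stopping it leaves the events `started`, `got ping`, `stopped` in that order, which is the lifecycle the environment is responsible for.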

RpcEndpoint

An RpcEndpoint is an entity that takes part in communication (such as the master, a worker, or the driver) and handles each message it receives accordingly. An RpcEndpoint goes through the following lifecycle: construct -> onStart -> receive -> onStop.

onStart is called before the endpoint handles any message. receive and receiveAndReply handle messages that another RpcEndpoint (or the endpoint itself) sends with send and ask, respectively.

rpcEnv.setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef

class HelloEndPoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
    // Executed automatically once after the instance is constructed
    // Similar to preStart() of an actor in Akka
    override def onStart(): Unit = {}
    // Service method: handles messages sent with send
    override def receive: PartialFunction[Any, Unit] = {
        case ...
    }
    // Service method: handles messages sent with ask
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case ...
    }
    override def onStop(): Unit = { ... }
}
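The difference between the two service methods is easier to see in a small stand-alone sketch. The code below is not Spark's API: `ReplyContext`, `EchoEndpoint` and `EchoRef` are hypothetical names, and the reply travels through a Scala `Promise` in the same process. It only illustrates that a one-way send ends in `receive`, while an ask ends in `receiveAndReply` and produces a response.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object SendVsAskSketch {
  // Hypothetical stand-in for RpcCallContext: lets the endpoint answer an ask.
  class ReplyContext(promise: Promise[Any]) {
    def reply(response: Any): Unit = { promise.success(response); () }
  }

  class EchoEndpoint {
    var oneWayCount = 0
    // Handles messages delivered with send: no reply is expected.
    def receive: PartialFunction[Any, Unit] = { case _: String => oneWayCount += 1 }
    // Handles messages delivered with ask: the reply goes through the context.
    def receiveAndReply(context: ReplyContext): PartialFunction[Any, Unit] = {
      case s: String => context.reply(s.toUpperCase)
    }
  }

  // Hypothetical reference type offering the two calling styles.
  class EchoRef(endpoint: EchoEndpoint) {
    def send(message: Any): Unit = endpoint.receive(message)
    def ask(message: Any): Future[Any] = {
      val promise = Promise[Any]()
      endpoint.receiveAndReply(new ReplyContext(promise))(message)
      promise.future
    }
  }

  // Convenience helper: block until the answer to an ask is available.
  def askSync(ref: EchoRef, message: Any): Any = Await.result(ref.ask(message), 1.second)
}
```

Calling `send("hi")` only increments the counter, whereas `askSync(ref, "hi")` returns the upper-cased string produced inside `receiveAndReply`.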

RpcEndpointRef

RpcEndpointRef is a reference to a remote RpcEndpoint. To send a message to a specific RpcEndpoint, we first obtain a reference to it and then send the message through that reference.

rpcEnv.setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
    setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)
}

RpcAddress

The address of a remote RpcEndpoint, made up of host and port.
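As a rough illustration of how a host, a port and an endpoint name combine into one string that can be passed around and parsed back, consider the sketch below. The types `Address` and `EndpointAddress` are hypothetical, and the `spark://name@host:port` form is an assumption made for this example; check the RpcEndpointAddress source of your Spark version for the exact format.

```scala
object RpcAddressSketch {
  // Host and port of a remote RPC environment.
  final case class Address(host: String, port: Int) {
    def hostPort: String = s"$host:$port"
  }

  // An endpoint name plus the address it lives at, rendered as spark://name@host:port
  // (assumed format, see the note above).
  final case class EndpointAddress(address: Address, name: String) {
    override def toString: String = s"spark://$name@${address.host}:${address.port}"
  }

  private val Pattern = """spark://([^@]+)@([^:]+):(\d{1,5})""".r

  // Parse the URI form back into its parts; None if the text does not match.
  def parse(uri: String): Option[EndpointAddress] = uri match {
    case Pattern(name, host, port) => Some(EndpointAddress(Address(host, port.toInt), name))
    case _ => None
  }
}
```

Rendering and parsing are inverse operations here, so a reference can be looked up from nothing more than the string form.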

3 Spark Standalone example

In standalone mode, the worker sends the heartbeat message to the master. How does the heartbeat message go from the worker to the master and how does the master receive the message?

  1. Start with the main function of the Worker class, which creates the RpcEnv and the Endpoint
val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores, args.memory, args.masters, args.workDir,
            conf = conf)
  1. In the startRpcEnvAndEndpoint() method, rpcEnv registers an Endpoint (the Worker instance)
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores,
            memory, masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
  1. Next, look at the constructor of Worker, which defines the forwordMessageScheduler thread, the heartbeat interval, and so on. Only some of the fields are shown here
private val forwordMessageScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
// Heartbeat interval: 1/4 of the heartbeat timeout
private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
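The arithmetic behind HEARTBEAT_MILLIS is worth spelling out: the timeout is configured in seconds, it is converted to milliseconds, and a quarter of it is used as the sending interval. The helper names below are hypothetical; only the formula comes from the code above.

```scala
object HeartbeatIntervalSketch {
  // Interval in milliseconds for a timeout given in seconds: one quarter of the timeout.
  def heartbeatMillis(timeoutSeconds: Long): Long = {
    require(timeoutSeconds > 0, "timeout must be positive")
    timeoutSeconds * 1000 / 4
  }

  // How many heartbeats fit into one timeout window.
  def beatsPerTimeout(timeoutSeconds: Long): Long =
    timeoutSeconds * 1000 / heartbeatMillis(timeoutSeconds)
}
```

With the default of 60 seconds, the interval is 60 * 1000 / 4 = 15000 ms, so four heartbeats fit into one timeout window.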
  1. Once the constructor completes, the Worker's onStart() method is called, which in turn calls registerWithMaster() to register with the Master

    registerWithMaster() schedules a timer thread that periodically sends the Worker itself a ReregisterWithMaster message, which is handled by the reregisterWithMaster() method

// Abridged call chain: onStart() calls registerWithMaster(), which schedules the retry timer
onStart() {
    registerWithMaster() {
        registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
                Option(self).foreach(_.send(ReregisterWithMaster))
            }
        }, INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS, INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS, TimeUnit.SECONDS))
    }
}
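The pattern used here, a scheduler thread that fires at a fixed rate and does nothing except send a message to its own endpoint, can be tried in isolation. The sketch below uses only `java.util.concurrent`; `fireRepeatedly` and `sendToSelf` are hypothetical names, and the 5 second limit is an arbitrary safety net for the example.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object RetryTimerSketch {
  // Runs `sendToSelf` at a fixed rate until it has fired `times` times, then cancels the timer.
  // Returns true if all firings happened before the 5 second safety limit.
  def fireRepeatedly(times: Int, periodMillis: Long)(sendToSelf: () => Unit): Boolean = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val latch = new CountDownLatch(times)
    val timer = scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        sendToSelf()      // the timer thread only triggers the message
        latch.countDown() // bookkeeping for this example
      }
    }, periodMillis, periodMillis, TimeUnit.MILLISECONDS)
    try {
      latch.await(5, TimeUnit.SECONDS)
    } finally {
      timer.cancel(true)
      scheduler.shutdownNow()
    }
  }
}
```

Keeping the timer thread this thin means the real work still happens on the endpoint's own message-handling path, so the endpoint's state is touched from one place only.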

The reregisterWithMaster() method creates a thread to send a RegisterWorker message to the master

reregisterWithMaster() {
    Array(registerMasterThreadPool.submit(new Runnable {
        override def run(): Unit = {
            try {
                // 1. Get a reference to the master endpoint
                val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
                // 2. Send the registration message to the master
                sendRegisterMessageToMaster(masterEndpoint)
            } // exception handling omitted in this excerpt
        }
    }))
}
sendRegisterMessageToMaster {
    masterEndpoint.send(RegisterWorker(workerId, host, port, self, cores, memory, workerWebUiUrl, masterEndpoint.address))
}
  1. The message is delivered to the Master, which registers the worker and, after successful registration, sends a RegisteredWorker message back to the worker
  2. The worker then periodically (every HEARTBEAT_MILLIS) sends itself a SendHeartbeat message. When the worker receives it, it calls sendToMaster to send a Heartbeat message (containing the worker ID and the RpcEndpoint reference of the current worker) to the master.
case SendHeartbeat =>
    if (connected) {
        // Execute every 15s
        sendToMaster(Heartbeat(workerId, self))
    }
  1. In the worker's sendToMaster function, the message is sent with masterRef.send(message). So what happens behind this call? The send method of NettyRpcEndpointRef (in NettyRpcEnv.scala) is implemented as follows:
nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
  1. As you can see, the sender's address (nettyEnv.address), the destination endpoint reference (this, which here points to the master), and the message being sent (Heartbeat) are wrapped into a RequestMessage. For a remote RPC call, send eventually calls the postToOutbox function, and the message is serialized into a byte stream.

private[netty] def send(message: RequestMessage): Unit = {
    val remoteAddr = message.receiver.address
    if (remoteAddr == address) {
        // Message to a local RPC endpoint.
        try {
            // send() sends a OneWayMessage
            dispatcher.postOneWayMessage(message)
        } catch {
            case e: RpcEnvStoppedException => logDebug(e.getMessage)
        }
    } else {
        // Message to a remote RPC endpoint.
        postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this)))
    }
}
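The branch in send is the key design point: a message for an endpoint in the same process skips serialization entirely, while a message for another process is turned into bytes and queued for the network. A minimal sketch of that decision follows. `Env`, `Request`, `localInbox` and `outbox` are hypothetical names, and serialization is reduced to UTF-8 encoding of the content, which is a simplification for this example.

```scala
import java.nio.charset.StandardCharsets
import scala.collection.mutable

object SendRoutingSketch {
  final case class Address(host: String, port: Int)
  final case class Request(sender: Address, receiver: Address, content: String)

  // Hypothetical environment that keeps local deliveries and outgoing payloads apart.
  class Env(val address: Address) {
    val localInbox = mutable.ArrayBuffer.empty[Request]
    val outbox = mutable.ArrayBuffer.empty[(Address, Array[Byte])]

    def send(message: Request): Unit = {
      if (message.receiver == address) {
        // Same process: hand over the object as is, no serialization needed.
        localInbox += message
      } else {
        // Other process: encode to bytes and queue them for that address.
        outbox += ((message.receiver, message.content.getBytes(StandardCharsets.UTF_8)))
      }
    }
  }
}
```

In the heartbeat flow, the worker's SendHeartbeat message to itself takes the first branch and the Heartbeat message to the master takes the second.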
  1. In the postToOutbox function, the message goes through the sendWith method of OutboxMessage, which calls client.send(content) on the TransportClient. The TransportClient wraps the message once more and sends it to the master.

public void send(ByteBuffer message) {
        channel.writeAndFlush(new OneWayMessage(new NioManagedBuffer(message)));
    }
  1. On the master side, the request arrives in the handle method of TransportRequestHandler. Because the worker wrapped the heartbeat as a OneWayMessage, handle passes it to processOneWayMessage.
  @Override
  public void handle(RequestMessage request) {
    if (request instanceof ChunkFetchRequest) {
      processFetchRequest((ChunkFetchRequest) request);
    } else if (request instanceof RpcRequest) {
      processRpcRequest((RpcRequest) request);
    } else if (request instanceof OneWayMessage) {
      processOneWayMessage((OneWayMessage) request);
    } else if (request instanceof StreamRequest) {
      processStreamRequest((StreamRequest) request);
    } else if (request instanceof UploadStream) {
      processStreamUpload((UploadStream) request);
    } else {
      throw new IllegalArgumentException("Unknown request type: " + request);
    }
  }
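The same type-based dispatch can be written in Scala as a pattern match over a sealed hierarchy. The sketch below is only an illustration of the technique: `Request`, `ChunkFetch`, `Rpc` and `OneWay` are hypothetical stand-ins for the transport-layer types, and the handler returns a description instead of doing real work.

```scala
object RequestDispatchSketch {
  // Hypothetical message hierarchy standing in for the transport-layer request types.
  sealed trait Request
  final case class ChunkFetch(streamId: Long, chunkIndex: Int) extends Request
  final case class Rpc(requestId: Long, body: String) extends Request
  final case class OneWay(body: String) extends Request

  // Pick a handler by message type; the result describes the chosen handler.
  def handle(request: Request): String = request match {
    case ChunkFetch(streamId, chunkIndex) => s"fetch chunk $chunkIndex of stream $streamId"
    case Rpc(requestId, _)                => s"rpc $requestId: a response is expected"
    case OneWay(body)                     => s"one-way '$body': no response is sent"
  }
}
```

Because the trait is sealed, the compiler can warn about a missing case, whereas an instanceof chain needs the final else branch to catch unknown types at run time.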
  1. The processOneWayMessage function calls the receive method of NettyRpcHandler, the implementation class of RpcHandler. In this method, internalReceive first unpacks the message into a RequestMessage, and the Dispatcher then delivers it to the corresponding endpoint.

override def receive(client: TransportClient, message: ByteBuffer): Unit = {
    val messageToDispatch = internalReceive(client, message)
    dispatcher.postOneWayMessage(messageToDispatch)
}
  1. How is the message dispatched? In the postMessage method of the Dispatcher, you can see that it first looks up the EndpointData for the target endpoint, then puts the message into that endpoint's inbox (the master's in this case), and finally adds the EndpointData to the receivers blocking queue.
private def postMessage(endpointName: String, message: InboxMessage, callbackIfStopped: (Exception) => Unit): Unit = {
    val error = synchronized {
        val data = endpoints.get(endpointName)
        if (stopped) {
            Some(new RpcEnvStoppedException())
        } else if (data == null) {
            Some(new SparkException(s"Could not find $endpointName."))
        } else {
            data.inbox.post(message)
            receivers.offer(data)
            None
        }
    }
    // We don't need to call `onStop` in the `synchronized` block
    error.foreach(callbackIfStopped)
}
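The two-step posting (message into the endpoint's mailbox, mailbox into a shared queue of work) can be reproduced in a few lines. This is a simplified sketch, not Spark's Dispatcher: `Mailbox` and `MiniDispatcher` are hypothetical names, and errors are returned as an `Option[String]` instead of being passed to a callback.

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}

object PostMessageSketch {
  // Hypothetical per-endpoint mailbox.
  class Mailbox(val name: String) {
    private val messages = new java.util.LinkedList[Any]()
    def post(message: Any): Unit = synchronized { messages.add(message); () }
    def size: Int = synchronized { messages.size() }
  }

  class MiniDispatcher {
    private val endpoints = new ConcurrentHashMap[String, Mailbox]()
    // Mailboxes that have pending work, in the order the work arrived.
    val receivers = new LinkedBlockingQueue[Mailbox]()
    private var stopped = false

    def register(name: String): Mailbox = {
      val box = new Mailbox(name)
      endpoints.put(name, box)
      box
    }

    def stop(): Unit = synchronized { stopped = true }

    // Returns None on success, or the reason the message could not be posted.
    def postMessage(name: String, message: Any): Option[String] = synchronized {
      val box = endpoints.get(name)
      if (stopped) {
        Some("dispatcher stopped")
      } else if (box == null) {
        Some(s"Could not find $name.")
      } else {
        box.post(message)     // step 1: the message goes into the endpoint's mailbox
        receivers.offer(box)  // step 2: the mailbox is queued for a worker thread
        None
      }
    }
  }
}
```

Queueing the mailbox rather than the message lets any worker thread pick up whichever endpoint has work, while each mailbox keeps its own messages in arrival order.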
  1. How are the messages in the queue consumed? The Dispatcher has a thread pool named threadpool whose threads run MessageLoop. The run method of MessageLoop takes an EndpointData object from receivers and calls the process method of its inbox. If the queue is empty, the thread blocks at take.

private class MessageLoop extends Runnable {
    override def run(): Unit = {
        try {
            while (true) {
                try {
                    // Get the message
                    val data = receivers.take()
                    if (data == PoisonPill) {
                        receivers.offer(PoisonPill)
                        return
                    }
                    // Process the message
                    data.inbox.process(Dispatcher.this)
                } catch {
                    case NonFatal(e) => logError(e.getMessage, e)
                }
            }
        }
    }
}
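The PoisonPill trick in this loop deserves a closer look: a single sentinel object shuts down every thread, because each thread that takes it puts it back before exiting. The stand-alone sketch below shows the idea with plain strings as messages. `Loop`, `post` and `stop` are hypothetical names, and the 5 second join limit is an arbitrary choice for the example.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}

object MessageLoopSketch {
  // Sentinel that tells every worker thread to exit.
  private val PoisonPill = new Object

  class Loop(threads: Int) {
    private val queue = new LinkedBlockingQueue[AnyRef]()
    val processed = new ConcurrentLinkedQueue[String]()

    private val workers = (1 to threads).map { i =>
      val t = new Thread(new Runnable {
        override def run(): Unit = {
          var running = true
          while (running) {
            val item = queue.take() // blocks while the queue is empty
            if (item eq PoisonPill) {
              queue.offer(PoisonPill) // put it back so the other threads see it too
              running = false
            } else {
              processed.add(item.toString)
            }
          }
        }
      }, s"loop-$i")
      t.setDaemon(true)
      t.start()
      t
    }

    def post(message: String): Unit = queue.put(message)

    // Let the threads drain the queued work, then wait for them to exit.
    def stop(): Unit = {
      queue.offer(PoisonPill)
      workers.foreach(_.join(5000))
    }
  }
}
```

Since the queue is first-in first-out, the sentinel is only reached after everything posted before `stop()` has been taken, so no queued message is dropped.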
  1. In the process method of the Inbox, the message is first taken out, and then, depending on its type (OneWayMessage in this case), the corresponding method of the endpoint is called (here, the receive method of the master). At this point, the whole RPC call is complete.
case OneWayMessage(_sender, content) =>
    endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
        throw new SparkException(s"Unsupported message $message from ${_sender}")
    })
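`applyOrElse` is what ties the endpoint's partial function to the error handling: if one of the `case` clauses matches, it runs; otherwise the fallback function is called. A small sketch of this behaviour follows, with a hypothetical `deliver` helper and a hypothetical `UnsupportedMessage` exception in place of SparkException.

```scala
object InboxProcessSketch {
  // Hypothetical exception used in place of SparkException.
  final class UnsupportedMessage(text: String) extends RuntimeException(text)

  // Hand `content` to the endpoint's partial function; fail loudly if no case matches.
  def deliver(receive: PartialFunction[Any, Unit], sender: String, content: Any): Unit =
    receive.applyOrElse[Any, Unit](content, { msg =>
      throw new UnsupportedMessage(s"Unsupported message $msg from $sender")
    })
}
```

An endpoint therefore only lists the messages it understands, and anything else surfaces as an exception instead of being silently ignored.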
  1. The master receives the heartbeat message and updates the workerInfo.lastHeartbeat of the worker
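To see why recording the last heartbeat matters, here is a sketch of the bookkeeping a master could do with it. This is an assumption-laden illustration, not the Master's actual code: `WorkerTable`, `onHeartbeat` and `timedOut` are hypothetical names, and times are passed in explicitly so the example stays deterministic.

```scala
import scala.collection.mutable

object HeartbeatBookkeepingSketch {
  // Hypothetical master-side table: worker id -> time of the last heartbeat in milliseconds.
  class WorkerTable {
    private val lastHeartbeat = mutable.Map.empty[String, Long]

    def register(workerId: String, nowMillis: Long): Unit =
      lastHeartbeat.update(workerId, nowMillis)

    // Returns false for an unknown worker, which would have to register first.
    def onHeartbeat(workerId: String, nowMillis: Long): Boolean =
      if (lastHeartbeat.contains(workerId)) {
        lastHeartbeat.update(workerId, nowMillis)
        true
      } else {
        false
      }

    // Workers whose last heartbeat is older than the timeout.
    def timedOut(nowMillis: Long, timeoutMillis: Long): Set[String] =
      lastHeartbeat.collect { case (id, t) if nowMillis - t > timeoutMillis => id }.toSet
  }
}
```

With a 60 second timeout, a worker that last reported at 50 s is still alive at 61 s, while one that has been silent since 0 s is not.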

Conclusion

This article briefly covered the history of Spark's network communication, its architecture, and the main components along with their roles and how they are created, and then traced a concrete Heartbeat example to confirm the analysis. I hope it deepens your understanding of Spark. Thank you for reading.