1. Preface

RpcEndpointRef must have a method for sending messages, and RpcEndpoint must have methods for receiving messages and replying to them.

In RpcEndpoint we can clearly see two such methods:

// Receive a message that needs no reply
def receive: PartialFunction[Any, Unit] = {
  case _ => throw new SparkException(self + " does not implement 'receive'")
}

// Receive a message and reply to the sender
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
}

Back in the RpcEndpointRef class, you can also clearly see two methods:

// Corresponds to the RpcEndpoint.receive() method
def send(message: Any): Unit
// Corresponds to the RpcEndpoint.receiveAndReply() method
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

From this we can guess that the Worker needs to communicate with the Master, and that to do so it needs both an RpcEnv and a reference to the Master. Below we trace the code to check this guess.
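The pairing between send()/receive() and ask()/receiveAndReply() can be sketched in a few lines of plain Scala. This is only a toy model under our own assumptions: CallContext, Endpoint, EndpointRef and EchoEndpoint are hypothetical stand-ins, not Spark's classes, and the reference calls the endpoint directly instead of going through a message queue or the network.

```scala
import scala.concurrent.{Future, Promise}

// Hypothetical, simplified stand-ins for Spark's RPC types (not the real classes).
object RpcPairingSketch {
  // Reply handle passed to receiveAndReply, loosely modelled on RpcCallContext.
  final class CallContext(promise: Promise[Any]) {
    def reply(response: Any): Unit = promise.success(response)
    def sendFailure(e: Throwable): Unit = promise.failure(e)
  }

  trait Endpoint {
    // One-way messages: no reply is expected.
    def receive: PartialFunction[Any, Unit]
    // Request/response messages: the reply goes through the context.
    def receiveAndReply(context: CallContext): PartialFunction[Any, Unit]
  }

  // The reference dispatches straight to the endpoint in this sketch.
  final class EndpointRef(endpoint: Endpoint) {
    // send() pairs with receive()
    def send(message: Any): Unit = endpoint.receive(message)

    // ask() pairs with receiveAndReply() and exposes the reply as a Future
    def ask(message: Any): Future[Any] = {
      val promise = Promise[Any]()
      endpoint.receiveAndReply(new CallContext(promise))(message)
      promise.future
    }
  }

  // A toy endpoint that records one-way messages and echoes string requests.
  final class EchoEndpoint extends Endpoint {
    var received: List[Any] = Nil

    def receive: PartialFunction[Any, Unit] = {
      case msg => received = msg :: received
    }

    def receiveAndReply(context: CallContext): PartialFunction[Any, Unit] = {
      case msg: String => context.reply("echo: " + msg)
      case other => context.sendFailure(new IllegalArgumentException("unsupported: " + other))
    }
  }
}
```

The point of the sketch is the shape of the API: the caller only ever holds the reference, and the endpoint decides per message type whether a reply is produced.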

2. Worker startup process

Open the Worker class and you can see that Worker also has a companion object, so startup begins directly in its main() method.

As with the Master, main() calls the startRpcEnvAndEndpoint() method.

Step into the Worker.startRpcEnvAndEndpoint() method and you can see two familiar calls. The first is RpcEnv.create(), explained in the previous article, which prepares the RpcEnv communication environment so that it is ready to serve. The second is rpcEnv.setupEndpoint(), which registers an endpoint with the RPC environment and starts the endpoint instance by calling the endpoint's onStart() method.
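The two steps (create the environment, then register the endpoint, which triggers onStart()) can be sketched as follows. ToyRpcEnv, ToyWorker and startEnvAndEndpoint are hypothetical names for illustration only; the real RpcEnv starts the endpoint through its message-dispatching machinery, while this sketch simply calls onStart() inline.

```scala
import scala.collection.mutable

object EnvSetupSketch {
  trait Endpoint {
    // Lifecycle hook invoked once the endpoint has been registered.
    def onStart(): Unit = ()
  }

  // Hypothetical reference: it only carries the registered name.
  final case class EndpointRef(name: String)

  // Toy environment: registering an endpoint stores it and starts it.
  final class ToyRpcEnv(val host: String, val port: Int) {
    private val endpoints = mutable.Map.empty[String, Endpoint]

    def setupEndpoint(name: String, endpoint: Endpoint): EndpointRef = {
      require(!endpoints.contains(name), s"endpoint $name already registered")
      endpoints(name) = endpoint
      endpoint.onStart() // called inline here; an assumption made for brevity
      EndpointRef(name)
    }
  }

  final class ToyWorker extends Endpoint {
    var started = false
    override def onStart(): Unit = {
      started = true
    }
  }

  // Same shape as the flow described above: create the env, then register.
  def startEnvAndEndpoint(host: String, port: Int): (ToyRpcEnv, ToyWorker, EndpointRef) = {
    val env = new ToyRpcEnv(host, port)
    val worker = new ToyWorker
    val ref = env.setupEndpoint("Worker", worker)
    (env, worker, ref)
  }
}
```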

Looking at the attributes of the Worker class, we can see that Worker is itself an RpcEndpoint. We can also see an important attribute:

masterRpcAddresses: Array[RpcAddress]

This array stores the Masters' addresses, each of type RpcAddress. This is consistent with a property of the Inbox. RPC communication with a Master is possible only after its address has been obtained.
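As a rough illustration of what such an address array holds, here is a minimal sketch that turns master URLs of the form spark://host:port into host/port pairs. Address, fromSparkUrl and parseAll are our own hypothetical names, and the parsing relies only on java.net.URI; it is not how Spark itself builds its RpcAddress values.

```scala
import java.net.URI

object MasterAddressSketch {
  // Hypothetical stand-in for RpcAddress: just a host and a port.
  final case class Address(host: String, port: Int)

  // Parse "spark://host:port" into an Address and reject anything else.
  def fromSparkUrl(url: String): Address = {
    val uri = new URI(url)
    require(
      uri.getScheme == "spark" && uri.getHost != null && uri.getPort > 0,
      s"Invalid master URL: $url")
    Address(uri.getHost, uri.getPort)
  }

  // One address per configured Master, in the order given.
  def parseAll(urls: Seq[String]): Array[Address] =
    urls.map(fromSparkUrl).toArray
}
```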

Let's continue and look at what the Worker needs to communicate to the Master during startup. Step into the Worker.onStart() method and you will see the key method we need to focus on, registerWithMaster().

Step into the Worker.registerWithMaster() method and we find another method of interest, tryRegisterAllMasters(). As the name suggests, it registers the Worker with every Master.

Step into the Worker.tryRegisterAllMasters() method. The code logic is explained below:

// Submit one registration task per Master address
masterRpcAddresses.map { masterAddress =>
  registerMasterThreadPool.submit(new Runnable {
    override def run(): Unit = {
      try {
        logInfo("Connecting to master " + masterAddress + "...")
        // Get a reference to the Master from its address, for communication
        val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)

        // Send a registration message to the Master
        sendRegisterMessageToMaster(masterEndpoint)
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    }
  })
}

As the code above shows, the Worker stores each Master's address and uses it to obtain a reference to that Master, which is its entry point for communicating with the Master.
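The pattern of fanning out one registration attempt per Master on a thread pool can be tried in isolation with the sketch below. It is a simplification under our own assumptions: tryRegisterAll and its register callback are hypothetical (the callback stands in for obtaining the Master reference and sending the registration message), and unlike the code above it uses Callable and waits for every attempt so that the failures can be returned.

```scala
import java.util.concurrent.{Callable, Executors}
import scala.util.control.NonFatal

object RegisterAllMastersSketch {
  // Hypothetical stand-in for RpcAddress.
  final case class Address(host: String, port: Int)

  // Try to register with every Master in parallel and return the
  // addresses for which the attempt failed, in the original order.
  def tryRegisterAll(masters: Seq[Address], register: Address => Unit): Seq[Address] = {
    val pool = Executors.newFixedThreadPool(math.max(1, masters.size))
    try {
      // One task per Master; each task reports whether it succeeded.
      val pending = masters.map { master =>
        pool.submit(new Callable[Boolean] {
          override def call(): Boolean =
            try {
              register(master)
              true
            } catch {
              case NonFatal(_) => false // one unreachable Master must not stop the others
            }
        })
      }
      // Wait for every attempt, then keep only the failed addresses.
      masters.zip(pending.map(_.get())).collect { case (master, false) => master }
    } finally {
      pool.shutdown()
    }
  }
}
```

Catching the failure inside each task matters: with several Masters configured, some of them are expected to be unreachable or in standby, and the Worker only needs one of them to answer.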

Step into the Worker.sendRegisterMessageToMaster() method. Once it has the Master reference, it calls send() to deliver a RegisterWorker message to the Master; the message contains the Worker's information.

Go back to the Master.receive() method, find the case that handles this message, and look at the processing logic:

if (state == RecoveryState.STANDBY) {
  // The Master is in STANDBY mode; reply with a standby message
} else if (idToWorker.contains(id)) {
  // The Worker is already registered
} else {
  val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
    workerRef, workerWebUiUrl)
  // Record and validate the Worker information
  if (registerWorker(worker)) {
    // Add the Worker to the persisted set of registered Workers
    persistenceEngine.addWorker(worker)

    // Registration succeeded; send a message to the Worker
    workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
    schedule()
  } else {
    // The Worker is already registered; reply with a registration failure message
  }
}

The Master.registerWorker() method removes stale records of the Worker, keeps the latest one, and updates the Worker's address (workerAddress) so that the Master can later call the Worker to allocate resources.
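The bookkeeping described here (drop stale records, refuse a duplicate address, then index the new Worker by ID and by address) can be sketched with plain collections. Everything below is a hypothetical simplification: Registry, markDead and the three states are our own names, and the exact rules in Spark's Master may differ.

```scala
import scala.collection.mutable

object RegisterWorkerSketch {
  sealed trait WorkerState
  case object Alive extends WorkerState
  case object Dead extends WorkerState
  case object Unknown extends WorkerState

  // Hypothetical, trimmed-down worker record.
  final class WorkerInfo(val id: String, val host: String, val port: Int) {
    var state: WorkerState = Alive
    def address: (String, Int) = (host, port)
  }

  final class Registry {
    val workers = mutable.HashSet.empty[WorkerInfo]
    val idToWorker = mutable.HashMap.empty[String, WorkerInfo]
    val addressToWorker = mutable.HashMap.empty[(String, Int), WorkerInfo]

    // A dead worker leaves the lookup tables but stays in `workers` as history.
    def markDead(w: WorkerInfo): Unit = {
      w.state = Dead
      idToWorker -= w.id
      addressToWorker -= w.address
    }

    def registerWorker(worker: WorkerInfo): Boolean = {
      // Delete old history: dead records that lived at the same host and port.
      workers.filter(w => w.address == worker.address && w.state == Dead)
        .foreach(w => workers -= w)

      val accepted = addressToWorker.get(worker.address) match {
        case Some(old) if old.state == Unknown =>
          // Never confirmed after recovery: assume it is gone and replace it.
          markDead(old)
          workers -= old
          true
        case Some(_) => false // a live worker already owns this address
        case None => true
      }

      if (accepted) {
        workers += worker
        idToWorker(worker.id) = worker
        addressToWorker(worker.address) = worker
      }
      accepted
    }
  }
}
```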

Back in the main flow, once the Worker is registered the Master has one more step: it sends the success message RegisteredWorker to the Worker.

We go to Worker.receive() to find the logic that handles the RegisteredWorker message. RegisteredWorker is an implementation of RegisterWorkerResponse, and its processing is delegated to the handleRegisterResponse() method.
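Because RegisteredWorker is one of several possible responses, the handler is naturally a pattern match over a sealed type. The sketch below shows that shape only. RegisterWorkerFailed and MasterInStandby are included as the other responses we assume exist, the message fields are trimmed down, and the returned strings are our own device to make the sketch easy to test.

```scala
object RegisterResponseSketch {
  // Simplified message hierarchy; the real messages carry endpoint references.
  sealed trait RegisterWorkerResponse
  final case class RegisteredWorker(masterWebUiUrl: String) extends RegisterWorkerResponse
  final case class RegisterWorkerFailed(message: String) extends RegisterWorkerResponse
  case object MasterInStandby extends RegisterWorkerResponse

  // Hypothetical holder for the Worker's registration state.
  final class WorkerState {
    var registered = false
    var activeMasterWebUiUrl = ""
  }

  // Returns a short description of what was done.
  def handleRegisterResponse(state: WorkerState, msg: RegisterWorkerResponse): String =
    msg match {
      case RegisteredWorker(url) =>
        state.registered = true
        state.activeMasterWebUiUrl = url
        "registered"
      case RegisterWorkerFailed(reason) =>
        // A failure only matters while the Worker is still unregistered.
        if (!state.registered) s"failed: $reason" else "ignored"
      case MasterInStandby =>
        "standby" // nothing to do: this Master is not the active one
    }
}
```

Sealing the trait lets the compiler warn when a response type is not handled, which is useful for a protocol where every reply must lead to a defined state.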

Step into the Worker.handleRegisterResponse() method. The code logic, simplified, is as follows:

// Registration succeeded
registered = true
// Update the recorded Master address
changeMaster(masterRef, masterWebUiUrl, masterAddress)
// Start scheduling heartbeats
forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = Utils.tryLogNonFatalError {
    // After registering, send a SendHeartbeat message to the Worker itself
    // on a fixed schedule, rather than sending directly to the Master
    self.send(SendHeartbeat)
  }
}, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
// Subsequent operations

After the Worker receives the RegisteredWorker message, which indicates that it has registered with the Master successfully, it updates its locally recorded Master address to match the current Master, starts the scheduler, and periodically sends heartbeat information to the Master.

Why does the Worker call self.send(SendHeartbeat) and send the message to itself?

  • Asynchronous decoupling: the scheduler is only responsible for putting a heartbeat message into the message queue on a timer, while the consumer thread that processes the queue is the one that actually sends the heartbeat.
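The decoupling can be reproduced with a timer, a blocking queue and one consumer thread. This is a sketch under our own assumptions: the queue stands in for the endpoint's message queue, the counter stands in for actually sending a heartbeat to the Master, and demo is a hypothetical helper that stops after a given number of heartbeats.

```scala
import java.util.concurrent.{CountDownLatch, Executors, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object SelfSendHeartbeatSketch {
  case object SendHeartbeat

  // Runs until `expected` heartbeats were handled (or 5 seconds passed)
  // and returns how many heartbeats the consumer processed.
  def demo(expected: Int, periodMillis: Long): Int = {
    val inbox = new LinkedBlockingQueue[AnyRef]() // stands in for the endpoint's queue
    val sent = new AtomicInteger(0)               // heartbeats "sent to the Master"
    val done = new CountDownLatch(expected)

    // Producer: the timer only enqueues a message for the endpoint itself.
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = inbox.put(SendHeartbeat)
    }, 0, periodMillis, TimeUnit.MILLISECONDS)

    // Consumer: the message loop handles SendHeartbeat like any other message.
    val consumer = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          while (true) {
            inbox.take() match {
              case SendHeartbeat =>
                sent.incrementAndGet() // the real handler would contact the Master here
                done.countDown()
              case _ => // other message types would be handled here
            }
          }
        } catch {
          case _: InterruptedException => // interrupted: stop consuming
        }
      }
    })
    consumer.setDaemon(true)
    consumer.start()

    done.await(5, TimeUnit.SECONDS)
    scheduler.shutdownNow()
    consumer.interrupt()
    sent.get()
  }
}
```

Because the timer thread never performs the send itself, a slow or failing Master cannot block the schedule, and all state changes stay on the single thread that processes the queue.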

Going back to the Worker.receive() method, you can see that handling SendHeartbeat calls sendToMaster(), which sends the heartbeat to the Master along with the workerId.

Finally, go back to the Master.receive() method and look at how the Heartbeat message is handled. The logic is simple: if the Worker is known, update its last heartbeat time; if it is not, ask it to register again.
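The two branches of the heartbeat handling can be written down as a small lookup. ToyMaster, Outcome and onHeartbeat are hypothetical names used only for this sketch, and the outcome values replace the reply messages the real Master would send.

```scala
import scala.collection.mutable

object HeartbeatHandlingSketch {
  // Hypothetical worker record that only tracks the last heartbeat time.
  final class WorkerInfo(val id: String) {
    var lastHeartbeat: Long = 0L
  }

  sealed trait Outcome
  case object Updated extends Outcome
  case object ReRegisterRequested extends Outcome

  final class ToyMaster {
    val idToWorker = mutable.HashMap.empty[String, WorkerInfo]

    def onHeartbeat(workerId: String, now: Long): Outcome =
      idToWorker.get(workerId) match {
        case Some(info) =>
          // Known worker: just refresh the timestamp.
          info.lastHeartbeat = now
          Updated
        case None =>
          // Unknown worker: it has to go through registration again.
          ReRegisterRequested
      }
  }
}
```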

At this point, once the Worker's heartbeat message has reached the Master, the Worker has finished starting up and has successfully established communication with the Master.

Having traced the startup of both the Master and the Worker, we have now followed the complete startup of Spark's resource layer.

Let's add the Worker to our source-code summary.

3. Summary

  • RpcEndpoint has the receive() and receiveAndReply() methods, which correspond to RpcEndpointRef's send() and ask() methods
  • To communicate with other roles, a component first needs an RpcEnv, which is the basis of communication. It also needs a reference to its peer, an RpcEndpointRef, which contains the peer's communication address (RpcAddress)
  • The Worker also has an RpcEnv, and all Master addresses are recorded in a Worker attribute. The Worker uses them to create Master references for communicating with the Master