Background

There are many examples of asynchronous processing in Spark, and each is worth a close look to help you understand how Spark works and write elegant code for yourself.

NettyRpcEnv.ask interpretation

RpcEnv role

NettyRpcEnv is the only implementation of RpcEnv in Spark. What is RpcEnv? Take a look at its class header:

/**
 * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
 * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote
 * nodes, and deliver them to corresponding [[RpcEndpoint]]s. For uncaught exceptions caught by
 * [[RpcEnv]], [[RpcEnv]] will use [[RpcCallContext.sendFailure]] to send exceptions back to the
 * sender, or logging them if no such sender or `NotSerializableException`.
 *
 * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri.
 */

In a word, it is the RPC environment. Its two most important operations are:

  • You can register an RpcEndpoint with it
  • You can asynchronously obtain an RpcEndpointRef from it

What are RpcEndpoint and RpcEndpointRef? I won’t go into detail here; that will be covered in other articles.

A brief review of RpcEndpoint and RpcEndpointRef

RpcEndpoint

  • RpcEndpoint

    As is well known, Spark has Executors and a Driver that communicate with each other using Netty. Spark does not start just a single Netty service; there are multiple Netty RPC services for different functions, distinguished by different port numbers. Once messages travel between services, the communication “letters” are processed by various logical units such as the Inbox, the EventLoop, and so on. These tool-level units are abstracted in Spark into large, pluggable, extensible logical function modules called RpcEndpoints. An RpcEndpoint is the module that handles messages sent from other clients or returned from other servers. RpcEndpoint itself is a trait that can be implemented in a variety of ways; a simplified sketch follows.
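
To make that shape concrete, here is a minimal, simplified sketch. The names SketchRpcEndpoint, SketchCallContext and PingEndpoint are illustrative stand-ins rather than Spark's real classes (the real trait lives in org.apache.spark.rpc and has more lifecycle hooks); the point is simply that an endpoint is a bundle of message-handling callbacks that the RpcEnv invokes on its behalf.

    // Illustrative stand-ins only, not Spark's real RpcEndpoint/RpcCallContext.
    trait SketchCallContext {
      def reply(response: Any): Unit          // send a reply back to the asker
      def sendFailure(e: Throwable): Unit     // send an exception back to the asker
    }

    trait SketchRpcEndpoint {
      def onStart(): Unit = {}                                                  // before handling messages
      def receive: PartialFunction[Any, Unit]                                   // one-way messages (send)
      def receiveAndReply(ctx: SketchCallContext): PartialFunction[Any, Unit]   // ask-style messages
      def onStop(): Unit = {}                                                   // after the endpoint stops
    }

    // A toy endpoint that logs one-way messages and answers "ping" with "pong".
    class PingEndpoint extends SketchRpcEndpoint {
      override def receive: PartialFunction[Any, Unit] = {
        case msg => println(s"got one-way message: $msg")
      }
      override def receiveAndReply(ctx: SketchCallContext): PartialFunction[Any, Unit] = {
        case "ping" => ctx.reply("pong")
      }
    }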

RpcEndpointRef

  • RpcEndpointRef

    Spark previously used Akka for network communication, but newer versions use Netty. In Akka, communication between two nodes goes through the ActorRef of the destination: if AActor wants to send messages to BActor, it needs a BActorRef to do so. Now that Spark’s network layer has been upgraded to Netty, an Endpoint can loosely be understood as the old Actor, and sending a message to another endpoint likewise requires a reference to it, the RpcEndpointRef. This concept can be confusing at first glance. Think about it: for A to send a message to B, the prerequisite is that A holds a “reference” to B. Coming from ordinary HTTP services this seems strange: to visit machine B I supposedly only need its IP and port, so why do I also need some “stand-in” for it? Keep that question in mind as you read on; for now you only need to be aware of this:

    • The RpcEndpointRef used to access B Machine can be understood as a wrapped instance of B Machine’s IP and Port

Illustrating RpcEndpoint and RpcEndpointRef

  • A graphic

    A Machine can be a physical machine or a virtual machine, and B Machine can be on the same physical machine as A, on a different virtual machine (distinguished by port number), or on a different machine entirely (in Spark, a node can even send an MSG to itself, which will be discussed later). So to send a message from A to B, A uses B’s RpcEndpointRef to send the message to B Machine.

  • [Figure 1] How to access it

  • [Figure 2] Internal principle

  • [Figure 3] What does an instance of the RpcEndpointRef for B Machine look like (simplified version)? A sketch follows.
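
To make Figure 3 concrete, here is a minimal sketch of what such a “reference” boils down to: the remote machine’s address plus some way to push a message to it and get a Future back. SketchRpcAddress, SketchRpcEndpointRef and the transport function are illustrative names and simplifications, not the real NettyRpcEndpointRef.

    import scala.concurrent.Future

    final case class SketchRpcAddress(host: String, port: Int)

    // Simplified: the real ref carries more state, but this is the essence of the "double".
    final class SketchRpcEndpointRef(
        val address: SketchRpcAddress,                     // B Machine's IP and Port
        val name: String,                                  // which endpoint on B Machine to talk to
        transport: (SketchRpcAddress, Any) => Future[Any]  // the underlying client, e.g. a Netty client
    ) {
      def ask(message: Any): Future[Any] = transport(address, message)
    }

Note that calling ask on such a ref does not block: it hands the message to the transport and immediately returns a Future that completes when B Machine replies, which is exactly the behaviour examined later in this article.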

A brief review of Driver and Executor

Ask, as the name implies, means to ask: it could be a greeting, a check on whether the other side is available, a question, and so on. That is where NettyRpcEnv.ask comes in. To understand the role of NettyRpcEnv.ask, we need to briefly string the concepts and processes together.

Driver threads and Executor processes

First, two things need to be clear in a YARN environment:

  • A Driver is a thread that executes in the ApplicationMaster process

    Strictly speaking, this is not quite accurate either, because the Driver is a product of the SparkContext in the user’s class: the thread that executes the user’s class creates SparkEnv and RpcEnv, sets up the Driver’s Netty service and so on, and communicates with the Executors.

  • An Executor is a process started on each node via a Java command

What the YARN cluster and the ApplicationMaster are is not covered here; they will be covered in more detail in other articles.

Second, the Driver itself is a coordinating and scheduling node: it can assign tasks to Executors and keep track of each Executor’s state. Assigning means sending tasks to an Executor; keeping track means knowing how the Executor is running, and so on.

  • [Fig. 4] Illustrate it

    For example, a Driver interacts with two Executors. The Driver holds the RpcEndpointRefs of its two Executors (call them E1 and E2): E1Ref and E2Ref. The two refs send MSGs to the E1 and E2 nodes, and each node processes incoming MSGs through its own RpcEndpoint. E1 and E2 periodically report heartbeats to the Driver; each uses its own internal DriverRpcEndpointRef to send the heartbeat MSG, and the Driver handles the heartbeat MSG with its own DriverRpcEndpoint. The top-level component on every node is its own NettyRpcEnv, the implementation of RpcEnv. A toy sketch of this arrangement follows.
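
As a purely illustrative toy model of Figure 4 (ToyEndpointRef, ToyHeartbeat, ToyLaunchTask and the class names are made up, not Spark's real message or scheduler classes), the arrangement looks roughly like this: each side only ever talks through a ref to the other side's endpoint.

    import scala.concurrent.Future

    case class ToyHeartbeat(executorId: String)   // made-up message type
    case class ToyLaunchTask(taskId: Int)         // made-up message type

    trait ToyEndpointRef {                        // stands in for RpcEndpointRef
      def ask(msg: Any): Future[Any]
    }

    // The Driver holds one ref per Executor (E1Ref, E2Ref) and uses them to push work out.
    class ToyDriver(e1Ref: ToyEndpointRef, e2Ref: ToyEndpointRef) {
      def assignTasks(): Unit = {
        e1Ref.ask(ToyLaunchTask(1))
        e2Ref.ask(ToyLaunchTask(2))
      }
    }

    // Each Executor holds its own copy of the Driver's ref and uses it for heartbeats.
    class ToyExecutor(id: String, driverRef: ToyEndpointRef) {
      def reportHeartbeat(): Future[Any] = driverRef.ask(ToyHeartbeat(id))
    }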

Example: creating a DriverEndpointRef in the RpcEnv

Background

Why look at NettyRpcEnv.ask here? The example below of obtaining a DriverEndpointRef shows exactly where it is called.

DriverEndpointRef is an RpcEndpointRef

Figure 4 above shows the communication process between Driver and Executor. In fact, when the Driver thread is started inside the ApplicationMaster, part of the communication also goes through a DriverEndpointRef: the ApplicationMaster uses the DriverEndpointRef to send MSGs to the Driver’s DriverEndpoint, which handles them and responds.

  • [Figure 5] Illustrate it

    • The ApplicationMaster starts ([Run]s) the Driver thread and retrieves the NettyRpcEnv from the Driver thread.
    • It then uses NettyRpcEnv’s setupEndpointRef method to [Get] the DriverEndpointRef.
    • Subsequently it [Use]s the DriverEndpointRef to access the Driver’s DriverEndpoint.
    • One thing to note: the ApplicationMaster node is itself the Driver node, so the Driver’s DriverEndpoint could in principle be accessed directly. The Spark source code does not do this, presumably for better isolation, encapsulation and lower coupling (in the future the Driver may run as a separate process and no longer inside the ApplicationMaster); so Netty’s RPC access mode is still used here.

Look at the source code

This part of the code is in ApplicationMaster.scala; just focus on the runDriver method.

  • [Picture 6] Illustrate it

    • (I) The ApplicationMaster has been started on a server with IP 10.1.2.5
    • (II) In process a, the Driver thread is started, the user’s class is initialized, and a Netty service is started on the 10.1.2.5 node with the IP and port 10.1.2.5:13200
    • (III) In process b, the ApplicationMaster node goes on to call RpcEnv.setupEndpointRef, whose purpose is to set up a DriverEndpointRef to the Driver in the RpcEnv. The setup process goes to visit 10.1.2.5:13200, and if the service can be connected, the DriverEndpointRef is built. This “go and visit it” is exactly the NettyRpcEnv.ask method this article is about (a simplified sketch of this verification step follows the runDriver code below).
    • You can see that the order of invocation is

      • (ApplicationMaster.scala) rpcEnv.setupEndpointRef ↓
      • (NettyRpcEnv.scala) NettyRpcEnv.asyncSetupEndpointRefByURI ↓
      • (NettyRpcEndpointRef.scala) NettyRpcEndpointRef.ask ↓
      • (NettyRpcEnv.scala) NettyRpcEnv.ask ↓
      • 10.1.2.5:13200 Netty service

  • The following code

    private def runDriver(): Unit = {
      addAmIpFilter(None)
      /*
       * Here, startUserApplication is called to execute the user's class (our JAR): it invokes
       * our main method, which launches SparkContext and, internally, the whole set of
       * schedulers and backends (taskScheduler and so on).
       */
      userClassThread = startUserApplication()

      // This a bit hacky, but we need to wait until the spark.driver.port property has
      // been set by the Thread executing the user class.
      logInfo("Waiting for spark context initialization...")
      val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
      try {
        /* Block here, waiting for the SparkContext to be handed back from the Driver thread */
        val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
          Duration(totalWaitTime, TimeUnit.MILLISECONDS))
        if (sc != null) {
          rpcEnv = sc.env.rpcEnv

          val userConf = sc.getConf
          val host = userConf.get("spark.driver.host")
          val port = userConf.get("spark.driver.port").toInt
          registerAM(host, port, userConf, sc.ui.map(_.webUrl))

          /*
           * NettyRpcEnv.setupEndpointRef goes to RpcAddress(host, port) to check whether the
           * Driver service exists; if it does, the driverRef is returned.
           */
          val driverRef = rpcEnv.setupEndpointRef(
            RpcAddress(host, port),
            YarnSchedulerBackend.ENDPOINT_NAME)
          createAllocator(driverRef, userConf)
        } else {
          // Sanity check; should never happen in normal operation, since sc should only be null
          // if the user app did not create a SparkContext.
          throw new IllegalStateException("User did not initialize spark context!")
        }
        resumeDriver()
        userClassThread.join()
      } catch {
        case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
          logError(
            s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
              "Please check earlier log output for errors. Failing the application.")
          finish(FinalApplicationStatus.FAILED,
            ApplicationMaster.EXIT_SC_NOT_INITED,
            "Timed out waiting for SparkContext.")
      } finally {
        resumeDriver()
      }
    }
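
The setupEndpointRef call chain above ends in NettyRpcEnv.ask. As a hedged sketch of the verification step described in the comment above (made-up names; the real NettyRpcEnv asks an internal verifier endpoint with its own message and ref types), the shape of the logic is: ask the remote service whether the named endpoint exists, then map the asynchronous reply to a ref.

    import scala.concurrent.{ExecutionContext, Future}

    object SetupEndpointRefSketch {
      // Made-up message type; the real NettyRpcEnv uses its own verifier message.
      case class CheckEndpointExists(name: String)

      // `ask` stands in for NettyRpcEnv.ask: fire a question at host:port, get a Future back.
      def setupEndpointRef(host: String, port: Int, name: String)
                          (ask: Any => Future[Boolean])
                          (implicit ec: ExecutionContext): Future[String] =
        ask(CheckEndpointExists(name)).map {
          case true  => s"ref-to://$name@$host:$port"   // stand-in for the real endpoint ref
          case false => sys.error(s"Cannot find endpoint $name at $host:$port")
        }
    }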

Interpreting NettyRpcEnv.ask

Review of the Future

How should we understand a Future? The literal meaning works well enough: a Future is the future, and the word also means futures (as in financial futures).

When it comes to the future, everything is uncertain; after all, it has not happened yet and nobody knows what it will look like. So defining a Future is defining an event that happens not in the present (thread) but in another one (another thread). Compared with Java’s Future, Scala’s Future is very elegant and complete. Search my blog for a detailed description of Scala’s Future.

  • The official article: https://docs.scala-lang.org/zh-cn/overviews/core/futures.html
  • This article does not build up the concepts of Future and Promise from the source-code perspective; that will be explained in other articles
  • [Picture 7] Illustrate it

    Defining a thread in Java is shown on the right; the Scala version on the left, using a Future, is much more elegant

  • code

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future

    /* A basic look at Future */
    object DocFutureTest {

      def apply(): Unit = {
        println("I am DocFutureTest")
      }

      def main(args: Array[String]): Unit = {
        val sleeping = 3000;
        val main_thread = Thread.currentThread().getName;

        /*
         * Define an event that happens on another thread. This is equivalent to the Java code
         * below; Scala is a bit more elegant overall:
         *
         * public class JavaThreading {
         *   public static void main(String[] args) throws InterruptedException {
         *     new Thread(
         *       () -> System.out.println("This is a story that happened on another thread called "
         *         + Thread.currentThread().getName())
         *     ).start();
         *     System.out.println(Thread.currentThread().getName());
         *     Thread.sleep(3000);
         *   }
         * }
         */
        val future_run = Future {
          Thread.sleep(1000)
          println("This is a story that happened on another thread called " +
            Thread.currentThread().getName)
        }

        // If the main thread does not rest, it will finish first.
        Thread.sleep(sleeping)
        println(s"$main_thread thread rested $sleeping milliseconds")
      }
    }
  • Future + Callback (excerpt)

    case class ExceptionError(error: String) extends Exception(error)

    def main(args: Array[String]): Unit = {
      val sleeping = 3000;
      val main_thread = Thread.currentThread().getName;

      val future_run = Future {
        Thread.sleep(1000)
        println("This is an event that happens on another thread called " +
          Thread.currentThread().getName)
        throw ExceptionError("error")
      }

      future_run onFailure {
        case t => println("exception " + t.getMessage)
      }
      future_run onSuccess {
        case _ => println("success")
      }
    }
  • Points to note

    • Defining a Future defines the body to be executed on another thread, and execution starts immediately, similar to defining a Thread in Java and calling start() on it right away
    • Inside a Future, Scala makes heavy use of Try[], so if an exception occurs and you do not register an onFailure callback, you may never see the exception thrown, which is quite different from Java; a short demo follows this list
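
A short, self-contained demonstration of the second point, assuming the global ExecutionContext (SilentFailureDemo is just an illustrative name): the first future's exception disappears silently, while the failure callback registered on the second one makes it visible.

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future

    object SilentFailureDemo {
      def main(args: Array[String]): Unit = {
        // No callback registered: this exception is swallowed by the Future's internal Try
        Future { throw new RuntimeException("you will never see me") }

        // Failure callback registered: the exception becomes visible
        val observed = Future { throw new RuntimeException("caught by the callback") }
        observed.failed.foreach(e => println("failure observed: " + e.getMessage))

        Thread.sleep(1000) // give both futures time to run before the JVM exits
      }
    }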

Review the Promise

Having looked at Future on the surface, what is a Promise? In fact, the Future implementation is built on a Promise implementation, which means a Future cannot run without a Promise. Literally, a Promise is a promise: given the definition of a Future, a concrete Promise is required before it can be carried out; otherwise it is just an empty promise that can never be fulfilled.

  • The official article: https://docs.scala-lang.org/zh-cn/overviews/core/futures.html

Now, after reading the introduction to Future above, many people may still be in a muddled state; that was also my situation when I first started to learn this. What I like to do, though, is describe an abstract problem with the most intuitive pictures and imagination, so without further comment let me continue with the picture above.

  • [Figure 8] Illustrate the relationship between Future and Promise

    • The meaning of the Future

      • Your mainline of life is your Main Thread, which in Spark might be one of the threads doing the processing
      • At some moment, you start a “become a star” Thread
      • At the same moment, you also start a “become handsome” Thread
      • Once you have opened both paths, you can keep following them, as long as your Main Thread has not ended, until each reaches Success or Failure. This is the Future, which can be understood as the start of a new trajectory

    • The meaning of the Promise

      • When you open the two new “paths”, I can promise you a different promise at the end of each path
      • When you finish the Future with success, I promise you one result
      • When you finish the Future with failure, I promise you another result

    • Future vs. Promise

      • A Future is a line: a line that contains the execution process and is walked along the timeline
      • A Promise is a point: a point that gets triggered

      How to understand these two sentences? Think of it this way: life (the Main Thread) is a number line. To keep moving right along the timeline there must be a continuous path, and that “path” is a Thread; it can also be a path defined by a Future. You can only reach a destination by walking the road; you cannot jump straight to it. A Promise is like a milestone point: if you have only a Promise and never define a “path”, that is, a Future (or Main Thread), it can never be realized. Conversely, if you define a Future without an explicit Promise (a default Promise is built in), the Future can execute directly. As shown in the figure below, two empty promises are made without any path (how the Future would be walked) being defined, so the two promises cannot be fulfilled. Note that the figure only draws a Promise; if you want a Promise to come true, you can use the Promise’s methods to create a Future and execute it. Unlike a Future, which starts executing as soon as it is defined, a Promise must explicitly trigger the operation that builds and completes its Future.

  • Look at code where the Promise does not execute

    Here we define a Promise and “promise” to call a map operation after the Promise’s Future completes, printing a line that starts with future: ....

    But when we run the following code, nothing gets executed, right?

    import scala.concurrent.Promise
    import scala.util.{Failure, Success}
    
    object PromiseTest {
      def main(args: Array[String]): Unit = {
        import scala.concurrent.ExecutionContext.Implicits.global
        val promise = Promise[String]
        promise.future.onComplete(v => println("onComplete " + v))
        promise.future.map(str => println("future: " + str + " ==> " + Thread.currentThread().getName))
        promise.future.failed.foreach(e => println(e + " ==> " + Thread.currentThread().getName))
        Thread.sleep(3000)
      }
    
    }
  • Look at the code that can be executed

    The only difference from the code above is the addition of the promise.trySuccess call.

    Calling trySuccess creates the Future path that reaches the Promise and triggers the start of that path.

    Details such as trySuccess and tryComplete will be covered where we talk about multithreading in Scala in depth; a small sketch of the typical completion pattern follows the code below.

    • promise.future.onComplete

      Whether the result is Success or Failure, onComplete runs as the callback after the Future finishes executing

    • promise.future.map

      After onComplete, map processing continues on promise.future

    • promise.trySuccess

      The trigger that starts execution of the entire Future

    import scala.concurrent.Promise
    import scala.util.{Failure, Success}
    
    object PromiseTest {
      def main(args: Array[String]): Unit = {
        import scala.concurrent.ExecutionContext.Implicits.global
        val promise = Promise[String]
        promise.future.onComplete(v => println("onComplete " + v))
        promise.future.map(str => println("future: " + str + " ==> " + Thread.currentThread().getName))
        promise.future.failed.foreach(e => println(e + " ==> " + Thread.currentThread().getName))
        promise.trySuccess("try success "  + " --> " + Thread.currentThread().getName)
        Thread.sleep(3000)
      }
    
    }
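
To connect this to the ask code coming next: in real code the trySuccess call typically arrives from a different thread, for example a network callback delivering a response. A minimal sketch of that shape (PromiseFromCallbackTest is just an illustrative name):

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Promise

    object PromiseFromCallbackTest {
      def main(args: Array[String]): Unit = {
        val promise = Promise[String]()
        promise.future.foreach(reply => println("got reply: " + reply))

        // Pretend this thread is Netty delivering a response a little later.
        new Thread(new Runnable {
          override def run(): Unit = {
            Thread.sleep(500)
            promise.trySuccess("response from the wire")
          }
        }).start()

        Thread.sleep(2000) // keep the main thread alive long enough to see the callback
      }
    }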

The ask code

In fact, with everything above covered, the ask code comes down to just a few lines.

ask itself returns a Future and is itself asynchronous

  • [Picture 9] Illustrate it

    A client machine at 10.1.1.1 accesses a Netty service at 10.1.1.2 via RPC. When the response comes back correctly, it is handled by the TransportResponseHandler on the client machine, which calls the listener’s onSuccess method, the very method defined in the ask code below. That method executes the Promise’s trySuccess (tryComplete), triggering execution of the Promise’s future path.

private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
  val promise = Promise[Any]()
  val remoteAddr = message.receiver.address

  def onFailure(e: Throwable): Unit = {
    if (!promise.tryFailure(e)) {
      e match {
        case e: RpcEnvStoppedException => logDebug(s"Ignored failure: $e")
        case _ => logWarning(s"Ignored failure: $e")
      }
    }
  }

  /*
   * The onSuccess declared here is plugged into the RpcResponseCallback's onSuccess. That
   * callback is the listener used in Figure 9: when the response received from the server side
   * is not of type RpcFailure, the `else if (message instanceof RpcResponse)` branch is taken.
   */
  def onSuccess(reply: Any): Unit = reply match {
    case RpcFailure(e) => onFailure(e)
    case rpcReply =>
      /* Complete the promise; if it was already completed, just log and ignore */
      if (!promise.trySuccess(rpcReply)) {
        logWarning(s"Ignored message: $reply")
      }
  }

  try {
    if (remoteAddr == address) {
      val p = Promise[Any]()
      p.future.onComplete {
        case Success(response) => onSuccess(response)
        case Failure(e) => onFailure(e)
      }(ThreadUtils.sameThread)
      dispatcher.postLocalMessage(message, p)
    } else {
      val rpcMessage = RpcOutboxMessage(message.serialize(this),
        onFailure,
        (client, response) => onSuccess(deserialize[Any](client, response)))
      postToOutbox(message.receiver, rpcMessage)
      /* If the future is completed with a failure, this is executed */
      promise.future.failed.foreach {
        case _: TimeoutException => rpcMessage.onTimeout()
        case _ =>
      }(ThreadUtils.sameThread)
    }

    val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
      override def run(): Unit = {
        onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " +
          s"in ${timeout.duration}"))
      }
    }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
    /* When the promise's future completes, the onComplete here cancels the timeout task */
    promise.future.onComplete { v =>
      timeoutCancelable.cancel(true)
    }(ThreadUtils.sameThread)
  } catch {
    case NonFatal(e) =>
      onFailure(e)
  }

  /*
   * addMessageIfTimeout: if the failure is an RpcTimeoutException, rethrow it with the timeout
   * description attached.
   */
  promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
}
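
The last line of ask combines mapTo[T] and recover. As a small, generic illustration of what those two combinators do (plain Scala, not Spark code; note that Spark's addMessageIfTimeout rethrows an enriched exception rather than substituting a value as recover does here):

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future, Promise}

    object MapToRecoverDemo {
      def main(args: Array[String]): Unit = {
        val promise = Promise[Any]()
        promise.trySuccess("hello")

        val typed: Future[String] = promise.future
          .mapTo[String]                              // narrow Future[Any] to the expected type
          .recover { case e: IllegalStateException => // map selected failures to a fallback value
            "recovered: " + e.getMessage
          }

        println(Await.result(typed, 1.second))        // prints "hello"
      }
    }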

Conclusion

This article spent a small amount of space explaining the o.a.s.rpc.netty.NettyRpcEnv.ask() method in detail, a brief look at one small case of asynchronous processing in Spark. Even this small case requires a lot of prior knowledge, so it is easy to feel lost jumping straight in. Learning needs to be accumulated bit by bit; if something is unclear, you can slowly build up knowledge of the other modules and then come back here, and the reading will be more rewarding.