In previous installments, we introduced the Scala-based Akka asynchronous programming model and the web service built with SBT and Play2. In this installment, I'll start from the basics of Scala and mock up a distributed node heartbeat detection mechanism built on Akka.

Let’s explore the significance of this demo:

  1. Learn about the communication mechanism between Spark Master and Worker in advance.
  2. Deepen the understanding of the heartbeat detection mechanism between master and slave services (I introduced heartbeat detection earlier, in the Nginx installment).

In order to understand the content of this project, it is also necessary to know:

  1. The Scala counterpart of ‘Maven’: Play2 / SBT Operation Guide (juejin.cn) (the beginning is an introduction to SBT itself; Play2 is not the focus here).
  2. The Akka Framework in Scala (juejin.cn)

Project requirement analysis

  1. Each Worker registers with the Master. Once the Master completes the registration, it replies to the Worker that registration succeeded.
  2. The Worker sends heartbeats at regular intervals, and the Master receives them.
  3. After the Master receives a Worker's heartbeat, it updates that Worker's heartbeat timestamp.
  4. Start a scheduled task on the Master that periodically checks which registered Workers have not updated their heartbeats and removes them from the HashMap.
  5. Deploy Worker and Master on multiple Linux systems (cloud servers or virtual machines).

Note: Heartbeat detection is also at the heart of Spark.

Build the project

Import dependencies based on SBT

In this project we rely on Akka for remote communication, so we introduce both akka-actor and akka-remote.

Name := "scala_spark" version := "0.1" scalaVersion := "2.12.4" resolvers += "Typesafe Repository" at "Http://repo.typesafe.com/typesafe/releases/" libraryDependencies + = "com. Typesafe. Akka" % % "akka - actor" % "2.4.12." LibraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.12"Copy the code

The directory structure

The overall project directory structure is as follows:

com.scala.SparkTest
├── common
│   └── MessageProtocol.scala
├── master
│   └── SparkMaster.scala
└── worker
    └── SparkWorker.scala

Set up Akka scaffolding

SparkMaster

First, let's make SparkMaster and SparkWorker extend Akka's Actor trait. Let's take SparkMaster as an example; SparkWorker has a similar skeleton.

class SparkMaster extends Actor {
  // case _ => ...
  // Message handling goes here.
}

object SparkMaster {
  def main(args: Array[String]): Unit = {
    // Construct an ActorSystem and manage the actors.
  }
}

An ActorSystem implements network communication on top of sockets, so when we start this system we first need to bind it to a host and port. In addition, for an ActorSystem running on another host to reach the corresponding resource, we need to give both the ActorSystem and the Akka actor a name.

Here’s how SparkMaster is configured:

//1. Bind the local address and the startup port number
val host = "127.0.0.1" // local address to bind
val port = 9999        // local port to bind

//2. Build the configuration
val config = ConfigFactory.parseString(
  s"""
      akka {
        actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
        remote {
          enabled-transports = ["akka.remote.netty.tcp"]
          netty.tcp {
            hostname = "$host"
            port = $port
          }
        }
      }
    """)

//3. Name the ActorSystem
val sparkMasterSystem = ActorSystem("master", config = config)

//4. Name the Master actor
val sparkMasterRef: ActorRef = sparkMasterSystem.actorOf(Props[SparkMaster], "SparkMaster-01")

We want ActorSystem to put the SparkMaster into listening state the moment the main program runs, so we’ll wake it up by sending the string start to the ActorRef immediately after the above code:

//5. Send a message to Master ActorRef.
sparkMasterRef ! "start"

Similarly, we override the receive method in the SparkMaster class (the one declared with the class keyword, not the companion object) and will refine its logic later. For now, let's simply add the handling for the "start" string.

override def receive: Receive = {
  case "start" =>
    println("Master server started successfully!")
}

SparkWorker

Before SparkWorker starts, its preStart() method obtains a reference to SparkMaster (an ActorSelection) and stores it, binding the Worker and Master references together. We need to spell out the name of the Master's ActorSystem and the name of the Master actor correctly so that the Worker can reach the right resource.

To identify the unique Worker, we use the java.util.UUID tool to generate the ID number.

class SparkWorker(masterHost: String, masterPort: Int) extends Actor {

  // An ActorSelection is, in effect, just another handle on the Master's ActorRef.
  var masterProxy: ActorSelection = _

  // Each Worker generates a random ID.
  val id = java.util.UUID.randomUUID().toString

  // Initialize the Master proxy.
  // "master" is the name of SparkMaster's ActorSystem.
  // "SparkMaster-01" is the name of the Master actor.
  override def preStart(): Unit = {
    masterProxy = context.actorSelection(
      s"akka.tcp://master@$masterHost:$masterPort/user/SparkMaster-01")
  }

  // SparkWorker also needs to override the receive method; we'll extend it later.
  override def receive: Receive = {
    case "start" =>
      println("The Worker is working!")
  }
}

Similarly, we create an ActorSystem for the Worker. Because the Worker needs to obtain a reference to the Master, it also needs the Master's host name and port number.

//1. Bind the local address and the startup port number
val workerHost = "127.0.0.1" // local address to bind
val workerPort = 10000       // local port to bind

//1.1 Set the Master's IP address and port number
val masterHost = "127.0.0.1"
val masterPort = 9999

//2. Build the configuration
val config = ConfigFactory.parseString(
  s"""
      akka {
        actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
        remote {
          enabled-transports = ["akka.remote.netty.tcp"]
          netty.tcp {
            hostname = "$workerHost"
            port = $workerPort
          }
        }
      }
    """)

val workerSystem = ActorSystem("worker", config = config)

val sparkWorkerActorRef: ActorRef = workerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort)), "SparkWorker")

sparkWorkerActorRef ! "start"

Implement Worker registration with the Master

Every time a Worker starts, we generate an ID for it that is random enough to be practically unique. We want the Worker to actively send its own information (ID, CPU count, memory size, and so on) to the Master at startup, and the Master to manage all of this Worker information in one place.
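In the demo below we simply hard-code the CPU count and memory size when registering. If you wanted the Worker to report its real resources instead, a rough sketch like the following could be substituted (my own illustration, not part of the original demo; it uses only the standard JVM Runtime API, and note that maxMemory reports the configured JVM heap, not the machine's physical RAM):

// Sketch (not part of the original demo): gather resource figures from the JVM
// instead of hard-coding them in the registration message.
val cpuCores: Int = Runtime.getRuntime.availableProcessors()            // logical CPU cores
val ramMb: Int = (Runtime.getRuntime.maxMemory() / (1024 * 1024)).toInt // JVM max heap, in MB

// These could then replace the constants used later:
// masterProxy ! RegisterWorkerInfo(id, cpuCores, ramMb)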

The communication between Worker and Master is involved here, so we first need to specify some message formats, so that both parties can parse the corresponding information.

Define the message protocol between Workers and the Master

The types of messages sent between Workers and the Master vary, including the various timer messages used later. Here we write them all into a MessageProtocol.scala file.

We first give two structures:

// The information that the worker sends to the server during registration.
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)

// This structure is used by the master to save information about each registered worker into a hashMap.
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) 

// Return the singleton object when the registration was successful.
case object RegisteredWorkerInfo

When WorkerRef sends the registration message to MasterRef via the ! operator, what is actually sent is a RegisterWorkerInfo instance. After receiving this message, MasterRef extracts a brief summary of the Worker into a WorkerInfo and keeps it in a mutable HashMap in memory.

In fact, WorkerInfo holds more than just the three parameters mentioned here. The Worker only needs to send the necessary information, while the Master also needs to record other details about each Worker, such as the time of its most recent heartbeat, which we will come to later. We chose to put these extensions in WorkerInfo.

When registration succeeds, the Master returns RegisteredWorkerInfo. We only use it as a "registration succeeded" signal; it carries no payload, so a singleton is enough, and we declare it with case object. Compared with a case class, a case object has no apply or unapply methods.

We could do the same thing in other ways: we could use an enumeration, for example, or just send a plain "RegisteredWorkerInfo" string. Here, we are simply imitating how the Spark framework does it.
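Purely as an illustration of the alternative (this is not what the project uses), related replies could also be grouped under a sealed trait, so that a plain match on the reply type can be checked for exhaustiveness by the compiler:

// Illustrative alternative, not used in this project: model Master replies as a sealed trait.
sealed trait MasterReply
case object RegistrationSucceeded extends MasterReply
case class RegistrationFailed(reason: String) extends MasterReply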

We refined the logic in SparkMaster by declaring a mutable Map to hold “ID => WorkerInfo” style key-value pairs.

// Define a hashMap that manages worker information. It must be mutable so entries can be added and removed.
val workers: mutable.Map[String, WorkerInfo] = mutable.Map[String, WorkerInfo]()

Register with the Master when the Worker starts

Now that the necessary message protocol is defined, we refine SparkWorker so that it sends its registration message to MasterRef at startup.

override def receive: Receive = {
  case "start" =>
    println("Worker started successfully!")
    // Register with the Master as soon as the Worker starts.
    masterProxy ! RegisterWorkerInfo(id, 8, 16 * 1024)
}

Similarly, SparkMaster's receive method gains some logic:

  1. When new Worker registration information is received, the id, CPU, and RAM are extracted and stored in the workers hash table.
  2. Use sender() to return the RegisteredWorkerInfo singleton, notifying the Worker that registration succeeded.
    case RegisterWorkerInfo(id, cpu, ram) =>
      // Receive the registration information from the Worker.
      if (!workers.contains(id)) {
        // Extract the basic information of the worker.
        val workerInfo = new WorkerInfo(id, cpu, ram)
        workers += (id -> workerInfo)

        // When everything is done, reply with the RegisteredWorkerInfo singleton (a case object).
        sender() ! RegisteredWorkerInfo
      }

Send and receive heartbeat messages

After the Worker is properly registered, it receives the RegisteredWorkerInfo message sent back by the Master and enters its normal working state. To keep in touch with the Master, we implement heartbeat detection: each registered Worker sends a message to the Master at a fixed interval to indicate that it is still online.

To do this, we need to introduce a new gadget: the timer function provided by the ActorSystem, which has the following logic:

  1. Every once in a while, the ActorSystem sends a message to the Worker: “You should send a heartbeat message to the Master.”
  2. The Worker receives this prompt and sends a heartbeat message to the Master.

Notice that two kinds of messages are involved. The timer itself is handled by the ActorSystem; the Worker relies on the timer's reminder to know when it is time to send a heartbeat message to the Master.

Added message protocols for heartbeat detection

Add the following code to the MessageProtocol.scala file. In addition, when the Worker sends a heartbeat message to the Master, it attaches its own ID so that the Master knows which Worker's heartbeat to update.

// When the Akka context reminds the worker to send heartbeat information through a timer, it sends this singleton object.
case object SendHeartBeat

// When the worker receives SendHeartBeat, it sends the corresponding heartbeat information to the master and labels its own ID.
case class HeartBeat(id: String)

Obviously, we also need to add a field to the earlier WorkerInfo to record the time of the last heartbeat received from the Worker. With it, the Master can work out which Workers have gone offline.

class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {

  // Extension: record the time of the last heartbeat received from each worker.
  var lastHeartBeat: Long = System.currentTimeMillis()

}

Use the context to send heartbeat messages periodically

We continue to refine SparkWorker: when it receives the registration-success message, it starts a timer that sends heartbeat messages periodically. To use the millis time unit conveniently, also import scala.concurrent.duration._.

case RegisteredWorkerInfo =>

  println(s"Worker : $id registered successfully!")

  // After successful registration, define a timer that fires at a fixed interval.
  import context.dispatcher

  /*
    1. initialDelay: the initial delay. The worker sends a heartbeat immediately after
       it receives the registration-success message.
    2. interval: the period. A heartbeat reminder fires every 3000 milliseconds.
    3. receiver: the actor the local actor system sends the message to (here, self).
    4. message: the content of the message to send.
    The logic: the local Akka system "reminds" the worker through the timer, and the worker,
    on receiving the reminder, sends the real heartbeat message to the master.
  */
  context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat)

case SendHeartBeat =>

  println(s"Worker : $id sent a heartbeat message.")
  masterProxy ! HeartBeat(id)

This actually consists of two steps: first, the ActorSystem is told to send a SendHeartBeat message to the Worker itself (self) at a fixed interval; second, every time the Worker receives SendHeartBeat, it sends the real heartbeat message to masterProxy.
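One detail the demo glosses over: schedule returns an akka.actor.Cancellable, so the Worker could keep that handle and stop the timer when the actor shuts down. A possible sketch of that extension, as fields and methods added inside SparkWorker (my own addition, not in the original code):

// Sketch (my own addition to SparkWorker): keep the timer handle so it can be cancelled.
import akka.actor.Cancellable

var heartBeatTask: Option[Cancellable] = None

// Inside `case RegisteredWorkerInfo =>`, store the handle instead of discarding it:
// heartBeatTask = Some(context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat))

// Stop the timer when the Worker actor stops.
override def postStop(): Unit = heartBeatTask.foreach(_.cancel())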

The Master updates the status through heartbeat messages

Earlier, we added a lastHeartBeat variable to WorkerInfo to record when each Worker last sent a heartbeat (more precisely, when the Master received it).

Obviously, each time the Master receives a new heartbeat message, it should update the lastHeartBeat variable of the Worker with the specified ID in the workers hash table. The specific processing logic is:

  1. Receive the HeartBeat message and extract the ID.
  2. Fetch the corresponding WorkerInfo from workers by that ID.
  3. Update the lastHeartBeat variable of that WorkerInfo.
  4. Save the WorkerInfo back into workers.

As a result, SparkMaster’s receive method adds the following functionality:

    case HeartBeat(id) =>
      // Update the information of the worker with the given ID.
      // 1. Fetch the info
      val info: WorkerInfo = workers(id)

      // 2. Update the time
      info.lastHeartBeat = System.currentTimeMillis()

      // 3. Scala's Map overwrites the value of an existing key.
      workers += id -> info

      println(s"Worker id : $id information has been updated!")

Master Implements heartbeat detection

Now the Master can promptly update the workers' status based on the heartbeat messages it receives. But this is not the end: the Master also needs a timer of its own to run a regular overall inspection of the workers.

Enable the Master's periodic detection

Similarly, the moment the main program "activates" the Master with the start string, we also want it to turn on scheduled checks, periodically inspect the workers map in memory (the mutable hash table holding the WorkerInfo entries mentioned above), and remove inactive Workers in time.

We divide this logic into two parts:

  1. The Master actively activates its own detection function at startup.
  2. Periodically check the logic.

So we continue to add the remaining two message structures in the MessageProtocol:

// When the master starts, it sends this singleton to itself to trigger the periodic check mechanism.
case object StartTimeOutWorker

// The Akka context sends this singleton object when the timer reminds the master to delete expired workers.
case object RemoveTimeOutWorker

We added to the start logic of SparkMaster’s receive method:

Case "start" => println(" Master server started successfully! ") )+ // Automatically start the periodic check mechanism for workers.
+ self ! StartTimeOutWorker
Copy the code

That is, after the Master starts, it sends itself a StartTimeOutWorker message. Let’s go ahead and add logic to receive:

    case StartTimeOutWorker =>
      import context.dispatcher
      // 1. 0 millis            -> start the timer immediately
      // 2. 9000 millis         -> check every 9 seconds; this period is longer than the Worker's heartbeat interval
      // 3. self                -> the ActorSystem reminds the Master to clean up every 9 seconds
      // 4. RemoveTimeOutWorker -> the Master treats this message as a "sweep" instruction
      context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker)

How do we detect timed-out Workers?

Determining whether a Worker has expired is not difficult, because we record lastHeartBeat. During detection, we simply subtract the Worker's last heartbeat time from the current time and compare the result against a threshold: if it exceeds the threshold, the Worker is considered dead.

Expressed as an anonymous function, the check looks like this: pass in a WorkerInfo, compute the difference between the current time and its lastHeartBeat, and compare it with a threshold; if the difference exceeds the threshold, the Worker is considered to have lost its heartbeat.

val isTimedOut: WorkerInfo => Boolean =
  (workerInfo: WorkerInfo) => {
    val now: Long = System.currentTimeMillis()
    val threshold: Long = 6000L
    now - workerInfo.lastHeartBeat > threshold
  }

We can make full use of Scala's collection operations here: pass the above logic to filter so the Master can quickly pick out the timed-out workers, then use foreach to remove each of them from the workers hash table.

    case RemoveTimeOutWorker =>
      val now: Long = System.currentTimeMillis()

      // Check which workers' heartbeats have timed out and delete them from the hashMap,
      // using Scala's functional collection operations:
      // 1. filter the workers whose heartbeats have expired;
      // 2. remove those workers from the hashMap.
      //
      // The timeout condition:
      // (now - the worker's lastHeartBeat) > threshold.
      // Since network transmission has some small latency (clusters are usually within a LAN),
      // we choose a threshold of 6 seconds.

      workers.values.filter(worker => now - worker.lastHeartBeat > 6000)
        .foreach(worker => workers.remove(worker.id))

      println("Currently " + workers.size + " workers are alive.")

At this point, a complete akka-based node heartbeat detection function is implemented.
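Before deploying to several machines, it can be handy to sanity-check the whole flow on one machine. The sketch below is a hypothetical helper of my own (not part of the project); it assumes that the Master and Worker startup code shown above is wrapped in main methods and still uses the hard-coded 127.0.0.1 hosts and ports:

// Hypothetical local smoke test: start the Master and one Worker inside the same JVM.
object LocalHeartBeatDemo {
  def main(args: Array[String]): Unit = {
    SparkMaster.main(Array.empty[String])  // master ActorSystem binds 127.0.0.1:9999
    Thread.sleep(1000)                     // give the master a moment to bind its port
    SparkWorker.main(Array.empty[String])  // worker ActorSystem binds 127.0.0.1:10000 and registers
  }
}

If everything works, the console should show the Worker registering and then a heartbeat line roughly every three seconds, based on the println statements above.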

Parameterized configuration

The host and port information is currently hard-coded in the source files. We would rather pass these values in at run time through the main function's args array.

For SparkMaster, we can make the following changes so that we have the flexibility to specify Host and Port before starting the Master program.

//1. Bind the local address and the startup port number
val host = args(0) // local address to bind
val port = args(1) // local port to bind

For SparkWorker, we can make the following changes so that we can flexibly specify the local host/port and the Master's host/port before starting the Worker program.

//1. Bind the local address and the startup port number
val workerHost = args(0) // local address to bind
val workerPort = args(1) // local port to bind

//1.1 Set the Master's IP address and port number
val masterHost = args(2)
val masterPort: Int = args(3).toInt

// ... some code omitted
// The Worker actor's name is also specified at startup.
val sparkWorkerActorRef: ActorRef = workerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort)), args(4))
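Positional arguments are easy to get wrong, so one small defensive tweak (my own addition, not in the original code) is to fall back to the hard-coded defaults when an argument is missing, which also keeps the program runnable from the IDE without any program arguments:

// Sketch (my own addition): fall back to local defaults when arguments are missing.
val workerHost = if (args.length > 0) args(0) else "127.0.0.1"
val workerPort = if (args.length > 1) args(1) else "10000"
val masterHost = if (args.length > 2) args(2) else "127.0.0.1"
val masterPort: Int = if (args.length > 3) args(3).toInt else 9999
val workerName = if (args.length > 4) args(4) else "SparkWorker"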

Use the Assembly plug-in for project packaging

Add the plug-in to the project

In order to package our Scala code as a JAR that can run on various hosts, we enhance sbt in this project with the Assembly plug-in. The Assembly plug-in makes it easy to bundle the project together with its external dependencies (akka-remote, akka-actor, and so on) into a single "fat" JAR. (Because this example is simple, we won't worry about dependency conflicts during assembly for now.)

First create a plugins.sbt file in the project folder at the root of your project and write the plug-in declaration there.

AddSbtPlugin ("com.eed3si9n" % "sbt-assembly" % "0.14.10")Copy the code

Note that the plug-in version depends on the SDK version of the project. My project's SDK version is 2.12.4. (Your local SDK version may differ from the project's; for example, my local SDK version is only 2.11.8.)

IntelliJ IDEA provides an sbt shell, and we can type sbt commands directly in that terminal. First type reload to refresh the sbt configuration. If the plug-in installed successfully, the plugins command will list sbtassembly.AssemblyPlugin.

If the plug-in does not install successfully, you can browse the Maven repository for a matching version number and replace it. For a simple project, once the plug-in is installed you can simply type assembly in the sbt shell to package it.

For our project we still need a little extra configuration: there are two main functions (two entry points), one for the Master and one for the Worker, so we actually need to build two packages, each with a different entry point.

Package for Master and Worker

Let’s go ahead and add the following configuration to the build.sbt configuration file:

mainClass in assembly := Some("com.scala.SparkTest.master.SparkMaster")

As a reminder, you can use show discoveredMainClasses to see the list of classes that contain a main function. If a project has more than one main entry point, we must specify one explicitly.

To distinguish the functionality of the JAR package, we can also add the following configuration to specify the name of the compiled JAR package (not required)

assemblyJarName in assembly := "spark-master.jar"

Reload your build.sbt configuration file, then run the assembly command to let the plug-in do the packaging. The package will be placed in the target/scala-x.xx directory.

Now we have the JAR whose main entry point is SparkMaster. We then change the mainClass setting in build.sbt to:

mainClass in assembly := Some("com.scala.SparkTest.worker.SparkWorker")

Then run assembly again to get a JAR whose main entry point is SparkWorker.

Now all we need to do is copy the packages to the different machines, have the Master machine start spark-master.jar, and have the Worker machines start spark-worker.jar (the name can be set with assemblyJarName or by simply renaming the file). With that, we can run heartbeat detection in a real environment!

🌎 Reference links

  • The basic use of SBT is detailed here.

  • A tutorial on using the Assembly plug-in can be found on the GitHub home page.

  • See Stack Overflow to answer questions about using the Assembly plug-in.

  • My code demo has been submitted to the GitHub home page