Akka, as a mature concurrency solution for production environments, needs a robust mechanism for handling errors and exceptions. This article discusses supervision and fault tolerance in Akka.

Supervision

If you read my previous article, you should already have some understanding of how an actor system works. A central idea of the actor system in Akka is divide and conquer. Since we hand tasks to actors, we must also supervise those actors. When an actor fails, for example because of an environment error or an exception, we can recover according to the supervision strategy we have defined. That recovery is the fault tolerance discussed later.

Supervisors

If there is supervision, there must be a supervisor. So how is this role assigned in an ActorSystem?

Let's first look at the top-level supervisors in an ActorSystem:

Top-level supervisors of an actor system

An actor system must start at least three actors during its creation, as shown in the figure above. Here are the functions of these three actors:

1. /: the root guardian

As the name implies, it is the boss: it supervises all the top-level actors in the ActorSystem. These top-level actors are:

  • /user: the supervisor of all user-created top-level actors; actors created with ActorSystem.actorOf live below it.
  • /system: the supervisor of all top-level actors created by the system, such as log listeners, or actors that are configured to be deployed automatically when the actor system starts.
  • /deadLetters: the dead letter actor, to which all messages sent to terminated or nonexistent actors are redirected.
  • /temp: the supervisor of all short-lived actors created by the system, such as those used in the implementation of ActorRef.ask.
  • /remote: an artificial path below which all actors reside whose supervisors are remote actor references.

    The one we deal with most often is /user: it supervises the actors we create in our programs with ActorSystem.actorOf. The fault tolerance discussed below focuses on failures handled beneath it. The roles of the other top-level actors are defined above; read up on them if you are interested.

    The root guardian supervises all of the top-level actors and handles their failures. In general, the whole system stops if a failure reaches the root guardian.

2. /user: the user guardian

As mentioned above, /user supervises all top-level actors that the user creates with ActorSystem.actorOf. We can define our own supervision strategy for it, but because the guardian is created when the actor system starts, the strategy has to be set in the configuration file. See the Akka configuration documentation for details.
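As a rough sketch of what such a configuration can look like: the `akka.actor.guardian-supervisor-strategy` setting takes the fully qualified class name of a `SupervisorStrategyConfigurator`. The class name `GuardianStrategy`, the exception choices, and the system name below are made up for illustration, and the setting is supplied programmatically here only to keep the example in one file; in a real project it would normally go in application.conf.

```scala
import akka.actor.{ActorSystem, OneForOneStrategy, SupervisorStrategy, SupervisorStrategyConfigurator}
import akka.actor.SupervisorStrategy.{Restart, Stop}
import com.typesafe.config.ConfigFactory

// Hypothetical configurator. Akka instantiates it by reflection, so it must be
// a top-level class with a public no-argument constructor.
class GuardianStrategy extends SupervisorStrategyConfigurator {
  override def create(): SupervisorStrategy =
    OneForOneStrategy() {
      case _: IllegalStateException => Stop    // give up on this top-level actor
      case _: Exception             => Restart // otherwise restart it
    }
}

object GuardianConfigDemo {
  // Point the /user guardian at the configurator by its class name.
  val config = ConfigFactory
    .parseString(
      "akka.actor.guardian-supervisor-strategy = \"" + classOf[GuardianStrategy].getName + "\"")
    .withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("GuardianConfigDemo", config)
    system.terminate()
  }
}
```

Akka also ships `akka.actor.StoppingSupervisorStrategy`, which can be named in the same setting to stop a failed top-level actor instead of restarting it.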

3. /system: the system guardian

/system supervises all top-level actors created by the system, such as the log listeners in Akka (logging itself is implemented with actors). Its strategy restarts a child indefinitely on every Exception except ActorInitializationException and ActorKilledException, which terminate the child in question. All other throwables are escalated to the root guardian, and the entire actor system is shut down.

Supervision of ordinary user-created actors:

The previous article introduced the organizational structure of an actor system, which is a tree. This structure is very well suited to supervising actors. Akka implements a form called "parental supervision", in which every actor is supervised by the parent that created it. So we can draw a conclusion:

Every actor that is created is supervised, and it may in turn be the supervisor of its own child actors.

Supervision strategy

We now have some understanding of who supervises whom in an ActorSystem, so how do we define a supervision strategy? A supervisor in Akka can respond to a failure in four ways:

  • Resume the subordinate, keeping its accumulated internal state
  • Restart the subordinate, clearing its internal state
  • Stop the subordinate permanently
  • Escalate the failure (pass it up the supervision tree), thereby failing itself

This is easy to understand. Here's a simple example:

 override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: ArithmeticException      => Resume   // resume
      case _: NullPointerException     => Restart  // restart
      case _: IllegalArgumentException => Stop     // stop
      case _: Exception                => Escalate // pass to the superior
    }

We can choose a different directive for each kind of exception; an example later in this article shows this in action. To use our own strategy, we have to override supervisorStrategy in the Actor, because the default supervision strategy of an Actor is the following:

  final val defaultDecider: Decider = {
    case _: ActorInitializationException => Stop
    case _: ActorKilledException         => Stop
    case _: DeathPactException           => Stop
    case _: Exception                    => Restart
  }

It stops the subordinate for the three exceptions it lists explicitly and restarts the subordinate for every other Exception.
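A custom strategy does not have to replace these defaults entirely. Since a decider is an ordinary partial function, one possible approach is to handle only the cases you care about and chain the rest to `SupervisorStrategy.defaultDecider` with `orElse`. The names `Deciders`, `resumeOnArithmetic`, `withDefaults` and `LenientSupervisor` below are invented for this sketch.

```scala
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Directive, Resume}
import scala.concurrent.duration._

object Deciders {
  // Our own rule: arithmetic errors do not corrupt state, so just resume.
  val resumeOnArithmetic: PartialFunction[Throwable, Directive] = {
    case _: ArithmeticException => Resume
  }

  // Anything we do not handle falls through to Akka's default decider.
  val withDefaults: SupervisorStrategy.Decider =
    resumeOnArithmetic.orElse(SupervisorStrategy.defaultDecider)
}

// Hypothetical supervisor that uses the combined decider.
class LenientSupervisor extends Actor {
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute)(Deciders.withDefaults)

  def receive: Receive = Actor.emptyBehavior
}
```

With this decider an ArithmeticException resumes the child, while a NullPointerException still triggers the default restart.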

There are two types of supervision strategy in Akka: OneForOneStrategy and AllForOneStrategy. The main differences between them are:

  • OneForOneStrategy: the directive is applied only to the child actor that failed.
  • AllForOneStrategy: the directive is applied to all child actors, not only the one that failed.

    We generally use OneForOneStrategy, but you can choose whichever strategy fits your needs. We can also configure parameters for the strategy, such as maxNrOfRetries and withinTimeRange. In the example above, maxNrOfRetries = 10 and withinTimeRange = 1 minute mean that a child may be restarted at most 10 times within one minute; if this limit is exceeded, the child is stopped. You can also use the strategy's default parameters. For details, refer to the source code.
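For contrast, here is a minimal sketch of AllForOneStrategy, which can be a reasonable choice when sibling actors depend on each other so closely that one failing leaves the others in an inconsistent state. The actors `Worker` and `PipelineSupervisor`, the child names, and the limits used are all assumptions made for this illustration.

```scala
import akka.actor.{Actor, AllForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Restart}
import scala.concurrent.duration._

// Hypothetical worker that throws any exception it receives.
class Worker extends Actor {
  def receive: Receive = { case ex: Exception => throw ex }
}

object PipelineSupervisor {
  // If one child fails with IllegalStateException, restart ALL children,
  // at most 3 times within 10 seconds; escalate any other exception.
  val strategy: SupervisorStrategy =
    AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 10.seconds) {
      case _: IllegalStateException => Restart
      case _: Exception             => Escalate
    }
}

class PipelineSupervisor extends Actor {
  override val supervisorStrategy: SupervisorStrategy = PipelineSupervisor.strategy

  // Two tightly coupled children; a failure in either restarts both.
  private val reader = context.actorOf(Props[Worker](), "reader")
  private val writer = context.actorOf(Props[Worker](), "writer")

  def receive: Receive = Actor.emptyBehavior
}
```

Swapping AllForOneStrategy for OneForOneStrategy in this sketch would restart only the child that actually failed.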

Example of supervision and fault tolerance

This example demonstrates that when an actor fails, its supervisor reacts differently depending on the supervision strategy in place. Source link

Since the example is fairly simple, I will post the code directly and then explain how each directive behaves using specific test cases:

import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.{Escalate, Restart, Resume, Stop}
import akka.event.Logging
import scala.concurrent.duration._

class Supervisor extends Actor {
  // Supervise the children and handle the exceptions they throw
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }
  var childIndex = 0 // sequence number used to name the child actors

  def receive = {
    case p: Props =>
      childIndex += 1
      // Reply with a reference to the new child; this actor becomes its supervisor
      sender() ! context.actorOf(p, s"child${childIndex}")
  }
}

class Child extends Actor {
  val log = Logging(context.system, this)
  var state = 0
  def receive = {
    case ex: Exception => throw ex // throw the exception that was received
    case x: Int        => state = x // change the state
    case s: Command if s.content == "get" =>
      log.info(s"the ${s.self} state is ${state}")
      sender() ! state // reply with the current state
  }
}

// The command sent to a child: its content and the name of the target actor
case class Command(
    content: String,
    self: String
)

Now let's look at the test cases. First, let's build the test environment:

import akka.actor.{ActorRef, ActorSystem, Props, Terminated}
import akka.testkit.{ImplicitSender, TestKit}
import org.scalatest.{Matchers, WordSpecLike}

class GuardianSpec(_system: ActorSystem)
    extends TestKit(_system)
    with WordSpecLike
    with Matchers
    with ImplicitSender {

  def this() = this(ActorSystem("GuardianSpec"))

  "A supervisor" must {

    "apply the chosen strategy for its child" in {
      // code here...
      val supervisor = system.actorOf(Props[Supervisor], "supervisor") // create a supervisor
      supervisor ! Props[Child]
      val child = expectMsgType[ActorRef] // get the reply through TestKit's testActor
    }
  }
}

1. TestOne: the system runs normally

child ! 50 // set the state to 50
child ! Command("get", child.path.name)
expectMsg(50)

Normal operation, test passed.

2. TestTwo: throws an ArithmeticException

child ! new ArithmeticException // crash it
child ! Command("get", child.path.name)
expectMsg(50)

Do you think the test will pass at this point? The answer is yes. According to our supervision strategy, when the supervisor sees an ArithmeticException thrown by its child actor, it resumes that actor and keeps its state. The state of the actor is therefore still 50, and the test passes.

3. TestThree: throws a NullPointerException

child ! new NullPointerException // crash it harder
child ! Command("get", child.path.name)
expectMsg(50)

Would the test still pass in this case? The answer is no. According to our supervision strategy, when the supervisor sees a NullPointerException thrown by the child actor, it restarts that actor, which clears its state. The state of the actor is therefore 0 at this point, and the test fails.

4. TestFour: throws an IllegalArgumentException

supervisor ! Props[Child] // create new child
val child2 = expectMsgType[ActorRef]
child2 ! 100 // set the state to 100
watch(child) // have testActor watch "child"
child ! new IllegalArgumentException // break it
expectMsgPF() {
  case Terminated(`child`) => println("the child stop")
}
child2 ! Command("get", child2.path.name)
expectMsg(100)

Create a new child actor (child2) and set its state to 100. Watch the first child actor (child) and send it an IllegalArgumentException so that it throws.

the child stop
Test passed

From the result we can see that child is stopped by its supervisor after it throws an IllegalArgumentException, while the other actors under the same supervisor keep working normally.

5. TestFive: throws a custom exception

watch(child2)
child2 ! Command("get", child2.path.name) // verify it is alive
expectMsg(100)
supervisor ! Props[Child] // create new child
val child3 = expectMsgType[ActorRef]
child2 ! new Exception("CRASH") // escalate failure
expectMsgPF() {
  case t @ Terminated(`child2`) if t.existenceConfirmed =>
    println("the child2 stop")
}
child3 ! Command("get", child3.path.name)
expectMsg(0)

Create another child actor (child3), watch child2, and send child2 an Exception("CRASH") so that it throws.

the child2 stop
Test failed

Many people might wonder why TestFour passes but this one does not. Here the failing actor throws an exception that its supervisor does not handle itself, so the supervisor escalates the failure to its own supervisor, the /user guardian. The default strategy of the guardian restarts the failed actor for every Exception (apart from special cases such as ActorInitializationException and ActorKilledException), so our Supervisor is restarted. Because the default preRestart hook stops all child actors before the restart, child3 is stopped as well, and the test fails. Of course, you can override the default preRestart method, for example:

override def preRestart(cause: Throwable, msg: Option[Any]): Unit = {}

With this override, restarting the supervisor no longer stops its children. The surviving children are restarted along with it instead.

This article has introduced supervision and fault tolerance in the actor system, an important part of Akka that fits neatly with the tree structure in which actors are organized. It draws on the corresponding chapter of the official Akka documentation (Akka Docs), which interested readers can consult. You can also download my sample program, which includes the fault tolerance example from the official documentation.