This post was written by GodPan and published on the ScalaCool team blog.

I am writing about this topic now because I recently ran into some difficulties with it in a project, and I want to share the experience. When using Akka, we often keep state inside an Actor. While the system runs normally there is nothing to worry about, but things do go wrong: an Actor fails and has to be restarted, memory overflows, or the whole system crashes. If we take no precautions, the Actor's state is lost when the system restarts, key data disappears, and the system's data becomes inconsistent. Akka is mature enough for production use and provides a solution for this: Akka Persistence.

Why do you need persistent actors?

No matter how things change, data consistency is an eternal theme. However well a system performs, it is not a good system if it cannot guarantee that its data is correct. Every system fails at some point at run time, and restoring the data correctly after a failure, without leaving it in a mess, is a hard problem. With the Actor model, one guiding idea is to avoid database operations wherever we can (here we assume the database itself is safe and reliable and guarantees the correctness and consistency of the data, for example a cloud database from a domestic provider). On the one hand, a large volume of operations puts the database under heavy pressure. On the other hand, even if the database can cope, operations on large tables such as count and update take far longer than working directly in memory, which hurts performance. Some will ask: if memory operations are so fast, why not keep all the data in memory? The obvious answer is that when a machine crashes or runs out of memory, the data is probably lost and cannot be recovered. Given all this, is there a solution that meets both requirements at minimal performance cost? The answer is Akka Persistence.

The core architecture of Akka Persistence

Before going into Akka Persistence in detail, it helps to understand its core design idea. Put simply, we store something that lets us restore the state of an Actor. That something can be logs, database records, or files, so the essence is easy to grasp: while the Actor processes messages we store some data, and the Actor later uses that data to restore its state.
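The idea can be sketched without Akka at all. The following is a minimal illustration, not Akka's implementation: `Wallet`, `Spent`, and `ReplaySketch` are hypothetical names invented for this example. The state is never stored directly; only the events are, and the state is rebuilt by applying them in order.

```scala
// Hypothetical event and state types, used only for illustration (not part of Akka).
final case class Spent(amount: Int)

final case class Wallet(balance: Int) {
  // Applying an event yields the next state; the state itself is immutable.
  def applyEvent(event: Spent): Wallet = copy(balance = balance - event.amount)
}

object ReplaySketch {
  // Rebuild the state by applying the stored events, in order, to the initial state.
  def recover(initial: Wallet, storedEvents: Seq[Spent]): Wallet =
    storedEvents.foldLeft(initial)((state, event) => state.applyEvent(event))

  def main(args: Array[String]): Unit = {
    val journal = Seq(Spent(30), Spent(20), Spent(5)) // what was persisted before the crash
    println(recover(Wallet(100), journal))            // prints Wallet(45)
  }
}
```

Because recovery is just a fold over the stored events, the same events always produce the same state, which is what makes the approach safe after a crash.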

Therefore, Akka Persistence consists of the following key components:

  • PersistentActor: any Actor that requires persistence must extend it and must implement three key members:
 def persistenceId = "example" // Unique identifier of the persistent Actor, used for persistence and queries

 def receiveCommand: Receive = ??? // Message handling during normal operation; events can be persisted here

 def receiveRecover: Receive = ??? // Logic executed while the Actor recovers after a restart

receiveCommand plays the same role as receive in a regular Actor; the other two members are extra requirements compared with regular Actors. Two other key concepts for persistent Actors are the Journal, which persists events, and the Snapshot, which holds snapshots of the Actor's state. Both play a critical role in restoring the state of an Actor.

Akka Persistence demo in practice

I will start with a demo so that you get a feel for how Akka Persistence is used and roughly how it works, and then go on to some practical problems you may run into.

Suppose there is a big red envelope of 10,000 yuan. Many people may try to grab it at the same instant, and each person may draw a different amount. The scenario is simple and can be implemented in many ways, but the data must stay correct. The most common approach is to let the database guarantee this. Anyone familiar with the problem knows that this is not a great solution, because it needs locks and a large number of database operations, which leads to poor performance. So can we use Actors to meet this requirement? Absolutely.

Let's start by defining a lottery command:

case class LotteryCmd(
  userId: Long,     // ID of the participating user
  username: String, // User name
  email: String     // Email address of the participating user
)

We then implement a lottery Actor by extending PersistentActor:

import akka.actor.ActorLogging
import akka.persistence.{PersistentActor, RecoveryCompleted, SaveSnapshotSuccess, SnapshotOffer}

case class LuckyEvent( // Successful draw event
    userId: Long,
    luckyMoney: Int
)
case class FailureEvent( // Failed draw event
    userId: Long,
    reason: String
)
case class Lottery(
    totalAmount: Int,  // Total amount of the red envelope
    remainAmount: Int  // Amount left in the red envelope
) {
  def update(luckyMoney: Int) = {
    copy(
      remainAmount = remainAmount - luckyMoney
    )
  }
}
class LotteryActor(initState: Lottery) extends PersistentActor with ActorLogging {
  override def persistenceId: String = "lottery-actor-1"

  var state = initState  // Initialize the state of the Actor

  override def receiveRecover: Receive = {
    case event: LuckyEvent =>
      updateState(event)  // Restore the Actor state from persisted events
    case SnapshotOffer(_, snapshot: Lottery) =>
      log.info(s"Recover actor state from snapshot and the snapshot is ${snapshot}")
      state = snapshot // Restore the Actor state from the snapshot
    case RecoveryCompleted => log.info("the actor recover completed")
  }

  def updateState(le: LuckyEvent) =
    state = state.update(le.luckyMoney)  // Update the Actor state

  override def receiveCommand: Receive = {
    case lc: LotteryCmd =>
      doLottery(lc) match {       // Run the draw and handle the result accordingly
        case le: LuckyEvent =>    // A random red envelope was drawn
          persist(le) { event =>
            updateState(event)
            increaseEvtCountAndSnapshot()
            sender() ! event
          }
        case fe: FailureEvent =>  // The red envelope is already empty
          sender() ! fe
      }
    case "saveSnapshot" =>  // Snapshot command received: store a snapshot
      saveSnapshot(state)
    case SaveSnapshotSuccess(metadata) =>
      // Follow-up work after a snapshot is saved goes here, such as deleting the previous snapshot
      log.info(s"snapshot saved: $metadata")
  }

  private def increaseEvtCountAndSnapshot() = {
    val snapShotInterval = 5
    if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) {  // After every 5 persisted events, store a snapshot of the current Actor state
      self ! "saveSnapshot"
    }
  }

  def doLottery(lc: LotteryCmd) = {  // The lottery logic
    if (state.remainAmount > 0) {
      val luckyMoney = scala.util.Random.nextInt(state.remainAmount) + 1
      LuckyEvent(lc.userId, luckyMoney)
    }
    else {
      FailureEvent(lc.userId, "Come earlier next time, the red envelope has been drawn out!")
    }
  }
}

The program is simple and I have commented the key points, so it should be easy to follow if you know Actors. If anything is unclear, you can take a look at the articles I wrote earlier. Next, we write a test for the red-envelope Actor:

import java.util.concurrent.{ExecutorService, Executors}

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PersistenceTest extends App {
  val lottery = Lottery(10000, 10000)
  val system = ActorSystem("example-05")
  val lotteryActor = system.actorOf(Props(new LotteryActor(lottery)), "LotteryActor-1")  // Create the lottery Actor
  val pool: ExecutorService = Executors.newFixedThreadPool(10)
  val r = (1 to 100).map(i =>
    new LotteryRun(lotteryActor, LotteryCmd(i.toLong, "godpan", "[email protected]"))  // Create 100 lottery requests
  )
  r.foreach(pool.execute(_))  // Submit the requests through a thread pool to simulate simultaneous participation
  Thread.sleep(5000)
  pool.shutdown()
  system.terminate()
}

class LotteryRun(lotteryActor: ActorRef, lotteryCmd: LotteryCmd) extends Runnable { // A lottery request
  implicit val timeout: Timeout = Timeout(3.seconds)
  def run(): Unit = {
    (lotteryActor ? lotteryCmd).foreach {  // Print a different result depending on the event received
      case le: LuckyEvent => println(s"Congratulations to user ${le.userId} on winning a red envelope of ${le.luckyMoney} yuan")
      case fe: FailureEvent => println(fe.reason)
      case _ => println("System error, please draw again")
    }
  }
}

Running the program, we might see the following:


[Figure: result of the persistence demo]


Here are the steps a persistent Actor goes through over its whole run, to help you understand how it works:

  • 1. Initialize the persistent Actor

    • 1.1 On first start, initialization is the same as for a regular Actor.
    • 1.2 On restart, the Actor's state is recovered from the data persisted earlier.
      • 1.2.1 Recovering from a snapshot: this restores the Actor quickly, but a snapshot is not saved for every persisted event. If a snapshot exists, the Actor recovers its state from the snapshot first.
      • 1.2.2 Recovering from events (logs, database records, etc.): the Actor's state is restored by replaying the persisted events. This is the critical part.
  • 2. Receive commands, process them, and convert them into events that need to be persisted (events should contain only the critical data as far as possible). Persist them with the persistent Actor's persistence methods (persist in the example above; batch persistence is covered later), and run the follow-up logic once persistence has succeeded, for example updating the Actor's state or sending messages to other Actors.

  • 3. If you need snapshots, you can choose how often to store them, for example one snapshot for every 100 persisted events.
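The steps above can be sketched in plain Scala. This is a simplified model under stated assumptions, not Akka's internals: `Drawn`, `Pool`, `Snapshot`, and `RecoverySketch` are hypothetical names, and the `sequenceNr` field only mimics the per-event sequence number that the journal assigns. Recovery starts from the snapshot when one exists and then replays only the events persisted after it.

```scala
// Hypothetical types for illustration; sequenceNr mimics the journal's per-event sequence number.
final case class Drawn(sequenceNr: Long, amount: Int)
final case class Pool(remaining: Int)
final case class Snapshot(sequenceNr: Long, state: Pool)

object RecoverySketch {
  def applyEvent(state: Pool, event: Drawn): Pool =
    state.copy(remaining = state.remaining - event.amount)

  // Step 1.2: start from the snapshot if there is one (1.2.1),
  // then replay only the events persisted after it (1.2.2).
  def recover(initial: Pool, snapshot: Option[Snapshot], journal: Seq[Drawn]): Pool = {
    val (start, fromSeqNr) = snapshot match {
      case Some(s) => (s.state, s.sequenceNr)
      case None    => (initial, 0L)
    }
    journal
      .filter(_.sequenceNr > fromSeqNr)
      .foldLeft(start)((state, event) => applyEvent(state, event))
  }

  // Step 3: decide whether to store a snapshot after persisting event number sequenceNr.
  def shouldSnapshot(sequenceNr: Long, interval: Long): Boolean =
    sequenceNr != 0 && sequenceNr % interval == 0

  def main(args: Array[String]): Unit = {
    val journal = (1L to 7L).map(n => Drawn(n, 10))   // seven draws of 10 each
    val snap    = Snapshot(5L, Pool(50))              // state after the first five events
    println(recover(Pool(100), Some(snap), journal))  // prints Pool(30)
    println(recover(Pool(100), None, journal))        // prints Pool(30)
  }
}
```

Both recovery paths end in the same state; the snapshot only shortens the replay, which is why the snapshot interval is a performance setting rather than a correctness one.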

In general, that is how a persistent Actor operates at run time. How it actually persists events, how the recovery mechanism works, and so on are not covered here; if you are interested, take a look at the Akka source code.

Configuration for using Akka Persistence

First we must load the corresponding dependencies by adding the following to build.sbt:

libraryDependencies ++= Seq(
  "com.typesafe.akka"        %% "akka-actor"       % "2.4.16", // Akka actor core dependency
  "com.typesafe.akka"        %% "akka-persistence" % "2.4.16", // Akka Persistence dependency
  "org.iq80.leveldb"          % "leveldb"          % "0.7",    // LevelDB Java port dependency
  "org.fusesource.leveldbjni" % "leveldbjni-all"   % "1.8",    // LevelDB JNI dependency
  "com.twitter"              %% "chill-akka"       % "0.8.0"   // Event serialization dependency
)

We also need to add the following configuration to application.conf:

akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

akka.persistence.journal.leveldb.dir = "log/journal"
akka.persistence.snapshot-store.local.dir = "log/snapshots"

# DO NOT USE THIS IN PRODUCTION!!!!!!!!!
# See also https://github.com/typesafehub/activator/issues/287
akka.persistence.journal.leveldb.native = false  // Native LevelDB is not installed locally, so this is set to false; not recommended for production environments

akka.actor.serializers {
  kryo = "com.twitter.chill.akka.AkkaSerializer"
}

akka.actor.serialization-bindings {
  "scala.Product" = kryo
  "akka.persistence.PersistentRepr" = kryo
}

At this point the whole Akka Persistence demo is set up and runs normally. Interested readers can download the source code. Source link

Advanced Akka Persistence

1. Persistence plugins

If you are not familiar with LevelDB, or feel that storage on a single machine is not safe enough, are there plugins that support distributed data stores, such as a cloud database? Of course there are, and I have gone ahead and collected some good ones for you.

  • 1. akka-persistence-sql-async: supports MySQL and PostgreSQL. It uses a fully asynchronous database driver and provides asynchronous, non-blocking APIs. The project address

  • 2. akka-persistence-cassandra: the officially recommended plugin. It uses Cassandra, whose write performance is very fast, is one of the more popular plugins, and also supports Persistence Query. The project address

  • 3. akka-persistence-redis: Redis should also fit the Akka Persistence scenario very well. Readers familiar with Redis can give it a try. The project address

  • 4. akka-persistence-jdbc: how could a JDBC plugin be missing? It supports both Scala and Java. The project address

How to use each plugin is described in the documentation of its project; from what I have seen, they are fairly easy to use.

2. Batch persistence

Our company uses the akka-persistence-sql-async plugin mentioned above, so events and snapshots are persisted to a database. At first I persisted each event to the database individually, just like the demo above. In performance tests, however, with the database already handling 1000+ reads and writes per second, the Actor could process only 60 to 70 persisted events per second. The actual business scenario requires the Actor to return a result within 3 seconds, so a large number of messages timed out without any response, and the backlog of unprocessed messages led to a flood of system errors and a worse user experience. Having found the problem, can we optimize it? Yes: instead of persisting events one at a time, we can persist them in batches. Akka Persistence provides persistAll for this. Below, the demo above is reworked to persist events in batches:

import akka.actor.{ActorLogging, ActorRef, Cancellable}
import akka.persistence.{PersistentActor, RecoveryCompleted, SaveSnapshotSuccess, SnapshotOffer}

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._

class LotteryActorN(initState: Lottery) extends PersistentActor with ActorLogging {
  import context.dispatcher  // ExecutionContext required by the scheduler

  override def persistenceId: String = "lottery-actor-2"

  var state = initState  // Initialize the state of the Actor

  override def receiveRecover: Receive = {
    case event: LuckyEvent =>
      updateState(event)  // Restore the Actor state from persisted events
    case SnapshotOffer(_, snapshot: Lottery) =>
      log.info(s"Recover actor state from snapshot and the snapshot is ${snapshot}")
      state = snapshot // Restore the Actor state from the snapshot
    case RecoveryCompleted => log.info("the actor recover completed")
  }

  def updateState(le: LuckyEvent) =
    state = state.update(le.luckyMoney)  // Update the Actor state

  private val lotteryQueue = ArrayBuffer.empty[(LotteryCmd, ActorRef)]

  private val tick: Cancellable = context.system.scheduler  // The timer triggers the lottery logic
    .schedule(0.milliseconds, 100.milliseconds, self, "doLottery")

  override def postStop(): Unit = {  // Stop the timer together with the Actor
    tick.cancel()
    super.postStop()
  }

  override def receiveCommand: Receive = {
    case lc: LotteryCmd =>
      lotteryQueue += ((lc, sender()))  // Add the participant to the lottery queue
      println(s"the lotteryQueue size is ${lotteryQueue.size}")
      if (lotteryQueue.size >= 5)  // The draw is triggered when there are 5 participants
        joinN(lotteryQueue)
    case "doLottery" =>
      if (lotteryQueue.nonEmpty)
        joinN(lotteryQueue)
    case "saveSnapshot" =>  // Snapshot command received: store a snapshot
      saveSnapshot(state)
    case SaveSnapshotSuccess(metadata) =>
      // Follow-up work after a snapshot is saved goes here, such as deleting the previous snapshot
      log.info(s"snapshot saved: $metadata")
  }

  private def joinN(queue: ArrayBuffer[(LotteryCmd, ActorRef)]) = {  // Process the lottery results in batches
    val rs = doLotteryN(queue)
    val success = rs.collect {  // Winning events mapped to their senders (identical events in one batch collapse into one entry)
      case (event: LuckyEvent, ref: ActorRef) => event -> ref
    }.toMap
    val failure = rs.collect {  // Losing events with their senders
      case (event: FailureEvent, ref: ActorRef) => event -> ref
    }
    persistAll(success.keys.toIndexedSeq) { event =>  // Batch-persist the winning events
      println(event)
      updateState(event)
      increaseEvtCountAndSnapshot()
      success(event) ! event
    }
    failure.foreach {
      case (event, ref) => ref ! event
    }
    queue.clear()  // Clear the queue
  }

  private def increaseEvtCountAndSnapshot() = {
    val snapShotInterval = 5
    if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) {  // After every 5 persisted events, store a snapshot of the current Actor state
      self ! "saveSnapshot"
    }
  }

  private def doLotteryN(queue: ArrayBuffer[(LotteryCmd, ActorRef)]) = {  // The lottery logic
    var remainAmount = state.remainAmount  // Running balance for this batch
    queue.map { case (cmd, ref) =>
      if (remainAmount > 0) {
        val luckyMoney = scala.util.Random.nextInt(remainAmount) + 1
        remainAmount = remainAmount - luckyMoney
        (LuckyEvent(cmd.userId, luckyMoney), ref)
      }
      else {
        (FailureEvent(cmd.userId, "Come earlier next time, the red envelope has been drawn out!"), ref)
      }
    }
  }
}

This is the lottery Actor reworked to support batch persistence. The processing logic is a little more involved here because results have to be returned to the original senders, and a real scenario may be more complex still. The source code is in the same project as before.
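To see why batching helps, here is a toy model in plain Scala. It is only a sketch under a stated assumption: `CountingJournal` is a hypothetical in-memory stand-in (not any real plugin) in which every write call costs one round trip to the store, however many events the call carries. Real plugins differ in the details, so treat the numbers as an illustration of the idea rather than a measurement.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical in-memory journal: each call to write counts as one round trip to the store.
final class CountingJournal {
  val stored = ArrayBuffer.empty[String]
  var roundTrips = 0

  def write(events: Seq[String]): Unit = {
    roundTrips += 1
    stored ++= events
  }
}

object BatchSketch {
  // One write per event, like calling persist for every command.
  def persistOneByOne(journal: CountingJournal, events: Seq[String]): Unit =
    events.foreach(e => journal.write(Seq(e)))

  // One write per group of batchSize events, like calling persistAll on a queue.
  def persistInBatches(journal: CountingJournal, events: Seq[String], batchSize: Int): Unit =
    events.grouped(batchSize).foreach(batch => journal.write(batch))

  def main(args: Array[String]): Unit = {
    val events = (1 to 100).map(i => s"event-$i")

    val single = new CountingJournal
    persistOneByOne(single, events)

    val batched = new CountingJournal
    persistInBatches(batched, events, batchSize = 5)

    println(single.roundTrips)   // prints 100
    println(batched.roundTrips)  // prints 20
  }
}
```

Both journals end up holding the same 100 events in the same order; only the number of round trips changes, which is the cost that batching is meant to reduce.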

3. Persistence Query

Akka Persistence also provides a Query interface for querying persisted events. How to use it depends on the actual business scenario, so I will not expand on it here. I have also written a small demo of it in the project, for readers who want to try it.