Akka is an asynchronous communication framework based on the Actor model and implemented in Scala. It can be used to build highly concurrent, distributed, fault-tolerant, event-driven applications on the JVM, and Spark has used it for communication between processes and nodes. In a real project, we used it to build a model deployment platform that met our client's business requirements.

Project background

A large domestic restaurant chain operates a large number of stores. The daily production, ordering, and staff scheduling of its restaurants all depend on accurate daily estimates of customer orders. The company's internal data team had built an estimation model and asked TalkingData to help build an engineering platform for training and deploying it, so that the model could actually be used in production.

After discussions with the client, we identified several problems in the actual production environment:

  • Asynchrony: each store manager consolidates and uploads the previous day's sales and business data for their store, and the upload start time, end time, and data completeness are all unpredictable. Both model training and prediction depend on this data, so there is no single point in time at which training and prediction can start for all stores.

  • High concurrency: apart from some special types of stores, most stores keep fairly fixed business hours. Between the time store managers organize and upload their sales data and the time materials and staff must be prepared for the next day's business, only about 3 hours remain for model training and prediction to return forecasts. With 2 to 3 forecast indicators per store, the platform needs enough scheduling capacity to complete about 20,000 model training and prediction runs within that window.

  • Fault tolerance: with so many stores in so many different situations, many potential factors can cause a run to go wrong or fail. In principle, the failure of one run should not affect any other, so at the platform level each run should be an independent task.
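The sketch below makes the capacity requirement concrete. It converts the figures above (about 20,000 runs in a window of about 3 hours) into a required throughput. It then applies Little's law (runs in flight = throughput × average run time) to estimate how many runs must execute at the same time. The average run time is a hypothetical input, not a measured value. The calculation also assumes uploads are spread evenly over the window. In practice they cluster, so peak demand is higher than this average.

```scala
object CapacityEstimate {
  // Figures from the text: about 20,000 runs within about 3 hours
  val totalRuns: Int = 20000
  val windowSeconds: Int = 3 * 60 * 60

  // Average runs per second needed if work were spread evenly over the window
  val requiredThroughput: Double = totalRuns.toDouble / windowSeconds

  // Little's law: runs in flight = throughput * average seconds per run.
  // avgRunSeconds is a hypothetical input, not a measured figure.
  def requiredConcurrency(avgRunSeconds: Double): Int =
    math.ceil(requiredThroughput * avgRunSeconds).toInt

  def main(args: Array[String]): Unit = {
    println(s"required throughput (runs/s): $requiredThroughput")
    println(s"concurrency at 60 s/run: ${requiredConcurrency(60)}")   // 112
    println(s"concurrency at 300 s/run: ${requiredConcurrency(300)}") // 556
  }
}
```

The point is the shape of the estimate: the required concurrency grows linearly with the average run time. This is why the platform must be able to scale the number of in-flight tasks rather than rely on a fixed, small pool.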

We therefore needed a lightweight distributed service framework on which to build the model training and prediction platform. It had to meet the requirements above and keep the platform reasonably scalable. Drawing on the team's earlier technical experience, we chose Akka for communication within the platform.

The selection process

Message-driven approach: handling process asynchrony

A complete prediction task consists of training-data preparation, model training and model-result export, prediction-data preparation, and prediction-result export. The timing of the data-preparation steps is uncertain, and so are the results of the model steps. With a synchronous model, this would produce a large number of waiting threads and waste a great deal of resources. In the Actor model, each Actor is a basic unit of computation. In response to a message it receives, it can concurrently:

  • Send a finite number of messages to other actors

  • Create a finite number of new actors

  • Specify the behavior to use when it receives the next message

These operations are not assumed to happen in any particular order, so they can run in parallel. The sender is decoupled from the messages it has sent and communicates asynchronously, without waiting for them to be processed.
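The sketch below shows one Akka classic actor performing all three operations:

- it creates a child actor for each job,
- it forwards the job to that child,
- it switches its own behavior with context.become.

It assumes Akka classic actors (akka-actor 2.5 or 2.6 on the classpath). The message and actor names are hypothetical.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical messages, for illustration only
final case class Job(id: Int)
final case class Done(id: Int)
final case class Rejected(id: Int)
case object Pause
case object Paused

// Short-lived child: handles one job, replies to the original caller, then stops
class Step extends Actor {
  def receive: Receive = {
    case Job(id) =>
      sender() ! Done(id)
      context.stop(self)
  }
}

class Coordinator extends Actor {
  def receive: Receive = accepting

  def accepting: Receive = {
    case job: Job =>
      // Create a new actor for this job
      val step = context.actorOf(Props[Step]())
      // Send the job on; forward keeps the original sender, and this call does not wait
      step.forward(job)
    case Pause =>
      // Specify the behavior used for the next messages
      context.become(paused)
      sender() ! Paused
  }

  def paused: Receive = {
    case Job(id) => sender() ! Rejected(id)
  }
}

object ActorOpsDemo extends App {
  implicit val timeout: Timeout = Timeout(3.seconds)
  val system = ActorSystem("ActorOpsDemo")
  val coordinator = system.actorOf(Props[Coordinator](), "coordinator")

  println(Await.result(coordinator ? Job(1), 3.seconds)) // Done(1)
  Await.result(coordinator ? Pause, 3.seconds)
  println(Await.result(coordinator ? Job(2), 3.seconds)) // Rejected(2)
  system.terminate()
}
```

The coordinator never waits for a job to finish, so it can take its next message immediately. The reply goes straight from the child to the original caller.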

Figure: Actor model communication mode

Actors in Akka are essentially objects that receive messages and act on them; they encapsulate both state and behavior. Their only means of communication is exchanging messages, which are placed in the recipient's mailbox. Actors naturally form a tree: a task is broken down and delegated until the pieces are small enough to be handled in their entirety. We therefore split each step of the prediction task into its own abstraction, defined a message type for each step, and handed each step to a lightweight Actor. Sending messages of different types triggers different operations in the actors, so the prediction process as a whole never has to wait.
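Here is a minimal sketch of this idea, assuming Akka classic. Each step of a much-simplified prediction task has its own message type, and finishing one step sends the message that starts the next. The slow, unpredictable data-preparation step runs in a Future whose result is piped back to the actor as a message, so the actor never blocks. The step names, the placeholder data, and the "model" (a simple mean) are hypothetical stand-ins for the real pipeline.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical step messages for one store's prediction task
final case class PrepareTrainingData(storeId: String)
final case class TrainModel(storeId: String, history: Seq[Double])
final case class Predict(storeId: String, mean: Double)
final case class ExportResult(storeId: String, forecast: Double)

object PredictionTask {
  // Placeholder for reading the store's uploaded data (slow, unpredictable timing)
  def loadHistory(storeId: String): Seq[Double] = Seq(100.0, 120.0, 110.0)
}

class PredictionTask extends Actor {
  import context.dispatcher // ExecutionContext for the Future below
  private var replyTo: ActorRef = Actor.noSender

  def receive: Receive = {
    case PrepareTrainingData(store) =>
      replyTo = sender()
      // Run the slow step in a Future and pipe its result back as the next message,
      // so the actor stays free to process other messages meanwhile
      Future(PredictionTask.loadHistory(store)).map(h => TrainModel(store, h)).pipeTo(self)
    case TrainModel(store, history) =>
      // Placeholder "model": the mean of the history
      self ! Predict(store, history.sum / history.size)
    case Predict(store, mean) =>
      self ! ExportResult(store, mean)
    case result: ExportResult =>
      replyTo ! result
      context.stop(self)
  }
}

object PredictionDemo extends App {
  implicit val timeout: Timeout = Timeout(3.seconds)
  val system = ActorSystem("PredictionDemo")
  val task = system.actorOf(Props[PredictionTask](), "store-1-task")
  println(Await.result(task ? PrepareTrainingData("store-1"), 3.seconds)) // ExportResult(store-1,110.0)
  system.terminate()
}
```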

Structure: handling high concurrency

Because most stores operate on roughly the same schedule, platform traffic has pronounced peaks and valleys. During the valleys, the platform should use as few resources as possible. When peak traffic arrives, it must respond promptly to stay available.

After discussion, we settled on a master-worker architecture for the platform. The Master receives and assigns tasks, and the Workers process and execute the specific model tasks.

Master and Worker each run as an independent ActorSystem that manages actors with different internal logic, and each uses very few resources when idle. Actors are not threads. They are lightweight objects whose life cycle is managed by the ActorSystem, and they are scheduled onto a shared thread pool (the dispatcher) only when they have messages to process. With few requests, a small number of threads serve all actors. Under high concurrency, the ActorSystem can create many more actors to accept requests while the dispatcher spreads their work over its threads, which keeps the platform available.
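The sketch below shows a master-worker arrangement inside a single ActorSystem, assuming Akka classic and its routing module. The Master owns a RoundRobinPool router, which creates a fixed number of Worker children and hands each incoming task to the next one. In the platform described here, Master and Worker run as separate ActorSystems connected by remoting. This single-process version, with hypothetical message names, only illustrates the division of labor.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.routing.RoundRobinPool
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical messages
final case class ModelTask(storeId: String, indicator: String)
final case class TaskResult(storeId: String, indicator: String, worker: String)

class Worker extends Actor {
  def receive: Receive = {
    case ModelTask(store, indicator) =>
      // Placeholder for the real model training / prediction work
      sender() ! TaskResult(store, indicator, self.path.name)
  }
}

class Master(nrOfWorkers: Int) extends Actor {
  // Pool router: creates nrOfWorkers Worker children and hands out tasks round-robin
  private val workers =
    context.actorOf(RoundRobinPool(nrOfWorkers).props(Props[Worker]()), "workers")

  def receive: Receive = {
    // forward keeps the original sender, so the Worker replies to whoever submitted the task
    case task: ModelTask => workers.forward(task)
  }
}

object Master {
  def props(nrOfWorkers: Int): Props = Props(new Master(nrOfWorkers))
}

object MasterWorkerDemo extends App {
  implicit val timeout: Timeout = Timeout(3.seconds)
  val system = ActorSystem("MasterWorkerDemo")
  val master = system.actorOf(Master.props(nrOfWorkers = 3), "master")

  for (i <- 1 to 6)
    println(Await.result(master ? ModelTask(s"store-$i", "orders"), 3.seconds))
  system.terminate()
}
```

A fixed-size pool keeps the sketch simple. Elastic scaling between peaks and valleys would need a resizable pool, or more Worker nodes, which this sketch does not cover.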

Figure: The life cycle of actors in Akka

Child actors: modularity improves fault tolerance

The model-related steps of any prediction task may fail. In addition, network fluctuations and content-verification errors during data preparation can cause the current prediction task to fail. For failed tasks, we want to log as much error information as possible, as a basis for rerunning them.

Akka organizes parent and child actors into a tree-shaped supervision hierarchy. This supervision mechanism provides fault tolerance by handing responsibility for responding to an error to an entity other than the one that failed. A parent Actor creates child actors to delegate subtasks to, and automatically supervises them. The list of child actors is kept in the parent Actor's context, where the parent can access it.
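Here is a minimal supervision sketch, assuming Akka classic. The parent creates a child through its context. Its OneForOneStrategy then decides, per exception type, what happens to a failed child:

- a simulated network error restarts the child,
- a content-verification error stops it.

The actor names and string messages are hypothetical.

```scala
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.pattern.ask
import akka.util.Timeout
import java.io.IOException
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical child that can fail in two different ways
class DataLoader extends Actor {
  def receive: Receive = {
    case "network-error" => throw new IOException("network fluctuation")
    case "bad-content"   => throw new IllegalArgumentException("content verification failed")
    case "ping"          => sender() ! "pong"
  }
}

class TaskSupervisor extends Actor {
  // Decides what happens to a child that throws; this runs in the parent, not the child
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: IOException              => Restart // transient: retry with a fresh instance
      case _: IllegalArgumentException => Stop    // bad input: retrying will not help
    }

  // Creating the child through the context makes this actor its supervisor
  private val loader = context.actorOf(Props[DataLoader](), "loader")

  def receive: Receive = {
    // The parent can list its children through its context
    case "children" => sender() ! context.children.map(_.path.name).toList
    case msg        => loader.forward(msg)
  }
}

object SupervisionDemo extends App {
  implicit val timeout: Timeout = Timeout(3.seconds)
  val system = ActorSystem("SupervisionDemo")
  val supervisor = system.actorOf(Props[TaskSupervisor](), "supervisor")

  supervisor ! "network-error"                              // child throws and is restarted
  println(Await.result(supervisor ? "ping", 3.seconds))     // pong
  println(Await.result(supervisor ? "children", 3.seconds)) // List(loader)
  system.terminate()
}
```

A restart replaces the child's instance but keeps its ActorRef and its mailbox, minus the message that failed. That is why the "ping" sent right after the failure is still answered.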

Figure: The Actor structure in Akka

Within a Worker, the Prepare actor handles data preparation and the Executor actor handles the model-related steps, while the Worker's parent Actor manages both. Errors and exceptions propagate up to this parent Actor, which records them and sends them to the error collection module for unified handling.
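One way to sketch this structure, assuming Akka classic:

- a WorkerParent creates Prepare and Executor children and routes a task through them,
- its supervisor strategy records any child failure,
- it sends each failure to an error-collector actor as an ErrorReport.

All names here (WorkerParent, RunTask, PreparedData, ErrorReport, and the placeholder data) are hypothetical. The real error collection module is not described in the source.

```scala
import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.Restart

// Hypothetical messages
final case class RunTask(storeId: String)
final case class PreparedData(storeId: String, rows: Seq[Double])
final case class ErrorReport(step: String, reason: String)

// Data preparation: verify the content, then hand the data back to the parent
class Prepare extends Actor {
  def receive: Receive = {
    case RunTask(storeId) =>
      require(storeId.nonEmpty, "empty store id")          // content verification
      sender() ! PreparedData(storeId, Seq(1.0, 2.0, 3.0)) // placeholder data
  }
}

// Model-related steps (training / prediction), left as a placeholder
class Executor extends Actor {
  def receive: Receive = {
    case PreparedData(storeId, rows) =>
      require(rows.nonEmpty, s"no rows for store $storeId")
  }
}

class WorkerParent(errorCollector: ActorRef) extends Actor with ActorLogging {
  private val prepare  = context.actorOf(Props[Prepare](), "prepare")
  private val executor = context.actorOf(Props[Executor](), "executor")

  // While the decider runs, sender() is the child that failed.
  // loggingEnabled = false because this decider logs the failure itself.
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(loggingEnabled = false) {
      case e: Exception =>
        val step = sender().path.name
        log.error(e, "step {} failed", step)
        errorCollector ! ErrorReport(step, e.getMessage)
        Restart // keep the child available for the next task
    }

  def receive: Receive = {
    case task: RunTask      => prepare ! task
    case data: PreparedData => executor ! data
  }
}

object WorkerParent {
  def props(errorCollector: ActorRef): Props = Props(new WorkerParent(errorCollector))
}
```

Choosing Restart keeps both children available for the next task. The failed task itself is not retried automatically, which fits the goal of recording errors as a basis for a later rerun.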

Practical application

ActorSystem

When you create an ActorSystem, by default it looks for application.conf, application.json, and application.properties on the classpath and loads them automatically:

        import akka.actor.ActorSystem

        val system = ActorSystem("RsModelActorSystem")
        // Equivalent, passing the default configuration explicitly:
        // val system = ActorSystem("RsModelActorSystem", ConfigFactory.load())

If you want to use your own configuration file, you can load it with ConfigFactory:

        import com.typesafe.config.ConfigFactory

        val system = ActorSystem("UniversityMessageSystem",
          ConfigFactory.load("own-application.conf"))

You can also build the configuration in code, for example to enable remoting on a given host and port, and fall back to the default configuration for everything else:

        val host = "127.0.0.1" // hypothetical: this node's hostname or IP address
        val config = ConfigFactory.parseString(
          s"""
            |akka.actor.provider = akka.remote.RemoteActorRefProvider
            |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
            |akka.remote.netty.tcp.hostname = "$host"
            |akka.remote.netty.tcp.port = 2445
          """.stripMargin)
        val system = ActorSystem("RsModelActorSystem", config.withFallback(ConfigFactory.load()))

Many ActorSystem configuration parameters can be customized and should be tuned to actual needs. For example, in this project a single algorithm task object exceeds Akka Remote's default maximum frame size of 128000 bytes, so the akka.remote.netty.tcp.maximum-frame-size parameter has to be increased.
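The override could look like the sketch below. It assumes the classic Netty-based remoting (the akka.remote.netty.tcp settings) used elsewhere in this article; Akka's newer Artery transport uses different keys. The 10 MiB limit is a hypothetical value that should be sized from the largest serialized task object. Akka's reference configuration notes that send-buffer-size and receive-buffer-size must also be able to hold a message of the maximum size, so they are raised too.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object FrameSizeConfig {
  // Hypothetical limit: size it from the largest serialized task object
  val remoteOverrides: Config = ConfigFactory.parseString(
    """
      |akka.remote.netty.tcp {
      |  maximum-frame-size = 10 MiB
      |  # the buffers must be able to hold a message of the maximum size
      |  send-buffer-size = 10 MiB
      |  receive-buffer-size = 10 MiB
      |}
    """.stripMargin)

  // Overrides win; everything else comes from application.conf / reference.conf
  val config: Config = remoteOverrides.withFallback(ConfigFactory.load())
}
```

The resulting config can then be passed to ActorSystem(name, FrameSizeConfig.config) like any other configuration.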

Actor

An Actor consists of state, behavior, a mailbox, child actors, and a supervisor strategy, all encapsulated behind an Actor reference. An Actor object usually holds variables that reflect its current state. An actor processes its messages one at a time, and that state is not visible to the rest of the system, so you don't have to worry about concurrent access to it. Each message is matched against the Actor's current behavior. A behavior is a function that defines what to do with a message at a given point in time, and its logic has to be written to fit the actual requirements. The mailbox connects sender and receiver: each Actor has exactly one mailbox, and all messages sent to it are queued there. Mailbox implementations with different policies are available, and the default is FIFO.
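The sketch below, assuming Akka classic, illustrates why actor state needs no locks. CounterActor keeps a plain var. Although 1,000 Increment messages are sent concurrently from several threads, the actor takes them one at a time from its mailbox, so no update is lost. The actor and message names are hypothetical.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

case object Increment
case object GetCount

class CounterActor extends Actor {
  // Plain mutable state: safe because the actor handles one message at a time
  private var count = 0

  def receive: Receive = {
    case Increment => count += 1
    case GetCount  => sender() ! count
  }
}

object CounterDemo extends App {
  implicit val timeout: Timeout = Timeout(3.seconds)
  val system = ActorSystem("CounterDemo")
  val counter = system.actorOf(Props[CounterActor](), "counter")

  // 1,000 increments sent concurrently from 10 tasks on the global thread pool
  val senders = (1 to 10).map(_ => Future((1 to 100).foreach(_ => counter ! Increment)))
  Await.result(Future.sequence(senders), 3.seconds)

  println(Await.result(counter ? GetCount, 3.seconds)) // 1000
  system.terminate()
}
```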

Writing the logic

In an Actor class, the main logic lives in the receive method. It is a partial function that pattern-matches each incoming message and runs the corresponding case:

        import akka.actor.{Actor, ActorLogging}

        class RsActor extends Actor with ActorLogging {
          override def receive: Receive = {
            case MapMessage(parameters) =>
              println(parameters.get("code"))

            case MapKeyMessage(parameters, key) =>
              println(parameters.get(key))

            case StringMessage(msg) =>
              println(msg.getBytes().length)

            case other =>
              // Fallback for any other message type
              println(other.getClass)
          }
        }

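Obtaining actor references

There are two main ways to obtain a reference to an Actor that can receive messages. You can create a new actor with actorOf, or look up an existing (possibly remote) one by its path with actorSelection:

        import akka.actor.Props

        // Create a new actor in the local ActorSystem and get its reference
        val rsActor = system.actorOf(Props[RsActor](), "rsActor")
        // Look up an actor on a remote ActorSystem by path
        // (replace <remote-host> with the remote node's address)
        val rmActor = system.actorSelection("akka.tcp://RsModelActorSystem@<remote-host>:2445/user/rsActor")

        // Use ! to send a message to the corresponding Actor instance
        rsActor ! StringMessage("test")
        rmActor ! MapMessage(Map("code" -> "233"))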
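actorOf returns an ActorRef directly, whereas actorSelection only describes a path. If you need an ActorRef for a selection, for example to watch it or to check that it exists, you can resolve it with resolveOne. The returned Future fails with ActorNotFound if nothing lives at that path. The sketch assumes Akka classic and uses a local path so that it runs in a single process; the same call applies to a remote path once remoting is configured. EchoActor is a hypothetical stand-in for RsActor.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for RsActor: replies with whatever it receives
class EchoActor extends Actor {
  def receive: Receive = { case msg => sender() ! msg }
}

object LookupDemo extends App {
  val system = ActorSystem("LookupDemo")

  // actorOf creates the actor and returns its ActorRef immediately
  val created: ActorRef = system.actorOf(Props[EchoActor](), "rsActor")

  // actorSelection only describes a path; resolveOne asks whatever lives there to identify itself
  val selection = system.actorSelection("/user/rsActor")
  val resolved: ActorRef = Await.result(selection.resolveOne(3.seconds), 3.seconds)

  println(resolved == created) // true
  system.terminate()
}
```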

Message

Akka places few restrictions on message content. A message can be a basic data type or an object, and messages sent to another node must be serializable:

        // Scala case classes make it easy to define message classes succinctly.
        // Case classes are already Serializable, so the explicit extends is redundant but harmless.
        case class StringMessage(msg: String) extends Serializable
        case class MapMessage(parameters: Map[String, String]) extends Serializable
        case class MapKeyMessage(parameters: Map[String, String], key: String) extends Serializable
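As an optional refinement, not something the original project is described as doing, the message classes can share a sealed trait. The compiler can then warn when a pattern match over the messages misses a case. Inside receive, which matches on Any, this check only applies to helper functions that take the sealed type. Case classes also compare by value, which makes messages easy to assert on in tests. A minimal sketch in plain Scala:

```scala
// Hypothetical sealed variant of the message classes above
sealed trait RsMessage extends Serializable
final case class StringMessage(msg: String) extends RsMessage
final case class MapMessage(parameters: Map[String, String]) extends RsMessage
final case class MapKeyMessage(parameters: Map[String, String], key: String) extends RsMessage

object RsMessageDemo {
  // The compiler warns if a case for an RsMessage subtype is missing here
  def describe(m: RsMessage): String = m match {
    case StringMessage(msg)             => s"string of ${msg.getBytes("UTF-8").length} bytes"
    case MapMessage(parameters)         => s"code=${parameters.getOrElse("code", "?")}"
    case MapKeyMessage(parameters, key) => s"$key=${parameters.getOrElse(key, "?")}"
  }

  def main(args: Array[String]): Unit = {
    println(describe(StringMessage("test")))            // string of 4 bytes
    println(describe(MapMessage(Map("code" -> "233")))) // code=233
    // Case classes compare by value
    println(MapMessage(Map("code" -> "233")) == MapMessage(Map("code" -> "233"))) // true
  }
}
```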

Summary

Akka, a widely used open-source toolkit, showed many advantages in this project. Its asynchronous, message-driven approach also gave us a new way of thinking about and implementing such systems.



Li Tianye is a data scientist at TalkingData. A graduate of Northeastern University, he works on TalkingData's data science team on data science automation.

This article was adapted from: InfoQ