The Actor model

In object-oriented programming, an object can access or modify the value of another object, in the case of high concurrency, as a result of the machine performance bottleneck, when there are multiple objects of the same competition resources for operation, data error problems may arise (namely actually read data is not expected, but a phase to the stage in front of the unmodified complete data). The Actor model is modified so that instead of manipulating objects directly, it interacts with the outside world through messaging. As shown in the figure:

Actor only receives and processes one message at a time, and unprocessed messages are queued for processing.

Actors have several important concepts:

  • Actor: A working node that processes messages and modifies internal state.
  • Message: Data used to communicate between multiple actors.
  • Messaging: A development pattern that triggers behavior by passing messages.
  • Mailbox address: The target address for message delivery, from which messages are fetched when the Actor is idle.
  • Mailbox: A queue that stores multiple unprocessed messages.
  • Actor system: A system consisting of Actor collections, mailbox addresses, mailboxes, and configurations.

In an application, all actors form the ActorSystem (Actor system), which is a hierarchical structure. Except the top actors, all actors have a parent Actor. When the child Actor has an exception when processing messages, The parent Actor can handle child actors in pre-specified ways, including resuming child actors, restarting child actors, stopping child actors, and scaling failures. When ActorSystem is created, three actors are started by default.

All actors have their own life cycles, and Akka provides corresponding functions to respond to different life cycles. A common operation is to build an Actor to handle messages passed by other actors when they die, also known as Death Wath.

Because actors communicate via messages, it doesn’t care whether other actors are local or remote; actors just manipulate its references.

After a rough understanding of actors, we will start to learn Akka.

My environment is:

Operating system: Windows10

JDK version: JDK11

Akka depends on:

<dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>Akka - actor_2. 13</artifactId>
            <version>server</version>
        </dependency>
Copy the code

Hello Akka

Akka is a message-driven application development framework for high concurrency, distributed and elastic scaling scenarios. Based on the Actor model, it provides developers with the development idea of message control state. As mentioned earlier, actors manipulate model references to control internal state changes. Akka’s implementation of this is the ActorRef object. Akka obtains the Actor system through the actorSystem.create () method, and then calls the API to get references to the specified Actor. Communicate with other actors by sending messages by reference.

First, let’s try Akka’s Version of Hello World:

public class TestAkka{
    static ExecutorService threadPool = Executors.newFixedThreadPool(10000);

    public static void test(a){
        Demo demo = new Demo();
        for (int i = 0; i < 100000; i++) {
            threadPool.execute(newTestAkkaThread(demo)); }}public static void testAkka(a){
        // Get the Actor system
        ActorSystem sys = ActorSystem.create();
        // Get the specified Actor
        ActorRef ref = sys.actorOf(Props.create(AkkaDemo.class), "startActor");
        for (int i = 0; i < 100000; i++) {
            threadPool.execute(newTestAkkaThread(ref)); }}public static void main(String[] args) {
        // test();testAkka(); }}class AkkaDemo extends UntypedAbstractActor {

    private static int cnt = 1;

    public void onReceive(Object message){
        This method is called automatically when the Actor receives a message
        System.out.println(String.format("Message received %d time", cnt++)); }}class Demo {
    private static int cnt = 1;

    public void tell(a){
        System.out.println(String.format("Message received %d time", cnt++)); }}class TestAkkaThread implements Runnable{

    private Object ref;

    public TestAkkaThread(Object ref) {
        this.ref = ref;
    }

    @Override
    public void run(a) {
        if (ref instanceof ActorRef)
            // Pass the message to the corresponding Actor via ActorRef
            ((ActorRef)ref).tell("", ActorRef.noSender());
        else((Demo)ref).tell(); }}Copy the code

GetSelf () : Gets a reference to the current Actor

GetSender () : Returns A reference to the sender of the message received by the current Actor, such that if Actor A sends A message to Actor B, it will return A reference to Actor A when B calls getSender(). This can be simply interpreted as: a reference to the target that is returned to send the response message.

Akka provides two mechanisms for sending messages, namely tell and Ask. The main differences between them are as follows:

  1. Tell indicates synchronous transmission, and ask indicates asynchronous transmission.
  2. Tell has no return value, and ask can get the result after sending.

The applications of ask are as follows:

public class StartAkka extends UntypedAbstractActor {

    @Override
    public void onReceive(Object message){
        System.out.println("Receive message :" + message);
        getSender().tell("Return message", getSelf());
    }

    public static void main(String[] args) {
        ActorSystem sys = ActorSystem.create();
        ActorRef ref = sys.actorOf(Props.create(StartAkka.class), "startAkka");
        ref.tell("Hello Akka!", ActorRef.noSender());
        Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
        Future<Object> akka_ask = Patterns.ask(ref, "Akka Ask", timeout);
        System.out.println("ask...");
        akka_ask.onComplete(new Function1<Try<Object>, Object>() {
            @Override
            public Object apply(Try<Object> v1) {
                // Get the processing logic for successful reply
                if (v1.isSuccess()) System.out.println("Sent successfully, message received :" + v1.get());
                // Get the processing logic for the reply failure
                if (v1.isFailure()) System.out.println("Failed to send:" + v1.get());
                return null;
            }
        }, sys.dispatcher());
        System.out.println("continue..."); }}Copy the code

Patterns. Ask method will be executed asynchronously, if the Actor return messages timeout, will produce a akka. Pattern. AskTimeoutException

Sys.dispatcher () : Returns the message dispatcher for the current Akka, which will be covered later.

The Actor to find

In the Actor model, we know that an Actor system is actually a tree, and each Actor is a node. For existing actors, we can find them by path (current path/absolute path) :

public class SearchAkka extends UntypedAbstractActor {

    private ActorRef target = getContext().actorOf(Props.create(Target.class), "targetActor");

    @Override
    public void onReceive(Object message) throws Throwable, Throwable {
        if (message instanceof String) {
            if ("find".equals(message)){
                /* After receiving a "find" message, LookupActor will use ActorContext to find the ActorSelection object. When ActorSelection sends Identify, a messageId needs to be specified. After the message is sent, the current Actor receives an ActorIdentity. The specified ActorRef */ can be obtained using the actoridentity.getref () method
                ActorSelection targetActor = getContext().actorSelection("targetActor");

                // Find Actor asynchronously
                Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
                Future<Object> find = Patterns.ask(targetActor, "find", timeout);
                find.onComplete(new Function1<Try<Object>, Object>() {
                    @Override
                    public Object apply(Try<Object> v1) {
                        if (v1.isSuccess()) targetActor.tell(new Identify("A001"), getSelf());
                        if (v1.isFailure())  System.out.println("Search failed");
                        return null; } }, getContext().dispatcher()); }}else if (message instanceof ActorIdentity){
            ActorIdentity actorIdentity = (ActorIdentity) message;
            if (actorIdentity.correlationId().equals("A001")) {
                Optional<ActorRef> ref = actorIdentity.getActorRef();
                if(! ref.isEmpty()){ System.out.println("ActorIdentity is:" + actorIdentity.correlationId() + "" + ref);
                    ref.get().tell("hello target", getSelf()); }}}else unhandled(message);
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("sys");
        ActorRef actorRef = system.actorOf(Props.create(SearchAkka.class), "askActorDemo");
        Timeout timeout = new Timeout(10, TimeUnit.MINUTES);
        Future<Object> akka_ask = Patterns.ask(actorRef, "find", timeout);
        akka_ask.onComplete(new Function1<Try<Object>, Object>() {
            @Override
            public Object apply(Try<Object> v1) {
                if (v1.isSuccess()){
                    System.out.println("Received a message :" + v1.get());
                } else if (v1.isFailure()){
                    System.out.println("Lake failed to get message");
                }
                return null; } }, system.dispatcher()); }}/** * the object being searched */
class Target extends UntypedAbstractActor{
    @Override
    public void onReceive(Object message) throws Throwable, Throwable {
        System.out.println("target actor reveive: "+ message); }}Copy the code

Actor life cycle

Actors go through different stages at runtime, including create and start, resume, restart, and stop. Akka provides corresponding response apis for different Actor states:

PreStart () : before startup.

AroundPreStart () : The preStart() method can be overridden. By default, preStart() is reduced.

PreRestart () : preRestart before restart. (Will be abandoned)

AroundPreResrat () : You can override preRestart(). By default, preRestart() is used.

PostRestart () : after the restart.

AroundPostRestart () : This can override the postRestart() method. By default, postRestart() is reduced.

AroundPostStop () : This can override the postStop() method. By default, postStop() is used.

PostStop () : after stopping.

There are three ways to stop actors:

  1. Call ActorSystem or getContext() stop:

    sys.stop(ref);

  2. Send the Actor a PoisonPill message:

    ref.tell(PoisonPill.getInstance(), ActorRef.noSender());

  3. Send the Actor a Kill message that throws ActorKilledException:

    ref.tell(Kill.getInstance(), ActorRef.noSender());

When the Actor stops, it performs the following flow:

  1. Suspends the message queue by finishing processing the message in progress before completely stopping, without processing subsequent messages.
  2. Send termination instructions to all child actors, and when all child actors have stopped, then stop the child. When stopped, the postStop() method is called.
  3. Send a Terminated message to the life cycle monitor.

Actor behavior switch

When we deal with business, we may need to apply different processing logic for different messages. We can encapsulate the processing of multiple states into corresponding components and then assemble them. In Akka, the Producer implementation is provided, which has two methods: become (switch to an action) and unbecome (switch to the previous action). Examples are as follows:

public class StateAkka extends UntypedAbstractActor {

    private PartialFunction<Object, BoxedUnit> procedure1 = new PartialFunction<>() {
        @Override
        public BoxedUnit apply(Object param) {
            System.out.println(param);
            if ("break".equals(param)) getContext().unbecome();
            else System.out.println("state1:" + param);
            return null;
        }
        @Override
        public boolean isDefinedAt(Object x) {
            return true; }};private PartialFunction<Object, BoxedUnit> procedure2 = new PartialFunction<>() {
        @Override
        public BoxedUnit apply(Object param) {
            System.out.println(param);
            if ("break".equals(param)) getContext().unbecome();
            else System.out.println("state2:" + param);
            return null;
        }

        @Override
        public boolean isDefinedAt(Object x) {
            return true; }};@Override
    public void onReceive(Object message) {
        /* When the Procedue unbecome method is executed, the calculation process re-enters onReceive. After a call to become, the code logic for the new Producre is saved into an implementation stack, which can be used to return to the last display by calling UNbecome. You can also pass the second parameter false in the become method to indicate that the current behavior is not stored. * /
        System.out.println("Start execution mode :" + message);
        if ("1".equals(message)) getContext().become(procedure1);
        if ("2".equals(message)) getContext().become(procedure2);
    }

    public static void main(String[] args) {
        ActorSystem sys = ActorSystem.create("sys");
        ActorRef ref = sys.actorOf(Props.create(StateAkka.class), "statActor");
        ref.tell("1", ActorRef.noSender());
        ref.tell("nihao", ActorRef.noSender());
        ref.tell("nihao", ActorRef.noSender());
        ref.tell("break", ActorRef.noSender());
        ref.tell("2", ActorRef.noSender());
        ref.tell("nihao", ActorRef.noSender());
        ref.tell("nihao", ActorRef.noSender());
        ref.tell("nihao", ActorRef.noSender());
        ref.tell("nihao", ActorRef.noSender()); }}Copy the code

Actor fault tolerant processing

The Actor system adopts the mode of “parent supervision” to manage, that is, the parent Actor will monitor the abnormal situation of the child Actor, and then according to the default or default processing logic to determine whether to restore the Actor, stop Actor, restart Actor or submit the error to the parent.

Akka offers two monitoring strategies:

  • One-For-One Strategy(Default oversight policy) : When an exception occurs in a child Actor, only that Actor is treated.
  • All-For-One StratwWhen an exception occurs in one child Actor, all actors are treated.

When no explicit policy is specified in the program, a default policy is initiated that follows the following rules:

  1. When thrownActorlnitializationExceptionandActorKilledException, terminates the child Actor.
  2. When thrownException, the subactor is restarted.
  3. Throws of another typeThrowableExceptions are traced back to the parent.

Custom fault tolerance policies:

public class StrategyAkka extends UntypedAbstractActor {

    // Define the monitoring policy
    private SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.apply("1 minute"),
            new Function<Throwable, SupervisorStrategy.Directive>() {
                @Override
                public SupervisorStrategy.Directive apply(Throwable err) throws Exception {
                    if (err instanceof IOException) {
                        System.out.println("-----------IOException-----------");
                        return SupervisorStrategy.resume(); // Resume running
                    } else if (err instanceof IndexOutOfBoundsException) {
                        System.out.println("-----------IndexOutOfBoundsException-----------");
                        return SupervisorStrategy.restart(); / / restart
                    } else if (err instanceof SQLException) {
                        System.out.println("-----------SQLException-----------");
                        return SupervisorStrategy.stop(); / / stop
                    } else {
                        System.out.println("-----------UnkownException-----------");
                        return SupervisorStrategy.escalate(); // Upgrade failed}}});@Override
    public SupervisorStrategy supervisorStrategy(a) {
        return strategy;
    }

    @Override
    public void preStart(a) throws Exception {
        ActorRef ref = getContext().actorOf(Props.create(WorkActor.class), "workActor");

        // Monitor the lifecycle
        getContext().watch(ref);

        ref.tell("Hello", ActorRef.noSender());
        ref.tell(new IOException(), ActorRef.noSender());
        ref.tell(new IndexOutOfBoundsException(), ActorRef.noSender());

        Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
        Future<Object> akka_ask = Patterns.ask(ref, "getValue", timeout);
        System.out.println("ask...");
        akka_ask.onComplete(new Function1<Try<Object>, Object>() {
            @Override
            public Object apply(Try<Object> v1) {
                if (v1.isSuccess()) System.out.println("Sent successfully, message received :" + v1.get());
                if (v1.isFailure()) System.out.println("Failed to send:" + v1.get());
                return null;
            }
        }, getContext().dispatcher());

        System.out.println("continue...");
        super.preStart();
    }

    @Override
    public void onReceive(Object message) throws Throwable, Throwable {
        if (message instanceof Terminated)
            System.out.println(((Terminated)message).getActor() + "It has stopped.");
        else System.out.println("stateCount:" + message);
    }

    public static void main(String[] args) {
        ActorSystem sys = ActorSystem.create("sys");
        ActorRef ref = sys.actorOf(Props.create(StrategyAkka.class), "strategyActor"); }}class WorkActor extends UntypedAbstractActor {

    private int state = 1; // State parameters

    @Override
    public void preStart(a) throws Exception, Exception {
        System.out.println("start, state is:" + state++);
        super.preStart();
    }

    @Override
    public void postStop(a) throws Exception {
        System.out.println("stop");
        super.postStop();
    }

    @Override
    public void postRestart(Throwable reason) throws Exception {
        System.out.println("postRestart");
        super.postRestart(reason);
    }

    @Override
    public void onReceive(Object message) throws Exception {
        // Simulate the computation task
        this.state++;
        System.out.println("message:" + message);
        if (message instanceof Exception) throw (Exception) message;
        else if ("getValue".equals(message)) getSender().tell(state, getSelf());
        elseunhandled(message); }}Copy the code

The OneForOneStrategy object requires three parameters:

  1. maxNrOfRetries: Indicates the maximum number of restarts within a specified period.
  2. withinTimeRange: Specifies the time size.
  3. decider: receives a Function object, passedapplyMethod to return the supervisor instruction:
    • SupervisorStrategy. Resume () :
    • SupervisorStrategy. Restart () : restart
    • SupervisorStrategy. Stop () : stop
    • SupervisorStrategy. Escalate () : failed to upgrade

Actor Circuit breaker mechanism

In a distributed environment, the system may fail to cascade due to network or other problems. To prevent massive resource consumption caused by system retry, Actor provides a circuit breaker mechanism. That is, if a specified number of attempts fail, an error message is displayed. The circuit breaker has the following three states:

  • Closed: Normally, the circuit breaker is Closed. When the call exceeds the configured wait time, a failure count is added and a success count is reset. When the number of failures reaches the specified number, the state enters Open.
  • Open: the caller throws CircuitBreakerOpenException errors and (resetTimeout) after the time specified, into the Half – Open state.
  • Half-open: After entering the half-open state, the first call is attempted. If the first call is successful, the state is returned to Close; otherwise, the state is Open and waits for the next restart time.
public class CricuitBreakAkka extends UntypedAbstractActor {

    private ActorRef workChild;

    private static SupervisorStrategy strategy = new OneForOneStrategy(20, Duration.ofMinutes(1),
            new Function<Throwable, SupervisorStrategy.Directive>() {
                @Override
                public SupervisorStrategy.Directive apply(Throwable param) throws Exception {
                    // Resume running directly
                    returnSupervisorStrategy.resume(); }});@Override
    public SupervisorStrategy supervisorStrategy(a) {
        return strategy;
    }

    @Override
    public void preStart(a) throws Exception {
        super.preStart();
        workChild = getContext().actorOf(Props.create(CricuitWorkActor.class), "workActor");
    }


    @Override
    public void onReceive(Object message) throws Throwable {
        workChild.tell(message, getSender());
    }

    public static void main(String[] args) {
        ActorSystem sys = ActorSystem.create();
        ActorRef ref = sys.actorOf(Props.create(CricuitBreakAkka.class), "cricuitBreakActor");
        for (int i = 0; i < 15; i++) {
            ref.tell("block Hello "+ i, ActorRef.noSender()); }}}class CricuitWorkActor extends UntypedAbstractActor{

    private CircuitBreaker breaker;

    @Override
    public void preStart(a) throws Exception {
        super.preStart();
        /** * Create a CircuitBreaker object at startup that blocks for 3s after sending a string beginning with "block" to CricuitWorkActor to trigger a timeout and count it once. * When the count reaches 3, the CircuitBreaker will be Open, triggering the onOpen function. After 5 seconds, the half-open state will be entered, at which point the onHalfOpen function will be called. If the message is successfully processed, the CircuitBreaker goes to Closed and onClose is called, otherwise it goes to Open again. * /
        this.breaker = new CircuitBreaker(getContext().dispatcher(), getContext().system().scheduler(), 3,
                Duration.ofSeconds(2), Duration.ofSeconds(5)).onOpen(new Function0<BoxedUnit>() {
            @Override
            public BoxedUnit apply(a) {
                System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date()) + "--> Acator CircuitBreak");
                return null;
            }
        }).onHalfOpen(new Function0<BoxedUnit>() {
            @Override
            public BoxedUnit apply(a) {
                System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date()) + "--> Acator CircuitBreak);
                return null;
            }
        }).onClose(new Function0<BoxedUnit>() {
            @Override
            public BoxedUnit apply(a) {
                System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date()) + "--> CircuitBreak");
                return null; }}); }@Override
    public void onReceive(Object message) throws Throwable {
        if (message instanceof String) {
            String msg = (String) message;
            if (msg.startsWith("block")) {
                getSender().tell(breaker.callWithCircuitBreaker(new Callable() {
                    @Override
                    public String call(a) throws Exception {
                        System.out.println("msg is: " + msg);
                        Thread.sleep(3000);
                        return null; } }), getSelf()); }}}}Copy the code

The custom

Customize the Dispatcher

In Akka, message communication and task execution by actors are based on a completely transparent scheduling mechanism that shields the use of the underlying thread pool and exposes a message distributor, the Dispatcher, as thread scheduling.

Associated with the Dispatcher are executors, which provide the Dispatcher with policies for executing asynchronous tasks. There are two types of executors:

  • Thread-pool-executor: A pool of threads based on work queues.
  • Fork-join-executor: The Java-like idea of fork/join, based on work-stealing thread pools, is the default option for Akka.

Custom Dispatcher:

Create the application.conf file in the classpath and write:

Mforkjoin-dispatcher {# dispatcher type = dispatcher # dispatcher type executor = "fork-join-executor" Fork-join-executor {# ForkJoin thread pool parallelism-min = 3 # Parallelism-factor = 3.0 # parallelism-max = 16 # ForkJoin thread pool Parallelism-min = 3 # Parallelism-factor = 3.0 # parallelism-max = 16 } throughput = 1executor My-pinned -dispatcher{executor = "thread-pool-executor" type = PinnedDispatcher # PinnedDispatcher is another Dispatcher that provides a thread pool of only one thread per Actor}Copy the code

Parallelism -factor Affects the maximum number of threads available to the thread pool as a concurrency factor: Maximum number of threads = number of processors * concurrency factor.

The type, including:

  • Dispatcher: Event-based scheduler that binds a set of actors to a thread pool.

  • PinnedDispatcher: Provides a thread pool of only one thread per Actor.

  • CallingThreadDispatcher: No thread of execution is created. The current thread makes the Actor call.

Use your own Dispatcher in your code:

// Specify the policy when creating the Actor reference
ActorRef ref = sys.actorOf(Props.create(CustomActorDemo.class).
                withDispatcher("my-forkjoin-dispatcher"), "customActor");
Copy the code

Customize MailBox

We know that when we send a message to an Actor, we don’t send it directly, we send the message to the “mailbox,” which schedules when to send it. An Actor mailbox is essentially a message queue and follows a first-in, first-out rule by default. Of course, we customize it based on some scenarios.

Mailboxes are divided into Unbounded and Unbounded mailboxes. By default, actors use UnboundedMailbox.

UnboundedMailbox is a queue structure based on a linked list. Elements enter the queue from the end of the queue and exit the queue from the first queue. Meanwhile, it uses CAS to ensure security in multi-threading and high performance.

Akka includes a number of custom mailboxes, including:

email instructions Whether blocking If there is a world
UnboundedMailbox Based on ConcurrentLinkedQueue implementation, first in, first out. N N
SingleConsumerOnlyUnboundedMailbox Multi-producer — single consumer pattern (slower than ConcurrentLinkedQueue). Y N
NonBlockingBoundedMailbox The multi-producer – single consumer model, which directly eliminates redundant messages, is not requiredmailbox-push-timeout-time. N Y
UnboundedControlAwareMailbox Priority send implementationControlMessageControl message. N N
UnboundedPriorityMailbox Allows the content to be prioritized. Extend this class and provide a comparator in the constructor. N N
UnboundedStablePriorityMailbox Similar to UnboundedPriorityMailbox, but it preserves the order of messages with equal priority. N N
BoundedMailbox The default bounded mailbox type used by the participant. Z N
BoundedPriorityMailbox A bounded mailbox that allows its contents to be prioritized. Z Y
BoundedStablePriorityMailbox A bounded mailbox that allows its contents to be prioritized, preserving the order of messages with equal priority. Z Y
BoundedControlAwareMailbox Priority send implementationControlMessageThere are boundaries and idols that control the message. Z Y

Z indicates: If mailbox-push-timeout-time is not 0, it may block, otherwise it will not.

First, we will create a custom mailbox class:

class CustomEmail extends UnboundedStablePriorityMailbox {

    public CustomEmail(ActorSystem.Settings settings, Config config){
        /* The smaller the return value, the higher the priority */
        super(new PriorityGenerator() {
            @Override
            public int gen(Object message) {
                if (message instanceof String) {
                    String msg = (String) message;
                    if (msg.startsWith("Zhang")) return 0;
                    if (msg.startsWith("Li")) return 1;
                    if (msg.startsWith("The king")) return 2;
                 }
                return 3; }}); }}Copy the code

Then, modify the contents of the application.conf file:

My - mailbox {mailbox - type = "cn. Bigkai. Akka. CustomEmail" # binding email mailbox capacity = 1000 # email mailbox capacity - push - timeout - time = 10s # queue timeout (for bounded mailboxes)}Copy the code

To test, associate custom mailboxes:

ActorRef ref = sys.actorOf(Props.create(CustomEmailActorDemo.class).withMailbox("my-mailbox"));
Copy the code

You can also configure the mailbox in the dispatcher configuration file and associate the dispatcher directly with the code:

My - forkjoin - the dispatcher {mailbox - type = "cn. Bigkai. Akka. CustomEmail" # binding email}Copy the code

Or simply inherit the corresponding mailbox interface from your Actor:

ControlMessage messages sent to this Actor will be processed first
public class CustomMailBoxAkka extends UntypedAbstractActor implements RequiresMessageQueue<UnboundedControlAwareMailbox> {
    @Override
    public void onReceive(Object message) throws Throwable { System.out.println(message); }}Copy the code

The mailbox queue interface corresponds to the type:

interface type
UnboundedMessageQueueSemantics UnboundedMailbox
BoundedMessageQueueSemantics BoundedMailbox
DequeBasedMessageQueueSemantics UnboundedDequeBasedMailbox
UnboundedDequeBasedMessageQueueSemantics UnboundedDequeBasedMailbox
BoundedDequeBasedMessageQueueSemantics BoundedDequeBasedMailbox
MultipleConsumerSemantics UnboundedMailbox
AwareMessageQueueSemantics UnboundedControlAwareMailbox
UnboundedControlAwareMessageQueueSemantics UnboundedControlAwareMailbox
BoundedControlAwareMessageQueueSemantics BoundedControlAwareMailbox
LoggerMessageQueueSemantics LoggerMailboxType

Above we customize mailboxes by inheriting the interface of the corresponding mailbox type, but what if we want to modify the implementation of the queue ourselves or do more fine-grained operations? This is where you need to implement their parent interface: MessageQueue:

class BusinessMsgQueue implements MessageQueue{

    private Queue<Envelope> queue = new ConcurrentLinkedDeque<>();

    @Override
    public void enqueue(ActorRef receiver, Envelope handle) {
        queue.offer(handle);
    }

    @Override
    public Envelope dequeue(a) {
        return queue.poll();
    }

    @Override
    public int numberOfMessages(a) {
        return queue.size();
    }

    @Override
    public boolean hasMessages(a) {
        return! queue.isEmpty(); }@Override
    public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
        for(Envelope ev : queue) deadLetters.enqueue(owner, ev); }}class BusinessMailBoxType implements MailboxType.ProducesMessageQueue<BusinessMsgQueue>{

    // You must build a configuration parameter with Settings and Config
    public BusinessMailBoxType(ActorSystem.Settings settings, Config config) {}// Specify a custom queue
    @Override
    public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
        return newBusinessMsgQueue(); }}Copy the code

Actor message routing

Akka not only provides TELL, Ask and other APIS for simple message delivery, but also realizes corresponding methods for polling, broadcast and other complex message delivery logic through routing. Let’s try Akka’s routing mechanism.

Before we start coding, let’s understand two concepts: Router and Route:

  • Router: indicates a Router that forwards messages.
  • Routee: indicates the routing target. The message forwarded by the router is forwarded to the corresponding routing target.

Here is a simple test code:

public class RouteAkka extends UntypedAbstractActor{

    private Router router;

    @Override
    public void preStart(a) throws Exception {
        super.preStart();
        ArrayList<Routee> list = new ArrayList<>();
        for (int i = 0; i < 2; i++)
            list.add(new ActorRefRoutee(getContext().actorOf(Props.create(RouteeActor.class), "routeeActor" + i)));
        router = new Router(new RoundRobinRoutingLogic(), list); // Poll delivery
    }

    @Override
    public void onReceive(Object message) throws Throwable {
        router.route(message, getSender());
    }

    public static void main(String[] args) {
        ActorSystem sys = ActorSystem.create();
        ActorRef ref = sys.actorOf(Props.create(RouteAkka.class), "routeActor");
        for (int i = 0; i < 10; i++) {
            ref.tell("Hello "+ i, ActorRef.noSender()); }}}class RouteeActor extends UntypedAbstractActor {

    @Override
    public void onReceive(Object message) throws Throwable {
        System.out.println(getSelf() + "-- >"+ message); }}Copy the code

RoundRobinRoutingLogic inherits RoutingLogic and rewrites its select method to get the subscript of the current routee by incrementing and complementing an AtomicLong atomic class.

private final AtomicLong next = new AtomicLong();

public Routee select(final Object message, final IndexedSeq<Routee> routees) {
    Object var10000;
    if (routees.nonEmpty()) {
        int size = routees.size();
        int index = (int) (this.next().getAndIncrement() % (long)size);
        var10000 = (Routee)routees.apply(index < 0 ? size + index : index);
    } else {
        var10000 = .MODULE$;
    }

    return (Routee)var10000;
}
Copy the code

In the Route method of a Router, send is used to send messages.

public void route(final Object message, final ActorRef sender) {
    BoxedUnit var3;
    if (message instanceof Broadcast) {
        Broadcast var5 = (Broadcast)message;
        Object msg = var5.message();
        // Send a message
        (new SeveralRoutees(this.routees())).send(msg, sender);
        var3 = BoxedUnit.UNIT;
    } else {
        this.send(this.logic().select(message, this.routees()), message, sender); var3 = BoxedUnit.UNIT; }}Copy the code

Built-in routing types:

routing instructions Support the pool Support group
RoundRobinRouting Polling sends messages. Y Y
RandomRouting Send messages randomly. Y Y
BalancingRouting Dynamically allocate Routee’s tasks to achieve a balance of execution time. Y N
SmallestMailbox Try sending a message to a non-pending Routee with fewer messages. Y N
Broadcast Will be broadcast to all Routees. Y Y
ScatterGatherFirstComp leted Send the message to all routees and wait for the quickest reply (throw an exception if there is no reply within the specified time). Y Y
TailChopping A message is first sent to a randomly selected Routee and then sent to a second randomly selected Routee for some time until a reply is received. Y Y
ConsistentHashing Use a consistent Hash algorithm to select Routee. Y Y

A routing Actor has two modes:

  • Pool: Router Actor creates sub-actors to act as its Routee, monitors and monitors it, and removes it when a Routee terminates, as shown in the example above.

  • Group: The mode of production for Routee can be placed externally (that is, set in a CONF file), and then the router Actor sends messages to these targets through a path.

    my-dispatcher{
        executor = "thread-pool-executor"
        type  = PinnedDispatcher
        router = boradcast-group
        routees.path = ["/default/user/routeActor/routeeActor1", "/default/user/routeActor/routeeActor2"]
    }
    Copy the code