Future is a convenient tool for asynchronous programming, and Akka initially provided its own implementation. Twitter's Finagle and Scalaz also shipped their own Future implementations, proving the idea's feasibility. After this validation phase, Scala redesigned the scala.concurrent package in version 2.10, and Future is now part of the standard library.

For the basic use of Future, see Scala + Future for Asynchronous Programming – Nuggets (juejin.cn). The focus of this chapter is how to combine Futures so that they express exactly the parallel computation you intend. Chapter 7 of Functional Programming in Scala discusses functional-style parallel computation further.

Before you get started, familiarize yourself with two pieces of Scala syntactic sugar: when a function or method call takes only one argument, you can write the curly-braces form f {g} instead of f(g), and Scala allows a space in place of the dot operator. For example:

// This is suitable for DSL style.
List {1} map {_ + 1} foreach {print}
// Equivalent to:
List(1).map(_ + 1).foreach(print)

A lot of code in the form Future {codeBlock} follows; these are calls to the Future.apply factory method. The code block passed to Future is a by-name argument, and it is evaluated on another thread.

Building a task with Future requires an implicit ExecutionContext. It is an abstraction for scheduling tasks on top of an existing thread pool, or scheduler for short. By default Scala provides a global scheduler backed by a work-stealing fork-join thread pool:

import scala.concurrent.ExecutionContext.Implicits.global
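If the default global scheduler does not fit your workload, you can supply your own ExecutionContext. Below is a minimal sketch, assuming a fixed pool of eight threads is acceptable; the name customEc and the pool size are illustrative only:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// A custom scheduler backed by a fixed thread pool (size chosen arbitrarily for illustration).
implicit val customEc: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))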

Composition mechanics (important)

flatMap takes a function S => Future[U], which means we can nest as many Futures as we want with this combinator, finishing with a map at the deepest point of the nesting. See the code comments for details.

val eventualInts: Future[List[Int]] = Future { List(1, 2, 3, 4, 5) }

// Notice that we are actually creating serial code.
val convertedEventualInts: Future[List[Int]] = eventualInts flatMap {
    // The Future body refers to the free variable `list`,
    // which comes from the closure over eventualInts,
    // so this Future has a data dependency on the outer eventualInts.
    list => Future { list map {_ + 1} } flatMap { list =>
        // In the same way, the Future created here depends on the data of the Future one level up.
        Future { list map {_ * 2} } map { l => l }
    }
}

Too much nesting of flatMap and map makes the code hard to read. For Future pipelines consisting purely of flatMap and map, a for-yield expression is much simpler.

val convertedEventualInts2: Future[List[Int]] = for {
    list          <- eventualInts
    convertedList <- Future {list.map(_ + 1)}   	 // f1
    finalList     <- Future {convertedList.map(_ * 2)}   // f2
} yield finalList

The change in style does not remove the data dependency between the two Futures f1 and f2. Here f2 still has to wait for the result produced by f1, so this code executes serially.

In fact, passing a Future {…} constructed inline, or calling a function that returns Future[A] inline, inside a for expression always causes the Futures to be initialized one after another as the for expression is evaluated. In plain English, the Future tasks are arranged sequentially even when there is no data dependency between them.

Here, for example, is a deceptively parallel map-reduce task whose running time is the sum of its subtasks.

val futures: Future[Int] = for {
    l1 <- Future {Thread.sleep(100); 1}
    l2 <- Future {Thread.sleep(200); 2}
    l3 <- Future {Thread.sleep(300); 3}
    l4 <- Future {Thread.sleep(400); 4}
} yield l1 + l2 + l3 + l4
// The program takes at least 1s to run, so this Await throws an exception.
val r2 = Await.result(futures, 0.9.second)

Instead, create Future tasks externally, and then collect the results of those tasks through a for expression.

val f1 = Future  {Thread.sleep(100);1}
val f2 = Future  {Thread.sleep(200);1}
val f3 = Future  {Thread.sleep(300);1}
val f4 = Future  {Thread.sleep(400);1}

val futures : Future[Int] = for {
  l1            <- f1
  l2            <- f2
  l3            <- f3
  l4            <- f4
} yield l1 + l2 + l3 + l4
val r2 = Await.result(futures, 4.seconds)

With only a slight change in how it is written, the program now takes about 0.4 seconds to execute. The for expression is the preferred syntax; the following code is its desugared flatMap/map form:

val f1: Future[Int] = Future {Thread.sleep(100);1}
val f2: Future[Int] = Future {Thread.sleep(200);2}
val f3: Future[Int] = Future {Thread.sleep(300);3}
val f4: Future[Int] = Future {Thread.sleep(400);4}

val re: Future[Int] = f1 flatMap {
    v1 =>
    f2 flatMap {
        v2 =>
        f3 flatMap {
            v3 =>
            f4 map {
                v4 => v1 + v2 + v3 + v4
            }
        }
    }
}

Another way to avoid serializing multiple Futures is to write a function that receives several Futures through its argument list and combines them; the Futures are created, and therefore started, at the call site.

def map3(f1: Future[Int], f2: Future[Int], f3: Future[Int]): Future[Int] = {
  f1.flatMap(v1 => f2.flatMap(v2 => f3.map(v3 => v1 + v2 + v3)))
}

val f = map3(
  Future {Thread.sleep(300); 2},
  Future {Thread.sleep(300); 3},
  Future {Thread.sleep(400); 4})

Await.result(f, 4.seconds)

Here is a further abstraction: use S => Future[S] to express a Future task that is planned but not yet started. The test code below demonstrates three combinators. They look similar, but map2FutureJob0 has different semantics from map2FutureJob1 and map2FutureJob2, while map2FutureJob1 and map2FutureJob2 share the same semantics but execute differently.

type FutureJob[S] = S => Future[S]

val futureJob1: FutureJob[Int] = (i: Int) => Future {Thread.sleep(200); i + 1}
val futureJob2: FutureJob[Int] = (i: Int) => Future {Thread.sleep(150); i + 2}

// The serial task executes futureAction1 and then feeds its result to futureAction2.
// result = 6
val map2FutureJob0 = (futureAction1: FutureJob[Int], futureAction2: FutureJob[Int]) => (zeroValue: Int) => {
    futureAction1(zeroValue).flatMap(futureAction2)
}

// Intended to be parallel, but still executes sequentially.
// result = 9
val map2FutureJob1 = (futureAction1: FutureJob[Int], futureAction2: FutureJob[Int]) => (zeroValue: Int) => {
    futureAction1(zeroValue).flatMap(i => futureAction2(zeroValue).map { j => i + j })
}

// Executes in parallel.
// result = 9
val map2FutureJob2 = (futureAction1: FutureJob[Int], futureAction2: FutureJob[Int]) => (zeroValue: Int) => {
    val f1: Future[Int] = futureAction1(zeroValue)
    val f2: Future[Int] = futureAction2(zeroValue)
    f1.flatMap { i => f2.map(j => i + j) }
}

// ok: parallel
Await.result(map2FutureJob2(futureJob1, futureJob2)(3), 2.seconds)
// bad: sequential
Await.result(map2FutureJob1(futureJob1, futureJob2)(3), 2.seconds)
// bad: sequential
Await.result(map2FutureJob0(futureJob1, futureJob2)(3), 2.seconds)

The performance difference between map2FutureJob1 and map2FutureJob2 comes from the fact that futureActionX is a function that returns a Future[A], not a Future[A] itself. map2FutureJob2 calls both functions up front, so both Futures start immediately; map2FutureJob1 calls the second one inside flatMap, so it only starts after the first completes.

Future.fold

Another parallel way to express map-reduce semantics is the Future.fold method.

val f1 = Future {Thread.sleep(100); 1}
val f2 = Future {Thread.sleep(200); 2}
val f3 = Future {Thread.sleep(300); 3}
val f4 = Future {Thread.sleep(400); 4}

val jobs = List(f1, f2, f3, f4)
val parallelMR = Future.fold(jobs)(0) {_ + _}
val result = Await.result(parallelMR, 4.seconds)
println(result)

Future.fold merges the results pairwise; if there is no data dependency and enough threads are available, the Futures themselves run fully in parallel. The cost can be pictured as a tree:

From the point of view of computation time, no matter in which order the sub-futures are merged, the overall time depends only on the slowest one, so the test code above needs only 0.4s to complete.

Future.fold folds from left to right. If the merge operation (R, T) => R is both commutative and associative, the final result is independent of the order of the Futures; otherwise it is not (a Cartesian product, for example, is order-sensitive).
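For instance, string concatenation is associative but not commutative, so reordering the Futures changes the folded result. A tiny sketch, assuming the imports used above are already in scope:

// Folding in list order concatenates in that order.
val ab = Future.fold(List(Future("a"), Future("b")))("")(_ + _)  // completes with "ab"
val ba = Future.fold(List(Future("b"), Future("a")))("")(_ + _)  // completes with "ba"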

Future.sequence & traverse

Let's take a more complicated case. The tasks we usually deal with are not plain Future[A] values but Future[M[A]] or M[Future[A]], where M[A] is a traversable sequence type. For ease of understanding, List[A] is used as the example below.

Future.traverse first receives a List[A] and then lifts the plain List to a Future[List[A]] via a mapping f: A => Future[A].

val list = List(1, 2, 3, 4, 5)
def lift(a: Int) = Future {Thread.sleep(100); a}

val futureList: Future[List[Int]] = Future.traverse(list)(lift)

Future.sequence takes a List that already contains Futures, such as List[Future[Int]], and flips it into a Future containing a List.

val list = List(1, 2, 3, 4, 5)
def lift(a: Int) = Future {Thread.sleep(100); a}

val listFuture = list map lift
val futureList = Future.sequence(listFuture)

Both traverse and sequence execute the Futures in parallel. They are related like this:


Future.traverse(list)(f) = Future.sequence(list map f)

The following code verifies the equivalence between the two.

def lists: List[Int] = List[Int](1, 2, 3, 4, 5)
def lift[A](a: A): Future[A] = Future {a}  // lift: A => Future[A]

Future.traverse(lists)(lift).andThen {
    case Success(value) => value mustBe lists
}

Future.sequence(lists map lift).andThen {
    case Success(value) => value mustBe lists
}

// Do not compare directly in this way:
// Future.traverse(lists)(lift) mustBe Future.sequence(lists map lift)

Fluency with Future.sequence and Future.traverse will help you build various higher-order combinations of Futures.

Fault tolerance

Future has fault tolerance and can actively recover from non-fatal exceptions. Use the recover method to mount a partial function of type PartialFunction[Throwable, T] that handles the exception, for example by logging the error or sending it to a Kafka queue. After that, recover must return a default value of type T so that the computation can keep running.

type Recovery[T] = PartialFunction[Throwable, T]

def withNone[T]: Recovery[Option[T]] = {case NonFatal(_) => None}

def withEmptySeq[T]: Recovery[List[T]] = {case NonFatal(_) => List()}

// Guide => Throwable => Guide
def withPrevious(previous: Guide): Recovery[Guide] = {case NonFatal(_) => previous}

The non-fatal exceptions matched by NonFatal(e) do not include: VirtualMachineError (typically OutOfMemoryError and StackOverflowError), ThreadDeath, LinkageError, InterruptedException, and ControlThrowable.
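As a quick usage sketch (the failing Future below is made up purely for illustration), mounting such a partial function on a Future looks like this:

import scala.util.control.NonFatal

val risky: Future[Option[Int]] = Future {throw new RuntimeException("boom")}
// recover swallows the non-fatal exception and keeps the pipeline alive with a default value.
val safe: Future[Option[Int]] = risky.recover {case NonFatal(_) => None}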

DEMO: User travel entertainment APP

The prototype for this example comes from Chapter 5 of Akka in Action, but the discussion here focuses on Future usage rather than the business itself. I have simplified the original example considerably and removed some unused fields. We want to implement an APP that provides users with a travel and entertainment Guide. The core logic of the system is simple:

  1. The user sends a request to the system carrying their own usrName field.
  2. After receiving the request, the system asynchronously invokes the weather service, travel service, and entertainment service, assembles the results into a Guide case class, and returns it.

The Guide case class is structured as follows:

case class Guide(usrName: String,
                 weather: Option[Weather] = None,
                 travelAdvice: Option[TravelAdvice] = None,
                 filmDetails: List[FilmDetails] = List[FilmDetails]()
                )

Each service module is independent, which is what allows the system to compose them as asynchronous Future tasks.

Asynchronous task lists based on the Builder pattern

Declare a MockWebServiceCalls trait that emulates a web server. The user sends a request to the server via the getServices method. Upon receiving the request, the server creates an empty Guide instance and then calls three service modules to fill in the remaining weather, travelAdvice, and filmDetails fields.

trait MockWebServiceCalls
  extends WeatherServiceX
  with WeatherServiceY
  with TrafficService
  with TheaterService {

    type Recovery[T] = PartialFunction[Throwable, T]
    def withNone[T]: Recovery[Option[T]] = {case NonFatal(_) => None}                    // fault tolerance
    def withEmptySeq[T]: Recovery[List[T]] = {case NonFatal(_) => List()}                // fault tolerance
    def withPrevious(previous: Guide): Recovery[Guide] = {case NonFatal(_) => previous}  // fault tolerance

    def getServices(usrName: String): Future[Guide] = ???                // Process a user request
    def requestWeather(userGuide: Guide): Future[Guide] = ???            // Request weather information
    def requestTrafficSuggestion(userGuide: Guide): Future[Guide] = ???  // Request travel information
    def requestAmusement(): Future[List[FilmDetails]] = ???              // Request entertainment information (film service)
}

object MockWebServiceCalls extends MockWebServiceCalls

The server interface has three preset fault-tolerant methods:

  1. withNone: returns None when a non-fatal exception is caught.
  2. withEmptySeq: returns an empty list List() when a non-fatal exception is caught; the element type is determined by the type parameter T.
  3. withPrevious: returns the previous Guide data when a non-fatal exception is caught.

The design of the system follows the Builder pattern: each field is populated by a dedicated third party. Besides the performance advantage, this gives developers error isolation. For example, a failure of the weather service does not affect the other modules. Even if all third-party services fail, the withPrevious method ensures that a Guide instance with empty content is still returned, rather than a 500 server error.

Default data is returned by the third-party service provider traits below; each service trait has a single responsibility.

trait WeatherServiceX {
  this: MockWebServiceCalls =>
  def callWeatherXService(): Future[Option[Weather]] = Future {
    Thread.sleep(400)
    Some(Weather(25.3))
  }
}

trait WeatherServiceY {
  this: MockWebServiceCalls =>
  def callWeatherYService(): Future[Option[Weather]] = Future {
    Thread.sleep(800)
    Some(Weather(25.1))
  }
}

trait TrafficService {
  this: MockWebServiceCalls =>
  def callTrafficService(): Future[Option[TrafficAdvice]] = Future {
    Thread.sleep(300)
    Some(TrafficAdvice("route by car"))
  }

  def callPublicTransportService(): Future[Option[PublicTransportAdvice]] = Future {
    Thread.sleep(1000)
    Some(PublicTransportAdvice("No.382 bus station"))
  }
}

trait TheaterService {
  this: MockWebServiceCalls =>
  // get info from DB
  def callRecentFilmNameLists(): Future[List[Film]] = Future {
    Thread.sleep(300)
    List(Film("Seabiscuit"), Film("Million Dollar Baby"), Film("King of Comedy"))
  }

  // get info from DB
  def callFilmInformation(filmName: String): Future[FilmDetails] = Future {
    Thread.sleep(500)
    filmName match {
      case "Seabiscuit"          => FilmDetails("Seabiscuit", "Tobias Vincent Maguire")
      case "Million Dollar Baby" => FilmDetails("Million Dollar Baby", "Clint Eastwood")
      case "King of Comedy"      => FilmDetails("King of Comedy", "Robert De Niro")
    }
  }
}

Thread.sleep(…) is used in each service method to make it easier to compare performance against a serial implementation. The table below lists the artificial latency added to each service method.

Service            Method                        Delay
WeatherServiceX    callWeatherXService           0.4 s
WeatherServiceY    callWeatherYService           0.8 s
TrafficService     callTrafficService            0.3 s
                   callPublicTransportService    1.0 s
TheaterService     callRecentFilmNameLists       0.3 s
                   callFilmInformation           0.5 s

Obviously, if the system executed completely serially, the response delay would reach 3.3s. We expect to reduce the response delay to less than 1s.

class WebClient extends WordSpecLike with MustMatchers {
  "The future task" must {
    "finish the job in 1s" in {
      val eventualGuide: Future[Guide] = MockWebServiceCalls.getServices("author")
      val guide: Guide = Await.result(eventualGuide, 1 second)
      println(guide)
    }
  }
}

The other case classes are:

// Weather Module
case class Weather(temperature: Double)

// Travel Module
case class TravelAdvice(trafficAdvice: Option[TrafficAdvice] = None,
                        publicTransportAdvice: Option[PublicTransportAdvice] = None)
case class TrafficAdvice(advice: String)
case class PublicTransportAdvice(advice: String)

// Amusement Module
case class Film(name: String)
case class FilmDetails(name: String, actor: String)

Weather Service Module

The requestWeather method uses Future.firstCompletedOf to take whichever weather response completes first and simply discards the slower one. The elapsed time of the module therefore depends on the faster of the two Future tasks.

  def requestWeather(userGuide: Guide): Future[Guide] = {
    val maybeWeatherX: Future[Option[Weather]] = callWeatherXService().recover(withNone)
    val maybeWeatherY: Future[Option[Weather]] = callWeatherYService().recover(withNone)
    Future.firstCompletedOf(List(maybeWeatherX, maybeWeatherY)) map {
      getWeather => userGuide.copy(weather = getWeather)
    }
  }

This logic naturally provides redundancy for disaster recovery: as long as at least one of WeatherServiceX and WeatherServiceY (or any additional backup service) responds correctly, the module works normally. Besides firstCompletedOf, an alternative implementation is the Future.find method.

Future.find(List(maybeWeatherX,maybeWeatherY))(_.isDefined) map {
      case None => userGuide
      case Some(x) => userGuide.copy(weather = x)
}

This example uses the copy method everywhere to pass Guide data along. copy is a value-copy method automatically generated by the Scala compiler for case classes, and it matters here:

Value copying means the Future tasks never compete for the same Guide instance: each subtask gets an identical starting copy and then changes only the part of the data it is responsible for. The subtasks' reads and writes of Guide data never overlap, so there are no data races between the Futures.

Under normal circumstances, you need to consider the significant performance cost of continuously copying large objects by value. But in this case, value copying is clearly much cheaper than locking.
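A tiny sketch of what copy buys us here (the values are arbitrary):

val base    = Guide("author")
val updated = base.copy(weather = Some(Weather(25.3)))
// base is untouched; each subtask works only on its own copy of the Guide.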

Travel advice module

The requestTrafficSuggestion method starts two Future tasks internally at the same time and finally returns a tuple of the two results via the zip method. The response latency of the module depends on the slower of the two Future tasks.

def requestTrafficSuggestion(userGuide: Guide): Future[Guide] = {
    val maybeTrafficAdvice: Future[Option[TrafficAdvice]] = callTrafficService().recover(withNone)
    val maybePublicTransportAdvice: Future[Option[PublicTransportAdvice]] = callPublicTransportService().recover(withNone)

    maybeTrafficAdvice.zip(maybePublicTransportAdvice) map {
      case (maybeAdvice, maybeAdvice1) =>
        userGuide.copy(travelAdvice = Some(TravelAdvice(maybeAdvice,maybeAdvice1)))
    }
}

The map code can be replaced with the following for expression:

for {(adviceA,adviceB) <- maybeTrafficAdvice.zip(maybePublicTransportAdvice)}
    yield userGuide.copy(travelAdvice = Some(TravelAdvice(adviceA,adviceB)))

Entertainment List module

The requestAmusement method first calls callRecentFilmNameLists to get a list of film names, and then requests the corresponding FilmDetails based on that list. The business logic dictates that these two Future tasks must execute sequentially, so the response delay of the module is the sum of the two Futures' execution times.

In addition, requestAmusement does not copy the Guide itself; it hands its result back to getServices, which fills in the field.

def requestAmusement(): Future[List[FilmDetails]] = {
  val maybeRecentFilms: Future[List[Film]] = callRecentFilmNameLists().recover(withEmptySeq)
  maybeRecentFilms flatMap {
      films => {
          val eventualFilmDetails : List[Future[FilmDetails]] = films.map { each => callFilmInformation(each.name)}
          Future.sequence(eventualFilmDetails)
      }
  }
}

Future.sequence turns the List[Future[FilmDetails]] into a Future[List[FilmDetails]]. This flatMap logic does not translate directly into a for expression, but Future.traverse gives an equivalent:

for {
      films <- maybeRecentFilms
      list <- Future.traverse(films)(f => callFilmInformation(f.name))
} yield list

Integrating processing results

So far, we have made a simple analysis of response delay, where F1, F2, F3 and F4 are used to mark intermediate tasks:

Ideally, it is perfectly reasonable to reduce the delay to less than 1s. The task of data consolidation is described in several ways below, which ultimately produce the same result but differ in performance.

In the first plan, copyWeatherAndTrafficAdvice first merges the weather and travelAdvice fields, and a second Future is then chained on to copy the remaining filmDetails information.

def getServices(usrName: String): Future[Guide] = {

  val init: Future[Guide] = Future {Guide(usrName)}
  val copyWeatherAndTrafficAdvice: Future[Guide] = init flatMap { ths =>
    val eventualWeather: Future[Guide] = requestWeather(ths)
    val eventualTrafficSuggestion: Future[Guide] = requestTrafficSuggestion(ths)
    val guides = List(eventualWeather, eventualTrafficSuggestion)
      
    Future.fold(guides)(ths) {
      (acc, elem) =>
        val (weather, travelAdvice) = (elem.weather, elem.travelAdvice)
        acc.copy(
          weather = weather.orElse(acc.weather),
          travelAdvice = travelAdvice.orElse(acc.travelAdvice)
        )
    }
  }

  val response: Future[Guide] =
    for {
        guide <- copyWeatherAndTrafficAdvice
        // requestAmusement() is not declared externally
        filmDetails <- requestAmusement()
    } yield guide.copy(filmDetails = filmDetails)

  response.recover(withPrevious(Guide(usrName))) 
}

Note that this merge is not commutative: the accumulated result must sit on the left of the argument list (otherwise the copied fields are lost), because Future.fold folds from left to right. The response delay of this code is about 1.9s: although f1 and f2 run in parallel, f3 and f4 execute serially, so the total delay becomes the sum of f3 and f4 rather than the larger of the two.

The second way is to merge all the results at once using the for expression:

 def getServicesBySlowForExpr(usrName: String): Future[Guide] = {
    val r = for {
      // None of the Futures is declared externally
      init    <- Future { Guide(usrName)}
      weatherChunk <- requestWeather(init)
      travelChunk  <- requestTrafficSuggestion(init)
      list <- requestAmusement()
    } yield {
      init.copy(
        weather = weatherChunk.weather,
        travelAdvice = travelChunk.travelAdvice,
        filmDetails = list
      )
    }
    r.recover(withPrevious(Guide(usrName)))
  }

This seemingly neat code has a response delay of about 2.3s (the sum of the delays of f1, f2, and f4). The performance degrades because, as described earlier, all the Futures are created inside the for expression, which makes the program execute them serially. Here is a modified version:

def getServicesByQuickForExpr(usrName: String) :Future[Guide] = {

    def wrappedRequestAmusement(guide: Guide): Future[Guide] = {
        requestAmusement() flatMap (list => Future {guide.copy(filmDetails = list)})
    }

    val meta = Guide(usrName)
    // Wrap the initial value as a Future
    val unit = Future {meta}
    // Create a Future task externally.
    val weatherJob: Future[Guide] = requestWeather(meta)
    val travelJob: Future[Guide] = requestTrafficSuggestion(meta)
    val amusementJob: Future[Guide] = wrappedRequestAmusement(meta)

    val r = for {
        init    <- unit
        weatherChunk <- weatherJob
        travelChunk  <- travelJob
        amusementChunk <- amusementJob
    } yield {
        init.copy(
            weather = weatherChunk.weather,
            travelAdvice = travelChunk.travelAdvice,
            filmDetails = amusementChunk.filmDetails
        )
    }

    r.recover(withPrevious(Guide(usrName)))
}

The delay of this version is within 1s (max(f3, f4)); with the for expression written correctly we get the most efficient version.

Attachment: Future-based map-reduce

This section is based on some of my own thoughts combined with Functional Programming in Scala, inspired by another column: Scala: Gracefully transferring state in pure Functions – Nuggets (juejin.cn).

So far, the processing logic of the getServices method has been imperative. Notice that every task in the system has a single purpose: to modify (by value copy) the Guide data. From a more general perspective, the Guide as a whole represents a complete piece of Work, and each subtask is responsible for completing only one Partition of it.

In other words, all of the system's tasks are map-reduce, and all of them can be driven by a single fold. The following code therefore switches to a different style. First define a type alias:

type FutureJob[S] = S => Future[S]
type GuideFutureJob = FutureJob[Guide]

FutureJob[S] now represents a planned subtask. In this example you could of course fix the type parameter S to Guide (hence GuideFutureJob). Any function of type FutureJob[Guide] now represents the behaviour of producing one Partition.

To keep the focus on the MR task, we adopt the convention that the FutureJob[S] behaviours are data-independent, so they can be executed 100% in parallel. The service methods from the original example can be turned into FutureJob[Guide] behaviours simply by eta expansion or currying.

// Scala supports converting methods into function values through eta expansion.
val futureJob1 : FutureJob[Guide] = MockWebServiceCalls.requestWeather
val futureJob2 : FutureJob[Guide] = MockWebServiceCalls.requestTrafficSuggestion
val futureJob3 : FutureJob[Guide] = guide => {
  MockWebServiceCalls.requestAmusement().flatMap { s => Future {guide.copy(filmDetails = s)}}
}

We now declare a list of behaviours of type List[FutureJob[Guide]]. Simply calling fold on this list reduces all subtasks into one overall FutureJob[Guide]. This requires a function of type (Future[Guide], Future[Guide]) => Future[Guide], named mergeFuture here.

mergeFuture merges at the Future level; an inner merge at the Guide level is also needed, see the merge method. We do not know in advance which fields of the Guide each FutureJob will have changed, so merge simply examines every field that might need copying, via tail recursion (this is a bit clumsy, but pattern matching cannot tell us directly which field changed). When a tail-recursive function is an internal or private method, Scala's @tailrec annotation lets the compiler optimize it into an equivalent iterative form.

The merge method is commutative, so either side can always pick up all the available data from the other. This guarantees that folding List[FutureJob[Guide]] with mergeFuture from either direction produces the same result.

def mergeFuture(acc: Future[Guide], guide: Future[Guide]): Future[Guide] = {
  // Tail-recursively check and copy the available fragments.
  @tailrec
  def merge(acc: Guide, guide: Guide): Guide = {
    (acc, guide) match {
      case _ if acc.weather.isEmpty && guide.weather.isDefined =>
        val v = acc.copy(weather = guide.weather)
        merge(v, guide)
      case _ if acc.travelAdvice.isEmpty && guide.travelAdvice.isDefined =>
        val v = acc.copy(travelAdvice = guide.travelAdvice)
        merge(v, guide)
      case _ if acc.filmDetails.isEmpty && guide.filmDetails.nonEmpty =>
        val v = acc.copy(filmDetails = guide.filmDetails)
        merge(v, guide)
      case _ => acc
    }
  }
  acc.flatMap( s =>
    Future.fold(List(guide))(s) {
      (g0, g1) => merge(g0, g1)
    }
  )
}

mergeFuture guarantees that the two Futures passed in run in parallel, for the reason described in the composition mechanics above. Now all the system's tasks reduce to folding a queue of behaviours: pass in an initial Guide value and the whole pipeline is driven, returning the result.

"Functional Style" in {
    type FutureJob[S] = S= >Future[S]
    def unit : FutureJob[Guide] = s => Future{s}
    val futureJob1 : FutureJob[Guide] = MockWebServiceCalls.requestWeather
    val futureJob2 : FutureJob[Guide] = MockWebServiceCalls.requestTrafficSuggestion
    val futureJob3 : FutureJob[Guide] = guide => {
        MockWebServiceCalls.requestAmusement().flatMap { s => Future {
            guide.copy(filmDetails = s)
        }
                                                       }

        // It doesn't matter what order the tasks are grouped, you can fold in either direction.
        val convolution: FutureJob[Guide] = List[FutureJob[Guide]](futureJob1, futureJob2, futureJob3).fold(unit) {
            (f1, f2) => { s => {mergeFuture(f1(s),f2(s))}}
        }
        // Another way of saying left fold.
        // (unit /: List[FutureJob[Guide]](futureJob1,futureJob2,futureJob3)){(f1,f2) => {s => {mergeFuture(f1(s),f2(s))}}}

        val guide: Guide = Await.result(convolution(Guide(usrName = "author")), 1 second)
        println(guide)
    }

Actor with Future

We touched on the use of Actor + Future in our first Akka example. Here is a snippet from that earlier code:

import akka.pattern.{ask, pipe}
import akka.util.Timeout

class BoxOffice(implicit timeout: Timeout) extends Actor {
  // ...
    case GetEvents =>
      // Create a broadcast mechanism: this is equivalent to asking yourself for each child's Event.
      def getEvents: Iterable[Future[Option[Event]]] = context.children.map {
        child => self.ask(GetEvent(child.path.name)).mapTo[Option[Event]]
      }

      // sequence is a common "flip" operation in FP, used to flip A[B] into B[A].
      // For example, for Option, a sequence method can flip Option[List[_]] into List[Option[_]].
      def convertToEvents(f: Future[Iterable[Option[Event]]]): Future[Events] = {
        // import akka.actor.TypedActor.dispatcher
        // Future { Iterable[Option[Event]] => Iterable[Event] => Events(Vector[Event]) }
        f.map(optionSeqs => optionSeqs.flatten).map(l => Events(l.toVector))
      }

      pipe(convertToEvents(Future.sequence(getEvents))) to sender()

  // ...

The akka.pattern.ask import must be brought in first; it allows an Actor to call the ask method to request a result from another Actor.

Any Actor that uses a Future in this way needs an implicit value of type Timeout, because the system does not allow waiting for a result indefinitely. Just like the test code in this chapter, if the wait times out, the Future completes with a failure.
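A minimal sketch of supplying that implicit value (the duration is arbitrary):

import akka.util.Timeout
import scala.concurrent.duration._

implicit val timeout: Timeout = Timeout(2.seconds)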

Because an Actor can reply with any message, the ask method returns a Future[Any]. You can convert the response to the desired message type with the .mapTo method. However, if ask does not receive the expected message type, mapTo ends in a failure.

pipe forwards the result directly to another party once the Future completes, without having to write the forwarding logic in a callback.

When Futures are used together with Actors, you should be particularly careful not to share an Actor's mutable state, because Actors are stateful. When sharing data with another party, it is wise to send a copy of the value rather than pass a reference directly.

Summary

This chapter introduced Future and showed how to build asynchronous workflows with it. The goal is to make the most of the available resources, minimize unnecessary delays, and avoid accidental serialization caused by incorrectly written code.

A Future is a placeholder for the eventually available result of a function, and a powerful tool for composing functions into asynchronous flows. Because a Future is about the result of a function, combining those results calls for a functional approach, and the combinators Future provides closely resemble those of the Scala collections. Functions execute in parallel wherever possible and sequentially where needed, ultimately producing a meaningful result. The value a Future returns may or may not be successful, but there are plenty of mechanisms for substituting failed values so that the system keeps running.