Author: Xianyu Technology – Code treasure

Background

In day-to-day development we often run into logic that is not on the main path of a request (data completion, notification logic, and so on). We usually handle such logic asynchronously to speed up the response. At the same time, as dependencies on upstream and downstream services grow, a series of problems can appear, including but not limited to issues that are hard to troubleshoot and performance that is hard to guarantee. Idle Fish is no exception:

  • Serial data completion (here, several network IO calls made one after another) is a performance bottleneck and seriously hurts interface RT.
  • Logic is poorly divided into units. Logic left to “fly free” makes the code hard to read, and per-unit metrics are hard to measure.

Based on the above, we want to abstract an asynchronous component that solves these problems. We set ourselves a few small goals:

  1. Monitorable: All states and behaviors (success, failure, RT and other key metrics) within the logical life cycle of each unit are monitored.
  2. Disaster recovery: Fallback is used when a large number of exceptions (usually timeouts) occur in a logical unit.
  3. Zero-cost access: easy to integrate and use, working out of the box.

An example

Non-timeout scenario

Assume that logical unit L has three tasks A, B, and C, whose execution durations are T1 = 1, T2 = 2, and T3 = 3 respectively, and whose timeout is 4. As shown in the figure:

Now the desired outcome is:

  1. The execution time of the logical unit L is 3
  2. The duration is 1 when task A is successfully executed, 2 when task B is successfully executed, and 3 when task C is successfully executed

Timeout scenario

Assume that logical unit L has three tasks A, B, and C, whose execution durations are T1 = 3, T2 = 5, and T3 = 6 respectively, and whose timeout is 4. As shown in the figure:

Now the desired outcome is:

  1. The execution time of logical unit L is 4
  2. Task A succeeds with a duration of 3; task B fails; task C fails
  3. Tasks B and C are terminated after failing and no longer occupy thread pool resources
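
One way to read both scenarios is as a single formula. This is our own summary of the expected outcomes above, not something stated by the component itself: the logical unit takes as long as its slowest task, capped by the timeout, and a task succeeds only if its own duration fits within the timeout.

```latex
\begin{align*}
T_L &= \min\bigl(\max(T_1, T_2, T_3),\; T_{\mathrm{timeout}}\bigr) \\
\text{non-timeout scenario: } T_L &= \min(\max(1, 2, 3),\, 4) = 3 \\
\text{timeout scenario: } T_L &= \min(\max(3, 5, 6),\, 4) = 4
\end{align*}
```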

Schemes

Note that all of the following schemes are discussed in terms of the timeout scenario.

Akka scheme

  1. The current service gets the corresponding LogicActor
  2. LogicActor gets all current UnitActors (a, b, c)
  3. Use tell mode to send messages to a, b, and c respectively. Note that this process is asynchronous
  4. a returns normally, but b and c both time out
  5. LogicActor counts down the current task whether or not it timed out
  6. Once all tasks are complete, merge the data
  7. Finally, the result data is returned

LogicActor: the actor for a logical unit. It distributes tasks and merges data.

UnitActor: the actor for a specific task.

Advantages

  1. Message driven.
  2. No additional thread pool management & exception tolerance required.
  3. Gracefully stops an executing worker (PoisonPill).
  4. People familiar with Akka can easily get started.

Disadvantages

  1. Achieving this requires a layer of encapsulation, and it’s not easy.
  2. Greatly increases system complexity.
  3. This is a disaster for those unfamiliar with Scala/Akka.

The cost of learning Scala/Akka directly defeats the goal of “zero-cost access”, so we gave up on this scheme.

RxJava scheme

The logic implemented with RxJava:

  1. Define a CountDownLatch for timeout control
  2. Define three tasks
  3. Use RxJava to process them asynchronously
  4. Provide a fixed-size thread pool of 10 threads to run them
  5. Perform the sum operation in reduce
  6. Use the latch as a single checkpoint for all tasks

The timeout here acts as one overall checkpoint, but it does not reveal which business task caused the timeout.

Advantages

  1. Reactive programming
  2. Less code
  3. Hides thread pool operations
  4. Provides the #timeout and #onErrorReturn methods, so timeout handling needs no extra wrapping

Disadvantages

  1. A layer of encapsulation is needed to achieve the goal
  2. A timeout handling module exists, but injecting business units would break RxJava's own encapsulation, and retrofitting it is difficult

Although the fallback goal is met through “onErrorReturn”, “monitorable” cannot be achieved, because after a timeout we cannot tell which business task timed out. So we gave up on this scheme.

Final solution: a wrapper built on the JUC package

Difficulties

How to monitor logical units

  1. Within the group, the traceId is wrapped in a ThreadLocal. Using a thread pool loses this context, so the corresponding logs cannot be traced.
  2. A bizCode concept is needed so that a logical unit, and every operation within it, can be monitored.

How to clean up a timed-out thread so that it does not occupy thread pool resources

When a Callable or Runnable times out, it must be stopped so that it does not continue to occupy thread pool resources.
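
A minimal sketch of this idea using only the JDK, assuming the task blocks in an interruptible call (`Thread.sleep` stands in for network IO here). The class name `CancelOnTimeout` and its helper method are hypothetical and exist only for illustration. `Future.cancel(true)` interrupts the worker thread, so it only frees the thread if the task responds to interruption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelOnTimeout {
    /** Returns true if the task was interrupted and stopped before finishing. */
    static boolean runAndCancel(long taskMs, long timeoutMs) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        CountDownLatch exited = new CountDownLatch(1);
        try {
            Future<String> future = pool.submit(() -> {
                try {
                    Thread.sleep(taskMs);          // stands in for blocking network IO
                    return "done";
                } catch (InterruptedException e) {
                    interrupted.set(true);         // the task stops here and frees the thread
                    throw e;
                } finally {
                    exited.countDown();
                }
            });
            try {
                future.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true);               // true = interrupt the running thread
            }
            exited.await(1, TimeUnit.SECONDS);     // wait until the task body has exited
            return interrupted.get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("slow task interrupted: " + runAndCancel(2000, 100));
        System.out.println("fast task interrupted: " + runAndCancel(50, 1000));
    }
}
```

A task that ignores interruption (for example, a busy loop that never checks `Thread.interrupted()`) would keep running after `cancel(true)`, so task bodies need to cooperate.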

How to control concurrency (timeout)

There are many utility classes for controlling timeouts:

  • CountDownLatch: java.util.concurrent.CountDownLatch#await(long, java.util.concurrent.TimeUnit) does not throw an exception on timeout. It only reports the timeout through its boolean return value, which means the timeout cannot be handled by short-circuiting into an exception flow.
  • CyclicBarrier: java.util.concurrent.CyclicBarrier#await(long, java.util.concurrent.TimeUnit) wraps exceptions in an unfriendly way. For example:
    • Suppose there are four threads A, B, C, and D, each with a timeout of 2s. Thread D runs for 3s, and the following happens:
      1. The first task to finish and enter barrier.await throws a timeout exception, while the other tasks all throw a broken barrier exception
      2. The timed-out task still carries on with its remaining work and keeps occupying resources
  • Semaphore: does not fit this use case, so we do not go into detail.
  • Future: with java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit), a list of futures is read serially, so the total time T >= max(t1, t2, t3)
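
The CountDownLatch point above can be seen in a small sketch. The class name `LatchAwaitResult` and the millisecond durations are made up for illustration. A timed `await` simply returns `false` when the time runs out, so the caller has to check the result explicitly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchAwaitResult {
    /** Returns what CountDownLatch#await(long, TimeUnit) reports for the given tasks. */
    static boolean awaitAll(long[] durationsMs, long timeoutMs) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(durationsMs.length);
        CountDownLatch latch = new CountDownLatch(durationsMs.length);
        try {
            for (long d : durationsMs) {
                pool.execute(() -> {
                    try {
                        Thread.sleep(d);           // simulated work
                        latch.countDown();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // No exception on timeout: the only signal is the boolean result.
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("all finished in time: " + awaitAll(new long[] {50, 100}, 2000));
        System.out.println("all finished in time: " + awaitAll(new long[] {50, 2000}, 300));
    }
}
```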

A classic example of a mistake

  1. Submit the future to the corresponding list
  2. Get the corresponding result by traversing
  3. Once you get the result, do the sum
  4. Final output and execution time

Although the three tasks above run concurrently, future.get(long, TimeUnit) blocks, and each call starts its own timeout clock, so the timeout no longer bounds the whole logical unit. As shown in the figure above, the final time may be 3+2+1=6s.

The overall design

The core class diagram is as follows:
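
The original code for this example is not reproduced here, so the following is a reconstruction under stated assumptions: durations are scaled from seconds to hundreds of milliseconds (300, 500, 600 with a 400 timeout, mirroring the timeout scenario), and the class name `SerialGetPitfall` is hypothetical. It shows that reading the futures one by one lets the unit run for roughly 600 ms without any `TimeoutException`, even though the intended timeout was 400 ms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SerialGetPitfall {
    /** Returns {sum, elapsedMillis, timeoutCount}. */
    static long[] run(long[] durationsMs, long timeoutMs) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(durationsMs.length);
        try {
            long start = System.nanoTime();
            // 1. Submit every task and keep its future in a list.
            List<Future<Integer>> futures = new ArrayList<>();
            for (long d : durationsMs) {
                futures.add(pool.submit(() -> { Thread.sleep(d); return 1; }));
            }
            // 2-3. Traverse the list and sum the results.
            long sum = 0;
            long timeouts = 0;
            for (Future<Integer> f : futures) {
                try {
                    // The mistake: every get() waits up to timeoutMs from *now*,
                    // not from the moment the tasks were submitted.
                    sum += f.get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    timeouts++;
                }
            }
            long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            return new long[] {sum, elapsed, timeouts};
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // 4. Print the final result and the elapsed time.
        long[] r = run(new long[] {300, 500, 600}, 400);
        System.out.println("sum=" + r[0] + " elapsedMs=" + r[1] + " timeouts=" + r[2]);
    }
}
```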

  1. ConcurrentCallable is the class intended for external use
  2. BizBaseCallable implements Callable and encapsulates the internal context and the corresponding business unit
  3. BizCountDownLatch adds a business unit pool attribute to solve the problem of monitoring business timeouts
  4. BizBaseCallable encapsulates the ctx (context), ensuring that the context is not lost across threads (for compatibility with the group's traceId)
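
The class diagram itself is not available here, so the sketch below only illustrates the context-passing idea behind item 4. The names `ContextPropagation`, `TRACE_ID`, and `BizCallable` are hypothetical stand-ins, not the real classes. The wrapper reads the ThreadLocal on the submitting thread and restores it on the worker thread before running the task.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextPropagation {
    /** Hypothetical stand-in for the group's ThreadLocal-based traceId holder. */
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Assumed shape of a context-carrying task: a bizCode plus the captured context. */
    static final class BizCallable<V> implements Callable<V> {
        final String bizCode;
        private final String capturedTraceId = TRACE_ID.get(); // read on the submitting thread
        private final Callable<V> delegate;

        BizCallable(String bizCode, Callable<V> delegate) {
            this.bizCode = bizCode;
            this.delegate = delegate;
        }

        @Override
        public V call() throws Exception {
            String previous = TRACE_ID.get();
            TRACE_ID.set(capturedTraceId);         // restore the context on the worker thread
            try {
                return delegate.call();
            } finally {
                // Leave the pooled thread as we found it.
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        }
    }

    /** Returns the traceId that a pooled worker thread sees, with or without the wrapper. */
    static String traceSeenByWorker(String traceId, boolean wrap) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        TRACE_ID.set(traceId);
        try {
            Callable<String> task = () -> String.valueOf(TRACE_ID.get());
            Callable<String> toSubmit = wrap ? new BizCallable<>("A", task) : task;
            return pool.submit(toSubmit).get();
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("plain task sees:   " + traceSeenByWorker("trace-1", false));
        System.out.println("wrapped task sees: " + traceSeenByWorker("trace-1", true));
    }
}
```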

CountDownLatch retrofit: monitorable

Extend CountDownLatch and override the #await() and #countDown() methods:

  1. Add the business unit concept (bizCode)
  2. The corresponding bizSet is in effect a pool of unfinished business units: when a unit completes, it is removed from the pool
  3. Override the countDown method so that removing a business unit marks it as completed
  4. When “await” times out, throw an exception to simulate a short circuit, which the upper layer handles in one place
  5. The timeout exception carries the unfinished tasks
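
The five points above can be sketched roughly as follows. This is not the original BizCountDownLatch: the exception type `BizTimeoutException`, the `countDown(String)` overload, and the demo helper are assumptions made to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BizCountDownLatchSketch {
    /** Hypothetical exception: thrown on timeout, carrying the unfinished units. */
    static final class BizTimeoutException extends RuntimeException {
        final Set<String> unfinished;
        BizTimeoutException(Set<String> unfinished) {
            super("timed out, unfinished units: " + unfinished);
            this.unfinished = unfinished;
        }
    }

    static final class BizCountDownLatch extends CountDownLatch {
        // Pool of business units that have not completed yet.
        private final Set<String> bizSet = ConcurrentHashMap.newKeySet();

        BizCountDownLatch(Set<String> bizCodes) {
            super(bizCodes.size());
            bizSet.addAll(bizCodes);
        }

        /** Marks one unit as completed; only the first call per bizCode counts down. */
        void countDown(String bizCode) {
            if (bizSet.remove(bizCode)) {
                super.countDown();
            }
        }

        /** Unlike the parent class, a timeout raises an exception instead of returning false. */
        @Override
        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            if (!super.await(timeout, unit)) {
                throw new BizTimeoutException(new TreeSet<>(bizSet)); // snapshot of unfinished units
            }
            return true;
        }
    }

    /** Demo helper: runs the units and returns those still unfinished at the timeout. */
    static Set<String> unfinishedAfter(String[] codes, long[] durationsMs, long timeoutMs)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(codes.length);
        BizCountDownLatch latch = new BizCountDownLatch(new TreeSet<>(Arrays.asList(codes)));
        try {
            for (int i = 0; i < codes.length; i++) {
                String code = codes[i];
                long d = durationsMs[i];
                pool.execute(() -> {
                    try {
                        Thread.sleep(d);           // simulated work
                        latch.countDown(code);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            latch.await(timeoutMs, TimeUnit.MILLISECONDS);
            return new TreeSet<>();
        } catch (BizTimeoutException e) {
            return e.unfinished;                   // the upper layer sees who timed out
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String[] codes = {"A", "B", "C"};
        System.out.println(unfinishedAfter(codes, new long[] {50, 2000, 2000}, 400));
    }
}
```

Because the exception names the units still in the pool, the caller can record a timeout metric per bizCode instead of a single anonymous timeout.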

Upper-layer monitoring logic

ConcurrentCallable: timeout handling and disaster recovery

  1. The monitoring logic includes the handling of timeouts
  2. Each Callable's return value is read through get with a timeout of 0: whatever was going to finish has finished, and whatever has not finished has timed out
  3. Abort any callable that is still running with future.cancel
  4. On failure the fallback is returned; the default fallback implementation is return null;
  5. A default value is needed so that the caller can process the result list with list.forEach(l -> deal(l))
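
A rough sketch of this flow follows; it is not the original ConcurrentCallable. To stay short it swaps the latch for a single shared deadline: futures are read with whatever time remains, so once the deadline has passed every remaining `get` uses a timeout of 0, as in item 2. The class name `ConcurrentUnitSketch`, the `invokeAll` signature, and the `sleepThenReturn` helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class ConcurrentUnitSketch {
    /** Runs all tasks under one shared timeout; failed or timed-out tasks yield fallback.get(). */
    static <V> List<V> invokeAll(List<Callable<V>> tasks, long timeoutMs, Supplier<V> fallback)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        try {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            List<Future<V>> futures = new ArrayList<>();
            for (Callable<V> task : tasks) {
                futures.add(pool.submit(task));
            }
            List<V> results = new ArrayList<>();
            for (Future<V> f : futures) {
                // Time left for the whole unit; 0 once the deadline has passed.
                long remaining = Math.max(0L, deadline - System.nanoTime());
                try {
                    results.add(f.get(remaining, TimeUnit.NANOSECONDS));
                } catch (TimeoutException | ExecutionException e) {
                    f.cancel(true);                // stop the task so it frees its thread
                    results.add(fallback.get());   // default fallback in the article: null
                }
            }
            return results;
        } finally {
            pool.shutdownNow();
        }
    }

    /** Demo helper: a task that sleeps and then returns its name. */
    static Callable<String> sleepThenReturn(String name, long millis) {
        return () -> { Thread.sleep(millis); return name; };
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<String>> tasks = Arrays.asList(
                sleepThenReturn("A", 100), sleepThenReturn("B", 3000), sleepThenReturn("C", 3000));
        List<String> results = invokeAll(tasks, 500, () -> null);
        results.forEach(r -> System.out.println("result: " + r));
    }
}
```

Every task contributes exactly one entry to the result list, which is what lets the caller iterate with `forEach` without checking which tasks timed out.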

Revisiting the example above: zero-cost access

  1. To simulate setting the context, TestCtx wraps a ThreadLocal
  2. 4000L means the task timeout is 4000ms. The logical unit is named “L”
  3. Add tasks A, B, and C to the component
  4. Finally, process the data once it has been obtained

  1. When A finishes, it removes its unit from the countDownLatch
  2. B and C are forcibly terminated when the timeout expires

This completes an asynchronous component that is monitorable, disaster-tolerant, and zero-cost to adopt.

Summary

In conclusion, we chose “JUC (upgraded version)” as our asynchronous component.

This article mainly introduces the asynchronous scheduling component used inside Idle Fish. A component derived from specific business scenarios may not be broadly general, but we hope the background, goals, and scheme selection described here can serve as a reference.