1. Overview of this article

There are two articles in this series. In the previous article, "Principles," we looked at the asynchronous non-blocking model, which reduces the time threads spend blocked on IO and improves resource utilization and system throughput. The asynchronous API can take the form of a listener or a Promise; the Promise API provides more flexibility, supporting both synchronous returns and asynchronous callbacks, and allowing any number of callbacks to be registered.

In this article, “Applications,” we’ll explore the asynchronous pattern and the use of Promise further:

Chapter 2: Promise and thread pools. ExecutorService+Future is one alternative for executing time-consuming requests asynchronously; however, Promise supports purely asynchronous retrieval of response data, which eliminates more blocking than Future.

Chapter 3: Exception Handling. Java programs don’t always execute requests successfully, and sometimes they encounter force majeure such as network problems. For unavoidable exceptions, asynchronous APIs must provide exception handling to improve program fault tolerance.

Chapter 4: Request scheduling. Java programs sometimes need to submit multiple requests, and there may be some correlation between these requests, including sequential execution, parallel execution, and batch execution. The asynchronous API needs to support these constraints.

This article does not prescribe a particular Promise implementation; you can select a Promise utility class used in production environments (JDK CompletableFuture[A], Netty DefaultPromise[B], etc.). Moreover, since the principles behind Promise are not complicated, readers can also implement the functionality they need themselves.

2. Promise and thread pools

Java programs sometimes need to perform time-consuming IO operations, such as database access; such IO operations last significantly longer than purely in-memory computation. To reduce IO blocking and improve resource utilization, we should use an asynchronous model in which requests are submitted to other threads for execution, so that multiple requests can be submitted in succession without waiting for previous requests to return.

This chapter compares several IO models (see Section 2.1) and examines the blocking of the caller thread. Promise supports purely asynchronous request submission and response data processing, minimizing unnecessary blocking. In a real project, if the underlying API does not support pure asynchrony, we can refactor it to be compatible with Promise (see Section 2.2).

2.1 Comparison: Synchronous IO, Future, Promise

This section compares several IO models, including synchronous IO, asynchronous IO based on a thread pool (ExecutorService), and asynchronous IO based on Promise, looking at the blocking of the caller thread. Suppose we want to perform a database access request. Because it must cross the network, a single request requires time-consuming IO before the response data is finally received; but there are no constraints between requests, so new requests can be submitted at any time without waiting for previous responses.

First let’s look at the sample code for several models:

1. Synchronous IO. The db.writeSync() method blocks synchronously: the call does not return until the response data is received. Therefore, the caller can submit only one request at a time and must wait for that request to return before submitting the next one.

/* Submit the request and block until the response data is received */
String result = db.writeSync("data");
process(result);

2. Asynchronous IO based on a thread pool. The db.writeSync() method remains unchanged; however, it is submitted to a thread pool for execution so that the caller thread does not block, allowing multiple requests data1-3 to be submitted consecutively.

After the request is submitted, the thread pool returns a Future object, and the caller calls Future.get() to obtain the response data. The Future.get() method blocks, so the caller cannot submit further requests until the response data is available.

// executor: ExecutorService
Future<String> resultFuture1 = executor.submit(() -> db.writeSync("data1"));
Future<String> resultFuture2 = executor.submit(() -> db.writeSync("data2"));
Future<String> resultFuture3 = executor.submit(() -> db.writeSync("data3"));
String result1 = resultFuture1.get(); // blocks until the response data is received
String result2 = resultFuture2.get();
String result3 = resultFuture3.get();
process(result1);
process(result2);
process(result3);

3. Asynchronous IO based on Promise. The db.writeAsync() method is purely asynchronous and returns a Promise object after submitting the request; the caller calls Promise.await() to register a callback, which is fired when the response data is received.

In "Principles," we saw that the Promise API can be implemented on top of a thread pool or a responsive model; either way, the callback function can be executed in the thread that receives the response, without the caller thread blocking to wait for the response data.

/* Submit requests asynchronously */
Promise<String> resultPromise1 = db.writeAsync("data1");
Promise<String> resultPromise2 = db.writeAsync("data2");
Promise<String> resultPromise3 = db.writeAsync("data3");
/* Register callbacks; each is fired when its response data is received */
resultPromise1.await(result1 -> process(result1));
resultPromise2.await(result2 -> process(result2));
resultPromise3.await(result3 -> process(result3));

Next, let’s look at how the state of the caller thread changes over time in the above models, as shown in Figure 2-1.

A. Synchronous IO. The caller can submit only one request at a time and cannot submit the next request until a response is received.

B. Asynchronous IO based on thread pool. The same set of requests (requests 1-3, and requests 4-6) can be submitted consecutively without waiting for the previous request to return. However, once the caller uses Future.get() to get the response data (result1-3), it blocks and cannot submit the next set of requests (requests 4-6) until the response data is actually received.

C. Asynchronous IO based on Promise. At any time, the caller can submit a request and register a callback on its Promise to handle the response data; later, the receiving thread notifies the Promise of the response data, triggering the callback. Throughout this process, the caller thread never needs to wait for response data and is never blocked.

Figure 2-1A Thread Timeline: Synchronous IO

Figure 2-1B Thread Timeline: Asynchronous I/O based on thread pool

Figure 2-1C Thread Timeline: Asynchronous IO based on Promise

2.2 Combining Promise with a thread pool

Compared with ExecutorService+Future, Promise has the advantage of being purely asynchronous; however, there are scenarios in which a Promise may need to be used together with a thread pool. For example: 1. The underlying API only supports a synchronous blocking model rather than pure asynchrony, so the only way to use it is to call it from a thread pool. 2. A piece of legacy code needs to be refactored to change its thread model from a thread pool to a responsive model; the external interface can be changed to the Promise API first, while the underlying implementation temporarily keeps using the thread pool.

The following code snippet shows the use of a Promise combined with a thread pool:

  1. Create a Promise object as the return value. Note that PromiseOrException is used here because an exception may be encountered; it can notify either the response data or the exception thrown in the event of a failure. See Section 3.1 for details.
  2. Execute the request in the thread pool (2a) and notify the Promise when the response data is received (2b).
  3. Handle the thread-pool-full exception. The thread pool is backed by a BlockingQueue that stores pending tasks. It is generally set to a bounded queue to prevent unlimited memory usage, and tasks are rejected when the queue is full. To propagate this condition to the caller, the thread pool's rejection policy must be set to AbortPolicy, which discards the submitted task when the queue is full and throws RejectedExecutionException; once the exception is caught, the Promise is notified that the request failed. A sketch of such a thread pool configuration follows the code below.
public PromiseOrException<String, Exception> writeAsync() {
    // 1. Create the Promise object
    PromiseOrException<String, Exception> resultPromise = new PromiseOrException<>();
    try {
        executor.execute(() -> {
            String result = db.writeSync("data"); // 2a. Execute the request; only synchronous blocking is supported
            resultPromise.signalAllWithResult(result); // 2b. Notify the Promise
        });
    } catch (RejectedExecutionException e) {
        // 3. Exception: thread pool full
        resultPromise.signalAllWithException(e);
    }
    return resultPromise;
}
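For step 3 to be reachable, the thread pool must be created with a bounded queue and the AbortPolicy rejection policy. A minimal configuration sketch (all classes are from java.util.concurrent; the pool sizes and queue capacity below are illustrative, not recommendations):

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        4, 4,                                  // core and maximum pool size (illustrative)
        0L, TimeUnit.MILLISECONDS,             // keep-alive time for idle non-core threads
        new ArrayBlockingQueue<>(1000),        // bounded task queue, prevents unlimited memory usage
        new ThreadPoolExecutor.AbortPolicy()); // throws RejectedExecutionException when the queue is full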

3. Exception Handling: PromiseOrException

Java programs sometimes encounter unavoidable exceptions, such as a broken network connection; therefore, programmers need to design an appropriate exception handling mechanism to improve the fault tolerance of the program. This chapter introduces exception handling for asynchronous APIs: first the Java language's exception handling conventions, and then the Promise variant PromiseOrException, which enables the Promise API to support canonical exception handling.

3.1 Exception handling specification

In my opinion, exception handling in Java code should conform to the following specifications:

  1. Explicitly distinguish between normal and abnormal exits.
  2. Supports compile-time checking to force the caller to handle unavoidable exceptions.

 

Distinguish between normal and abnormal exits

Exceptions are an important feature of the Java language and a basic form of control flow. In Java, a function is allowed to have a single return value and to throw several different types of exceptions. The return value is the normal exit: returning indicates that the function worked normally and computed a correct result. Conversely, if the function encounters a condition that prevents it from working, such as a broken network connection or an illegal request, it throws an exception.

Although if-else and exceptions are both control flow, the programmer must distinguish the scenarios in which each is used. The branches of an if-else are generally equivalent and handle the normal case; the return value of a function, however, is not equivalent to an exception. Throwing an exception means the function encountered a fault it cannot handle and cannot compute a result normally, which is essentially different from a return value produced by normal operation. Confusing the normal exit (return value) with the abnormal exit (throwing an exception), or not throwing an exception when the function can no longer work, is a serious flaw in API design.

Taking database access as an example, the following code compares two forms of exception handling in an API. During database access, if the network connection is healthy and the server can handle the request correctly, then db.write() should return the server's response data, such as the auto-increment ID generated for the written data or the number of rows actually affected by a conditional update. If the network connection is broken, or the client and server versions do not match and the request cannot be parsed, then db.write() should throw an exception explaining why. The two cases are fundamentally different from the "can it work" point of view, so exceptions, rather than if-else, should clearly be chosen as the control flow.

/* Right: exceptions as the abnormal exit */
try {
    String result = db.write("data"); // normal exit
    process(result);
} catch (Exception e) {
    log.error("Write fails", e); // abnormal exit
}

/* Wrong: the return value mixes response data and errors */
String resultOrError = db.write("data");
if (resultOrError.equals("OK")) {
    process(resultOrError); // normal exit
} else {
    log.error("Write fails, error: " + resultOrError); // abnormal exit
}

Force handling of unavoidable exceptions

In the Java exception handling system, exceptions fall mainly into the following categories: Exception, RuntimeException, and Error. All three are subclasses of Throwable and can be thrown by a function. Note that since RuntimeException is a subclass of Exception, to avoid confusion this article uses "Exception" specifically for exceptions that are "Exception but not RuntimeException."

In my opinion, several types of exceptions are used in the following scenarios:

1. Exception: caused by force majeure outside the program, such as a network disconnection. Even if your Java code is flawless, such exceptions are impossible to avoid (try unplugging the network cable!). Since they cannot be avoided, such exceptions should be handled mandatorily to improve the system's fault tolerance.

2. RuntimeException: caused by a programming error, such as an array index out of bounds (ArrayIndexOutOfBoundsException) or a parameter out of range (IllegalArgumentException). If the programmer is well aware of the API's input constraints and checks the function parameters properly before calling the API, a RuntimeException can be avoided entirely (unless the called API actually throws a RuntimeException where it should throw an Exception). Since it can be avoided, there is no need to force handling of such exceptions.

Of course, no one is perfect. If the programmer does violate some constraint and a function throws a RuntimeException that is not handled, then as a punishment the thread or process exits, reminding the programmer to correct the defective code. If the thread or process must keep running permanently, catch RuntimeException at the top level, as shown in the following code. Here, code defects are treated like unavoidable exceptions: they are logged and alerted so they can be fixed later.

new Thread(() -> {
    while (true) {
        try {
            doSomething();
        } catch (RuntimeException e) {
            // Executed in case a code defect causes a RuntimeException
            log.error("error occurs", e);
        }
    }
}).start();

3. Error: errors defined inside the JVM, such as OutOfMemoryError. Business logic typically does not throw an Error; it throws some kind of Exception or RuntimeException instead.

Only Exception, the checked exception[C], is handled mandatorily. An example of a checked exception is shown below. The database access method db.write() throws an Exception to indicate a network disconnection, a message parsing failure, or other unavoidable conditions. The exception type is Exception rather than RuntimeException in order to force the caller to add a catch clause handling these situations; if the caller omits the catch clause, the compiler reports an error, telling the caller that "an exception can occur here and must be handled," which improves the program's fault tolerance.

/**
 * Throws an exception if:
 * 1. The network connection is broken
 * 2. The message cannot be parsed
 * 3. The business logic fails, e.g. insufficient balance found when the server deducts a payment
 * 4. ... any situation that cannot be avoided should throw an Exception!
 */
public String write(Object data) throws Exception {
    return "";
}

// The caller is forced to handle the exception
try {
    String result = db.write("data");
    process(result);
} catch (Exception e) {
    log.error("Write fails, db: ..., data: ...", e);
}

3.2 Exception handling for the Promise API

The previous section discussed the specification for exception handling:

  • The normal exit and abnormal exit are explicitly distinguished;
  • Unavoidable exceptions are handled mandatorily, enforced at compile time. The following code shows how the Promise API can design its exception handling mechanism to conform to the above specification.
  1. Use PromiseOrException to notify both response data and exceptions. Its generic parameter is the data object ResultOrException, which contains two fields, result and e: e == null indicates a normal response, and result holds the response data; e != null indicates an exception, and the field result should not be used. A minimal sketch of this data object is given after the code below.
  2. In overload 1, the caller receives a ResultOrException object in the callback. Calling resultOrException.get() either returns the response data result or throws the exception e. The code structure of this approach is consistent with traditional exception handling, in that multiple catch clauses can be used to handle different types of exceptions separately.
  3. In overload 2, the caller receives result and e directly in the callback, with the same meaning as above. This form saves the call to resultOrException.get(); however, if different types of exceptions must be handled separately, the type has to be checked with e instanceof MyException.
// PromiseOrException<String, Exception> extends Promise<ResultOrException<String, Exception>>
PromiseOrException<String, Exception> resultPromise = db.writeAsync("data");

/* Overload 1: the callback receives a ResultOrException object */
resultPromise.await(resultOrException -> {
    try {
        String result = resultOrException.get();
        process(result);
    } catch (Exception e) {
        log.error("Write fails", e); // abnormal exit
    }
});

/* Overload 2: the callback receives result and e directly */
resultPromise.await((result, e) -> {
    if (e == null) {
        process(result);
    } else {
        log.error("Write fails", e); // abnormal exit
    }
});
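For reference, a minimal sketch of the ResultOrException data object described above; the field names and get() behavior follow this section's description, and the utility class you actually choose may differ:

public class ResultOrException<T, E extends Exception> {
    private final T result; // valid only when e == null
    private final E e;      // null means the request succeeded

    public ResultOrException(T result, E e) {
        this.result = result;
        this.e = e;
    }

    /** Returns the response data, or throws the recorded exception if the request failed. */
    public T get() throws E {
        if (e != null) {
            throw e;
        }
        return result;
    }
}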

Measured against the exception handling specification proposed in the previous section, PromiseOrException has the following advantages:

  1. Distinguish between normal and abnormal exits. The response data and exception are passed using two variables, result and e, respectively, and e==null can be used to determine whether the response is normal or not. Note that result==null cannot be used as a judgment condition, because null may be a valid value for the response data.
  2. Forced exception handling. Whichever callback form is used, there is no code structure that obtains result without e, so the syntax cannot silently skip the handling of e.
  3. Customizable exception types. The generic parameter E is not required to be Exception and may be any other exception type. Note that due to Java generics, only one exception type is allowed in the generic parameter, not several, as a function's throws clause allows. To work around this limitation, we can define an exception superclass for the API, and the caller can narrow it down with catch clauses or instanceof. This "exception superclass" approach is acceptable and widely used in existing tools, because it distinguishes the exceptions thrown by the tool from the exception types built into the Java language; a sketch of such a hierarchy follows below.
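As a sketch of point 3, a hypothetical exception superclass for a database client and how a caller might distinguish its subclasses (the class names are made up for illustration):

/* Hypothetical exception hierarchy: one superclass for the whole API */
class DbException extends Exception {
    public DbException(String message, Throwable cause) { super(message, cause); }
}

class DbTimeoutException extends DbException {
    public DbTimeoutException(String message, Throwable cause) { super(message, cause); }
}

// Caller side: the generic parameter E is DbException; subclasses are distinguished with instanceof
resultPromise.await((result, e) -> {
    if (e == null) {
        process(result);
    } else if (e instanceof DbTimeoutException) {
        log.warn("write timed out, may retry", e); // handle a specific subclass
    } else {
        log.error("write fails", e);               // handle all other API exceptions
    }
});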

Finally, a suggestion on the structure of exception handling: all exceptions should be notified through PromiseOrException, rather than thrown by the API itself. Taking the database access API writeAsync() as an example, the following code compares the two approaches. If an internal call such as submit() throws an Exception, the exception should be wrapped inside the returned PromiseOrException object, instead of being thrown directly by the API function (writeAsync() throws Exception).

/* Right: all exceptions are notified through PromiseOrException */
public PromiseOrException<String, Exception> writeAsync(Object data) {
    try {
        submit(data); // throws exception
    } catch (Exception e) {
        return PromiseOrException.immediatelyException(e);
    }
    PromiseOrException<String, Exception> resultPromise = new PromiseOrException<>();
    return resultPromise;
}

/* Wrong: two exception exits, throwing Exception as well as returning a PromiseOrException */
public PromiseOrException<String, Exception> writeAsync(Object data) throws Exception {
    submit(data); // throws exception
    PromiseOrException<String, Exception> resultPromise = new PromiseOrException<>();
    return resultPromise;
}

If the API is incorrectly designed with two exception exits, the caller will have to rewrite the exception handling logic, as shown in the code below.

try {
    PromiseOrException<String, Exception> resultPromise = db.writeAsync("data");
    resultPromise.await((result, e) -> {
        if (e == null) {
            process(result);
        } else {
            log.error("write fails", e); // abnormal exit 2
        }
    });
} catch (Exception e) {
    log.error("write fails", e); // abnormal exit 1
}

4. Request scheduling

Java programs sometimes need to submit multiple asynchronous requests, and there is a certain correlation between these requests. In an asynchronous non-blocking scenario, these associations can be implemented with the help of Promises.

1. Sequential requests, as shown in Figure 4-1. A later request depends on the response data of the previous request; therefore, the previous request must return before the next request can be constructed and submitted.

Figure 4-1 Sequential requests

2. Parallel requests, as shown in Figure 4-2. Multiple requests are submitted at once, and then we wait for all of them to return. The submitted requests have no dependencies between them, so they can be executed simultaneously; however, the response data of every request must be received (a channelRead() event occurs with the response data as the event parameter) before the actual processing of result1-3 can be performed.



Figure 4-2 Parallel Requests

3. Batch requests, as shown in Figure 4-3. The caller submits multiple requests in succession, but they are temporarily queued (offer()) instead of being executed immediately. After some time, several requests are taken out of the queue, assembled into a batch request, and submitted (writeBulk()); when the response message for the batch request is received, the response data for each request can be extracted from it. Because each network IO carries extra overhead, batch requests are often used to reduce the frequency of network IO and improve overall throughput.



Figure 4-3 Batch requests

4.1 Sequential Requests: Promise.then()

Suppose a series of operations must be completed in sequence, that is, the previous operation must complete before the next one can execute. If these operations are represented by the Promise API, we can wrap Promise.await(listener) to make the code structure cleaner.

An asynchronous Promise API is shown below. The submit method submits the request and returns a Promise object. The Promise object is notified when the response data is received.

/** Asynchronous Promise API */
public static Promise<String> submit(Object request) {
    Promise<String> resultPromise = new Promise<>();
    // ...
    return resultPromise;
}

Now assume there are five requests, "A" to "E", which must be submitted one after another. For example, because the parameters of request B depend on the response data of request A, the response data resultA must be processed before request B can be submitted. This scenario can be implemented with the code shown below: each time we call submit("X"), we register a callback on the Promise object it returns; inside the callback, the response data resultX is processed and submit("X+1") is called to submit the next request.

While this approach satisfies the functional requirements, the nested code structure is very unreadable — each additional request requires the code to be nested and indented one more level. When the invocation logic is complex and the number of requests is high, the code can be very difficult to maintain.

This situation is also known as “callback hell” [D] and has been discussed extensively in the JavaScript language for reference.

submit("A").await(resultA -> {
    submit("B").await(resultB -> {
        submit("C").await(resultC -> {
            submit("D").await(resultD -> {
                submit("E").await(resultE -> {
                    process(resultE);
                });
            });
        });
    });
});

To improve the code structure, we wrap the Promise<T>.await(Consumer<T>) method and provide a Promise<T>.then(Function<T, Promise<Next>>) method, as shown below. Like await(), then() registers a callback resultX -> submit("X+1"), which processes the response data resultX and submits the next request; the return value of then() is the Promise returned by submit("X+1"), which notifies the response data of the next request, resultX+1.

Promise<String> resultPromiseA = submit("A");
Promise<String> resultPromiseB = resultPromiseA.then(resultA -> submit("B"));
Promise<String> resultPromiseC = resultPromiseB.then(resultB -> submit("C"));
Promise<String> resultPromiseD = resultPromiseC.then(resultC -> submit("D"));
Promise<String> resultPromiseE = resultPromiseD.then(resultD -> submit("E"));
resultPromiseE.await(resultE -> process(resultE));

Next, we inline the intermediate variables resultPromiseA-E to get a chained call structure based on then(). In contrast to await(), then() removes the nested callbacks.

Submit ("A").then(resulta-> submit("B")) // return A resultpromiseB. Then (resultB-> submit("C")) // return A resultpromiseC .then(resultTD-> submit("D")) // return resultPromised. Then (resultTD-> submit("E")) // return resultPromisee.await (resulte->) process(resultE));

Finally, let's look at a simple implementation of Promise.then(), as follows:

  1. The then() method declares a generic parameter Next to indicate the response data type of the next request.
  2. Inside then(), a Promise<Next> is created as the return value; it notifies the response data of the next request.
  3. For the current request, await() is called to register a callback on the response data result; when the response data is received, the function func is executed to submit the next request: func.apply(result).
  4. When the response data of the next request is received, the Promise<Next> is notified: nextPromise::signalAll.
public <Next> Promise<Next> then(Function<T, Promise<Next>> func) {
    Promise<Next> nextPromise = new Promise<>();
    await(result -> {
        Promise<Next> resultPromiseNext = func.apply(result);
        resultPromiseNext.await(nextPromise::signalAll);
    });
    return nextPromise;
}

The then() above takes a Function<T, Promise<Next>>. Depending on whether the callback function returns a value and whether it executes synchronously or asynchronously, Promise can provide several overloads of then(); where Java syntax prevents the compiler from distinguishing the overloads, distinct function names can be used instead (a sketch of one variant follows the list), for example:

thenRun(Runnable)

thenAccept(Consumer<T>)

thenApply(Function<T, Next>)

thenApplyAsync(Function<T, Promise<Next>>)
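For instance, thenApply() can be sketched on top of await(), mirroring the then() implementation above: the callback computes the next value synchronously instead of submitting another request (a minimal sketch, not a definitive implementation):

public <Next> Promise<Next> thenApply(Function<T, Next> func) {
    Promise<Next> nextPromise = new Promise<>();
    await(result -> {
        Next next = func.apply(result); // synchronous transformation, no further request
        nextPromise.signalAll(next);    // notify the transformed value immediately
    });
    return nextPromise;
}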

4.2 Parallel Requests: LatchPromise

The previous section described the "sequential request" scenario, where multiple requests must be executed in order. In the "parallel request" scenario there is no ordering constraint between the requests, but we still need to wait for all of them to return before performing subsequent operations. For example, we may need to query multiple database tables; these queries can be executed simultaneously, but we must wait for every query to return before we have the full information. The JDK provides CountDownLatch for this scenario, but it only supports synchronous waiting; as an improvement, we use LatchPromise to achieve the same functionality with a purely asynchronous API.

Taking database access as an example, the following code demonstrates the use of LatchPromise:

  1. Submit 3 requests and obtain the Promise objects corresponding to each request, resultPromise1-3, which will provide the response data.
  2. Create a LatchPromise object and register with it the Promise objects resultPromise1-3 that need to be waited on.
  3. latchPromise.untilAllSignaled() returns a Promise object allSignaled, which is notified once all the registered resultPromise1-3 have been notified. allSignaled is of type VoidPromise, meaning there is no response data to handle when it is notified.
  4. The caller registers a callback on allSignaled and, inside it, calls resultPromise.await() to get the actual response data. At this point, since the requests have already completed, await() returns immediately without blocking.
/* Submit the requests */
Promise<String> resultPromise1 = db.writeAsync("a");
Promise<String> resultPromise2 = db.writeAsync("b");
Promise<String> resultPromise3 = db.writeAsync("c");

/* Register the Promises to wait on */
LatchPromise latch = new LatchPromise();
latch.add(resultPromise1);
latch.add(resultPromise2);
latch.add(resultPromise3);

/* allSignaled is notified once all registered Promises have been notified */
VoidPromise allSignaled = latch.untilAllSignaled();
allSignaled.await(() -> {
    String result1 = resultPromise1.await(); // returns immediately, no blocking
    String result2 = resultPromise2.await();
    String result3 = resultPromise3.await();
    process(result1, result2, result3);
});

By contrast, the following code uses CountDownLatch to do the same thing, but has the following drawbacks:

  1. CountDownLatch.await() only supports synchronous waiting. This is not acceptable in a purely asynchronous scenario.
  2. CountDownLatch is intrusive on the business logic. Programmers need to add countDownLatch.countDown() calls to the business logic to control the timing of the CountDownLatch; in contrast, LatchPromise relies on the resultPromise objects that already exist, without any additional timing-control code.
  3. CountDownLatch introduces redundant logic. When creating a CountDownLatch, you must specify the number of requests to wait for in the constructor parameter. Therefore, whenever the number of submitted requests changes, the code that creates the CountDownLatch must be updated and the constructor parameter modified accordingly.
CountDownLatch latch = new CountDownLatch(3);
resultPromise1.await(result1 -> latch.countDown());
resultPromise2.await(result2 -> latch.countDown());
resultPromise3.await(result3 -> latch.countDown());

latch.await();
String result1 = resultPromise1.await();
String result2 = resultPromise2.await();
String result3 = resultPromise3.await();
process(result1, result2, result3); 

Finally, let’s look at the reference implementation of LatchPromise. The code works as follows:

  1. The countUnfinished variable records the number of Promise objects that have not yet been notified. Each time a Promise object is registered, countUnfinished is incremented; each time a Promise is notified, countUnfinished is decremented. allSignaled is notified when countUnfinished drops to 0.
  2. The noMore variable records whether more Promise objects may still be registered. Registration is considered complete only after untilAllSignaled() has been called; before that, allSignaled must not be notified even if countUnfinished drops to 0. Consider a case where resultPromise1-3 must be registered and waited on, and resultPromise1 and 2 are notified during registration while resultPromise3 has not been. Without checking noMore, countUnfinished would drop to 0 after resultPromise1 and 2 are registered and notified, and allSignaled would be notified too early. That would be a timing error, because resultPromise3 has not actually finished yet.
  3. To ensure thread safety, the variables must be accessed under a lock, implemented here with synchronized.
  4. untilAllSignaled() notifies allSignaled immediately if countUnfinished is already 0 when it is called; in that case no registered Promise remains unfinished, so there would otherwise be no later chance to notify allSignaled.
public class LatchPromise {
    private static class Lock {}
    private final Lock lock = new Lock(); // guards countUnfinished and noMore
    private int countUnfinished = 0;
    private boolean noMore = false;
    private final VoidPromise allSignaled = new VoidPromise();

    public void add(Promise<?> promise) {
        if (promise.isSignaled()) {
            return;
        }
        synchronized (lock) {
            countUnfinished++;
        }
        promise.await(unused -> {
            synchronized (lock) {
                countUnfinished--;
                if (countUnfinished == 0 && noMore) {
                    allSignaled.signalAll();
                }
            }
        });
    }

    public VoidPromise untilAllSignaled() {
        synchronized (lock) {
            if (countUnfinished == 0) {
                allSignaled.signalAll();
            } else {
                noMore = true;
            }
        }
        return allSignaled;
    }
}

4.3 Batch Requests: ExecutorAsync

Batch request features

Bulk requests (also known as "batch" requests) carry multiple requests in a single message and are used in scenarios such as database access and remote calls. Batch requests can effectively improve throughput by reducing the number of network IOs and saving the overhead of constructing and transmitting messages.

Many database APIs support batch reads and writes, for example JDBC PreparedStatement[E], the Elasticsearch Bulk API[F], MongoDB insertMany()[G], and InfluxDB BatchPoints[H]; see the references for details. These APIs trade flexibility for performance to different degrees. Among them, the Elasticsearch Bulk API places the fewest restrictions on callers: different types of requests (add, delete, update) can be mixed, and different indexes can be written in one bulk request. MongoDB and InfluxDB come next: a batch request can only write to a single table or collection, but the fields of each record can be customized. PreparedStatement is the least flexible: it defines a template for an SQL statement, and callers can only fill in the template parameters, not modify the statement structure.
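As an example of the PreparedStatement form, a JDBC batch insert might look like the following (JDBC classes are from java.sql; the table and column names are illustrative):

void insertBatch(Connection conn, List<String> rows) throws SQLException {
    // The SQL template is fixed; only the parameters vary per record
    try (PreparedStatement ps = conn.prepareStatement("INSERT INTO demo_table (data) VALUES (?)")) {
        for (String row : rows) {
            ps.setString(1, row);
            ps.addBatch();     // queue the parameterized statement locally
        }
        ps.executeBatch();     // submit all queued statements as one batch
    }
}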

Although database APIs already support batch access, many native APIs still require callers to construct the batch requests themselves, handling complex details such as request assembly, batch size, and the number of concurrent requests.

Here, we design a generic component called ExecutorAsync that encapsulates the request scheduling policy to provide a cleaner API. The usage flow for ExecutorAsync is shown in the following code snippet:

  1. Similar to the thread pool's executorService.submit(), a caller calls executorAsync.submit() to submit a request. The request is represented by the data object Request, which stores the request type and request parameters.
  2. After submitting the request, the caller gets the Promise object to get the response data. With Promise, ExecutorAsync supports purely asynchronous operations, with no blocking required to submit the request or get the response data.
  3. ExecutorAsync schedules requests internally: instead of executing each request immediately on submission, it collects a batch of requests at regular intervals, assembles them into a batch request, and calls the actual database access API. If the database access API allows it, a batch may mix different request types or operate on different database tables.
ExecutorAsync executor = new ExecutorAsync();
Promise<...> resultPromise1 = executor.submit(new Request("data1"));
Promise<...> resultPromise2 = executor.submit(new Request("data2"));
Promise<...> resultPromise3 = executor.submit(new Request("data3"));

Specifically, ExecutorAsync supports the following scheduling policies:

1. Queuing, as shown in Figure 4-4a. After the caller submits a request, it is not executed immediately but cached in a queue.



Figure 4-4a ExecutorAsync feature: Queue

2. Batching, as shown in Figure 4-4b. At regular intervals, ExecutorAsync pulls several requests from the queue, assembles them into a batch request, and calls the underlying database API to deliver it to the server. If the queue grows very quickly, we can also define a bulk size: as soon as the queue length reaches this value, a batch request is assembled and submitted immediately.



Figure 4-4b ExecutorAsync feature: Batching

3. Concurrency, as shown in Figure 4-4C. If the underlying database API supports asynchronous submission, ExecutorAsync can take advantage of it to submit multiple batch requests in a row without waiting for previous batch requests to return. To avoid overloading the database server, we can define a parallelism limit on the number of batch requests in flight. When the limit is reached, newly submitted requests are temporarily held in the queue instead of being assembled into a new batch request.



Figure 4-4C ExecutorAsync feature: Concurrency

4. Discarding. As shown in Figure 4-4D, under the bulk size and parallelism constraints above, if requests are submitted at a rate much higher than the server can respond, a large number of requests pile up in the queue waiting to be processed and eventually fail with a timeout. In this case, sending such a request to the server is meaningless, because the caller has already concluded that the request failed and no longer cares about the response data.

Figure 4-4D request timeout

Therefore, ExecutorAsync should remove invalid requests from the queue in a timely manner so that the remaining requests stay "fresh" (Figure 4-4E). This policy limits the queue length, reduces how long subsequent requests accumulate in the queue, and prevents them from timing out; it also saves memory and IO overhead by not storing and sending invalid requests.

Figure 4-4E ExecutorAsync feature: Discard

Batch request implementation

The previous section described the scheduling policies of ExecutorAsync: queuing, batching, concurrency, and discarding. As shown in the following code, ExecutorAsync only needs to provide the submit(Request) method for submitting a single request. The request is represented by the data object Request, whose field request.resultPromise is a Promise object used to notify the response data. Because exception handling is needed here, we use PromiseOrException<T, Exception> as the Promise implementation, where the generic parameter T is the actual type of the response data.

public class ExecutorAsync<T> {
    public PromiseOrException<T, Exception> submit(Request<T> request) {
        // Enqueue the request; actual scheduling and execution happen later
        return request.resultPromise;
    }
}

Now let's look at how ExecutorAsync works. Because the source code involves many details and is fairly long, this section explains the high-level design with a flow chart, as shown in Figure 4-5.

Figure 4-5 Principle of ExecutorAsync

1. Submit a request. The caller calls executorAsync.submit(request), one request per call. The request is put into the queue to wait for subsequent scheduling. The structure of the parameter Request is shown in the following code and includes these fields:

predicate: a function that decides whether the request is still valid. Invalid requests, such as those that have timed out, are discarded; see Step 2 for details.

resultPromise: notifies the response data.

public class Request<Response> {
    public final PredicateE predicate; 
    public final PromiseOrException<Response, Exception> resultPromise;
}

2. At regular intervals, or when queue.size() reaches the bulk size, ExecutorAsync attempts to assemble a batch request. The request.predicate function is executed for each request in the queue to decide whether it should still be submitted; at most bulk size valid requests are fetched. PredicateE is a functional interface similar to the JDK's Predicate, in the form shown in the following code. If the interface method test() returns normally, the request is still valid; it may also throw an exception explaining why the request is invalid, for example because it waited too long. If an exception is thrown, the request is dropped and the exception is notified to request.resultPromise, so that the caller runs its exception-handling logic. A hypothetical timeout predicate is sketched after the interface below.

public interface PredicateE {
    void test() throws Exception;
}
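As a sketch of the predicate in Step 2, a hypothetical timeout check: it captures the submission time and throws once the request has waited too long (the 1-second limit is illustrative):

long submitTimeMillis = System.currentTimeMillis();
PredicateE notTimedOut = () -> {
    if (System.currentTimeMillis() - submitTimeMillis > 1000) {
        throw new Exception("request timed out while waiting in the queue");
    }
    // returning normally means the request is still valid
};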

3. Submit the batch request. Step 2 fetches at most bulk size requests from the queue, which are passed to requestFunc.execute(requests) to submit the batch request. The form of the interface RequestFunc is shown in the following code; its method execute(requests) takes several requests as parameters, assembles them into a batch request, and calls the underlying database API to submit it. A hypothetical implementation is sketched after the interface.

public interface RequestFunc<T>{
    void execute(List<Request<T>> requests);
}
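For illustration, a RequestFunc implementation might look like the following. The payload field on Request and the bulk API db.writeBulkAsync() (assumed to notify one result per request, in submission order) are assumptions made for this sketch:

RequestFunc<String> requestFunc = requests -> {
    List<Object> payloads = new ArrayList<>();
    for (Request<String> request : requests) {
        payloads.add(request.payload); // "payload" is a hypothetical field holding the request parameters
    }
    // Submit one bulk message and map the responses back to the individual requests
    db.writeBulkAsync(payloads).await((results, e) -> {
        for (int i = 0; i < requests.size(); i++) {
            Request<String> request = requests.get(i);
            if (e != null) {
                request.resultPromise.signalAllWithException(e);           // the whole batch failed
            } else {
                request.resultPromise.signalAllWithResult(results.get(i)); // per-request response data
            }
        }
    });
};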

4. When the response is received, for each Request, the response data is notified to request.resultPromise in turn.

5. To prevent server overload, ExecutorAsync limits the number of concurrent batch requests to at most parallelism. We use a counter inFlight = 0 to track the number of batch requests currently being executed:

A. When trying to assemble a batch request (Step 2), first check inFlight < parallelism before taking requests to execute from the queue.

B. After submitting a batch request (Step 3, requestFunc.execute()), inFlight++.

C. After a batch request has received its response data (Step 4, request.resultPromise is notified), inFlight--. If the number of requests in the queue still exceeds the bulk size, go back to Step 2 and fetch another batch of requests to execute. A simplified dispatch routine combining these steps is sketched below.
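Putting Steps 2 through 5 together, a simplified dispatch routine might look like the following sketch. The field names (queue, bulkSize, parallelism, inFlight, requestFunc) follow this section; locking, the timer trigger, and error handling are omitted, and ExecutorAsync is assumed to be generic over the response type T:

private void tryDispatch() {
    while (inFlight < parallelism && !queue.isEmpty()) {
        // Step 2: take at most bulkSize valid requests from the queue
        List<Request<T>> batch = new ArrayList<>();
        while (batch.size() < bulkSize && !queue.isEmpty()) {
            Request<T> request = queue.poll();
            try {
                request.predicate.test();  // still valid?
                batch.add(request);
            } catch (Exception e) {
                request.resultPromise.signalAllWithException(e); // drop invalid requests (e.g. timed out)
            }
        }
        if (batch.isEmpty()) {
            return;
        }
        inFlight++;                  // Step 5b
        requestFunc.execute(batch);  // Step 3: submit the batch request
        // Step 5c (inFlight--, then tryDispatch() again) runs when the batch's responses arrive
    }
}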

In summary, ExecutorAsync uses a queue to hold pending requests. When a batch request needs to be submitted, PredicateE filters out invalid requests and keeps valid ones; for a batch of requests, requestFunc.execute() is called to submit them in bulk, and request.resultPromise is notified when the response data is received. The procedure also satisfies the constraints that prevent server overload: a batch contains at most bulk size requests, and the number of batch requests being executed at the same time does not exceed parallelism. That is the basic principle of ExecutorAsync; in a real application, configuration parameters, generic templates, and other details also need to be handled, which this article does not cover due to space constraints.

5. Summary

This article introduced practical application scenarios of the asynchronous model and the Promise design pattern, discussed design principles for asynchronous APIs, and presented corresponding solutions. The asynchronous pattern is more than a simple "submit a request and handle a response": it sometimes requires designing exception handling mechanisms and scheduling requests according to the relationships between them. In these complex scenarios, the API needs to be purely asynchronous, with no blocking when submitting a request or processing a response; it should make full use of compile-time checking to prevent callers from missing branches, especially unavoidable exception branches; and it should encapsulate complex, repetitive implementation details to keep the caller's code simple and easy to understand.

The purpose of this series is to popularize the asynchronous pattern and, hopefully, serve as a starting point for discussion: to help readers understand the basic principles of the asynchronous pattern, become aware of the practical problems that may be encountered, and begin to explore its implementation mechanisms. However, real projects and tools are far more complex than described here, so please do thorough research before selecting one. Looking at the various asynchronous APIs available, the reader will find that there is still no single standard for the asynchronous pattern. Taking database clients as an example, asynchronous API forms differ: some use a listener API and some a Promise API; some APIs are asynchronous in form but may block in certain cases; some asynchronous APIs are implemented on thread pools or connection pools, while others use responsive models such as Netty. Therefore, it is important to fully understand an asynchronous tool's API form, blocking behavior, and threading model before using it in a project; if an existing tool does not conform to your development specifications, feel free to wrap it or implement the required features yourself. Readers are also welcome to discuss the wrapping and implementation of asynchronous tools in the comments.

References

[A] JDK CompletableFuture

https://www.baeldung.com/java…

[B] Netty DefaultPromise

https://www.tabnine.com/code/… 

[C] Checked exception

https://www.geeksforgeeks.org… 

[D] Callback hell

https://blog.avenuecode.com/c… 

[E] Batch request: JDBC PreparedStatement

https://www.tutorialspoint.co… 

[F] Elasticsearch Bulk API

https://www.elastic.co/guide/… 

[G] MongoDB insertMany()

https://mongodb.github.io/mon… 

[H] InfluxDB BatchPoints

https://github.com/influxdata…