Preface

Performance optimization is a road every programmer must travel, and it may also be the one that runs deepest. It requires not only an in-depth understanding of the various tools, but sometimes also a solution custom-built for a specific business scenario. Of course, you can also sneak a Thread.sleep into your code in advance, so that when tuning is called for you simply remove it and "save" a few milliseconds (just kidding). The subject of performance optimization is so vast that no book on the market fully covers it, and the available techniques are so rich that they can be dazzling even within a single niche.

This article will not cover all optimization routines; it only presents a general solution for the concurrent-call scenario encountered during recent project development. You can package it up or copy and paste it into your project. Ideas for more scenarios and further optimizations are welcome.

Background

I wonder if you have encountered a scenario in development where you call service A, then service B, assemble the data, and then call service C. (If you have never met this scenario in microservice development, I would say either your system's granularity is too coarse, or it is an underlying system that is lucky enough to have no downstream service dependencies.)

The total duration of this call chain is duration(A) + duration(B) + duration(C) + other operations. As a rule of thumb, most of the time is spent waiting on downstream services and on network I/O, while the in-application CPU time is negligible by comparison. Since the calls to services A and B do not depend on each other, can we invoke A and B concurrently to cut out the synchronous waiting? Ideally, the chain duration can then be reduced to max(duration(A), duration(B)) + duration(C) + other operations.
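
For intuition, here is a minimal sketch of the first scenario using the JDK's CompletableFuture; serviceA, serviceB, serviceC, assemble, and executor are hypothetical placeholders, and error handling is omitted:

// Fire both independent calls concurrently on a dedicated executor
CompletableFuture<ResponseA> futureA = CompletableFuture.supplyAsync(() -> serviceA.call(requestA), executor);
CompletableFuture<ResponseB> futureB = CompletableFuture.supplyAsync(() -> serviceB.call(requestB), executor);

// join() blocks until both calls complete, so the wait is roughly max(duration(A), duration(B))
ResponseC responseC = serviceC.call(assemble(futureA.join(), futureB.join()));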

As another example, sometimes we need to invoke a downstream service in batches, such as a batch query for user information. To protect themselves, downstream query interfaces often cap the batch size, for example at 100 users per query. We therefore have to split one large request into N smaller ones, and the total time becomes N * duration(A) + other operations; querying 1,000 users at 100 per batch, for instance, means 10 sequential calls. Again, with concurrent requests the time can ideally be reduced to max(duration(A1), ..., duration(AN)) + other operations, roughly the duration of a single call.

The code for the two scenarios is essentially the same, so this article presents the idea and a complete implementation for the second one.

Implementation

The overall implementation class diagram of concurrent RPC calls is as follows:

[Class diagram: ThreadPoolExecutorFactory, TracedExecutorService, BatchOperateService / BatchOperateServiceImpl, and BatchOperateCallable]

First, we need a thread pool for concurrent execution. Because other parts of the program usually have their own thread pool usage, and we want RPC calls to run on a separate pool, we encapsulate pool creation behind a factory.

@Configuration
public class ThreadPoolExecutorFactory {

    @Resource
    private Map<String, AsyncTaskExecutor> executorMap;

    /** The default thread pool. */
    @Bean(name = ThreadPoolName.DEFAULT_EXECUTOR)
    public AsyncTaskExecutor baseExecutorService() {
        // Per-service customization can be added later
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // Set the thread pool parameters
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix(ThreadPoolName.DEFAULT_EXECUTOR + "--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        taskExecutor.setDaemon(Boolean.TRUE);
        // Rejection policy: run the rejected task on the calling thread
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize the thread pool
        taskExecutor.initialize();

        return taskExecutor;
    }

    /** A dedicated thread pool for concurrent RPC calls. */
    @Bean(name = ThreadPoolName.RPC_EXECUTOR)
    public AsyncTaskExecutor rpcExecutorService() {
        // Per-service customization can be added later
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // Set the thread pool parameters
        taskExecutor.setCorePoolSize(20);
        taskExecutor.setMaxPoolSize(100);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix(ThreadPoolName.RPC_EXECUTOR + "--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        taskExecutor.setDaemon(Boolean.TRUE);
        // Rejection policy: run the rejected task on the calling thread
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize the thread pool
        taskExecutor.initialize();

        return taskExecutor;
    }

    /**
     * Gets a thread pool by name.
     *
     * @param name name of the thread pool
     * @return the thread pool
     * @throws RuntimeException if no thread pool with that name can be found
     */
    public AsyncTaskExecutor fetchAsyncTaskExecutor(String name) {
        AsyncTaskExecutor executor = executorMap.get(name);
        if (executor == null) {
            throw new RuntimeException("no executor name " + name);
        }
        return executor;
    }
}

public class ThreadPoolName {

    /** Default thread pool. */
    public static final String DEFAULT_EXECUTOR = "defaultExecutor";

    /** Thread pool used for concurrent RPC calls. */
    public static final String RPC_EXECUTOR = "rpcExecutor";
}

As shown in the code, we declare two Spring AsyncTaskExecutor thread pools, the default pool and a pool reserved for RPC calls, and Spring collects them into a Map keyed by bean name. A caller picks a pool by passing its name to the fetchAsyncTaskExecutor method. One further detail: the RPC pool has noticeably more threads than the default one, because RPC calls are not CPU-intensive and spend most of their time waiting, so adding threads effectively raises concurrency.

@Component
public class TracedExecutorService {

    @Resource
    private ThreadPoolExecutorFactory threadPoolExecutorFactory;

    /**
     * Submits an asynchronous task to the named thread pool.
     *
     * @param executorName   name of the thread pool
     * @param tracedCallable the asynchronous task
     * @param <T>            return type
     * @return a Future holding the task result
     */
    public <T> Future<T> submit(String executorName, Callable<T> tracedCallable) {
        return threadPoolExecutorFactory.fetchAsyncTaskExecutor(executorName).submit(tracedCallable);
    }
}

The submit method encapsulates fetching the thread pool and submitting the task. The Callable + Future combination lets us retrieve the result of the asynchronous execution later.
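
For example, a caller could submit a single query to the RPC pool and block for its result like this (userService, queryUser, UserInfo, and userId are hypothetical placeholders):

Future<UserInfo> future = tracedExecutorService.submit(
        ThreadPoolName.RPC_EXECUTOR,
        () -> userService.queryUser(userId));
// Blocks for at most 3 seconds before throwing TimeoutException
UserInfo userInfo = future.get(3, TimeUnit.SECONDS);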

With the thread pools in place, we declare an interface for submitting concurrent batch calls:

public interface BatchOperateService {

    /**
     * Concurrent batch operation.
     *
     * @param function the logic to execute for each request
     * @param requests the requests
     * @param config   the batch configuration
     * @return all responses
     */
    <T, R> List<R> batchOperate(Function<T, R> function, List<T> requests, BatchOperateConfig config);
}

@Data
public class BatchOperateConfig {

    /** Timeout duration. */
    private Long timeout;

    /** Timeout duration unit. */
    private TimeUnit timeoutUnit;

    /** Whether all calls must succeed. */
    private Boolean needAllSuccess;

}

The function object passed to batchOperate is the piece of logic to be executed concurrently. requests holds all the requests; the concurrent call iterates over them and submits each one to an asynchronous thread. The config object configures the concurrent call, such as the timeout for the batch and whether the whole batch should fail when some of the calls fail.
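
Before diving into the implementation, here is a usage sketch for the batch query scenario. Lists.partition comes from Guava, which the implementation below already uses; userIds, userService.batchQuery, and UserInfo are hypothetical placeholders:

// Split 1,000 user IDs into batches of at most 100, the downstream limit
List<List<Long>> batches = Lists.partition(userIds, 100);

BatchOperateConfig config = new BatchOperateConfig();
config.setTimeout(3000L);
config.setTimeoutUnit(TimeUnit.MILLISECONDS);
config.setNeedAllSuccess(true);

// Each batch becomes one concurrent downstream call
List<List<UserInfo>> responses = batchOperateService.batchOperate(
        batch -> userService.batchQuery(batch), batches, config);
List<UserInfo> allUsers = responses.stream().flatMap(List::stream).collect(Collectors.toList());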

Let’s take a look at the implementation class:

@Service
@Slf4j
public class BatchOperateServiceImpl implements BatchOperateService {

    @Resource
    private TracedExecutorService tracedExecutorService;

    @Override
    public <T, R> List<R> batchOperate(Function<T, R> function, List<T> requests, BatchOperateConfig config) {
        log.info("batchOperate start function:{} request:{} config:{}", function, JSON.toJSONString(requests), JSON.toJSONString(config));

        // The current time
        long startTime = System.currentTimeMillis();

        // Initialization
        int numberOfRequests = CollectionUtils.size(requests);

        // Results of all asynchronous threads
        List<Future<R>> futures = Lists.newArrayListWithExpectedSize(numberOfRequests);
        // Use a CountDownLatch to manage the concurrent calls
        CountDownLatch countDownLatch = new CountDownLatch(numberOfRequests);
        List<BatchOperateCallable<T, R>> callables = Lists.newArrayListWithExpectedSize(numberOfRequests);

        // Submit each request to an asynchronous thread
        for (T request : requests) {
            BatchOperateCallable<T, R> batchOperateCallable = new BatchOperateCallable<>(countDownLatch, function, request);
            callables.add(batchOperateCallable);

            // Submit for asynchronous execution
            Future<R> future = tracedExecutorService.submit(ThreadPoolName.RPC_EXECUTOR, batchOperateCallable);
            futures.add(future);
        }

        try {
            // Wait for all tasks to finish; throw if we time out and all calls must succeed
            boolean allFinish = countDownLatch.await(config.getTimeout(), config.getTimeoutUnit());
            if (!allFinish && config.getNeedAllSuccess()) {
                throw new RuntimeException("batchOperate timeout and need all success");
            }
            // Inspect the results; throw if any task failed and all calls must succeed
            boolean allSuccess = callables.stream().map(BatchOperateCallable::isSuccess).allMatch(BooleanUtils::isTrue);
            if (!allSuccess && config.getNeedAllSuccess()) {
                throw new RuntimeException("some batchOperate have failed and need all success");
            }

            // Collect the results of all asynchronous calls and return them
            List<R> result = Lists.newArrayList();
            for (Future<R> future : futures) {
                R r = future.get();
                if (Objects.nonNull(r)) {
                    result.add(r);
                }
            }
            return result;
        } catch (Exception e) {
            // Wrap the original exception rather than just its message, preserving the stack trace
            throw new RuntimeException(e);
        } finally {
            double duration = (System.currentTimeMillis() - startTime) / 1000.0;
            log.info("batchOperate finish duration:{}s function:{} request:{} config:{}", duration, function, JSON.toJSONString(requests), JSON.toJSONString(config));
        }
    }
}

Usually, after submitting tasks to a thread pool, we would simply iterate over the Futures and wait for the results. Here we use a CountDownLatch instead, which gives the whole batch a single, uniform timeout. Take a look at the BatchOperateCallable implementation:

public class BatchOperateCallable<T, R> implements Callable<R> {

    private final CountDownLatch countDownLatch;

    private final Function<T, R> function;

    private final T request;

    /** Whether this task succeeded. */
    private boolean success;

    public BatchOperateCallable(CountDownLatch countDownLatch, Function<T, R> function, T request) {
        this.countDownLatch = countDownLatch;
        this.function = function;
        this.request = request;
    }

    @Override
    public R call() {
        try {
            success = false;
            R result = function.apply(request);
            success = true;
            return result;
        } finally {
            // Count down whether the call returned normally or threw
            countDownLatch.countDown();
        }
    }

    public boolean isSuccess() {
        return success;
    }
}

Whether the call returns normally or throws, we decrement the counter at the end of the task. When the counter reaches zero, all concurrent calls have completed. If it has not reached zero within the configured time, the concurrent call is considered timed out and an exception is thrown.

A potential problem

One problem concurrent calls introduce is that they amplify the traffic hitting the downstream interface, in extreme cases by a factor of hundreds or thousands. If the downstream service has no defensive measures such as rate limiting in place, we can very easily bring it down (failures of this kind are common). The whole concurrent call therefore needs flow control, and there are two ways to achieve it.

If the microservices run in mesh mode, the QPS of RPC calls can be configured in the sidecar, which controls access to the downstream service globally. Whether to choose single-node or cluster-wide limiting depends on the modes the sidecar supports and on the traffic volume of the service; generally speaking, if average traffic is small, single-node limiting is recommended, because cluster-wide limiting fluctuates more than single-node limiting, and too little traffic leads to misjudgments.

If mesh is not enabled, the rate limiter has to be implemented in code. Guava's RateLimiter class is recommended here, but it only supports single-node limiting; implementing cluster-wide limiting in code raises the complexity of the solution considerably.
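
If you take the in-code route, a minimal single-node sketch with Guava's RateLimiter could wrap the per-request function before it is handed to batchOperate; the 500 permits per second is an arbitrary example value:

// Smooths outbound calls to at most 500 per second on this node (example value)
private static final RateLimiter RATE_LIMITER = RateLimiter.create(500.0);

public <T, R> R rateLimitedCall(Function<T, R> function, T request) {
    // Blocks until a permit is available, capping this node's outbound QPS
    RATE_LIMITER.acquire();
    return function.apply(request);
}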

Summary

Abstracting the scenarios you encounter in project development and providing general-purpose solutions wherever possible is an important way for every developer to improve code reuse and stability. Concurrent RPC calls are one such common scenario, and I hope the implementation in this article helps you.