“This is the ninth day of my participation in the First Challenge 2022. For details: First Challenge 2022”

I. Analysis of asynchronous invocation mode

While writing code today I needed to make an asynchronous call, and I used Java 8's CompletableFuture. While working with its async methods I found that there are two variants, as follows:
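
For reference, these are the two overloads in java.util.concurrent.CompletableFuture (supplyAsync, for tasks that return a value, has the same pair):

    public static CompletableFuture<Void> runAsync(Runnable runnable)
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)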

The difference is that one needs to specify a thread pool and one does not.

  • So what are the benefits of specifying a thread pool? Intuitively speaking, it has the following two advantages:

    • We can better plan our thread count with pool management based on our server performance.
    • We can customize the names of the threads we use, which the Alibaba Java development specification also calls for (a minimal hand-rolled example follows this list).
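
On the naming point, here is a minimal hand-rolled ThreadFactory sketch (the class name and prefix are just for illustration; a ready-made factory such as the one used later in this article works too):

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// A minimal named thread factory: every thread it creates gets the given prefix plus a counter
public class PrefixThreadFactory implements ThreadFactory {

    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    public PrefixThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + counter.getAndIncrement());
    }
}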

1.1 Java 8 asynchronous calls with the default thread pool

There is nothing inherently wrong with using the default; let's walk through the source code to see which thread pool the default actually uses.

   public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }

What is asyncPool?

ForkJoinPool.commonPool() is used if useCommonPool is true, as shown below, otherwise a new ThreadPerTaskExecutor() is created:

    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

What is useCommonPool?

    private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);
    /** The target parallelism level for the common pool */
    public static int getCommonPoolParallelism() {
        return commonParallelism;
    }

The commonParallelism field it returns is not given a default value where it is declared:

static final int commonParallelism;

ForkJoinPool has a static initializer block that sets commonParallelism when the class is loaded. We only need to focus on its last few lines:

    // Unsafe mechanics
    private static final sun.misc.Unsafe U;
    private static final int  ABASE;
    private static final int  ASHIFT;
    private static final long CTL;
    private static final long RUNSTATE;
    private static final long STEALCOUNTER;
    private static final long PARKBLOCKER;
    private static final long QTOP;
    private static final long QLOCK;
    private static final long QSCANSTATE;
    private static final long QPARKER;
    private static final long QCURRENTSTEAL;
    private static final long QCURRENTJOIN;

    static {
        // initialize field offsets for CAS etc
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ForkJoinPool.class;
            CTL = U.objectFieldOffset
                (k.getDeclaredField("ctl"));
            RUNSTATE = U.objectFieldOffset
                (k.getDeclaredField("runState"));
            STEALCOUNTER = U.objectFieldOffset
                (k.getDeclaredField("stealCounter")); Class<? > tk = Thread.class; PARKBLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); Class<? > wk = WorkQueue.class; QTOP = U.objectFieldOffset (wk.getDeclaredField("top"));
            QLOCK = U.objectFieldOffset
                (wk.getDeclaredField("qlock"));
            QSCANSTATE = U.objectFieldOffset
                (wk.getDeclaredField("scanState"));
            QPARKER = U.objectFieldOffset
                (wk.getDeclaredField("parker"));
            QCURRENTSTEAL = U.objectFieldOffset
                (wk.getDeclaredField("currentSteal"));
            QCURRENTJOIN = U.objectFieldOffset
                (wk.getDeclaredField("currentJoin")); Class<? > ak = ForkJoinTask[].class; ABASE = U.arrayBaseOffset(ak);int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }

        commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
        // Even if threads are disabled, the reported parallelism is at least 1
        int par = common.config & SMASK;
        commonParallelism = par > 0 ? par : 1;
    }

On my machine the resulting default is 7.
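
A minimal way to check the value on your own machine (the output depends on your CPU; the default can also be overridden with the system property java.util.concurrent.ForkJoinPool.common.parallelism):

import java.util.concurrent.ForkJoinPool;

public class CommonPoolParallelismCheck {
    public static void main(String[] args) {
        // On an 8-core machine this prints 8 and 7 respectively
        System.out.println("availableProcessors = " + Runtime.getRuntime().availableProcessors());
        System.out.println("commonPoolParallelism = " + ForkJoinPool.getCommonPoolParallelism());
    }
}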

So look at the following code:

    private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);

With a parallelism of 7, this evaluates to true, so the common pool is the one that gets used:

    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

So a default thread pool with a parallelism of seven is returned.

In fact, the default is derived from the number of CPU cores. My computer has eight cores, and the code below subtracts one from that count, which is why seven worker threads show up.

        if (parallelism < 0 && // default: one less than the number of cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;

Let's test this with a main method: 10 tasks that each block for 10 seconds.

    public static void main(String[] args) {
        // Create 10 tasks, block for 10 seconds each
        for (int i = 0; i < 10; i++) {
            CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(10000);
                    System.out.println(new Date() + ":" + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

The first seven tasks complete first; the remaining three have to wait roughly another 10 seconds before they finish:

Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-5
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-4
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-2
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-7
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-3
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-6
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-1
-----------------------------------------------------------  
Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-2
Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-5
Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-4

Conclusion: when we make asynchronous calls with the default thread pool and the tasks are IO-intensive (in short, long-running), they block other tasks that share the common pool, degrading system performance or even causing failures. For example, if a task calls a remote interface that times out, it occupies a worker thread for the whole timeout period. For CPU-intensive work, on the other hand, the common pool can actually improve performance.

II. Use custom thread pools

As shown above, it is better to use a custom thread pool for asynchronous calls in IO-intensive scenarios.

  • In addition to the two obvious benefits mentioned at the beginning, there is a third:

    • We can better plan our thread count with pool management based on our server performance.
    • We can customize the name of the thread we use, which is also mentioned in the Ali Java development specification.
    • Other tasks that share the common thread pool will no longer block, or even fail, because our long-running tasks have occupied it.

We customize the following thread pool:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;

/**
 * @description: global common thread pool
 * @author: weirx
 * @date: 2021/9/9 18:09
 * @version: 3.0
 */
@Slf4j
public class GlobalThreadPool {

    /** Number of core threads */
    public final static int CORE_POOL_SIZE = 10;

    /** Maximum number of threads */
    public final static int MAX_NUM_POOL_SIZE = 20;

    /** Task queue size */
    public final static int BLOCKING_QUEUE_SIZE = 30;

    /** Thread pool instance */
    private final static ThreadPoolExecutor instance = getInstance();

    /**
     * description: Initializes the thread pool
     *
     * @return: java.util.concurrent.ThreadPoolExecutor
     * @author: weirx
     * @time: 2021/9/10
     */
    private synchronized static ThreadPoolExecutor getInstance() {
        // Build the thread pool
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_NUM_POOL_SIZE,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(BLOCKING_QUEUE_SIZE),
                // NamedThreadFactory is assumed to be a factory such as
                // Hutool's cn.hutool.core.thread.NamedThreadFactory(prefix, isDaemon)
                new NamedThreadFactory("Thread-wjbgn-", false));
        return executor;
    }

    private GlobalThreadPool() {
    }

    public static ThreadPoolExecutor getExecutor() {
        return instance;
    }
}

Call:

    public static void main(String[] args) {
        // Create 10 tasks, block for 10 seconds each
        for (int i = 0; i < 10; i++) {
            CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(10000);
                    System.out.println(new Date() + ":" + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },GlobalThreadPool.getExecutor());
        }

        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

The output prints the custom thread names we specified:

Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-1
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-10
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-2
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-9
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-5
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-6
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-3
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-7
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-8
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-4

III. Digression: dynamic thread pools

3.1 What is a dynamic thread pool?

When working with thread pools, do you sometimes struggle with what values to give the pool parameters? What if the threads run out? Do you have to change the code and redeploy?

Actually, you don't. I remember reading an article by Meituan a while ago that was a real eye-opener: the dynamic thread pool.

The ThreadPoolExecutor class allows you to dynamically modify the following attributes of a thread pool:

From top to bottom, the corresponding setters cover (a short sketch using them follows this list):

  • Thread factory (used to set thread names)
  • Core thread count
  • Maximum thread count
  • Keep-alive time
  • Rejection strategy
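
A minimal sketch of adjusting a live pool through these setters (it reuses the GlobalThreadPool defined earlier in this article; the concrete numbers are arbitrary):

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolTuningSketch {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = GlobalThreadPool.getExecutor();

        // All of these take effect on a running pool, no restart needed
        pool.setMaximumPoolSize(30);                          // maximum thread count
        pool.setCorePoolSize(15);                             // core thread count
        pool.setKeepAliveTime(120, TimeUnit.SECONDS);         // keep-alive time for idle threads
        pool.setRejectedExecutionHandler(
                new ThreadPoolExecutor.CallerRunsPolicy());   // rejection strategy
        pool.setThreadFactory(
                r -> new Thread(r, "Thread-tuned"));          // thread factory (fixed name, just to illustrate)
    }
}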

In the Meituan article, they monitor thread pool usage, raise an alarm when thresholds are reached, and then change these values dynamically through a configuration center.

We can also do this with @RefreshScope plus Nacos.
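
A rough sketch of that idea, under some assumptions: Spring Cloud Alibaba's Nacos config support is on the classpath, and the property names thread.pool.max-size / thread.pool.core-size are made up for illustration. Instead of putting the pool itself in @RefreshScope, this variant listens for Spring Cloud's EnvironmentChangeEvent, which is published when refreshed properties arrive:

import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

// Hypothetical wiring: applies refreshed Nacos properties to the running GlobalThreadPool
@Component
public class ThreadPoolRefresher {

    private final Environment environment;

    public ThreadPoolRefresher(Environment environment) {
        this.environment = environment;
    }

    // Fired by spring-cloud-context after a configuration refresh (e.g. a Nacos push)
    @EventListener(EnvironmentChangeEvent.class)
    public void onChange() {
        ThreadPoolExecutor pool = GlobalThreadPool.getExecutor();
        // Set the maximum first so the new core size never exceeds the current maximum
        pool.setMaximumPoolSize(environment.getProperty("thread.pool.max-size", Integer.class, 20));
        pool.setCorePoolSize(environment.getProperty("thread.pool.core-size", Integer.class, 10));
    }
}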

3.2 Practice

I wrote a scheduled task that monitors the thread usage of the current service, expands the pool when threads run short, and restores the initial values once usage has dropped for a while.

The pool it monitors is the GlobalThreadPool defined above.

import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * @description: global thread pool daemon
 * @author: weirx
 * @date: 2021/9/10
 * @version: 3.0
 */
@Slf4j
@Component
public class DaemonThreadTask {

    /** The maximum number of threads the server supports */
    public final static int SERVER_MAX_SIZE = 50;

    /** Maximum threshold, percentage */
    private final static int MAXIMUM_THRESHOLD = 8;

    /** Increment for the maximum thread count */
    private final static int INCREMENTAL_MAX_NUM = 10;

    /** Increment for the core thread count */
    private final static int INCREMENTAL_CORE_NUM = 5;

    /** Current maximum thread count */
    private static int currentSize = GlobalThreadPool.MAX_NUM_POOL_SIZE;

    /** Current core thread count */
    private static int currentCoreSize = GlobalThreadPool.CORE_POOL_SIZE;

    @Scheduled(cron = "0 */5 * * * ?")
    public static void execute() {
        threadMonitor();
    }

    /**
     * description: dynamically monitors and adjusts the thread pool parameters
     *
     * @return: void
     * @author: weirx
     * @time: 2021/9/10
     */
    private static void threadMonitor() {
        ThreadPoolExecutor instance = GlobalThreadPool.getExecutor();
        int activeCount = instance.getActiveCount();
        int size = instance.getQueue().size();
        log.info("GlobalThreadPool: the active thread count is {}", activeCount);
        // Not enough threads: the pool is busy and the queue is full
        if (activeCount > GlobalThreadPool.MAX_NUM_POOL_SIZE % MAXIMUM_THRESHOLD
                && size >= GlobalThreadPool.BLOCKING_QUEUE_SIZE) {
            currentSize = currentSize + INCREMENTAL_MAX_NUM;
            currentCoreSize = currentCoreSize + INCREMENTAL_CORE_NUM;
            // Threads can be added only if the new maximum is still within what the server supports
            if (currentSize <= SERVER_MAX_SIZE) {
                instance.setMaximumPoolSize(currentSize);
                instance.setCorePoolSize(currentCoreSize);
                log.info("this max thread size is {}", currentSize);
            } else {
                log.info("current size is more than server max size, can not add");
            }
        }
        // Usage has dropped below the default maximum and the queue is empty: restore the initial values
        if (activeCount < GlobalThreadPool.MAX_NUM_POOL_SIZE
                && size == 0
                && currentSize > GlobalThreadPool.MAX_NUM_POOL_SIZE) {
            currentSize = GlobalThreadPool.MAX_NUM_POOL_SIZE;
            currentCoreSize = GlobalThreadPool.CORE_POOL_SIZE;
            instance.setMaximumPoolSize(currentSize);
            instance.setCorePoolSize(currentCoreSize);
        }
    }
}

3.3 What is the significance of dynamic thread pools?

Some friends have actually asked me: why not just make the thread pool bigger in the first place? What is the point of a dynamic thread pool?

That's a fair question. With traditional software deployed standalone on dedicated hardware, the number of threads we can use really does just depend on the server's CPU cores, and few other services compete for them.

But this is the age of containers, the age of cloud natives.

When multiple containers are deployed on one host, a single container can consume a large share of CPU at peak times. If every container grabs most of the resources it can, some containers will inevitably be starved or even crash.

Once the peak has passed, shrinking the pool frees those resources for other containers that need them.

It is the same idea as elastic scaling of cloud server nodes: capacity is expanded and shrunk dynamically, so we stay highly available while saving as much cost as possible.