Recently, I encountered an OOM problem, indicating that I could not create more threads. I located the problem and found that there was a problem similar to the code below. I used JConsole to monitor and found that the number of threads suddenly soared in a certain period of time, which triggered the following thinking

Scene reappearance

public class DemoController {
    private ExecutorService executorService = Executors.newWorkStealingPool(20);

    @RequestMapping("/test")
    public String test() {
        ExecutorService forkJoinPool = Executors.newWorkStealingPool(10);
        CompletableFuture[] completableFutures = new CompletableFuture[600];
        for (int i = 0; i < 600; i++) {
            int j = i;
            completableFutures[i] = CompletableFuture.runAsync(() -> {
                getAssociatedInfo(forkJoinPool);
            }, forkJoinPool);
        }
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(completableFutures);
        voidCompletableFuture.join();
        return "OK";
    }

    public String getAssociatedInfo(ExecutorService service) {
        CompletableFuture<String> trialAssociatedInfoCompletableFuture
                = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("You're supposed to be running, aren't you?");
                TimeUnit.SECONDS.sleep(100);
                System.out.println("You've already done it.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "a";
        }, executorService);
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(trialAssociatedInfoCompletableFuture);
        voidCompletableFuture.join();
        return "ok"; }}Copy the code

Code to explore

    completableFutures[i] = CompletableFuture.runAsync(() -> {
                    getAssociatedInfo(forkJoinPool);
                }, forkJoinPool);
Copy the code

The forkJoinPool starts an asynchronous task that is managed by a forkJoinPool thread pool. When the number of thread pools is less than 10, a thread is started and executed immediately. When the number of thread pools exceeds 10, a thread is added to the queue.

    CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(completableFutures);
Copy the code

AllOf is used to recursively construct the completion tree, summarize and return a total task, as shown below:

/ / from the perspective of multithreading, if the task has been finished, clog voidCompletableFuture. The join ();return "OK";
CompletableFuture->join():
    return reportJoin((r = result) == null ? waitingGet(false) : r);
CompletableFuture->waitingGet():
    Signaller q = null;
    boolean queued = false; int spins = -1; Object r; // The loop ends when the return task is not emptywhile ((r = result) == null) {
        if(spins < 0) spins = (Runtime.getRuntime().availableProcessors() > 1) ? 1 << 8:0; // Use brief spin-wait on multiprocessorselse if (spins > 0) {
            if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                --spins;
        }
        else if(q == null) // Instantiate a semaphore --1 q = new Signaller(interruptible, 0L, 0L);else if(! queued) queued = tryPushStack(q);else if (interruptible && q.interruptControl < 0) {
            q.thread = null;
            cleanStack();
            return null;
        }
        else if(q.thread ! = null && result = = null) {try {/ / if the delay did not return a result, will eventually go to this method, the following is ForkJoinPool. The management of the semaphore ForkJoinPool managedBlock (q); } catch (InterruptedException ie) { q.interruptControl = -1; } } } ForkJoinPool->managedBlock(): Thread t = Thread.currentThread();if((t instanceof ForkJoinWorkerThread) && (p = (wt = (ForkJoinWorkerThread)t).pool) ! = null) { WorkQueue w = wt.workQueue;while(! blocker.isReleasable()) { //if (p.tryCompensate(w)) {   // --2
                try {
                    do {} while(! blocker.isReleasable() && ! blocker.block()); } finally { U.getAndAddLong(p, CTL, AC_UNIT); }break; }}}else {
        do {} while(! blocker.isReleasable() && ! blocker.block()); } ForkJoinPool->tryCompensate(): // --2 canBlock = add && createWorker(); // throws on exceptionCopy the code
  • If the current thread is a forkJoin thread, use the else method until the result is returned.
  • A forkJoin thread performs a series of complex decisions when executing code at two points, and creates a new thread to help perform tasks in the thread pool if the result is still not returned. Those extra hundreds of threads did come from here;
CompletableFuture->Signaller->Signaller(): // --1 Signaller(boolean interruptible, long nanos, This.thread = thread.currentThread (); this.interruptControl = interruptible ? 1 : 0; this.nanos = nanos; this.deadline = deadline; }Copy the code

reflection

To the first voidCompletableFuture. The join (), the HTTP threads, thread is managed by forkJoinPool thread pool, up to 10 threads in parallel, then waitingGet (), because its not forkJoin thread, so are the else method

To the second voidCompletableFuture. The join (), the thread is forkJoinPool tasks, every task performs a getAssociatedInfo method, the executorService thread pool management, up to 20 threads in parallel, Then to waitingGet(), which is a forkJoin thread, a new thread is created to help perform tasks in the forkJoinPool thread pool. However, due to the number of executorService thread pools, the number of threads cannot be accelerated. As more and more getAssociatedInfo methods Join, the number of threads increases and can’t be released immediately, which ultimately leads to OOM

The solution

Hypothesis: Put HTTP thread tasks in the same thread pool as forkJoinPool tasks, so that every time a new forkJoinPool thread is created, it can steal the task and execute it. As the number of threads increases, more and more tasks are executed, reducing the number of threads created. And so it turned out