Contents

  • Scenario
  • Single-threaded implementation
  • Multi-threaded implementation - ExecutorService
  • Multi-threaded implementation - ForkJoinPool
  • Test
  • Conclusion
  • References

In day-to-day work, when the data volume is large and the program runs slowly, improving performance usually involves multithreading. For readers who are not yet familiar with multithreading, this article focuses on how to use ThreadPoolExecutor and ForkJoinPool.

Scenario

Let's start with a scenario: we have an interface that computes the sum of an array. The interface is defined as follows:

package mutilthread;

/**
 * The summation interface
 * @Author: Rebecca
 * @Description:
 * @Date: Created in 2019/6/18 15:28
 * @Modified By:
 */
public interface Calculator {
    long sumUp(int[] numbers) throws Exception;
}


Single-threaded implementation

Initially, the code would naturally use a plain single-threaded implementation. Its advantage is that the code is simple; its disadvantage is that with large data the program runs slowly and cannot take advantage of a multi-core CPU.

package mutilthread;

import java.util.ArrayList;
import java.util.List;

/**
 * Single-threaded class
 * @Author: Rebecca
 * @Description:
 * @Date: Created in 2019/6/18 10:24
 * @Modified By:
 */
public class SingleThread implements Calculator {
    /**
     * Computes the sum of an array with a single thread
     * @param calcData the array to sum
     * @return the sum of all elements
     * @author Rebecca 10:51 2019/6/18
     * @version 1.0
     */
    @Override
    public long sumUp(int[] calcData) {
        // This code is intended to extend the runtime of the program
        List<SingleThread> tasks = new ArrayList<SingleThread>();

        int calcDataLength = calcData.length;
        long sum = 0L;
        for (int i = 0; i < calcDataLength; i++) {
            sum += calcData[i];

            // This code is intended to extend the runtime of the program
            tasks.add(new SingleThread());
        }
        return sum;
    }
}

Multi-threaded implementation - ExecutorService

Because the single-threaded approach severely limits processing speed, we optimize the code to use multiple threads through an ExecutorService.

package mutilthread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;

/**
 * Computes the sum of an array with a ThreadPoolExecutor thread pool
 * @Author: Rebecca
 * @Description:
 * @Date: Created in 2019/6/18 10:50
 * @Modified By:
 */
public class MutilThreadOfThreadPoolExecutor implements Calculator {

    /**
     * Computes the sum of an array with a ThreadPoolExecutor thread pool
     * @param calcData the array to sum
     * @return the sum of all elements
     * @author Rebecca 10:51 2019/6/18
     * @version 1.0
     */
    @Override
    public long sumUp(int[] calcData) throws Exception {
        // Create a thread pool
        ExecutorService executorService = new ThreadPoolExecutor(5, 10,  // core and maximum number of threads
                60L, TimeUnit.SECONDS,  // keep-alive time for idle non-core threads
                new ArrayBlockingQueue<Runnable>(100, true),  // bounded, fair queue of waiting tasks
                Executors.defaultThreadFactory(),  // factory that creates the threads
                new CallerRunsPolicy());  // tasks beyond capacity run on the submitting thread

        int calcDataLength = calcData.length;
        long sum = 0L;
        int threadSize = 5;
        List<Future<Long>> futures = new ArrayList<Future<Long>>();

        try {
            for (int i = 0; i < threadSize; i++) {
                int arrStart = calcDataLength / threadSize * i;
                // The last chunk runs to the end of the array so that no remainder is dropped
                int arrEnd = (i == threadSize - 1) ? calcDataLength : calcDataLength / threadSize * (i + 1);

                // Submit every chunk before waiting, so that the chunks run concurrently
                futures.add(executorService.submit(new SumTask(calcData, arrStart, arrEnd)));
            }
            // Collect the partial sums only after all tasks have been submitted
            for (Future<Long> future : futures) {
                sum += future.get();
            }
        } finally {
            // Close the thread pool
            executorService.shutdown();
        }

        return sum;
    }


    public static class SumTask implements Callable<Long> {
        private int[] arr;
        private int start, end;

        public SumTask() {}

        public SumTask(int[] arr, int start, int end)
        {
            this.arr = arr;
            this.start = start;
            this.end = end;
        }

        @Override
        public Long call()
        {
            // This code is intended to extend the runtime of the program
            List<SumTask> tasks = new ArrayList<SumTask>();

            long sum = 0L;
            for (int i = start; i < end; i++)
            {
                sum += arr[i];
                // This code is intended to extend the runtime of the program
                tasks.add(new SumTask());
            }

            return sum;
        }
    }
}

The Executors class also provides factory methods that create an ExecutorService thread pool directly, such as newSingleThreadExecutor(), newCachedThreadPool(), newFixedThreadPool(), and newScheduledThreadPool(). These factory methods take two parameters or fewer, whereas the ThreadPoolExecutor constructor takes a whole list of them. So why do we use new ThreadPoolExecutor()?

The answer is simple: to keep the program from running out of memory (OOM). If you have read the source of Executors, you know that its thread pools are also created with new ThreadPoolExecutor(), but one of the arguments it passes is Integer.MAX_VALUE. What does this argument mean? It is the maximum number of threads allowed in the thread pool. If the pool really grows toward Integer.MAX_VALUE threads, the application will run out of memory.

// Executors newCachedThreadPool method source code
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
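One way to see these unbounded defaults for yourself is to inspect the pools that Executors returns. The sketch below is only an illustration: the class name ExecutorsDefaults is made up, and the cast to ThreadPoolExecutor relies on an implementation detail of OpenJDK (newCachedThreadPool() and newFixedThreadPool() return a plain ThreadPoolExecutor there), not on a documented guarantee.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper class, for illustration only
class ExecutorsDefaults {

    /** Maximum pool size of the pool returned by Executors.newCachedThreadPool(). */
    static int cachedPoolMaxThreads() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // Assumption: the returned object is a ThreadPoolExecutor (true for OpenJDK)
            return ((ThreadPoolExecutor) pool).getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    /** Free capacity of the work queue behind Executors.newFixedThreadPool(threads). */
    static int fixedPoolQueueCapacity(int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Same assumption as above about the concrete return type
            return ((ThreadPoolExecutor) pool).getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("cached pool max threads: " + cachedPoolMaxThreads());
        System.out.println("fixed pool queue capacity: " + fixedPoolQueueCapacity(5));
    }
}
```

On OpenJDK both values come out as Integer.MAX_VALUE: the cached pool does not limit its thread count, and the fixed pool does not limit how many tasks can pile up in its queue. Either one can exhaust memory under sustained load.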

To avoid this, we typically create thread pools with new ThreadPoolExecutor(). So what do all these parameters mean?

Don't worry, the parameters are easier to remember in groups:

Group 1 (thread counts):

  1. corePoolSize: the number of core threads. These threads are not destroyed even when the pool has no tasks, because creating and destroying threads consumes CPU resources
  2. maximumPoolSize: the maximum number of threads the pool is allowed to create

Group 2 (when non-core threads are destroyed):

  1. keepAliveTime: how long an idle non-core thread is kept before it is destroyed. Non-core threads should not hold resources in the pool forever, so they are destroyed once they have been idle this long
  2. unit: the time unit of keepAliveTime, a value of the TimeUnit enumeration

Group 3 (how the pool handles tasks):

  1. workQueue: the queue that holds tasks waiting to be executed. The JDK's ArrayBlockingQueue and LinkedBlockingDeque (a linked list) are the usual choices
  2. handler: what to do with a task that exceeds the pool's capacity, that is, when the queue is full and maximumPoolSize threads are already busy.

    AbortPolicy: the task is rejected and a RejectedExecutionException is thrown;

    CallerRunsPolicy: the task is run by the thread that submitted it to the pool;

    DiscardOldestPolicy: the oldest task in the queue is discarded and the new task is kept;

    DiscardPolicy: the new task is silently discarded

Group 4 (thread creation):

  1. threadFactory: the factory used to create threads; Executors.defaultThreadFactory() is sufficient

// ThreadPoolExecutor constructor source
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                          long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
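To make the handler parameter more concrete, here is a minimal sketch of when a rejection policy actually fires. It assumes a deliberately tiny pool (one thread, a queue of capacity one) so that a third task has nowhere to go; the class and method names (RejectionPolicyDemo, submitThird) are invented for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only
class RejectionPolicyDemo {

    /**
     * Fills a pool that can hold two tasks (one running, one queued) and then
     * submits a third. Returns the name of the thread that ran the third task,
     * "rejected" if the pool threw, or null if the task had not run by then.
     */
    static String submitThird(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                Executors.defaultThreadFactory(),
                handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();  // keep the task busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker);  // occupies the single worker thread
            pool.execute(blocker);  // waits in the queue, which is now full
            final String[] ranOn = new String[1];
            try {
                pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
                return ranOn[0];
            } catch (RejectedExecutionException e) {
                return "rejected";
            }
        } finally {
            release.countDown();  // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("AbortPolicy:      " + submitThird(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("CallerRunsPolicy: " + submitThird(new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

With AbortPolicy the third submission throws, while with CallerRunsPolicy it runs on the submitting thread (main, when started from main()). That is also why CallerRunsPolicy slows the submitter down naturally when the pool is saturated.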

Suppose we have a batch of tasks divided into 3 groups of 3 tasks each, and only 3 threads in the thread pool to process them. The processing then proceeds as follows:

Step 1:

Task group 1 is handled by thread 1, which starts on the first task in group 1; task group 2 is handled by thread 2, which starts on the first task in group 2; task group 3 is handled by thread 3, which starts on the first task in group 3.

Step 2:

Thread 2 is faster and has finished every task in group 2. Because no other task group is waiting, thread 2 is now idle, while thread 1 has only finished the first task of group 1. Is there a way for thread 2 to steal the second task from group 1 and so reduce the overall waiting time?

Since JDK 7 there is: the ForkJoinPool thread pool. Read on.

Multi-threaded implementation - ForkJoinPool

Let’s use the summation example to simulate stealing tasks.

package mutilthread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * Calculates the sum of an array with a ForkJoinPool thread pool
 * @Author: Rebecca
 * @Description:
 * @Date: Created in 2019/6/18 10:50
 * @Modified By:
 */
public class MutilThreadOfForkJoinPool implements Calculator {

    private ForkJoinPool pool;

    public MutilThreadOfForkJoinPool() {
        // Alternatively, use the common pool (JDK 8+): pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    /**
     * Calculates the sum of an array with a ForkJoinPool thread pool
     * @param calcData the array to sum
     * @return the sum of all elements
     * @author Rebecca 10:51 2019/6/18
     * @version 1.0
     */
    @Override
    public long sumUp(int[] calcData) {
        SumTask task = new SumTask(calcData, 0, calcData.length - 1);
        return pool.invoke(task);
    }


    public static class SumTask extends RecursiveTask<Long> {
        private int[] numbers;
        private int start;
        private int end;

        private SumTask() {}

        public SumTask(int[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            // When the range holds fewer than 1,000,000 elements, sum it directly
            if (end - start < 1000000) {
                long total = 0;

                // This code is intended to extend the runtime of the program
                List<SumTask> tasks = new ArrayList<SumTask>();
                for (int i = start; i <= end; i++) {
                    total += numbers[i];
                    // This code is intended to extend the runtime of the program
                    tasks.add(new SumTask());
                }
                return total;
            } else {  // If not, split the task in two and calculate recursively
                int middle = (start + end) / 2;
                SumTask taskLeft = new SumTask(numbers, start, middle);
                SumTask taskRight = new SumTask(numbers, middle + 1, end);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }
}

RecursiveTask's fork method is similar to Thread's start method: it schedules the task to run asynchronously. The technical name for this kind of task stealing is work-stealing, and since JDK 7 it is available through ForkJoinPool. Before JDK 7, LinkedBlockingDeque could also be used to implement a work-stealing algorithm.
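A commonly used variant of the fork/join pattern forks only one half and computes the other half in the current thread, so the worker that splits a task never sits idle waiting. The sketch below is not the article's implementation: the class name ForkComputeJoinSum, the exclusive end index, and the THRESHOLD value are all assumptions made for illustration, and it uses ForkJoinPool.commonPool(), which requires JDK 8 or later.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical class, for illustration only
class ForkComputeJoinSum extends RecursiveTask<Long> {
    // Assumed cut-off below which a range is summed directly; tune by measurement
    private static final int THRESHOLD = 10_000;

    private final int[] numbers;
    private final int start;  // inclusive
    private final int end;    // exclusive

    ForkComputeJoinSum(int[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long total = 0;
            for (int i = start; i < end; i++) {
                total += numbers[i];
            }
            return total;
        }
        int middle = start + (end - start) / 2;
        ForkComputeJoinSum left = new ForkComputeJoinSum(numbers, start, middle);
        ForkComputeJoinSum right = new ForkComputeJoinSum(numbers, middle, end);
        left.fork();                      // queue the left half; an idle worker may steal it
        long rightSum = right.compute();  // sum the right half in the current thread
        return left.join() + rightSum;    // wait for the left half and combine
    }

    /** Sums the whole array on the common pool. */
    static long sum(int[] numbers) {
        return ForkJoinPool.commonPool().invoke(new ForkComputeJoinSum(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        int[] data = new int[1_000_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i % 10;
        }
        System.out.println("sum = " + sum(data));
    }
}
```

Forking both halves and then joining them, as in the listing above, also works; the difference is that the splitting thread then only waits, while here it does part of the work itself.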

Test

Here is the test class code:

package mutilThread;

import mutilthread.CalcData;
import mutilthread.MutilThreadOfForkJoinPool;
import mutilthread.MutilThreadOfThreadPoolExecutor;
import mutilthread.SingleThread;
import org.junit.Test;

/**
 * Thread test class
 * @Author: Rebecca
 * @Description:
 * @Date: Created in 2019/6/18 10:40
 * @Modified By:
 */
public class ThreadTest {

    @Test
    public void testThread() throws Exception {
        int[] data = CalcData.getCalcData();
        // Single thread test
        SingleThread singleThread = new SingleThread();
        long startTime = System.currentTimeMillis();
        System.out.println("Array sum:" + singleThread.sumUp(data));
        System.out.println("Single thread time:" + (System.currentTimeMillis() - startTime) + " ms");

        // A ThreadPoolExecutor test
        MutilThreadOfThreadPoolExecutor threadPool = new MutilThreadOfThreadPoolExecutor();
        startTime = System.currentTimeMillis();
        System.out.println("Array sum:" + threadPool.sumUp(data));
        System.out.println("ThreadPoolExecutor time:" + (System.currentTimeMillis() - startTime) + " ms");

        // Multi-threaded (ForkJoinPool) tests
        MutilThreadOfForkJoinPool forkJoinPool = new MutilThreadOfForkJoinPool();
        startTime = System.currentTimeMillis();
        System.out.println("Array sum:" + forkJoinPool.sumUp(data));
        System.out.println("ForkJoinPool time:" + (System.currentTimeMillis() - startTime) + " ms");
    }
}

Program output:

Array sum: 499913683383
Single thread time: 3307 ms
ThreadPoolExecutor time: 197 ms
Array sum: 499913683383
ForkJoinPool time: 169 ms

In table form:

Thread type                            Time (ms)
Single thread                          3307
Multithreading (ThreadPoolExecutor)    197
Multithreading (ForkJoinPool)          169

Conclusion

  1. When using multiple threads through ExecutorService, we usually construct the pool with new ThreadPoolExecutor() rather than with the factory methods provided by Executors, to avoid OOM;
  2. Thread pools are easier to manage than thread groups (not covered in this article);
  3. ForkJoinPool, available since JDK 7, ran faster than the ExecutorService implementation in this test;
  4. Communication between threads has a cost. If you look carefully, you'll notice two lines of redundant code in each of the examples above:
// This code is intended to extend the runtime of the program
List<SumTask> tasks = new ArrayList<SumTask>();

// This code is intended to extend the runtime of the program
tasks.add(new SumTask());

If you remove the extra object-creation code and simply add up the array, you'll find that the single-threaded version is much faster. In practice, therefore, choose the approach by comparing the options against the actual business logic. If the logic is simple and the program is already fast, there is no need for multithreading at all. In the ForkJoinPool example, the threshold below which a range is summed directly was chosen to allow a comparison with the ExecutorService approach. If that threshold is set too small, ForkJoinPool may perform worse than the ExecutorService.
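Because the right choice depends on measurement, it helps to time each variant more than once. The following is a rough sketch of such a harness, not a proper benchmark: the names BestOfTimer, bestNanos and plainSum are invented here, and the single warm-up run only reduces, and does not remove, the influence of JIT compilation and garbage collection.

```java
import java.util.Arrays;
import java.util.function.ToLongFunction;

// Hypothetical timing helper, for illustration only
class BestOfTimer {
    // Written on every run so the JIT is less likely to discard the work as dead code
    static volatile long sink;

    /** Runs sumFn once as a warm-up, then `runs` more times; returns the fastest run in nanoseconds. */
    static long bestNanos(ToLongFunction<int[]> sumFn, int[] data, int runs) {
        sink = sumFn.applyAsLong(data);  // warm-up run, not timed
        long best = Long.MAX_VALUE;
        for (int r = 0; r < runs; r++) {
            long t0 = System.nanoTime();
            sink = sumFn.applyAsLong(data);
            best = Math.min(best, System.nanoTime() - t0);
        }
        return best;
    }

    /** Plain single-threaded sum without the extra object creation. */
    static long plainSum(int[] data) {
        long sum = 0L;
        for (int value : data) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        int[] data = new int[10_000_000];
        Arrays.fill(data, 1);
        long nanos = bestNanos(BestOfTimer::plainSum, data, 5);
        System.out.println("plain loop, best of 5: " + nanos / 1_000_000.0 + " ms");
    }
}
```

Any of the Calculator implementations could be passed in the same way, for example as a lambda that wraps sumUp and rethrows its checked exception.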


The class used in the program to generate the test data:

package mutilthread;

import java.util.Random;

/**
 * Class that generates the data to sum
 * @Author: Rebecca
 * @Description:
 * @Date: Created in 2019/6/18 10:25
 * @Modified By:
 */
public class CalcData {
    // The length is 10 million
    private static int calcDataLength = 10000000;

    public static int[] getCalcData() {
        Random random = new Random();
        int[] calcData = new int[calcDataLength];
        for (int i = 0; i < calcDataLength; i++) {
            // A random integer in [m, n] is rand.nextInt(n - m + 1) + m; here m = 0 and n = 100000
            calcData[i] = random.nextInt(100001);
        }
        return calcData;
    }
}

References

Java Concurrency Notes: How to Use ForkJoinPool and how does it work

The Java Concurrent ThreadPool series (2) constructs thread pools using ThreadPoolExecutor