This is the third article in a series on multithreading. If you are interested, the first two are:

Multithreading (I): basic concepts and the use of notify() and wait()

Multithreading (II): the built-in lock synchronized

This Android advanced series is how I organize knowledge points while I study: first to deepen my own understanding, and second to make later reference easier.

If there are mistakes in the article, you are welcome to point them out.

Preface

When you need to run work on a new thread, you should generally use a thread pool instead of creating the thread yourself. So how much do you know about thread pools?

ThreadPoolExecutor

1. What is a thread pool

In Multithreading (I): basic concepts and the use of notify() and wait(), threads were created directly: whenever a task arrived, a thread was created to execute it, and when the task finished, the thread was destroyed. Under concurrency, when a large number of tasks need to be executed, every task repeats this cycle of thread creation, task execution, and thread destruction, which wastes CPU resources and slows down response time.

    new Thread(new Runnable() {
        @Override
        public void run() {
            // Task execution
        }
    }).start();

**Thread pool**: literally, threads managed through a pool. When a task arrives, an already-created thread is taken from the pool to execute it. When the task finishes, the thread is returned to the pool, and the threads are destroyed when the thread pool itself is destroyed.
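
To make the idea of reuse concrete, here is a minimal sketch (the class and method names are made up for illustration) that runs ten short tasks on a pool of two threads and records which threads actually executed them. It assumes Java 8 or later for the lambda syntax.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadReuseDemo {

    // Runs taskCount short tasks on a pool of poolSize threads and returns
    // the names of the threads that actually executed them.
    static Set<String> runTasks(int poolSize, int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                                  // accept no new tasks
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);   // wait for queued tasks
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames;
    }

    public static void main(String[] args) {
        Set<String> names = runTasks(2, 10);
        // Ten tasks, but never more than two distinct thread names.
        System.out.println("10 tasks ran on " + names.size() + " thread(s): " + names);
    }
}
```

The number of distinct thread names never exceeds the pool size, which is exactly the saving compared with calling new Thread(...) ten times.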

2. Benefits of using thread pools

From the above comparison, using a thread pool has the following benefits:

1. Reduce resource consumption. By reusing threads in the thread pool, the resource consumption associated with thread creation and destruction is reduced.

2. Improve response speed. Reusing threads in the pool removes the time overhead of repeatedly creating and destroying threads.

3. Improve thread manageability. Threads are a scarce resource; creating them without limit consumes a lot of system resources. With a thread pool, threads can be allocated, managed, and monitored uniformly.

3. Thread pool parameters

To use a thread pool, you work with the ThreadPoolExecutor class:

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

The most complex constructor of ThreadPoolExecutor is shown here. Let's go through its parameters one by one.

1. int corePoolSize

Core thread count: each time a task is submitted, the thread pool creates a new thread to execute it, until the number of threads in the pool equals corePoolSize. Once the number of threads has reached corePoolSize, newly submitted tasks are placed in the blocking queue.

2. int maximumPoolSize

Maximum thread count: the maximum number of threads allowed in the pool, core and non-core combined. If the blocking queue is full when a new task is submitted, new (non-core) threads are created until the number of threads in the pool equals maximumPoolSize.

3. long keepAliveTime

Thread idle time: how long an idle thread stays alive when it has no task to execute. By default, this parameter only takes effect when the number of threads is greater than corePoolSize.

4. TimeUnit unit

The time unit of keepAliveTime.

5. BlockingQueue workQueue

Blocking queue: when the number of threads in the pool has reached corePoolSize, newly submitted tasks enter the blocking queue and wait there. When the blocking queue is full, new threads are started to execute tasks, up to maximumPoolSize.

Queue:

A queue is a special kind of linear list: it only allows deletion at the front of the list and insertion at the back. Like a stack, a queue is a linear list with restricted operations.

The end where elements are inserted is called the tail of the queue, and the end where elements are deleted is called the head.

A queue with no elements is called an empty queue. The data elements of a queue are also called queue elements.

Inserting an element into a queue is called enqueueing, and removing an element from a queue is called dequeueing.

Because a queue only allows insertion at one end and deletion at the other, the element that entered the queue earliest is always the first to be removed, so a queue is also called a first-in, first-out (FIFO) linear list.

Blocking queues are often used in producer-consumer scenarios, where the producer is the thread that adds elements to the queue and the consumer is the thread that takes elements from it. The blocking queue is the container in which producers store elements and from which consumers take them.

Let's start with BlockingQueue, an interface that extends Queue:

    public interface BlockingQueue<E> extends Queue<E>

Its methods can be grouped as follows:

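| Operation | Throws exception | Returns special value | Blocks | Times out |
| --- | --- | --- | --- | --- |
| Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| Remove | remove() | poll() | take() | poll(time, unit) |
| Examine | element() | peek() | not available | not available |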

• Throws exception: when the blocking queue is full, inserting an element throws IllegalStateException("Queue full"). When the queue is empty, fetching an element throws NoSuchElementException.

• Returns special value: the insert method returns true on success and false otherwise. The remove method takes an element from the queue, or returns null if there is none.

• Blocks: when the blocking queue is full, a producer thread that puts an element is blocked until space becomes available or the thread is interrupted. When the queue is empty, a consumer thread that takes an element is blocked until an element becomes available.

• Times out: when the blocking queue is full, the producer thread is blocked for up to the given time; if the timeout elapses, it gives up and returns. The timed poll behaves the same way on an empty queue.
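
The four behaviors above can be observed on a tiny queue. The following is a sketch only: the class name, the capacity of 1, and the 100 ms timeout are choices made for this illustration. The blocking put and take are left out because they would simply hang here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueMethodsDemo {

    // Fills a queue of capacity 1, then tries each non-blocking way of
    // inserting one more element and describes what happened.
    static String insertIntoFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");                       // succeeds, queue is now full

        StringBuilder report = new StringBuilder();
        try {
            queue.add("second");                  // "throws exception" column
        } catch (IllegalStateException e) {
            report.append("add threw; ");
        }
        // "returns special value" column: false instead of an exception
        report.append("offer=").append(queue.offer("second")).append("; ");
        try {
            // "times out" column: waits up to 100 ms for space, then gives up
            report.append("timed offer=")
                  .append(queue.offer("second", 100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return report.toString();
    }

    // Removes from an empty queue using the "returns special value" methods.
    static String removeFromEmptyQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        return "poll=" + queue.poll() + "; peek=" + queue.peek();
    }

    public static void main(String[] args) {
        System.out.println(insertIntoFullQueue());   // add threw; offer=false; timed offer=false
        System.out.println(removeFromEmptyQueue());  // poll=null; peek=null
    }
}
```

ThreadPoolExecutor itself uses offer to enqueue tasks, which is why a full queue leads to a new thread or a rejection rather than a blocked caller.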

Let's take a look at some of the blocking queues provided by the JDK:

| Blocking queue | Description |
| --- | --- |
| ArrayBlockingQueue | A bounded blocking queue backed by an array. |
| LinkedBlockingQueue | An optionally bounded blocking queue backed by a linked list (the capacity defaults to Integer.MAX_VALUE). |
| PriorityBlockingQueue | An unbounded blocking queue that supports priority ordering. |
| DelayQueue | An unbounded blocking queue of delayed elements, implemented with a priority queue. |
| SynchronousQueue | A blocking queue that does not store elements. |
| LinkedTransferQueue | An unbounded blocking queue backed by a linked list. |
| LinkedBlockingDeque | A double-ended blocking queue backed by a linked list. |

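6. ThreadFactory threadFactory

The factory used to create new threads; it can, for example, give each new thread a name. The default is Executors.defaultThreadFactory(), which names threads pool-N-thread-M, where N is the pool number and M is the thread number.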
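
As an illustration, a custom factory might look like the sketch below. NamedThreadFactory and the "image-loader" prefix are hypothetical names invented for this example; only the ThreadFactory interface and its newThread method come from the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactoryDemo {

    // Hypothetical factory: names threads "<prefix>-1", "<prefix>-2", ...
    static class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger counter = new AtomicInteger(1);

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, prefix + "-" + counter.getAndIncrement());
        }
    }

    // Runs one task on a pool built with the factory and returns the
    // name of the thread that executed it.
    static String nameOfWorkerThread(String prefix) {
        ExecutorService pool = Executors.newFixedThreadPool(1, new NamedThreadFactory(prefix));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(nameOfWorkerThread("image-loader")); // prints "image-loader-1"
    }
}
```

Meaningful thread names make log output and thread dumps much easier to read than the default pool-N-thread-M.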

7. RejectedExecutionHandler handler (saturation policy)

The saturation policy of the thread pool: if too many tasks arrive, the queue is full, and there are no idle threads to handle them, the thread pool must apply a saturation policy to each new task, that is, some way of dealing with the tasks it cannot accept.

The JDK provides four policies:

| Policy | Behavior |
| --- | --- |
| AbortPolicy | Throws a RejectedExecutionException directly. This is the default policy. |
| CallerRunsPolicy | Runs the task in the thread that called execute. |
| DiscardOldestPolicy | Discards the task at the head of the blocking queue (the oldest one) and then retries the current task. |
| DiscardPolicy | Silently drops the task. |

RejectedExecutionHandler is actually an interface with a single method, rejectedExecution, so we can define our own saturation policy according to our needs.

    /**
     * A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.
     *
     * @since 1.5
     * @author Doug Lea
     */
    public interface RejectedExecutionHandler {

        /**
         * Method that may be invoked by a {@link ThreadPoolExecutor} when
         * {@link ThreadPoolExecutor#execute execute} cannot accept a
         * task.  This may occur when no more threads or queue slots are
         * available because their bounds would be exceeded, or upon
         * shutdown of the Executor.
         *
         * <p>In the absence of other alternatives, the method may throw
         * an unchecked {@link RejectedExecutionException}, which will be
         * propagated to the caller of {@code execute}.
         *
         * @param r the runnable task requested to be executed
         * @param executor the executor attempting to execute this task
         * @throws RejectedExecutionException if there is no remedy
         */
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
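
A custom saturation policy only needs to implement that one method. The sketch below uses a hypothetical CountingPolicy that counts rejected tasks instead of throwing; the pool sizes are picked only to force rejections quickly.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingRejectionDemo {

    // Hypothetical policy: count rejected tasks instead of throwing.
    static class CountingPolicy implements RejectedExecutionHandler {
        final AtomicInteger rejected = new AtomicInteger();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            rejected.incrementAndGet();
        }
    }

    // Submits taskCount blocking tasks to a pool with 1 thread and a queue
    // of capacity 1, and returns how many tasks were rejected.
    static int countRejected(int taskCount) {
        CountingPolicy policy = new CountingPolicy();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), policy);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    release.await();              // keep the worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();                      // let accepted tasks finish
        pool.shutdown();
        return policy.rejected.get();
    }

    public static void main(String[] args) {
        // One task runs, one waits in the queue, the other three are rejected.
        System.out.println("rejected: " + countRejected(5)); // prints "rejected: 3"
    }
}
```

In a real application the handler would more likely log the task or persist it for a retry; counting is just the simplest observable behavior.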

4. How thread pools work

Now that we are familiar with the thread pool's parameters, we can summarize how it works:

1. When a thread pool is first created, no threads are running in it. Threads are only created once tasks arrive, and each task is handled according to the current state of the pool.

2. When the number of threads in the pool is less than corePoolSize, a new thread is created to execute each new task as it arrives.

3. When the number of threads in the pool is greater than or equal to corePoolSize, each new task is added to the blocking queue.

4. When the blocking queue is full and no more tasks can be added, a new non-core thread is created to execute the task, as long as the number of threads stays within maximumPoolSize.

5. When the blocking queue is full and the number of threads has reached maximumPoolSize, a new task is rejected and handled by the saturation policy.

6. When a thread has been idle for longer than keepAliveTime and the number of threads currently running is greater than corePoolSize, the thread is stopped. So after all tasks are complete, the pool eventually shrinks back to corePoolSize.
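
The steps above can be traced with a small experiment. This is a sketch under assumed values (corePoolSize 2, maximumPoolSize 4, queue capacity 2, seven tasks); the tasks block on a latch so that the pool state after each submission is stable and can be recorded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthDemo {

    // Submits 7 blocking tasks to a pool with corePoolSize 2, maximumPoolSize 4
    // and a queue of capacity 2, recording the pool state after each submission.
    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        List<String> states = new ArrayList<>();
        for (int i = 1; i <= 7; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await();          // stay busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                states.add("threads=" + pool.getPoolSize()
                        + " queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                states.add("rejected");           // thrown by the default AbortPolicy
            }
        }
        release.countDown();
        pool.shutdown();
        return states;
    }

    public static void main(String[] args) {
        List<String> states = trace();
        for (int i = 0; i < states.size(); i++) {
            System.out.println("task " + (i + 1) + ": " + states.get(i));
        }
    }
}
```

Tasks 1 and 2 create the core threads, tasks 3 and 4 fill the queue, tasks 5 and 6 create non-core threads, and task 7 is rejected.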

5. Creating a thread pool

5.1. ThreadPoolExecutor

Create one directly through the ThreadPoolExecutor constructor:

    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 10,
            1, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(50),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

5.2. Executors static methods

Thread pools can also be created through the utility class java.util.concurrent.Executors. Under the hood these methods also construct a ThreadPoolExecutor; they just preset the parameters for different needs.

1. newFixedThreadPool

A fixed number of reusable threads.

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

Parameter description:

int corePoolSize: nThreads

int maximumPoolSize: nThreads

long keepAliveTime: 0L

TimeUnit unit: TimeUnit.MILLISECONDS

BlockingQueue workQueue: new LinkedBlockingQueue()

As you can see, corePoolSize equals maximumPoolSize, so no non-core threads are ever created. keepAliveTime is 0, but since by default it only applies to non-core threads, the core threads are not reclaimed even when they are idle. The blocking queue is an unbounded LinkedBlockingQueue.

When tasks arrive, core threads are created first. Once the number of threads has reached corePoolSize, new tasks enter the blocking queue, and whenever a thread becomes idle it takes the next task from the queue and executes it.

Usage scenario: the number of threads in the pool is fixed and they are not reclaimed; the threads live exactly as long as the thread pool does.

2. newSingleThreadExecutor

Single-threaded execution.

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

Parameter description:

int corePoolSize: 1

int maximumPoolSize: 1

long keepAliveTime: 0L

TimeUnit unit: TimeUnit.MILLISECONDS

BlockingQueue workQueue: new LinkedBlockingQueue()

This is basically the same as newFixedThreadPool; the most obvious difference is that there is only one core thread in the pool to execute tasks.

Usage scenario: with only one thread, all tasks are guaranteed to run sequentially on that thread, so there is no thread synchronization to deal with between tasks.
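
The ordering guarantee is easy to check. In this sketch (class and method names are illustrative), each task appends its own index to a shared list, and the list ends up in submission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadOrderDemo {

    // Submits taskCount tasks to a single-thread executor; each task appends
    // its own index. Returns the indexes in the order the tasks actually ran.
    static List<Integer> executionOrder(int taskCount) {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < taskCount; i++) {
            final int index = i;
            executor.execute(() -> order.add(index));
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(executionOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```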

3. newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

Parameter description:

int corePoolSize: 0

int maximumPoolSize: Integer.MAX_VALUE

long keepAliveTime: 60L

TimeUnit unit: TimeUnit.SECONDS

BlockingQueue workQueue: new SynchronousQueue()

There are no core threads, and maximumPoolSize is Integer.MAX_VALUE, so an effectively unlimited number of non-core threads can be created, but each one is reclaimed after being idle for 60 seconds. The task queue is a SynchronousQueue, which stores no elements: a task is handed directly to a thread, so it is executed as soon as it arrives.

Usage scenario: because the number of non-core threads is unlimited and the SynchronousQueue cannot hold tasks, this pool is suitable for a large number of short-lived tasks.
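
The following sketch shows why the tasks should be short-lived. It builds a pool with the same parameters as the newCachedThreadPool() source above and submits tasks that all stay busy; the class name and the task count of 10 are arbitrary choices for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CachedPoolGrowthDemo {

    // Builds a pool with the same parameters as newCachedThreadPool(), submits
    // taskCount tasks that all stay busy, and returns how many threads exist.
    static int threadsCreated(int taskCount) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<>());
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    release.await();              // simulate a long-running task
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int size = pool.getPoolSize();            // no idle thread, so one per task
        release.countDown();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println("threads for 10 busy tasks: " + threadsCreated(10)); // prints 10
    }
}
```

Since nothing can wait in the queue, every task that finds no idle thread gets a new one, so many long-running tasks mean just as many threads.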

4. newScheduledThreadPool

Scheduled and delayed execution.

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

Parameter description:

int corePoolSize: corePoolSize (set by the caller)

int maximumPoolSize: Integer.MAX_VALUE

long keepAliveTime: 0

TimeUnit unit: NANOSECONDS

BlockingQueue workQueue: new DelayedWorkQueue()

The number of core threads is fixed (set by the caller) and maximumPoolSize is Integer.MAX_VALUE, but the idle time is 0, so a non-core thread would be reclaimed as soon as it became idle. DelayedWorkQueue is an unbounded queue that orders tasks by their scheduled time and releases each one only when its delay has elapsed.

Usage scenario: newScheduledThreadPool is the only one of these thread pools that can execute tasks periodically. Use schedule(Runnable command, long delay, TimeUnit unit) for delayed tasks, and scheduleAtFixedRate or scheduleWithFixedDelay for periodic tasks.
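
A minimal sketch of both kinds of scheduling follows. The class name, the 50 ms period, and the latch used to stop after a few runs are assumptions made for this example; schedule and scheduleAtFixedRate are the real ScheduledExecutorService methods.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduledPoolDemo {

    // Runs a periodic task every periodMillis until it has fired at least
    // `runs` times, then cancels it. Returns the number of executions seen.
    static int runPeriodically(int runs, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicInteger executions = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
            executions.incrementAndGet();
            done.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        handle.cancel(false);                     // stop the periodic task
        scheduler.shutdown();
        return executions.get();
    }

    // Schedules a one-shot task after delayMillis and returns how long it
    // actually waited before running, in milliseconds.
    static long measuredDelayMillis(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        long start = System.nanoTime();
        Callable<Long> task = () -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        try {
            return scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("periodic executions: " + runPeriodically(3, 50));
        System.out.println("one-shot ran after ms: " + measuredDelayMillis(100));
    }
}
```

A periodic task keeps running until it is cancelled or the pool is shut down, so the returned ScheduledFuture is worth keeping.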

6. Thread pool execution

The thread pool provides two methods for running tasks: execute and submit.

execute():

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // Read ctl, which packs the pool's run state and worker count into one int
        int c = ctl.get();
        // If the number of threads is smaller than corePoolSize, create a core thread to execute the task
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // If the pool is RUNNING, try to add the task to the queue
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // Recheck the pool state. If it is no longer RUNNING, remove the task from the queue and reject it
            if (!isRunning(recheck) && remove(command))
                reject(command);
            // If there are currently no threads, create one without an initial task
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // Otherwise try to create a non-core thread to execute the task; if that fails, call reject()
        else if (!addWorker(command, false))
            reject(command);
    }
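
The helpers workerCountOf and isRunning used in execute() read two values out of the single ctl integer. The sketch below reproduces that bit layout as it appears in the JDK 8 source, in a standalone class so it can be run; it is a simplified illustration (the TIDYING and TERMINATED states are left out), not the JDK code itself.

```java
public class CtlPackingDemo {

    // The bit layout ThreadPoolExecutor uses for its ctl field:
    // the high 3 bits hold the run state, the low 29 bits the worker count.
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY = (1 << COUNT_BITS) - 1;

    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;

    // Combines a run state and a worker count into one int.
    static int ctlOf(int runState, int workerCount) {
        return runState | workerCount;
    }

    // Keeps only the high 3 bits.
    static int runStateOf(int ctl) {
        return ctl & ~CAPACITY;
    }

    // Keeps only the low 29 bits.
    static int workerCountOf(int ctl) {
        return ctl & CAPACITY;
    }

    // RUNNING is the only negative state, so a comparison is enough.
    static boolean isRunning(int ctl) {
        return ctl < SHUTDOWN;
    }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 3);
        System.out.println(workerCountOf(ctl));          // prints 3
        System.out.println(runStateOf(ctl) == RUNNING);  // prints true
        System.out.println(isRunning(ctlOf(STOP, 3)));   // prints false
    }
}
```

Packing both values into one AtomicInteger lets the pool check and update state and thread count together in a single atomic operation.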

submit():

Source:

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        // Wrap the Runnable in a RunnableFuture
        RunnableFuture<T> ftask = newTaskFor(task, result);
        // Run it through execute()
        execute(ftask);
        // Return the wrapped task as a Future
        return ftask;
    }

    // newTaskFor: wraps the task in a FutureTask
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

The RunnableFuture returned by newTaskFor extends the Runnable interface, so it can be passed directly to execute().

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }

As you can see, submit() actually calls execute(), but before doing so it wraps the Runnable in a FutureTask and then returns it as a Future, through which the return value of the finished task can be obtained.

FutureTask provides methods such as cancel, isCancelled, isDone, and get for operating on the task.

Summary:

In general, if we do not need to control the task or get its result, we can use execute directly.

If we want to get the result of the task, or be able to cancel it, we use submit.
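
Here is a short sketch of the two things submit makes possible: reading a result and cancelling a task. The class and method names are made up for the example, and a Callable is used so that the task itself produces the result.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitDemo {

    // Submits a Callable and blocks on the Future until its result is ready.
    static int sumOnPool(int a, int b) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> a + b;
            Future<Integer> future = pool.submit(task);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Submits a task that never finishes on its own, then cancels it.
    static boolean cancelBlockedTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = pool.submit(() -> {
                try {
                    new CountDownLatch(1).await();    // blocks until interrupted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            future.cancel(true);                      // true: interrupt if already running
            return future.isCancelled() && future.isDone();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOnPool(40, 2));        // prints 42
        System.out.println(cancelBlockedTask());     // prints true
    }
}
```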

7. Shutting down a thread pool

Thread interruption is covered in Multithreading (I): basic concepts and the use of notify() and wait().

  • shutdown(): the thread pool does not terminate immediately. It stops accepting new tasks, and terminates only after all tasks already in the task queue have completed.
  • shutdownNow(): tries to terminate the thread pool immediately. It attempts to interrupt the tasks in progress, empties the task queue, and returns the tasks that have not yet been executed.
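
The difference between the two methods can be seen in a small sketch. The names below are illustrative, and the tasks block on a latch that is never released, so they end only when shutdownNow() interrupts them.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {

    // After shutdown(), the pool refuses new tasks.
    static boolean rejectsAfterShutdown() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown();
        try {
            pool.execute(() -> { });
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        }
    }

    // shutdownNow() interrupts the running task and hands back the queued ones.
    static int tasksReturnedByShutdownNow(int taskCount) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch never = new CountDownLatch(1);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    never.await();                // ends only when interrupted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        List<Runnable> notStarted = pool.shutdownNow();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return notStarted.size();
    }

    public static void main(String[] args) {
        System.out.println(rejectsAfterShutdown());          // prints true
        // One task belongs to the worker thread, the other two were still queued.
        System.out.println(tasksReturnedByShutdownNow(3));   // prints 2
    }
}
```

Note that shutdownNow() can only interrupt; a task that ignores interruption keeps running until it finishes by itself.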

8. Configuring a thread pool properly

Thread pool parameters are flexible and can be set freely, but what value is appropriate for each? That depends on the tasks being handled, which are generally analyzed from the following angles:

8.1. Nature of the task

CPU-intensive, IO-intensive, or mixed.

CPU-intensive tasks should be configured with as few threads as possible, for example a thread pool of Ncpu + 1 threads.

IO-intensive tasks involve I/O operations such as disk, memory, and network access and do not demand much CPU, so they should be configured with as many threads as possible, for example a thread pool of 2 * Ncpu threads.

Mixed tasks should be split into a CPU-intensive part and an IO-intensive part, with the number of threads for each determined by the number of tasks and their execution time.

8.2. Task priority

High, medium, and low.

8.3. Task execution time

Long, medium, and short.

8.4. Task dependencies

Whether the task depends on other system resources, such as database connections.

Runtime.getRuntime().availableProcessors() returns the number of CPUs available on the current device.
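
Expressed in code, the two rules of thumb above might look like this. It is only a sketch of the Ncpu + 1 and 2 * Ncpu formulas from the text; the method names are invented, and real values should still be tuned by measurement.

```java
public class PoolSizingDemo {

    // Rule of thumb from the text for CPU-intensive work: Ncpu + 1 threads.
    static int cpuIntensivePoolSize(int cpuCount) {
        return cpuCount + 1;
    }

    // Rule of thumb from the text for IO-intensive work: 2 * Ncpu threads.
    static int ioIntensivePoolSize(int cpuCount) {
        return 2 * cpuCount;
    }

    public static void main(String[] args) {
        int cpuCount = Runtime.getRuntime().availableProcessors();
        System.out.println("CPUs: " + cpuCount);
        System.out.println("CPU-intensive pool size: " + cpuIntensivePoolSize(cpuCount));
        System.out.println("IO-intensive pool size: " + ioIntensivePoolSize(cpuCount));
    }
}
```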

9. Thread pool practice

That was a lot of theory. I find that running code and analyzing the results leaves the deepest impression, so let's analyze the results of running the following code.

Let's start with this code:

    public static void main(String[] args) {
        // Create a basic thread pool with ThreadPoolExecutor
        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                3,
                5,
                1,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(50));
        for (int i = 0; i < 30; i++) {
            final int num = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        // Sleep for 2 seconds, then print
                        Thread.sleep(2000);
                        System.out.println("run : " + num + " Current thread: " + Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            // Execute the task
            threadPoolExecutor.execute(runnable);
        }
    }

We create a thread pool with ThreadPoolExecutor and then execute 30 tasks.

Parameter description:

int corePoolSize: 3

int maximumPoolSize: 5

long keepAliveTime: 1

TimeUnit unit: TimeUnit.SECONDS

BlockingQueue workQueue: new LinkedBlockingQueue(50)

The thread pool has 3 core threads and a maximum of 5 threads; non-core threads are reclaimed after being idle for 1 second. The blocking queue is a LinkedBlockingQueue with a capacity of 50.

Results:

We see that three tasks are executed every two seconds. This is because the number of core threads is 3, and tasks beyond that are placed in the blocking queue. Since the queue capacity is 50, the 27 waiting tasks never fill it, so no non-core threads are started.

Let's change our thread pool as follows:

    final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2,
            5,
            1,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(25));

Instead of analyzing the parameters, let's look directly at the results:

This time five tasks are executed every two seconds. Why? Let's analyze it using the working principle described earlier. We have 30 tasks to execute and 2 core threads, so the first 2 tasks go to the core threads and the following tasks go into the blocking queue. The queue capacity is 25, so 3 tasks are left over once it is full. Because the queue is full, 3 non-core threads are started to execute them, which brings the pool to its maximum of 5 threads.

Let's make one more small change to our thread pool:

    final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2,
            5,
            1,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(24));

We changed the blocking queue capacity to 24. If you understand how thread pools work, you can already guess why.

The most visible result is that an exception is thrown, while the thread pool still goes on executing tasks. Why is the exception thrown? We need to execute 30 tasks: 2 go to the core threads and 24 fill the blocking queue. Once the queue is full, non-core threads are started, but the maximum number of threads is 5, so only 3 more tasks can be taken. When the one remaining task arrives, the thread pool is saturated, and the default AbortPolicy throws a RejectedExecutionException. We can instead set the saturation policy manually, as follows:

    final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2,
            5,
            1,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(24),
            new ThreadPoolExecutor.DiscardPolicy());

The saturation policy used here is DiscardPolicy, which discards the excess task. This time no exception is thrown; only 29 tasks are executed, and the last one is discarded.
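
The three configurations analyzed in this section can also be checked without reading console output. The sketch below is an assumption-laden variant of the experiment: instead of sleeping for 2 seconds, each task blocks on a latch until all 30 have been submitted, and the default AbortPolicy is kept so rejections can be counted. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PracticeCheckDemo {

    // Submits 30 busy tasks to a pool with the given settings and returns
    // {threads created, tasks rejected by the default AbortPolicy}.
    static int[] run(int corePoolSize, int maximumPoolSize, int queueCapacity) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, 1, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity));
        int rejected = 0;
        for (int i = 0; i < 30; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await();          // stands in for Thread.sleep(2000)
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        int threads = pool.getPoolSize();
        release.countDown();                      // let the accepted tasks finish
        pool.shutdown();
        return new int[] {threads, rejected};
    }

    public static void main(String[] args) {
        int[][] configs = {{3, 5, 50}, {2, 5, 25}, {2, 5, 24}};
        for (int[] c : configs) {
            int[] r = run(c[0], c[1], c[2]);
            System.out.println("core=" + c[0] + " max=" + c[1] + " queue=" + c[2]
                    + " -> threads=" + r[0] + " rejected=" + r[1]);
        }
    }
}
```

The three lines report 3 threads and 0 rejections, 5 threads and 0 rejections, and 5 threads and 1 rejection, matching the three runs described above.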

Next, let's see how the thread pools created through Executors execute the same tasks.

1. newFixedThreadPool

    ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(3);

Results: with three fixed threads and an unbounded queue, three tasks are executed every two seconds.

2. newSingleThreadExecutor

    ExecutorService threadPoolExecutor = Executors.newSingleThreadExecutor();

Results: with a single thread, one task is executed every two seconds, in submission order.

3. newCachedThreadPool

    ExecutorService threadPoolExecutor = Executors.newCachedThreadPool();

Results: since every task gets its own thread, all 30 tasks are executed at about the same time, after two seconds.

4. newScheduledThreadPool

    ScheduledExecutorService threadPoolExecutor = Executors.newScheduledThreadPool(3);

Conclusion

This is the third article in the multithreading series. It is rather long and a little messy, and I will tidy it up later. It mostly follows my own train of thought: while writing, I worked through the operations again myself, and in the source code analysis I tried to be as detailed as possible and go deeper step by step, which also makes it convenient to look things up later. Some places in the article are not very detailed; they may be updated later or covered in a separate article.