Contents

  1. Executor interface introduction
  2. Common ExecutorService interface methods
  3. Some methods for creating a thread pool
    • 3.1 newFixedThreadPool method
    • 3.2 newCachedThreadPool method
    • 3.3 newScheduledThreadPool method
  4. Answer to the question
    • 4.1 Runnable interface and Callable interface

Executor interface introduction

Executor is an interface that declares a single execute method, which takes a Runnable argument, as shown below:

public interface Executor {
    void execute(Runnable command);
}
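
Because Executor declares only one method, any lambda or method reference of the right shape can act as an executor. The following is a minimal sketch rather than anything from the JDK itself: the class name SimpleExecutorDemo, the constant DIRECT and the helper runAll are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

class SimpleExecutorDemo {

    // Hypothetical executor: runs each task synchronously on the calling thread.
    static final Executor DIRECT = Runnable::run;

    // Hands two tasks to the executor and records the order in which they ran.
    // Only safe for executors that run tasks on the calling thread, such as DIRECT.
    static List<String> runAll(Executor executor) {
        List<String> log = new ArrayList<>();
        executor.execute(() -> log.add("first"));
        executor.execute(() -> log.add("second"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runAll(DIRECT)); // prints [first, second]
    }
}
```

A thread pool is simply a more elaborate implementation of the same interface: execute hands the Runnable to a worker thread instead of running it in place.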

Common class and interface structure diagram of the Executor framework

Thread objects and objects returned by thread execution

The thread object

A thread object here means a task submitted to a thread pool. It can implement either the Runnable or the Callable interface. Why can both be executed as tasks when the two interfaces are unrelated? Think about it for a moment; the answer is at the end of the article.

The Future interface

The Future interface and the FutureTask class are used to receive the result of asynchronous execution. As the ExecutorService interface below shows, its submit methods return a Future.
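
As a small illustration of receiving a result through a Future, the sketch below submits a Callable and blocks on get. The class name FutureDemo and the helper sumOnPool are hypothetical; only the java.util.concurrent types are real.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureDemo {

    // Submits a Callable and blocks on the returned Future until the result is ready.
    static int sumOnPool(int a, int b) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> a + b;
            Future<Integer> future = pool.submit(task);
            return future.get(); // waits for the worker thread to finish
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOnPool(2, 3)); // prints 5
    }
}
```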

Common ExecutorService interface methods

Next, look at ExecutorService, which extends the Executor interface:

public interface ExecutorService extends Executor {
    // Orderly shutdown: no new tasks are accepted, but tasks already submitted still run
    void shutdown();

    // Forced shutdown: attempts to stop the running tasks and returns the tasks that never started
    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    // ...
}
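
The difference between the two shutdown methods is easier to see in a run. The sketch below is an assumption-laden illustration, not library code: ShutdownDemo, gracefulShutdown and forcedShutdown are hypothetical names, and the 5-second timeouts are arbitrary.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {

    // shutdown(): the submitted task still runs; returns true if the pool terminated in time.
    static boolean gracefulShutdown() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> {});
        pool.shutdown();
        return pool.isShutdown()
                && pool.awaitTermination(5, TimeUnit.SECONDS)
                && pool.isTerminated();
    }

    // shutdownNow(): interrupts the running task and returns the tasks that never started.
    static int forcedShutdown() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // interrupted by shutdownNow()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.submit(() -> {}); // waits in the queue behind the sleeping task
        pool.submit(() -> {}); // also waits in the queue
        started.await();       // make sure the first task is actually running
        List<Runnable> neverStarted = pool.shutdownNow();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return neverStarted.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(gracefulShutdown()); // prints true
        System.out.println(forcedShutdown());   // prints 2
    }
}
```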

Introduction to the ThreadPoolExecutor constructor

The Executors factory methods covered later build their pools by calling a ThreadPoolExecutor constructor, so its full constructor is introduced here first:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) { ... }

Parameters

Parameter        Type                      Meaning
corePoolSize     int                       Number of core threads
maximumPoolSize  int                       Maximum number of threads
keepAliveTime    long                      How long an idle thread beyond the core size is kept alive
unit             TimeUnit                  Time unit of keepAliveTime
workQueue        BlockingQueue<Runnable>   Queue that holds tasks waiting to run
threadFactory    ThreadFactory             Factory used to create threads
handler          RejectedExecutionHandler  Handler for tasks that cannot be accepted (rejection policy)
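
How these parameters interact is easiest to see with deliberately tiny values. The sketch below assumes a pool with 1 core thread, 2 maximum threads and a queue of capacity 1; PoolParametersDemo and observe are hypothetical names, and the tasks block on a latch so that the pool state can be inspected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolParametersDemo {

    // Returns "poolSize/queueSize/rejected" observed while all accepted tasks are still blocked.
    static String observe() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean rejected = false;
        try {
            pool.execute(blocker); // 1st: starts the core thread
            pool.execute(blocker); // 2nd: core thread busy, goes to the queue
            pool.execute(blocker); // 3rd: queue full, starts a second (non-core) thread
            pool.execute(blocker); // 4th: queue full and maximum reached, handed to the handler
        } catch (RejectedExecutionException e) {
            rejected = true; // AbortPolicy rejects by throwing
        }
        String result = pool.getPoolSize() + "/" + pool.getQueue().size() + "/" + rejected;
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(observe()); // prints 2/1/true
    }
}
```

The order matters: core threads first, then the queue, then extra threads up to the maximum, and only then the rejection handler.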

Some methods for creating a thread pool

Why introduce the ExecutorService interface first? Because most thread pools obtained from the Executors factory class are returned as an ExecutorService. Executors provides the following methods for creating thread pools:

  • newFixedThreadPool(int nThreads): creates a thread pool with a fixed number of threads. A submitted task runs immediately if a thread is idle; otherwise it waits in the task queue.
  • newWorkStealingPool(): creates a thread pool that keeps enough threads to support a given level of parallelism and uses multiple queues to reduce contention. The parallelism level can be passed as a parameter; if omitted, it defaults to the number of available CPUs.
  • newSingleThreadExecutor(): returns a thread pool with exactly one thread. A submitted task runs immediately if that thread is idle; otherwise it waits in the task queue.
  • newCachedThreadPool(): returns a thread pool that adjusts its number of threads on demand, with no upper limit. A task runs on an idle thread if one exists and on a new thread otherwise; no threads are created while there are no tasks, and a thread that stays idle for 60 seconds is reclaimed.
  • newScheduledThreadPool(int corePoolSize): returns a ScheduledExecutorService with the given number of threads that supports delayed and periodic task execution.
  • newSingleThreadScheduledExecutor(): creates a single-threaded pool that runs tasks after a delay or periodically.
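
To compare two of these factory methods in practice, the sketch below counts how many distinct worker threads execute a batch of trivial tasks. FactoryMethodsDemo and distinctThreads are hypothetical names used only for this illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FactoryMethodsDemo {

    // Runs `tasks` trivial tasks on the pool and returns how many distinct threads executed them.
    static int distinctThreads(ExecutorService pool, int tasks) throws InterruptedException {
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return names.size();
    }

    public static void main(String[] args) throws InterruptedException {
        // A single-thread executor always uses the same worker thread.
        System.out.println(distinctThreads(Executors.newSingleThreadExecutor(), 10)); // prints 1
        // A fixed pool of 3 never uses more than 3 worker threads.
        System.out.println(distinctThreads(Executors.newFixedThreadPool(3), 10));
    }
}
```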

The following sections cover a few of the commonly used methods; the single-thread variants are not covered separately.

newFixedThreadPool method

This method creates a thread pool with a fixed, specified number of threads. The task queue is unbounded, so there is no limit on the number of tasks that can wait. It suits scenarios where each task finishes quickly.

Here is how the Executors factory class implements it:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

As you can see, a ThreadPoolExecutor object is returned. The argument nThreads is used as both the core thread count and the maximum thread count, the keep-alive time is 0 milliseconds, and the work queue is the unbounded LinkedBlockingQueue.

Because the queue is unbounded, maximumPoolSize and keepAliveTime have no effect here, and the rejection policy is never triggered by a full queue. Why? A ThreadPoolExecutor creates threads beyond corePoolSize, or rejects a task, only when the queue refuses the task, and an unbounded queue never does.

An unbounded queue also means there is no upper limit on waiting tasks. If tasks are slow and are submitted faster than they finish, they pile up in the queue and memory usage keeps growing. What happens then? Try the following code:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        // Warning: submits tasks forever, so the queue grows until memory runs out
        while (true) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}
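
The same queue growth can be observed safely with a bounded number of tasks. The sketch below is illustrative only: FixedPoolQueueDemo and observe are hypothetical names, and the cast relies on the factory returning a plain ThreadPoolExecutor, as the factory source shown earlier suggests.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FixedPoolQueueDemo {

    // Submits `tasks` blocking tasks to a fixed pool of `threads` threads and returns
    // {pool size, queued tasks} observed while every task is still blocked.
    static int[] observe(int threads, int tasks) throws InterruptedException {
        // Assumption: newFixedThreadPool returns a plain ThreadPoolExecutor.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size()};
        release.countDown(); // let every task finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return snapshot;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] s = observe(2, 10);
        System.out.println("threads=" + s[0] + ", queued=" + s[1]); // prints threads=2, queued=8
    }
}
```

The thread count stays at the fixed size no matter how many tasks arrive; everything else accumulates in the queue.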

The sample code

import java.time.LocalTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);

        for (int i = 0; i < 8; i++) {
            int finalI = i + 1;
            pool.submit(() -> {
                try {
                    System.out.println("Task " + finalI + ": Start wait 2 seconds, time: " + LocalTime.now() + ", current thread name: " + Thread.currentThread().getName());
                    Thread.sleep(2000);
                    System.out.println("Task " + finalI + ": End wait 2 seconds, time: " + LocalTime.now() + ", current thread name: " + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        // Stop accepting new tasks; the eight submitted tasks still run to completion
        pool.shutdown();
    }
}

The output

Task 4: Start wait 2 seconds, time: 17:13:22.048, current thread name: pool-1-thread-4
Task 2: Start wait 2 seconds, time: 17:13:22.048, current thread name: pool-1-thread-2
Task 3: Start wait 2 seconds, time: 17:13:22.048, current thread name: pool-1-thread-3
Task 1: Start wait 2 seconds, time: 17:13:22.048, current thread name: pool-1-thread-1
Task 2: End wait 2 seconds, time: 17:13:24.048, current thread name: pool-1-thread-2
Task 3: End wait 2 seconds, time: 17:13:24.048, current thread name: pool-1-thread-3
Task 1: End wait 2 seconds, time: 17:13:24.048, current thread name: pool-1-thread-1
Task 4: End wait 2 seconds, time: 17:13:24.048, current thread name: pool-1-thread-4
Task 6: Start wait 2 seconds, time: 17:13:24.049, current thread name: pool-1-thread-4
Task 7: Start wait 2 seconds, time: 17:13:24.049, current thread name: pool-1-thread-1
Task 5: Start wait 2 seconds, time: 17:13:24.049, current thread name: pool-1-thread-3
Task 8: Start wait 2 seconds, time: 17:13:24.049, current thread name: pool-1-thread-2
Task 5: End wait 2 seconds, time: 17:13:26.050, current thread name: pool-1-thread-3
Task 7: End wait 2 seconds, time: 17:13:26.050, current thread name: pool-1-thread-1
Task 8: End wait 2 seconds, time: 17:13:26.051, current thread name: pool-1-thread-2
Task 6: End wait 2 seconds, time: 17:13:26.050, current thread name: pool-1-thread-4

Tasks 1-4 start at the same time and finish 2 seconds later, and only then do tasks 5-8 start. Because the pool was created with only four threads, the remaining tasks wait in the queue until a thread becomes free.

newCachedThreadPool method

The thread pool created by newCachedThreadPool automatically creates new threads as needed.

Here is how the Executors factory class implements it:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

newCachedThreadPool also returns a ThreadPoolExecutor object. The core thread count is 0, the maximum thread count is Integer.MAX_VALUE, the keep-alive time is 60 seconds, and the work queue is a SynchronousQueue.

As the arguments show, an idle thread in this pool lives for 60 seconds and is then terminated. The hidden problem is that if tasks run slowly and are submitted faster than they complete, the pool keeps creating new threads, and CPU and memory usage keep growing.

Running the same endless submission loop used for newFixedThreadPool against a cached pool produced the following error on my computer:

An unrecoverable stack overflow has occurred.

Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
	at java.lang.Thread.start0(Native Method)
	at java.lang.Thread.start(Thread.java:714)
	at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1368)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at com.learnConcurrency.executor.cachedThreadPool.Main.main(Main.java:11)
Process finished with exit code -1073741571 (0xC00000FD)
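
The thread growth behind this error can be reproduced safely with a small, bounded number of tasks. In the sketch below, CachedPoolGrowthDemo and poolSizeUnderLoad are hypothetical names, and the cast assumes the factory returns a plain ThreadPoolExecutor, as the factory source shown earlier suggests.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedPoolGrowthDemo {

    // Submits `tasks` blocking tasks and returns the pool size while all of them are blocked.
    static int poolSizeUnderLoad(int tasks) throws InterruptedException {
        // Assumption: newCachedThreadPool returns a plain ThreadPoolExecutor.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int size = pool.getPoolSize(); // no thread is idle, so each task needed its own thread
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return size;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(poolSizeUnderLoad(20)); // prints 20
    }
}
```

With an endless loop instead of 20 tasks, the same one-thread-per-task growth continues until the operating system refuses to create more threads.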


SynchronousQueue is a blocking queue with no capacity: it stores nothing, so each task must be handed directly to a thread. The task hand-off diagram is shown below.
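
The zero-capacity behavior can be checked directly on a SynchronousQueue. This is a minimal sketch; SynchronousQueueDemo, offerWithoutConsumer and handOff are hypothetical names.

```java
import java.util.concurrent.SynchronousQueue;

class SynchronousQueueDemo {

    // With no consumer waiting, a non-blocking offer has nowhere to put the element.
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("task");
    }

    // put() blocks until another thread takes the element, so it completes once a consumer exists.
    static String handOff() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.put("task");
        consumer.join(); // join makes the consumer's write visible to this thread
        return received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(offerWithoutConsumer()); // prints false
        System.out.println(handOff());              // prints task
    }
}
```

This is why a cached pool starts a new thread whenever no worker is idle: the offer to the queue fails unless a worker is already waiting to take the task.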

The sample code

import java.time.LocalTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 8; i++) {
            int finalI = i + 1;
            pool.submit(() -> {
                try {
                    System.out.println("Task " + finalI + ": Start wait 60 seconds, time: " + LocalTime.now() + ", current thread name: " + Thread.currentThread().getName());
                    Thread.sleep(60000);
                    System.out.println("Task " + finalI + ": End wait 60 seconds, time: " + LocalTime.now() + ", current thread name: " + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            // Sleep 10 seconds before submitting the next task
            Thread.sleep(10000);
        }
        pool.shutdown();
    }
}

The execution result

Task 1: Start wait 60 seconds, time: 17:15:21.570, current thread name: pool-1-thread-1
Task 2: Start wait 60 seconds, time: 17:15:31.553, current thread name: pool-1-thread-2
Task 3: Start wait 60 seconds, time: 17:15:41.555, current thread name: pool-1-thread-3
Task 4: Start wait 60 seconds, time: 17:15:51.554, current thread name: pool-1-thread-4
Task 5: Start wait 60 seconds, time: 17:16:01.554, current thread name: pool-1-thread-5
Task 6: Start wait 60 seconds, time: 17:16:11.555, current thread name: pool-1-thread-6
Task 7: Start wait 60 seconds, time: 17:16:21.555, current thread name: pool-1-thread-7
Task 1: End wait 60 seconds, time: 17:16:21.570, current thread name: pool-1-thread-1
Task 2: End wait 60 seconds, time: 17:16:31.554, current thread name: pool-1-thread-2
Task 8: Start wait 60 seconds, time: 17:16:31.556, current thread name: pool-1-thread-2
Task 3: End wait 60 seconds, time: 17:16:41.555, current thread name: pool-1-thread-3
Task 4: End wait 60 seconds, time: 17:16:51.556, current thread name: pool-1-thread-4
Task 5: End wait 60 seconds, time: 17:17:01.556, current thread name: pool-1-thread-5
Task 6: End wait 60 seconds, time: 17:17:11.555, current thread name: pool-1-thread-6
Task 7: End wait 60 seconds, time: 17:17:21.556, current thread name: pool-1-thread-7
Task 8: End wait 60 seconds, time: 17:17:31.557, current thread name: pool-1-thread-2

In the example, each task sleeps for 60 seconds and the loop waits 10 seconds between submissions. The output shows that the first seven tasks each run on a different thread. By the time task 8 is submitted, threads 1 and 2 have finished their tasks, so task 8 reuses the existing pool-1-thread-2 instead of creating a new thread.

newScheduledThreadPool method

This thread pool is mainly used to run tasks after a delay or periodically.

Here is how the Executors factory class implements it:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

A ScheduledThreadPoolExecutor object is returned here. Looking one level deeper:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

This calls the superclass constructor. ThreadPoolExecutor is the parent class of ScheduledThreadPoolExecutor, so the returned object is also a ThreadPoolExecutor. The core thread count is the corePoolSize argument, the maximum thread count is Integer.MAX_VALUE, the keep-alive time is 0 nanoseconds, and the queue is a DelayedWorkQueue.

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {}

Here are some methods of ScheduledExecutorService

public interface ScheduledExecutorService extends ExecutorService {
    // Runs the command once after the given delay; unit is the time unit of delay
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    // Runs first after initialDelay, then at initialDelay + period, initialDelay + 2 * period, and so on
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    // Runs first after initialDelay, then waits delay after each run finishes before starting the next
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}
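
A short usage sketch of two of these methods follows. ScheduledDemo, delayedOnce and fixedRate are hypothetical names, and the delays (100 ms, 50 ms) and the 5-second timeout are arbitrary values picked for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduledDemo {

    // Runs a Callable once after a 100 ms delay and returns its result.
    static String delayedOnce() throws Exception {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<String> future =
                    pool.schedule(() -> "done", 100, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the delayed task has run
        } finally {
            pool.shutdown();
        }
    }

    // Schedules a task every 50 ms and waits until it has fired `runs` times.
    // Returns true if that happened within 5 seconds.
    static boolean fixedRate(int runs) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        CountDownLatch latch = new CountDownLatch(runs);
        ScheduledFuture<?> handle =
                pool.scheduleAtFixedRate(latch::countDown, 0, 50, TimeUnit.MILLISECONDS);
        boolean completed = latch.await(5, TimeUnit.SECONDS);
        handle.cancel(false); // a periodic task keeps running until it is cancelled
        pool.shutdown();
        return completed;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(delayedOnce()); // prints done
        System.out.println(fixedRate(3));
    }
}
```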

Answer to the question

Runnable and Callable interfaces

Start from the entry point where tasks are submitted.

The submit method is implemented by the abstract class AbstractExecutorService:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

As you can see, both the Runnable and the Callable are passed to a newTaskFor method, which returns a RunnableFuture object.
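
From the caller's side, the three submit overloads differ only in what the returned Future yields. The sketch below illustrates this; SubmitOverloadsDemo and results are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitOverloadsDemo {

    // Submits the same kinds of task through each overload and collects the results.
    static Object[] results() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable runnable = () -> {};
            Callable<String> callable = () -> "from callable";
            Future<?> f1 = pool.submit(runnable);                // result is always null
            Future<String> f2 = pool.submit(runnable, "preset"); // result is the given value
            Future<String> f3 = pool.submit(callable);           // result comes from call()
            return new Object[] {f1.get(), f2.get(), f3.get()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(Arrays.toString(results())); // prints [null, preset, from callable]
    }
}
```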

Let’s look at the newTaskFor method again

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

Both overloads call a FutureTask constructor, so let's keep going:

private Callable<V> callable;

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;      
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       
}

FutureTask has a member variable named callable. A Runnable passed to the constructor is converted into a Callable by a further call to the Executors factory method callable:

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

// Adapter
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
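
The adaptation can be tried out through the public Executors.callable method and through FutureTask itself, without any thread pool. This is a minimal sketch; RunnableAdapterDemo, adapt and viaFutureTask are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

class RunnableAdapterDemo {

    // Wraps a Runnable with Executors.callable and returns what call() yields.
    static String adapt(AtomicBoolean ran) throws Exception {
        Runnable task = () -> ran.set(true);
        Callable<String> adapted = Executors.callable(task, "preset result");
        return adapted.call(); // runs task.run(), then returns the preset value
    }

    // FutureTask performs the same adaptation in its constructor and is itself a Runnable.
    static String viaFutureTask(AtomicBoolean ran) throws Exception {
        Runnable task = () -> ran.set(true);
        FutureTask<String> future = new FutureTask<String>(task, "preset result");
        future.run();        // normally a pool worker thread calls this
        return future.get(); // already completed, so this returns immediately
    }

    public static void main(String[] args) throws Exception {
        AtomicBoolean ran = new AtomicBoolean(false);
        System.out.println(adapt(ran) + ", ran=" + ran.get()); // prints preset result, ran=true
        System.out.println(viaFutureTask(new AtomicBoolean(false))); // prints preset result
    }
}
```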

This is where the answer becomes clear: after a series of method calls, the Runnable object is adapted into a Callable object by RunnableAdapter. The method call diagram is shown below.

Custom thread pools will be covered in the next article, and the newWorkStealingPool method introduction will be updated later
