Blocking queues

Concept: when a blocking queue is empty, the take operation blocks; when it is full, the put operation blocks.

Benefits: a blocking queue does the suspending and waking for you, so you no longer have to control manually when to block and when to wake up.

Hierarchy: Collection → Queue → BlockingQueue → seven BlockingQueue implementation classes.

Note that LinkedBlockingQueue is nominally bounded, but there is a huge pitfall: its default capacity is Integer.MAX_VALUE, about 2.1 billion, which generally ends in an out-of-memory error (as the ThreadPoolExecutor discussion below shows).

API: the insertion and removal methods come in four groups:

  1. Throws an exception: when the queue is full, a further add throws an exception (IllegalStateException); when it is empty, remove throws NoSuchElementException.
  2. Returns a special value: when the queue is full, offer returns false; when it is empty, poll returns null.
  3. Blocks: when the queue is full, put blocks until an element is removed; when it is empty, take blocks until one is inserted.
  4. Times out: offer(e, time, unit) and poll(time, unit) give up and return once the timeout expires.
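The four groups map directly onto methods of BlockingQueue; a minimal sketch using an ArrayBlockingQueue of capacity 2 (the capacity and element values here are our own, for illustration):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueApiDemo {
    // Exercises one method from each of the four API groups and reports the results.
    public static String demo() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

        // Group 1 (throws): add succeeds while there is room; a third add here
        // would throw IllegalStateException("Queue full").
        queue.add("a");
        queue.add("b");

        // Group 2 (special value): offer on a full queue returns false instead of throwing.
        boolean offered = queue.offer("c");

        // Group 3 (blocks): take removes the head; put would block on a full queue,
        // so we take first to make room.
        String head = queue.take();
        queue.put("c");

        // Group 4 (times out): a timed offer on a full queue gives up after the timeout.
        boolean timedOffer = queue.offer("d", 100, TimeUnit.MILLISECONDS);

        return "offered=" + offered + " head=" + head + " timedOffer=" + timedOffer;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // offered=false head=a timedOffer=false
    }
}
```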

SynchronousQueue holds no elements at all: every insert must wait for a matching remove by another thread, so you can never get a second element in before the first has been taken out.
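A small sketch of that hand-off behavior (the element values and timeouts here are our own):

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueDemo {
    // Returns what the consumer thread received after a successful hand-off.
    public static String handOff() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // With no consumer waiting, even a timed offer fails: the queue stores nothing.
        boolean storedAnything = queue.offer("x", 100, TimeUnit.MILLISECONDS);
        System.out.println("offer with no consumer: " + storedAnything); // false

        final String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.put("y");        // blocks until the consumer's take() meets it
        consumer.join();
        return received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("handed off: " + handOff()); // handed off: y
    }
}
```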

1. Producers and consumers

1.1 Callable interface

Differences from Runnable:

  1. Callable has a return value.
  2. call() can throw a checked exception.
  3. You override the call() method instead of the run() method.

Usage:

class MyThread implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        System.out.println("callable come in ...");
        return 1024;
    }
}

public class CallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // FutureTask accepts a Callable.
        FutureTask<Integer> futureTask = new FutureTask<>(new MyThread());
        // Place the FutureTask object in the Thread constructor (FutureTask implements Runnable).
        new Thread(futureTask, "AA").start();
        int result01 = 100;
        // Use FutureTask's get method to obtain the return value.
        int result02 = futureTask.get();
        System.out.println("result=" + (result01 + result02));
    }
}

1.2 The traditional version

lock.lock();
try {
    // 1. If there is already a product, wait (while, not if, to guard against spurious wakeups).
    while (number != 0) {
        condition.await();
    }
    // 2. Otherwise, proceed to produce.
    number++;
    System.out.println(Thread.currentThread().getName() + "\t" + number);
    // 3. Notify waiting consumers.
    condition.signalAll();
} catch (Exception e) {
    e.printStackTrace();
} finally {
    // Finally, unlock.
    lock.unlock();
}
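The original snippet shows only the producer body; pieced together into a runnable sketch (class and thread names such as ShareData are our own choices):

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative shared-resource class; the name ShareData is ours, not from the original.
class ShareData {
    private int number = 0;
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    public void increment() throws InterruptedException {
        lock.lock();
        try {
            while (number != 0) {          // while, not if: guards against spurious wakeups
                condition.await();
            }
            number++;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void decrement() throws InterruptedException {
        lock.lock();
        try {
            while (number == 0) {
                condition.await();
            }
            number--;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public int get() {
        return number;
    }
}

public class ProdConsTraditional {
    public static void main(String[] args) throws InterruptedException {
        ShareData data = new ShareData();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try { data.increment(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            }
        }, "Producer");
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try { data.decrement(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            }
        }, "Consumer");
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
```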

1.3 The blocking queue version

Using a blocking queue eliminates the need for manual locking and signalling.

String data;
boolean retValue;
while (FLAG) {
    data = atomicInteger.incrementAndGet() + ""; // ++i
    retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
    if (retValue) {
        System.out.println(Thread.currentThread().getName() + "\t" + "insert queue " + data + " success");
    } else {
        System.out.println(Thread.currentThread().getName() + "\t" + "insert queue " + data + " failed");
    }
    TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "\tFLAG==false, stop production");
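The producer loop above can be wrapped into a runnable resource class; a minimal sketch, with class and method names (MyResource, produce, consume, stop) of our own choosing:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the blocking-queue version; names are illustrative.
class MyResource {
    private volatile boolean flag = true;               // producer on/off switch
    private final AtomicInteger atomicInteger = new AtomicInteger();
    private final BlockingQueue<String> blockingQueue;

    MyResource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    public void produce() throws InterruptedException {
        while (flag) {
            String data = atomicInteger.incrementAndGet() + ""; // ++i
            boolean ok = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
            System.out.println(Thread.currentThread().getName() + "\tinsert " + data
                    + (ok ? " success" : " failed"));
            TimeUnit.MILLISECONDS.sleep(100);
        }
        System.out.println(Thread.currentThread().getName() + "\tflag==false, stop production");
    }

    public void consume() throws InterruptedException {
        while (flag) {
            String result = blockingQueue.poll(2L, TimeUnit.SECONDS);
            if (result == null) {                       // nothing arrived for 2 s: give up
                return;
            }
            System.out.println(Thread.currentThread().getName() + "\tconsume " + result);
        }
    }

    public void stop() {
        flag = false;
    }

    public int produced() {
        return atomicInteger.get();
    }
}
```

Note there is no lock and no Condition anywhere: the timed offer/poll calls do all the coordination.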

Thread pools

Concept: a thread pool controls the number of running threads. Pending tasks are placed on a wait queue and then executed by the pool's threads; once the maximum number of threads is reached, new tasks wait in the queue.

Advantages:

  1. Thread reuse: no need to keep creating new threads; reusing already-created threads reduces the overhead of thread creation and destruction and saves system resources.
  2. Faster response: when a task arrives, it can run on an existing pool thread without waiting for a new thread to be created.
  3. Thread management: you can control the maximum number of concurrent threads, how threads are created, and so on.

Hierarchy: Executor → ExecutorService → AbstractExecutorService → ThreadPoolExecutor. ThreadPoolExecutor is the core class for creating thread pools. Like Arrays and Collections, Executor has a companion utility class, Executors.

There are three common factory methods:

newFixedThreadPool: a fixed-length thread pool, backed by a LinkedBlockingQueue.

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

newSingleThreadExecutor: a pool with only one thread, backed by a LinkedBlockingQueue.

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}


newCachedThreadPool: an expandable (cached) thread pool, backed by a SynchronousQueue.

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                 60L, TimeUnit.SECONDS,
                                 new SynchronousQueue<Runnable>());
}


1. Parameters for creating a thread pool

Understanding: the thread pool's creation parameters can be pictured as a bank.

corePoolSize is like the bank's "duty windows": say two tellers are handling customer requests (tasks) today. When more than two customers arrive, the newcomers wait in the waiting area (workQueue). When the waiting area is also full, the "overtime windows" open and three more tellers start work; the maximum is now five windows (maximumPoolSize). If all windows are open and the waiting area is still full, the "rejection policy" (handler) kicks in and tells arriving customers that the bank cannot take them. As customers finish and no new ones arrive, windows fall idle; after keepAliveTime the three extra "overtime windows" are closed, restoring the two "duty windows".
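The analogy maps one-to-one onto the seven parameters of the ThreadPoolExecutor constructor; a sketch using the same illustrative sizes (two duty windows, five windows maximum, three waiting seats):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BankLikePool {
    // Each of the seven constructor parameters annotated with its bank-analogy role.
    public static ThreadPoolExecutor build() {
        return new ThreadPoolExecutor(
                2,                                     // corePoolSize: the two "duty windows"
                5,                                     // maximumPoolSize: at most five windows open
                1L, TimeUnit.SECONDS,                  // keepAliveTime + unit: idle overtime windows close after 1 s
                new LinkedBlockingQueue<>(3),          // workQueue: waiting area with three seats
                Executors.defaultThreadFactory(),      // threadFactory: how new tellers are hired
                new ThreadPoolExecutor.AbortPolicy()); // handler: turn customers away when everything is full
    }
}
```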

2. Rejection policies of the thread pool

When the wait queue is full and the maximum number of threads has been reached, newly arriving tasks trigger the rejection policy. The JDK provides four built-in policies:

  1. AbortPolicy: the default; throws RejectedExecutionException outright, preventing the system from running normally.
  2. CallerRunsPolicy: neither throws an exception nor discards the task; instead the task is handed back to and run by the calling thread.
  3. DiscardOldestPolicy: discards the longest-waiting task in the queue, then tries to submit the current task again.
  4. DiscardPolicy: silently discards the task without any processing.
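A small sketch contrasting two of the policies on a deliberately tiny pool (the pool sizes and sleep durations are our own, chosen so rejection is easy to trigger):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    // One worker thread and a queue of one: the third submission must be rejected.
    static ThreadPoolExecutor pool(RejectedExecutionHandler handler) {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), handler);
    }

    public static void main(String[] args) {
        Runnable sleepy = () -> {
            try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException ignored) { }
        };

        // AbortPolicy: the third task finds the worker busy and the queue full, so it throws.
        ThreadPoolExecutor abortPool = pool(new ThreadPoolExecutor.AbortPolicy());
        abortPool.execute(sleepy);      // runs on the single worker
        abortPool.execute(sleepy);      // waits in the queue
        try {
            abortPool.execute(sleepy);  // rejected
        } catch (RejectedExecutionException e) {
            System.out.println("rejected by AbortPolicy");
        }
        abortPool.shutdown();

        // CallerRunsPolicy: the overflow task is handed back to the submitting thread.
        ThreadPoolExecutor callerRunsPool = pool(new ThreadPoolExecutor.CallerRunsPolicy());
        callerRunsPool.execute(sleepy);
        callerRunsPool.execute(sleepy);
        callerRunsPool.execute(() ->
                System.out.println("overflow ran on: " + Thread.currentThread().getName()));
        callerRunsPool.shutdown();
    }
}
```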

Which thread pool should be used in real production?

None of them: the single, cached (variable), and fixed-length pools are all avoided!

The reason is that both FixedThreadPool and SingleThreadExecutor are backed by a LinkedBlockingQueue with a maximum length of Integer.MAX_VALUE, which obviously risks OOM. So production code typically builds its own custom thread pool using the seven-parameter ThreadPoolExecutor constructor.

ExecutorService threadPool = new ThreadPoolExecutor(2, 5,
        1L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(3),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy());
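A usage sketch for such a custom pool, here swapping in CallerRunsPolicy so overflow tasks run on the submitting thread rather than throwing (the task count and class name are our own):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomPoolUsage {
    // Submits `tasks` jobs to a custom pool and returns how many completed.
    public static int run(int tasks) throws InterruptedException {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5,
                1L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                // CallerRunsPolicy instead of AbortPolicy: overflow tasks are not lost,
                // they simply run on the submitting thread.
                new ThreadPoolExecutor.CallerRunsPolicy());
        AtomicInteger done = new AtomicInteger();
        try {
            for (int i = 1; i <= tasks; i++) {
                final int id = i;
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\ttask " + id);
                    done.incrementAndGet();
                });
            }
        } finally {
            threadPool.shutdown();
            threadPool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed: " + run(10));
    }
}
```

With 2 core threads, a queue of 3, and a maximum of 5 threads, 8 submissions fit in the pool itself; the remaining 2 of the 10 run on the caller.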