Concurrent programming

I. Concepts

1. Three elements of concurrent programming

  • Atomicity: A group of operations either all succeed or all fail
  • Ordering: The program executes in the order in which the code is written
  • Visibility: When multiple threads access the same variable and one thread modifies it, the other threads immediately see the latest value

2. Lock classification

  • Pessimistic locking

    • The data is locked before it can be modified, causing the thread to block

    • Ensure that only one thread can access a particular code segment at a time

    • Examples

      • synchronized

        • Waiting strategy

          • If the lock cannot be acquired, the thread spins first; if it still cannot acquire the lock after spinning, it blocks
        • How it works

          • The lock is an object
            • It has an internal state variable (flag bit) that records whether the lock is held by a thread
              • 0: not held by any thread
              • 1: held by a thread
            • If the lock is held by a thread, the thread ID is recorded
            • It maintains a list of the threads waiting to acquire the lock (the lock pool). After the current thread releases the lock, a thread is taken from the list to continue running
          // These two are equivalent: an instance method locks on this
          public synchronized void method1() {}

          public void method1() {
            synchronized (this) {}
          }

          // These two are equivalent: a static method locks on the Class object
          public static synchronized void method2() {}

          public static void method2() {
            synchronized (MyClass.class) {}
          }
      • ReentrantLock

  • Optimistic locking

    • The data is not locked; a version check is done only when the update is submitted, so the thread does not block

    • Examples

      • CAS (Compare And Swap): if the expected original value matches the value in memory, the CPU atomically updates the location to the new value and returns true; otherwise nothing is done and false is returned

      • AtomicInteger: implemented internally with CAS, spinning, and volatile
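The CAS-plus-spin idea can be sketched with AtomicInteger.compareAndSet. This is a minimal illustration, not how AtomicInteger itself is written; the class name CasCounter and the helper runDemo are hypothetical names chosen for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: an optimistic counter built on compareAndSet.
class CasCounter {
    private final AtomicInteger value = new AtomicInteger(0);

    // Read the current value, compute the new one, then try to swap it in.
    // If another thread changed the value in between, the CAS fails and we retry (spin).
    int increment() {
        while (true) {
            int expected = value.get();
            int updated = expected + 1;
            if (value.compareAndSet(expected, updated)) {
                return updated;
            }
        }
    }

    int get() {
        return value.get();
    }

    // Starts the given number of threads, each incrementing perThread times,
    // and returns the final count once all of them have finished.
    static int runDemo(int threads, int perThread) {
        CasCounter counter = new CasCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 10_000)); // prints 40000
    }
}
```

No thread ever blocks here: a thread that loses the race simply re-reads the value and tries again.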

3. The synchronized modifier

  • Code block
    • Scope: the code enclosed in braces {}
    • Locked object: the object given in the parentheses of synchronized(...)
  • Instance method
    • Scope: the whole method
    • Locked object: the object on which the method is called
  • Static method
    • Scope: the entire static method
    • Locked object: the Class object, so the lock applies across all objects of the class
  • Class, i.e. synchronized(MyClass.class)
    • Scope: the code enclosed in braces {} after synchronized(MyClass.class)
    • Locked object: the Class object, so the lock applies across all objects of the class

II. Threads

  • Every Java process has a main thread

  • Every thread has a priority from 1 to 10, 5 by default – a higher-priority thread usually, but not always, runs before a lower-priority one

  • Classification

    • Daemon thread: usually runs auxiliary tasks in an application, such as the garbage collector or a cache manager

    • Non-daemon thread [default]: the application ends when all non-daemon threads have stopped, whether or not daemon threads are still running

      When no non-daemon thread remains, the process terminates and the daemon threads terminate with it

  • State

    A thread is in exactly one of these states at any given time

    • NEW: the Thread object has been created but not yet started

    • RUNNABLE: the thread is running or ready to run

    • BLOCKED: the thread is blocked waiting for a monitor lock

      Heavyweight blocking that cannot be woken by interrupt() – caused by synchronized

    • WAITING: the thread is waiting for an action from another thread

    • TIMED_WAITING: the thread is waiting for an action from another thread, with a time limit

      WAITING, TIMED_WAITING: lightweight blocking that can be woken by interrupt() – caused by wait(), sleep(), join(), park()

    • TERMINATED: the thread has finished running

  • Commonly used methods

    • Information
      • getId(): gets the identifier of the thread, a positive integer that is unique and unchangeable throughout the lifetime of the thread
      • getName()/setName(): gets/sets the String name of the thread
      • getPriority()/setPriority(): gets/sets the priority of the thread
      • isDaemon()/setDaemon(): gets/sets whether the thread is a daemon thread
      • getState(): gets the current state of the thread
    • interrupt(): sets the interrupt flag of the target thread; if the thread is in lightweight blocking inside a method that declares InterruptedException – wait(), join(), sleep() – it is woken up with that exception
    • interrupted(): static; returns whether the current thread has its interrupt flag set, and clears the flag
    • isInterrupted(): returns whether the target thread has its interrupt flag set, without clearing the flag
    • sleep(ms): pauses the current thread for ms milliseconds without releasing any lock
    • join(): the calling thread waits until the target thread finishes, then continues
    • setUncaughtExceptionHandler(): sets the handler for uncaught (unchecked) exceptions
    • currentThread(): returns the Thread object that is actually running the code
  • Ways to create a thread

    • Extend the Thread class

      public class MyThread extends Thread {
        @Override
        public void run() {
          // Task code goes here
        }
      }

      public class Main {
        public static void main(String[] args) {
          MyThread thread = new MyThread();
          thread.start();
        }
      }
    • Implement the Runnable interface

      public class MyRunnable implements Runnable {
        @Override
        public void run() {
          // Task code goes here
        }
      }

      public class Main {
        public static void main(String[] args) {
          Thread thread = new Thread(new MyRunnable());
          thread.start();
        }
      }
    • Implement the Callable interface – the return value of the thread can be obtained

      import java.util.concurrent.*;

      public class MyCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
          Thread.sleep(5000);
          return "hello world call() invoked!";
        }
      }

      public class Main {
        public static void main(String[] args) throws Exception {
          MyCallable myCallable = new MyCallable();
          // Wrap the Callable; the generic type is the return type of the Callable
          FutureTask<String> futureTask = new FutureTask<String>(myCallable);
          // Start the worker thread
          new Thread(futureTask).start();
          // Block until the result is available (about 5 s)
          String result = futureTask.get();
          System.out.println(result);
        }
      }

      public class Main2 {
        public static void main(String[] args) throws Exception {
          ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
              // Hook that runs after each task; errors can be handled here.
              // For tasks passed to submit(), t is null and the exception is held by the Future r
              super.afterExecute(r, t);
            }
          };
          Future<String> future = executor.submit(new MyCallable());
          String s = future.get();
          System.out.println(s);
          executor.shutdown();
        }
      }
  • Communication

    • Object class methods

      • Used with synchronized
      • wait(): blocks the current thread, releases the lock, and adds the thread to the wait pool
      • notify(): wakes up one arbitrary thread in the wait pool of this object and moves it to the lock pool
      • notifyAll(): wakes up all threads in the wait pool of this object and moves them to the lock pool

      ○ Wait pool: threads in this pool cannot compete for the lock

      ○ Lock pool: threads in this pool can compete for the lock

    • Condition

      • Used with Lock

      • await(): blocks the current thread, releases the lock, and adds the thread to the wait pool

      • signal(): wakes up one thread waiting on this particular Condition, which allows targeted wake-up, and moves it to the lock pool
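A small sketch of wait()/notifyAll() used together with synchronized: a one-slot mailbox in which the producer waits while the slot is occupied and the consumer waits while it is empty. The class name Mailbox and the helper roundTrip are hypothetical names used only for illustration.

```java
// Hypothetical example class: a one-slot mailbox guarded by its own monitor.
class Mailbox {
    private String message; // guarded by this; null means the slot is empty

    synchronized void put(String m) throws InterruptedException {
        // wait() releases the monitor and parks the thread in the wait pool
        while (message != null) {
            wait();
        }
        message = m;
        notifyAll(); // move waiting threads to the lock pool so they can re-check
    }

    synchronized String take() throws InterruptedException {
        while (message == null) {
            wait();
        }
        String m = message;
        message = null;
        notifyAll();
        return m;
    }

    // Sends text through the mailbox from a producer thread and returns what the caller received.
    static String roundTrip(String text) {
        Mailbox box = new Mailbox();
        Thread producer = new Thread(() -> {
            try {
                box.put(text);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String got = box.take();
            producer.join();
            return got;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

The wait() calls sit inside while loops so that a woken thread re-checks its condition after it has re-acquired the lock.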

III. Concurrency

1. Concepts

  • Concurrency: running multiple tasks on a single-core processor by interleaving them

  • Parallelism: running multiple tasks at the same time on different computers or processors

  • Critical section: a piece of code that only one task can run at any given time

  • Immutable object: after initialization its visible state (property values) cannot be modified – thread-safe

    To change it, a new object must be created

  • Atomic operation: an operation that either completes entirely or has no effect; it can be implemented with a critical section

  • Atomic variable

    • A variable whose value is set and read through atomic operations
    • Implementation
      • Synchronization mechanism
      • CAS: requires neither a lock nor any other synchronization mechanism

2. Synchronization

  • Control synchronization: the start of one task depends on the end of another task
  • Data access synchronization: when multiple tasks access a shared variable, only one task can access it at any time
  • Mechanisms
    • Semaphore
      • A variable that holds the number of available resources, managed with two atomic operations
        • Acquire
        • Release
      • Mutex (a special semaphore)
        • States
          • Free
          • Busy: only the task that set the state to busy can release it
    • Monitor: has one mutex, one condition variable, and two operations (wait, notify) – once the condition is notified, only one of the tasks waiting for it continues to run

3. Task communication

  • Shared memory: tasks read and write the same memory area – to avoid problems, access to shared memory must happen inside a critical section
  • Message passing

4. Problems

  • Data race: multiple tasks write to a shared variable outside a critical section
  • Deadlock
    • One thread waits for another thread to release a shared resource, while that thread in turn waits for the first thread to release another shared resource
    • Coffman conditions (all four must hold for a deadlock to occur)
      • Mutual exclusion: the resources involved in the deadlock are not shareable
      • Hold and wait: a thread holds one mutually exclusive resource and requests another, without releasing the first while it waits
      • No preemption: a resource can only be released by the thread that holds it
      • Circular wait: thread 1 waits for a resource held by thread 2, thread 2 waits for a resource held by thread 3, …, thread N waits for a resource held by thread 1
    • Handling strategies
      • Ignore: assume that the application does not deadlock. If a deadlock occurs, restart the application
      • Detection: create a task dedicated to analyzing application state. If a deadlock is detected, take action to fix it – terminate the thread and force the release of resources
      • Prevention: make sure at least one of the Coffman conditions cannot hold
      • Avoidance: a thread obtains all the resources it will use before it starts
  • Livelock
    • Two threads keep changing their state in response to each other, so both are trapped in a loop of state changes and neither makes progress
  • Resource starvation
    • Resolution: ensure fairness, so that every thread waiting for the resource gets it within a given time
  • Priority inversion
    • A lower-priority thread holds a resource needed by a higher-priority thread, so the lower-priority thread runs before the higher-priority one
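One common way to apply the "prevention" strategy is to break the circular-wait condition by always acquiring locks in a fixed global order. The sketch below assumes a hypothetical Account class whose id field exists only to define that order; it is an illustration, not a complete banking model.

```java
// Hypothetical example class: transfers always lock the account with the smaller id first.
class Account {
    final int id;          // assumed unique; used only to order lock acquisition
    private long balance;

    Account(int id, long balance) {
        this.id = id;
        this.balance = balance;
    }

    synchronized long balance() {
        return balance;
    }

    // Assumes from and to are two different accounts with different ids.
    static void transfer(Account from, Account to, long amount) {
        Account first = from.id < to.id ? from : to;
        Account second = (first == from) ? to : from;
        // Every thread takes the two locks in the same order, so no cycle of waiters can form.
        synchronized (first) {
            synchronized (second) {
                from.balance -= amount;
                to.balance += amount;
            }
        }
    }

    // Two threads transfer in opposite directions; returns the final balances {a, b}.
    static long[] runDemo(int rounds) {
        Account a = new Account(1, 1000);
        Account b = new Account(2, 1000);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < rounds; i++) transfer(a, b, 1);
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < rounds; i++) transfer(b, a, 2);
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] { a.balance(), b.balance() };
    }

    public static void main(String[] args) {
        long[] result = runDemo(1000);
        System.out.println(result[0] + " " + result[1]); // prints 2000 0
    }
}
```

If each thread instead locked its own "from" account first, the two threads could each hold one lock and wait forever for the other.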

IV. Memory model JMM (Java Memory Model)

1. Memory visibility

  • Cause of the problem
    • L1 and L2 caches are private to each CPU core, while the L3 cache is shared between cores. Thanks to the CPU cache coherence protocol MESI, the caches themselves cause no memory visibility problem between cores. However, because the coherence protocol is expensive, many buffers were added to recover performance
    • L1, L2, and L3 are kept in sync with main memory, but the Store Buffer and Load Buffer are asynchronous with respect to L1 – in effect each logical CPU has its own cache that is not fully synchronized with main memory, so from the point of view of other threads a thread can appear to run out of order

2. Reordering

  • Types
    • Compiler: statements with no dependency between them can be reordered by the compiler
    • CPU instruction: instructions with no dependencies can be executed in parallel
    • CPU memory: the order in which instructions execute is not exactly the order in which their results are written to main memory – this causes memory visibility problems

3. Memory barrier

  • Tells the compiler and the CPU not to reorder instructions across it
  • Types
    • LoadLoad: disables reordering of a read with a later read
    • StoreStore: disables reordering of a write with a later write
    • LoadStore: disables reordering of a read with a later write
    • StoreLoad: disables reordering of a write with a later read
  • Usage
    • The volatile keyword
    • The Unsafe class
      • loadFence: LoadLoad + LoadStore
      • storeFence: StoreStore + LoadStore
      • fullFence: loadFence + storeFence + StoreLoad

4. as-if-serial

However the instructions are reordered, the result does not change, and the code appears to run strictly in order from beginning to end

  • Single-thread reordering rules
    • The result of a single-threaded run must not change – operations with no data dependencies between them may be reordered freely
  • Multithreaded reordering rules
    • as-if-serial semantics are guaranteed only within each individual thread

5. happens-before

  • A happens-before B
    • The result of A is visible to B
    • It does not mean that A must actually run before B
  • JMM guarantees
    • Each operation in a thread happens-before every subsequent operation in that thread – the as-if-serial guarantee
    • A write to a volatile variable happens-before every subsequent read of that variable – accesses to volatile variables are not reordered, while accesses to non-volatile variables may be
    • Unlocking a synchronized lock happens-before every subsequent locking of the same lock
    • Writing a final field happens-before reading the reference to the containing object, which happens-before reading the final field
  • Transitivity
    • If A happens-before B and B happens-before C, then A happens-before C

6. volatile

  • Functions
    • Makes writes of 64-bit values (long, double) atomic
    • Ensures memory visibility
    • Disallows reordering
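The visibility guarantee can be sketched with a stop flag. The class name StopFlag and the timings below are assumptions made for the example; the point is only that a write to a volatile field by one thread is seen by a loop that reads it in another thread.

```java
// Hypothetical example class: a worker loop controlled by a volatile flag.
class StopFlag {
    // Without volatile, the JMM gives no guarantee that the worker ever sees the update.
    private volatile boolean running = true;

    // Starts a spinning worker, clears the flag, and reports whether the worker stopped.
    static boolean stopsPromptly() {
        StopFlag flag = new StopFlag();
        Thread worker = new Thread(() -> {
            while (flag.running) {
                // busy work; each iteration re-reads the volatile field
            }
        });
        worker.start();
        try {
            Thread.sleep(50);       // let the worker spin for a moment
            flag.running = false;   // volatile write: happens-before the next read of running
            worker.join(2000);      // wait up to 2 s for the worker to notice
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(stopsPromptly());
    }
}
```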

V. Concurrent containers

BlockingQueue is an interface; the other queue classes in item 1 are its implementations

1. BlockingQueue

  • A queue with blocking operations

  • When the queue is full, the producer blocks

  • When the queue is empty, the consumer blocks

  • Methods

    • add(): non-blocking; throws an exception when the queue is full

    • remove(): non-blocking; throws an exception when the queue is empty

    • offer(): non-blocking; returns false when the queue is full

    • poll(): non-blocking; returns null when the queue is empty

    • put(): blocks when the queue is full

    • take(): blocks when the queue is empty

  • ArrayBlockingQueue

    • A circular queue backed by an array; the capacity must be passed in

    • Implementation

      • 1 lock

      • 2 conditions

        • notEmpty

        • notFull

  • LinkedBlockingQueue

    • A blocking queue backed by a singly linked list
    • Default capacity is Integer.MAX_VALUE
    • Implementation
      • 2 locks
        • takeLock
        • putLock
      • 2 conditions
        • notEmpty
        • notFull
  • PriorityBlockingQueue

    • Elements are dequeued according to their priority
    • The default initial capacity is 11; when it is exceeded, the queue grows automatically
    • Implementation
      • 1 lock
      • 1 condition – notEmpty
  • DelayQueue

    • Elements are dequeued in ascending order of delay time

      Delay time: the future time at which the element should run minus the current time

    • take() blocks when the queue is empty or when the delay of the head (heap-top) element has not yet expired

    • Implementation

      • 1 lock
      • 1 condition – available
  • SynchronousQueue

    • Has no capacity
    • A thread that calls put() blocks until another thread calls take(); the two threads are then released at the same time
    • Modes
      • Fair mode: first come, first matched
      • Unfair mode: last come, first matched
    • Implementation
      • Singly linked list
      • When a put node and a take node meet, they are matched and leave the queue
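The blocking and non-blocking BlockingQueue methods can be compared in a short sketch. The class name QueueDemo and its helper methods are hypothetical; the queue calls themselves (put, take, offer, poll) are the standard java.util.concurrent API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical example class: a producer and a consumer sharing a small bounded queue.
class QueueDemo {
    // The producer puts 1..n into a queue of capacity 2; the caller takes n items and sums them.
    static int sumThroughQueue(int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += queue.take(); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    // offer() on a full queue returns false instead of blocking.
    static boolean offerOnFullQueue() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.offer(1);
        return queue.offer(2);
    }

    // poll() on an empty queue returns null instead of blocking.
    static Integer pollOnEmptyQueue() {
        return new ArrayBlockingQueue<Integer>(1).poll();
    }

    public static void main(String[] args) {
        System.out.println(sumThroughQueue(100)); // prints 5050
        System.out.println(offerOnFullQueue());   // prints false
        System.out.println(pollOnEmptyQueue());   // prints null
    }
}
```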

2. BlockingDeque

Deque: double-ended queue

  • A blocking double-ended queue interface

  • Extends BlockingQueue and Deque

  • LinkedBlockingDeque

    • Implementation
      • Doubly linked list
      • 1 lock
      • 2 conditions
        • notEmpty
        • notFull

3. CopyOnWrite

  • On a write, the data is not modified in place; a copy is modified and then written back under an optimistic or pessimistic lock – [Purpose] reads need no lock

  • CopyOnWriteArrayList

  • CopyOnWriteArraySet

    • Implements a Set on top of an array and ensures that elements are not duplicated
    • Internally wraps a CopyOnWriteArrayList
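A brief sketch of what copy-on-write means for readers: an iterator works on the array snapshot that existed when iteration began, so writes during iteration neither fail nor become visible to that iterator. The class name CowDemo is a hypothetical name for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical example class: modifying a CopyOnWriteArrayList while iterating over it.
class CowDemo {
    // Returns the elements the iterator saw while the list was being modified.
    static List<Integer> iterateWhileAdding() {
        CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>(Arrays.asList(1, 2, 3));
        List<Integer> seen = new ArrayList<>();
        for (Integer x : list) {
            // Each add copies the backing array; the running iterator keeps its old snapshot,
            // so no ConcurrentModificationException is thrown.
            list.add(x + 10);
            seen.add(x);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileAdding()); // prints [1, 2, 3]
    }
}
```

The price is that every write copies the whole array, which is why these classes suit read-mostly data.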

4. ConcurrentLinkedQueue/Deque

  • Based on a linked list (singly linked for the queue, doubly linked for the deque); enqueue and dequeue are implemented with CAS operations on the head and tail

5. ConcurrentHashMap

  • Keys are unordered
  • If the head node of a bucket is of type Node, the bucket is a linked list
  • If the head node is of type TreeNode, the bucket is a red-black tree
  • Locking is per bucket, on the head node
  • The initial array length is 16
  • While the array length is less than 64, a long linked list is not converted to a red-black tree; the array is resized instead
  • When a linked list grows beyond 8 elements, it is converted to a red-black tree
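Because locking is per bucket, compound updates should go through the atomic methods of the map rather than a separate get followed by put. A minimal sketch using merge(); the class name HitCounter and the key "hits" are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical example class: several threads counting into one ConcurrentHashMap entry.
class HitCounter {
    static int countConcurrently(int threads, int perThread) {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    // merge() performs the read-modify-write atomically for this key
                    counts.merge("hits", 1, Integer::sum);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counts.get("hits");
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(4, 5_000)); // prints 20000
    }
}
```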

6. ConcurrentSkipListMap

  • Keys are kept sorted; based on a skip list

7. ConcurrentSkipListSet

  • Elements are kept sorted; based on a skip list

VI. Synchronization tools

1. Semaphore

  • Controls concurrent access to a limited number of resources
  • With an initial resource count of 1, it degenerates into an exclusive lock
  • Has fair and unfair modes
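A sketch of a Semaphore limiting how many threads may be inside a section at once. The class name SemaphoreDemo, the 10 ms sleep, and the counters are assumptions made to keep the example observable.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: measures the highest number of threads inside the guarded section.
class SemaphoreDemo {
    static int maxConcurrent(int permits, int threads) {
        Semaphore semaphore = new Semaphore(permits); // unfair by default
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    semaphore.acquire(); // blocks while no permit is available
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    int now = active.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    Thread.sleep(10); // simulate using the resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    semaphore.release();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrent(2, 8)); // never more than 2
    }
}
```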

2. CountDownLatch

  • A thread waits for multiple worker threads to finish before it continues
  • Has no fair or unfair mode
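A minimal CountDownLatch sketch: the calling thread blocks in await() until every worker has called countDown(). The class name LatchDemo and the counter are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: the caller waits for all workers to report completion.
class LatchDemo {
    static int waitForWorkers(int workers) {
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet(); // the "work"
                done.countDown();           // one fewer worker to wait for
            }).start();
        }
        try {
            done.await(); // returns once the count reaches 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(waitForWorkers(5)); // prints 5
    }
}
```

Unlike a CyclicBarrier, the latch cannot be reset once its count has reached zero.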

3. CyclicBarrier

  • Coordinates multiple threads so that they proceed in step
  • Can be reused
  • Responds to interruption: the internal breakBarrier() wakes up all blocked threads and resets count to its initial value
  • A callback can be set that runs once each time a batch of threads has arrived at the barrier

4. Exchanger

  • Used to exchange data between threads

5. Phaser

  • Wakes up the main thread when the number of unarrived threads drops to 0

  • Can be used in place of CyclicBarrier and CountDownLatch

  • Waits for all parties to reach the same synchronization point

  • The number of threads to synchronize can be adjusted dynamically at run time

  • Multiple Phasers can form a tree structure

    • A Phaser knows its parent but not its children, so operations on a parent are performed through its children
    • When the number of registered parties of a child Phaser drops to zero, the child is automatically deregistered from its parent

VII. Atomic classes

1. AtomicInteger, AtomicLong

  • Thread-safe addition and subtraction on an integer requires either synchronized or an Atomic class

  • Implemented internally with CAS, spinning, and volatile semantics

    ○ [Purpose] CAS and spin: reduce the scope of locking

    ○ [Purpose] volatile: results are visible across threads

2. AtomicBoolean, AtomicReference

  • Make the combined compare-and-set operation atomic

3. AtomicStampedReference, AtomicMarkableReference

  • Solve the ABA problem – compare not only the value but also a version number

    ABA: the value changes from A to B, and then from B back to A

  • AtomicMarkableReference is similar to AtomicStampedReference, except that its version number is a boolean rather than an integer
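The ABA case can be reproduced in a single thread to show what the stamp adds. In this sketch (class name AbaDemo is hypothetical) the reference goes A, then B, then back to A; a CAS that still carries the old stamp is rejected even though the reference matches.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

// Hypothetical example class: a stale compare-and-set is detected through the stamp.
class AbaDemo {
    static boolean staleCasSucceeds() {
        String a = "A";
        String b = "B";
        AtomicStampedReference<String> ref = new AtomicStampedReference<>(a, 0);

        int staleStamp = ref.getStamp(); // a slow thread remembers stamp 0 together with value A

        // Meanwhile the value changes A -> B -> A, and the stamp advances 0 -> 1 -> 2
        ref.compareAndSet(a, b, 0, 1);
        ref.compareAndSet(b, a, 1, 2);

        // The reference is A again, but the stamp is 2, not 0, so this CAS fails
        return ref.compareAndSet(a, "C", staleStamp, staleStamp + 1);
    }

    public static void main(String[] args) {
        System.out.println(staleCasSucceeds()); // prints false
    }
}
```

A plain AtomicReference would accept the same update, because it compares only the reference.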

4. AtomicIntegerFieldUpdater, AtomicLongFieldUpdater, AtomicReferenceFieldUpdater

  • Provide atomic operations on the member variables of existing classes whose source code cannot be changed
  • Restriction: the member variable must be volatile and, for the Integer and Long updaters, of primitive type rather than the wrapper type

5. AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray

  • Implement atomic operations on an element in an array

6. Striped64, LongAdder, LongAccumulator, DoubleAdder, DoubleAccumulator

  • Split one variable into multiple variables to get better performance than the Atomic classes
  • The Accumulators are similar to the Adders but more powerful
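A short sketch of LongAdder and LongAccumulator side by side. The class name AdderDemo and the choice of a running maximum for the accumulator are assumptions for illustration; an accumulator accepts any function of two longs together with an identity value.

```java
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical example class: a striped counter and a striped running maximum.
class AdderDemo {
    // Returns {total number of increments, largest value seen}.
    static long[] run(int threads, int perThread) {
        LongAdder total = new LongAdder();
        LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 1; j <= perThread; j++) {
                    total.increment();   // contention is spread across internal cells
                    max.accumulate(j);   // same idea, with a custom combining function
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        // sum() and get() combine the cells; they are exact here because all writers have finished
        return new long[] { total.sum(), max.get() };
    }

    public static void main(String[] args) {
        long[] result = run(4, 1000);
        System.out.println(result[0] + " " + result[1]); // prints 4000 1000
    }
}
```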

VIII. Condition, Lock

1. Condition

  • It is an interface and must be used together with a Lock

    In the same way, wait() and notify() are used together with synchronized

  • A Condition still belongs to a single lock (only one thread can run at a time), but it can notify exactly the threads blocked on that condition

  • await() blocks the thread

  • signal() notifies a blocked thread to run

  • Implementation principle

    • A linked-list queue maintains the blocked threads
  • Usage

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
    			implements BlockingQueue<E>, java.io.Serializable {
      
      // ...
      final Object[] items;
      int takeIndex;
      int putIndex;
      int count;
      
      // One lock + two conditions
      final ReentrantLock lock;
      private final Condition notEmpty;
      private final Condition notFull;
      
      public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
        	throw new IllegalArgumentException();
        this.items = new Object[capacity];
        // Create one lock and two conditions in the constructor
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
      } 
      
      public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
          while (count == items.length)
            // Wait on the notFull condition: the queue is at capacity
            notFull.await();
          enqueue(e);
        } finally {
          lock.unlock();
        }
      }

      private void enqueue(E e) {
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = e;
        if (++putIndex == items.length) putIndex = 0;
        count++;
        
        // The element has been enqueued; signal the notEmpty condition to wake a consumer
        notEmpty.signal();
      } 
      
      public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
          while (count == 0)
            // Wait on the notEmpty condition: the queue has 0 elements, so nothing can be consumed
            notEmpty.await();
          return dequeue();
        } finally {
          lock.unlock();
        }
      }

      private E dequeue() {
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E e = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
        	itrs.elementDequeued();
        
        // Consume successfully, notify not full condition, there is space in queue, can produce element.
        notFull.signal();
        
        return e;
      } 
      // ...
    }

2. ReentrantLock

  • A mechanism that prevents two threads from reading or writing a shared resource at the same time

  • Read-read, read-write, and write-write are all mutually exclusive

  • Supports Condition

  • Concepts

    • Reentrant
      • After a thread has acquired the lock by calling lock.lock(), it can acquire the lock again by calling lock.lock() again
      • In general, locks should be designed to be reentrant; otherwise a thread can deadlock on itself
      • synchronized is reentrant as well
    • Fair
      • A newly arrived thread sees other threads queuing and goes to the end of the queue
    • Unfair
      • A newly arrived thread tries to grab the lock immediately
      • [Purpose] Improve efficiency and reduce thread switching
  • Implementation classes

    • NonfairSync [default] – unfair lock
    • FairSync – fair lock
  • Implementation principle

    • AbstractOwnableSynchronizer

      • Records which thread currently holds the lock
    • AbstractQueuedSynchronizer (AQS)

      • A state variable records the lock state and is updated with CAS for thread safety. Because the lock is reentrant, the value of state can be greater than 1

      • A thread-safe lock-free queue maintains all blocked threads – implemented with a doubly linked list + CAS

    • Low-level support for blocking and waking threads

      • The Unsafe class provides park() to block and unpark() to wake up

        unpark(thread) wakes up one specific thread, whereas notify() wakes up an arbitrary waiting thread
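Reentrancy can be observed through getHoldCount(), which reports how many times the current thread holds the lock. The class name ReentrantDemo and its methods are hypothetical names for this sketch.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: one method that holds the lock calls another that locks again.
class ReentrantDemo {
    // new ReentrantLock() is unfair by default; new ReentrantLock(true) requests the fair mode
    private final ReentrantLock lock = new ReentrantLock();

    int outer() {
        lock.lock();
        try {
            return inner(); // re-enters the lock already held by this thread
        } finally {
            lock.unlock();
        }
    }

    private int inner() {
        lock.lock(); // same owner thread: the hold count goes from 1 to 2, no blocking
        try {
            return lock.getHoldCount();
        } finally {
            lock.unlock();
        }
    }

    boolean isFree() {
        return !lock.isLocked();
    }

    public static void main(String[] args) {
        ReentrantDemo demo = new ReentrantDemo();
        System.out.println(demo.outer());  // prints 2
        System.out.println(demo.isFree()); // prints true
    }
}
```

Each lock() must be paired with an unlock(); the lock becomes free only when the hold count returns to 0.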

3. Read/write lock ReadWriteLock

  • Read-read is not mutually exclusive; read-write and write-write are mutually exclusive

  • It uses a pessimistic read policy: after the first thread obtains the read lock, a second and a third thread can still obtain it, so a writer thread may never get the lock and can starve

  • Implementation classes

    • ReadLock – does not support Condition
    • WriteLock – supports Condition
  • Implementation principle

    • A single lock is used, and threads are divided into reader threads and writer threads
    • Reader threads are not mutually exclusive with each other (they can hold the lock at the same time); a reader and a writer are mutually exclusive; writers are mutually exclusive with each other
    • When state != 0, either some thread holds the read lock or some thread holds the write lock; both cannot be true at once, because reading and writing are mutually exclusive
  • Usage

    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    Lock readLock = readWriteLock.readLock();
    readLock.lock();
    try {
      // Perform the read operation
    } finally {
      readLock.unlock();
    }
    Lock writeLock = readWriteLock.writeLock();
    writeLock.lock();
    try {
      // Perform the write operation
    } finally {
      writeLock.unlock();
    }

4. StampedLock

  • Read-read is not mutually exclusive; optimistic read and write are not mutually exclusive; write-write is mutually exclusive

  • It uses an optimistic read strategy that takes no lock for reading. If the reader finds that the data was modified during the read, it upgrades to a pessimistic read. This lowers the priority of readers and prevents writer threads from starving

  • Implementation principle

    • Before reading, take a snapshot of the data state (the version stamp); the shared data is then copied into thread-local memory
  • Usage

    • After reading, the version number is checked again. If it has changed, another thread modified the data during the read: the data that was read is discarded, a pessimistic read lock is acquired, and the data is read again
    import java.util.concurrent.locks.StampedLock;

    class Point {

      private double x, y;
      private final StampedLock sl = new StampedLock();

      // Called by multiple threads; changes the values of x and y
      void move(double deltaX, double deltaY) {
        long stamp = sl.writeLock();
        try {
          x += deltaX;
          y += deltaY;
        } finally {
          sl.unlockWrite(stamp);
        }
      }

      // Called by multiple threads; computes the distance from the origin
      double distanceFromOrigin() {
        // Optimistic read: no lock is taken
        long stamp = sl.tryOptimisticRead();
        // Copy the shared variables to the thread stack
        double currentX = x, currentY = y;

        // Another thread modified the data during the read
        if (!sl.validate(stamp)) {
          // The values may be inconsistent, so discard them
          // and fall back to a pessimistic read
          stamp = sl.readLock();
          try {
            currentX = x;
            currentY = y;
          } finally {
            sl.unlockRead(stamp);
          }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
      }
    }

IX. ThreadPool, Future

1. Thread pool

  • Principle

    • Callers keep submitting tasks to the task queue of the pool, and a group of threads in the pool keeps taking tasks from the queue and running them – the producer-consumer model
    • A blocking queue is used to manage the tasks and wake up the threads
  • Inheritance relationships

  • Implementation classes

    • ThreadPoolExecutor
      • Parameters
        • corePoolSize: the number of threads that are always kept alive
        • maxPoolSize (maximumPoolSize in the constructor): the upper limit up to which threads are added when corePoolSize threads are busy and the queue is full
        • keepAliveTime: how long a thread beyond corePoolSize may stay idle before it is destroyed; the total number of threads then shrinks back to corePoolSize
        • TimeUnit: the unit of keepAliveTime
        • BlockingQueue: the task queue to use
        • ThreadFactory: the factory that creates threads – [default] Executors.defaultThreadFactory()
        • RejectedExecutionHandler
          • The rejection policy applied when the number of threads has reached maxPoolSize and the BlockingQueue is full
          • Policies
            • AbortPolicy [default]: throws an exception
            • CallerRunsPolicy: the task is run directly in the thread of the caller
            • DiscardPolicy: silently discards the task
            • DiscardOldestPolicy: discards the oldest task in the task queue
    • ScheduledThreadPoolExecutor
      • Can run tasks after a delay – through a delay queue
      • Can run tasks periodically
        • scheduleAtFixedRate(): runs the task at a fixed rate, regardless of how long the task takes
        • scheduleWithFixedDelay(): runs the task with a fixed delay between the end of one run and the start of the next, so the period depends on how long the task takes
        • [Principle] After a periodic task has run, it is put back into the task queue
  • Task submission process

    1. Check whether the current number of threads is smaller than corePoolSize; if so, create a new thread
    2. Otherwise check whether the queue is full; if not, put the task into the queue
    3. Otherwise check whether the current number of threads is smaller than maxPoolSize; if so, create a new thread, and if not, reject the task according to the rejection policy – with an unbounded task queue this step is never reached
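The three submission steps can be watched with a deliberately tiny pool: one core thread, a maximum of two threads, and a queue of capacity one. The class name SubmitFlowDemo is hypothetical, and every task simply blocks on a latch so that the pool state stays fixed while it is inspected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical example class: submits four blocking tasks and logs the pool state after each.
class SubmitFlowDemo {
    static String run() {
        // corePoolSize = 1, maximumPoolSize = 2, queue capacity = 1, default AbortPolicy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder log = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocker);
                log.append("pool=").append(pool.getPoolSize())
                   .append(" queue=").append(pool.getQueue().size()).append("; ");
            } catch (RejectedExecutionException e) {
                log.append("rejected");
            }
        }
        release.countDown(); // let the three accepted tasks finish
        pool.shutdown();
        return log.toString();
    }

    public static void main(String[] args) {
        // Task 1 starts the core thread, task 2 is queued,
        // task 3 starts a second thread, task 4 is rejected.
        System.out.println(run()); // prints pool=1 queue=0; pool=1 queue=1; pool=2 queue=1; rejected
    }
}
```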
  • Correct shutdown steps

    1. Call the shutdown() or shutdownNow() method of the ExecutorService

      ○ shutdown(): interrupts idle threads but does not clear the task queue

      ○ shutdownNow(): clears the task queue and interrupts all threads

    2. Call the awaitTermination() method of the ExecutorService in a loop – to determine whether the thread pool has reached its terminated state

    // executor.shutdownNow();
    executor.shutdown();
    try {
      boolean flag = true;
      do {
        flag = !executor.awaitTermination(500, TimeUnit.MILLISECONDS);
      } while (flag);
    } catch (InterruptedException e) {
      // ...
    }
  • Usage

    • import java.util.concurrent.*;

      public class ThreadPoolExecutorDemo {
        public static void main(String[] args) {
          ThreadPoolExecutor executor = new ThreadPoolExecutor(
            3, 5, 1,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(3),
            // new ThreadPoolExecutor.AbortPolicy()
            // new ThreadPoolExecutor.CallerRunsPolicy()
            // new ThreadPoolExecutor.DiscardOldestPolicy()
            new ThreadPoolExecutor.DiscardPolicy()
          );

          for (int i = 0; i < 20; i++) {
            int finalI = i;
            executor.execute(new Runnable() {
              @Override
              public void run() {
                System.out.println(Thread.currentThread().getId() + "[" + finalI + "] -- start");
                try {
                  Thread.sleep(5000);
                } catch (InterruptedException e) {
                  e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getId() + "[" + finalI + "] -- end");
              }
            });
            try {
              Thread.sleep(200);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
          executor.shutdown();
          boolean flag = true;

          try {
            do {
              flag = !executor.awaitTermination(1, TimeUnit.SECONDS);
              System.out.println(flag);
            } while (flag);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }

          System.out.println("Thread pool closed successfully...");
          System.out.println(Thread.currentThread().getId());
        }
      }
  • Executors utility class

    The Alibaba Java development manual prohibits creating thread pools through Executors, because its factory methods use unbounded queues or an unbounded number of threads, which can exhaust memory

    • Single-threaded thread pool: newSingleThreadExecutor()
    • Thread pool with a fixed number of threads: newFixedThreadPool()
    • Cached thread pool that creates a new thread for a request when no idle thread is available: newCachedThreadPool()
    • Single-threaded thread pool with periodic scheduling: newSingleThreadScheduledExecutor()
    • Multithreaded thread pool with periodic scheduling: newScheduledThreadPool()
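
The task-handling order described earlier in this section (core threads first, then the queue, then extra threads up to the maximum, then rejection) can be observed with a deliberately tiny pool built directly with the ThreadPoolExecutor constructor. This is only a sketch: the class name PoolAdmissionSketch, the method admit(), and the sizes 1/2/1 are made up for illustration. Every task blocks on a latch, so no thread becomes free while the four tasks are submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolAdmissionSketch {

    // Records "poolSize/queueSize" after each accepted task, or "rejected".
    static String admit() {
        CountDownLatch release = new CountDownLatch(1);
        // Illustrative sizes: corePoolSize = 1, maximumPoolSize = 2, queue capacity = 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        // Each task waits for the latch, so threads stay busy during submission
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder log = new StringBuilder();
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocker);
                    log.append(pool.getPoolSize()).append('/')
                       .append(pool.getQueue().size()).append(' ');
                } catch (RejectedExecutionException e) {
                    log.append("rejected");
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return log.toString();
    }

    public static void main(String[] args) {
        // Task 1 starts a core thread, task 2 is queued,
        // task 3 starts a second thread, task 4 is rejected
        System.out.println(admit()); // prints "1/0 1/1 2/1 rejected"
    }
}
```

Passing the queue capacity and rejection policy explicitly, as here, is the usual alternative to the Executors factory methods.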

2. CompletableFuture

  • Used to submit asynchronous tasks and compose their results

  • Usage

    • complete(): sets the result from another thread, so that a thread waiting in get() can obtain it
    • get(): blocks until the result is available, then returns it
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class CompletableFutureDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {

            CompletableFuture<String> future = new CompletableFuture<>();

            new Thread(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Hand the result to whoever is waiting in get()
                future.complete("hello world");
            }).start();

            System.out.println("Waiting for the result...");
            String result = future.get();
            System.out.println("Result obtained: " + result);
        }
    }
    • runAsync(Runnable): submits a task with no return value
    public class CompletableFutureDemo2 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {

            CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(2000);
                    System.out.println("Task completed.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            // Block and wait for the task to complete
            voidCompletableFuture.get();
            System.out.println("Program completed");
        }
    }
    • supplyAsync(Supplier): submits a task with a return value
    public class CompletableFutureDemo3 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "Here are the results.";
                }
            });
            String result = future.get();
            System.out.println("Task execution result: " + result);
        }
    }
    • thenRun(Runnable): runs the next task after the previous one completes; the next task neither receives the previous result nor returns a value
    public class CompletableFutureDemo4 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "Here are the results.";
            }).thenRun(() -> {
                try {
                    Thread.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Statement executed after the previous task completed");
            });
            // Block and wait for the task to complete; get() returns null here
            voidCompletableFuture.get();
            System.out.println("Task completed.");
        }
    }
    • thenAccept(Consumer): the task receives the return value of the previous task, but has no return value of its own
    public class CompletableFutureDemo5 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {

            CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Return intermediate result");
                return "This is the intermediate result.";
            }).thenAccept((param) -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Intermediate result received from the previous task: " + param);
            });
            // Block and wait for the task to complete
            future.get();
            System.out.println("Task completed.");
        }
    }
    • thenApply(Function): the task receives the return value of the previous task and has a return value of its own
    public class CompletableFutureDemo6 {

        public static void main(String[] args) throws ExecutionException, InterruptedException {

            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Return intermediate result");
                return "abcdefg";
            }).thenApply(new Function<String, Integer>() {
                @Override
                public Integer apply(String middle) {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Got the intermediate result; computing again and returning.");
                    return middle.length();
                }
            });
            Integer integer = future.get();
            System.out.println("The final result is: " + integer);
        }
    }
    • thenCompose(Function): chains a function that itself returns a CompletableFuture and flattens the result, which avoids a nested CompletableFuture<CompletableFuture<T>>
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            return "hello world";
        }
    }).thenCompose(new Function<String, CompletionStage<Integer>>() {
        @Override
        public CompletionStage<Integer> apply(String s) {
            return CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    return s.length();
                }
            });
        }
    });
    Integer integer = future.get();
    System.out.println(integer);
    • thenCombine(): after both CompletableFutures complete, passes their two return values to a function that produces the combined result
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            return "hello";
        }
    }).thenCombine(CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            return "lagou";
        }
    }), new BiFunction<String, String, Integer>() {
        @Override
        public Integer apply(String s, String s2) {
            return s.length() + s2.length();
        }
    });
    Integer result = future.get();
    System.out.println(result);
    • allOf(): returns a CompletableFuture<Void> that completes when all of the given CompletableFutures have completed
    • anyOf(): returns a CompletableFuture<Object> that completes when any one of the given CompletableFutures has completed
    public class CompletableFutureDemo11 {

        private static final Random RANDOM = new Random();
        // AtomicInteger instead of a volatile int: ++ on a volatile field is not atomic
        private static final AtomicInteger result = new AtomicInteger(0);

        public static void main(String[] args) throws ExecutionException, InterruptedException {

            CompletableFuture<?>[] futures = new CompletableFuture<?>[10];

            for (int i = 0; i < 10; i++) {

                CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.sleep(1000 + RANDOM.nextInt(1000));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        result.incrementAndGet();
                    }
                });
                futures[i] = future;
            }
            System.out.println(result.get());

            // for (int i = 0; i < 10; i++) {
            //     futures[i].get();
            //     System.out.println(result.get());
            // }

            // Integer allResult = CompletableFuture.allOf(futures).thenApply(new Function<Void, Integer>() {
            //     @Override
            //     public Integer apply(Void unused) {
            //         return result.get();
            //     }
            // }).get();
            //
            // System.out.println(allResult);

            Integer anyResult = CompletableFuture.anyOf(futures).thenApply(new Function<Object, Integer>() {
                @Override
                public Integer apply(Object o) {
                    return result.get();
                }
            }).get();
            System.out.println(anyResult);
        }
    }
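
Because allOf() itself carries no value, a common way to collect results is to keep the individual futures and read each one after allOf() has completed. The following is a minimal sketch of that idea; the class name AllOfSketch and the method sumOfSquares() are hypothetical, and the tasks run on the default common pool.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class AllOfSketch {

    // Starts n independent tasks and sums their results once all of them are done.
    static int sumOfSquares(int n) {
        List<CompletableFuture<Integer>> futures = IntStream.rangeClosed(1, n)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> i * i))
                .collect(Collectors.toList());

        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        // Inside thenApply every future is already complete, so join() returns at once
        return all.thenApply(ignored ->
                futures.stream().mapToInt(CompletableFuture::join).sum()
        ).join();
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10)); // prints 385
    }
}
```

Each task returns its own value, so no shared mutable counter is needed.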

10. ForkJoin

  • A parallel computing framework that implements divide-and-conquer algorithms on multiple threads. It can be viewed as a single-machine version of Map/Reduce: a large task is split into smaller tasks, and the results of the smaller tasks are combined

  • For this kind of workload it provides better load balancing and resource utilization than ThreadPoolExecutor

  • Classes

    • ForkJoinTask

      • RecursiveAction: no return value
      • RecursiveTask: has a return value
      • Methods
        • compute(): contains the concrete business logic
    • ForkJoinPool

      • There is a global task queue, and each Worker thread has a local task queue

      • When a Worker thread has run all the tasks in its own queue, it steals tasks from other threads’ queues (work stealing)

        • This avoids one thread sitting idle while another is busy
        • The owning Worker thread pushes and pops tasks at the head of its queue by incrementing and decrementing the top pointer
        • Other Worker threads take tasks from the tail of the queue by incrementing the base pointer
          • Because several threads may do this at once, the update must go through CAS
        • The task queue is a circular array. If top - base = queue.length - 1, the queue is full and must be expanded
      • A single long variable holds the pool state

        • AC: highest 16 bits, number of active threads minus parallelism
        • TC: next 16 bits, total number of threads minus parallelism
        • ST: 1 bit; 1 means the ForkJoinPool is shutting down
        • EC: 15 bits, wait count of the thread at the top of the blocking stack
        • ID: 16 bits, ID of the thread at the top of the blocking stack
      • Blocking stack (Treiber stack)

        • A lock-free linked list used to block and wake up multiple threads
        • When a Worker thread fails to steal any task, it is pushed onto the blocking stack
        • When a new task arrives, a Worker thread is woken up and popped from the blocking stack
      • Thread states

        • Idle: placed on the blocking stack
        • Active: running a ForkJoinTask and not blocked
        • Blocked: running a ForkJoinTask, but blocked while waiting for a subtask to complete
      • Methods

        • fork()
          • Puts the subtask into a task queue so that it runs asynchronously
          • If called by a Worker thread, the task is placed in the local queue of the current thread
          • If called by any other thread, the task is placed in a shared queue
        • join()
          • Waits for the subtask to complete and returns its result
          • Implementation: several threads may wait for the same ForkJoinTask, so each ForkJoinTask acts as a synchronization object. A thread that calls join() blocks on the ForkJoinTask, and all waiting threads are notified when the task completes
        • get(): gets the task result
        • shutdown(): rejects newly submitted tasks and terminates the ForkJoinPool after previously submitted tasks have run
        • shutdownNow(): cancels the tasks in the global and local queues and wakes up all idle threads so that they exit
  • Applicable scenarios

    • Quick sort
    • Sum from 1 to n
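
The "sum from 1 to n" scenario can be sketched with a RecursiveTask: a range is split in half until it is small enough to add up directly, one half is forked, and the other is computed in the current thread. The class name RangeSumTask, the helper sumTo(), and the threshold of 1,000 are illustrative choices, not values taken from the notes above.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class RangeSumTask extends RecursiveTask<Long> {

    // Hypothetical cut-off below which a range is summed sequentially
    private static final int THRESHOLD = 1_000;

    private final long from;
    private final long to; // inclusive

    RangeSumTask(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from < THRESHOLD) {
            long sum = 0;
            for (long i = from; i <= to; i++) {
                sum += i;
            }
            return sum;
        }
        long mid = from + (to - from) / 2;
        RangeSumTask left = new RangeSumTask(from, mid);
        RangeSumTask right = new RangeSumTask(mid + 1, to);
        left.fork();                     // queue the left half for another worker
        long rightSum = right.compute(); // compute the right half in this thread
        return left.join() + rightSum;   // wait for the left half and combine
    }

    static long sumTo(long n) {
        return ForkJoinPool.commonPool().invoke(new RangeSumTask(1, n));
    }

    public static void main(String[] args) {
        System.out.println(sumTo(1_000_000)); // prints 500000500000
    }
}
```

Forking one half and computing the other directly keeps the current worker busy instead of leaving it blocked in join().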

11. Design patterns

1. Single Threaded Execution

  • Only one thread at a time may run the protected code
  • Implemented with the synchronized keyword: an unsafe method (unsafeMethod) is guarded so that only one thread can access it at a time
  • Usage scenarios
    • Multiple threads access a shared resource
    • The safety of a resource (such as a collection) must be ensured
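
A minimal sketch of Single Threaded Execution, assuming a shared counter as the protected resource. The names SafeCounter and countWithThreads() are invented for this example. Because increment() is synchronized, the read-modify-write on value cannot interleave between threads.

```java
class SafeCounter {

    private int value;

    // Only one thread at a time can execute a synchronized method of this object
    synchronized void increment() {
        value++;
    }

    synchronized int get() {
        return value;
    }

    // Starts several threads that all increment the same counter
    static int countWithThreads(int threads, int perThread) {
        SafeCounter counter = new SafeCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 10_000)); // prints 40000
    }
}
```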

2. Immutable

  • Ensures that the state of an instance never changes
  • Implemented using the final keyword
  • Usage scenarios
    • The state does not change after the instance is created
    • Instances are shared and frequently accessed
  • Real-world examples
    • java.lang.String
    • java.math.BigInteger
    • java.math.BigDecimal
    • java.util.regex.Pattern
    • java.lang.Boolean
    • java.lang.Byte
    • java.lang.Character
    • java.lang.Double
    • java.lang.Float
    • java.lang.Integer
    • java.lang.Long
    • java.lang.Short
    • java.lang.Void
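
As a rough illustration of the Immutable pattern, the class below declares every field final, offers no setters, and returns a new instance instead of modifying itself. Point and withX() are hypothetical names chosen for this sketch.

```java
final class Point {

    private final int x;
    private final int y;

    Point(int x, int y) {
        this.x = x;
        this.y = y;
    }

    int x() {
        return x;
    }

    int y() {
        return y;
    }

    // "Changing" a value creates a new object; the original stays untouched
    Point withX(int newX) {
        return new Point(newX, y);
    }

    public static void main(String[] args) {
        Point p = new Point(1, 2);
        Point q = p.withX(5);
        System.out.println(p.x() + " " + q.x()); // prints "1 5"
    }
}
```

Since no method can change the state, instances can be shared between threads without synchronized.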

3. Guarded Suspension

  • When the guard condition is not satisfied, the thread waits until it is

  • Implemented with while, wait(), and notify()/notifyAll()

    import java.util.LinkedList;
    import java.util.Queue;

    public class RequestQueue {

        private final Queue<Request> queue = new LinkedList<>();

        public synchronized Request getRequest() {
            // Guard condition: the queue must not be empty
            while (queue.peek() == null) {
                try {
                    // The guard condition is not satisfied, so wait
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return queue.remove();
        }

        public synchronized void putRequest(Request request) {
            queue.offer(request);
            // Wake up threads waiting in getRequest()
            notifyAll();
        }
    }

4. Balking

  • When the guard condition is not satisfied, processing stops immediately: the method returns without waiting

    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.Writer;

    public class Data {

        private final String filename;
        private String content;
        private boolean changed;

        public Data(String filename, String content) {
            this.filename = filename;
            this.content = content;
        }

        public synchronized void change(String newContent) {
            this.content = newContent;
            this.changed = true;
        }

        public synchronized void save() throws IOException {
            // Guard condition satisfied: save normally
            if (changed) {
                doSave();
                changed = false;
            }
            // Guard condition not satisfied: balk and return without saving
            else {
                System.out.println(Thread.currentThread().getName() + " No need to save");
            }
        }

        private void doSave() throws IOException {
            System.out.println(Thread.currentThread().getName() + " calls doSave with the content: " + content);
            Writer writer = new FileWriter(filename);
            writer.write(content);
            writer.close();
        }
    }
  • Usage scenarios

    • When the guard condition is not satisfied, no action is needed
    • The guard condition is satisfied only the first time, as in one-time initialization

5. Producer-Consumer

  • The producer gives the data to the Channel, and the consumer gets the data from the Channel

  • There are multiple producers and consumers. When there is only one producer and consumer, the pattern is called Pipeline

    public class Table {

        private final String[] buffer;
        private int tail; // index where the next item is put
        private int head; // index where the next item is taken
        private int count; // number of items in the buffer

        public Table(int count) {
            this.buffer = new String[count];
            this.head = 0;
            this.tail = 0;
            this.count = 0;
        }

        public synchronized void put(String steamedBread) throws InterruptedException {
            System.out.println(Thread.currentThread().getName() + " produces " + steamedBread);
            // Wait while the buffer is full
            while (count >= buffer.length) {
                wait();
            }
            buffer[tail] = steamedBread;
            tail = (tail + 1) % buffer.length;
            count++;
            notifyAll();
        }

        public synchronized String take() throws InterruptedException {
            // Wait while the buffer is empty
            while (count <= 0) {
                wait();
            }

            String steamedBread = buffer[head];
            head = (head + 1) % buffer.length;
            count--;
            notifyAll();
            System.out.println(Thread.currentThread().getName() + " takes " + steamedBread);
            return steamedBread;
        }
    }
  • Threads must coordinate on what is placed in the Channel: producers wait while it is full, and consumers wait while it is empty

  • Threads must access the state that needs protection under mutual exclusion

  • Real-world examples

    • BlockingQueue: the blocking queue interface
    • ArrayBlockingQueue: array-based BlockingQueue
    • LinkedBlockingQueue: linked-list-based BlockingQueue
    • PriorityBlockingQueue: BlockingQueue ordered by priority
    • DelayQueue: BlockingQueue whose elements can be taken only after a specified delay
    • SynchronousQueue: BlockingQueue with no capacity that hands each element directly to a consumer
    • ConcurrentLinkedQueue: thread-safe, non-blocking Queue (not a BlockingQueue) with no limit on the number of elements
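
With a BlockingQueue as the Channel, the wait()/notifyAll() bookkeeping of the hand-written Table is handled by the queue itself. The sketch below assumes one producer and one consumer; BlockingQueueSketch and transfer() are made-up names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueSketch {

    // Sends the integers 0..items-1 through a bounded channel and returns what was received.
    static List<Integer> transfer(int items, int capacity) {
        BlockingQueue<Integer> channel = new ArrayBlockingQueue<>(capacity);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    channel.put(i); // blocks while the channel is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    received.add(channel.take()); // blocks while the channel is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5, 2)); // prints [0, 1, 2, 3, 4]
    }
}
```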

6. Read-Write Lock

  • Multiple threads can read simultaneously, but cannot write while reading

  • While a thread is writing, other threads cannot read or write

  • Take advantage of the non-conflicting nature of thread read operations to improve performance

    public class ReadWriteLock {

        private int readingReaders = 0; // threads currently reading
        private int waitingWriters = 0; // threads waiting to write
        private int writingWriters = 0; // threads currently writing
        private boolean preferWriter = true;

        /**
         * For read operations:
         * 1. If a writer is writing, the current thread waits for it to finish.
         * 2. If no writer is writing but a writer is waiting and writers are
         *    preferred, the current thread waits for that writer to finish.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        public synchronized void readLock() throws InterruptedException {
            while (writingWriters > 0 || (preferWriter && waitingWriters > 0)) {
                // The current thread waits
                wait();
            }
            readingReaders++;
        }

        public synchronized void readUnlock() {
            readingReaders--;
            preferWriter = true;
            // Wake up other threads waiting on this object
            notifyAll();
        }

        public synchronized void writeLock() throws InterruptedException {
            waitingWriters++;
            try {
                // Wait while any thread is reading or writing
                while (readingReaders > 0 || writingWriters > 0) {
                    wait();
                }
            } finally {
                waitingWriters--;
            }
            writingWriters++;
        }

        public synchronized void writeUnlock() {
            writingWriters--;
            preferWriter = false;
            // Writing is done; wake up other waiting threads so they can read or write
            notifyAll();
        }
    }
  • readLock() and writeLock() are implemented with the Guarded Suspension pattern

  • Usage scenarios

    • Read operations are expensive
    • Reads are far more frequent than writes
  • Real-world examples

    • java.util.concurrent.locks.ReadWriteLock
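
The JDK's ReadWriteLock interface is typically used through ReentrantReadWriteLock. A small sketch, assuming a read-mostly map as the shared resource; ReadMostlyCache is a hypothetical class name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadMostlyCache {

    private final Map<String, String> map = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    // Several threads may hold the read lock at the same time
    String get(String key) {
        lock.readLock().lock();
        try {
            return map.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    // The write lock is exclusive: no readers or other writers while it is held
    void put(String key, String value) {
        lock.writeLock().lock();
        try {
            map.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        ReadMostlyCache cache = new ReadMostlyCache();
        cache.put("k", "v");
        System.out.println(cache.get("k")); // prints v
    }
}
```

Releasing each lock in a finally block keeps it from staying held if the guarded code throws.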

7. Thread-Per-Message

  • Allocate one thread per request

  • Process

    1. The Client submits a request to the Host

    2. The Host starts a new thread, which calls the Helper to process the request

    public class Host {
        private final Helper helper = new Helper();

        public void request(final int count, final char c) {
            System.out.println("\trequest: [" + count + ", " + c + "] Begin...");
            // One new thread per request; request() returns without waiting for it
            new Thread() {
                @Override
                public void run() {
                    helper.handle(count, c);
                }
            }.start();
            System.out.println("\trequest: [" + count + ", " + c + "] Over!!");
        }
    }
    public class Helper {

        public void handle(int count, char c) {
            System.out.println("\t\thandle: [" + count + ", " + c + "] Begin...");
            for (int i = 0; i < count; i++) {
                slowly();
                System.out.print(c);
            }
            System.out.println("");
            System.out.println("\t\thandle: [" + count + ", " + c + "] Over!!");
        }

        // Simulates slow processing
        private void slowly() {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
  • Usage scenarios

    • A quick response is needed
    • The order of operations does not matter
    • No return value is required
  • Real-world examples

    • Thread class
    • Runnable interface
    • ThreadFactory interface
    • Executor interface
    • Executors class
    • ExecutorService interface
    • ScheduledExecutorService interface

8. Worker Thread

  • Worker threads retrieve work and process it one by one

  • Also known as the Thread Pool pattern

  • Process

    1. The Client submits the task to the Channel
    2. Worker threads repeatedly take tasks from the Channel and run them
    public class Channel {

        private static final int MAX_REQUEST = 100;
        private final Request[] requestQueue;
        private int tail;
        private int head;
        private int count;
        private final WorkerThread[] threadPool;

        public Channel(int threads) {
            this.requestQueue = new Request[MAX_REQUEST];
            this.head = 0;
            this.tail = 0;
            this.count = 0;
            threadPool = new WorkerThread[threads];
            for (int i = 0; i < threadPool.length; i++) {
                threadPool[i] = new WorkerThread("Worker-" + i, this);
            }
        }

        public void startWorkers() {
            for (int i = 0; i < threadPool.length; i++) {
                threadPool[i].start();
            }
        }

        public synchronized void putRequest(Request request) {
            // Wait while the queue is full
            while (count >= requestQueue.length) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // ignored; the loop re-checks the condition
                }
            }
            requestQueue[tail] = request;
            tail = (tail + 1) % requestQueue.length;
            count++;
            notifyAll();
        }

        public synchronized Request takeRequest() {
            // Wait while the queue is empty
            while (count <= 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // ignored; the loop re-checks the condition
                }
            }
            Request request = requestQueue[head];
            head = (head + 1) % requestQueue.length;
            count--;
            notifyAll();
            return request;
        }
    }
    public class WorkerThread extends Thread {

        private final Channel channel;

        public WorkerThread(String name, Channel channel) {
            super(name);
            this.channel = channel;
        }

        @Override
        public void run() {
            // Take and execute requests forever
            while (true) {
                Request request = channel.takeRequest();
                request.execute();
            }
        }
    }
  • Usage scenarios

    • Improving throughput
    • Controlling the number of Worker threads
    • Separating invocation from execution
  • Real-world examples

    • ThreadPoolExecutor
    • Executors

9. Future

  • Submitting a task immediately returns a claim ticket; later, the ticket is used to obtain the result of the task

  • Process

    1. The Client submits the task to the Host, which immediately returns a FutureData
    2. The Host starts a new thread, which creates the RealData (the actual task) and sets it on the FutureData
    3. The Client calls FutureData to obtain the task result, waiting if it is not ready yet
    public class Host {

        public Data request(final int count, final char c) {

            System.out.println("\trequest(" + count + ", " + c + ")");

            // Create the FutureData object
            final FutureData future = new FutureData();

            // Start a new thread to create the RealData object
            new Thread() {
                @Override
                public void run() {
                    RealData realData = new RealData(count, c);
                    future.setRealData(realData);
                }
            }.start();

            System.out.println("\trequest(" + count + ", " + c + ") End");

            // Return the claim ticket
            return future;
        }
    }
    public class FutureData implements Data {

        private RealData realData = null;
        private boolean ready = false;

        public synchronized void setRealData(RealData realData) {

            // Balking: if the data is already set, return
            if (ready) {
                return;
            }

            this.realData = realData;
            this.ready = true;
            notifyAll();
        }

        @Override
        public synchronized String getContent() {
            // Guarded Suspension: wait until the data is ready
            while (!ready) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return realData.getContent();
        }
    }
    public class RealData implements Data {

        private final String content;

        public RealData(int count, char c) {
            System.out.println("\t\tRealData assembly (" + count + ", " + c + ")");
            char[] buffer = new char[count];
            for (int i = 0; i < count; i++) {
                buffer[i] = c;
                try {
                    // Simulates slow construction
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("\t\tRealData assembly (" + count + ", " + c + ") End");
            this.content = new String(buffer);
        }

        @Override
        public String getContent() {
            return content;
        }
    }
  • Usage scenarios

    • Responsiveness must be improved and the task result is also needed

      Thread-Per-Message also improves responsiveness, but it cannot return the result of the task

  • Real-world examples

    • Callable
    • Future
    • FutureTask
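
The JDK classes listed above map onto the roles in this pattern: a Callable plays the task, and a FutureTask plays both the claim ticket and the Runnable that a new thread executes. The sketch below rebuilds the repeated-character example on top of them; FutureSketch and repeat() are hypothetical names, and the error handling is deliberately simple.

```java
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureSketch {

    // Builds a string of 'count' copies of 'c' on another thread and waits for it.
    static String repeat(int count, char c) {
        Callable<String> task = () -> {
            char[] buffer = new char[count];
            Arrays.fill(buffer, c);
            return new String(buffer);
        };

        // FutureTask is the claim ticket; starting the thread does not block the caller
        FutureTask<String> ticket = new FutureTask<>(task);
        new Thread(ticket).start();

        try {
            return ticket.get(); // blocks until the result has been set
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(repeat(3, 'A')); // prints AAA
    }
}
```

In real use the caller would do other work between starting the thread and calling get(); here get() is called right away only to keep the example short.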