This article continues our 50,000-word series of high-quality Java concurrency tutorials, picking up from High-quality Java concurrency tutorials (part 1).

A PDF version of this article can be downloaded at concurrent-all-in-one.

The examples from this article can be found at github.com/ddean2009/l…

Chapter 14 Daemon Threads in Java

There are two types of Threads in Java, User Threads and Daemon Threads.

User Threads are high-priority threads; the JVM waits for all User Threads to finish before it exits.

Daemon Threads are low-priority threads that serve User Threads. Because Daemon Threads exist only to serve User Threads, the JVM exits automatically once all User Threads have finished, regardless of whether any Daemon Threads are still running.

Because of this feature, tasks that run in infinite loops are usually placed in Daemon Threads, where they cannot prevent the JVM from exiting.

Daemon Threads are not recommended for I/O operations.

Daemon Threads can also block JVM shutdown if used improperly, for example by calling the join() method on a Daemon Thread.
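Here is a minimal sketch of that pitfall (the class name is illustrative): main joins an infinite daemon thread, so the JVM never exits even though the daemon alone would not keep it alive.

public class DaemonJoinPitfall {
    public static void main(String[] args) throws InterruptedException {
        Thread daemon = new Thread(() -> {
            while (true) {
                // infinite loop: a daemon thread alone would not block JVM exit
            }
        });
        daemon.setDaemon(true);
        daemon.start();
        daemon.join(); // main (a user thread) now waits forever, so the JVM never shuts down
    }
}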

Let’s see how to create a daemon thread:

public class DaemonThread extends Thread {

    public void run() {
        while (true) {
            log.info("Thread A run");
            try {
                log.info("Thread A is daemon {}", Thread.currentThread().isDaemon());
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DaemonThread daemonThread = new DaemonThread();
        daemonThread.setDaemon(true);
        daemonThread.start();
    }
}

Creating a daemon thread is easy: just set its daemon property to true after the thread is created and before it is started.

Note that setDaemon(true) must be called before start(), otherwise an IllegalThreadStateException is thrown.

The above example exits almost immediately: once the main thread finishes, only the daemon thread is left, so the JVM shuts down.

If we omit daemonThread.setDaemon(true), the thread remains a user thread, and the JVM will keep running and never exit.

The code above runs inside main. What happens if we run it inside an @Test method instead?

public class DaemonThread extends Thread {

    public void run() {
        while (true) {
            log.info("Thread A run");
            try {
                log.info("Thread A is daemon {}", Thread.currentThread().isDaemon());
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Test
    public void testDaemon() throws InterruptedException {
        DaemonThread daemonThread = new DaemonThread();
        daemonThread.start();
    }
}

We changed the main method to an @Test method. After execution, we see that the test ends immediately, whether or not the thread is a daemon thread.

Let’s look at starting a user thread in a daemon thread:

public class DaemonBThread extends Thread {

    Thread worker = new Thread(() -> {
        while (true) {
            log.info("Thread B run");
            log.info("Thread B is daemon {}", Thread.currentThread().isDaemon());
        }
    });

    public void run() {
        log.info("Thread A run");
        worker.start();
    }

    public static void main(String[] args) {
        DaemonBThread daemonThread = new DaemonBThread();
        daemonThread.setDaemon(true);
        daemonThread.start();
    }
}

In this example, daemonThread starts a user thread. After running it, we see that the JVM terminates immediately, even though a user thread was being started.

Chapter 15 Introducing and Using Thread Pools in Java

Thread pool overview

In Java, a Thread corresponds to an operating system thread and consumes a certain amount of system resources. On both Windows and Linux there is a limit to the number of threads that can be created; if a Java application creates threads without bound, it will eventually run out of threads.

The CPU has a finite number of cores, and if multiple threads run at the same time, the CPU cycles between them, allocating a specific amount of CPU time to each thread based on its priority. So more threads aren't always better.

In Java, two interfaces represent thread pool management: Executor and ExecutorService.

The typical steps for running tasks on a thread pool are as follows:

  1. Create an ExecutorService.
  2. Submit tasks to the ExecutorService.
  3. The ExecutorService schedules threads to run the tasks.


Let me show you how to use a ThreadPool in Java.

Executors, Executor and ExecutorService

The Executors class provides a set of convenience factory methods for building thread pools.

The Executor interface defines a method:

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

ExecutorService extends Executor and provides additional thread pool operations; it is a complement to Executor.

Following the principle of programming to interfaces, we usually reference ExecutorService or Executor in Java code rather than the concrete implementation classes.

Let's see how to create an Executor and an ExecutorService using Executors:

Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> log.info("in Executor"));

ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(() -> log.info("in ExecutorService"));
executorService.shutdown();

We won't cover more details about ExecutorService here; if you're interested, refer to my previous detailed article on ExecutorService.

ThreadPoolExecutor

ThreadPoolExecutor is an implementation of the ExecutorService interface that allows finer-grained configuration of the thread pool. In particular, it controls three parameters: corePoolSize, maximumPoolSize, and keepAliveTime.

poolSize is the current number of threads in the pool, while corePoolSize is the minimum number of threads that the pool initializes and keeps alive.

If too many tasks are waiting, the pool can grow, creating additional threads to execute them, up to the limit set by maximumPoolSize.

keepAliveTime is how long an extra thread (beyond corePoolSize) waits idle for a task before the pool reclaims it.
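To see these parameters interact, here is a minimal sketch (the pool sizes and sleep times are arbitrary) showing extra threads being reclaimed after keepAliveTime:

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveDemo {
    public static void main(String[] args) throws InterruptedException {
        // core 1, max 3: up to 2 extra threads, each reclaimed after 1 second idle
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 1, TimeUnit.SECONDS, new SynchronousQueue<>());
        for (int i = 0; i < 3; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException ignored) {
                }
            });
        }
        System.out.println("pool size while busy: " + pool.getPoolSize()); // 3
        Thread.sleep(3000); // stay idle longer than keepAliveTime
        System.out.println("pool size after idle: " + pool.getPoolSize()); // back to 1
        pool.shutdown();
    }
}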

Let’s see how to create a ThreadPoolExecutor:

ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
threadPoolExecutor.submit(() -> log.info("submit through threadPoolExecutor"));
threadPoolExecutor.shutdown();

In the example above we created a ThreadPoolExecutor directly through its constructor.

Typically, Executors has factory methods that build common ThreadPoolExecutor configurations for us. Let's look at the following example:

ThreadPoolExecutor executor1 =
        (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor1.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor1.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor1.submit(() -> {
    Thread.sleep(1000);
    return null;
});
log.info("executor1 poolsize {}", executor1.getPoolSize());
log.info("executor1 queuesize {}", executor1.getQueue().size());
executor1.shutdown();

In this example we use Executors.newFixedThreadPool(2) to create a ThreadPoolExecutor.

In the example above we submitted three tasks, but the pool size is only 2, so one task cannot execute immediately and has to wait in the queue.

Let’s look at another example:

ThreadPoolExecutor executor2 =
        (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor2.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor2.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor2.submit(() -> {
    Thread.sleep(1000);
    return null;
});

log.info("executor2 poolsize {}", executor2.getPoolSize());
log.info("executor2 queue size {}", executor2.getQueue().size());
executor2.shutdown();

In the example above we use Executors.newCachedThreadPool() to create a ThreadPoolExecutor. After running it, we see that the pool size is 3 and the queue size is 0, which shows that newCachedThreadPool grows the pool size automatically.

If a thread stays idle for about 60 seconds, it is removed from the pool.

The queue here is a SynchronousQueue; because inserts and removals happen almost simultaneously, the queue size is essentially always zero.

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor is another very common executor. It extends ThreadPoolExecutor and implements the ScheduledExecutorService interface.

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService

Let’s see how it works:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
    log.info("Hello World");
}, 500, TimeUnit.MILLISECONDS);

In the example above, we defined a scheduled task that will be executed after 500 milliseconds.

We also talked about ScheduledExecutorService before; two more of its methods are very commonly used:

  • scheduleAtFixedRate – the period is measured from the start of one execution to the start of the next.
  • scheduleWithFixedDelay – the delay is measured from the end of one execution to the start of the next.

CountDownLatch lock = new CountDownLatch(3);

ScheduledExecutorService executor2 = Executors.newScheduledThreadPool(5);
ScheduledFuture<?> future = executor2.scheduleAtFixedRate(() -> {
    log.info("in ScheduledFuture");
    lock.countDown();
}, 500, 100, TimeUnit.MILLISECONDS);

lock.await(1000, TimeUnit.MILLISECONDS);
future.cancel(true);
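For comparison, here is a minimal scheduleWithFixedDelay sketch under the same assumptions (an Slf4j logger; the 500 ms initial delay and 100 ms delay are arbitrary):

ScheduledExecutorService executor3 = Executors.newScheduledThreadPool(5);
ScheduledFuture<?> future2 = executor3.scheduleWithFixedDelay(() -> {
    log.info("in scheduleWithFixedDelay");
}, 500, 100, TimeUnit.MILLISECONDS); // the 100 ms delay is counted from the end of each run

Thread.sleep(1000);
future2.cancel(true);
executor3.shutdown();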

ForkJoinPool

ForkJoinPool belongs to a framework introduced in Java 7 that will be explained in more detail in the next chapter. Here is a brief introduction.

ForkJoinPool is designed for workloads that spawn a large number of subtasks. Handling such workloads with plain threads would consume a great many threads; this problem does not occur in the fork/join framework.

In fork/join, any task can spawn a large number of subtasks and then wait for them to finish by calling join().

Here’s an example:

static class TreeNode {

    int value;

    Set<TreeNode> children;

    TreeNode(int value, TreeNode... children) {
        this.value = value;
        this.children = Sets.newHashSet(children);
    }
}

We define a TreeNode, then traverse all the values and add them up:

public class CountingTask extends RecursiveTask<Integer> {

    private final TreeNode node;

    public CountingTask(TreeNode node) {
        this.node = node;
    }

    @Override
    protected Integer compute() {
        return node.value + node.children.stream()
                .map(childNode -> new CountingTask(childNode).fork())
                .mapToInt(ForkJoinTask::join)
                .sum();
    }
}

Here is the code called:

public static void main(String[] args) {
    TreeNode tree = new TreeNode(5, new TreeNode(3),
            new TreeNode(2, new TreeNode(2), new TreeNode(8)));

    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    int sum = forkJoinPool.invoke(new CountingTask(tree));
}

Chapter 16 The Fork/Join Framework in Java

The Fork Join framework was introduced in Java 7 to improve parallel computing capabilities.

Fork/join consists of two main steps. The first is fork, which splits a large task into many small tasks; the second is join, which merges the results of those tasks into the final result. If the tasks return no value, join simply waits until all the small tasks have finished.

Remember when we talked about the basic structure of thread pools in the previous article?

  1. ExecutorService – ForkJoinPool is used to execute the tasks.
  2. WorkerThread – ForkJoinWorkerThread is the worker thread that performs the actual tasks.
  3. Task – ForkJoinTask defines a task to be executed.

Let's take a closer look at the fork/join framework from these three perspectives.

ForkJoinPool

ForkJoinPool is an implementation of the ExecutorService that provides easy management of worker threads and thread pools.

public class ForkJoinPool extends AbstractExecutorService 

A worker thread can only handle one task at a time, but ForkJoinPool does not create a separate thread for every task. Instead, it uses a special data structure, a double-ended queue (deque), to store tasks. This structure makes work-stealing convenient.

What is work-stealing?

By default, a worker thread fetches tasks from the head of its own queue. When that queue is empty, the worker thread takes tasks from the tail of another worker's queue or from the global queue. This design makes full use of the worker threads and improves concurrency.

Now let’s see how to create a ForkJoinPool.

The most common way is ForkJoinPool.commonPool(), which provides a shared default thread pool for all ForkJoinTasks.

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

Another way is to use the constructor:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);

The argument here is the parallelism level: 2 means the pool will use two processor cores.

ForkJoinWorkerThread

A ForkJoinWorkerThread is a worker thread that is used in a ForkJoinPool.

public class ForkJoinWorkerThread extends Thread

Unlike normal threads, it defines two variables:

    final ForkJoinPool pool;                // the pool this thread works in
    final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

One is the ForkJoinPool the worker thread belongs to; the other is the work queue that supports work-stealing.

Look again at its run method:

public void run() {
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
            onStart();
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            try {
                onTermination(exception);
            } catch (Throwable ex) {
                if (exception == null)
                    exception = ex;
            } finally {
                pool.deregisterWorker(this, exception);
            }
        }
    }
}

Put simply, it fetches tasks from its work queue and executes them.

ForkJoinTask

ForkJoinTask is the type of task that runs in a ForkJoinPool. Typically we use one of its two subclasses: RecursiveAction and RecursiveTask.

Both define a compute() method that must be implemented with the specific business logic. The difference is that RecursiveAction just executes a task, whereas RecursiveTask can return a value.

As both class names suggest, the implementation logic is recursive. Here's an example of using RecursiveAction to print a string:

public class CustomRecursiveAction extends RecursiveAction {

    private String workload = "";
    private static final int THRESHOLD = 4;

    private static Logger logger =
            Logger.getAnonymousLogger();

    public CustomRecursiveAction(String workload) {
        this.workload = workload;
    }

    @Override
    protected void compute() {
        if (workload.length() > THRESHOLD) {
            ForkJoinTask.invokeAll(createSubtasks());
        } else {
            processing(workload);
        }
    }

    private List<CustomRecursiveAction> createSubtasks() {
        List<CustomRecursiveAction> subtasks = new ArrayList<>();

        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2, workload.length());

        subtasks.add(new CustomRecursiveAction(partOne));
        subtasks.add(new CustomRecursiveAction(partTwo));

        return subtasks;
    }

    private void processing(String work) {
        String result = work.toUpperCase();
        logger.info("This result - (" + result + ") - was processed by "
                + Thread.currentThread().getName());
    }
}

The above example splits the string in half recursively until each piece is small enough to process.

Let’s look at another RecursiveTask example:

public class CustomRecursiveTask extends RecursiveTask<Integer> {
    private int[] arr;

    private static final int THRESHOLD = 20;

    public CustomRecursiveTask(int[] arr) {
        this.arr = arr;
    }

    @Override
    protected Integer compute() {
        if (arr.length > THRESHOLD) {
            return ForkJoinTask.invokeAll(createSubtasks())
                    .stream()
                    .mapToInt(ForkJoinTask::join)
                    .sum();
        } else {
            return processing(arr);
        }
    }

    private Collection<CustomRecursiveTask> createSubtasks() {
        List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomRecursiveTask(
                Arrays.copyOfRange(arr, 0, arr.length / 2)));
        dividedTasks.add(new CustomRecursiveTask(
                Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
        return dividedTasks;
    }

    private Integer processing(int[] arr) {
        return Arrays.stream(arr)
                .filter(a -> a > 10 && a < 27)
                .map(a -> a * 10)
                .sum();
    }
}

Very similar to the previous example, except that this one returns a value.

Submitting a Task to the ForkJoinPool

With the two tasks above, we can submit them to the ForkJoinPool:

int[] intArray = {12, 12, 13, 14, 15};
CustomRecursiveTask customRecursiveTask = new CustomRecursiveTask(intArray);

int result = forkJoinPool.invoke(customRecursiveTask);
System.out.println(result);

In the example above we submit the task with invoke(), which waits for the task to finish and returns its result.

Instead of invoke(), we can also use fork() and join():

customRecursiveTask.fork();
int result2 = customRecursiveTask.join();
System.out.println(result2);

fork() submits the task to the pool for asynchronous execution and returns immediately; join() waits for the task to complete and returns its result.

Chapter 17 The Use of CountDownLatch in Java Concurrency

In Java concurrency, controlling access to shared variables is very important, and sometimes we also want to control the order in which concurrent threads execute: for example, waiting for all threads to finish before running another thread, or having all threads wait until everyone is ready and then start together.

At this point we can use CountDownLatch.

In a nutshell, CountDownLatch holds a counter inside an AbstractQueuedSynchronizer. Each call to countDown() decrements the counter by one, and await() blocks until the counter reaches zero.


private static final class Sync extends AbstractQueuedSynchronizer {
    ...
}

private final Sync sync;

public void countDown() {
    sync.releaseShared(1);
}

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

Here are two examples:

The main thread waits for all child threads to finish before continuing

Here we define the child thread class. We pass in a CountDownLatch for counting, and the child thread calls that CountDownLatch's countDown() method just before it terminates. Finally, the main thread calls await() to wait for all the child threads to finish.

@Slf4j
public class MainThreadWaitUsage implements Runnable {

    private List<String> outputScraper;
    private CountDownLatch countDownLatch;

    public MainThreadWaitUsage(List<String> outputScraper, CountDownLatch countDownLatch) {
        this.outputScraper = outputScraper;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        outputScraper.add("Counted down");
        countDownLatch.countDown();
    }
}

See how to call:

@Test
public void testCountDownLatch()
        throws InterruptedException {

    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch countDownLatch = new CountDownLatch(5);
    List<Thread> workers = Stream
            .generate(() -> new Thread(new MainThreadWaitUsage(outputScraper, countDownLatch)))
            .limit(5)
            .collect(toList());

    workers.forEach(Thread::start);
    countDownLatch.await();
    outputScraper.add("Latch released");
    log.info(outputScraper.toString());

}

The result is as follows:

07:37:27.388 [main] INFO MainThreadWaitUsageTest - [Counted down, Counted down, Counted down, Counted down, Counted down, Latch released]

Wait until all threads are ready to execute together

In the example above, the main thread waits for the child threads. In this example, we will see how child threads wait until all of them are ready and then execute together.

The idea is simple: each child thread decrements a ready counter once it starts, the main thread awaits that counter, and when it reaches zero the main thread signals the child threads to run.

public class ThreadWaitThreadUsage implements Runnable {

    private List<String> outputScraper;
    private CountDownLatch readyThreadCounter;
    private CountDownLatch callingThreadBlocker;
    private CountDownLatch completedThreadCounter;

    public ThreadWaitThreadUsage(List<String> outputScraper,
                                 CountDownLatch readyThreadCounter,
                                 CountDownLatch callingThreadBlocker,
                                 CountDownLatch completedThreadCounter) {

        this.outputScraper = outputScraper;
        this.readyThreadCounter = readyThreadCounter;
        this.callingThreadBlocker = callingThreadBlocker;
        this.completedThreadCounter = completedThreadCounter;
    }

    @Override
    public void run() {
        readyThreadCounter.countDown();
        try {
            callingThreadBlocker.await();
            outputScraper.add("Counted down");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            completedThreadCounter.countDown();
        }
    }
}

See how to call:

@Test
public void testCountDownLatch()
        throws InterruptedException {

    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch readyThreadCounter = new CountDownLatch(5);
    CountDownLatch callingThreadBlocker = new CountDownLatch(1);
    CountDownLatch completedThreadCounter = new CountDownLatch(5);
    List<Thread> workers = Stream
            .generate(() -> new Thread(new ThreadWaitThreadUsage(
                    outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
            .limit(5)
            .collect(toList());

    workers.forEach(Thread::start);
    readyThreadCounter.await();
    outputScraper.add("Workers ready");
    callingThreadBlocker.countDown();
    completedThreadCounter.await();
    outputScraper.add("Workers complete");

    log.info(outputScraper.toString());

}

The following output is displayed:

07:41:47.861 [main] INFO ThreadWaitThreadUsageTest - [Workers ready, Counted down, Counted down, Counted down, Counted down, Counted down, Workers complete]

Stopping the await of CountDownLatch

If we call await(), the method waits until count reaches 0. However, if an exception occurs during a worker thread's execution, its countDown() call may never run, and await() could then wait forever.

At this point we can use the timed variant:

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
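A minimal usage sketch (the one-second timeout is arbitrary): the boolean return value tells us whether the latch actually reached zero or the wait timed out.

CountDownLatch latch = new CountDownLatch(5);
// ... start worker threads that are supposed to call latch.countDown() ...

boolean completed = latch.await(1, TimeUnit.SECONDS);
if (!completed) {
    // timed out: some workers never counted down, handle the failure here
    log.info("latch timed out, remaining count {}", latch.getCount());
}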

Chapter 18 The Use of CyclicBarrier in Java

CyclicBarrier is a thread synchronization aid introduced in Java 5. It implements the concept of a barrier: threads wait at the barrier until all of them have arrived, and only then does a specific operation run.

If we have a number of threads, each computing some data, and we need to wait for all of them to finish and then combine their data into a final result, we can use a CyclicBarrier.

The methods of CyclicBarrier

Let’s look at the CyclicBarrier constructor:

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

CyclicBarrier has two constructors. The first takes a single argument: the number of parties, i.e. the threads that must reach the barrier together. The second adds a barrierAction: the operation to run when the barrier is tripped.

barrierAction is a Runnable in which we can define the final work to be performed.

Take a look at the important await method:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}
Copy the code

await() also comes in two variants, one with a timeout and one without.

Internally, await() ends up calling lock.newCondition().await().

Since there are multiple parties, let’s consider two cases.

  1. The thread is not the last one to call await()

In this case, the thread enters the wait state until one of the following happens:

  • The last thread calls await()
  • Another thread interrupts the current thread
  • Another thread interrupts one of the other waiting threads
  • Another thread times out while waiting at the barrier
  • Another thread calls reset() on the barrier

If the thread's interrupted status is already set when it calls await(), or if it is interrupted while waiting, an InterruptedException is thrown and the interrupted status is cleared. (This is consistent with Thread.interrupt().)

If the barrier is reset() while any thread is waiting, or if the barrier is already broken when await() is called or becomes broken while a thread is waiting, a BrokenBarrierException is thrown.

If any thread is interrupted while waiting, all other waiting threads throw a BrokenBarrierException and the barrier is put into the broken state.

  2. The thread is the last one to call await()

In this case, if the barrierAction is not null, the thread runs the barrierAction before any of the other threads are allowed to continue.

If this action throws an exception, the barrier is put into the broken state.

Now look at the reset() method:

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

This method breaks the current barrier generation and starts a new generation for the next round of operations.
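The generation mechanism is what makes the barrier "cyclic": once all parties arrive, it resets itself for the next round. A minimal sketch (the party count and round count are arbitrary):

// the same barrier instance is tripped three times by two threads
CyclicBarrier barrier = new CyclicBarrier(2, () -> System.out.println("round finished"));
Runnable worker = () -> {
    try {
        for (int round = 0; round < 3; round++) {
            barrier.await(); // after both threads arrive, the barrier resets automatically
        }
    } catch (InterruptedException | BrokenBarrierException e) {
        e.printStackTrace();
    }
};
new Thread(worker).start();
new Thread(worker).start();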

Using CyclicBarrier

Each child thread generates a list of random integers; once all the threads have generated their numbers, we add them all up. Let's see how to do that.

Define the child thread that generates the list of integers:

public class CyclicBarrierUsage implements Runnable {

    private CyclicBarrier cyclicBarrier;
    private List<List<Integer>> partialResults;
    private Random random = new Random();

    public CyclicBarrierUsage(CyclicBarrier cyclicBarrier, List<List<Integer>> partialResults) {
        this.cyclicBarrier = cyclicBarrier;
        this.partialResults = partialResults;
    }

    @Override
    public void run() {
        String thisThreadName = Thread.currentThread().getName();
        List<Integer> partialResult = new ArrayList<>();

        // Crunch some numbers and store the partial result
        for (int i = 0; i < 10; i++) {
            Integer num = random.nextInt(10);
            System.out.println(thisThreadName
                    + ": Crunching some numbers! Final result - " + num);
            partialResult.add(num);
        }

        partialResults.add(partialResult);
        try {
            System.out.println(thisThreadName
                    + " waiting for others to reach barrier.");
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            // ...
        } catch (BrokenBarrierException e) {
            // ...
        }
    }
}

The child thread receives the cyclicBarrier passed in from outside along with the partialResults list that stores the data, and calls cyclicBarrier.await() at the end of run() to wait for the other threads to finish.

Take a look at how the CyclicBarrier is constructed:

CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
    String thisThreadName = Thread.currentThread().getName();

    System.out.println(
            thisThreadName + ": Computing sum of 5 workers, having 10 results each.");
    int sum = 0;

    for (List<Integer> threadResult : partialResults) {
        System.out.print("Adding ");
        for (Integer partialResult : threadResult) {
            System.out.print(partialResult + " ");
            sum += partialResult;
        }
        System.out.println();
    }
    System.out.println(thisThreadName + ": Final result = " + sum);
});

In the CyclicBarrier, we define a barrierAction that does the final aggregation of the data.

Run:

for (int i = 0; i < 5; i++) {
    Thread worker = new Thread(new CyclicBarrierUsage(cyclicBarrier, partialResults));
    worker.setName("Thread " + i);
    worker.start();
}

The following output is displayed:

Spawning 5 worker threads to compute 10 partial results each
Thread 0: Crunching some numbers! Final result - 5
Thread 0: Crunching some numbers! Final result - 3
Thread 1: Crunching some numbers! Final result - 1
Thread 0: Crunching some numbers! Final result - 7
Thread 1: Crunching some numbers! Final result - 8
Thread 0: Crunching some numbers! Final result - 4
Thread 0: Crunching some numbers! Final result - 6
Thread 0: Crunching some numbers! Final result - 9
Thread 1: Crunching some numbers! Final result - 3
Thread 2: Crunching some numbers! Final result - 1
Thread 0: Crunching some numbers! Final result - 0
Thread 2: Crunching some numbers! Final result - 9
Thread 1: Crunching some numbers! Final result - 3
Thread 2: Crunching some numbers! Final result - 7
Thread 0: Crunching some numbers! Final result - 2
Thread 2: Crunching some numbers! Final result - 6
Thread 1: Crunching some numbers! Final result - 6
Thread 2: Crunching some numbers! Final result - 5
Thread 0: Crunching some numbers! Final result - 0
Thread 2: Crunching some numbers! Final result - 1
Thread 1: Crunching some numbers! Final result - 5
Thread 2: Crunching some numbers! Final result - 1
Thread 0: Crunching some numbers! Final result - 7
Thread 2: Crunching some numbers! Final result - 8
Thread 1: Crunching some numbers! Final result - 2
Thread 2: Crunching some numbers! Final result - 4
Thread 0 waiting for others to reach barrier.
Thread 2: Crunching some numbers! Final result - 0
Thread 2 waiting for others to reach barrier.
Thread 1: Crunching some numbers! Final result - 7
Thread 1: Crunching some numbers! Final result - 6
Thread 1: Crunching some numbers! Final result - 9
Thread 1 waiting for others to reach barrier.
Thread 3: Crunching some numbers! Final result - 9
Thread 3: Crunching some numbers! Final result - 3
Thread 3: Crunching some numbers! Final result - 8
Thread 3: Crunching some numbers! Final result - 8
Thread 3: Crunching some numbers! Final result - 1
Thread 3: Crunching some numbers! Final result - 8
Thread 3: Crunching some numbers! Final result - 0
Thread 3: Crunching some numbers! Final result - 5
Thread 3: Crunching some numbers! Final result - 9
Thread 3: Crunching some numbers! Final result - 1
Thread 3 waiting for others to reach barrier.
Thread 4: Crunching some numbers! Final result - 2
Thread 4: Crunching some numbers! Final result - 2
Thread 4: Crunching some numbers! Final result - 5
Thread 4: Crunching some numbers! Final result - 5
Thread 4: Crunching some numbers! Final result - 3
Thread 4: Crunching some numbers! Final result - 7
Thread 4: Crunching some numbers! Final result - 4
Thread 4: Crunching some numbers! Final result - 8
Thread 4: Crunching some numbers! Final result - 4
Thread 4: Crunching some numbers! Final result - 3
Thread 4 waiting for others to reach barrier.
Thread 4: Computing sum of 5 workers, having 10 results each.
Adding 5 3 7 4 6 9 0 2 0 7 
Adding 1 9 7 6 5 1 1 8 4 0 
Adding 1 8 3 3 6 5 2 7 6 9 
Adding 9 3 8 8 1 8 0 5 9 1 
Adding 2 2 5 5 3 7 4 8 4 3 
Thread 4: Final result = 230

Process finished with exit code 0

Chapter 19 Performance testing using JMH (Java Microbenchmark Harness) in Java

JMH (Java Microbenchmark Harness) is an OpenJDK tool for performance testing. The suite has been included in the JDK source tree since JDK 12.

This article will show you how to use JMH to do performance testing in Java.

If you’re not using JDK 12, add the following dependencies:

<dependency>
    <groupId>org.openjdk.jmh</groupId>
    <artifactId>jmh-core</artifactId>
    <version>1.19</version>
</dependency>
<dependency>
    <groupId>org.openjdk.jmh</groupId>
    <artifactId>jmh-generator-annprocess</artifactId>
    <version>1.19</version>
</dependency>

Perform performance tests using JMH

If we want to test a method's performance, we typically run the method n times, measure the total execution time, and take the average.

But this approach has problems. For example, the first few executions of a program are usually slower because the JVM has not yet optimized the frequently executed code. In addition, the raw statistics are not intuitive and have to be analyzed by hand.
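For contrast, here is the naive hand-rolled timing approach just described; it is a sketch only, and methodToTest is a placeholder. The early iterations run before JIT compilation, so they drag the average away from steady-state performance:

int n = 10_000;
long start = System.nanoTime();
for (int i = 0; i < n; i++) {
    methodToTest(); // placeholder: the first calls run before the JIT has optimized the code
}
long avgNanos = (System.nanoTime() - start) / n;
System.out.println("average nanos per call: " + avgNanos);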

These problems can be easily solved if JMH is used.

In JMH, the method to be tested is annotated with @Benchmark:

@Benchmark
public void measureThroughput() throws InterruptedException {
    TimeUnit.MILLISECONDS.sleep(100);
}

See how to call:

public static void main(String[] args) throws RunnerException {
    Options opt = new OptionsBuilder()
            .include(BenchMarkUsage.class.getSimpleName())
//          .include(BenchMarkUsage.class.getSimpleName() + ".*measureThroughput*")
            // warm up 3 iterations
            .warmupIterations(3)
            // measure 5 iterations
            .measurementIterations(5)
            .forks(1)
            .build();

    new Runner(opt).run();
}

In the example above, we add the class to be tested using the include() method of OptionsBuilder.

By default, all @Benchmark methods of that class are tested. If we want to test only one of them, we can append the method name after the class name:

.include(BenchMarkUsage.class.getSimpleName()+".*measureAll*")

The code above supports wildcards.

warmupIterations(3) means the benchmark is warmed up for three iterations before the real measurement begins.

measurementIterations(5) means the method is measured over five iterations.

forks(1) means one process is forked to run the benchmark.

That is the basic run. Let's look at the result:

# VM version: JDK 1.8.0_171, VM 25.171-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/jre/bin/java
# VM options: -javaagent:/Applications/IntelliJ IDEA 2.app/Contents/lib/idea_rt.jar=55941:/Applications/IntelliJ IDEA 2.app/Contents/bin -Dfile.encoding=UTF-8
# Warmup: 3 iterations, 1 s each
# Measurement: 5 iterations, 1 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: com.flydean.BenchMarkUsage.measureThroughput

# Run progress: 26.66% complete, ETA 00:01:42
# Fork: 1 of 1
# Warmup Iteration   1: 9.727 ops/s
# Warmup Iteration   2: 9.684 ops/s
# Warmup Iteration   3: 9.678 ops/s
Iteration   1: 9.652 ops/s
Iteration   2: 9.678 ops/s
Iteration   3: 9.733 ops/s
Iteration   4: 9.651 ops/s
Iteration   5: 9.678 ops/s

Result "com.flydean.BenchMarkUsage.measureThroughput":
  9.678 ops/s [Average]
  (min, avg, max) = (9.651, 9.678, 9.733), stdev = 0.034
  CI (99.9%): [9.549, 9.808] (assumes normal distribution)

ops/s is the number of operations per second. The program reports the minimum, average, and maximum values of the run, along with the standard deviation (stdev) and the confidence interval (CI).

BenchmarkMode

In the example above, we used the simplest form of @Benchmark. For a more complex, customized benchmark we can use @BenchmarkMode.

Here’s an example:

@Benchmark
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void measureThroughput() throws InterruptedException {
    TimeUnit.MILLISECONDS.sleep(100);
}

In the example above, we specified @BenchmarkMode(Mode.Throughput). Throughput measures how many times the method executes in a given amount of time.

We also specified the time unit with @OutputTimeUnit(TimeUnit.SECONDS).

Besides Throughput, Mode offers the following options (a sketch follows the list):

  • AverageTime – average call time.
  • SampleTime – random sampling, outputting the distribution of the sampled results.
  • SingleShotTime – executes only once, usually to test cold-start performance.
  • All – all benchmark modes.
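For example, a minimal AverageTime sketch (the method body is illustrative):

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void measureAvgTime() throws InterruptedException {
    // reports the average time of one call, here dominated by the 100 ms sleep
    TimeUnit.MILLISECONDS.sleep(100);
}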

Fork and Warmup

In the earlier example we specified the fork and warmup settings explicitly in code. We can also do this with annotations:

    @Fork(value = 1, warmups = 2)
    @Warmup(iterations = 5)

In the example above, value is the number of forked processes used to run the benchmark, warmups is the number of extra forks that run only as warmup (their results are discarded), and iterations in @Warmup is the number of warmup iterations.

If both the code and the annotations are configured, the options built explicitly in code override the annotation values.

State and Scope

If we run a benchmark in a multi-threaded environment, should the class variables used by the threads be shared, or should each thread get its own copy?

This is where the @State annotation comes in.

@State(Scope.Benchmark)
public class StateUsage {
}

There are three kinds of Scope (a usage sketch follows the list):

  • Scope.Thread: the default state; each test thread gets its own instance.
  • Scope.Benchmark: all test threads share one instance, used to test the performance of stateful instances shared across threads.
  • Scope.Group: each thread group shares one instance.
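Here is a minimal sketch (class and field names are illustrative) of a state object injected into a benchmark method:

@State(Scope.Benchmark)
public static class SharedCounter {
    AtomicLong counter = new AtomicLong(); // one instance shared by every test thread
}

@Benchmark
public long increment(SharedCounter state) {
    // JMH injects the state instance; with Scope.Thread each thread would get its own copy
    return state.counter.incrementAndGet();
}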

Chapter 20 The Use of ThreadLocalRandom in Java

In Java, we usually use java.util.Random to generate random numbers. Random is thread-safe, but that safety comes at a price: in a multi-threaded environment it can become a performance bottleneck.

Let’s take the nextInt method commonly used in Random as an example:

public int nextInt() {
    return next(32);
}

The nextInt method actually calls the following method:

protected int next(int bits) {
    long oldseed, nextseed;
    AtomicLong seed = this.seed;
    do {
        oldseed = seed.get();
        nextseed = (oldseed * multiplier + addend) & mask;
    } while (!seed.compareAndSet(oldseed, nextseed));
    return (int)(nextseed >>> (48 - bits));
}

As the code shows, the method uses an AtomicLong seed and calls its compareAndSet method to guarantee thread safety, so this is a thread-safe approach.

In a multi-threaded environment, all threads end up sharing and contending for the same Random instance. What can be done about it?

JDK 7 introduced a class called ThreadLocalRandom. Just as a ThreadLocal is a thread-local variable, a ThreadLocalRandom is a thread-local Random.

Let’s see how to call:

ThreadLocalRandom.current().nextInt();

Let's write a benchmark test for each of the two classes:

public class RandomUsage {

    @Benchmark
    @BenchmarkMode(Mode.AverageTime)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    public void testRandom() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Random random = new Random();
        List<Callable<Integer>> callables = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            callables.add(() -> {
                return random.nextInt();
            });
        }
        executorService.invokeAll(callables);
    }

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(RandomUsage.class.getSimpleName())
                // warm up 5 iterations
                .warmupIterations(5)
                // measure 10 iterations
                .measurementIterations(10)
                .forks(1)
                .build();

        new Runner(opt).run();
    }
}
public class ThreadLocalRandomUsage {

    @Benchmark
    @BenchmarkMode(Mode.AverageTime)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    public void testThreadLocalRandom() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        List<Callable<Integer>> callables = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            callables.add(() -> {
                return ThreadLocalRandom.current().nextInt();
            });
        }
        executorService.invokeAll(callables);
    }

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(ThreadLocalRandomUsage.class.getSimpleName())
                // warm up 5 iterations
                .warmupIterations(5)
                // measure 10 iterations
                .measurementIterations(10)
                .forks(1)
                .build();

        new Runner(opt).run();
    }
}

Analyzing the run results, we can see that ThreadLocalRandom is faster than Random in a multi-threaded environment.

Chapter 21 The Use of FutureTask in Java

FutureTask overview

FutureTask is a class introduced in Java 5. As its name suggests, it is both a Future and a Task.

Let’s look at the definition of FutureTask:

public class FutureTask<V> implements RunnableFuture<V> {
    ...
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

FutureTask implements the RunnableFuture interface, which combines Runnable and Future.

As a Future, FutureTask can perform asynchronous computations, see if an asynchronous program has finished executing, start and cancel the program, and get the final execution result of the program.

In addition, FutureTask provides a runAndReset() method that runs the task and resets the state of the Future.

Callable and Runnable conversion

We know that a Callable has a return value while a Runnable does not. Executors provides useful methods for converting a Runnable into a Callable:

    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
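A small usage sketch of this conversion (the task and the result value are arbitrary): the returned Callable runs the Runnable and then hands back the fixed result.

Runnable printTask = () -> System.out.println("running the task");
Callable<String> asCallable = Executors.callable(printTask, "done");
String result = asCallable.call(); // prints "running the task", then result is "done"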

FutureTask wraps a Callable internally, and its constructors accept either a Callable or a Runnable:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

Internally, the Runnable constructor calls Executors.callable(runnable, result) to do the conversion.

Running as a Runnable

Since it is a Runnable, FutureTask can be executed as a thread. Let’s look at an example:

@Test
public void convertRunnableToCallable() throws ExecutionException, InterruptedException {
    FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            log.info("inside callable future task ...");
            return 0;
        }
    });
    Thread thread = new Thread(futureTask);
    thread.start();
    log.info(futureTask.get().toString());
}

The example above runs in a single thread, but we can also submit a FutureTask to a thread pool for execution:

@Test
public void workWithExecutorService() throws ExecutionException, InterruptedException {
    FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            log.info("inside futureTask");
            return 1;
        }
    });
    ExecutorService executor = Executors.newCachedThreadPool();
    executor.submit(futureTask);
    executor.shutdown();
    log.info(futureTask.get().toString());
}

Chapter 22 The Use of CompletableFuture in Java

In the previous chapter we covered Future; in this one we'll continue with CompletableFuture, introduced in Java 8.

CompletableFuture is first and foremost a Future: it has all the functionality of a Future, including getting asynchronous execution results, cancelling ongoing tasks, and so on.

In addition, the CompletableFuture is also a CompletionStage.

Let’s look at the definition of CompletableFuture:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> 

What is the CompletionStage?

In asynchronous programs, if each asynchronous step is treated as a stage, controlling the order of execution across stages is often difficult. In JavaScript, for instance, this means nesting callbacks inside callbacks, producing the fabled callback hell.

Fortunately, ES6 introduced the Promise concept, which turns nested callbacks into chained calls, greatly improving the readability and writability of the program.

Similarly, in Java, we use the CompletionStage to implement chain operations for asynchronous calls.

CompletionStage defines a set of then*** methods (thenApply, thenCompose, and so on) to accomplish this.

Using CompletableFuture as a Future

Calling the CompletableFuture.complete() method completes the future with a result immediately. Let's see how to use it to build a basic Future:

public Future<String> calculateAsync() throws InterruptedException {
    CompletableFuture<String> completableFuture
            = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });

    return completableFuture;
}

Above, we obtain a Future by submitting a task to an ExecutorService. If the result is already known, we can use CompletableFuture.completedFuture() to return a completed Future directly:

public Future<String> useCompletableFuture() {
    Future<String> completableFuture =
            CompletableFuture.completedFuture("Hello");
    return completableFuture;
}

CompletableFuture also provides a cancel method to cancel the task immediately:

public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.cancel(false);
        return null;
    });
    return completableFuture;
}

If the Future’s get method is called at this point, a CancellationException will be raised.

Future<String> future = calculateAsyncWithCancellation();
future.get(); // CancellationException

Execute code asynchronously

CompletableFuture provides the runAsync and supplyAsync methods to execute code asynchronously.

Let’s look at a basic runAsync application that accepts a Runnable argument:

public void runAsync() {
    CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
        log.info("runAsync");
    });
}

supplyAsync accepts a Supplier:

public void supplyAsync() {
    CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
        return "supplyAsync";
    });
}

The difference between them is that one has no return value and the other has a return value.

Combining Futures

One of the big strengths of CompletableFuture, as mentioned above, is combining Futures by turning callbacks into chained calls.

Each chained call returns a CompletableFuture. Let's look at a thenCompose example:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

thenCompose takes the result of the previous Future as the input of the next operation.

If we want to combine the results of two CompletableFutures, we can use thenCombine:

public void thenCombine() {
    CompletableFuture<String> completableFuture
            = CompletableFuture.supplyAsync(() -> "Hello")
            .thenCombine(CompletableFuture.supplyAsync(
                    () -> " World"), (s1, s2) -> s1 + s2);
}

If you don’t want to return the result, you can use thenAcceptBoth:

public void thenAcceptBoth() {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
            .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
                    (s1, s2) -> System.out.println(s1 + s2));
}

thenApply() and thenCompose()

thenApply() and thenCompose() both chain CompletableFutures together, but they differ slightly.

thenApply() receives the result of the previous stage and transforms that value.

thenCompose() receives the result of the previous stage, and the CompletableFuture it returns is flattened into the chain.

To simplify the comparison: they are like the difference between map and flatMap.
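A minimal sketch of the contrast (the values are arbitrary):

// thenApply: the function returns a plain value
CompletableFuture<Integer> applied =
        CompletableFuture.supplyAsync(() -> 1)
                .thenApply(x -> x + 1);

// thenCompose: the function itself returns a CompletableFuture, and the stages are flattened
CompletableFuture<Integer> composed =
        CompletableFuture.supplyAsync(() -> 1)
                .thenCompose(x -> CompletableFuture.supplyAsync(() -> x + 1));

// if thenApply were used with a future-returning function, the result would nest:
// CompletableFuture<CompletableFuture<Integer>>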

Parallel task execution

When we execute tasks in parallel, we usually need to wait for all of them to complete before moving on. For that we can use the CompletableFuture.allOf method:

public void allOf() {
    CompletableFuture<String> future1
            = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future2
            = CompletableFuture.supplyAsync(() -> "Beautiful");
    CompletableFuture<String> future3
            = CompletableFuture.supplyAsync(() -> "World");

    CompletableFuture<Void> combinedFuture
            = CompletableFuture.allOf(future1, future2, future3);
}

allOf only guarantees that all tasks complete; it does not return a combined value. If we want the results, we can use join:

public void join() {
    CompletableFuture<String> future1
            = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future2
            = CompletableFuture.supplyAsync(() -> "Beautiful");
    CompletableFuture<String> future3
            = CompletableFuture.supplyAsync(() -> "World");

    String combined = Stream.of(future1, future2, future3)
            .map(CompletableFuture::join)
            .collect(Collectors.joining(" "));
}

The above program will return: “Hello Beautiful World”.

Exception handling

If an exception is thrown anywhere in a chain of calls, handle can be used at the end to deal with it:

public void handleError() {
    String name = null;

    CompletableFuture<String> completableFuture
            = CompletableFuture.supplyAsync(() -> {
        if (name == null) {
            throw new RuntimeException("Computation error!");
        }
        return "Hello, " + name;
    }).handle((s, t) -> s != null ? s : "Hello, Stranger!");
}

This is similar to the catch method used in Promises.
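If we only care about the failure path, there is also exceptionally(), which is even closer to a Promise's catch; a minimal sketch:

CompletableFuture<String> recovered =
        CompletableFuture.<String>supplyAsync(() -> {
            throw new RuntimeException("Computation error!");
        }).exceptionally(t -> "Hello, Stranger!"); // runs only when the stage completes exceptionally

// recovered.join() would return "Hello, Stranger!"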

Chapter 23 Building a pool of blocking objects using Semaphore in Java

Semaphore, introduced in Java 5, is a counting semaphore. It is used to control the number of threads that can access a particular resource, or perform a given operation, at the same time.

A Semaphore maintains a set of virtual permits. By acquiring and releasing permits, it controls access to a limited number of resources.

This feature of Semaphore can be used to build resource pools, such as database connection pools.

Semaphore has two constructors:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

permits defines the number of available permits, while fair indicates whether permits are granted in FIFO order.

The two most commonly used methods are acquire and release:

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void release() {
    sync.releaseShared(1);
}

acquire obtains a permit, and release gives one back.

With these two features in mind, let’s see how Semaphore can be used to define a bounded container.

We can initialize the Semaphore with the container pool's size, call acquire when taking a resource from the pool, and call release when returning a resource to the pool.

Let’s look at the following implementation:

public class SemaphoreUsage<T> {

    private final Set<T> set;
    private final Semaphore sem;

    public SemaphoreUsage(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            if (!wasAdded) {
                sem.release();
            }
        }
    }

    public boolean remove(Object o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved) {
            sem.release();
        }
        return wasRemoved;
    }
}

In the example above we defined a bounded synchronizedSet. Note that in the add method, the permit is released in the finally block only when the add did not actually succeed (the element was already present); a successful add keeps its permit until remove is called.
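A small usage sketch (element values and the bound are arbitrary; it assumes the enclosing method declares throws InterruptedException) showing the blocking behavior at the bound:

SemaphoreUsage<String> bounded = new SemaphoreUsage<>(2);
bounded.add("a");
bounded.add("b");                              // the pool is now full

new Thread(() -> bounded.remove("a")).start(); // frees a permit shortly

bounded.add("c");                              // blocks in acquire() until a permit is released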

Chapter 24: Building efficient result caching in Java

Caching is a very common component in modern application servers. Besides third-party caches, we often need to build in-process caches in Java as well. So how do you build an efficient cache? This chapter reveals the answer step by step.

Use a HashMap

A common caching approach is to keep a Map in memory: before performing a long operation such as a calculation, first check whether the result already exists in the Map, and perform the calculation only if it does not.

We define an interface that represents computation:

public interface Calculator<A, V> {

    V calculate(A arg) throws InterruptedException;
}

This interface defines a calculate method that takes a parameter and returns the result of the calculation.

The cache we will build is a wrapper around a Calculator implementation.

Let’s see how this works with HashMap:

public class MemoizedCalculator1<A, V> implements Calculator<A, V> {

    private final Map<A, V> cache = new HashMap<A, V>();
    private final Calculator<A, V> calculator;

    public MemoizedCalculator1(Calculator<A, V> calculator) {
        this.calculator = calculator;
    }

    @Override
    public synchronized V calculate(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if (result == null) {
            result = calculator.calculate(arg);
            cache.put(arg, result);
        }
        return result;
    }
}

MemoizedCalculator1 wraps a Calculator; calling its calculate method delegates to the calculate method of the wrapped Calculator.

Because HashMap is not thread-safe, we use the synchronized keyword here to ensure that only one thread at a time can access the Calculate method.

While this design ensures proper execution, only one thread at a time is allowed to execute the Calculate operation, and other threads calling the Calculate method will block, which can severely affect speed in a multithreaded execution environment. As a result, using the cache may take longer than not using it.

Using ConcurrentHashMap

Since HashMap is not thread-safe, we can try to use thread-safe ConcurrentHashMap instead. As follows:

public class MemoizedCalculator2<A, V> implements Calculator<A, V> {

    private final Map<A, V> cache = new ConcurrentHashMap<>();
    private final Calculator<A, V> calculator;

    public MemoizedCalculator2(Calculator<A, V> calculator) {
        this.calculator = calculator;
    }

    @Override
    public V calculate(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if (result == null) {
            result = calculator.calculate(arg);
            cache.put(arg, result);
        }
        return result;
    }
}

The above example no longer blocks other callers, but it still cannot guarantee cache reuse when two threads start the same calculation at the same time: both see a missing entry and call the calculation method separately, resulting in duplicate computation.

What we want is for other threads to simply wait for the result of the thread that is already computing. Naturally, we think of FutureTask. A FutureTask represents a computation in progress; calling its get method returns the result, or blocks if the computation has not yet finished.

Let’s use FutureTask to rewrite this.

FutureTask

@Slf4j
public class MemoizedCalculator3<A, V> implements Calculator<A, V> {

    private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
    private final Calculator<A, V> calculator;

    public MemoizedCalculator3(Calculator<A, V> calculator) {
        this.calculator = calculator;
    }

    @Override
    public V calculate(A arg) throws InterruptedException {
        Future<V> future = cache.get(arg);
        V result = null;
        if (future == null) {
            Callable<V> callable = new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return calculator.calculate(arg);
                }
            };
            FutureTask<V> futureTask = new FutureTask<>(callable);
            future = futureTask;
            cache.put(arg, futureTask);
            futureTask.run();
        }
        try {
            result = future.get();
        } catch (ExecutionException e) {
            log.error(e.getMessage(), e);
        }
        return result;
    }
}

In the example above, we use FutureTask to encapsulate the calculation and use FutureTask as the value of the Map.

The above example already shows good concurrency. But because the check-then-act sequence in the if statement is not atomic, two threads can still both see a missing entry and start the same computation.

In this case, we can rewrite the class using ConcurrentHashMap's atomic putIfAbsent operation:

@Slf4j
public class MemoizedCalculator4<A, V> implements Calculator<A, V> {

    private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
    private final Calculator<A, V> calculator;

    public MemoizedCalculator4(Calculator<A, V> calculator) {
        this.calculator = calculator;
    }

    @Override
    public V calculate(A arg) throws InterruptedException {
        while (true) {
            Future<V> future = cache.get(arg);
            if (future == null) {
                Callable<V> callable = new Callable<V>() {
                    @Override
                    public V call() throws Exception {
                        return calculator.calculate(arg);
                    }
                };
                FutureTask<V> futureTask = new FutureTask<>(callable);
                future = cache.putIfAbsent(arg, futureTask);
                if (future == null) {
                    future = futureTask;
                    futureTask.run();
                }
            }
            try {
                return future.get();
            } catch (CancellationException e) {
                // the computation was cancelled: remove the stale entry and retry
                log.error(e.getMessage(), e);
                cache.remove(arg, future);
            } catch (ExecutionException e) {
                log.error(e.getMessage(), e);
                return null;
            }
        }
    }
}

The while loop retries the whole sequence: if the cached Future does not exist, the computation is started; if a cached computation turns out to have been cancelled, the entry is removed and the loop tries again.

This also handles cache pollution: if the computation is cancelled or fails, the corresponding FutureTask is removed from the cache so that a later call can compute the value again.
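As a quick illustration of how the final memoizer might be used (the slow "calculator" below is invented for the demo), two threads requesting the same key share a single computation:

public class MemoizerDemo {
    public static void main(String[] args) {
        // a deliberately slow calculation, for demonstration only
        Calculator<Integer, Integer> slowSquare = arg -> {
            Thread.sleep(1000);
            return arg * arg;
        };
        Calculator<Integer, Integer> cached = new MemoizedCalculator4<>(slowSquare);

        Runnable task = () -> {
            try {
                System.out.println(cached.calculate(7)); // both threads print 49
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        new Thread(task).start();
        new Thread(task).start();
        // the second caller waits on the same FutureTask instead of recomputing
    }
}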

Chapter 25 The use of CompletionService in Java

With ExecutorService, you can submit tasks one by one, get back a Future for each, and then call Future.get to obtain the result of the task.

If there are many tasks, ExecutorService also provides an invokeAll method that submits a collection of tasks at once and returns all of their Future values:

   public void useExecutorService() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        Callable<String> callableTask = () -> {
            TimeUnit.MILLISECONDS.sleep(300);
            return "Task's execution";
        };

        List<Callable<String>> callableTasks = new ArrayList<>();
        callableTasks.add(callableTask);
        callableTasks.add(callableTask);
        callableTasks.add(callableTask);

        List<Future<String>> futures = executor.invokeAll(callableTasks);

        executor.shutdown();

    }

In the example above, we defined three tasks and returned a List<Future> by calling executor.invokeAll(callableTasks) so that we could get all the returned values.

In addition to the invokeAll method above, today we are going to introduce a CompletionService interface.

CompletionService is essentially a combination of ExecutorService, which executes tasks, and BlockingQueue, which holds execution results encapsulated as futures. The Future value is retrieved by calling the take and poll methods.

CompletionService is an interface; let's look at one of its concrete implementations, ExecutorCompletionService:

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

ExecutorCompletionService takes an Executor as a parameter.

Let's rewrite the example above using ExecutorCompletionService:

   public void useCompletionService() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
        Callable<String> callableTask = () -> {
            TimeUnit.MILLISECONDS.sleep(300);
            return "Task's execution";
        };
        for (int i = 0; i < 5; i++) {
            completionService.submit(callableTask);
        }

        for (int i = 0; i < 5; i++) {
            Future<String> result = completionService.take();
            System.out.println(result.get());
        }
    }

The above example submits tasks with completionService.submit and retrieves results with completionService.take().

CompletionService also has a poll method. The difference is that take blocks until a completed Future is available, while poll returns null immediately if none is ready; an overload of poll also accepts a timeout.

Chapter 26 Stopping a thread-based service with ExecutorService

As mentioned in previous chapters, an ExecutorService can be shut down using shutdown and shutdownNow.

The two differ in safety and responsiveness: shutdownNow forces a faster shutdown but is riskier, because tasks may be terminated in the middle of execution; shutdown is slower but safer, because it waits until all queued tasks have finished.

Using shutdown

Let’s start with an example using shutdown:

    public void useShutdown() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        Runnable runnableTask = () -> {
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        executor.submit(runnableTask);
        executor.shutdown();
        executor.awaitTermination(800, TimeUnit.MILLISECONDS);
    }

awaitTermination blocks until all executing tasks complete or the specified timeout is reached.

Using shutdownNow

When the ExecutorService is forcibly shut down via shutdownNow, it attempts to cancel running tasks and returns the tasks that were submitted but never started, so they can be saved for later processing.

But this only tells us which tasks never started; it does not tell us which tasks had already started and were then cancelled mid-execution.

Let’s look at how to get a task that has started but not finished:

public class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService executorService;
    private final Set<Runnable> taskCancelledAtShutdown = Collections.synchronizedSet(new HashSet<Runnable>());

    public TrackingExecutor(ExecutorService executorService) {
        this.executorService = executorService;
    }

    @Override
    public void shutdown() {
        executorService.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return executorService.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return executorService.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return executorService.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return executorService.awaitTermination(timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        executorService.execute(() -> {
            try {
                command.run();
            } finally {
                if (isShutdown() && Thread.currentThread().isInterrupted()) {
                    taskCancelledAtShutdown.add(command);
                }
            }
        });
    }

    public List<Runnable> getCancelledTask() {
        if (!executorService.isTerminated()) {
            throw new IllegalStateException("executorService is not terminated");
        }
        return new ArrayList<>(taskCancelledAtShutdown);
    }
}

In the example above, we built a new ExecutorService by wrapping an existing one.

We override the execute method so that, after a task runs, it checks whether the executor was shut down and the thread interrupted; if so, the task is added to the cancelled-task set.

A getCancelledTask method returns the tasks that were cancelled mid-execution.

Let’s see how it works:

    public void useShutdownNow() throws InterruptedException {
        TrackingExecutor trackingExecutor = new TrackingExecutor(Executors.newCachedThreadPool());

        Runnable runnableTask = () -> {
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        trackingExecutor.submit(runnableTask);
        List<Runnable> notrunList = trackingExecutor.shutdownNow();
        if (trackingExecutor.awaitTermination(800, TimeUnit.SECONDS)) {
            List<Runnable> runButCancelledList = trackingExecutor.getCancelledTask();
        }
    }

trackingExecutor.shutdownNow() returns the tasks that never started, while trackingExecutor.getCancelledTask() returns the tasks that started but were cancelled.

This approach has a drawback: the shutdown-and-interrupted check and the subsequent taskCancelledAtShutdown.add(command) are not a single atomic operation, so false positives are possible — a task may be recorded as cancelled even though it actually ran to completion.

Chapter 27 Our thread is starving to death

When we build thread pools, we can build single-thread thread pools and multi-thread thread pools.

Is it possible for thread pools to be improperly used to cause deadlocks? We know that deadlocks occur as a result of repeated contention for resources. Threads in a thread pool are also resources, and deadlocks can occur if threads in the pool are contested.

Consider a single-threaded pool in which the currently executing task submits a second task to the same pool and waits for its result. Since the pool has only one thread, the second task waits for the first to release the thread, while the first waits for the second to complete. This creates a Thread Starvation Deadlock.

Thread starvation deadlocks are not limited to single-threaded pools; they can occur whenever tasks in a pool wait on other tasks submitted to the same pool.

Let’s look at an example:

public class ThreadPoolDeadlock {

    ExecutorService executorService = Executors.newSingleThreadExecutor();

    public class RenderPageTask implements Callable<String> {
        public String call() throws Exception {
            Future<String> header, footer;
            header = executorService.submit(() -> {
                return "header";
            });
            footer = executorService.submit(() -> {
                return "footer";
            });
            return header.get() + footer.get();
        }
    }

    public void submitTask() {
        executorService.submit(new RenderPageTask());
    }
}

We submitted a RenderPageTask to the executorService, and the RenderPageTask itself submitted two more tasks. Because the pool has only one thread, the inner tasks can never start, and a deadlock occurs.

Our thread is starving to death!
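One straightforward way out, sketched below under the assumption that the subtasks themselves never submit further work, is to run the subtasks in a separate pool so the outer task never waits on its own pool:

public class ThreadPoolNoDeadlock {

    // separate pools: the outer task no longer competes with its subtasks
    ExecutorService pageExecutor = Executors.newSingleThreadExecutor();
    ExecutorService partExecutor = Executors.newFixedThreadPool(2);

    public class RenderPageTask implements Callable<String> {
        public String call() throws Exception {
            Future<String> header = partExecutor.submit(() -> "header");
            Future<String> footer = partExecutor.submit(() -> "footer");
            return header.get() + footer.get(); // subtasks can always run
        }
    }

    public void submitTask() {
        pageExecutor.submit(new RenderPageTask());
    }
}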

Chapter 28 Saturation Policy for Bounded Queues in Java (Reject Policy)

As we know, an ExecutorService keeps submitted tasks in a queue. Depending on the factory method used, that queue may be unbounded (Executors.newCachedThreadPool) or bounded (Executors.newFixedThreadPool(int nThreads)).

Unbounded queues are easy to understand, and we can submit tasks to the ExecutorService indefinitely. So for bounded queues, what happens if the queue is full?

Today we’ll introduce the Saturation Policy for ExecutorService in Java.

The specific implementation of ExecutorService, ThreadPoolExecutor, defines four saturation strategies. These are AbortPolicy, DiscardPolicy, DiscardOldestPolicy, and CallerRunsPolicy.

To set a saturation policy on a ThreadPoolExecutor, call its setRejectedExecutionHandler method, as shown below:

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(20));
        threadPoolExecutor.setRejectedExecutionHandler(
                new ThreadPoolExecutor.AbortPolicy()
        );

In the example above, we define a Thread Pool with an initial size of 5 and a maximum of 10 workers, and define a Queue with a size of 20. If the number of submitted tasks exceeds the capacity, the AbortPolicy policy is used.

AbortPolicy

AbortPolicy means that if the queue is full, the newly submitted task is rejected and a RejectedExecutionException is thrown:

   public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " + e.toString());
        }
    }

In the code above, the rejectedExecution method simply throws a RejectedExecutionException.

DiscardPolicy

DiscardPolicy will quietly discard the submitted task without raising an exception.

public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

DiscardOldestPolicy

DiscardOldestPolicy: Discards the oldest task and saves the most recently inserted task.

   public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

We can see that in the rejectedExecution method, the oldest task is polled off the queue, and the rejected task is then resubmitted to the ThreadPoolExecutor.

CallerRunsPolicy

CallerRunsPolicy differs from the other strategies in that it does not discard the task or throw an exception. Instead, it rolls the task back to the caller and uses the caller’s thread to execute the task, slowing the caller down. Let’s see how this works:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

In the rejectedExecution method, r.run() is called directly, so the task executes in the calling thread rather than in the thread pool. The caller cannot submit new tasks until this one finishes, which effectively throttles the rate at which tasks are submitted.

Using a Semaphore

If we don't define a saturation policy, is there another way to control the rate at which tasks are delivered? With a Semaphore holding a fixed number of permits we can bound the number of tasks submitted, as follows:

public class SemaphoreUsage {

    private final Executor executor;
    private final Semaphore semaphore;

    public SemaphoreUsage(Executor executor, int count) {
        this.executor = executor;
        this.semaphore = new Semaphore(count);
    }

    public void submitTask(final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            executor.execute(() -> {
                try {
                    command.run();
                } finally {
                    semaphore.release();
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
        }
    }
}

Chapter 29 Deadlocks due to improper execution sequence

Locking is introduced to keep threads safe, but used carelessly it can lead to a lock-ordering deadlock. In the previous chapter we also saw that contention for threads in a pool can produce a resource deadlock.

This article discusses sequential deadlocks.

Let’s talk about a recurring problem of account transfers. Account A wants to transfer money to account B. In order to ensure that A and B are not accidentally operated by other threads in the process of transfer, we need to lock A and B and then transfer. Let’s take A look at the transfer code:

    public void transferMoneyDeadLock(Account from, Account to, int amount) throws InsufficientAmountException {
        synchronized (from) {
            synchronized (to) {
                transfer(from, to, amount);
            }
        }
    }

    private void transfer(Account from, Account to, int amount) throws InsufficientAmountException {
        if (from.getBalance() < amount) {
            throw new InsufficientAmountException();
        } else {
            from.debit(amount);
            to.credit(amount);
        }
    }

It seems that there is no problem with the above program, because we have locked from and to, the program should be perfectly executed according to our requirements.

So if we consider the following scenario:

A: transferMoneyDeadLock(accountA, accountB, 20)
B: transferMoneyDeadLock(accountB, accountA, 10)

If A and B execute at the same time, it may result that A has acquired the lock of accountA and B has acquired the lock of accountB. Subsequent code cannot continue to execute, resulting in a deadlock.

Is there any good way to deal with this situation?

What if, no matter how the arguments are passed, we always acquire the two locks in the same global order? Then the deadlock cannot occur.

Let’s look at the code implementation:

    private static final Object lock = new Object();

    private void transfer(Account from, Account to, int amount) throws InsufficientAmountException {
        if (from.getBalance() < amount) {
            throw new InsufficientAmountException();
        } else {
            from.debit(amount);
            to.credit(amount);
        }
    }

    public void transferMoney(Account from, Account to, int amount) throws InsufficientAmountException {

        int fromHash = System.identityHashCode(from);
        int toHash = System.identityHashCode(to);

        if (fromHash < toHash) {
            synchronized (from) {
                synchronized (to) {
                    transfer(from, to, amount);
                }
            }
        } else if (fromHash > toHash) {
            synchronized (to) {
                synchronized (from) {
                    transfer(from, to, amount);
                }
            }
        } else {
            // hash collision: fall back to a global tie-breaking lock
            synchronized (lock) {
                synchronized (from) {
                    synchronized (to) {
                        transfer(from, to, amount);
                    }
                }
            }
        }
    }

In the above example, we use System.identityHashCode to obtain the hash values of the two accounts and compare them to decide the locking order.

If the two hash values happen to be equal, we fall back to an additional external lock so that only one thread at a time can run the inner section, which guarantees the transfers execute without deadlock.

Chapter 30 Non-blocking synchronization and CAS

We know that synchronization was implemented with the Synchronized keyword prior to Java 5, and since Java 5, many more powerful synchronization classes have been added to the java.util.concurrent package. Many of these powerful classes implement non-blocking synchronization mechanisms to help improve performance.

What is non-blocking synchronization

Non-blocking synchronization means that multiple threads competing for the same data do not block and can coordinate on a more fine-grained dimension, greatly reducing the overhead of thread scheduling and thus improving efficiency. Non-blocking algorithms do not have locking mechanisms and therefore do not have the problem of deadlocks.

In lock-based algorithms, if one thread holds a lock, no other thread can proceed. Using locks can guarantee consistent access to resources, but there is a lot of overhead in the execution of suspended and resumed threads. If there is a lot of contention on the lock, the scheduling overhead may be higher than the actual work overhead.

Pessimistic locks and optimistic locks

We know that an exclusive lock is a pessimistic lock, which means that in the worst case, if you don’t lock the resource, some other thread will modify the resource. Pessimistic locking can ensure the smooth execution of tasks, but it is not efficient.

An optimistic lock assumes that no other thread will change a resource, but we need to determine whether the resource has been changed by another thread when updating it. If it was changed then the update failed, we can try again, if it was not changed then the update succeeded.

Optimistic locking is based on the assumption that most of the time the system updates resources without conflict.

Atomic comparison and update operations for optimistic locks are generally supported by the underlying hardware.

CAS

Most processors implement a CAS (compare and swap) instruction. Typically, CAS takes three operands: the memory location V, the expected value A, and the new value B. B is written only if the current value of V equals A, and the old value of V is returned whether or not the write succeeded. In plain words: "I think the value of V is currently A; if so, update it to B, otherwise leave it unchanged, and in either case tell me its current value."

The concurrent classes in the JDK use CAS through the Unsafe class. You can build your own concurrent class as follows:

public class CasCounter {

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
    private volatile int value;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                    (CasCounter.class.getDeclaredField("value"));
        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

    public CasCounter(int initialValue) {
        value = initialValue;
    }

    public CasCounter() {
    }

    public final int get() {
        return value;
    }

    public final void set(int newValue) {
        value = newValue;
    }

    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
}

In the above example, we define an atomic operation, compareAndSet, which internally calls the Unsafe compareAndSwapInt method.

It may seem more complicated to use CAS than to lock directly, but acquiring a lock in the JVM can traverse a very complex code path and may involve operating-system-level locking, thread suspension, and context switches. In the best case, acquiring a lock still costs at least one CAS operation.

The main disadvantage of CAS is that it requires the caller to handle race issues (retries, fallbacks, abandons), which can be handled automatically in the lock.
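For example, a typical retry loop built on AtomicInteger.compareAndSet — essentially the pattern that atomic classes use internally for operations like incrementAndGet — looks like this:

import java.util.concurrent.atomic.AtomicInteger;

public class CasRetryDemo {
    private final AtomicInteger counter = new AtomicInteger(0);

    public int increment() {
        while (true) {
            int current = counter.get();
            int next = current + 1;
            // retry until no other thread changed the value in between
            if (counter.compareAndSet(current, next)) {
                return next;
            }
        }
    }
}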

In a previous chapter we also discussed atomic variables, which are implemented on top of CAS.

Chapter 31 implementation of lock-free algorithm

In the last article we looked at the disadvantages of using locks, and this article will show you how to use non-blocking algorithms. Non-blocking algorithms typically use CAS to coordinate thread operations.

Although non-blocking algorithms have many advantages, they are more complicated to implement and harder to get right than lock-based algorithms.

This article introduces two data structures implemented using non-blocking algorithms.

A non-blocking stack

Let's start by building a non-blocking stack using CAS. A stack is the simplest linked structure: it is essentially a linked list whose root node is the top of the stack.

Let’s start by building the Node data structure:

public class Node<E> {
    public final E item;
    public Node<E> next;

    public Node(E item) {
        this.item = item;
    }
}

Each Node holds an item and a reference to the next node.

We then build a non-blocking stack with pop and push methods. An AtomicReference holds the top node, and compareAndSet is called in pop and push to keep each update atomic. We also need a retry loop so that an update is retried when threads conflict.

public class ConcurrentStack<E> {

    AtomicReference<Node<E>> top = new AtomicReference<>();

    public void push(E item) {
        Node<E> newNode = new Node<>(item);
        Node<E> oldNode;
        do {
            oldNode = top.get();
            newNode.next = oldNode;
        } while (!top.compareAndSet(oldNode, newNode));
    }

    public E pop() {
        Node<E> oldNode;
        Node<E> newNode;
        do {
            oldNode = top.get();
            if (oldNode == null) {
                return null;
            }
            newNode = oldNode.next;
        } while (!top.compareAndSet(oldNode, newNode));
        return oldNode.item;
    }
}

A non-blocking linked list

Building a linked queue is more complex than building a stack, because we must maintain both head and tail pointers. The put method needs two steps: 1. insert a new node at the tail; 2. point the tail pointer at the new node.

The best we can do with CAS is ensure that one of the steps is atomic. So what do we do with the combination of steps 1 and 2?

If we think about it a little more carefully, steps 1 and 2 do not have to be executed by the same thread. When another thread detects that someone has inserted a node but not yet advanced the tail pointer, it can advance the tail itself before proceeding.

Let’s look at the code implementation:

public class LinkedNode<E> {
    public final E item;
    public final AtomicReference<LinkedNode<E>> next;

    public LinkedNode(E item, LinkedNode<E> next) {
        this.item = item;
        this.next = new AtomicReference<>(next);
    }
}

Start by building a LinkedNode class.

public class LinkedQueue<E> {
    private final LinkedNode<E> nullNode = new LinkedNode<>(null, null);
    private final AtomicReference<LinkedNode<E>> head = new AtomicReference<>(nullNode);
    private final AtomicReference<LinkedNode<E>> tail = new AtomicReference<>(nullNode);

    public boolean put(E item) {
        LinkedNode<E> newNode = new LinkedNode<>(item, null);
        while (true) {
            LinkedNode<E> currentTail = tail.get();
            LinkedNode<E> tailNext = currentTail.next.get();
            if (currentTail == tail.get()) {
                if (tailNext != null) {
                    // Another thread has inserted a node, but has not yet pointed tail to the latest node
                    tail.compareAndSet(currentTail, tailNext);
                } else {
                    // No other thread is mid-insert, so do two things: 1. insert the new node, 2. point tail to it
                    if (currentTail.next.compareAndSet(null, newNode)) {
                        tail.compareAndSet(currentTail, newNode);
                        return true;
                    }
                }
            }
        }
    }
}

Chapter 32 Java Memory Model (JMM) and happens-Before

We know that Java programs run in JVMS, and JVMS are virtual machines built on memory, so what does the memory model JMM do?

Let’s consider a simple assignment problem:

int a=100;

The JMM addresses the question of when a thread reading variable a is guaranteed to see the value 100. This looks like a trivial question: surely the value is read after it is assigned?

But this is just the order in which our source code is written. When the source code is compiled, the instructions generated in the compiler are not in exactly the same order as the source code. The processor may execute instructions out of order or in parallel (such reordering is allowed in the JVM as long as the program’s final execution results are the same as those in a strictly serial environment). The processor also has a local cache, and when the results are stored in the local cache, no other thread can see the results. In addition, the order in which caches are committed to main memory may change.

All of the above can lead to different results in a multi-threaded environment. In a multi-threaded environment, most of the time the multiple threads are performing their own tasks, and only when multiple threads need to share data, the operation between the threads needs to be coordinated.

The JMM is a set of minimum guarantees that the JVM must adhere to, specifying when writes to variables are visible to other threads.

Reordering

Reordering in the JVM has been described above, but here is an example to give you a better understanding of reordering:

@Slf4j
public class Reorder {

    int x = 0, y = 0;
    int a = 0, b = 0;

    private void reorderMethod() throws InterruptedException {

        Thread one = new Thread(() -> {
            a = 1;
            x = b;
        });

        Thread two = new Thread(() -> {
            b = 1;
            y = a;
        });
        one.start();
        two.start();
        one.join();
        two.join();
        log.info("{}, {}", x, y);
    }

    public static void main(String[] args) throws InterruptedException {

        for (int i = 0; i < 100; i++) {
            new Reorder().reorderMethod();
        }
    }
}

The example above is a very simple concurrent program. Since we use no synchronization, the relative execution order of threads one and two is indeterminate: one may run before two, after two, or interleaved with it. Different interleavings may produce different output.

Also, although we specify in the code that a=1 is executed before x=b, the two statements are actually unrelated, and it is entirely possible in the JVM to reorder the two statements so that x=b comes first and a=1 comes last, resulting in more unexpected results.

Happens-Before

To ensure the order of operations in the Java memory model, the JMM defines a sequence relationship for all operations in the program called happens-before. To be sure that operation B sees the result of operation A, regardless of whether they are on the same or different threads, A and B must satisfy the happens-before relationship. If two operations do not satisfy the happens-before relationship, the JVM can reorder them at will.

Let’s look at the happens-before rule:

  1. Program order rule: if operation A precedes operation B in program order, then within the same thread operation A is performed before operation B.

Note that "performed before" still allows the virtual machine to reorder instructions in a single-threaded environment, as long as the result is the same as if the instructions had executed in code order. The virtual machine only reorders code that has no data dependencies.

  2. Monitor lock rule: an unlock operation on a monitor must be performed before every subsequent lock operation on the same monitor.

The ordering only applies to the same lock; for different locks, no execution order is guaranteed.

  3. Volatile variable rule: a write to a volatile variable must be performed before every subsequent read of that variable.

Atomic variables have the same read/write semantics as volatile variables.

  4. Thread start rule: the call to Thread.start must be performed before any operation in the started thread.

  5. Thread termination rule: every operation in a thread must be performed before another thread detects that the thread has terminated.

  6. Interrupt rule: a thread's call to interrupt another thread must be performed before the interrupted thread detects the interruption.

  7. Finalizer rule: an object's constructor must complete before the object's finalizer starts.

  8. Transitivity: if operation A is performed before operation B, and operation B is performed before operation C, then operation A is performed before operation C.

Rule 2 above is well understood. While a lock is being held, no other thread is allowed to acquire the lock, which means that other threads must wait for the lock to be released before they can hold the lock and execute their business logic.

Rules 4, 5, 6, and 7 are easy to understand and consistent with our general intuition about programs.

The transitivity of 8 is not hard to understand for those who have studied mathematics.

Let’s focus on the combination of rule 3 and Rule 1. Before we talk about it, let’s summarize what happens-before actually does.

Because the JVM may reorder the instructions it receives, we have the happens-before rules to guarantee ordering where it matters. Rules 2-7 above can be thought of as reordering nodes: the operations at these nodes may not themselves be reordered; only the instructions between such nodes may be reordered.

In combination with the rule 1 program ordering rule, we get what it really means: the instructions written in the code before the reordering node are executed before the reordering node is executed.

The reorder node is a cut-off point whose position cannot be moved. Take a look at the following intuitive example:

Thread 1 executes two instructions: set i = 1 and then set volatile a = 2. Thread 2 executes get volatile a and then get i.

According to the theory above, the volatile set and get are two reordering nodes, and the set must be performed before the get (rule 3). By rule 1, set i = 1 must be performed before set volatile a = 2, because the volatile write is a reordering node whose position cannot be moved. Similarly, get volatile a is performed before get i. By transitivity, set i = 1 is performed before get i, so thread 2 is guaranteed to see i = 1.

This technique is called piggybacking: we piggyback on the ordering guarantees of the volatile access to make the ordinary variable visible.
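A minimal sketch of the scenario just described (field and method names are illustrative): the volatile write/read pair orders the plain write and read of i.

public class VolatilePiggyback {
    int i = 0;
    volatile int a = 0;

    // thread 1
    void writer() {
        i = 1;      // plain write
        a = 2;      // volatile write: everything before it becomes visible...
    }

    // thread 2
    void reader() {
        if (a == 2) {              // ...to a thread that reads the volatile first
            System.out.println(i); // guaranteed to print 1
        }
    }
}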

Safe publication

We often use the singleton pattern to create a single object. Let’s see what’s wrong with the following methods:

public class Book {

    private static Book book;

    public static Book getBook() {
        if (book == null) {
            book = new Book();
        }
        return book;
    }
}

The above class defines a getBook method that returns the book object; before returning, it checks whether book is null and creates a new Book if it is.

At first glance this looks fine, but once the JMM's reordering rules are considered the problem emerges. book = new Book() is not an atomic operation. It breaks down into roughly three steps: 1. allocate memory; 2. initialize the object; 3. assign the reference to the book field.

Steps 2 and 3 may be reordered, so another thread may observe a non-null book that has not yet been initialized, causing unexpected errors.

According to the happens-before rule we discussed above, the easiest way to do this is to prefix a method with the synchronized keyword:

public class Book {

    private static Book book;

    public synchronized static Book getBook() {
        if (book == null) {
            book = new Book();
        }
        return book;
    }
}

Let’s look at the following static field implementation:

public class BookStatic {
    private static BookStatic bookStatic = new BookStatic();

    public static BookStatic getBookStatic() {
        return bookStatic;
    }
}

The JVM statically initializes classes after they are loaded and before they are used by threads, and during this initialization phase a lock is acquired to ensure that memory writes are visible to all threads during the static initialization phase.

The above example defines static variables that will be instantiated during the static initialization phase. This approach is called preinitialization.

Let's look at another pattern, the lazy initialization holder class:


public class BookStaticLazy {

    private static class BookStaticHolder {
        private static BookStaticLazy bookStatic = new BookStaticLazy();
    }

    public static BookStaticLazy getBookStatic() {
        return BookStaticHolder.bookStatic;
    }
}

In the above class, the class is initialized only when the getBookStatic method is called.

Next, we will introduce double-checked locking.

public class BookDLC {
    private volatile static BookDLC bookDLC;

    public static BookDLC getBookDLC() {
        if (bookDLC == null) {
            synchronized (BookDLC.class) {
                if (bookDLC == null) {
                    bookDLC = new BookDLC();
                }
            }
        }
        return bookDLC;
    }
}

The above class checks the value of bookDLC twice and locks only when bookDLC is null. Everything looks perfect, but note that bookDLC must be volatile.

Without volatile there is no happens-before relationship between the write that assigns bookDLC and the unsynchronized read that returns it, so a thread could observe a partially constructed instance. That is why the field is declared volatile.

Initialization security

At the end of this article, we will discuss object initialization with a final field in the constructor.

For a properly constructed object, initialization guarantees that all threads see the values the constructor assigned to final fields, including anything reachable through those final fields (such as the elements of a final array or the contents of a final HashMap).

public class FinalSafe {
    private final HashMap<String, String> hashMap;

    public FinalSafe() {
        hashMap = new HashMap<>();
        hashMap.put("key1", "value1");
    }
}

In the example above, we define a final field and initialize it in the constructor. Writes made to the final field inside the constructor cannot be reordered past the end of the constructor, so the object is safely published.

Chapter 33 Phaser for Java Multithreading

In the previous article, we talked about the use of CyclicBarrier and CountDownLatch. As a reminder, CountDownLatch is mainly used when one thread is waiting for multiple threads to complete, while CyclicBarrier is used when multiple threads are waiting for each other to complete.

Phaser is a new concurrency API introduced in Java 7. It introduces the notion of phases: in each phase a set of thread tasks must complete before all participants move on to the next phase. Phaser is therefore particularly well suited to computations that are repeated or reused.

Basic usage

In CyclicBarrier, CountDownLatch, and Phaser, we use counters to control the sequential execution of programs. Counters are called parties in Phaser, and can be registered through the Phaser constructor or register() method.

We can dynamically adjust the number of parties by calling the register() method, and deregister a party by calling arriveAndDeregister().

Let’s look at arrive:

    public int arrive() {
        return doArrive(ONE_ARRIVAL);
    }

In Phaser, arrive actually calls the doArrive method; doArrive takes an adjust parameter, ONE_ARRIVAL for arrive and ONE_DEREGISTER for arriveAndDeregister.

The arrive() and arriveAndDeregister() methods do not block; they return the current phase number. When the last party of the phase arrives, the phase number is incremented, the phase moves to the next cycle, the onAdvance callback runs, and the threads blocked on the previous phase are released. This resembles the trip mechanism of a CyclicBarrier; more flexibly, we can override the onAdvance method to implement custom trigger behavior.

Here’s a basic use:

    void runTasks(List<Runnable> tasks) {
        final Phaser phaser = new Phaser(1); // "1" to register self
        // create and start threads
        for (final Runnable task : tasks) {
            phaser.register();
            new Thread() {
                public void run() {
                    phaser.arriveAndAwaitAdvance(); // await all creation
                    task.run();
                }
            }.start();
        }

        // allow threads to start and deregister self
        phaser.arriveAndDeregister();
    }
Copy the code

In the example above, we call register() before creating each Runnable, and each worker calls arriveAndAwaitAdvance() to wait until all parties arrive. Finally, the main thread calls phaser.arriveAndDeregister() to deregister itself.

The following is a detailed analysis of the operation steps:

  1. final Phaser phaser = new Phaser(1);

In this step we initialize a Phaser with 1 party, representing the main thread itself.

  2. phaser.register();

This step registers each Runnable task with the phaser, adding one party per task.

  3. phaser.arriveAndAwaitAdvance();

Each worker arrives and then waits until all parties have arrived. Only the parties registered in step 2 arrive here; the party from step 1 has not arrived yet.

  4. phaser.arriveAndDeregister();

In the main thread, the party from step 1 arrives and is deregistered, reducing the party count by one.

  5. The phaser.arriveAndAwaitAdvance() calls in step 3 can now proceed, because the last party arrived in step 4.

Multiple Phaser cycles

Phase numbers run from 0 to Integer.MAX_VALUE, incrementing by one after each cycle and wrapping around to 0 after Integer.MAX_VALUE.

Termination across cycles is controlled by the onAdvance method, whose default implementation terminates the Phaser when no parties remain registered:

    protected boolean onAdvance(int phase, int registeredParties) {
        return registeredParties == 0;
    }

onAdvance is invoked when the last party arrives in a phase. If it returns true — here, when registeredParties is 0 — the Phaser terminates and isTerminated() starts returning true.

If we want to run a fixed number of phases, we can override the method like this:

protected boolean onAdvance(int phase, int registeredParties) {
                return phase >= iterations || registeredParties == 0;
            }

In the example above, the Phaser terminates automatically once the phase number reaches the given iterations count, or when no parties remain registered.

Let’s look at a practical example:

   void startTasks(List<Runnable> tasks, final int iterations) {
        final Phaser phaser = new Phaser() {
            protected boolean onAdvance(int phase, int registeredParties) {
                return phase >= iterations || registeredParties == 0;
            }
        };
        phaser.register();
        for (final Runnable task : tasks) {
            phaser.register();
            new Thread() {
                public void run() {
                    do {
                        task.run();
                        phaser.arriveAndAwaitAdvance();
                    } while (!phaser.isTerminated());
                }
            }.start();
        }
        phaser.arriveAndDeregister(); // deregister self, don't wait
    }

The above example runs the set of tasks for the given number of iterations.

Chapter 34: Use of Locks in Java

As discussed in the previous article, synchronization in Java is achieved using synchronized Blocks. In Java 5, Locks was introduced to provide more flexible synchronization control.

This article will take a closer look at the use of Lock.

The difference between Lock and Synchronized Block

We covered the use of Synchronized for Java synchronization in an earlier article on Synchronized Block. If Synchronized blocks are so good, why introduce a new Lock?

The main differences are as follows:

  1. A synchronized block must be fully contained within a single method, whereas lock() and unlock() can be called in different methods.
  2. Synchronized blocks do not support fairness; once a lock is released, any thread may acquire it. The Lock API supports fair locks, so the thread that has waited longest acquires the lock first.
  3. A thread that cannot obtain the lock of a synchronized block simply blocks. The Lock API provides a tryLock() method to test whether the lock can be acquired, which reduces the time a thread spends blocked.
  4. A thread waiting for a synchronized block's lock cannot be interrupted. With the Lock API, lockInterruptibly() allows the waiting thread to be interrupted.

Lock interface

Let’s look at the Lock interface definition. The Lock interface defines the following main methods to use:

  • void lock() – attempts to acquire the lock, blocking if it is not available.
  • void lockInterruptibly() – like lock(), but the blocked thread can be interrupted, throwing java.lang.InterruptedException.
  • boolean tryLock() – a non-blocking version of lock(); it attempts to acquire the lock and immediately returns whether it succeeded.
  • boolean tryLock(long timeout, TimeUnit timeUnit) – like tryLock(), but waits up to the given timeout while trying to acquire the lock.
  • void unlock() – releases the lock.
  • Condition newCondition() – returns a Condition bound to this Lock instance.

A lock must always be released to avoid deadlock, so we usually pair lock() with unlock() in a try/finally block:

Lock lock = ...;
lock.lock();
try {
    // access the shared resource
} finally {
    lock.unlock();
}

In addition to the Lock interface, there is also a ReadWriteLock interface that defines two methods to separate read and write locks:

  • Lock readLock() – returns the read lock
  • Lock writeLock() – returns the write lock

The read lock can be acquired by many threads at the same time, as long as there is no write operation. Write locks can only be acquired by one thread at a time.

Next, let's look at several commonly used Lock implementations.

ReentrantLock

ReentrantLock is an implementation of Lock. What is a ReentrantLock?

To put it simply, a reentrant lock can be acquired again by the thread that already holds it: when another method of the same thread needs the lock, the hold count is incremented and the method is allowed to proceed.

A non-reentrant lock only checks whether the lock is held; any thread requesting a held lock must wait, even the thread that holds it. Its implementation is simpler.

A reentrant lock checks not only whether the lock is held but also who holds it; the holding thread may enter the critical section again, incrementing the hold count by one.
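A small sketch of reentrancy in action, assuming a shared ReentrantLock field (the class and method names are illustrative): outer() still holds the lock when it calls inner(), and the same thread may acquire it again.

import java.util.concurrent.locks.ReentrantLock;

public class ReentrancyDemo {
    private final ReentrantLock lock = new ReentrantLock();

    public void outer() {
        lock.lock();
        try {
            inner(); // same thread re-acquires the lock; hold count goes to 2
        } finally {
            lock.unlock();
        }
    }

    public void inner() {
        lock.lock();
        try {
            System.out.println("hold count: " + lock.getHoldCount());
        } finally {
            lock.unlock();
        }
    }
}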

Let’s look at how to use ReentrantLock:

    public void perform() {
        lock.lock();
        try {
            counter++;
        } finally {
            lock.unlock();
        }
    }

Here is an example using tryLock():

    public void performTryLock() throws InterruptedException {
        boolean isLockAcquired = lock.tryLock(1, TimeUnit.SECONDS);

        if (isLockAcquired) {
            try {
                counter++;
            } finally {
                lock.unlock();
            }
        }
    }

ReentrantReadWriteLock

ReentrantReadWriteLock is an implementation of ReadWriteLock. As mentioned above, ReadWriteLock has two main methods:

  • Read Lock – Multiple threads can acquire a Read Lock if no thread acquires a write Lock.
  • Write Lock – Only one thread can acquire the Write Lock if no other thread has acquired the read Lock and Write Lock.

Let’s see how to use writeLock:

    Map<String, String> syncHashMap = new HashMap<>();
    ReadWriteLock lock = new ReentrantReadWriteLock();

    Lock writeLock = lock.writeLock();

    public void put(String key, String value) {
        try {
            writeLock.lock();
            syncHashMap.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    public String remove(String key) {
        try {
            writeLock.lock();
            return syncHashMap.remove(key);
        } finally {
            writeLock.unlock();
        }
    }

Let’s see how readLock works:

    Lock readLock = lock.readLock();

    public String get(String key) {
        try {
            readLock.lock();
            return syncHashMap.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public boolean containsKey(String key) {
        try {
            readLock.lock();
            return syncHashMap.containsKey(key);
        } finally {
            readLock.unlock();
        }
    }

StampedLock

StampedLock also supports read and write locks. Each acquisition returns a stamp, which must later be passed back to release the lock.

We mentioned above that a read lock cannot be acquired if a write lock exists. But sometimes we do not want to lock the read operation, we need to use optimistic read lock.

The stamp in StampedLock is similar to a version number in optimistic locking. Each lock method returns a stamp representing the lock state at that moment. With an optimistic read, after reading the data we validate the stamp; if it has changed, a writer modified the data in the meantime, our read is invalid, and we must upgrade the optimistic read to a real read lock and read the data again.

Let’s take a look at the write exclusive lock as an example:

    private double x, y;
    private final StampedLock sl = new StampedLock();

    void move(double deltaX, double deltaY) { // an exclusively locked method
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

Take a look at optimistic read locks:

    double distanceFromOrigin() { // A read-only method
        long stamp = sl.tryOptimisticRead();
        double currentX = x, currentY = y;
        if (!sl.validate(stamp)) {
            stamp = sl.readLock();
            try {
                currentX = x;
                currentY = y;
            } finally {
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

tryOptimisticRead() attempts an optimistic read, and sl.validate(stamp) checks whether the stamp is still valid, i.e. whether a write occurred in the meantime.

Finally, StampedLock also provides a capability to upgrade read and optimistic read locks to write locks:

   void moveIfAtOrigin(double newX, double newY) { // upgrade
        // Could instead start with optimistic, not read mode
        long stamp = sl.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                long ws = sl.tryConvertToWriteLock(stamp);
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                } else {
                    sl.unlockRead(stamp);
                    stamp = sl.writeLock();
                }
            }
        } finally {
            sl.unlock(stamp);
        }
    }

The above example upgrades the lock by calling tryConvertToWriteLock(stamp).

Conditions

One of these methods was mentioned in the Lock interface:

Condition newCondition();

Condition provides await and signal methods, similar to wait and notify in Object.

The difference is that Condition provides more fine-grained wait set partitioning. Let’s take an example:

public class ConditionUsage {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

The above example implements something like an ArrayBlockingQueue. Note that on the same Lock instance two Conditions are created, notFull and notEmpty, representing "the queue is not full" and "the queue is not empty". This fine-grained division of wait sets gives us better control over the business logic.

Chapter 35 the nature of ABA problem and its solution

Introduction

CAS stands for compare and swap and underlies the Java concurrency classes: the classes in java.util.concurrent use CAS to implement atomicity.

The principle of CAS is simple: ensure that our update takes effect as expected in a multithreaded environment, i.e. that no other thread has modified the object in the meantime. Before updating an object (or value), the thread saves the old value and passes it in with the update; the update is performed only if the current value still matches the saved value, otherwise the update fails.

Note that CAS is implemented in Java in a native way, taking advantage of the atomic operations provided by the system itself.

So what problems can arise when using CAS? Generally speaking, if a CAS-based design is not careful, ABA problems may occur. ABA problems fall into two categories; let's look at the first one.


The first kind of problem

Consider the following case of ABA:

  1. In a multithreaded environment, thread A reads object A from the shared address X.
  2. Before thread A is ready to update address X, thread B changes the value in address X to B.
  3. Thread B then changes the value of address X back to A.
  4. Finally, thread A performs CAS on address X, finds that object A is still stored at X, the comparison succeeds, and the CAS succeeds.

In the example above the CAS succeeds, yet the contents of address X were in fact modified twice in between. If our logic assumes that a successful CAS means the value never changed, this can hide subtle bugs.

The key to the first type of problem is in steps 2 and 3. In these two steps we can see that thread B directly replaces the contents of memory address X.

In a programming language with automatic GC, such as Java, steps 2 and 3 cannot produce this problem for object references: as long as thread A still holds a reference to object A, the object cannot be reclaimed and its address cannot be reused, so a reference that compares equal is guaranteed to be the very same object.

Steps 2 and 3 are possible in a language without automatic GC, such as C++. Since we control object lifecycles ourselves, if we delete an object from a list and then allocate a new object and add it back, then under an MRU memory-allocation strategy the new object will very likely receive the same memory address as the deleted one. This can lead to ABA problems.

The second kind of problem

If we were in a programming language with automatic GC, would there still be a CAS problem?

Consider the case where we have a linked list holding A->B->C, and we want to perform a CAS operation that replaces A with D, producing the list D->B->C. Consider the following steps:

  1. Thread A reads the head node A of the linked list.
  2. Thread B removes node B from the list and the list becomes A->C
  3. Thread A performs the CAS operation and replaces A with D.

Our final list is D->C, not D->B->C.

What went wrong? The CAS only compares node A against the current head node; it does not care whether the list behind node A changed between steps 1 and 3.

Let’s take an example:

public void useABAReference() {
    CustUser a = new CustUser();
    CustUser b = new CustUser();
    CustUser c = new CustUser();
    AtomicReference<CustUser> atomicReference = new AtomicReference<>(a);
    log.info("{}", atomicReference.compareAndSet(a, b));
    log.info("{}", atomicReference.compareAndSet(b, a));
    a.setName("change for new name");
    log.info("{}", atomicReference.compareAndSet(a, c));
}

In the example above, we use the compareAndSet method of AtomicReference to check whether the object has changed. After CASing a to b and then back to a, we modify a's name. Let's look at the final output:

[main] INFO com.flydean.aba.ABAUsage - true
[main] INFO com.flydean.aba.ABAUsage - true
[main] INFO com.flydean.aba.ABAUsage - true

All three CAS operations returned true. This shows that CAS does not care whether the contents of the same object have changed.

The second kind of problem can make some collection operations non-atomic, because during the CAS you cannot guarantee that other threads have not changed other nodes.

Solutions to the first kind of problem

The first kind of problem does not exist in languages with automatic GC, so let's focus on how to solve it in languages like C++.

According to authoritative sources, there are roughly four solutions to the first kind of problem:

  1. Use intermediate nodes – insert dummy nodes that carry no data to mark nodes that are logically deleted.
  2. Use automatic GC.
  3. Use hazard pointers – hazard pointers record the addresses of the nodes currently being accessed by threads; such nodes must not be modified or deleted.
  4. Use read-copy update (RCU) – make a copy of the structure before each update.

Solutions to the second kind of problem

The second kind of problem is really a CAS problem on the collection object as a whole. A simple solution is to attach a version number and increment it on every CAS update: if the version is not the expected one, some other thread has updated nodes in the collection, and this CAS fails.

Here’s an example of an AtomicStampedReference:

public void useABAStampReference() {
    Object a = new Object();
    Object b = new Object();
    Object c = new Object();
    AtomicStampedReference<Object> atomicStampedReference = new AtomicStampedReference<>(a, 0);
    log.info("{}", atomicStampedReference.compareAndSet(a, b, 0, 1));
    log.info("{}", atomicStampedReference.compareAndSet(b, a, 1, 2));
    log.info("{}", atomicStampedReference.compareAndSet(a, c, 0, 1));
}

The compareAndSet method of AtomicStampedReference takes two extra parameters, expectedStamp and newStamp; both are ints and must be passed in by the caller. In the example above, the third CAS fails: the reference has been swapped back to a, but the current stamp is 2, which no longer matches the expected stamp 0.

Conclusion

ABA problems are actually composed of two types of problems that need to be treated and solved separately.

Chapter 36 Concurrency and Read-Copy Update (RCU)

Introduction

In our last article, on concurrency and ABA problems, we mentioned that one solution to the memory reclamation part of the ABA problem is to use RCU.

See Chapter 35, The Nature of the ABA Problem and Its Solutions, for more details. In this article we discuss in depth what RCU is and the relationship between RCU and copy-on-write (COW).

RCU(Read-copy Update) is a synchronization mechanism that was added to the Linux kernel in 2002. The advantage is that multiple readers can be run while updates are being made.

Those familiar with locking should know that for exclusive locking, only one operation is allowed at a time, regardless of whether the operation is read or write.

For read/write locks, simultaneous reads are allowed, but simultaneous writes are not allowed, and the write lock is exclusive, that is, simultaneous writes are not allowed.

RCU can support one write operation and multiple reads at the same time.


Copy on Write and RCU

What is Copy on Write? What does this have to do with Read Copy Update?

We abbreviate Copy On Write as COW. COW is a common technique in concurrent programming; in Java it appears in java.util.concurrent.CopyOnWriteArrayList and java.util.concurrent.CopyOnWriteArraySet.

The essence of COW is that, in a concurrent environment, to update an object you first copy it, make the modification on the copy, and finally swing the pointer that referenced the original object over to the updated copy.

COW in CopyOnWriteArrayList and CopyOnWriteArraySet is used for traversal.

As we know, while traversing a collection with an Iterator, the underlying collection must not be modified outside the Iterator; modification is allowed only through the Iterator itself during traversal, otherwise a ConcurrentModificationException is thrown.

For CopyOnWriteArrayList and CopyOnWriteArraySet, creating an Iterator makes a copy of the original List, and the Iterator traverses that copy. If another thread modifies the original List, the program proceeds normally and no ConcurrentModificationException is thrown.

Also, the Iterators of CopyOnWriteArrayList and CopyOnWriteArraySet do not support the remove, set, or add methods, because they traverse a copied snapshot that is discarded after the traversal; modifying it would be meaningless.
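Here is a minimal sketch of that snapshot behavior (the class name and values are illustrative):

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CowIterationDemo {
    public static void main(String[] args) {
        List<String> list = new CopyOnWriteArrayList<>();
        list.add("a");
        list.add("b");

        Iterator<String> it = list.iterator(); // snapshots [a, b]
        list.add("c"); // modifying the list does not affect the snapshot

        while (it.hasNext()) {
            System.out.println(it.next()); // prints a, b -- no ConcurrentModificationException
        }
        // it.remove(); // would throw UnsupportedOperationException
    }
}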

Under concurrency, COW still has one problem to handle: when can the replaced object be reclaimed? Can it be reclaimed immediately? Might other threads still be accessing it? Dealing with this requires a technique for tracking object lifetimes, known in RCU as rcu-sync.

The relationship between RCU and COW, then, is that RCU consists of rcu-sync plus COW.

Because Java has automatic garbage collection, we don’t need to worry about the life cycle of copying objects, so we usually see COW in Java, not RCU.

RCU process and APIs

We compare RCU to exclusive and read-write locks.

For exclusive locks, two APIs are required:

lock()
unlock()

For read/write locks, four APIs are needed:

read_lock()
read_unlock()
write_lock()
write_unlock()

RCU requires the following three APIs:

rcu_read_lock()
rcu_read_unlock()
synchronize_rcu()

rcu_read_lock and rcu_read_unlock must occur in pairs, and synchronize_rcu must not be called between rcu_read_lock and rcu_read_unlock.

Although RCU does not provide any exclusive lock, an RCU implementation must satisfy two conditions:

  1. If synchronize_rcu in thread T1 returns before rcu_read_lock in thread T2 is called, then the actions that happened before synchronize_rcu must be visible to T2 after its rcu_read_lock call.
  2. If rcu_read_lock in T2 is called before synchronize_rcu in T1, then the actions that happen after synchronize_rcu returns must not be visible to T2 before its rcu_read_unlock call.

It sounds like a mouthful, but it doesn’t matter. Let’s draw a picture to get a sense of it:

Remember: the key is to compare the timing of synchronize_rcu against each reader's rcu_read_lock.

If rcu_read_lock is executed before synchronize_rcu in Thread2 or Thread3, then b=2 must not be visible in T2 or T3.

The rcu_read_lock in Thread4 executes after synchronize_rcu has started, but its rcu_read_unlock executes after synchronize_rcu has returned, so Thread4 is equivalent to Thread5.

In Thread5, rcu_read_lock is executed after synchronize_rcu returns, so a=1 must be visible.

RCU considerations

RCU does not provide a locking mechanism for writers; it merely allows multiple threads to read while an update is in progress. Note that RCU permits only one synchronize_rcu operation at a time, so we must implement exclusive locking around synchronize_rcu ourselves.

So RCU is a single-writer, multiple-reader synchronization mechanism, not a multiple-writer, multiple-reader one.

Java implementation of RCU

RCU Java implementation code:

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

public class RCU {
    final static long NOT_READING = Long.MAX_VALUE;
    final static int MAX_THREADS = 128;
    final AtomicLong reclaimerVersion = new AtomicLong(0);
    final AtomicLongArray readersVersion = new AtomicLongArray(MAX_THREADS);

    public RCU() {
        for (int i = 0; i < MAX_THREADS; i++) readersVersion.set(i, NOT_READING);
    }

    public static int getTID() {
        return (int) (Thread.currentThread().getId() % MAX_THREADS);
    }

    public void read_lock(final int tid) { // rcu_read_lock()
        final long rv = reclaimerVersion.get();
        readersVersion.set(tid, rv);
        final long nrv = reclaimerVersion.get();
        if (rv != nrv) readersVersion.lazySet(tid, nrv);
    }

    public void read_unlock(final int tid) { // rcu_read_unlock()
        readersVersion.set(tid, NOT_READING);
    }

    public void synchronize_rcu() {
        final long waitForVersion = reclaimerVersion.incrementAndGet();
        for (int i = 0; i < MAX_THREADS; i++) {
            while (readersVersion.get(i) < waitForVersion) { } // spin
        }
    }
}

A brief explanation of the RCU implementation:

readersVersion is an array of 128 longs that records the version each reader thread is currently reading. By default a slot holds NOT_READING, indicating that the thread is not reading.

All reader slots are initialized in the RCU constructor.

The read_unlock method resets the reader's slot back to NOT_READING.

reclaimerVersion stores the version of the latest update; its value is incremented in the synchronize_rcu method.

synchronize_rcu spins over all reader slots until every reader has advanced past the awaited version.

Finally, the read_lock method reads the value of reclaimerVersion. It does so twice, and if the two reads differ it calls readersVersion.lazySet to set the reader's slot to the newer value.

Why read it twice? Although each operation on reclaimerVersion and readersVersion is individually atomic, in a multithreaded environment there is no guarantee that the write to readersVersion becomes visible before a concurrent update of reclaimerVersion; we need a memory barrier to enforce that ordering, which is what the second read accomplishes.
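Here is a minimal usage sketch of the RCU class above; the shared data structure and the reclamation step are placeholders, and external serialization of writers is assumed:

public class RCUDemo {
    static final RCU rcu = new RCU();

    // Reader side: bracket every traversal of the shared structure.
    static void reader() {
        final int tid = RCU.getTID();
        rcu.read_lock(tid);
        try {
            // ... read the shared data structure here ...
        } finally {
            rcu.read_unlock(tid);
        }
    }

    // Writer side (only one writer at a time, guarded externally).
    static void writer() {
        // 1. unlink the old node / publish the new version
        rcu.synchronize_rcu(); // 2. wait for all in-flight readers to finish
        // 3. now it is safe to reclaim the old version
    }
}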

Conclusion

This chapter introduced the RCU algorithm and its applications. I hope you enjoyed it.

Chapter 37 AbstractQueuedSynchronizer (AQS), the Basis of the Synchronization Classes

We have introduced many synchronization classes before, such as ReentrantLock, Semaphore, CountDownLatch, ReentrantReadWriteLock, and FutureTask.

AQS encapsulates many of the design details involved in implementing a synchronizer. It provides a FIFO wait queue and an int state field representing the current synchronization state.

According to the JDK documentation, we are not meant to use AQS directly. Instead, we usually build an inner class that extends AQS and overrides the following methods as needed:

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

In these methods, we need to call getState, setState, or compareAndSetState to change the state value.

The methods above cover two types of operations: exclusive (e.g., ReentrantLock) and shared (e.g., Semaphore, CountDownLatch).

The difference is that in shared mode more than one thread may acquire the synchronization state at the same time.

For example, if we run multiple threads to read at the same time, but only one thread is allowed to write at the same time, then the read lock is a shared operation and the write lock is an exclusive operation.

In synchronization classes built on AQS, the most basic operations are acquire and release, and state represents the value on which those operations depend.

state is an int value that can represent any state you define. For example, ReentrantLock uses it to record how many times the owner thread has reacquired the lock, Semaphore uses it to track the remaining permits, and FutureTask uses it to represent the task's status (started, running, completed, or cancelled). You can also define additional state variables of your own to record other information.

The pseudo-code below represents the general form of the acquire and release operations in AQS:

   Acquire:
       while (!tryAcquire(arg)) {
          enqueue thread if it is not already queued;
          possibly block current thread;
       }

   Release:
       if (tryRelease(arg))
          unblock the first queued thread;

The acquire operation first checks whether the current state permits acquisition; if not, the current thread is enqueued and possibly blocked.

If the state does permit it, the thread is removed from the queue and proceeds.

Let’s look at a concrete implementation:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class AQSUsage {

    private final Sync sync = new Sync();

    private class Sync extends AbstractQueuedSynchronizer {
        protected int tryAcquireShared(int ignored) {
            // Succeed (>= 0) only once the state has been set to 1.
            return (getState() == 1) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true;
        }
    }

    public void release() {
        sync.releaseShared(0);
    }

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }
}

In the example above, we define an internal class Sync that implements the tryAcquireShared and tryReleaseShared methods, where we determine and set the value of state.

sync.releaseShared and sync.acquireSharedInterruptibly internally call the tryReleaseShared and tryAcquireShared methods respectively.
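Here is a minimal sketch of how this one-shot latch could be exercised; the class name and the sleep are illustrative only:

public class AQSUsageDemo {
    public static void main(String[] args) throws InterruptedException {
        AQSUsage latch = new AQSUsage();

        Thread waiter = new Thread(() -> {
            try {
                latch.acquire(); // blocks until release() flips the state to 1
                System.out.println("latch opened");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();

        Thread.sleep(500); // give the waiter time to block (for demonstration)
        latch.release();   // opens the latch; the waiter proceeds
        waiter.join();
    }
}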

As mentioned earlier, many synchronization classes are implemented using AQS, so let’s look at tryAcquire implementations of other standard synchronization classes.

Let’s start with ReentrantLock:

   final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

ReentrantLock supports only exclusive locking, so it implements the tryAcquire method. It also maintains an owner field holding the current owner thread, which is how reentrant acquisition is detected.

Let's look at Semaphore and CountDownLatch. Since these involve shared operations, they implement the tryAcquireShared method; here is Semaphore's version:

        final int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();           // remaining permits
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;                 // negative means the acquire failed
            }
        }

Chapter 38 Using Exchanger in Java

Introduction

Exchanger is a concurrency class introduced in Java 5 and, as its name implies, it is used for exchanging: it basically swaps the objects held by two threads. When one thread calls the exchange method, it waits for the other thread to call the same exchange method.

After both threads have called the exchange method, the arguments they passed in are swapped.

The class definition

public class Exchanger<V>

V indicates the type of the object to be exchanged.

Class inheritance

java.lang.Object
  ↳ java.util.concurrent.Exchanger<V>

Exchanger inherits directly from Object.

The constructor

Exchanger() 

Exchanger provides a no-argument constructor.

Two main methods

  1. public V exchange(V x) throws InterruptedException

When this method is called, the current thread waits until another thread calls the same method. Once the other thread calls exchange, both threads continue, each receiving the object passed in by the other.

While waiting, InterruptedException is thrown if other threads interrupt the current thread.

  2. public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

Similar to the first method, except that it takes a timeout. If no other thread calls the exchange method within the timeout, a TimeoutException is thrown.
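Here is a minimal sketch of the timed variant; since no partner thread calls exchange, the call times out (the two-second timeout is illustrative):

import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedExchangeDemo {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        try {
            // Waits up to 2 seconds for a partner thread; since none arrives,
            // a TimeoutException is thrown.
            String received = exchanger.exchange("hello", 2, TimeUnit.SECONDS);
            System.out.println(received);
        } catch (InterruptedException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}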

Concrete examples

Let's first define a class whose instances will be exchanged:

import lombok.Data;

@Data
public class CustBook {

    private String name;
}

Then define two Runnables, each calling the exchange method in its run method:

@Slf4j
public class ExchangerOne implements Runnable {

    Exchanger<CustBook> ex;

    ExchangerOne(Exchanger<CustBook> ex) {
        this.ex = ex;
    }

    @Override
    public void run() {
        CustBook custBook = new CustBook();
        custBook.setName("book one");

        try {
            CustBook exchangedCustBook = ex.exchange(custBook);
            log.info(exchangedCustBook.getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
@Slf4j
public class ExchangerTwo implements Runnable {

    Exchanger<CustBook> ex;

    ExchangerTwo(Exchanger<CustBook> ex) {
        this.ex = ex;
    }

    @Override
    public void run() {
        CustBook custBook = new CustBook();
        custBook.setName("book two");

        try {
            CustBook exchangedCustBook = ex.exchange(custBook);
            log.info(exchangedCustBook.getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Finally, call them in the main method:

public class ExchangerUsage {

    public static void main(String[] args) {
        Exchanger<CustBook> exchanger = new Exchanger<>();
        // Start two threads
        new Thread(new ExchangerOne(exchanger)).start();
        new Thread(new ExchangerTwo(exchanger)).start();
    }
}

Let’s take a look at the results:

22:14:09.069 [Thread-1] INFO com.flydean.ExchangerTwo - book one
22:14:09.073 [Thread-0] INFO com.flydean.ExchangerOne - book two

You can see that the objects have been swapped.

Conclusion

Exchanger is useful whenever two threads need to swap objects. Try it out in your day-to-day work.

Examples of this article can be found at github.com/ddean2009/l…

The PDF download of this article is linked to concurrent-all-in-one

Welcome to follow my public account: more great content awaits you! For more, visit www.flydean.com