This article introduces and compares several commonly used concurrency utility classes: CountDownLatch, CyclicBarrier, Semaphore, and Exchanger. If you are not familiar with multithreading, you can read some of my previous articles:

  • Concurrent Programming in Java – Threading Basics
  • Finally, the thread six states of the transition clear!
  • Explain the meaning of thread pool parameters
  • “Know the four Rejection strategies for thread pools?”
  • Six common Thread pools in Java
  • In-depth Analysis of Synchronized Locks 💡 recommended
  • Common Blocking queues in JAVA
  • Gracefully Closing thread Pools

  • First, the use of CountDownLatch and CyclicBarrier and the differences between them. Both make threads wait for other threads to finish, and both are ways of controlling the flow of a concurrent program.
  • Then, the use of Semaphore and Exchanger. Semaphore is a semaphore that can be used to limit the number of threads allowed to run at the same time, while Exchanger can be used to exchange data between two threads.

CountDownLatch

  • CountDownLatch is a concurrency flow-control tool added in JDK 5, in the java.util.concurrent package.
  • CountDownLatch allows one or more threads to wait until other threads complete their operations. Note that the waiting side can be one thread or several.
  • The constructor of CountDownLatch takes an int parameter as the counter: if you want to wait for N threads to complete, pass in N.
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
  • There are two core methods, countDown and await. Each call to countDown decrements N by 1, and await blocks the current thread until N becomes zero.
  • This is quite abstract, so the following real-life cases illustrate it.

Multiple threads wait for one thread

  • The most typical real-life case is a race at a sports meeting. Suppose we are about to hold a race: all the runners must wait for the referee’s starting command. If we model each runner as a thread and the referee as another thread, then multiple runner threads wait for the single referee thread to give the command.
  • We implement this case with CountDownLatch. The count N is the number of referee threads, which is 1.

    /** @url i-code.online (Cloud simple code) */
    public static void main(String[] args) throws InterruptedException {
        // Simulate a race: when the referee says go, all runners start running. CountDownLatch can model this.

        // The runners wait for the referee's start signal, i.e. for one thread, so the count is 1
        CountDownLatch countDownLatch = new CountDownLatch(1);

        new Thread(() ->{
            try {
                System.out.println(Thread.currentThread().getName() +"Ready");
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"Start running ~~");

        },"Player 1").start();
        new Thread(() ->{
            try {
                System.out.println(Thread.currentThread().getName() +"Ready");
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"Start running ~~");

        },"Player 2").start();

        TimeUnit.SECONDS.sleep(1);
        System.out.println("Referee: Get ready...");
        countDownLatch.countDown();
        System.out.println("Referee: Run...");
    }

  • The running results are as follows:

In the code above, we first create a CountDownLatch with a count of 1, which is the number of threads we need to wait for. We then create two threads that represent the runners. Each runner thread calls await, which blocks it until the count of the CountDownLatch reaches zero. The main thread sleeps for 1 second and then calls countDown, which decrements the count by one. N is now zero, so the runner threads resume and execute the rest of their code. The overall output is shown in the figure above.

One or more threads wait for multiple threads

  • This case is also taken from everyday life. Suppose the company organizes a trip and sends a bus to pick people up; the bus departs once five people have arrived. To implement this with CountDownLatch, the count N is 5, because the bus waits for these five people. The code is as follows:

    public static void main(String[] args) throws InterruptedException {
        /** * i-code.online */
        // Number of waits
        CountDownLatch countDownLatch = new CountDownLatch(5);

        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + "From the residence...");
                try {
                    TimeUnit.SECONDS.sleep((long) (Math.random()*10));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "Arriving at destination -----");
                countDownLatch.countDown();
            },"-"+i).start();
        }

        System.out.println("Bus is waiting for personnel.....");
        countDownLatch.await();
        System.out.println("----- Everyone here, let's go -----");
    }

  • The result of the above code execution is as follows:

As the code above shows, we define a CountDownLatch with a count of 5 and then create five threads in a loop to simulate five people. When a person reaches the meeting point, the thread calls countDown, which decrements the count by one. The main thread plays the role of the bus: it blocks in await and prints the final message only after N has dropped to 0.

Main methods of CountDownLatch

  • Constructor:
    public CountDownLatch(int count) {}

The constructor takes one argument, count, which is the value to count down from.

  • await(): the thread that calls await() waits until the countdown ends, that is, until count reaches 0, and then continues.
  • await(long timeout, TimeUnit unit): an overload of await() that takes a timeout. It behaves like await(), but if the timeout elapses before the count reaches 0, the thread stops waiting and the method returns false.
  • countDown(): decrements count by 1. When count reaches 0, the threads that were waiting are woken up.
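
The timed overload of await can be tried out with a minimal sketch like the one below. The class name LatchTimeoutSketch and the helper awaitWithTimeout are made up for this illustration; only CountDownLatch and its methods come from the JDK.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class LatchTimeoutSketch {

    // Waits at most timeoutMillis; returns true only if the count reached zero in time.
    static boolean awaitWithTimeout(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(2);

        latch.countDown(); // count: 2 -> 1
        System.out.println("count = " + latch.getCount());
        // The count is still 1, so this call gives up after 100 ms and returns false.
        System.out.println("reached zero in time: " + awaitWithTimeout(latch, 100));

        latch.countDown(); // count: 1 -> 0
        // The count is already 0, so this call returns true without waiting.
        System.out.println("reached zero in time: " + awaitWithTimeout(latch, 100));
    }
}
```

Checking the boolean result is what distinguishes "the count reached zero" from "the wait timed out"; ignoring it makes the timed overload behave as if every wait had succeeded.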

The cases above show how to use CountDownLatch, but CountDownLatch has one limitation: it cannot be reused. Once the countdown has reached 0, the latch cannot be reset and counted down again; a new round requires a new instance. Many scenarios do need a reusable counter, and that is where CyclicBarrier comes in.
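
The one-shot behaviour can be observed directly. The following is a small sketch under the assumption that a single latch is counted down more often than its initial count; the class name LatchOneShotSketch and its helper method are hypothetical.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of the original article.
class LatchOneShotSketch {

    // Returns the count observed after counting down more often than the initial count.
    static long countAfterExtraCountDowns() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown(); // 1 -> 0, waiting threads would be released here
        latch.countDown(); // no effect: the count stays at 0
        return latch.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("count = " + countAfterExtraCountDowns());

        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.await(); // returns immediately because the count is already 0
        System.out.println("await() returned at once; another round needs a new latch");
    }
}
```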

CyclicBarrier

  • CyclicBarrier is similar to CountDownLatch: it makes threads wait until a certain number of them are ready before they carry on.
  • CyclicBarrier blocks each thread in a group when it reaches a barrier (a synchronization point). The barrier does not open until the last thread arrives, at which point all the threads blocked at the barrier continue to execute, as illustrated below.

  • As the figure above shows, threads stay blocked until the last thread reaches the barrier, and then all of them are released.
  • Let’s first look at its constructors:
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
  • CyclicBarrier(int parties): the int parameter is the number of threads the barrier intercepts. Each thread calls await to tell the CyclicBarrier that it has reached the barrier, and is then blocked.
  • CyclicBarrier(int parties, Runnable barrierAction): a more advanced constructor with an additional barrierAction parameter of type Runnable. When all the threads have reached the barrier, barrierAction is executed first (by the last thread to arrive), and only then does each thread continue with what follows.
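
To see the two-argument constructor at work, here is a minimal sketch. The class name BarrierActionSketch, the method runOnce and the worker names are invented for this illustration. It also prints the value returned by await(), which is the arrival index of the calling thread (0 for the last thread to arrive).

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article.
class BarrierActionSketch {

    // Sends `parties` threads through one barrier and returns how often the action ran.
    static int runOnce(int parties) {
        AtomicInteger actionRuns = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            actionRuns.incrementAndGet();
            // The action runs in one of the worker threads, not in a thread of its own.
            System.out.println("barrier action runs in " + Thread.currentThread().getName());
        });

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    int arrivalIndex = barrier.await(); // 0 means "last to arrive"
                    System.out.println(Thread.currentThread().getName()
                            + " passed the barrier, arrival index " + arrivalIndex);
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, "worker-" + i);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return actionRuns.get();
    }

    public static void main(String[] args) {
        System.out.println("action ran " + runOnce(3) + " time(s)");
    }
}
```

The order of the "passed the barrier" lines varies from run to run, but the barrier action line is always printed before any of them.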

Case

  • Suppose you have a date with your girlfriend and have agreed on a time and place. Whoever arrives first waits for the other, and only then do the two of you set off together. To implement this with CyclicBarrier we need to intercept two threads. The implementation is as follows:
    /* CountDownLatch counts events: waiting threads are released once countDown has been called N times.
       CyclicBarrier counts threads: the barrier opens once N threads have called await. */
    public static void main(String[] args) {
        // It's like a couple's date. Whoever arrives first has to wait for the other one.
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

        new Thread(() ->{
            System.out.println("Pack up quickly and get out...");
            try {
                TimeUnit.MILLISECONDS.sleep(500);
                System.out.println("Waiting for my girlfriend to arrive at the rendezvous place ~~");
                cyclicBarrier.await();
                System.out.println("Girlfriend arrival happy departure ~~ date");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }, "Boyfriend").start();
        new Thread(() ->{
            System.out.println("Take your time and get out.");
            try {
                TimeUnit.MILLISECONDS.sleep(5000);
                System.out.println("Waiting for my boyfriend to arrive at the rendezvous place ~~");
                cyclicBarrier.await();
                System.out.println("Boyfriend arrives happy departure ~~ date");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }, "Girlfriend").start();

    }
  • The code execution result is as follows:

The code above is relatively simple: it creates a barrier that intercepts 2 threads and then starts two threads that each call await. The subsequent flow runs only after await has been called twice.

  • Let’s write another case, this time using the constructor that takes a Runnable. As in the earlier CountDownLatch case, the company organizes a trip, but now many buses are waiting to pick people up. A bus does not wait for everyone to arrive; each bus leaves as soon as it is full. We can implement this scenario with CyclicBarrier as follows:

    /* CyclicBarrier is reusable: each time the required number of threads has arrived, the barrier
       opens and then starts counting again. For example, a company organizes a trip and arranges
       many buses; each bus leaves as soon as it is full instead of waiting for everyone. */

    public static void main(String[] args) {
        // Number of employees
        int peopleNum = 2000;
        // One bus for every 25 people
        CyclicBarrier cyclicBarrier = new CyclicBarrier(25, () -> {
            // Runs each time 25 people have arrived
            System.out.println("------------25 head out ------------");
        });

        for (int j = 1; j <= peopleNum; j++) {
            new Thread(new PeopleTask("People-" + j, cyclicBarrier)).start();
        }
    }

    static class PeopleTask implements Runnable {

        private final String name;
        private final CyclicBarrier cyclicBarrier;

        public PeopleTask(String name, CyclicBarrier cyclicBarrier) {
            this.name = name;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println(name + "From home, heading to rendezvous....");
            try {
                // Cast after multiplying; (int) Math.random() * 1000 would always be 0
                TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + "Arrive at the assembly point and wait for the others...");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

Similarities and differences between CyclicBarrier and CountDownLatch

Similarities:

  • Both can block one thread or a group of threads until a preset condition is met, and then let them all proceed together.

Differences:

  • Reusability: CountDownLatch can be used only once; after the count reaches 0 it cannot be used again unless a new instance is created. The CyclicBarrier counter can be reused in a loop, so CyclicBarrier suits more complex scenarios. Its reset method can be called at any time to restore the counter; for example, if a calculation goes wrong, you can reset the counter and let the threads run again.
  • Object of the count: with CyclicBarrier, a fixed number of threads must reach the barrier before execution can continue, whereas CountDownLatch only waits for the count to reach 0. In other words, CountDownLatch acts on events and CyclicBarrier acts on threads: CountDownLatch decrements the count when countDown is called, while CyclicBarrier decrements the count when a thread starts waiting.
  • Barrier action: CyclicBarrier can run a barrierAction when the barrier opens; CountDownLatch has no such feature.
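
The reusability difference can be illustrated with a short sketch in which two threads pass the same barrier several times in a row. The class name BarrierReuseSketch and the method countTrips are hypothetical; the barrier action is used only to count how often the barrier opens.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article.
class BarrierReuseSketch {

    // Two threads pass the same barrier `rounds` times; returns how often the barrier opened.
    static int countTrips(int rounds) {
        AtomicInteger trips = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(2, () -> trips.incrementAndGet());

        Runnable worker = () -> {
            try {
                for (int round = 0; round < rounds; round++) {
                    barrier.await(); // after each trip the barrier is ready for the next round
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        };

        Thread first = new Thread(worker, "worker-1");
        Thread second = new Thread(worker, "worker-2");
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trips.get();
    }

    public static void main(String[] args) {
        // prints "barrier opened 3 times"
        System.out.println("barrier opened " + countTrips(3) + " times");
    }
}
```

Doing the same with CountDownLatch would require creating a fresh latch for every round.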

Semaphore

  • Semaphore is used to control the number of threads that access a particular resource at the same time. It coordinates the threads to ensure that the shared resource is used properly.

  • As the figure shows, one of the main uses of a semaphore is to guard resources whose concurrent access must be limited. Specifically, a semaphore maintains a count of permits (“licenses”), and a thread must obtain a permit (the acquire method) before it accesses the shared resource. Once a thread has acquired a permit, the permit is handed over to that thread, so the number of permits left in the semaphore decreases by one.
  • Likewise, a thread can release a permit (the release method). The permit is returned to the semaphore, so the number of available permits increases by one. When the number of permits held by the semaphore drops to zero, the next thread that wants a permit must wait until a thread holding one releases it. Because a thread cannot access the protected shared resource without a permit, this limits concurrent access to the resource. That is the whole idea.

Case

  • A typical example from daily development is database access, which is an I/O-intensive operation. We can start many threads, but the database connection pool is limited. Suppose the pool allows five connections: if too many threads operate on the database directly, errors will occur. In this situation we can use a semaphore so that at most five threads hold a connection at any time. The code is as follows:
    /* Semaphore can be used to limit the number of concurrent threads and to coordinate
       the use of shared resources. */

    public static void main(String[] args) {
        // Create a thread pool with 100 threads
        final ExecutorService service = Executors.newFixedThreadPool(100);
        // Create a semaphore with 5 permits, i.e. allow five threads to run the guarded code at once
        Semaphore s = new Semaphore(5);
        for (int i = 0; i < 100; i++) {
            service.submit(() -> {
                try {
                    s.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return; // no permit was acquired, so there is nothing to release
                }
                try {
                    System.out.println("Database time-consuming operation" + Thread.currentThread().getName());
                    TimeUnit.MILLISECONDS.sleep(3000);
                    System.out.println(Thread.currentThread().getName() + "Executing....");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // Always return the permit, even if the work fails
                    s.release();
                }
            });
        }
        // Let the pool exit once all submitted tasks have finished
        service.shutdown();
    }

In the code above, we create a thread pool with 100 threads to simulate a program that runs a large number of threads, and submit 100 tasks to it. We also create a semaphore with 5 permits. Each task calls acquire to obtain a permit; it proceeds only once it holds one and blocks otherwise. When the task is done, it returns the permit with release.
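
One way to convince yourself that the semaphore really caps concurrency is to record the highest number of tasks that were inside the guarded section at the same time. The sketch below does this with much smaller numbers than the example above; the class name SemaphoreLimitSketch, the method peakConcurrency and the 20 ms sleep are assumptions made for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article.
class SemaphoreLimitSketch {

    // Runs `tasks` short tasks guarded by `permits` permits; returns the peak concurrency observed.
    static int peakConcurrency(int permits, int tasks) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);

        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // no permit acquired, so nothing to release
                }
                try {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // remember the highest value seen
                    TimeUnit.MILLISECONDS.sleep(20);       // stand-in for the guarded work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    semaphore.release();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // The printed peak is never larger than the number of permits (3 here).
        System.out.println("peak concurrency: " + peakConcurrency(3, 12));
    }
}
```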

  • Can the number of permits be set to 1? Of course. With a single permit the semaphore behaves like a lock, as follows:
    private static int count = 0;
    /* With a single permit, Semaphore works like a lock. */
    public static void main(String[] args) throws InterruptedException {
        final ExecutorService service = Executors.newFixedThreadPool(10);

        Semaphore semaphore = new Semaphore(1);
        for (int i = 0; i < 10000; i++) {
            service.submit(() ->{
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "Executed.");
                count ++;
                semaphore.release();
            });
        }
        service.shutdown();
        TimeUnit.SECONDS.sleep(5);
        System.out.println(count);

    }

Other main methods

  • public boolean tryAcquire(): the idea is the same as tryLock on a lock. It tries to obtain a permit: if one is available right now it takes it and returns true; if not, it returns false immediately, so the thread is not blocked and can go on to do something else.
  • public boolean tryAcquire(long timeout, TimeUnit unit): an overload that takes a timeout. For example, if 3 seconds is passed in, the thread waits at most 3 seconds. If a permit is obtained during that time, the method returns true and execution continues; if no permit is obtained before the timeout, the attempt is considered failed and the method returns false.
  • int availablePermits(): returns the number of permits currently available in this semaphore.
  • int getQueueLength(): returns an estimate of the number of threads waiting for a permit.
  • boolean hasQueuedThreads(): reports whether any thread is waiting for a permit.
  • void reducePermits(int reduction): reduces the number of available permits by reduction. This is a protected method.
  • Collection<Thread> getQueuedThreads(): returns a collection of the threads that are waiting for a permit. This is a protected method.
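
The non-blocking and timed variants of tryAcquire can be compared in a few lines. This is a sketch only; the class name SemaphoreTryAcquireSketch and the method threeAttempts are made up, and the 100 ms timeout is an arbitrary choice.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class SemaphoreTryAcquireSketch {

    // Tries three times to take a permit from a semaphore that holds exactly one.
    static boolean[] threeAttempts() {
        Semaphore semaphore = new Semaphore(1);
        boolean first = semaphore.tryAcquire();  // succeeds: one permit was available
        boolean second = semaphore.tryAcquire(); // fails at once: no permit left, no blocking
        boolean third;
        try {
            // Waits up to 100 ms for a permit; nobody releases one, so this fails too.
            third = semaphore.tryAcquire(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            third = false;
        }
        semaphore.release(); // hand the permit from the first attempt back
        return new boolean[] {first, second, third};
    }

    public static void main(String[] args) {
        // prints "[true, false, false]"
        System.out.println(Arrays.toString(threeAttempts()));
    }
}
```

Because tryAcquire reports failure through its return value, a permit should be released only when the call returned true.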

Exchanger

  • Exchanger is a tool for cooperation between threads, used mainly to exchange data between two threads. It provides a synchronization point at which the two threads hand their data to each other. If one thread reaches the synchronization point first, it waits there for the other. Once both have arrived, each thread’s call to the exchange method passes its own data to the partner and returns the partner’s data.
  • Suppose we need to enter some important billing information. To be safe, two people enter it separately, and the two versions are then compared to check that they match, so that errors are caught early. The following code demonstrates this:
import java.util.Objects;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExchangerTest {

    /* Exchanger is a utility class for cooperation between threads. It provides a synchronization
       point at which two threads exchange data. It can be used in genetic algorithms and in
       proofreading work. */

    public static void main(String[] args) {
        /* Simulate two clerks entering the same record. To prevent errors, both enter the same
           content and the program only has to compare the two versions for inconsistencies. */

        // Create a thread pool with two threads
        final ExecutorService service = Executors.newFixedThreadPool(2);

        Exchanger<InfoMsg> exchanger = new Exchanger<>();

        service.submit(() ->{
            // simulate data thread A
            InfoMsg infoMsg = new InfoMsg();
            infoMsg.content="This is thread A.";
            infoMsg.id ="10001";
            infoMsg.desc = "1";
            infoMsg.message = "message";
            System.out.println("Performing other...");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                final InfoMsg exchange = exchanger.exchange(infoMsg);
                System.out.println("Thread A exchanges data ====== gets"+ exchange);
                if (!exchange.equals(infoMsg)) {
                    System.out.println("Data inconsistent ~~ please check");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        service.submit(() -> {
            // simulate data thread B
            InfoMsg infoMsg = new InfoMsg();
            infoMsg.content="This is thread B.";
            infoMsg.id ="10001";
            infoMsg.desc = "1";
            infoMsg.message = "message";
            System.out.println("Performing other...");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                final InfoMsg exchange = exchanger.exchange(infoMsg);
                System.out.println("Thread B exchanges data ====== gets"+ exchange);
                if (!exchange.equals(infoMsg)) {
                    System.out.println("Data inconsistent ~~ please check");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        service.shutdown();
    }

    static class InfoMsg {
        String id;
        String name;
        String message;
        String content;
        String desc;

        @Override
        public String toString() {
            return "InfoMsg{" +
                    "id='" + id + '\'' +
                    ", name='" + name + '\'' +
                    ", message='" + message + '\'' +
                    ", content='" + content + '\'' +
                    ", desc='" + desc + '\'' +
                    '}';
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            InfoMsg infoMsg = (InfoMsg) o;
            return Objects.equals(id, infoMsg.id) &&
                    Objects.equals(name, infoMsg.name) &&
                    Objects.equals(message, infoMsg.message) &&
                    Objects.equals(content, infoMsg.content) &&
                    Objects.equals(desc, infoMsg.desc);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, name, message, content, desc);
        }
    }
}
  • The running results are as follows:

As the run above shows, when threads A and B reach the synchronization point they call exchange to swap data; each thread receives the other’s data and compares it with its own, which achieves the auditing effect.

  • Exchanger can also be used in genetic algorithms: two individuals are selected as a mating pair, their data is exchanged, and the crossover rule produces two offspring from it.
  • Exchanger also provides the method public V exchange(V x, long timeout, TimeUnit unit). It is useful when one of the two threads might never call exchange: set a timeout, and the thread stops waiting once that time has passed.
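
The timed overload reports a missing partner by throwing TimeoutException. A minimal sketch of handling that case follows; the class name ExchangerTimeoutSketch, the helper exchangeOrNull and the convention of returning null on timeout are assumptions made for this illustration.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the original article.
class ExchangerTimeoutSketch {

    // Offers `value` for exchange; returns the partner's value, or null if nobody shows up in time.
    static String exchangeOrNull(Exchanger<String> exchanger, String value, long timeoutMillis) {
        try {
            return exchanger.exchange(value, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // no partner arrived within the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        // No other thread ever calls exchange(), so this gives up after 200 ms.
        System.out.println("result: " + exchangeOrNull(exchanger, "record-A", 200)); // prints "result: null"
    }
}
```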


This article was published by AnonyStar. It may be reproduced, but the original source must be credited. For more articles, follow the WeChat public account “cloud habitat Jane code” or visit the author’s blog at i-code.online.