Click “like” to see, form a habit, the public account search [dime technology] pay attention to more original technical articles. This article has been included in GitHub org_Hejianhui /JavaStudy.

preface

The utility classes that control concurrent flow are designed to help us programmers make it easier for threads to collaborate, and threads to collaborate with each other to satisfy business logic. For example, let thread A wait for thread B to finish executing.

The main tool classes for controlling concurrent processes are:

class role instructions
Semaphore Semaphores can be controlled by controlling the number of “licenses” to ensure coordination between threads Threads cannot continue running until they have a “license,” which is more flexible than other synchronizers
CyclicBarrier Threads wait until enough threads have reached a predetermined number. Once the trigger condition is reached, the next action can be carried out This applies to ready scenarios where threads are waiting on each other for processing results
Phaser Similar to cyclicBarriers, but with variable counts Java 7 to join the
CountDownLatch Like a CyclicBarrier, an action is triggered when the number drops to zero Non-reusable
Exchanger Let the two threads swap objects when appropriate Application scenario: Used to exchange data when two threads are working on different instances of the same class
Condition You can control the “wait” and “wake up” of a thread Is an upgraded version of Object.wait()

Introduction to the

Semaphore, a license used to control the number of concurrent access execution threads over a period of time. Its function is to control the number of threads accessing a specific resource, and the bottom layer depends on the State of AQS. It is a common tool class in production.

For AQS, see ReentrantLock for Concurrent Programming abstract Queue Synchronizer AQS Applications

A semaphore has three and only three operations, and they are all atomic.

  • Initialize, add, and subtract.
  • Add to unblock a process.
  • Reduction can cause a process to enter a block.

Semaphore manages a range of licenses.

  • Each acquire() method blocks until a license is available and then takes one away.
  • Each release() method adds a license, which may release a blocking acquire() method.
  • Instead of using actual license objects, Semaphore only counts the number of licenses available and acts accordingly.

Semaphore releases threads when the counter is not zero. Once it reaches zero, all new threads requesting resources are blocked, including threads adding requests to permissions. Semaphore is not reentrant.

  • Each time a license is requested, the counter is decreased by 1, and each time a license is released, the counter is increased by 1. Once 0 is reached, the new license request thread is suspended.

Semaphore has two modes, fair mode and unfair mode. The default is unfair mode.

  • Fair mode is that the order of calling acquire is the order of obtaining licenses, following the FIFO.
  • The unfair mode is preemptive, meaning that it is possible for a new thread to acquire a license just as it is released, with waiting threads ahead.

Application scenarios

Semaphore can be used to limit traffic, especially when common resources are limited, such as database connections.

Since there is no limit on the number of licenses released when releasing (), the total number of licenses can be increased through this method. ReducePermits () method can reduce the total number of permits, and dynamic adjustment permits can be achieved through these two methods

Analysis: If there is a demand, need to read several million file data, because is IO intensive, we can start the dozens of threads concurrent read, but if after read into memory, also need to be stored in the database, and database connections only 10, this time we will have to control only 10 threads at the same time, access to the database connection, Otherwise, an exception is thrown indicating that the database cannot be connected. In this case, we can use Semaphore for flow control.

The code is as follows:

package com.niuh.tools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** * 

* Semaphore example *

*/
public class SemaphoreRunner { /** * Number of threads */ private static final int THREAD_COUNT = 30; /** * thread pool */ private static ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore semaphore = new Semaphore(10); public static void main(String[] args) { for (int i = 0; i < THREAD_COUNT; i++) { executor.execute(new Runnable() { public void run(a) { try { // Get a "license" semaphore.acquire(); // Simulate data saving TimeUnit.SECONDS.sleep(2); System.out.println("save date..."); // Return the "license" semaphore.release(); } catch(InterruptedException e) { e.printStackTrace(); }}}); } executor.shutdown(); }}Copy the code

Source code analysis

Semaphore class diagram

  • Semaphore is implemented by inherits AQS using the internal Syn class.

Its main internal variables and methods are as follows:

The framework flow chart is as follows:

The constructor

  • Permitting indicates the number of permitting threads
  • Fair means fair, and if this is set to true, the next thread to execute will be the one that has waited the longest
public Semaphore(int permits) {
	sync = new NonfairSync(permits);
}
/ * * *@paramPermitstotal number of permits *@paramFair Fair =true Fair lock fair=false Unfair lock */
public Semaphore(int permits, boolean fair) {
	sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Copy the code

Inner class synchronizer

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
    // Assign setState to the total number of permits
    Sync(int permits) {
        setState(permits);
    }
    // Number of remaining licenses
    final int getPermits(a) {
        return getState();
    }
    // Spin + CAS not fair access
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // Number of available licenses
            int available = getState();
            // After this license is obtained, the remaining license is available
            int remaining = available - acquires;
            // If the remaining license is greater than 0, CAS updates the remaining license. Otherwise, CAS fails to obtain the remaining license
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                returnremaining; }}// Spin + CAS release permission
    // Since there is no limit on the number of licenses to be released, you can dynamically increase the number of licenses through release
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // Currently available licenses
            int current = getState();
            // The value can be updated
            int next = current + releases;
            // If the license update value is negative, indicating the license quantity benefit, throw an error
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // CAS updates the license number
            if (compareAndSetState(current, next))
                return true; }}// Spin + CAS reduces the number of permissions
    final void reducePermits(int reductions) {
        for (;;) {
            // Currently available licenses
            int current = getState();
            / / update the value
            int next = current - reductions;
            // If the updated value is greater than the current remaining license, the benefit is thrown
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // CAS updates license number
            if (compareAndSetState(current, next))
                return; }}// Discard all permissions
    final int drainPermits(a) {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                returncurrent; }}}Copy the code

Unfair model

/** * unfair mode */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        returnnonfairTryAcquireShared(acquires); }}Copy the code

Fair mode

/** * fair mode */
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }
    // Get a license in fair mode
    // Fair mode determines whether a thread is waiting in the synchronization queue regardless of whether the permission is sufficient. If so, the acquisition fails and the queue blocks
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // If a thread is queued, return immediately
            if (hasQueuedPredecessors())
                return -1;
            // Spin + CAS get permission
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                returnremaining; }}}Copy the code

To obtain permission

Semaphore provides two ways to obtain resources.

  • Response interrupt and non-response interrupt.

Response interrupt to obtain resources

Both methods support Interrupt Interrupt mechanism. Acquire () can be used to acquire one semaphore at a time, or acquire(int permits) can be used to acquire a specified number of semaphore at a time.

To obtain a license from semaphore, the thread will block until a license is obtained or interrupted. After obtaining a license, the thread will immediately return and decrease the number of licenses by one. If no licenses are available, the current thread will sleep until:

  1. Some other thread calls the Release method, and the current thread is the next thread to be granted permission
  2. Some other thread interrupts the current thread

If the current thread is set to on by the Acquire method or is interrupted while waiting for permission, InterruptedException is thrown and the interrupted state of the current thread is cleared.

Acquire Execution Process:

public void acquire(a) throws InterruptedException {
	sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}


public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    If the remaining license is >= 0, the license is successfully obtained. <0, the license fails to be obtained
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

/** * Failed to obtain permission, the current thread entered the synchronization queue, queue blocked *@param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // Create a synchronization queue node and merge it into a queue
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // If the current node is the second node, try to acquire the lock
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return; }}// Block the current thread
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw newInterruptedException(); }}finally {
        if(failed) cancelAcquire(node); }}Copy the code

The code execution steps are as follows:

AQS subclasses use shared mode and need to implement the tryAcquireShared() method.

  1. In a fair lock, as in ReentrantLock, the first check is whether there are other waiting threads in the synchronization queue, or directly return failure. Otherwise subtract the state value and return the remaining semaphore.
  2. An unfair lock directly calls nonfairTryAcquireShared in the parent class as well as ReentrantLock.
// How to obtain an unfair lock
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}


final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();// Get the number of semaphores removed
        int remaining = available - acquires;// The remaining semaphores
        If the semaphores are greater than 0, obtain the shared lock and run compareAndSetState(available, remaining) to return the remaining semaphores
        If the semaphore number is less than or equal to 0, return a negative number
        if (remaining < 0 || compareAndSetState(available, remaining))
            returnremaining; }}// Fair lock acquisition
protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1; 
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 || compareAndSetState(available, remaining))
            returnremaining; }}Copy the code

The state variable is visible with volatile.

/** * The synchronization state. */
private volatile int state;
 
/**
 * Returns the current value of synchronization state.
 * This operation has memory semantics of a <tt>volatile</tt> read.
 * @return current state value
*/
protected final int getState(a) {
    return state;
}
Copy the code

Do not respond to interrupts to get resources

Both methods do not respond to the Interrupt mechanism, and the other functions are identical to those of the acquire() method.

To obtain a license from semaphore, the thread will block until a license is obtained or interrupted. After obtaining a license, the thread will immediately return and decrease the number of licenses by one. If no licenses are available, the current thread will sleep until:

  1. Some other thread calls the Release method, and the current thread is the next thread to be assigned permissions;
  2. If the current thread is interrupted while waiting for permission, it will continue to wait, but the time for assigning permission to the thread may change compared to when no interruption occurred.
public void acquireUninterruptibly(a) {
    sync.acquireShared(1);
}

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}
Copy the code

Try to get a semaphore

There are three ways to try to get a semaphore.

  • Attempts to retrieve the semaphore, return true on success, false immediately otherwise, and will not block the current thread.
  • Attempts to acquire a semaphore, returning true if obtained within the specified time, false otherwise.
  • Attempts to acquire a specified number of semaphores, returning true if obtained within the specified time, false otherwise.
public boolean tryAcquire(a) {
    return sync.nonfairTryAcquireShared(1) > =0;
}

public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
Copy the code

Release restitution permit

Release method, the main function is to release resources, need to ensure that the release execution, otherwise the thread exits but the resource is not released.

  • General code is best written in Finally.
  • If acquire(10) is executed at the beginning, release(10) will be released. If acquire(10) is released at the end, release(10) will be released.
// Try to release the lock
public final boolean release(int arg) {
    // Wake up the successor node in the synchronization queue if the lock release succeeds
    if (tryRelease(arg)) {
        Node h = head;
        if(h ! =null&& h.waitStatus ! =0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// Put the two codes side by side for comparison and you can see that the structure in release is exactly the same
// The difference is that doReleaseShared has more judgment operations
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();  // Unparksucceeded (h)
        return true;
    }
    return false;
}
Copy the code

Classes that subclass shared mode need to implement the tryReleaseShared() method to determine whether the release is successful.

  • The reason this method is a CAS spin is because Semaphore is a shared lock and there may be multiple threads releasing resources at the same time, so the CAS operation may fail.
// Since there is no limit on the number of licenses to be released, you can dynamically increase the number of licenses through release
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // Get the current license number
        int current = getState();
        // Calculate the number of collected
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //CAS succeeded in changing the number of licenses, returning true
        if (compareAndSetState(current, next))
            return true; }}Copy the code

Once CAS has successfully changed the number of permits, the doReleaseShared() method is called to release the blocked thread.

private void doReleaseShared(a) {
    // spin to wake up the first thread waiting (other threads will be woken up by the first thread passing backwards)
    for (;;) {
        Node h = head;
        if(h ! =null&& h ! = tail) {int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))
                    continue;            // loop to recheck cases
                Wake up the first waiting thread
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break; }}Copy the code

Other methods

Gets the current number of remaining semaphores

  • This method returns the value of the state variable in the AQS, the number of semaphores currently remaining.
public int availablePermits(a) {
    return sync.getPermits();
}

// Sync
final int getPermits(a) {
    return getState();
}
Copy the code

Exhausted license quantity

  • Gets and returns all permissions that are immediately available.
  • The drainPermits() method of the Sync class, which takes 1 semaphore and sets the number of available semaphores to 0.
    • For example, there are a total of 10 semaphores and five have already been used. After calling the drainpermitting () method, one semaphore is obtained and the remaining four disappear, making the total number of available semaphores six.
    • Empty the remaining resources with CAS spin.
public int drainPermits(a) {
    return sync.drainPermits();
}

// Sync
final int drainPermits(a) {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
            returncurrent; }}Copy the code

Reducing the number of licenses

  • Reduction must be one-way, that is, it can only be reduced, not increased. Use CAS spin to shrink the remaining shared resources.
protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

// Sync
final void reducePermits(int reductions) {
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
            return; }}Copy the code

There are two points to note when modifying the number of shared resources in the above two methods

  • It’s irreversible
  • Is an operation on remaining resources, not all resources, and returns when the number of remaining resources is insufficient or has reached zero.
  • The resource being occupied does not participate.

Check whether there are any Nodes in the AQS synchronization queue

public final boolean hasQueuedThreads(a) {
    return sync.hasQueuedThreads();
}

// AbstractQueuedSynchronizer
public final boolean hasQueuedThreads(a) {
   // If the head node is not equal to the tail node, there are still elements in the list
   returnhead ! = tail; }Copy the code

conclusion

  • Semaphore’s internal workflow is also based on AQS. Unlike CyclicBarrier and ReentrantLock, conditional queues of AQS are not used, but are operated in synchronous queues, except that the current thread is park.
  • Semaphore is a typical shared lock provided by the JUC package, which provides both fair and unfair working modes by customizing two different synchronizers (FairSync and NonfairSync). Both modes provide time-limited/open-ended, interrupt-responsive/interrupt-unresponsive resource acquisition methods (time-limited acquisition always responds to interrupts in a timely manner), while all release() operations to release resources are unified.

PS: The above code is submitted to Github: github.com/Niuh-Study/…

GitHub Org_Hejianhui /JavaStudy GitHub Hejianhui /JavaStudy