Questions

(1) What is Semaphore?

(2) What are the features of Semaphore?

(3) In what situations is Semaphore usually used?

(4) Can the number of Semaphore permits be dynamically increased or decreased?

(5) How does Semaphore implement rate limiting?

Introduction

A Semaphore holds a set of permits: each acquire() call consumes one permit, and each release() call returns one.
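The permit accounting described above can be observed directly through availablePermits(). The following is a minimal sketch; the class name PermitCountingSketch and the helper observeCounts() are made up for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

public class PermitCountingSketch {
    // Records the available permit count at three points:
    // after construction, after acquire(), and after release().
    static int[] observeCounts() {
        Semaphore semaphore = new Semaphore(3);
        int initial = semaphore.availablePermits();
        try {
            semaphore.acquire(); // consumes one permit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while acquiring", e);
        }
        int afterAcquire = semaphore.availablePermits();
        semaphore.release(); // returns one permit
        int afterRelease = semaphore.availablePermits();
        return new int[] {initial, afterAcquire, afterRelease};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observeCounts())); // prints [3, 2, 3]
    }
}
```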

Features

Semaphore is commonly used to limit how many threads can access a shared resource at the same time, which is also known as rate limiting.

Let’s learn how Semaphore is implemented in Java.

Class structure

Semaphore contains Sync, a synchronizer that extends AQS (AbstractQueuedSynchronizer), and its two subclasses FairSync and NonfairSync, which shows that Semaphore also distinguishes between fair and nonfair modes.

Source code analysis

Building on the earlier analyses of ReentrantLock and ReentrantReadWriteLock, this article is relatively simple, and some methods already covered there are skipped. If you are interested, see the earlier articles listed under Recommended reading at the bottom of this article.

The inner class Sync

// java.util.concurrent.Semaphore.Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    // Constructor: takes the number of permits and stores it in state
    Sync(int permits) {
        setState(permits);
    }

    // Returns the number of permits currently available
    final int getPermits() {
        return getState();
    }

    // Nonfair attempt to acquire permits
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // See how many permits are left
            int available = getState();
            // Subtract the permits requested this time to get the remainder
            int remaining = available - acquires;
            // If the remainder is negative, return it directly (failure).
            // Otherwise try to update state atomically and return the remainder on success.
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    // Release permits
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // See how many permits are left
            int current = getState();
            // Add the permits being released
            int next = current + releases;
            // Detect overflow
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // If state is updated atomically, the release succeeded; return true
            if (compareAndSetState(current, next))
                return true;
        }
    }

    // Reduce permits
    final void reducePermits(int reductions) {
        for (;;) {
            // See how many permits are left
            int current = getState();
            // Subtract the permits to be removed
            int next = current - reductions;
            // Detect underflow
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // Update state atomically; return on success, otherwise retry
            if (compareAndSetState(current, next))
                return;
        }
    }

    // Drain (remove) all available permits
    final int drainPermits() {
        for (;;) {
            // See how many permits are left
            int current = getState();
            // If it is 0, return directly.
            // Otherwise atomically set state to 0 and return the number drained.
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

From these methods of Sync, we learn the following:

(1) The number of permits is passed in through the constructor;

(2) The permits are stored in the state variable of AQS;

(3) Acquiring permits decreases state by the number acquired (1 for a plain acquire());

(4) When state is 0, or lower than the number requested, the permits cannot be acquired;

(5) Releasing permits increases state by the number released (1 for a plain release());

(6) The number of permits can be changed dynamically.
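The bookkeeping summarized above can be imitated outside AQS. The following is a minimal sketch, not the JDK implementation: the class CasPermitCounter and its method names are hypothetical, and an AtomicInteger stands in for the AQS state field. It shows only the compare-and-set retry loop. There is no wait queue, so a failed attempt simply returns a negative number.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasPermitCounter {
    // Stand-in for the AQS state field (an assumption of this sketch)
    private final AtomicInteger state;

    public CasPermitCounter(int permits) {
        state = new AtomicInteger(permits);
    }

    // Same shape as nonfairTryAcquireShared: returns the remaining count,
    // or a negative number if there were not enough permits.
    public int tryAcquire(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            if (remaining < 0 || state.compareAndSet(available, remaining)) {
                return remaining;
            }
        }
    }

    // Same shape as tryReleaseShared: add permits back, guarding against overflow.
    public void release(int releases) {
        for (;;) {
            int current = state.get();
            int next = current + releases;
            if (next < current) {
                throw new Error("Maximum permit count exceeded");
            }
            if (state.compareAndSet(current, next)) {
                return;
            }
        }
    }

    public int available() {
        return state.get();
    }

    public static void main(String[] args) {
        CasPermitCounter counter = new CasPermitCounter(2);
        System.out.println(counter.tryAcquire(1)); // 1 permit left
        System.out.println(counter.tryAcquire(2)); // negative: not enough permits
        counter.release(1);
        System.out.println(counter.available());   // back to 2
    }
}
```

A failed tryAcquire leaves the counter untouched, because the compare-and-set is only attempted when the remainder is non-negative.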

The inner class NonfairSync

// java.util.concurrent.Semaphore.NonfairSync
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    // constructor, which calls the parent constructor
    NonfairSync(int permits) {
        super(permits);
    }
    // Try to acquire permits by calling the parent's nonfairTryAcquireShared() method
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

In nonfair mode, tryAcquireShared() simply calls the parent class's nonfairTryAcquireShared() to try to acquire permits.

The inner class FairSync

// java.util.concurrent.Semaphore.FairSync
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
    // constructor, which calls the parent constructor
    FairSync(int permits) {
        super(permits);
    }
    // Try to acquire permits
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // Fair mode first checks whether other threads are queued ahead of this one.
            // If so, return failure.
            if (hasQueuedPredecessors())
                return -1;
            // Nobody is queued ahead, so try to update the value of state
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

In fair mode, the method first checks whether any thread is queued ahead of the current one. If so, the acquisition fails and the current thread joins the queue; otherwise it attempts an atomic update of state.

Constructors

// Constructor that takes the number of permits; uses nonfair mode by default
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
// Constructor that takes the number of permits and whether to use fair mode
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Creating a Semaphore requires the number of permits to be passed in.

Semaphore is also nonfair by default, but it can be made fair by calling the second constructor.
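The mode chosen by each constructor can be checked with Semaphore's isFair() method. A small sketch, with the class name FairnessSketch and its helpers invented for illustration:

```java
import java.util.concurrent.Semaphore;

public class FairnessSketch {
    // Single-argument constructor: nonfair mode
    static boolean defaultIsFair() {
        return new Semaphore(1).isFair();
    }

    // Two-argument constructor with fair = true: fair mode
    static boolean explicitFairIsFair() {
        return new Semaphore(1, true).isFair();
    }

    public static void main(String[] args) {
        System.out.println("default fair?  " + defaultIsFair());
        System.out.println("explicit fair? " + explicitFairIsFair());
    }
}
```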

With the previous content in mind, the following methods are easy to follow. They are listed here mainly to show which operations Semaphore supports.

The descriptions below assume nonfair mode.

acquire() method

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

Acquires one permit. This default variant is interruptible. If the attempt to acquire a permit fails, the thread enters the AQS queue and waits.
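To illustrate what "interruptible" means here, the sketch below parks a thread in acquire() on a semaphore with zero permits and then interrupts it. The class name InterruptibleAcquireSketch and the helper are hypothetical. hasQueuedThreads() is used only to wait until the thread has actually entered the queue.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptibleAcquireSketch {
    // Returns true if the waiting acquire() ended with an InterruptedException.
    static boolean interruptBlockedAcquire() {
        Semaphore semaphore = new Semaphore(0); // no permits, so acquire() must wait
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            try {
                semaphore.acquire();
                semaphore.release(); // not reached in this sketch
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        waiter.start();

        // Wait until the waiter is queued inside the semaphore, then interrupt it.
        while (!semaphore.hasQueuedThreads()) {
            Thread.yield();
        }
        waiter.interrupt();

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("acquire() was interrupted: " + interruptBlockedAcquire());
    }
}
```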

acquireUninterruptibly() method

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

Acquires one permit without responding to interrupts. If the attempt to acquire a permit fails, the thread enters the AQS queue and waits.

tryAcquire() method

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

Tries to acquire one permit using Sync's nonfair acquisition method, even if the Semaphore was created in fair mode. It makes a single attempt and returns immediately whether or not it succeeded; the thread is never queued.
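A short sketch of this non-blocking behavior, assuming a semaphore with a single permit (the class name TryAcquireSketch and helper tryTwice() are illustrative only):

```java
import java.util.concurrent.Semaphore;

public class TryAcquireSketch {
    // Calls tryAcquire() twice on a one-permit semaphore and reports both results.
    static boolean[] tryTwice() {
        Semaphore semaphore = new Semaphore(1);
        boolean first = semaphore.tryAcquire();  // takes the only permit
        boolean second = semaphore.tryAcquire(); // no permit left: returns at once, no queuing
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        boolean[] results = tryTwice();
        System.out.println(results[0] + ", " + results[1]); // prints true, false
    }
}
```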

tryAcquire(long timeout, TimeUnit unit) method

public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

Tries to acquire one permit. If the first attempt fails, the thread waits for up to the given timeout. It returns false if no permit is acquired within that time, and true otherwise.
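The timed variant can be tried out as follows. This is a sketch under simple assumptions: the helper acquireWithin() and the 50 ms timeout are chosen for illustration, and an interrupt during the wait is treated as a failed attempt.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class TimedTryAcquireSketch {
    // Creates a semaphore with the given number of permits and tries to
    // acquire one, waiting at most the given number of milliseconds.
    static boolean acquireWithin(int permits, long millis) {
        Semaphore semaphore = new Semaphore(permits);
        try {
            return semaphore.tryAcquire(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false; // sketch choice: report an interrupt as "not acquired"
        }
    }

    public static void main(String[] args) {
        System.out.println(acquireWithin(1, 50)); // a permit is available
        System.out.println(acquireWithin(0, 50)); // times out after about 50 ms
    }
}
```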

release() method

public void release() {
    sync.releaseShared(1);
}

Releases one permit, which increases the value of state by 1 and wakes up the next thread waiting to acquire a permit.

acquire(int permits) method

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

Acquires multiple permits at once, interruptibly.

acquireUninterruptibly(int permits) method

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

Acquires multiple permits at once, without responding to interrupts.

tryAcquire(int permits) method

public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

Tries to acquire multiple permits at once. It makes a single attempt and does not queue.

tryAcquire(int permits, long timeout, TimeUnit unit) method

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

Tries to acquire multiple permits, waiting for up to the given timeout. It returns false if the permits are not acquired within that time, and true otherwise.

release(int permits) method

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

Releases multiple permits at once; the value of state increases by permits.
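The multi-permit variants treat a request as all-or-nothing: a failed tryAcquire(int) takes no permits at all. A small sketch, with the class name MultiPermitSketch and the trace() helper invented for illustration:

```java
import java.util.concurrent.Semaphore;

public class MultiPermitSketch {
    // Returns "afterAcquire,secondBatchAcquired,afterFailedTry,afterRelease".
    static String trace() {
        Semaphore semaphore = new Semaphore(5);
        semaphore.acquireUninterruptibly(3);              // 5 - 3 = 2 permits left
        int afterAcquire = semaphore.availablePermits();
        boolean secondBatch = semaphore.tryAcquire(3);    // needs 3, only 2 left: fails
        int afterFailedTry = semaphore.availablePermits(); // the failed attempt took nothing
        semaphore.release(3);                             // give the first batch back
        int afterRelease = semaphore.availablePermits();
        return afterAcquire + "," + secondBatch + "," + afterFailedTry + "," + afterRelease;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints 2,false,2,5
    }
}
```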

availablePermits() method

public int availablePermits() {
    return sync.getPermits();
}

Returns the number of permits currently available.

drainPermits() method

public int drainPermits() {
    return sync.drainPermits();
}

Removes all currently available permits and returns how many were removed. Permits that have already been acquired are not affected; only the remaining ones are drained.
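A sketch of draining while some permits are held (the class name DrainSketch and the trace() helper are hypothetical):

```java
import java.util.concurrent.Semaphore;

public class DrainSketch {
    // Returns "drained,availableAfterDrain,availableAfterRelease".
    static String trace() {
        Semaphore semaphore = new Semaphore(5);
        semaphore.acquireUninterruptibly(2);        // 2 held, 3 still available
        int drained = semaphore.drainPermits();     // removes the 3 available permits
        int afterDrain = semaphore.availablePermits();
        semaphore.release(2);                       // the 2 held permits can still be returned
        int afterRelease = semaphore.availablePermits();
        return drained + "," + afterDrain + "," + afterRelease;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints 3,0,2
    }
}
```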

reducePermits(int reduction) method

protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

Reduces the number of available permits by reduction. Note that this method is protected, so it can only be called from a subclass of Semaphore.
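Because reducePermits() is protected, one way to use it is through a small subclass. The sketch below is an assumption-laden illustration: ResizableSemaphoreSketch and its shrink() method are made-up names, not part of the JDK. It also shows that, unlike acquire(), the reduction never blocks, so the available count can become negative.

```java
import java.util.concurrent.Semaphore;

public class ResizableSemaphoreSketch extends Semaphore {
    private static final long serialVersionUID = 1L;

    public ResizableSemaphoreSketch(int permits) {
        super(permits);
    }

    // Hypothetical public wrapper around the protected reducePermits()
    public void shrink(int reduction) {
        reducePermits(reduction);
    }

    public static void main(String[] args) {
        ResizableSemaphoreSketch semaphore = new ResizableSemaphoreSketch(3);
        semaphore.shrink(1);
        System.out.println(semaphore.availablePermits()); // 3 - 1 = 2
        semaphore.shrink(5);
        System.out.println(semaphore.availablePermits()); // 2 - 5 = -3, no blocking
    }
}
```

While the count is negative, later acquire() calls wait until enough release() calls bring it back above zero.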

Summary

(1) Semaphore is a counting semaphore, commonly used to control how many threads can access a shared resource at the same time;

(2) Semaphore's internal implementation is based on the shared mode of AQS;

(3) A Semaphore must be initialized with a number of permits, which is stored in state;

(4) Acquiring a permit decreases state by 1;

(5) Releasing a permit increases state by 1;

(6) Permits can be dynamically reduced by N;

(7) Can N permits be dynamically added?

Bonus

(1) How can N permits be added dynamically?

Answer: Call release(int permits). Unlike releasing a lock, Semaphore does not check whether the current thread has previously acquired any permits when it releases them, so permits can be added dynamically simply by calling the release method.
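A minimal sketch of growing the permit count this way, with the class name AddPermitsSketch and its helper chosen for illustration:

```java
import java.util.concurrent.Semaphore;

public class AddPermitsSketch {
    // Releases `extra` permits that were never acquired and
    // returns the resulting number of available permits.
    static int permitsAfterExtraRelease(int initial, int extra) {
        Semaphore semaphore = new Semaphore(initial);
        semaphore.release(extra); // no prior acquire is required
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(permitsAfterExtraRelease(2, 3)); // prints 5
    }
}
```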

(2) How can rate limiting be implemented?

Answer: Rate limiting means that when traffic suddenly spikes, the upstream layer restricts the burst so that it does not overwhelm downstream services. In a distributed system, rate limiting is usually done at the gateway layer, but it can also be applied simply to an individual function. Take a flash sale (seckill) scenario as an example: if only 10 items are on sale, the service itself can allow only 100 requests in at the same time and reject all the others, so that the service is not put under too much pressure.

With Semaphore, this function can be rate-limited directly. Here is the code:

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreTest {
    public static final Semaphore SEMAPHORE = new Semaphore(100);
    public static final AtomicInteger failCount = new AtomicInteger(0);
    public static final AtomicInteger successCount = new AtomicInteger(0);

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> seckill()).start();
        }
    }

    public static boolean seckill() {
        // Reject the request immediately if no permit is available
        if (!SEMAPHORE.tryAcquire()) {
            System.out.println("no permits, count=" + failCount.incrementAndGet());
            return false;
        }

        try {
            // Process the business logic
            Thread.sleep(2000);
            System.out.println("seckill success, count=" + successCount.incrementAndGet());
        } catch (InterruptedException e) {
            // TODO: handle the exception
            e.printStackTrace();
        } finally {
            // Always return the permit, even if the business logic fails
            SEMAPHORE.release();
        }
        return true;
    }
}

Recommended reading

1. The beginning of the Java synchronization series

2. Java magic class: Unsafe analysis

3. JMM (Java Memory Model)

4. Java synchronization series: volatile analysis

5. Java synchronization series: synchronized analysis

6. Java synchronization series: write a Lock yourself

7. Java synchronization series: AQS

8. ReentrantLock (1): fair lock and nonfair lock

9. ReentrantLock: condition lock

10. Java synchronization series: ReentrantLock vs. synchronized

11. ReentrantReadWriteLock source code analysis

Follow my WeChat official account "Tong Ge Reads Source Code" for more articles in the source code series, and explore the ocean of source code together with Tong Ge.