What is Semaphore?

Semaphore is a counting semaphore that manages a set of permits. Each acquire() call blocks until a permit is available and then takes it; each release() call adds a permit, which may unblock a waiting acquire() call. There are no actual permit objects, however: Semaphore simply maintains a count of the permits available.

Application scenarios

Semaphore can be used for flow control, especially when a shared resource is limited, such as database connections. Suppose we need to read data from tens of thousands of files. Because the task is IO-intensive, we can start dozens of threads to read concurrently. If the data must also be stored in a database once it has been read into memory, and only ten database connections are available, we must ensure that at most ten threads hold a connection and save data at the same time; otherwise the remaining threads fail because they cannot obtain a database connection. In this situation we can use Semaphore for flow control. The code is as follows:

    package org.java.base.thread;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;

    public class SemaphoreTest {

        private static final int THREAD_COUNT = 30;

        private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

        // Only 10 permits: at most 10 threads may save data at the same time
        private static Semaphore s = new Semaphore(10);

        public static void main(String[] args) {
            for (int i = 0; i < THREAD_COUNT; i++) {
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            s.acquire();
                            try {
                                System.out.println("save data");
                            } finally {
                                // Return the permit even if saving fails
                                s.release();
                            }
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag instead of swallowing it
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            }
            threadPool.shutdown();
        }
    }

In the code, 30 threads are running, but only 10 are allowed to execute concurrently. The constructor Semaphore(int permits) accepts an integer giving the number of permits available, so Semaphore(10) allows 10 threads to hold a permit at once; that is, the maximum concurrency is 10. Using Semaphore is simple: a thread first calls acquire() to obtain a permit and then calls release() to return it when it is done. You can also attempt to obtain a permit without blocking by using tryAcquire().
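As a small sketch of the non-blocking variant, the following example wraps tryAcquire() in a helper that runs a task only when a permit is free. The class TryAcquireDemo and the method runIfPermitted are hypothetical names chosen for illustration; only the Semaphore calls come from the JDK.

```java
import java.util.concurrent.Semaphore;

class TryAcquireDemo {

    // Hypothetical helper: runs the task only if a permit is available right now.
    static boolean runIfPermitted(Semaphore semaphore, Runnable task) {
        if (!semaphore.tryAcquire()) {
            return false; // no permit available, give up instead of blocking
        }
        try {
            task.run();
            return true;
        } finally {
            semaphore.release(); // always return the permit
        }
    }

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(1);

        // A permit is free, so the task runs.
        System.out.println(runIfPermitted(semaphore, () -> System.out.println("save data"))); // true

        // Take the only permit; the next attempt is rejected without blocking.
        semaphore.tryAcquire();
        System.out.println(runIfPermitted(semaphore, () -> System.out.println("save data"))); // false
        semaphore.release();
    }
}
```

If waiting for a short time is acceptable, the overload tryAcquire(long timeout, TimeUnit unit) can be used in the same way; it throws InterruptedException, so the helper would need to declare or handle it.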

Other methods

Semaphore also offers some other methods:

  • int availablePermits(): returns the number of permits currently available in this semaphore.
  • int getQueueLength(): returns an estimate of the number of threads waiting to acquire a permit.
  • boolean hasQueuedThreads(): reports whether any threads are waiting to acquire a permit.
  • void reducePermits(int reduction): reduces the number of available permits by reduction. It is a protected method.
  • Collection<Thread> getQueuedThreads(): returns a collection of the threads that may be waiting to acquire a permit. It is a protected method.
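The monitoring methods are easiest to understand with one thread actually waiting. The sketch below is an illustration only: QueueInspection and InspectableSemaphore are hypothetical names, and the subclass exists solely to make the protected getQueuedThreads() callable from outside.

```java
import java.util.Collection;
import java.util.concurrent.Semaphore;

class QueueInspection {

    // Hypothetical subclass that widens the protected getQueuedThreads() to public.
    static class InspectableSemaphore extends Semaphore {
        InspectableSemaphore(int permits) {
            super(permits);
        }

        @Override
        public Collection<Thread> getQueuedThreads() {
            return super.getQueuedThreads();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        InspectableSemaphore semaphore = new InspectableSemaphore(2);
        semaphore.acquire(2); // take both permits so the next acquirer has to wait

        Thread waiter = new Thread(() -> {
            semaphore.acquireUninterruptibly();
            semaphore.release();
        }, "waiter");
        waiter.start();

        // Spin until the waiter has been placed in the wait queue.
        while (!semaphore.hasQueuedThreads()) {
            Thread.yield();
        }

        System.out.println("available: " + semaphore.availablePermits());
        System.out.println("queue length: " + semaphore.getQueueLength());
        System.out.println("waiter queued: " + semaphore.getQueuedThreads().contains(waiter));

        semaphore.release(2); // lets the waiter proceed
        waiter.join();
        System.out.println("available after release: " + semaphore.availablePermits());
    }
}
```

The JDK documents getQueueLength() and getQueuedThreads() as best-effort estimates intended for monitoring, so they should not be used for synchronization decisions.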

Source code analysis

Semaphore has two modes: fair and non-fair. In fair mode, permits are granted in the order in which threads called acquire, following FIFO. Non-fair mode allows barging: a newly arriving thread may grab a permit just as it is released, even though other threads are already waiting ahead of it.
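To make the two modes concrete, here is a minimal sketch of how each one is selected. The class name FairnessDemo is illustrative; the constructors and isFair() belong to java.util.concurrent.Semaphore.

```java
import java.util.concurrent.Semaphore;

class FairnessDemo {
    public static void main(String[] args) {
        // The one-argument constructor creates a non-fair semaphore.
        Semaphore nonFair = new Semaphore(10);
        // Passing true requests FIFO ordering among waiting threads.
        Semaphore fair = new Semaphore(10, true);

        System.out.println(nonFair.isFair()); // false
        System.out.println(fair.isFair());    // true
    }
}
```

One detail worth knowing: the untimed tryAcquire() barges even on a fair semaphore, taking a permit immediately if one is available regardless of waiting threads.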

Constructors

Semaphore has two constructors, as follows:

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

As you can see, both constructors require the number of permits. The second constructor also lets you choose fair or non-fair mode; the default is non-fair. Semaphore is built on the shared mode of AQS (AbstractQueuedSynchronizer), so the actual work is delegated to the inner Sync class. Here is the NonfairSync constructor:

    NonfairSync(int permits) {
        super(permits);
    }

It simply calls the parent constructor. The Sync constructor looks like this:

    Sync(int permits) {
        setState(permits);
    }

The constructor calls setState, which means that the state (the resource) in AQS is the number of permits.

Acquiring a permit

Let's start with acquiring a permit, looking first at the implementation in non-fair mode. The acquire method has several overloads, but the main one is the following:

    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

As you can see, it calls Sync's acquireSharedInterruptibly method, which is defined in the superclass AQS as follows:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // If the thread has already been interrupted, throw an exception
        if (Thread.interrupted())
            throw new InterruptedException();
        // Failed to acquire a permit: add the thread to the wait queue
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

An AQS subclass that wants to use shared mode must implement the tryAcquireShared method. NonfairSync implements it as follows:

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }

This method calls the nonfairTryAcquireShared method in its parent class, Sync:

    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // Return if there are not enough permits or the CAS succeeds
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

As you can see, the return value is negative only when there are not enough permits; otherwise the loop retries until the CAS succeeds and then returns the number of remaining permits. This explains why subsequent threads block when permits run out: a negative result sends them to doAcquireSharedInterruptibly, which puts them in the wait queue. Having looked at non-fair acquisition, let's look at fair acquisition. The code is as follows:
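The retry loop in the non-fair acquisition path can be tried out in isolation. The sketch below is not the JDK implementation: it swaps the AQS state for an AtomicInteger, and PermitCounter, tryAcquire and available are hypothetical names, but the loop has the same shape as nonfairTryAcquireShared.

```java
import java.util.concurrent.atomic.AtomicInteger;

class PermitCounter {

    // Stands in for the AQS state: the number of permits currently available.
    private final AtomicInteger state;

    PermitCounter(int permits) {
        state = new AtomicInteger(permits);
    }

    // Returns the remaining permits after the attempt, or a negative number
    // if there were not enough permits (in which case nothing is changed).
    int tryAcquire(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            if (remaining < 0 || state.compareAndSet(available, remaining)) {
                return remaining;
            }
            // CAS lost a race with another thread: re-read the state and retry.
        }
    }

    int available() {
        return state.get();
    }

    public static void main(String[] args) {
        PermitCounter counter = new PermitCounter(2);
        System.out.println(counter.tryAcquire(1)); // 1
        System.out.println(counter.tryAcquire(1)); // 0
        System.out.println(counter.tryAcquire(1)); // -1, not enough permits
        System.out.println(counter.available());   // 0, the failed attempt changed nothing
    }
}
```

In the real Semaphore, the negative result is what causes AQS to enqueue and park the calling thread; this sketch only reports it.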

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // If another thread is already waiting, return -1
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

In fair mode, if another thread is already waiting in the queue, the method returns -1 and the current thread joins the queue behind it. In NonfairSync, by contrast, the thread tries to acquire first and may obtain a permit by jumping the queue. Now that we have looked at acquiring a permit, let's look at releasing one.

Releasing a permit

There are two overloads for releasing permits, release() and release(int permits); both delegate to sync.releaseShared. The one that takes an argument is as follows:

    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

The releaseShared method is defined in AQS as follows:

    public final boolean releaseShared(int arg) {
        // If the number of permits was updated successfully
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

An AQS subclass that implements shared mode must implement the tryReleaseShared method, which reports whether the release succeeded. Semaphore's implementation is as follows:

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // Return true once the CAS has updated the number of permits
            if (compareAndSetState(current, next))
                return true;
        }
    }

As you can see, once the CAS has successfully updated the number of permits, tryReleaseShared returns true and releaseShared calls doReleaseShared() to wake up blocked threads.
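Two consequences of the release path are easy to check from the outside: release simply adds to the count, so a thread may release permits it never acquired, and the overflow check throws an Error when the count would exceed Integer.MAX_VALUE. The sketch below demonstrates both; ReleaseDemo and its two helper methods are hypothetical names.

```java
import java.util.concurrent.Semaphore;

class ReleaseDemo {

    // Releasing without a prior acquire just raises the permit count.
    static int releaseWithoutAcquire() {
        Semaphore semaphore = new Semaphore(0);
        semaphore.release();
        semaphore.release(2);
        return semaphore.availablePermits();
    }

    // Releasing past Integer.MAX_VALUE trips the overflow check in tryReleaseShared.
    static String overflowMessage() {
        Semaphore full = new Semaphore(Integer.MAX_VALUE);
        try {
            full.release();
            return null;
        } catch (Error e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(releaseWithoutAcquire()); // 3
        System.out.println(overflowMessage());       // Maximum permit count exceeded
    }
}
```

Because nothing ties a permit to the thread that acquired it, pairing each acquire with exactly one release (typically in a finally block) is the caller's responsibility.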

Reducing the number of permits

Semaphore also has a method for reducing the number of permits, which can be used when a resource is used up or becomes unavailable. The code is as follows:

    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

As you can see, the work is delegated to Sync, whose reducePermits method is as follows:

    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // Return once the CAS update succeeds
            if (compareAndSetState(current, next))
                return;
        }
    }

As you can see, it uses CAS to change the state variable in AQS, because that variable represents the number of permits.
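Since reducePermits is protected, it has to be reached through a subclass. The following sketch assumes a hypothetical ResizableSemaphore with a shrink method; it also shows that, unlike acquire, reducePermits never blocks and can push the count below zero.

```java
import java.util.concurrent.Semaphore;

class ResizableSemaphore extends Semaphore {

    ResizableSemaphore(int permits) {
        super(permits);
    }

    // Hypothetical convenience method: shrinks the pool without blocking.
    void shrink(int reduction) {
        reducePermits(reduction);
    }

    public static void main(String[] args) {
        ResizableSemaphore pool = new ResizableSemaphore(3);
        pool.acquireUninterruptibly(3); // all three permits are in use

        pool.shrink(1); // one resource has become unavailable
        System.out.println(pool.availablePermits()); // -1

        pool.release(3); // the three holders finish
        System.out.println(pool.availablePermits()); // 2, the pool is now one smaller
    }
}
```

While the count is negative, new acquirers wait until enough releases bring it back above zero, which is how the pool shrinks without revoking permits that are already held.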

Draining the remaining permits

Semaphore can also acquire all remaining permits at once with the drainPermits method, as follows:

    public int drainPermits() {
        return sync.drainPermits();
    }

Sync implements it as follows:

    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }

As you can see, it uses CAS to set the number of permits to 0 and returns the number of permits that were taken.
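A short usage sketch of drainPermits, with a hypothetical class name DrainDemo and helper drainAfterAcquiring chosen for illustration:

```java
import java.util.concurrent.Semaphore;

class DrainDemo {

    // Hypothetical helper: acquires 'taken' permits, then drains whatever is left.
    static int drainAfterAcquiring(Semaphore semaphore, int taken) {
        semaphore.acquireUninterruptibly(taken);
        return semaphore.drainPermits();
    }

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(5);

        int drained = drainAfterAcquiring(semaphore, 2);
        System.out.println(drained);                      // 3
        System.out.println(semaphore.availablePermits()); // 0
        System.out.println(semaphore.drainPermits());     // 0, nothing left to drain

        // The drained permits are held by the caller and can be handed back later.
        semaphore.release(drained);
        System.out.println(semaphore.availablePermits()); // 3
    }
}
```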

Conclusion

Semaphore is a counting semaphore used to manage a group of resources. Internally it is built on the shared mode of AQS, whose state represents the number of permits. When there are not enough permits, the acquiring thread is suspended; once a thread releases permits, threads in the wait queue may be woken up to continue executing.
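The whole design can be condensed into a minimal sketch built directly on AbstractQueuedSynchronizer. This is a simplified illustration, not the JDK source: MiniSemaphore is a hypothetical class that supports only single-permit, non-fair acquire and release, and it omits fairness, timeouts and argument checks.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class MiniSemaphore {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits); // the AQS state is the number of permits
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // A negative result tells AQS to enqueue and park the caller.
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                // Returning true lets AQS wake up waiting threads.
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        int permits() {
            return getState();
        }
    }

    private final Sync sync;

    MiniSemaphore(int permits) {
        sync = new Sync(permits);
    }

    void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    boolean tryAcquire() {
        return sync.tryAcquireShared(1) >= 0;
    }

    void release() {
        sync.releaseShared(1);
    }

    int availablePermits() {
        return sync.permits();
    }

    public static void main(String[] args) throws InterruptedException {
        MiniSemaphore semaphore = new MiniSemaphore(0);
        Thread worker = new Thread(() -> {
            try {
                semaphore.acquire(); // waits until a permit is released
                System.out.println("worker got a permit");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        semaphore.release(); // adds a permit; a parked worker is woken up
        worker.join();
        System.out.println(semaphore.availablePermits()); // 0
    }
}
```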