The Three Musketeers of concurrent programming

There are three key players in building high-concurrency systems: caching, degradation, and traffic limiting.

  • Caching: the purpose of caching is to increase system access speed and processing capacity.
  • Degradation: if a service has a problem or affects the core process, it needs to be temporarily blocked and re-enabled after the peak has passed or the problem is resolved.
  • Traffic limiting: the purpose of traffic limiting is to protect the system by limiting the rate of concurrent access/requests, or the number of requests within a time window. Once the limit is reached, the system can deny service, queue or wait, or degrade.

The idea behind traffic limiting

The overflow idea:

It’s a fixed-size queue. For example, if the traffic limit is 5 QPS, 5 requests can be accepted per second, so we make a queue of size 5. If the queue is not full, the request is added to the queue; if the queue is full, the request is rejected.
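A minimal sketch of this idea in Java, using a bounded queue (the class name and the limit of 5 are illustrative, not from any particular library):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueLimiter {
    // Queue size equals the limit: at most 5 requests can be held at once
    private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(5);

    /** Try to admit a request; offer() returns false when the queue is already full. */
    public boolean submit(Runnable request) {
        return queue.offer(request);
    }
}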

Speed control

Tokens sound pretty cool: tokens are issued into a bucket at a constant rate, and a consumer can respond to a request only after acquiring a token. The rate is controlled through the consumers’ consumption rate; at 5 QPS, 5 tokens are consumed per second.

Algorithms for limiting traffic

There are three commonly used traffic limiting algorithms: the leaky bucket algorithm, the token bucket algorithm, and the sliding window (counter) algorithm.

Counter-based traffic limiting

The core of both fixed windows and sliding windows is counting requests; the only difference is how the counting time interval is handled.

Fixed window count

Implementation principle
  • The idea of fixed window counting is fairly simple, and only two parameters need to be determined: the counting period T and the maximum number of visits (calls) N within that period. When a request arrives, it is handled with the process sketched below: if the current period has expired, reset the counter and start a new period; then allow the request if the counter is below N, and reject it otherwise.

  • Fixed window counting is simple to implement: it only needs to record the start time of the current period and the total number of accesses within it, so it consumes very little extra storage space.
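A rough sketch of a fixed window counter along these lines, assuming a period T in milliseconds and a limit N (the class and parameter names are illustrative):

public class FixedWindowLimiter {
    private final long periodMillis; // counting period T
    private final int maxRequests;   // maximum number of calls N per period
    private long windowStart = System.currentTimeMillis();
    private int count = 0;

    public FixedWindowLimiter(long periodMillis, int maxRequests) {
        this.periodMillis = periodMillis;
        this.maxRequests = maxRequests;
    }

    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        if (now - windowStart >= periodMillis) {
            // Period switch: reset the counter and start a new window
            windowStart = now;
            count = 0;
        }
        if (count < maxRequests) {
            count++;
            return true;
        }
        return false; // limit N reached inside the current window
    }
}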

Algorithm defects

The disadvantage of fixed window counting is also very obvious: at the period switch, the total count from the previous period is immediately reset to 0, which may allow a burst of traffic around the boundary between two periods.

Token bucket algorithm

The token bucket algorithm works by putting tokens into the bucket at a constant rate. A request must first fetch a token from the bucket before it is processed; when no token is available in the bucket, service is denied.

  • The token bucket algorithm uses a bucket that holds a fixed capacity of tokens, with tokens added to the bucket at a fixed rate.
  • There is a maximum number of tokens the bucket can hold; tokens issued beyond that limit are discarded.
  • When traffic or a network request arrives, each request must obtain a token. If one is available, the request is processed directly and the token is removed from the bucket.
  • If not, the request is throttled: it is either discarded or made to wait in a buffer (a simple sketch follows).
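A hand-rolled token bucket along these lines might look as follows (a simplified sketch; the class name and fields are illustrative, and production code would normally use a library such as Guava's RateLimiter, discussed later):

public class TokenBucketLimiter {
    private final long capacity;      // maximum number of tokens the bucket can hold
    private final double refillPerMs; // tokens added per millisecond (5 QPS = 0.005)
    private double tokens;            // current token count
    private long lastRefill = System.currentTimeMillis();

    public TokenBucketLimiter(long capacity, double tokensPerSecond) {
        this.capacity = capacity;
        this.refillPerMs = tokensPerSecond / 1000.0;
        this.tokens = capacity;
    }

    public synchronized boolean tryAcquire() {
        refill();
        if (tokens >= 1) {
            tokens -= 1; // one token is removed per processed request
            return true;
        }
        return false;    // no token available: the request is rejected
    }

    private void refill() {
        long now = System.currentTimeMillis();
        // Tokens accumulate at a fixed rate but never beyond the bucket capacity
        tokens = Math.min(capacity, tokens + (now - lastRefill) * refillPerMs);
        lastRefill = now;
    }
}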
Advantages

Since tokens are issued at a fixed rate, say 5 QPS again: if there are no requests for 1 second, the token bucket fills up and can then respond to 5 requests in a flash (5 tokens at once), which means instantaneous traffic can be absorbed.

Leaky bucket algorithm

  • The idea of the leaky bucket algorithm is very simple: water (requests) enters the leaky bucket first, and the bucket leaks water out at a fixed rate. When the inflow rate is too high, the water simply overflows.

  • Picture a funnel: the amount of water coming in is like the access traffic, and the amount of water going out is like our system processing requests.
  • When there is too much access, the funnel fills with water, and if there is too much water it overflows.

The implementation of the leaky bucket algorithm often relies on a queue: if the queue is not full, the request is put into it, and a processor then takes requests from the queue head for processing at a fixed frequency. If requests arrive faster than they are processed, the queue fills up and new requests are discarded.
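A queue-backed leaky bucket could be sketched like this (the capacity and leak interval are illustrative parameters):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class LeakyBucketLimiter {
    private final BlockingQueue<Runnable> bucket;
    private final ScheduledExecutorService drainer = Executors.newSingleThreadScheduledExecutor();

    public LeakyBucketLimiter(int capacity, long leakIntervalMillis) {
        this.bucket = new ArrayBlockingQueue<>(capacity);
        // The processor takes one request from the queue head at a fixed frequency
        drainer.scheduleAtFixedRate(() -> {
            Runnable request = bucket.poll();
            if (request != null) {
                request.run();
            }
        }, 0, leakIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Inflow: returns false when the bucket is full and the request overflows. */
    public boolean offer(Runnable request) {
        return bucket.offer(request);
    }
}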

Token buckets versus leaky buckets

  • The token bucket adds tokens to the bucket at a fixed rate, and whether a request is processed depends on whether there are enough tokens in the bucket; when the number of tokens drops to zero, new requests are rejected. The leaky bucket releases requests at a constant rate, while the inflow rate is arbitrary; when the number of queued requests reaches the bucket capacity, new incoming requests are rejected.

  • The token bucket limits the average inflow rate and allows bursts to be processed as long as tokens remain; it supports taking 3 tokens or 4 tokens at a time.

  • The leaky bucket limits the outflow to a constant rate, i.e. the outflow rate is a fixed value (it cannot be 1 at one moment and 2 the next), thereby smoothing a bursty inflow rate.

  • The token bucket allows a certain degree of burst, while the leak bucket’s main purpose is to smooth the outflow rate;

When, in addition to limiting the average transmission rate, some degree of burst transmission is required, the leaky bucket algorithm may not be suitable and the token bucket algorithm is the better fit.

Application of semaphore

  • A Semaphore can control the number of threads accessing a resource at the same time: a permit is acquired through acquire(), waiting if none is available, and released through release().

  • The essence of a semaphore is to control how many callers may access a resource simultaneously. To some extent it can control the access frequency of a resource, but it cannot do so precisely (a minimal example follows).
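A minimal example of limiting concurrency with java.util.concurrent.Semaphore (the limit of 5 concurrent accesses is an arbitrary example value):

import java.util.concurrent.Semaphore;

public class SemaphoreLimiter {
    // At most 5 threads may access the resource at the same time
    private final Semaphore semaphore = new Semaphore(5);

    public void handleRequest(Runnable businessLogic) throws InterruptedException {
        semaphore.acquire();     // blocks until a permit is available
        try {
            businessLogic.run();
        } finally {
            semaphore.release(); // always return the permit
        }
    }
}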

Single-process traffic limiting

Guava’s RateLimiter can limit the rate of a method within a single process. This article describes how to use RateLimiter; for the implementation principle, refer to the separate RateLimiter source code analysis (covering the Guava and Sentinel implementations).

Guava RateLimiter

Google’s open source toolkit Guava provides the RateLimiter tool class, which is very convenient to use and implements traffic limiting based on the token bucket algorithm.

Principle: Guava’s RateLimiter is based on the token bucket algorithm:

  • RateLimiter is configured with a QPS limit, and it puts tokens into the bucket at that rate.
  • Then, when a request arrives, a permit (token) is obtained from the RateLimiter through the tryAcquire() method.

Guava RateLimiter operations

Rate limiting with Guava RateLimiter

  • Conceptually, a RateLimiter distributes permits at a configurable rate. If necessary, each call to acquire() blocks the current thread until a permit is available. Once a permit has been obtained, it does not need to be released.
  • RateLimiter supports a degree of burst requests (pre-consumption) by limiting the wait time of subsequent requests, as demonstrated below.
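A small demonstration of this pre-consumption behavior (the printed timings are approximate and machine-dependent):

import com.google.common.util.concurrent.RateLimiter;

public class BurstDemo {
    public static void main(String[] args) {
        RateLimiter limiter = RateLimiter.create(5); // 5 permits per second

        // The first request is granted immediately, even if it asks for several permits...
        double waited = limiter.acquire(5);
        System.out.println("acquire(5) waited " + waited + "s"); // roughly 0.0

        // ...but the next request pays for it by waiting about 1 second (5 permits at 5/s)
        waited = limiter.acquire(1);
        System.out.println("acquire(1) waited " + waited + "s"); // roughly 1.0
    }
}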

Maven configuration

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.0-jre</version>
</dependency>

A simple Java example

import com.google.common.util.concurrent.RateLimiter;

import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class RateLimiterService {
    // Issue 5 tokens per second
    private final RateLimiter rateLimiter = RateLimiter.create(5);

    /** Try to acquire a token without blocking. */
    public boolean tryAcquire() {
        return rateLimiter.tryAcquire();
    }

    /** Acquire a token, blocking until one is available. */
    public void acquire() {
        rateLimiter.acquire();
    }

    /** Non-blocking check: either run the business logic or report that it was limited. */
    public String access() {
        if (tryAcquire()) {
            // Simulate business execution for 500 milliseconds
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "access success [" + LocalDateTime.now() + "]";
        }
        return "access limit [" + LocalDateTime.now() + "]";
    }

    /** tryAcquire(): requests that cannot get a token are rejected immediately. */
    public static void testMethod() {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        RateLimiter rateLimiter = RateLimiter.create(5); // rate is "5 permits per second"
        IntStream.range(0, 10).forEach(i -> pool.submit(() -> {
            if (rateLimiter.tryAcquire()) {
                System.out.println("start " + i);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } else {
                System.out.println("current limit " + i);
            }
        }));
        pool.shutdown();
    }

    /** acquire(): every request blocks until a token is issued, so all 10 eventually run. */
    public static void testMethod2() {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        RateLimiter rateLimiter = RateLimiter.create(5); // rate is "5 permits per second"
        IntStream.range(0, 10).forEach(i -> pool.submit(() -> {
            rateLimiter.acquire();
            System.out.println("start " + i);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        pool.shutdown();
    }

    public static void main(String[] args) {
        RateLimiterService accessLimitService = new RateLimiterService();
        System.out.println(accessLimitService.access());
        testMethod();
        testMethod2();
    }
}
import com.google.common.util.concurrent.RateLimiter;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class GuavaRateLimiter {
    public static ConcurrentHashMap<String, RateLimiter> resourceRateLimiter = new ConcurrentHashMap<>();

    // Initialize the RateLimiter for the "order" resource
    static {
        createResourceRateLimiter("order", 50);
    }

    public static void createResourceRateLimiter(String resource, double qps) {
        if (resourceRateLimiter.containsKey(resource)) {
            resourceRateLimiter.get(resource).setRate(qps);
        } else {
            // Create a rate limiter that issues `qps` tokens per second
            RateLimiter rateLimiter = RateLimiter.create(qps);
            resourceRateLimiter.putIfAbsent(resource, rateLimiter);
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // If a token is obtained within 10 microseconds, execute the business logic
                    if (resourceRateLimiter.get("order").tryAcquire(10, TimeUnit.MICROSECONDS)) {
                        System.out.println("Execute business logic");
                    } else {
                        System.out.println("Current limit");
                    }
                }
            }).start();
        }
    }
}

Methods covered in this article

Traffic limiting: creation methods

The create method
public static RateLimiter create(double permitsPerSecond)

A RateLimiter is created with the specified stable throughput rate, given as the number of permits per second (usually QPS, queries per second).

The returned RateLimiter ensures that on average no more than permitsPerSecond are issued during any given second, with sustained requests being smoothly spread over each second. When the incoming request rate exceeds permitsPerSecond the rate limiter will release one permit every (1.0 / permitsPerSecond) seconds. When the rate limiter is unused, bursts of up to permitsPerSecond permits will be allowed, with subsequent requests being smoothly limited at the stable rate of permitsPerSecond.

The returned RateLimiter
  • Ensures that, on average, no more than permitsPerSecond permits are issued in any given second, with sustained requests spread smoothly over each second.

  • When the incoming request rate exceeds permitsPerSecond, the rate limiter releases one permit every (1.0 / permitsPerSecond) seconds.

  • When the rate limiter is unused, permits accumulate up to permitsPerSecond, allowing a burst; subsequent requests are then smoothly limited at the stable rate permitsPerSecond.

Parameters:
  • permitsPerSecond – the rate of the returned RateLimiter, measured in how many permits become available per second
Throws:
  • IllegalArgumentException – if permitsPerSecond is negative or zero
public static RateLimiter create(double permitsPerSecond,long warmupPeriod,TimeUnit unit)

A RateLimiter is created with the specified stable throughput rate and a warm-up period, where the throughput rate is the number of permits per second (usually QPS, queries per second). During the warm-up period, the number of permits the RateLimiter allocates per second grows steadily until it reaches its maximum rate at the end of the period (as long as there are enough requests to saturate it). Similarly, if the RateLimiter is left unused for a duration of warmupPeriod, it gradually returns to the cold state, meaning it will go through the same warm-up process as when it was first created. The returned RateLimiter is mainly intended for resources that need a warm-up period before they can actually satisfy requests (such as a remote service), rather than resources that can be accessed immediately at the stable (maximum) rate. The returned RateLimiter starts in the cold state (i.e. the warm-up period comes first) and returns to that state if it is left unused for long enough. A usage sketch follows the parameter list below.

Parameters:
  • permitsPerSecond – the rate of the returned RateLimiter, measured in how many permits become available per second
  • warmupPeriod – the duration over which the RateLimiter ramps up its rate before reaching its stable (maximum) rate
  • unit – the time unit of the warmupPeriod parameter
Throws:
  • IllegalArgumentException – if permitsPerSecond is negative or zero
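A usage sketch of the warm-up variant (the rate and warm-up period are arbitrary example values):

import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;

public class WarmupDemo {
    public static void main(String[] args) {
        // 5 permits per second, ramping up to that rate over a 3-second warm-up period
        RateLimiter limiter = RateLimiter.create(5, 3, TimeUnit.SECONDS);

        // While warming up, permits are handed out more slowly than the stable rate
        for (int i = 0; i < 10; i++) {
            double waited = limiter.acquire();
            System.out.println("permit " + i + " waited " + waited + "s");
        }
    }
}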

Traffic limiting: acquisition (blocking) methods

acquire

public double acquire()

Acquires a single permit from this RateLimiter, blocking until the request can be granted, and tells the caller how long it slept, if at all. This method is equivalent to acquire(1).

Returns:

  • the time spent sleeping to enforce the rate, in seconds; 0.0 if not rate-limited (see the short example below)
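A short illustration of the returned sleep time (output values are approximate):

import com.google.common.util.concurrent.RateLimiter;

public class AcquireDemo {
    public static void main(String[] args) {
        RateLimiter limiter = RateLimiter.create(2); // 2 permits per second

        for (int i = 0; i < 4; i++) {
            double sleptSeconds = limiter.acquire(); // blocks if necessary
            // 0.0 for the first call, then roughly 0.5 per call at 2 QPS
            System.out.println("request " + i + " slept " + sleptSeconds + "s");
        }
    }
}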

acquire
public double acquire(int permits)

Acquires the given number of permits from this RateLimiter, blocking until the request can be granted, and tells the caller how long it slept, if at all.

Parameters:
  • permits – the number of permits to acquire
Returns:
  • the time spent sleeping to enforce the rate, in seconds; 0.0 if not rate-limited
Throws:
  • IllegalArgumentException – if the requested number of permits is negative or zero

tryAcquire

public boolean tryAcquire(long timeout,TimeUnit unit)

Acquires a permit from this RateLimiter if it can be obtained within the given timeout, or returns false immediately (without waiting) if the permit would not be granted before the timeout expires. This method is equivalent to tryAcquire(1, timeout, unit). A short example follows the parameter list.

Parameters:
  • timeout – the maximum time to wait for the permit; negative values are treated as zero
  • Unit – The time unit of the timeout parameter
Returns:
  • true if the permit was acquired, false otherwise
Throws:
  • IllegalArgumentException – If the requested permission number is negative or 0
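A short example of the timeout variant (the rate and timeout are arbitrary example values):

import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;

public class TryAcquireDemo {
    public static void main(String[] args) {
        RateLimiter limiter = RateLimiter.create(1); // 1 permit per second

        for (int i = 0; i < 3; i++) {
            // Wait at most 100 ms for a permit; give up (e.g. fall back) otherwise
            if (limiter.tryAcquire(100, TimeUnit.MILLISECONDS)) {
                System.out.println("request " + i + " admitted");
            } else {
                System.out.println("request " + i + " rejected (rate limited)");
            }
        }
    }
}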

tryAcquire

public boolean tryAcquire(int permits,long timeout,TimeUnit unit)

Acquires the given number of permits from this RateLimiter if they can be obtained within the given timeout, or returns false immediately (without waiting) if the permits would not be granted before the timeout expires.

Parameters:
  • permits – the number of permits to acquire
  • timeout – the maximum time to wait for the permits; negative values are treated as zero
  • Unit – The time unit of the timeout parameter
Returns:
  • true if the permits were acquired, false otherwise

Traffic limiting: rate configuration

public final void setRate(double permitsPerSecond)

Updates the stable rate of this RateLimiter; the parameter permitsPerSecond has the same meaning as in the factory method that constructed the RateLimiter. Threads that are currently throttled will not be woken up by this call, so they do not observe the new rate; only subsequent requests will. Note that, since each request repays (by waiting, if necessary) the cost of the previous request, the request immediately following a call to setRate is not affected by the new rate either: it repays the cost of the previous request, which depends on the previous rate. The behavior of the RateLimiter is not changed in any other way; for example, if the RateLimiter was configured with a 20-second warm-up period, it still warms up over 20 seconds after this method is called.

Parameters:
  • permitsPerSecond – the new stable rate of the RateLimiter
Throws:
  • IllegalArgumentException – If permitsPerSecond is negative or 0
public final double getRate()

Returns the stable rate configured for this RateLimiter, in permits per second. Its initial value is the permitsPerSecond parameter passed to the factory method that created the RateLimiter, and it is updated only by calls to setRate(double).
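A brief sketch combining setRate and getRate (the rates are arbitrary example values):

import com.google.common.util.concurrent.RateLimiter;

public class SetRateDemo {
    public static void main(String[] args) {
        RateLimiter limiter = RateLimiter.create(5);
        System.out.println(limiter.getRate()); // 5.0, the rate passed to create()

        // Lower the stable rate at runtime; only subsequent requests observe it
        limiter.setRate(2);
        System.out.println(limiter.getRate()); // 2.0
    }
}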