To ensure high availability and stability when building high-concurrency systems, three approaches are commonly used in the industry:

  • Caching: improves access speed and increases the system's processing capacity
  • Degradation: when a service is faulty or threatens core services, it is temporarily disabled and re-enabled after the peak period or once the fault is resolved
  • Rate limiting: protects the system by capping concurrent (or per-interval) requests; excess requests are rejected (redirected to an error page or told the resource is unavailable), queued (e.g., flash sales, comments, orders), or degraded (served fallback or default data)

The rest of this article focuses on rate-limiting techniques. Generally speaking, the common approaches are:

  • Counter
  • Sliding window
  • Leaky bucket algorithm
  • Token bucket algorithm

1. Counter

The counter is a relatively simple and crude rate-limiting algorithm:

Requests are counted over a period of time and compared against a threshold to decide whether to limit; once the critical point in time is reached, the counter is cleared.

Two common variants are discussed below.

Scheme 1

If the request rate is too high, subsequent requests are discarded.

/*
Rate limiting - counter: count the requests within a time period and compare
the count against a threshold to decide whether to limit; once the critical
point in time is reached, clear the counter.
Scheme 1: if the request rate is too high, discard subsequent requests.
*/
package main

import (
   "fmt"
   "sync"
   "time"
)

type LimitRate struct {
   rate  int           // Maximum number of requests allowed within a counting cycle (inclusive)
   begin time.Time     // Start time of the current counting cycle
   cycle time.Duration // Length of the counting cycle
   count int           // Number of requests received in the current cycle
   lock  sync.Mutex
}

func (limit *LimitRate) Create(r int, cycle time.Duration) {
   limit.rate = r          // At most r requests are allowed per counting cycle
   limit.begin = time.Now()
   limit.cycle = cycle     // Initialize the counting cycle
   limit.count = 0
}

func (limit *LimitRate) Reset() {
   limit.begin = time.Now()
   limit.count = 0
}

func (limit *LimitRate) Allow() bool {
   //(1) Lock so the method is safe for concurrent goroutines
   limit.lock.Lock()
   defer limit.lock.Unlock()

   //(2) If count+1 would exceed the maximum allowed in the cycle, check whether the cycle has elapsed
   if limit.count+1 > limit.rate {
      now := time.Now()
      // If the interval since the last reset is at least one counting cycle, reset the counter
      if now.Sub(limit.begin) >= limit.cycle {
         // This request is still rejected; the new cycle starts with a clean count
         limit.Reset()
         return false
      } else {
         // Still within the counting cycle: discard the request directly
         return false
      }
   } else {
      limit.count++
      return true
   }
}

func main() {
   var limiter LimitRate
   limiter.Create(2, time.Second) // Only two requests are allowed to pass within 1s
   wg := sync.WaitGroup{}
   // The simulation sends 20 requests in 2s, and normally 4 requests pass
   for i := 1; i <= 20; i++ {
      wg.Add(1)
      go func(i int) {
         if limiter.Allow() {
            fmt.Println("Response req", i, time.Now())
         }
         wg.Done()
      }(i)
      time.Sleep(100 * time.Millisecond)
   }
   wg.Wait()
}

In this example, one request is created every 100ms: 20 requests in total, taking 2s. The rate is limited to 2 requests within 1s, so at most 4 responses are received. The running results are as follows:

Response req 1 2021-08-27 10:58:13.831596 +0800 CST m=+0.000336018
Response req 2 2021-08-27 10:58:13.936241 +0800 CST m=+0.104981751
Response req 12 2021-08-27 10:58:14.965243 +0800 CST m=+1.133992389
Response req 13 2021-08-27 10:58:15.069271 +0800 CST m=+1.238020186

Scheme 2

If the request rate is too high, subsequent requests wait for earlier ones to complete. Only the Allow method needs a slight modification:

func (limit *LimitRate) Allow() bool {
   //(1) Lock so the method is safe for concurrent goroutines
   limit.lock.Lock()
   defer limit.lock.Unlock()

   //(2) If count+1 would exceed the maximum allowed in the cycle, wait for the next cycle
   if limit.count+1 > limit.rate {
      // Busy-wait instead of discarding the request
      for {
         now := time.Now()
         // Once the interval since the last reset reaches the counting cycle, reset the counter
         if now.Sub(limit.begin) >= limit.cycle {
            limit.Reset()
            break
         }
      }
   }
   //(3) Increment the counter in every case
   limit.count++
   return true
}


Again one request is created every 100ms: 20 requests in total. The rate is limited to at most 2 requests within 1s, so all 20 responses are eventually received, but the run takes 10s. The running results are as follows:

Response req 1 2021-08-27 11:01:28.604854 +0800 CST m=+0.000241569
Response req 2 2021-08-27 11:01:28.709166 +0800 CST m=+0.104554366
Response req 3 2021-08-27 11:01:29.60482 +0800 CST m=+1.000214712
Response req 4 2021-08-27 11:01:29.60486 +0800 CST m=+1.000254239
Response req 5 2021-08-27 11:01:30.604809 +0800 CST m=+2.000209879
Response req 6 2021-08-27 11:01:30.604842 +0800 CST m=+2.000242591
Response req 7 2021-08-27 11:01:31.604803 +0800 CST m=+3.000210503
Response req 8 2021-08-27 11:01:31.604832 +0800 CST m=+3.000239866
Response req 9 2021-08-27 11:01:32.604797 +0800 CST m=+4.000211440
Response req 10 2021-08-27 11:01:32.604822 +0800 CST m=+4.000236953
Response req 11 2021-08-27 11:01:33.604789 +0800 CST m=+5.000210250
Response req 12 2021-08-27 11:01:33.60482 +0800 CST m=+5.000241864
Response req 13 2021-08-27 11:01:34.604783 +0800 CST m=+6.000211564
Response req 14 2021-08-27 11:01:34.604814 +0800 CST m=+6.000242227
Response req 15 2021-08-27 11:01:35.604779 +0800 CST m=+7.000214163
Response req 16 2021-08-27 11:01:35.604818 +0800 CST m=+7.000252973
Response req 17 2021-08-27 11:01:36.60477 +0800 CST m=+8.000211783
Response req 18 2021-08-27 11:01:36.604802 +0800 CST m=+8.000244230
Response req 19 2021-08-27 11:01:37.604764 +0800 CST m=+9.000213264
Response req 20 2021-08-27 11:01:37.604834 +0800 CST m=+9.000282691

You can see that all 20 requests were processed, but still at a limit of 2 requests per second.

The counter algorithm suffers from a critical-point (boundary) defect. Suppose the limit is 100 requests per minute (at most about 1.7 requests per second on average) and there are no requests between 00:00:00 and 00:00:58. If 100 requests arrive at 00:00:59, they are all allowed, and another 100 requests at 00:01:00 are allowed as well: 200 requests issued within roughly 1s. A malicious user can exploit this to overwhelm and even penetrate the system.

2. Sliding window (TODO)

The sliding window addresses the counter's critical-point defect.

It divides time into fixed slices (grids) that move forward as time passes; requests are counted over a fixed number of sliding grids and compared against the threshold, as in the sketch below.
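Since this section is marked TODO, what follows is only a minimal sliding-window counter sketch under my own assumptions; the SlidingWindow type and its fields are invented for illustration and are not code from the original article. The window is split into slotNum grids, each request is counted in the grid covering the current moment, grids that fall out of the window are zeroed as time passes, and the sum over all live grids is compared against the threshold.

package main

import (
   "fmt"
   "sync"
   "time"
)

type SlidingWindow struct {
   rate    int           // Maximum number of requests allowed within one window
   cycle   time.Duration // Window length
   slotNum int           // Number of grids the window is divided into
   slots   []int         // Request count per grid
   times   []time.Time   // Start time of each grid
   cur     int           // Index of the grid covering the current moment
   lock    sync.Mutex
}

func NewSlidingWindow(rate int, cycle time.Duration, slotNum int) *SlidingWindow {
   w := &SlidingWindow{
      rate:    rate,
      cycle:   cycle,
      slotNum: slotNum,
      slots:   make([]int, slotNum),
      times:   make([]time.Time, slotNum),
   }
   w.times[0] = time.Now()
   return w
}

func (w *SlidingWindow) Allow() bool {
   w.lock.Lock()
   defer w.lock.Unlock()

   now := time.Now()
   slotLen := w.cycle / time.Duration(w.slotNum)

   // Idle for longer than a whole window: every grid has expired, start over
   if now.Sub(w.times[w.cur]) >= w.cycle {
      for i := range w.slots {
         w.slots[i] = 0
      }
      w.times[w.cur] = now
   }
   // Slide forward grid by grid, zeroing grids that drop out of the window
   for now.Sub(w.times[w.cur]) >= slotLen {
      prev := w.cur
      w.cur = (w.cur + 1) % w.slotNum
      w.times[w.cur] = w.times[prev].Add(slotLen)
      w.slots[w.cur] = 0
   }

   // The window total is the sum over all live grids
   total := 0
   for _, c := range w.slots {
      total += c
   }
   if total+1 > w.rate {
      return false
   }
   w.slots[w.cur]++
   return true
}

func main() {
   w := NewSlidingWindow(2, time.Second, 10) // At most 2 requests per sliding 1s window
   for i := 1; i <= 20; i++ {
      if w.Allow() {
         fmt.Println("Response req", i, time.Now())
      }
      time.Sleep(100 * time.Millisecond)
   }
}

Because counted requests leave the window gradually, one grid at a time, rather than all at once at a boundary, a burst that fills the quota at 00:00:59 leaves no room for another burst at 00:01:00, which avoids the plain counter's critical-point defect.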

3. Leaky bucket algorithm

The leaky bucket algorithm is described as follows:

  • A leaky bucket with a fixed capacity drains water droplets at a fixed rate
  • If the bucket is empty, no water flows out
  • Water droplets can flow into the leaky bucket at any rate
  • If the incoming water exceeds the bucket's capacity, it overflows (is discarded)

The idea of the leaky bucket algorithm is very simple: water (requests) first flows into the leaky bucket, and the bucket leaks at a fixed rate; when the inflow is too fast, the excess overflows directly. The leaky bucket algorithm therefore forcibly limits the data transmission rate. In layman's terms, we have a bucket of fixed capacity with water flowing in and water flowing out. We cannot predict how much water will flow in, or how fast, but the bucket drains at a fixed rate (processing speed), thus achieving Traffic Shaping and Traffic Policing.

/*
Leaky bucket: water (requests) flows into the bucket at an arbitrary rate and
the bucket leaks at a fixed rate; when the inflow is too fast, the excess
overflows. The algorithm forcibly limits the processing rate.
*/
package main

import (
   "fmt"
   "math"
   "sync"
   "time"
)

type LeakyBucket struct {
   rate        float64 // Fixed leak (outflow) rate per second
   capacity    float64 // Capacity of the bucket
   water       float64 // Current amount of water in the bucket
   lastLeakyMs int64   // Timestamp of the last leak, in ms

   lock sync.Mutex
}

func (lb *LeakyBucket) Create(rate float64, capacity float64) {
   lb.rate = rate
   lb.capacity = capacity
   lb.water = 0
   lb.lastLeakyMs = time.Now().UnixNano() / 1e6

   //(2) Perform the leak operation asynchronously
   go func() {
      for {
         lb.lock.Lock()
         now := time.Now().UnixNano() / 1e6 //ms
         expand := float64(now - lb.lastLeakyMs) / 1000 *  lb.rate
         //(2.1) Compute the remaining water; the bucket may have leaked dry
         lb.water = lb.water - expand
         lb.water = math.Max(0, lb.water)
         lb.lastLeakyMs = now
         lb.lock.Unlock()
         //(2.2) Sleep for a period of time before performing the leakage operation
         time.Sleep(time.Duration(1e3/rate) * time.Millisecond)
      }
   }()
}

func (lb *LeakyBucket) Allow() bool {
   lb.lock.Lock()
   defer lb.lock.Unlock()

   // If adding one more unit of water would exceed the bucket's capacity, the bucket is full and this request overflows
   if lb.water+1 > lb.capacity {
      return false
   } else {
      lb.water++
      return true
   }
}

func main() {
   var leaky LeakyBucket
   leaky.Create(3, 3) // Leak 3 requests per second; bucket capacity 3

   wg := sync.WaitGroup{}

   for i := 1; i <= 40; i++ {
      wg.Add(1)

      go func(i int) {
         if leaky.Allow() {
            fmt.Println("Response req", i, time.Now())
         }
         wg.Done()
      }(i)
      time.Sleep(100 * time.Millisecond)
   }
   wg.Wait()
}

Forty requests are created, one every 0.1s. As the requests "drip" into the bucket, the bucket drains at a rate of 3 per second, so some of the requests are discarded:

Response req 1 2021-08-27 11:27:35.490245 +0800 CST m=+0.000148829
Response req 2 2021-08-27 11:27:35.593785 +0800 CST m=+0.103690665
Response req 3 2021-08-27 11:27:35.698835 +0800 CST m=+0.208742557
Response req 5 2021-08-27 11:27:35.905212 +0800 CST m=+0.415122197
Response req 8 2021-08-27 11:27:36.211181 +0800 CST m=+0.721094780
Response req 11 2021-08-27 11:27:36.523751 +0800 CST m=+1.033668336
Response req 15 2021-08-27 11:27:36.937492 +0800 CST m=+1.447414567
Response req 18 2021-08-27 11:27:37.244769 +0800 CST m=+1.754694524
Response req 21 2021-08-27 11:27:37.558478 +0800 CST m=+2.068407889
Response req 24 2021-08-27 11:27:37.868595 +0800 CST m=+2.378527728
Response req 27 2021-08-27 11:27:38.182616 +0800 CST m=+2.692552789
Response req 31 2021-08-27 11:27:38.593027 +0800 CST m=+3.102968021
Response req 34 2021-08-27 11:27:38.902344 +0800 CST m=+3.412288548
Response req 37 2021-08-27 11:27:39.207447 +0800 CST m=+3.717395642

4. Token bucket algorithm

Because the leaky bucket drains at a constant rate, most requests in an instantaneous traffic burst are discarded (overflow). The token bucket algorithm was developed to solve this problem. It is described as follows:

  • A bucket has a fixed capacity and starts out empty
  • Tokens are added to the bucket at a fixed rate r until the bucket is full; excess tokens are discarded
  • Each incoming request tries to remove one token from the bucket; if no token is available, the request does not go through

/*
Token bucket: tokens are put into the bucket at a constant rate. A request
must first take a token from the bucket before it is processed; when no
token is available in the bucket, the request is rejected.
*/
package main

import (
   "fmt"
   "math"
   "sync"
   "time"
)

type TokenBucket struct {
   rate        int64   // Fixed token refill rate, r tokens per second
   capacity    float64 // Capacity of the bucket
   tokens      float64 // Number of tokens currently in the bucket
   lastTokenMs int64   // Timestamp of the last token refill, in ms

   lock sync.Mutex
}

func (token *TokenBucket) Create(rate int64, capacity float64) {
   token.rate = rate
   token.capacity = capacity
   token.lastTokenMs = time.Now().UnixNano() / 1e6

   //(1) Initialize the number of tokens in the bucket to the bucket capacity
   token.tokens = capacity

   //(2) Add tokens to the bucket asynchronously. The maximum number of tokens is the capacity of the bucket
   go func() {
      for {
         token.lock.Lock()
         now := time.Now().UnixNano() / 1e6
         //(2.1) Add the token to the bucket first
         token.tokens = token.tokens + float64((now-token.lastTokenMs)*token.rate)/1e3
         token.tokens = math.Min(token.capacity, token.tokens)
         token.lastTokenMs = now
         token.lock.Unlock()
         //(2.2) Sleep for one refill interval before adding more tokens
         time.Sleep(time.Duration(1e3/rate) * time.Millisecond)
      }
   }()
}

func (token *TokenBucket) Allow() bool {
   // Lock to avoid racing with the refill goroutine
   token.lock.Lock()
   defer token.lock.Unlock()

   if token.tokens-1 < 0 {
      return false
   } else {
      token.tokens--
      return true
   }
}

func main() {
   var wg sync.WaitGroup
   var lr TokenBucket
   lr.Create(3, 5) // Refill 3 tokens per second; bucket capacity 5

   for i := 1; i <= 40; i++ {
      wg.Add(1)

      go func(i int) {
         if lr.Allow() {
            fmt.Println("Response req", i, time.Now())
         }
         wg.Done()
      }(i)

      time.Sleep(100 * time.Millisecond)
   }
   wg.Wait()
}


Here the bucket capacity is 5, so the bucket starts with 5 tokens; after that, 3 tokens are generated per second. Forty requests are then created, one every 0.1s, each trying to take a token:

Response req 1 2021-08-27 11:32:41.56202041 +0800 CST m=+0.000127316
Response req 2 2021-08-27 11:32:41.664894 +0800 CST m=+0.102981922
Response req 3 2021-08-27 11:32:41.767195 +0800 CST m=+0.205283293
Response req 4 2021-08-27 11:32:41.872047 +0800 CST m=+0.310136350
Response req 5 2021-08-27 11:32:41.976412 +0800 CST m=+0.414501683
Response req 6 2021-08-27 11:32:42.080658 +0800 CST m=+0.518748402
Response req 8 2021-08-27 11:32:42.287345 +0800 CST m=+0.725436938
Response req 11 2021-08-27 11:32:42.600802 +0800 CST m=+1.038896037
Response req 14 2021-08-27 11:32:42.910744 +0800 CST m=+1.348840433
Response req 18 2021-08-27 11:32:43.324245 +0800 CST m=+1.762343491
Response req 21 2021-08-27 11:32:43.631339 +0800 CST m=+2.069440057
Response req 24 2021-08-27 11:32:43.943049 +0800 CST m=+2.381152027
Response req 27 2021-08-27 11:32:44.250166 +0800 CST m=+2.688271272
Response req 31 2021-08-27 11:32:44.662027 +0800 CST m=+3.100135390
Response req 34 2021-08-27 11:32:44.974085 +0800 CST m=+3.412194673
Response req 37 2021-08-27 11:32:45.28616 +0800 CST m=+3.724272456
Response req 40 2021-08-27 11:32:45.599233 +0800 CST m=+4.037347100

Because tokens can accumulate in the bucket, the token bucket algorithm supports a certain degree of sudden bursts of concurrent traffic: if 100 tokens are in the bucket, 100 requests can be allowed through instantly.

Comparison between the leaky bucket algorithm and the token bucket algorithm

  • The token bucket adds tokens to the bucket at a fixed rate; whether a request is processed depends on whether enough tokens remain in the bucket. When the token count drops to 0, new requests are rejected
  • The leaky bucket releases requests at a constant rate; the inflow rate is arbitrary. When the backlog of requests reaches the bucket's capacity, new incoming requests are rejected
  • The token bucket limits the average inflow rate and allows a certain amount of burst traffic (as long as tokens remain, a burst can consume 3 or 4 tokens at once)
  • The leaky bucket limits the outflow to a constant rate (e.g., always 1 per tick, never 1 now and 2 the next moment), smoothing bursty inflow
  • In short, the token bucket permits some burstiness, while the leaky bucket mainly smooths the inflow rate
  • The two algorithms can be implemented as mirror images of each other, and with the same parameters their limiting effect is the same

5. Go's official rate package

The golang.org/x/time/rate package in the Go language uses the token bucket algorithm to implement rate limiting.

Brief introduction to the rate package
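A rate.Limiter is created with rate.NewLimiter(r, b), where r is the token refill rate in events per second and b is the bucket size, i.e., the maximum burst allowed. The limiter offers three consumption styles: Allow/AllowN report immediately whether an event may happen now; Wait/WaitN block until enough tokens are available or the context is cancelled; Reserve/ReserveN return a reservation that tells the caller how long to wait before proceeding.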

Usage examples

package main

import (
   "net/http"
   "time"

   "golang.org/x/time/rate"
)

var (
   limiter = rate.NewLimiter(2, 5) // Refill 2 tokens per second; bucket capacity 5
)

func limit(next http.Handler) http.Handler {
   return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
      if !limiter.Allow() {
         http.Error(w, http.StatusText(http.StatusTooManyRequests)+"; time="+time.Now().Format("2006-01-02 15:04:05.000"), http.StatusTooManyRequests)
         return
      }
      next.ServeHTTP(w, r)
   })
}

func okHandler(w http.ResponseWriter, r *http.Request) {
   _, _ = w.Write([]byte("OK; time=" + time.Now().Format("2006-01-02 15:04:05.000") + "\n"))
}

func main() {
   mux := http.NewServeMux()
   mux.HandleFunc("/", okHandler)
   _ = http.ListenAndServe(":3000", limit(mux))
}


The test results

while true; do curl http://localhost:3000/; sleep 0.1; done

OK; time=2021-08-27 11:39:40.465
OK; time=2021-08-27 11:39:40.591
OK; time=2021-08-27 11:39:40.716
OK; time=2021-08-27 11:39:40.842
OK; time=2021-08-27 11:39:40.959
OK; time=2021-08-27 11:39:41.081
Too Many Requests; time=2021-08-27 11:39:41.205
Too Many Requests; time=2021-08-27 11:39:41.328
Too Many Requests; time=2021-08-27 11:39:41.448
OK; time=2021-08-27 11:39:41.570
Too Many Requests; time=2021-08-27 11:39:41.692
Too Many Requests; time=2021-08-27 11:39:41.817
Too Many Requests; time=2021-08-27 11:39:41.942
OK; time=2021-08-27 11:39:42.066
Too Many Requests; time=2021-08-27 11:39:42.192
Too Many Requests; time=2021-08-27 11:39:42.310
Too Many Requests; time=2021-08-27 11:39:42.431
OK; time=2021-08-27 11:39:42.548
Too Many Requests; time=2021-08-27 11:39:42.674
Too Many Requests; time=2021-08-27 11:39:42.799
Too Many Requests; time=2021-08-27 11:39:42.921
OK; time=2021-08-27 11:39:43.038
Too Many Requests; time=2021-08-27 11:39:43.154
Too Many Requests; time=2021-08-27 11:39:43.269
Too Many Requests; time=2021-08-27 11:39:43.391
OK; time=2021-08-27 11:39:43.513
Too Many Requests; time=2021-08-27 11:39:43.636
Too Many Requests; time=2021-08-27 11:39:43.757
Too Many Requests; time=2021-08-27 11:39:43.880
OK; time=2021-08-27 11:39:43.997
Too Many Requests; time=2021-08-27 11:39:44.122
Too Many Requests; time=2021-08-27 11:39:44.244
Too Many Requests; time=2021-08-27 11:39:44.363

After the initial burst of 5 tokens, subsequent requests pass at a rate of only two per second.
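Allow rejects over-limit requests outright, like Scheme 1 of the counter above. If callers should instead block until a token becomes available (like Scheme 2), the limiter's Wait method can be used. A minimal sketch, assuming the same limiter parameters as the HTTP example:

package main

import (
   "context"
   "fmt"
   "time"

   "golang.org/x/time/rate"
)

func main() {
   limiter := rate.NewLimiter(2, 5) // Refill 2 tokens per second; bucket capacity 5

   for i := 1; i <= 10; i++ {
      // Wait blocks until a token is available or the context is cancelled
      if err := limiter.Wait(context.Background()); err != nil {
         fmt.Println("wait failed:", err)
         continue
      }
      fmt.Println("Response req", i, time.Now())
   }
}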
