Rate Limiting Strategies for High-Concurrency Systems: Leaky Bucket and Token Bucket

Preface

Hello, everybody. I am asong, who is currently studying PS technology, and this is the fifth article in my concurrent programming series. Today let’s talk about rate limiting in high-concurrency systems. Rate limiting, also known as flow control, means capping the number of concurrent requests: once the limit is reached, the system can reject further requests, which protects downstream services and prevents overload. Common rate limiting strategies include the leaky bucket algorithm, the token bucket algorithm, and the sliding window. This article analyzes the leaky bucket and token bucket algorithms; the sliding window is not covered here. All right, let’s cut to the chase.

The test code has been uploaded to github.com/asong2020/G… Stars are welcome.

Leaky bucket algorithm

The leaky bucket algorithm is easy to understand. Suppose we have a bucket and pour water into it. We cannot predict when water will be added or how fast it will flow in, but we can fix the rate at which it flows out: no matter how fast the inflow, water leaves the bucket at a constant rate, and if the bucket is full, the excess simply overflows. Think of the water as HTTP requests: each request is put into the bucket, and requests are processed at a fixed rate.
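To make the bucket picture concrete, here is a minimal sketch in Go. It is only an illustration and not how uber-go/ratelimit works internally: the leakyBucket type, its capacity and level fields, and the Add and Leak methods are hypothetical names, and time is modeled as explicit ticks instead of a real clock.

```go
package main

import "fmt"

// leakyBucket is a hypothetical, simplified model: requests are the water,
// capacity is the size of the bucket, and Leak is called once per fixed tick.
type leakyBucket struct {
	capacity int // maximum number of requests that can wait in the bucket
	level    int // requests currently waiting in the bucket
}

// Add tries to put one request into the bucket.
// It reports false when the bucket is full and the request overflows.
func (b *leakyBucket) Add() bool {
	if b.level >= b.capacity {
		return false
	}
	b.level++
	return true
}

// Leak lets one waiting request flow out (be processed), if there is one.
// Calling it at a fixed interval gives the fixed outflow rate.
func (b *leakyBucket) Leak() bool {
	if b.level == 0 {
		return false
	}
	b.level--
	return true
}

func main() {
	b := &leakyBucket{capacity: 3}
	// A burst of 5 requests arrives at once: 3 fit, 2 overflow.
	for i := 1; i <= 5; i++ {
		fmt.Printf("request %d accepted: %v\n", i, b.Add())
	}
	// One tick passes: exactly one request is processed.
	fmt.Println("processed on tick:", b.Leak(), "still waiting:", b.level)
}
```

However fast Add is called, requests leave only as fast as Leak is called, which is the fixed-rate behavior described above.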

The principle is simple; what matters is how it is implemented. The Uber team has open-sourced the uber-go/ratelimit library, which is one implementation of the leaky bucket. Let’s look at the ideas behind it.

Example

When learning something new, we usually start by using it and then gradually work out how it is implemented. So let’s first see how this library is used. Here is a practical example based on the Gin framework: we add a rate limiting middleware to throttle requests. The test code is as follows:

package main

import (
	"fmt"
	"time"

	"github.com/gin-gonic/gin"
	"go.uber.org/ratelimit"
)

// Define the global limiter object
var rateLimit ratelimit.Limiter

// Add the limiting logic in a gin.HandlerFunc
func leakyBucket() gin.HandlerFunc {
	prev := time.Now()
	return func(c *gin.Context) {
		now := rateLimit.Take()
		fmt.Println(now.Sub(prev)) // Print the interval between requests
		prev = now                 // Record the last time; without this the printed interval is wrong
	}
}

func main() {
	rateLimit = ratelimit.New(10)
	r := gin.Default()
	r.GET("/ping", leakyBucket(), func(c *gin.Context) {
		c.JSON(200, true)
	})
	r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")
}

We run a simple load test with the ab tool: ab -n 10 -c 2 http://127.0.0.1:8080/ping.

The output shows that requests are processed roughly 100ms apart (ratelimit.New(10) allows 10 requests per second, so the interval is 1s / 10), and that later requests take longer and longer. Why is that? Consider this a teaser; you will know the answer after seeing how Uber implements it.

Source code implementation

Let’s start with the core structure:

type limiter struct {
	sync.Mutex
	last       time.Time
	sleepFor   time.Duration
	perRequest time.Duration
	maxSlack   time.Duration
	clock      Clock
}
type Limiter interface {
	// Take should block to make sure that the RPS is met.
	Take() time.Time
}

The Limiter interface provides only one method, Take(), which blocks until the required interval between requests has elapsed; we’ll examine it in detail below. The fields of the struct that implements the Limiter interface have the following meanings:

  • sync.Mutex: a mutex that controls concurrent access
  • last: the time of the last request
  • sleepFor: how long to wait before the next request can be processed
  • perRequest: the interval between requests
  • maxSlack: the maximum slack, used to absorb bursts of traffic
  • clock: a real or mock clock that provides the Now and Sleep methods; it is set when the rate limiter is instantiated

To use this limiter, first initialize it with the New method. The required parameter is rate, the number of requests per second (RPS). There is also an optional parameter of type Option for customizing the limiter. Here I’ll focus on how it guarantees a fixed rate. Part of the New method is excerpted below:

	l := &limiter{
		perRequest: time.Second / time.Duration(rate),
		maxSlack:   -10 * time.Second / time.Duration(rate),
	}

From the rate passed in, we can work out the interval between requests when n requests are handled in 1s; the Take method uses this perRequest field to process requests at a fixed rate. The maximum slack field is initialized here as well. Its value is negative, and by default it equals the interval of 10 requests.
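As a quick check of that arithmetic, the sketch below evaluates the two expressions from the excerpt for a few rates. The intervals function is a hypothetical wrapper added for illustration; only the two formulas come from the source.

```go
package main

import (
	"fmt"
	"time"
)

// intervals mirrors the two expressions from the New excerpt.
// The function name is hypothetical; only the arithmetic comes from the source.
func intervals(rate int) (perRequest, maxSlack time.Duration) {
	perRequest = time.Second / time.Duration(rate)
	maxSlack = -10 * time.Second / time.Duration(rate)
	return perRequest, maxSlack
}

func main() {
	for _, rate := range []int{1, 10, 100} {
		per, slack := intervals(rate)
		fmt.Printf("rate=%d perRequest=%v maxSlack=%v\n", rate, per, slack)
	}
}
```

For rate = 10 this gives a perRequest of 100ms and a maxSlack of -1s, that is, ten request intervals of saved-up credit.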

Let’s focus on the Take method:

func (t *limiter) Take() time.Time {
	t.Lock()
	defer t.Unlock()
	now := t.clock.Now()
	if t.last.IsZero() {
		t.last = now
		return t.last
	}
	t.sleepFor += t.perRequest - now.Sub(t.last)
	if t.sleepFor < t.maxSlack {
		t.sleepFor = t.maxSlack
	}
	if t.sleepFor > 0 {
		t.clock.Sleep(t.sleepFor)
		t.last = now.Add(t.sleepFor)
		t.sleepFor = 0
	} else {
		t.last = now
	}

	return t.last
}

The Take() method performs the following steps:

  • To control concurrency, the method takes the lock on entry. The lock is coarse-grained: the whole method runs under it.
  • The IsZero method checks whether this is the first request. If so, the current time now is recorded and returned directly.
  • If it is not the first request, compute how long to wait before this request may be processed. The key point is that the wait time is accumulated in sleepFor, so that time saved earlier can offset later waits.
  • If the accumulated sleepFor falls below the maximum slack (that is, more credit has been saved up than allowed), set it to the maximum slack.
  • If sleepFor is greater than 0, the saved time does not fully offset the wait required this time, so call Sleep to block and then reset sleepFor to 0. If sleepFor is less than or equal to 0, the gap since the last request was longer than required, so the request can be processed immediately without waiting.
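The steps above can be tried out without real sleeping. The following sketch repeats the Take logic against a fake clock whose Sleep simply moves time forward. The names fakeClock, simLimiter, and gapsMillis are hypothetical, and the mutex is left out on the assumption that only one goroutine is used.

```go
package main

import (
	"fmt"
	"time"
)

// fakeClock is a hypothetical stand-in for the library's clock:
// Sleep just moves the time forward, so the example runs instantly.
type fakeClock struct{ now time.Time }

func (c *fakeClock) Now() time.Time        { return c.now }
func (c *fakeClock) Sleep(d time.Duration) { c.now = c.now.Add(d) }

// simLimiter repeats the Take logic shown above, without the mutex,
// because this sketch uses a single goroutine.
type simLimiter struct {
	last       time.Time
	sleepFor   time.Duration
	perRequest time.Duration
	maxSlack   time.Duration
	clock      *fakeClock
}

func (t *simLimiter) Take() time.Time {
	now := t.clock.Now()
	if t.last.IsZero() { // first request: no waiting
		t.last = now
		return t.last
	}
	// Accumulate the budget: a positive value means "still have to wait".
	t.sleepFor += t.perRequest - now.Sub(t.last)
	if t.sleepFor < t.maxSlack { // cap the saved-up credit
		t.sleepFor = t.maxSlack
	}
	if t.sleepFor > 0 {
		t.clock.Sleep(t.sleepFor)
		t.last = now.Add(t.sleepFor)
		t.sleepFor = 0
	} else {
		t.last = now
	}
	return t.last
}

// gapsMillis calls Take n times back to back and returns the gaps
// between consecutive results in milliseconds.
func gapsMillis(rate, n int) []int64 {
	l := &simLimiter{
		perRequest: time.Second / time.Duration(rate),
		maxSlack:   -10 * time.Second / time.Duration(rate),
		clock:      &fakeClock{now: time.Unix(0, 0)},
	}
	prev := l.Take()
	var gaps []int64
	for i := 1; i < n; i++ {
		cur := l.Take()
		gaps = append(gaps, cur.Sub(prev).Milliseconds())
		prev = cur
	}
	return gaps
}

func main() {
	// Five requests fired back to back at 10 RPS.
	fmt.Println(gapsMillis(10, 5)) // prints [100 100 100 100]
}
```

Requests that arrive faster than the rate are spaced exactly perRequest apart, because each call finds a positive sleepFor and has to sleep it off.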

There are not many steps; the main thing to pay attention to is the maximum slack.

The leaky bucket algorithm has an inherent weakness: it cannot handle burst traffic, because it runs at a constant rate and the delay between two requests req1 and req2 must be at least perRequest. For example, suppose three requests req1, req2, and req3 are processed in sequence with an interval of 100ms per request. req2 arrives 150ms after req1 has been processed, so according to the rate limiting policy it can be processed immediately. req3 arrives 50ms after req2 completes; less than 100ms has passed since the last request, so it must wait another 50ms before it can proceed. In this case the three requests take 250ms in total (150ms + 50ms + 50ms) rather than the expected 200ms.

To handle this, the extra time from an earlier long gap can be carried over to later requests, reducing their wait. But if the gap between two requests is very large, a very large credit builds up, and a burst of requests arriving afterwards would all be let through without waiting, which defeats the purpose of rate limiting. This is why the concept of maximum slack was introduced: it is a negative value that caps how much time may be offset, preventing that situation.
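The req1, req2, req3 example can be replayed with plain integer arithmetic. The sketch below is a simplified model, not the library code: startTimes is a hypothetical function, all times are in milliseconds, and "no slack" is modeled by passing a maxSlack of 0.

```go
package main

import "fmt"

// startTimes replays the Take arithmetic on a list of arrival times.
// All values are milliseconds. maxSlack must be <= 0; passing 0 means
// that no credit from earlier gaps is kept.
func startTimes(arrivals []int64, perRequest, maxSlack int64) []int64 {
	var last, sleepFor int64
	out := make([]int64, 0, len(arrivals))
	for i, now := range arrivals {
		if now < last { // the previous request is still being delayed
			now = last
		}
		if i == 0 { // first request: processed immediately
			last = now
			out = append(out, last)
			continue
		}
		sleepFor += perRequest - (now - last)
		if sleepFor < maxSlack {
			sleepFor = maxSlack
		}
		if sleepFor > 0 {
			last = now + sleepFor
			sleepFor = 0
		} else {
			last = now
		}
		out = append(out, last)
	}
	return out
}

func main() {
	arrivals := []int64{0, 150, 200} // req1, req2, req3

	fmt.Println(startTimes(arrivals, 100, 0))     // prints [0 150 250]
	fmt.Println(startTimes(arrivals, 100, -1000)) // prints [0 150 200]
}
```

Without slack, req3 is pushed to 250ms. With slack, the 50ms saved by the long gap before req2 cancels the 50ms req3 would otherwise wait, so it starts at 200ms.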

That is the basic idea behind this leaky bucket implementation. It is quite simple overall. Have you got it?

Token bucket algorithm

The token bucket works on a principle similar to the leaky bucket. Imagine a bucket of fixed size: the system puts tokens into it at a constant rate until it is full. Each request must take a token from the bucket before it is processed; when no token is available, the request is rejected or has to wait. I found a picture on the Internet that illustrates this very well:
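Before looking at a real library, here is a minimal sketch of the idea in Go. It is an illustration under simplifying assumptions and not the time/rate implementation: the tokenBucket type, its fields, and the countAllowed helper are hypothetical, and the current time is passed in explicitly in milliseconds so the behavior is deterministic.

```go
package main

import "fmt"

// tokenBucket is a hypothetical, simplified model. Time is passed in
// explicitly (in milliseconds) instead of being read from a clock.
type tokenBucket struct {
	capacity   float64 // maximum number of tokens the bucket can hold
	tokens     float64 // tokens currently available
	perSecond  float64 // refill rate in tokens per second
	lastMillis int64   // time of the last refill
}

// Allow refills the bucket for the elapsed time, then tries to take one token.
func (b *tokenBucket) Allow(nowMillis int64) bool {
	elapsed := float64(nowMillis-b.lastMillis) / 1000
	b.tokens += elapsed * b.perSecond
	if b.tokens > b.capacity {
		b.tokens = b.capacity // a full bucket discards extra tokens
	}
	b.lastMillis = nowMillis
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// countAllowed sends n requests at the same instant and counts the accepted ones.
func countAllowed(b *tokenBucket, nowMillis int64, n int) int {
	allowed := 0
	for i := 0; i < n; i++ {
		if b.Allow(nowMillis) {
			allowed++
		}
	}
	return allowed
}

func main() {
	b := &tokenBucket{capacity: 10, tokens: 10, perSecond: 10}
	fmt.Println(countAllowed(b, 0, 15))   // burst at t=0: only the 10 stored tokens are available
	fmt.Println(countAllowed(b, 500, 15)) // 500ms later: 5 tokens have been refilled
}
```

Unlike the leaky bucket, a full bucket lets a burst of up to capacity requests through at once, while the long-run rate is still bounded by the refill rate.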

As for implementations of the token bucket algorithm, GitHub hosts an efficient library based on it, github.com/juju/ratelimit, and Go’s golang.org/x/time/rate package is another implementation of the token bucket. This article does not cover juju/ratelimit; interested readers can study its ideas on their own. We will mainly look at how time/rate is implemented.

Example

As before, we write a rate limiting middleware with Gin to see how it is used. The example is as follows:

package main

import (
	"net/http"
	"time"

	"github.com/gin-gonic/gin"
	"golang.org/x/time/rate"
)

var rateLimit *rate.Limiter

func tokenBucket() gin.HandlerFunc {
	return func(c *gin.Context) {
		// Allow returns false when no token is available, so drop the request
		if !rateLimit.Allow() {
			c.String(http.StatusOK, "rate limit,Drop")
			c.Abort()
			return
		}
		c.Next()
	}
}

func main() {
	limit := rate.Every(100 * time.Millisecond)
	rateLimit = rate.NewLimiter(limit, 10)
	r := gin.Default()
	r.GET("/ping", tokenBucket(), func(c *gin.Context) {
		c.JSON(200, true)
	})
	r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")
}

rate.NewLimiter takes two parameters. The first is r Limit, which says how many tokens are generated into the bucket per second. The second is b int, the capacity of the token bucket. In the example above, one token is put into the bucket every 100ms, so 10 tokens are generated per second, and the bucket capacity is 10. The rateLimit.Allow() method is shorthand for AllowN(time.Now(), 1). AllowN checks whether n tokens can be taken from the bucket at the given time: if so, it consumes n tokens and returns true; otherwise it consumes nothing and returns false. In the example above, when fewer than one token is left in the bucket, the request is dropped.

Source analysis

Limit type

The time/rate package defines a Limit type whose underlying type is float64. Limit specifies the maximum frequency of events, expressed as the number of events per second; a zero Limit allows no events. Inf is the infinite rate limit; it allows all events, even if burst is 0. The package also provides the Every function, which converts the interval at which tokens are put into the bucket into a number of tokens per second.

type Limit float64

// Inf is the infinite rate limit; it allows all events (even if burst is zero).
const Inf = Limit(math.MaxFloat64)

// Every converts a minimum time interval between events to a Limit.
func Every(interval time.Duration) Limit {
	if interval <= 0 {
		return Inf
	}
	return 1 / Limit(interval.Seconds())
}
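To see what Every returns for concrete intervals, the sketch below copies Limit, Inf, and Every from the excerpt so that it runs without importing golang.org/x/time/rate. The perSecond helper is a hypothetical addition that takes the interval in milliseconds.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Limit, Inf and Every are copied from the excerpt above so the example
// runs on its own without importing golang.org/x/time/rate.
type Limit float64

const Inf = Limit(math.MaxFloat64)

func Every(interval time.Duration) Limit {
	if interval <= 0 {
		return Inf
	}
	return 1 / Limit(interval.Seconds())
}

// perSecond is a hypothetical helper that takes the interval in milliseconds
// and returns the resulting number of tokens per second.
func perSecond(intervalMillis int64) float64 {
	return float64(Every(time.Duration(intervalMillis) * time.Millisecond))
}

func main() {
	fmt.Println(perSecond(100))  // one token every 100ms
	fmt.Println(perSecond(2000)) // one token every 2s
	fmt.Println(Every(0) == Inf) // a non-positive interval means no limit
}
```

An interval of 100ms corresponds to 10 tokens per second, which is the rate used in the Gin example.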

The Limiter struct

type Limiter struct {
	mu     sync.Mutex
	limit  Limit
	burst  int
	tokens float64
	// last is the last time the limiter's tokens field was updated
	last time.Time
	// lastEvent is the latest time of a rate-limited event (past or future)
	lastEvent time.Time
}

The meanings of the fields are as follows:

  • mu: a mutex that controls concurrent access
  • limit: the number of events allowed per second, that is, the event rate
  • burst: the capacity of the token bucket. If burst is 0, no events are allowed unless limit == Inf
  • tokens: the number of tokens currently available in the bucket
  • last: the last time the limiter’s tokens field was updated
  • lastEvent: the latest time of a rate-limited event (one that found no token in the bucket); it may be in the past or in the future (the scheduled end time of a Reservation)

The Reservation struct

type Reservation struct {
	ok        bool
	lim       *Limiter
	tokens    int
	timeToAct time.Time
	// This is the Limit at reservation time, it can change later.
	limit Limit
}

The meanings of the fields are as follows:

  • ok: whether enough tokens can be obtained by the deadline
  • lim: the Limiter object
  • tokens: the number of tokens to obtain
  • timeToAct: the point in time to wait until
  • limit: the rate limit at the time of the reservation; it can change later

A Reservation represents an operation that reserves tokens; timeToAct is the time at which the bucket will hold enough tokens to satisfy this reservation, that is, the time the caller has to wait until.

How Limiter consumes tokens

Limiter has three ways to consume tokens: Allow, Reserve, and Wait. In the end all three call the reserveN and advance methods to generate and consume tokens, so let’s focus on the implementation of reserveN and advance.

  • The advance method:

func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
	// last must not be later than now; otherwise elapsed would be negative and the token count would decrease
	last := lim.last
	if now.Before(last) {
		last = now
	}

	// Compute the time needed to fill the bucket from the number of missing tokens; elapsed is capped at this value
	maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
	elapsed := now.Sub(last) // The period during which the token bucket was not updated
	if elapsed > maxElapsed {
		elapsed = maxElapsed
	}

	// Compute the number of tokens generated during the period in which the bucket was not updated
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta // The number of tokens now available
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}

	return now, last, tokens
}

The advance method computes the updated state of the token bucket: it works out the time elapsed since the bucket was last updated, then from elapsed it computes delta, the number of tokens to add, and newTokens, the number of tokens available in the bucket.
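The core of advance can be shown with plain numbers. The refill function below is a hypothetical simplification: it works in seconds and float64 instead of time.Time and time.Duration, and it assumes limit is greater than zero.

```go
package main

import "fmt"

// refill is a hypothetical, simplified version of advance. It returns how
// many tokens are available after elapsedSec seconds, never exceeding burst.
// It assumes limit (tokens per second) is greater than zero.
func refill(tokens, burst, limit, elapsedSec float64) float64 {
	// Time needed to generate the missing tokens; waiting longer adds nothing.
	maxElapsed := (burst - tokens) / limit
	if elapsedSec > maxElapsed {
		elapsedSec = maxElapsed
	}
	tokens += elapsedSec * limit
	if tokens > burst {
		tokens = burst
	}
	return tokens
}

func main() {
	// 10 tokens per second, a bucket of 10, currently holding 2 tokens.
	fmt.Println(refill(2, 10, 10, 0.5)) // half a second adds 5 tokens
	fmt.Println(refill(2, 10, 10, 60))  // a long idle period only fills the bucket
}
```

Capping elapsed before converting it to tokens keeps the intermediate values small, and the final comparison with burst guards against any rounding above the capacity.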

  • The reserveN method: reserveN is the helper behind AllowN, ReserveN, and WaitN. It determines whether enough tokens will be available within maxFutureReserve.

// @param n the number of tokens to consume
// @param maxFutureReserve the maximum time the caller is willing to wait
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()
	// No rate limit
	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true, // The bucket always has enough tokens
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
	// Update the state of the token bucket; tokens is the number of tokens currently available
	now, last, tokens := lim.advance(now)
	// Calculate the number of tokens left after this consumption
	tokens -= float64(n)
	var waitDuration time.Duration
	// If tokens is negative, there are not enough tokens and the caller has to wait
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}
	ok := n <= lim.burst && waitDuration <= maxFutureReserve
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	// timeToAct is the time at which the bucket will hold the n tokens needed
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}
	// Update the token count, the last update time, and lastEvent
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}
	lim.mu.Unlock()
	return r
}

I have commented the code above; here is a summary of the flow:

  • First check whether there is a rate limit. No rate limit (limit == Inf) means the bucket always has enough tokens.
  • Compute how many new tokens were generated between the last time tokens were taken and now. New tokens are generated only just before tokens are taken, so the interval between two takes is also the interval over which tokens are generated. tokensFromDuration makes it easy to compute how many tokens were produced in that period. So: current tokens = newly generated tokens + tokens left over from before - tokens to consume.
  • If the number of tokens left after consumption is not negative, the tokens were sufficient and the caller does not need to wait. If it is negative, the caller has to wait for a while; durationFromTokens converts the negative token count into the time to wait.
  • Return the wait time and related results to the caller.

The whole process relies on the fact that a number of tokens and a length of time can be converted into each other: if the token count is negative, the caller has to wait for the corresponding time.
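The token-to-time conversion in that flow can be sketched with plain numbers. The reserve function below is a hypothetical simplification of reserveN: it takes the bucket content after refilling, works in seconds, and leaves out locking and the Reservation struct.

```go
package main

import "fmt"

// reserve is a hypothetical, simplified reserveN working on plain numbers.
// tokens is the bucket content after refilling, n the number of tokens
// requested, limit the refill rate per second, and maxWaitSec the longest
// wait the caller accepts. It returns whether the reservation succeeds,
// how long to wait in seconds, and the bucket content afterwards
// (a negative value means tokens are owed).
func reserve(tokens float64, n, burst int, limit, maxWaitSec float64) (ok bool, waitSec, left float64) {
	left = tokens - float64(n)
	if left < 0 {
		waitSec = -left / limit // time needed to generate the missing tokens
	}
	ok = n <= burst && waitSec <= maxWaitSec
	if !ok {
		return false, 0, tokens // a failed reservation consumes nothing
	}
	return true, waitSec, left
}

func main() {
	// 10 tokens per second, burst 10, 2 tokens in the bucket.
	fmt.Println(reserve(2, 1, 10, 10, 0)) // enough tokens: no wait
	fmt.Println(reserve(2, 5, 10, 10, 0)) // not willing to wait: the reservation fails
	fmt.Println(reserve(2, 5, 10, 10, 1)) // willing to wait: 3 missing tokens take about 0.3s
}
```

A maximum wait of 0 corresponds to the Allow style of call, where a request either gets its tokens immediately or is rejected.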

The two key methods mentioned above are durationFromTokens and tokensFromDuration. They are implemented as follows:

func (limit Limit) durationFromTokens(tokens float64) time.Duration {
	seconds := tokens / float64(limit)
	return time.Nanosecond * time.Duration(1e9*seconds)
}
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
	// Split the integer and fractional parts ourself to minimize rounding errors.
	// See golang.org/issues/34861.
	sec := float64(d/time.Second) * float64(limit)
	nsec := float64(d%time.Second) * float64(limit)
	return sec + nsec/1e9
}

  • durationFromTokens: computes how long it takes to generate N new tokens.
  • tokensFromDuration: computes how many tokens can be generated in a given period of time.

Attentive readers may wonder why tokensFromDuration, which only computes how many tokens are generated in a period of time, does not simply multiply the two values. In fact, the original Go version did exactly that: d.Seconds() * float64(limit). It looks fine, but multiplying two floating-point numbers can lose precision. The current method therefore handles the whole seconds and the fractional part separately, multiplies each by the limit, and adds the results, which keeps the rounding error as small as possible.
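The two approaches can be put side by side. In the sketch below, naive is the direct multiplication described above and split follows the tokensFromDuration excerpt; both names, and the splitMillis helper, are hypothetical. The example does not claim a particular size of error; it only prints the difference so it can be inspected.

```go
package main

import (
	"fmt"
	"time"
)

// naive is the direct multiplication the text says was used originally.
func naive(d time.Duration, limit float64) float64 {
	return d.Seconds() * limit
}

// split follows the current approach: whole seconds and the nanosecond
// remainder are scaled separately and then added.
func split(d time.Duration, limit float64) float64 {
	sec := float64(d/time.Second) * limit
	nsec := float64(d%time.Second) * limit
	return sec + nsec/1e9
}

// splitMillis is a hypothetical helper so callers can pass plain integers.
func splitMillis(ms int64, limit float64) float64 {
	return split(time.Duration(ms)*time.Millisecond, limit)
}

func main() {
	d := 1500 * time.Millisecond
	fmt.Println(naive(d, 10), split(d, 10)) // both give 15 tokens for 1.5s at 10/s

	// For other inputs the two results may differ in the last bits;
	// print the difference to inspect it.
	d = 1234567891 * time.Nanosecond
	fmt.Println(naive(d, 3) - split(d, 3))
}
```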

How Limiter returns tokens

Since tokens can be consumed, the consumption can also be cancelled and the tokens returned. When we call Cancel(), the consumed tokens are returned to the token bucket as far as possible. Returning tokens is not that simple, though. Let’s look at how it is implemented.

func (r *Reservation) CancelAt(now time.Time) {
	if !r.ok {
		return
	}

	r.lim.mu.Lock()
	defer r.lim.mu.Unlock()
	/*
		Nothing needs to be cancelled in these three cases:
		1. There is no rate limit
		2. tokens is 0
		3. timeToAct has already passed
	*/
	if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
		return
	}

	// Calculate the number of tokens to restore.
	// r.lim.lastEvent may be the end time of this Reservation or of a later one,
	// so the tokens reserved after this r.timeToAct are subtracted
	restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
	// If the value is not positive, later reservations have used everything up and nothing can be returned
	if restoreTokens <= 0 {
		return
	}
	// Recompute the state of the token bucket
	now, _, tokens := r.lim.advance(now)
	// Add the tokens to restore to the current token count
	tokens += restoreTokens
	// If tokens exceeds the capacity of the bucket, cap it at the capacity
	if burst := float64(r.lim.burst); tokens > burst {
		tokens = burst
	}
	// update state
	r.lim.last = now      // Record the update time of the bucket
	r.lim.tokens = tokens // Update the number of tokens
	// If they are equal, no later reservation has been made, so roll lastEvent back to its previous value
	if r.timeToAct == r.lim.lastEvent {
		prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
		if !prevEvent.Before(now) {
			r.lim.lastEvent = prevEvent
		}
	}
	return
}

The comments explain most of it, so I won’t go into detail, except for this line: restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)). Here r.tokens is the number of tokens consumed by this reservation; r.timeToAct is the moment at which the bucket can satisfy this reservation, that is, the time of consumption plus the wait time; and r.lim.lastEvent is the timeToAct of the most recent reservation. The value returned by r.limit.tokensFromDuration is therefore the number of tokens consumed by reservations made after this one. So the line means:

Tokens to return = tokens consumed by this reservation - tokens consumed by later reservations.
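A few numbers make the formula easier to follow. The restoreTokens function below is a hypothetical helper that applies it to plain values: reserved stands for r.tokens, limit is the rate in tokens per second, and gapSec is lastEvent minus timeToAct expressed in seconds.

```go
package main

import "fmt"

// restoreTokens is a hypothetical helper that applies the formula above
// to plain numbers. reserved is r.tokens, limit is tokens per second, and
// gapSec is lastEvent minus timeToAct in seconds.
func restoreTokens(reserved int, limit, gapSec float64) float64 {
	restore := float64(reserved) - limit*gapSec
	if restore <= 0 {
		return 0 // later reservations have already used everything up
	}
	return restore
}

func main() {
	// No later reservation: lastEvent == timeToAct, so all 3 tokens come back.
	fmt.Println(restoreTokens(3, 10, 0))
	// A later reservation pushed lastEvent 0.2s further: 2 of the 3 tokens are gone.
	fmt.Println(restoreTokens(3, 10, 0.2))
	// lastEvent is 0.5s later: nothing is left to return.
	fmt.Println(restoreTokens(3, 10, 0.5))
}
```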

That’s all for the source code analysis for now. The time/rate code is fairly large and some parts are not covered here; I’ll leave those for you to analyze.

Conclusion

This article focused on the leaky bucket algorithm and the token bucket algorithm. The main difference is that the leaky bucket strictly limits the data transmission rate (or request frequency), while the token bucket limits the average rate but also allows a certain degree of burst. In some cases the leaky bucket cannot use network resources efficiently: because its leak rate is fixed, a single flow cannot reach the port rate even when the network is not congested. The leaky bucket is therefore inefficient for bursty traffic, which the token bucket handles well. In practice, the two algorithms are often combined to control network traffic more effectively.

The test code for this article has been uploaded to Github.com/asong2020/G… Stars are welcome.

Well, that’s all for this article. Your shares, likes, and reads are what motivate the author to keep creating quality content!

I am asong, an ordinary programmer. Let’s get stronger together. Follow along, and see you next time.

Recommended previous articles:

  • The unsafe package
  • Source code analysis of panic and recover: if you still don’t understand, come hit me!
  • Atomic operations: the basics of concurrent programming
  • The implementation mechanism of defer in detail
  • Do you really understand interface?
  • Leaf-segment distributed ID generation system (Golang implementation)
  • 10 GIFs to help you understand sorting algorithms (with Go implementation code)
  • Parameter passing in Go
  • Teaching my sister how to write a message queue
  • Cache avalanche, cache penetration, cache breakdown
  • The context package: this article is all you need!!
  • go-elasticsearch: how to get started
  • Interviewer: Have you used for-range in Go? Can you explain the reasons for these problems?