This is the second day of my participation in the November Gwen Challenge. Check out the details: the last Gwen Challenge 2021.

Go is a token-based current limiter

Introduction to the

If the general flow is too large, the downstream system can not respond, this time the need to limit the flow, in fact, and the subway is the same, is to slow down the upstream access to the downstream speed.

Limit the frequency or frequency of accessing services to prevent service overload and overloading.

Golang’s official extension, Time (golang.org/x/time/rate), provides an implementation based on token buckets and other flow limiter.

The principle of overview

  • Token: Each time you get a token, you can access it
  • Buckets, the maximum capacity of the bucket is fixed, and tokens are added to the bucket at a fixed frequency until it is full
  • Each request consumes one token.
  • When the limiter initializes, the token bucket is usually full.

The specific use

package limiter

import (
	"fmt"
	"testing"
	"time"

	"golang.org/x/time/rate"
)

func TestLimter(t *testing.T) {
	limiter := rate.NewLimiter(rate.Every(time.Millisecond*31), 2)
	//time.Sleep(time.Second)
	for i := 0; i < 10; i++ {
		var ok bool
		if limiter.Allow() {
			ok = true
		}
		time.Sleep(time.Millisecond * 20)
		fmt.Println(ok, limiter.Burst())
	}
}
Copy the code

Execution Result:

=== RUN   TestLimter
true 2
true 2
true 2
false 2
true 2
true 2
false 2
true 2
true 2
false 2
--- PASS: TestLimter (0.21s)
Copy the code

As you can see from the execution results, the token bucket starts with two full tokens, and every two requests will fail because the token interval is 11ms (31-20) longer than the request interval.

Specific implementation principle

First look at the creation method of the lower limit flow: NewLimiter

func NewLimiter(r Limit, b int) *Limiter {
	return &Limiter{
		limit: r,
		burst: b,
	}
}
Copy the code

View the Limiter data structure

// The methods AllowN, ReserveN, and WaitN consume n tokens.
type Limiter struct {
	mu     sync.Mutex
	limit  Limit
	burst  int
	tokens float64
	// last is the last time the limiter's tokens field was updated
	last time.Time
	// lastEvent is the latest time of a rate-limited event (past or future)
	lastEvent time.Time
}
Copy the code
  • Burst is the size of the bucket
  • Limit indicates the frequency of adding buckets
  • Tokens represents the number of remaining tokens
  • Last Indicates the last time when the token was withdrawn
  • LastEvent Indicates the time of the last traffic limiting event

When the token bucket is issued, it will be retained in the Reservation object, which is defined as follows. The Reservation object describes the number of tokens that can be obtained after the timeToAct time is reached.

type Reservation struct {
  ok        bool  // Whether the condition is met to allocate tokens
  lim       *Limiter // The current limiter that sends the token
  tokens    int   // The number of tokens
  timeToAct time.Time  // Meet the token issue time
  limit Limit  // Token issuance speed
}
Copy the code

How does the current limiter limit the current

The current limiter provided by the government has blocking wait, also has direct judgment mode, and provides maintenance reservation type. How to implement stream limiting code in reserveN.

When used, the Allow() method is called each time


// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow(a) bool {
	return lim.AllowN(time.Now(), 1)}// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
	return lim.reserveN(now, n, 0).ok
}
Copy the code

Continue to look at the reserverN algorithm

Method description:

  • Three arguments: now, n, maxFutureReserve
  • innowTime needs to getnThe maximum waiting time ismaxFutureReserve
  • The result is to return an object with a reserved tokenReservation
// maxFutureReserve specifies the maximum reservation wait duration allowed. // reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. func (lim *Limiter) reserveN(now time.Time, n int, MaxFutureReserve time.duration) Reservation {lim.mu.lock () If lim.limit == Inf {lim.mu.Unlock() return Reservation{ok: true, lim: lim, tokens: n, timeToAct: Now,}} // The number of tokens you can get at the end of now The last time the token was taken was last Now, last, Tokens := lim. Advance (now) // Calculate the remaining number of tokens resulting from the request. // Update tokens quantity, Calculate the number of tokens -= float64(n) // Calculate the number of tokens -= float64(n) Calculate latency WaitDuration var WaitDuration time. Duration if tokens < 0 {WaitDuration = lim. Limit. DurationFromTokens (tokens) - } // Decide result // determine whether to satisfy the assignment requirement // 1. The size to be allocated does not exceed the bucket capacity // 2. The waiting time does not exceed the set waiting duration OK := n <= lim.burst && waitDuration <= maxFutureReserve // Prepare reservation // R := Reservation{ok: ok, lim: lim, limit: lim.limit, } if ok {r.tokens = n.r.toact = now.add (waitDuration)} // Update state // if ok {lim.last = now lim.tokens = tokens lim.lastEvent = r.timeToAct } else { lim.last = last } lim.mu.Unlock() return r }Copy the code

From the perspective of implementation, limiter does not update the number of current buckets every once in a while, but records the number of tokens in the last access and the current bucket. When it visits again, it calculates the number of current tokens based on the last access time and decides whether to issue tokens.

Welcome to pay attention to the public number: programmer wealth free road

The resources

  • Semaphore based current limiter: github.com/golang/net/…
  • Didi has opened source a middleware for HTTP requests to stream limiter: github.com/didip/tollb…
  • Uber has opened source a bug based algorithm that fails a current limiter: github.com/uber-go/rat…