In system design, rate limiting is a conventional way to keep a system highly available, alongside measures such as circuit breaking and service degradation. This article is the first in the series “Angela teaches Daji distributed system design”.

Daji: I heard that several business lines were connected to your system recently, and traffic is very heavy between 9 and 10 am. How do you make sure the system doesn’t fall over?

Angela: You are right. We recently onboarded several large organizations, and because of the epidemic, traffic on online channels is much higher than usual. I have done a lot of system optimization, which can be roughly grouped as follows:

  • Rate limiting: control the traffic at the application entrance, shift instantaneous spikes backward in time, adaptively limit the traffic sent downstream, and adjust it dynamically according to interface response times.
  • Delayed queuing: when the request volume is large, queue requests by business-line priority; for example, real-time requests from online channels get a higher priority.
  • Routing: this comes from a peculiarity of our services: every request depends on a downstream third party. With multiple downstream providers, we maintain a dynamic routing table and preferentially route requests to providers with high success rates and low latency.
  • Redundancy: basically every distributed component does this; prefer multiple machines over a single machine. For example: Redis runs as a three-master, three-replica cluster, MySQL is sharded across databases and tables, and MQ and Redis back each other up (Angela has lived through an MQ incident).
  • Degradation: the last resort. If the whole line goes down, degrade to keep the system’s core functions available, or at least keep each module minimally available.
  • Logs: complete monitoring and tracing logs. Logs serve many purposes: troubleshooting, redoing tasks, data recovery, and persisting state.

Daji: Can you explain the basic idea of rate limiting?

Angela: Rate limiting, as the name implies, means limiting traffic. It generally splits into limiting inbound traffic and limiting outbound traffic: for inbound traffic, I put a valve at the entrance where others call my system; for outbound traffic, I put a valve at the exit where my system calls others.

Daji: How do you usually limit traffic?

Angela: On a single machine, a Semaphore can cap the number of concurrent requests to an interface at any one time, or Google’s Guava package can be used for rate limiting. A distributed environment can use Redis, and there are also Alibaba’s Sentinel and Spring Cloud Gateway. Let’s first look at the single-machine version; the code is as follows:

    // There are only 3 stalls
    private static Semaphore semaphore = new Semaphore(3);

    // User is a simple holder: new User(name) with a getName() method
    private static String[] userNames = {"Daji", "Arthur", "Lu Ban", "Zhen Ji"};

    private static Random random = new Random();

    // The shared resource being limited
    public static class Toilet {

        public void enter(User user) {
            try {
                semaphore.acquire();
                try {
                    if (Thread.interrupted()) {
                        return;
                    }
                    System.out.println("Time: " + DateAndTimeUtil.getFormattedDate() + " " + user.getName() + " goes to the bathroom");
                    Thread.sleep(2000);
                } finally {
                    // release only after a successful acquire
                    semaphore.release();
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);

        Toilet toilet = new Toilet();

        for (int i = 0; i < 20; i++) {
            executorService.submit(() -> toilet.enter(new User(userNames[random.nextInt(userNames.length)])));
        }
    }

This code is easy to understand: there are only three stalls, defined with a Semaphore, and “Daji”, “Arthur”, “Lu Ban”, and “Zhen Ji” take turns going to the toilet, 20 visits in total. The Semaphore acts as the lock: enter the toilet, lock the door. Look at the console output:

Only three people can go to the bathroom at once.

Daji: I kind of get it. Toilets are resources. When we test, it’s like a request.

Angela: The idea in a distributed environment is the same as on a single machine: control how frequently a resource is accessed. There are two main design ideas:

  1. Leaky bucket algorithm

    A bucket is placed between the request inlet and the service responding to the request. The water in the bucket flows out at a constant rate to ensure that the rate of traffic received by the service is stable. If the bucket is full, the incoming water will overflow (the request is rejected).

    The leaky bucket is an algorithm commonly used for traffic shaping and rate limiting in networks. Its main purpose is to control the rate at which data enters the network and to smooth out bursty traffic.

  2. Token bucket algorithm

    The token bucket algorithm resembles the producer–consumer model: a producer puts tokens into the bucket at a constant rate, while the request handler (the consumer) must first take a token from the bucket before handling a request. If no token is available, there are two strategies: either reject the request immediately, or wait a while and then try to get a token again.

    The token bucket algorithm controls the amount of data sent into the network while still allowing bursts to be sent.

    The RateLimiter class in Google’s Guava package is a solution to the token bucket algorithm.
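The lazy-refill idea behind both algorithms can be sketched in plain Java. This is a minimal single-machine sketch (the class and method names are mine, not from any library): instead of a separate producer thread, the elapsed time is converted into new tokens whenever a caller tries to acquire.

```java
/**
 * Minimal single-machine token bucket (a sketch, not production code).
 * Tokens are refilled lazily: only when a caller tries to acquire,
 * the time elapsed since the last refill is converted into new tokens.
 */
public class TokenBucket {

    private final double capacity;       // maximum tokens the bucket can hold
    private final double ratePerSecond;  // tokens generated per second
    private double tokens;               // tokens currently available
    private long lastRefillNanos;        // when the bucket was last refilled

    public TokenBucket(double capacity, double ratePerSecond) {
        this.capacity = capacity;
        this.ratePerSecond = ratePerSecond;
        this.tokens = capacity;          // start full, so an initial burst is allowed
        this.lastRefillNanos = System.nanoTime();
    }

    /** Takes {@code requested} tokens if available; returns false otherwise. */
    public synchronized boolean tryAcquire(int requested) {
        refill();
        if (tokens >= requested) {
            tokens -= requested;
            return true;
        }
        return false;                    // not enough tokens: reject, or let the caller retry later
    }

    private void refill() {
        long now = System.nanoTime();
        double elapsedSeconds = (now - lastRefillNanos) / 1_000_000_000.0;
        // add elapsed * rate new tokens, capped at the bucket capacity
        tokens = Math.min(capacity, tokens + elapsedSeconds * ratePerSecond);
        lastRefillNanos = now;
    }
}
```

Because the bucket starts full, a burst up to `capacity` requests can be served at once, which is exactly the token bucket’s advantage over a leaky bucket.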

Comparing the two algorithms: one drains water at a constant rate at the exit, the other adds tokens at a constant rate. Angela thinks there is not much difference, but the token bucket is more flexible: in practice you can dynamically adjust both the rate at which tokens are added and the total size of the bucket.

Daji: Why do I feel like the two algorithms are the same?

Angela: A token bucket has one advantage over a leaky bucket: it can absorb bursts of traffic. With a leaky bucket, even when resources are idle, the constant leak rate means some requests are not answered promptly. For example, if the leak rate is 3 per second and 5 requests arrive at once, all 5 go into the bucket, but only 3 can be processed in that second (the outflow is capped). With a token bucket refilled at 3 tokens per second that already holds 2 tokens, all 5 arriving requests can each grab a token and complete immediately. So the token bucket is request-oriented (requests actively take tokens, each according to its need), while the leaky bucket is rate-oriented: it drains at a constant speed no matter how many requests are waiting.

Daji: I see. Can you tell me how Google Guava implements the token bucket algorithm?

Angela: Once you understand the idea, the implementation is easy to follow. Let’s look at how the API is used:
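A minimal sketch of the two usages described below, assuming Guava’s `com.google.common.util.concurrent.RateLimiter` is on the classpath (the `networkService.send` call is a hypothetical stand-in):

```java
import com.google.common.util.concurrent.RateLimiter;

public class GuavaRateLimitDemo {
    public static void main(String[] args) {
        // Example 1: at most 2 permits per second.
        // acquire() blocks until a permit is free and returns the seconds waited.
        RateLimiter limiter = RateLimiter.create(2.0);
        for (int i = 0; i < 4; i++) {
            double waited = limiter.acquire();
            System.out.println("request " + i + " waited " + waited + "s");
        }

        // Example 2: cap output at 5000 bytes per second by charging
        // one permit per byte before each send.
        RateLimiter byteLimiter = RateLimiter.create(5000.0);
        byte[] packet = new byte[2500];
        byteLimiter.acquire(packet.length); // a 2500-byte packet costs 2500 permits
        // networkService.send(packet);     // hypothetical send call
    }
}
```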

The API is very simple: you just specify the limiting rate. In the first example the rate is 2 permits per second. For a per-minute limit of 12, you can set the rate to 0.2, meaning 0.2 tokens are generated per second. The second example uses 5000 permits per second, which shows how network output can be limited to 5000 bytes (about 5 KB) per second with a rate limiter.

Guava has many other methods, as follows:

Method signatures and descriptions:

double acquire()
Acquires one permit from the RateLimiter, blocking until the request is granted.

double acquire(int permits)
Acquires the given number of permits from the RateLimiter, blocking until they are granted.

static RateLimiter create(double permitsPerSecond)
Creates a RateLimiter from the number of permits placed into the bucket per second, i.e. the number of tokens generated per second (usually the QPS, queries per second).

static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
Creates a RateLimiter from the per-second permit rate and a warm-up period. The bucket is not filled all at once; instead, the rate at which permits are issued grows steadily during the warm-up period and reaches the configured maximum at its end.

double getRate()
Returns the stable rate configured for this RateLimiter, in tokens generated per second.

void setRate(double permitsPerSecond)
Updates the RateLimiter’s stable rate; permitsPerSecond has the same meaning as in the factory method that created it.

boolean tryAcquire()
Acquires a permit from the RateLimiter only if it can be obtained immediately, without waiting.

boolean tryAcquire(int permits)
Acquires the given number of permits only if they can be obtained immediately, without waiting.

boolean tryAcquire(int permits, long timeout, TimeUnit unit)
Acquires the given number of permits if they can be obtained within the timeout; returns false immediately (without further waiting) if they cannot.

Daji: Let’s look at distributed limiting.

Angela: Distributed means putting the local token bucket in a place that all hosts can access.

Daji: Usually, where is the right place?

Angela: Distributed middleware such as Redis, the distributed cache: both generating tokens and taking tokens can be implemented with Redis commands, and it is fast.

Daji: Then tell me how to make it happen.

Angela: Let me explain my implementation and its background. In the token bucket algorithm there is a separate producer putting tokens into the bucket at a constant rate. If you implemented that literally on Redis, a producer thread would continuously write tokens into Redis while every request thread reads Redis to take one, which costs a lot of performance. A better solution is to defer the refill: tokens are only topped up at the moment one is requested, merging the two operations into one.

Daji: How do you calculate how many tokens to put in the bucket when you get them?

Angela: That’s a good question. filled_tokens is the number of tokens the bucket should hold at the moment of acquisition:

Number of tokens the bucket should hold now = min(tokens left in the bucket + (current time − last refill time) × token generation rate, total token capacity)
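A quick worked check of that formula in Java (the numbers are made up for illustration):

```java
public class RefillFormulaExample {
    public static void main(String[] args) {
        double rate = 0.2;         // tokens generated per second (12 per minute)
        double capacity = 12;      // total token capacity
        double lastTokens = 2;     // tokens left in the bucket from last time
        double deltaSeconds = 10;  // seconds since the last refill

        // filled = min(last + delta * rate, capacity)
        double filledTokens = Math.min(capacity, lastTokens + deltaSeconds * rate);
        System.out.println(filledTokens); // 2 + 10 * 0.2 = 4.0, well under capacity
    }
}
```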

Redis supports Lua scripting. The script is as follows:

redis.log(redis.LOG_DEBUG, "start_ratelimit")
redis.log(redis.LOG_DEBUG, KEYS[1])
redis.log(redis.LOG_DEBUG, KEYS[2])
redis.log(redis.LOG_DEBUG, ARGV[1])
redis.log(redis.LOG_DEBUG, ARGV[2])
redis.log(redis.LOG_DEBUG, ARGV[3])
redis.log(redis.LOG_DEBUG, ARGV[4])

local tokens_key = KEYS[1]          -- key holding the remaining tokens, e.g. request_rate_limiter.${id}.tokens
local timestamp_key = KEYS[2]       -- key holding the time the bucket was last refilled

local rate = tonumber(ARGV[1])      -- replenishRate: tokens generated per second
local capacity = tonumber(ARGV[2])  -- burstCapacity: maximum size of the token bucket
local now = tonumber(ARGV[3])       -- current time, in seconds since 1970-01-01 00:00:00
local requested = tonumber(ARGV[4]) -- number of tokens to consume, default 1

local fill_time = capacity / rate   -- how long it takes to fill the bucket from empty
redis.log(redis.LOG_DEBUG, "--fill_time--")
redis.log(redis.LOG_DEBUG, fill_time)

local ttl = math.floor(fill_time * 2) -- key expiry: twice the fill time
redis.log(redis.LOG_DEBUG, "--ttl--")
redis.log(redis.LOG_DEBUG, ttl)

local last_tokens = tonumber(redis.call("get", tokens_key)) -- tokens left in the bucket
if last_tokens == nil then
  last_tokens = capacity
end

local last_refreshed = tonumber(redis.call("get", timestamp_key)) -- when the bucket was last refilled
if last_refreshed == nil then
  last_refreshed = 0
end

local delta = math.max(0, now - last_refreshed)
redis.log(redis.LOG_DEBUG, "-- time since last refresh: delta --")
redis.log(redis.LOG_DEBUG, delta)

-- lazy refill: add delta * rate tokens, never exceeding the bucket capacity
local filled_tokens = math.min(capacity, last_tokens + (delta * rate))
redis.log(redis.LOG_DEBUG, "**filled_tokens**")
redis.log(redis.LOG_DEBUG, filled_tokens)

local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
  -- request allowed: consume the requested tokens
  new_tokens = filled_tokens - requested
  allowed_num = 1
end

-- persist the remaining tokens and the refill time, both expiring after ttl
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)

redis.log(redis.LOG_DEBUG, "**new_tokens**")
redis.log(redis.LOG_DEBUG, new_tokens)

redis.log(redis.LOG_DEBUG, "**now**")
redis.log(redis.LOG_DEBUG, now)
redis.log(redis.LOG_DEBUG, "end_ratelimit")

return { allowed_num, new_tokens }

Daji: How do you test this after it’s written?

Angela: Redis provides a client-side Lua debugger (ldb) that makes script debugging easy, as shown below:

// Enable debugging mode. The arguments are: the remaining-token key, the last-fill-time key, the generation rate, the bucket capacity, the current time, and the number of tokens requested
redis-cli --ldb --eval ratelimit.lua remain.${1}


You can type help to see the full command list; the most used are n and print, which step to the next line and print the current local variables, respectively.

In addition, you can load the Lua script into Redis with the script load command and then run it directly by its SHA1 (this is how the script runs in the real program; you can skip this for now).

// 1. Load the script on the Redis server to get its SHA1
redis-cli script load "$(cat ratelimit.lua)"
// sha1: ebbcd2ed99990afaca6d2ba61a0f2d5bdd907e59
// 2. Run the script by its SHA1 value
redis-cli evalsha ebbcd2ed99990afaca6d2ba61a0f2d5bdd907e59 2 remain.${0}.tokens last_fill_time 0.2 12 `gdate +%s%3N` 1

Daji: Redis is single-threaded; is there a performance cost to executing Lua scripts?

Angela: Redis uses epoll-based I/O multiplexing with a single-threaded event loop, so a long-running script blocks every other read and write. That makes it worth pressure-testing the Lua script. Redis provides the redis-benchmark tool; to execute the script 100,000 times:

redis-benchmark -n 100000 evalsha ebbcd2ed99990afaca6d2ba61a0f2d5bdd907e59 2 remain.${1}.tokens last_fill_time 0.2 12 `gdate +%s%3N` 1

The measured result: 99.9% of executions finish in under 2 ms, at more than 45,000 executions per second, so the overhead is acceptable.

Daji: How do you wire the distributed rate-limiting Lua script into a Spring Boot project?

Angela: Let’s walk through the engineering steps.

  1. Write the Lua script by hand (copy the script above directly) and place it in the Spring project’s resources directory.

  2. Load the Lua script when the program starts, and use the script’s SHA1 to check whether it has already been loaded into Redis (to avoid loading duplicate scripts into Redis). The code is as follows:

    @Configuration
    public class LuaConfiguration {
    
        private Logger logger = LoggerFactory.getLogger(LuaConfiguration.class);
    
    
        public static final String RATE_LIMIT_SCRIPT_LOCATION = "scripts/ratelimit.lua";
    
        @Bean(name = "rateLimitRedisScript")
        public DefaultRedisScript<List> redisScript(LettuceConnectionFactory lettuceConnectionFactory) {
    
            DefaultRedisScript<List> redisScript = new DefaultRedisScript<>();
            redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(RATE_LIMIT_SCRIPT_LOCATION)));
            redisScript.setResultType(List.class);
    
            String rateLimitSha1 = redisScript.getSha1();
            logger.info("Sha1: {}", rateLimitSha1);
            logger.info("Lua script scriptStr: {}", redisScript.getScriptAsString());
    
            List<Boolean> luaScriptsExists = null;
    
            RedisClusterConnection clusterConnection = lettuceConnectionFactory.getClusterConnection();
            // Preload the script
        if ((luaScriptsExists = clusterConnection.scriptExists(rateLimitSha1)) != null
                && luaScriptsExists.size() > 0 && Boolean.TRUE.equals(luaScriptsExists.get(0))) {
                logger.info("Redis already exists redis Lua sha1: {}", rateLimitSha1);
            }else {
                String scriptLuaSha1 = clusterConnection.scriptLoad(redisScript.getScriptAsString().getBytes(UTF_8));
                logger.info("Loading Redis Lua successfully sha1: {}", scriptLuaSha1);
            }
    
            return redisScript;
        }
    }

    Note: Jedis 3.* does not support Lua scripts in cluster mode; Lettuce is recommended instead.

    The Spring Boot default Redis client has already been changed to Lettuce.

    (With Jedis, evalSha is not supported in a cluster environment.)

  3. Configure a current limiter

    @Component
    public class RateLimiter implements IRateLimit{
    
        private Logger logger = LoggerFactory.getLogger(RateLimiter.class);
    
        @Autowired
        RedisTemplate redisTemplate;
    
        @Autowired
        @Qualifier("rateLimitRedisScript")
        DefaultRedisScript<List> rateLimitRedisScript;
    
        private static final String REDIS_KEY_REMAIN_TOKENS = "{1}remain_tokens";
        private static final String REDIS_KEY_LAST_FILL_TIME = "{1}last_fill_time";
    
        @Override
        public boolean achieveDistributeToken(String keySuffix, int tokenCapacity, float tokenGenerateRate, int achiveTokenPer) {
    
            String remainTokenKey = REDIS_KEY_REMAIN_TOKENS + "_" + keySuffix;
            String lastFillTimeKey = REDIS_KEY_LAST_FILL_TIME + "_" + keySuffix;
            List<String> keys = Arrays.asList(remainTokenKey, lastFillTimeKey);
    
            String now = String.valueOf(System.currentTimeMillis()/1000);
    
            List<String> result = (List<String>) redisTemplate.execute(rateLimitRedisScript, keys, String.valueOf(tokenGenerateRate), String.valueOf(tokenCapacity), now, String.valueOf(achiveTokenPer));
    
            if (result != null && result.size() > 1) {
                logger.info("Distributed token acquisition: allowed={}, interface: {}, remaining tokens: {}", result.get(0), "yuntrustQuery", result.get(1));
                // allowed_num from the Lua script is 1 when a token was granted, 0 when rejected
                return "1".equals(String.valueOf(result.get(0)));
            }
            return false;
        }
    }

    One thing to note: both keys carry the {1} prefix so that, in cluster mode, they hash to the same slot, because a Lua script cannot operate on keys spread across cluster nodes.
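A hypothetical caller-side sketch of how the limiter above might be used (the interface name, thresholds, and response strings are illustrative; in the real project the Redis-backed RateLimiter bean would be injected):

```java
/** Hypothetical caller-side sketch for the distributed limiter. */
public class RateLimitCallerSketch {

    /** Same contract as the IRateLimit interface implemented above. */
    interface IRateLimit {
        boolean achieveDistributeToken(String keySuffix, int tokenCapacity,
                                       float tokenGenerateRate, int achiveTokenPer);
    }

    /** Checks the token bucket before doing the real work. */
    static String handle(IRateLimit limiter, String interfaceName) {
        // 12-token bucket refilled at 0.2 tokens/s: bursts up to 12, ~12 per minute sustained
        if (!limiter.achieveDistributeToken(interfaceName, 12, 0.2f, 1)) {
            return "429 Too Many Requests"; // reject, queue, or degrade instead of overloading downstream
        }
        return "OK";
    }

    public static void main(String[] args) {
        // stand-in limiter that always grants a token
        IRateLimit alwaysAllow = (key, cap, rate, n) -> true;
        System.out.println(handle(alwaysAllow, "yuntrustQuery")); // prints OK
    }
}
```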

Look at the effect in action; very satisfying:

There is also a part about dynamically adjusting the token bucket size and the token generation rate; given the length of this article, that will be covered next time.

Follow the Wx account: Angela’s blog