Rate limiting is a common stability measure in microservices, and it comes up often in interviews. I like to ask: which rate limiting algorithms do you know? Have you read the source code? How are they implemented?

This article covers the rate limiting algorithms first, and then looks at how they are implemented in the source code.

Rate limiting algorithms

Rate limiting algorithms can be broadly classified into four categories: fixed window counter, sliding window counter, leaky bucket, and token bucket.

Fixed window

The fixed window is probably the simplest of all the rate limiting algorithms.

It simply counts the number of requests within a fixed time window; once the count exceeds the threshold, the excess requests are discarded.

The pros and cons of this simple algorithm are both obvious. The advantage is simplicity; the disadvantage shows up in the following example.

For example, the yellow area in the figure below is the fixed time window. The default time range is 60s, and the limit is 100 requests.

As shown by the bracketed region in the figure, there is no traffic for a while at the start, then 100 requests arrive in the last 30 seconds of the window. Since the threshold is not exceeded, they all pass, and then another 100 requests pass in the first 20 seconds of the next window.

So effectively 200 requests were allowed through within the bracketed span straddling the window boundary, which exceeds our rate limiting threshold.

(Figure: fixed window rate limiting)
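
As a quick illustration of the idea (this is not Sentinel's code; the class and names below are made up for the example), a minimal fixed-window counter might look like this:

```java
// Minimal fixed-window counter sketch: allow at most `limit` requests per window.
public class FixedWindowLimiter {
    private final int limit;
    private final long windowMillis;
    private long windowStart = System.currentTimeMillis();
    private int counter = 0;

    public FixedWindowLimiter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        if (now - windowStart >= windowMillis) {
            // A new window begins: reset the counter.
            windowStart = now;
            counter = 0;
        }
        if (counter < limit) {
            counter++;
            return true;
        }
        // Over the threshold for this window: reject.
        return false;
    }
}
```

The boundary problem described above is visible here: the counter resets abruptly, so two bursts on either side of a reset can both pass.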

The sliding window

To address this problem we have the sliding window algorithm, which, as the name suggests, uses a time window that moves forward as time passes.

The sliding window further divides the fixed time window into N smaller windows and counts each small window separately. The sum of the requests across all small windows must not exceed the configured threshold.

As an example, suppose our 60s window is split into three smaller windows of 20s each. Continuing the example above, when 100 requests arrive in the third 20s slot, they can still pass.

Then the window slides forward. When another 100 requests arrive in the following 20 seconds, the total within the sliding 60-second window would exceed 100, so those requests are rejected.
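
Here is a minimal sliding-window counter sketch along those lines (illustrative only, not Sentinel's implementation; all names are made up):

```java
// Minimal sliding window: N sub-windows whose counts together must not exceed `limit`.
public class SlidingWindowLimiter {
    private final int limit;
    private final long subWindowMillis;
    private final long[] windowStarts;   // start time of each sub-window
    private final int[] counters;        // request count of each sub-window

    public SlidingWindowLimiter(int limit, int subWindows, long totalWindowMillis) {
        this.limit = limit;
        this.subWindowMillis = totalWindowMillis / subWindows;
        this.windowStarts = new long[subWindows];
        this.counters = new int[subWindows];
    }

    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        long currentStart = now - now % subWindowMillis;
        int idx = (int) ((now / subWindowMillis) % counters.length);

        // The slot belongs to an old lap of the circle: reset it before reuse.
        if (windowStarts[idx] != currentStart) {
            windowStarts[idx] = currentStart;
            counters[idx] = 0;
        }

        // Sum all sub-windows that still fall inside the total window.
        int total = 0;
        for (int i = 0; i < counters.length; i++) {
            if (now - windowStarts[i] < subWindowMillis * counters.length) {
                total += counters[i];
            }
        }
        if (total < limit) {
            counters[idx]++;
            return true;
        }
        return false;
    }
}
```

Because the per-slot counts are summed over the whole window, the boundary burst from the fixed-window example is now rejected.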

Leaky bucket

The leaky bucket algorithm, as the name implies, works like a bucket with a hole in the bottom: no matter how many requests come in, they flow out at a constant rate. If the incoming traffic exceeds the bucket's capacity, the excess is discarded.

That is, the rate of flow in is variable, but the rate of flow out is constant.

This is similar to the idea of peak shaving and valley filling with MQ: in the face of a sudden traffic surge, the leaky bucket achieves uniform queueing and rate limiting at a fixed speed.

The leaky bucket's uniform rate is its advantage, but it is also its disadvantage. Many people say the leaky bucket cannot handle traffic bursts, which is not quite accurate.

The leaky bucket is precisely meant to deal with intermittent bursts: when traffic spikes beyond what the system can process right away, the excess is queued and handled during idle time, preventing the burst from crashing the system and protecting its stability.

Looking at it from another angle, though, if such a burst actually puts no pressure on the system at all, forcing it to queue up at a uniform rate anyway is a waste of system capacity.

Therefore, for this kind of scenario, the token bucket algorithm has an advantage over the leaky bucket algorithm.
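
Before moving on, here is a minimal leaky-bucket sketch of the idea described above (illustrative only; the names are made up, and real implementations usually queue requests rather than rejecting them immediately):

```java
// Minimal leaky bucket: water (requests) drains at a constant rate; overflow is rejected.
public class LeakyBucketLimiter {
    private final double leakRatePerMs;   // how much "water" drains per millisecond
    private final double capacity;        // bucket size
    private double water = 0;             // current amount of water in the bucket
    private long lastLeakTime = System.currentTimeMillis();

    public LeakyBucketLimiter(double leakRatePerSecond, double capacity) {
        this.leakRatePerMs = leakRatePerSecond / 1000.0;
        this.capacity = capacity;
    }

    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        // First leak out the water that drained since the last call.
        water = Math.max(0, water - (now - lastLeakTime) * leakRatePerMs);
        lastLeakTime = now;

        if (water + 1 <= capacity) {
            water += 1;      // the request fits in the bucket and will drain at the fixed rate
            return true;
        }
        return false;        // bucket is full: the excess traffic is discarded
    }
}
```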

Token bucket

With the token bucket algorithm, the system drops tokens into a bucket at a certain rate. When a request arrives, it tries to take a token from the bucket; if it gets one, the request proceeds normally, otherwise it is rejected.
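
A minimal token-bucket sketch of this basic idea, without the warm-up described next (illustrative only; names are made up), might look like:

```java
// Minimal token bucket: tokens are added at a fixed rate; a request passes only if it can take one.
public class TokenBucketLimiter {
    private final double refillRatePerMs;   // tokens generated per millisecond
    private final double capacity;          // maximum number of tokens in the bucket
    private double tokens;                  // tokens currently available
    private long lastRefillTime = System.currentTimeMillis();

    public TokenBucketLimiter(double tokensPerSecond, double capacity) {
        this.refillRatePerMs = tokensPerSecond / 1000.0;
        this.capacity = capacity;
        this.tokens = capacity;   // start with a full bucket
    }

    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        // Refill the tokens accumulated since the last call, capped at the capacity.
        tokens = Math.min(capacity, tokens + (now - lastRefillTime) * refillRatePerMs);
        lastRefillTime = now;

        if (tokens >= 1) {
            tokens -= 1;   // take one token: the request passes
            return true;
        }
        return false;      // no token available: the request is rejected
    }
}
```

Because the bucket can hold up to `capacity` unused tokens, a burst can momentarily exceed the steady rate, which is the key difference from the leaky bucket.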

Modern token bucket implementations, such as Sentinel's (modeled on Guava's), also support cold start / warm-up. To avoid a sudden traffic surge bringing the system down, the token bucket spends an initial period in a cold state; as traffic grows, the system dynamically adjusts the rate at which tokens are generated according to the traffic volume, until the allowed requests finally reach the system's threshold.

Source code

Let's take Sentinel as an example. Sentinel uses the sliding window algorithm for its statistics, and it also implements both the leaky bucket and the token bucket algorithms.

The sliding window

Sentinel uses the sliding window algorithm for its statistics, but the implementation is a little different from the diagram I drew above; Sentinel's sliding window actually makes more sense when you picture it as a circle.

At startup, nodes are created and the slots are wired together in a chain-of-responsibility pattern. StatisticSlot collects statistics through the sliding window, FlowSlot contains the actual rate limiting logic, and there are additional slots for degradation and system protection; together they form Sentinel's complete flow control pipeline.

Just look at the official diagram for this; the circular window is a pain to draw by hand.

The sliding window implementation can mainly be seen in the LeapArray code, whose fields define the parameters of the time window along with their defaults.

Sentinel actually keeps windows at two levels: second and minute. The second-level statistics use 2 windows, while the minute-level statistics use 60 windows, each 1s long.
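
If I read the defaults correctly (this is my assumption about Sentinel's configuration and worth verifying against your version), the two levels correspond roughly to parameters like these, where each window's length is the total interval divided by the number of windows:

```java
// Assumed values for illustration: windowLengthInMs = intervalInMs / sampleCount.
// Second-level statistics: 2 windows covering 1 second -> each window is 500ms.
int secondSampleCount = 2;
int secondIntervalInMs = 1000;
int secondWindowLengthInMs = secondIntervalInMs / secondSampleCount;   // 500

// Minute-level statistics: 60 windows covering 60 seconds -> each window is 1000ms.
int minuteSampleCount = 60;
int minuteIntervalInMs = 60 * 1000;
int minuteWindowLengthInMs = minuteIntervalInMs / minuteSampleCount;   // 1000
```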

```java
public abstract class LeapArray<T> {

    // Window length in ms, default 1000ms
    protected int windowLengthInMs;
    // Number of windows, default 60
    protected int sampleCount;
    // Total interval in ms, default 60 * 1000
    protected int intervalInMs;
    private double intervalInSecond;

    // The array of windows
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    // ...
}
```

Next we need to look at how it calculates the current window. The source code spells it out clearly, but it is not so easy to follow if you still think of the windows as a straight line rather than a circle.

First it calculates the array index and the window start time, which is fairly easy; the difficulty is mostly in case 3 below, where the new window start is greater than the old one.

  1. The slot in the array is empty, meaning this window has never been initialized. A new window is created and installed via CAS, and the new window is returned.
  2. The window start time matches exactly, so the existing window is simply returned; nothing more to say.
  3. The third case is harder to understand; the two timelines in the code comments help. The first lap of the circle covers timestamps up to 1200, then the slots start being reused. When the current time is 1676, it maps to the slot whose old window started at 600, which is clearly stale, so the old window is reset to the current window start (see the worked numbers right after this list).
  4. The last case is unlikely to happen unless the clock goes backwards.
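
A quick worked example of the index math helps with case 3. Taking the 200ms windows and 5 slots from the diagram in the code comments (values chosen purely for illustration):

```java
// Assumed for illustration: windowLengthInMs = 200, array.length() = 5
long timeMillis = 1676;
long timeId = timeMillis / 200;                      // 1676 / 200 = 8
int idx = (int) (timeId % 5);                        // 8 % 5 = 3
long windowStart = timeMillis - timeMillis % 200;    // 1676 - 76 = 1600
// The window previously stored in slot 3 started at 600 (600 / 200 = 3, and 3 % 5 = 3),
// so windowStart (1600) > old.windowStart() (600): the old window is stale and gets reset.
```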

Each WindowWrap wraps a single time window, recording its start time, its length, and the statistics (such as the pass counts used to compute QPS) collected in it.

```java
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
    long timeId = timeMillis / windowLengthInMs;
    // Calculate current index so we can map the timestamp to the leap array.
    return (int) (timeId % array.length());
}

protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
    return timeMillis - timeMillis % windowLengthInMs;
}

public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    // Index of the window in the array
    int idx = calculateTimeIdx(timeMillis);
    // Start time of the current window
    long windowStart = calculateWindowStart(timeMillis);

    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            /*
             *     B0       B1      B2    NULL      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             * The slot is empty, so create a new window and install it via CAS.
             */
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            /*
             *     B0       B1      B2      B3      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             * The start time matches the existing window, so just return it.
             */
            return old;
        } else if (windowStart > old.windowStart()) {
            /*
             *   B0       B1      B2      B3      B4
             * |_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *              B0       B1      B2    NULL      B4
             * |_______||_______|_______|_______|_______|_______||___
             * ...    1200     1400    1600    1800    2000    2200  timestamp
             *                                  ^
             *                               time=1676
             * The circle has wrapped around: the new window starts at 1600 while the old
             * one in this slot started at 600, so it is deprecated and must be reset.
             * (updateLock is a ReentrantLock field of LeapArray.)
             */
            if (updateLock.tryLock()) {
                try {
                    // Successfully got the update lock, now reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here unless the clock goes backwards.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}
```

Leaky bucket

RateLimiterController is Sentinel's implementation of the leaky bucket algorithm, and it is much simpler than the others.

  1. First calculate the time cost of this request when amortized over one second, then compute its expected pass time.
  2. If the expected time is earlier than the current time, the request passes straight away and the current time becomes the last pass time.
  3. Otherwise, if the expected time is later than the current time, the request has to queue. The required wait is checked against the maximum queueing time; if it exceeds the maximum, the request is dropped directly.
  4. If not, update the last pass time, then re-check against the timeout: if the wait now exceeds the timeout, roll back the update and drop the request; otherwise sleep for the remaining wait and then pass. (A worked example of the cost calculation follows the code below.)
```java
public class RateLimiterController implements TrafficShapingController {

    // Maximum queueing wait time, default 500ms
    private final int maxQueueingTimeMs;
    // Threshold count (QPS)
    private final double count;
    // Time at which the last request passed
    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Pass when acquire count is less or equal than 0.
        if (acquireCount <= 0) {
            return true;
        }
        // Reject when count is less or equal than 0.
        // Otherwise, the costTime will be max of long and waitTime will overflow in some cases.
        if (count <= 0) {
            return false;
        }

        long currentTime = TimeUtil.currentTimeMillis();
        // Time cost of this request when amortized to one second
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
        // Expected pass time of this request
        long expectedTime = costTime + latestPassedTime.get();

        if (expectedTime <= currentTime) {
            // The expected time is not in the future: pass directly and record the pass time.
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // The expected pass time is later than now; the difference is how long we need to wait.
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            if (waitTime > maxQueueingTimeMs) {
                // Waiting would exceed the maximum queueing time: drop the request.
                return false;
            } else {
                // Otherwise, update the last pass time first.
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    // Re-check after the update; if it exceeds the maximum, roll back and drop.
                    if (waitTime > maxQueueingTimeMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    // Still within the waiting range: sleep until it's our turn, then pass.
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }
}
```
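
As a quick sanity check of the arithmetic, plug in some hypothetical values (count = 10 QPS, acquireCount = 1, and the default maxQueueingTimeMs of 500ms):

```java
// Hypothetical values for illustration: count = 10 (QPS), acquireCount = 1.
long costTime = Math.round(1.0 * 1 / 10 * 1000);   // = 100ms per request
// Requests are therefore spaced 100ms apart. With maxQueueingTimeMs = 500,
// at most about 5 requests can be queued ahead of a new one before it is rejected.
```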

The token bucket

Finally, the token bucket. Sentinel's token bucket implementation is modeled on Guava's (it is not a straight copy, but you can look at the source to see what the calculations are doing); the code lives in WarmUpController.

We can gloss over the details of the calculations, as long as we are clear about the overall process.

The core parameters are explained in the comments. For now, don't worry about how the constructor derives them (I don't fully understand the derivation myself, but that doesn't affect the overall understanding); the key is to see what canPass does. (A worked example of the parameters follows the code below.)

  1. Get the QPS of the current window and the previous window
  2. Refill the bucket, that is, throw tokens into it; we'll look at the refill logic next.
```java
public class WarmUpController implements TrafficShapingController {

    // Threshold QPS
    protected double count;
    // Cold start factor, default 3
    private int coldFactor;
    // Number of tokens at the warning line
    protected int warningToken = 0;
    // Maximum number of tokens in the bucket
    private int maxToken;
    // Slope used to compute the allowed QPS during warm-up
    protected double slope;

    // Number of tokens currently stored
    protected AtomicLong storedTokens = new AtomicLong(0);
    // Last time the bucket was refilled
    protected AtomicLong lastFilledTime = new AtomicLong(0);

    public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
        construct(count, warmUpPeriodInSec, coldFactor);
    }

    public WarmUpController(double count, int warmUpPeriodInSec) {
        construct(count, warmUpPeriodInSec, 3);
    }

    private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }
        this.count = count;
        this.coldFactor = coldFactor;

        // stableInterval: the interval at which tokens are generated when stable, i.e. 1/QPS
        // warmUpPeriodInSec: warm-up (cold start) duration, default 10s
        warningToken = (int) (warmUpPeriodInSec * count) / (coldFactor - 1);
        maxToken = warningToken + (int) (2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
        // Slope of the warm-up line
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // QPS of the current window
        long passQps = (long) node.passQps();
        // QPS of the previous window
        long previousQps = (long) node.previousPassQps();
        // Refill the token bucket
        syncToken(previousQps);

        // Tokens remaining in the bucket
        long restToken = storedTokens.get();
        if (restToken >= warningToken) {
            // Number of tokens above the warning line
            long aboveToken = restToken - warningToken;
            // Consume at a slower rate than the warning line allows:
            // current interval = restToken * slope + 1 / count
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            if (passQps + acquireCount <= count) {
                return true;
            }
        }
        return false;
    }
}
```
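
To make the parameters concrete, here is a worked example with hypothetical values (count = 100 QPS, warmUpPeriodInSec = 10, coldFactor = 3), simply plugging into the formulas in construct above:

```java
// Hypothetical values for illustration: count = 100, warmUpPeriodInSec = 10, coldFactor = 3.
int warningToken = (int) (10 * 100) / (3 - 1);                   // = 500
int maxToken = warningToken + (int) (2 * 10 * 100 / (1.0 + 3));  // = 500 + 500 = 1000
double slope = (3 - 1.0) / 100 / (maxToken - warningToken);      // = 0.00004
// With a full bucket (1000 tokens), aboveToken = 500 and
// warningQps = 1 / (500 * 0.00004 + 1.0 / 100) = 1 / 0.03 ≈ 33,
// i.e. roughly count / coldFactor; as the bucket drains toward the warning line,
// warningQps rises back toward the full 100 QPS.
```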

The logic for filling the token is as follows:

  1. Take the current time and strip the milliseconds to get a whole-second timestamp.
  2. The comparison with the last fill time ensures tokens are only dropped into the bucket once per second.
  3. Then `coolDownTokens` calculates how many tokens the cold start / warm-up phase should add.
  4. Finally, compute the number of tokens left in the bucket by subtracting the tokens consumed in the previous window.
```java
protected void syncToken(long passQps) {
    long currentTime = TimeUtil.currentTimeMillis();
    // Truncate to the second, so tokens are refilled at most once per second
    currentTime = currentTime - currentTime % 1000;
    long oldLastFillTime = lastFilledTime.get();
    if (currentTime <= oldLastFillTime) {
        return;
    }

    // Current number of tokens
    long oldValue = storedTokens.get();
    // New number of tokens after refilling
    long newValue = coolDownTokens(currentTime, passQps);

    if (storedTokens.compareAndSet(oldValue, newValue)) {
        // Subtract the tokens consumed by the previous window's QPS
        long currentValue = storedTokens.addAndGet(0 - passQps);
        if (currentValue < 0) {
            storedTokens.set(0L);
        }
        lastFilledTime.set(currentTime);
    }
}
```
  1. At the very beginning, `lastFilledTime` and `oldValue` are both zero, so the refill computed from the current timestamp is a huge number; taking the minimum with `maxToken` caps it, which means the bucket is filled with `maxToken` tokens the first time it is initialized.
  2. Now assume the system's QPS starts out very low and then suddenly surges. At first we always fall into the above-warning-line branch, and because `passQps` is low the bucket keeps being refilled (`currentTime - lastFilledTime.get()` is always 1000, i.e. one second), so up to `count` tokens, the maximum QPS, are added each time.
  3. Then the traffic surge arrives and QPS goes high; the tokens are slowly consumed down below the warning line, we land in the first `if` branch, and tokens keep being added at the rate of `count`.
```java
private long coolDownTokens(long currentTime, long passQps) {
    long oldValue = storedTokens.get();
    long newValue = oldValue;

    // Below the warning line: refill tokens at the normal rate (count per second)
    if (oldValue < warningToken) {
        newValue = (long) (oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
    } else if (oldValue > warningToken) {
        // Above the warning line: only refill while the passing QPS is low enough
        // (the higher the QPS, the slower tokens are generated)
        if (passQps < (int) count / coldFactor) {
            newValue = (long) (oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        }
    }
    // Never exceed the bucket capacity
    return Math.min(newValue, maxToken);
}
```

With the refill logic sorted out, we can go back to the rate limiting logic itself:

  1. Once the token calculation is done, we check whether the bucket is above the warning line. As explained above, in the low-QPS state it always is, so a `warningQps` is computed from the slope. Since we are in the cold start phase, this step derives a QPS limit along the slope so that traffic ramps up gradually toward the peak the system can withstand. For example, if `count` is 100 and the real QPS is very low, the token bucket stays full, but the system caps the allowed QPS at `warningQps`, which by the formula might be only 10 or 20. As the real QPS climbs, `aboveToken` gradually shrinks, `warningQps` grows, and eventually the bucket drops below the warning line and we end up in the `else` branch.
  2. When there is a traffic spike, we are in the `else` branch below the warning line. The bucket keeps adding tokens at the rate of `count`, but consumption outpaces generation, so it may stay below the warning line; in that case the flow is simply limited against the maximum QPS.
```java
long restToken = storedTokens.get();
if (restToken >= warningToken) {
    // Number of tokens above the warning line
    long aboveToken = restToken - warningToken;
    // Consume at a slower rate than the warning line allows:
    // current interval = restToken * slope + 1 / count
    double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
    if (passQps + acquireCount <= warningQps) {
        return true;
    }
} else {
    if (passQps + acquireCount <= count) {
        return true;
    }
}
```

So, following the progression from low QPS to a sharp spike, picture the process:

  1. At the beginning the system's QPS is very low, and the token bucket is filled to the brim right at initialization.
  2. This low-QPS state then lasts for a while. Because we keep refilling the bucket with up to the maximum QPS worth of tokens each second (and take the minimum with the capacity, so the tokens in the bucket barely change), the bucket stays full and the whole system's rate limit stays at its low, cold-start level.

This period spent above the warning line is exactly the cold start / warm-up phase.

  1. Then the system's QPS suddenly surges and tokens are consumed faster than they are added, even though we still add the maximum QPS worth of tokens each second. The tokens in the bucket keep decreasing, and at the same time the cold-start QPS limit keeps rising, until the bucket finally drops below the warning line.
  2. Below the warning line, the system limits traffic against the maximum QPS. This is the process by which the system gradually ramps up to its full rate limit.

That is exactly what we want from warm-up: when traffic surges, the whole system adjusts gradually to the sudden rise in QPS and finally settles at the system's QPS threshold.

  1. Eventually, if QPS drops back to normal, the bucket gradually climbs back above the warning line and the whole process starts over.

Conclusion

The algorithms themselves are fairly simple when described on their own; everyone can understand them, and a few sentences are enough. That's why it's worth digging into some source code to see how others actually implement them, so although I hate pasting source code into an article, it has to be done.

Just hearing someone describe it at a high level doesn't really make it sink in; it helps to walk through the code in order and replay it in your head.

The hardest part of the source code to understand is the token bucket implementation. To be honest, I read the parameter calculations several times without figuring out exactly what they compute, but understanding the idea is enough; the rest of the logic is relatively easy to follow.