Algorithm principle

1. Fixed time window rate limiting algorithm

  • Features: The system picks a zero point on the time axis and divides the axis into consecutive time windows of fixed length.
  • Principle: When a request arrives, check whether the count already recorded in the time window containing the arrival time exceeds the threshold.
  • Disadvantages: Traffic that straddles the boundary between two adjacent windows can exceed the threshold without being detected.
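
To make the boundary problem concrete, here is a minimal sketch of a fixed-window limiter. It is not Sentinel code: the class name FixedWindowLimiter, its parameters, and the choice to pass the arrival time in explicitly (so the behaviour is reproducible) are all assumptions made for illustration.

```java
/** Hypothetical fixed-window limiter, for illustration only (not Sentinel code). */
class FixedWindowLimiter {
    private final long windowLengthMs;
    private final int threshold;
    private long windowStart = -1; // start of the window currently being counted
    private int count;             // requests admitted in that window

    FixedWindowLimiter(long windowLengthMs, int threshold) {
        this.windowLengthMs = windowLengthMs;
        this.threshold = threshold;
    }

    /** Returns true if a request arriving at nowMs is admitted. */
    synchronized boolean tryAcquire(long nowMs) {
        // Align the arrival time to the fixed grid of windows.
        long start = nowMs - nowMs % windowLengthMs;
        if (start != windowStart) {
            // The request falls into a different window: start counting from zero.
            windowStart = start;
            count = 0;
        }
        if (count >= threshold) {
            return false;
        }
        count++;
        return true;
    }

    public static void main(String[] args) {
        FixedWindowLimiter limiter = new FixedWindowLimiter(1000, 2);
        long[] arrivals = {900, 950, 990, 1000, 1050};
        for (long t : arrivals) {
            System.out.println(t + " ms -> " + limiter.tryAcquire(t));
        }
    }
}
```

With a threshold of 2 per 1000 ms, the arrivals at 900, 950, 1000 and 1050 ms are all admitted, because the counter resets at 1000 ms. Four requests therefore pass within 150 ms, which is the cross-window overshoot described above.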

2. Sliding time window rate limiting algorithm

  • Features: The time window has no fixed start or end point. The arrival time of each request is taken as the end of the window, and the start is that time minus the window length.
  • Principle: Check whether the number of requests in the range from (arrival time – window length) to the arrival time exceeds the threshold.
  • Disadvantages: Overlapping ranges are counted again for every request, which causes performance problems.
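
One common way to realize this idea is to keep a log of admitted arrival times and drop the entries that have left the window. The sketch below takes that approach; the name SlidingLogLimiter and its parameters are assumptions for illustration, and this is not how Sentinel implements its limiter.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sliding-window limiter that keeps one timestamp per admitted request. */
class SlidingLogLimiter {
    private final long windowLengthMs;
    private final int threshold;
    private final Deque<Long> timestamps = new ArrayDeque<>();

    SlidingLogLimiter(long windowLengthMs, int threshold) {
        this.windowLengthMs = windowLengthMs;
        this.threshold = threshold;
    }

    /** Returns true if a request arriving at nowMs is admitted. */
    synchronized boolean tryAcquire(long nowMs) {
        // Drop timestamps that are no longer inside (nowMs - windowLengthMs, nowMs].
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= nowMs - windowLengthMs) {
            timestamps.pollFirst();
        }
        if (timestamps.size() >= threshold) {
            return false;
        }
        timestamps.addLast(nowMs);
        return true;
    }

    public static void main(String[] args) {
        SlidingLogLimiter limiter = new SlidingLogLimiter(1000, 2);
        long[] arrivals = {900, 950, 1000, 1050, 1899, 1900};
        for (long t : arrivals) {
            System.out.println(t + " ms -> " + limiter.tryAcquire(t));
        }
    }
}
```

This variant is exact, but it pays for that with one stored timestamp per admitted request and a cleanup pass on every call, which is the kind of cost the disadvantage above refers to.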

3. Improved sliding time window rate limiting algorithm

  • Features: Combines the fixed and the sliding time window: the time window is divided into several “sample windows”.
  • Principle: Requests are counted and recorded per sample window. When a request arrives, the count of the sample window containing the arrival time is taken, and the counts of the other sample windows within the time window are added to it to decide whether the threshold is exceeded.
  • Disadvantages: Still not exact. The finer granularity only reduces the error; it does not eliminate it.
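
The following sketch shows the sample-window idea in isolation. It borrows the index and start-time formulas that appear later in this article, but the class SampledWindowLimiter, its fields, and the explicit time parameter are assumptions for illustration; it is a simplified, lock-based stand-in rather than Sentinel's implementation.

```java
import java.util.Arrays;

/** Hypothetical limiter that splits the time window into sample windows in a ring. */
class SampledWindowLimiter {
    private final long intervalMs;      // length of the whole time window
    private final long sampleLengthMs;  // length of one sample window
    private final int threshold;
    private final long[] starts;        // start time held by each ring slot
    private final int[] counts;         // requests counted in each ring slot

    SampledWindowLimiter(long intervalMs, int sampleCount, int threshold) {
        this.intervalMs = intervalMs;
        this.sampleLengthMs = intervalMs / sampleCount;
        this.threshold = threshold;
        this.starts = new long[sampleCount];
        this.counts = new int[sampleCount];
        Arrays.fill(starts, -1); // -1 marks a slot that has never been used
    }

    /** Returns true if a request arriving at nowMs is admitted. */
    synchronized boolean tryAcquire(long nowMs) {
        int idx = (int) ((nowMs / sampleLengthMs) % starts.length);
        long start = nowMs - nowMs % sampleLengthMs;
        if (starts[idx] != start) {
            // The slot still holds a sample window from an earlier lap: reuse it.
            starts[idx] = start;
            counts[idx] = 0;
        }
        int total = 0;
        for (int i = 0; i < starts.length; i++) {
            // Only sample windows that still belong to the time window are summed.
            if (starts[i] >= 0 && nowMs - starts[i] <= intervalMs) {
                total += counts[i];
            }
        }
        if (total >= threshold) {
            return false;
        }
        counts[idx]++;
        return true;
    }

    public static void main(String[] args) {
        SampledWindowLimiter limiter = new SampledWindowLimiter(1000, 2, 2);
        long[] arrivals = {900, 950, 1000, 1499, 1500};
        for (long t : arrivals) {
            System.out.println(t + " ms -> " + limiter.tryAcquire(t));
        }
    }
}
```

With two 500 ms sample windows and a threshold of 2, the request at 1000 ms is now rejected, unlike in a plain fixed window. The request at 1500 ms is admitted even though the requests at 900 and 950 ms are less than 1000 ms old, because their sample window has just been recycled. This is the residual inaccuracy noted above.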

Core source code

I. Data statistics

Core classes:

  • StatisticSlot – Statistics entry
  • DefaultNode – Actual entry
  • StatisticNode – A statistics node
  • ArrayMetric – A metric class that uses arrays to hold data
  • LeapArray – Sample window array (circular array)
  • BucketLeapArray – Resets sample windows
  • WindowWrap – Sample window (the generic type T is MetricBucket)
  • MetricBucket – Statistics encapsulation class (multidimensional, dimension type enumerated in MetricEvent)

1. StatisticSlot – Statistics entry

It records and aggregates runtime metrics across different dimensions, in real time.

  • Thread count: a LongAdder maintained internally counts the current number of threads: +1 for each request that enters, -1 for each request that is released.
  • QPS: a sliding time window is used to determine whether the number of requests exceeds the threshold.
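
As a rough illustration of the thread-count bookkeeping, the sketch below wraps a LongAdder in a small class. The class name ThreadCounterSketch is hypothetical, and the method names are chosen only to echo the calls that appear in the source excerpts in this article.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical counter of in-flight requests, backed by a LongAdder. */
class ThreadCounterSketch {
    private final LongAdder curThreadNum = new LongAdder();

    /** Called when a request enters. */
    void increaseThreadNum() {
        curThreadNum.increment();
    }

    /** Called when a request is released. */
    void decreaseThreadNum() {
        curThreadNum.decrement();
    }

    /** Current number of in-flight requests. */
    int curThreadNum() {
        return (int) curThreadNum.sum();
    }

    public static void main(String[] args) {
        ThreadCounterSketch counter = new ThreadCounterSketch();
        counter.increaseThreadNum();
        counter.increaseThreadNum();
        counter.decreaseThreadNum();
        System.out.println("in flight: " + counter.curThreadNum());
    }
}
```

LongAdder spreads updates over several internal cells, so it tends to scale better than a single AtomicLong when many threads update the counter at once.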

There are three main things to do:

  • 1. Verify rules based on the current real-time statistics in node
  • 2. If the verification succeeds, update the real-time indicator data in node
  • 3. If it is blocked or an exception occurs, update the block indicator or exception indicator in node

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Pass the call down the chain: invoke every subsequent slot in the SlotChain
            // to run the rule checks (this may throw, for example, BlockException).
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // All rules passed: add the thread count and the QPS
            // (passed requests, sliding time window) to the DefaultNode.
            node.increaseThreadNum();
            node.addPassRequest(count);
            // ...
        }
        // ...
    }

2. DefaultNode – Actual entry

It maintains statistics for the resource both at the current entry and globally.

  • DefaultNode: Stores real-time indicators of a resource in a context. Each DefaultNode points to a ClusterNode
  • ClusterNode: Stores the total real-time indicators of a resource in all contexts. The same resource shares the same ClusterNode, regardless of the context in which it is located

    @Override
    public void addPassRequest(int count) {
        // Add to the statistics of the current entry's DefaultNode (calls the superclass StatisticNode).
        super.addPassRequest(count);
        // Add to the resource's global ClusterNode statistics (this also ends up in StatisticNode).
        this.clusterNode.addPassRequest(count);
    }
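
The relationship between per-context and global statistics can be sketched as follows. SketchStatNode and SketchDefaultNode are hypothetical stand-ins with a single counter each; they only mirror the delegation pattern described above.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical stand-in for a statistics node with a single pass counter. */
class SketchStatNode {
    private final LongAdder pass = new LongAdder();

    void addPassRequest(int count) {
        pass.add(count);
    }

    long totalPass() {
        return pass.sum();
    }
}

/** Hypothetical per-context node that also forwards to a shared global node. */
class SketchDefaultNode extends SketchStatNode {
    private final SketchStatNode clusterNode;

    SketchDefaultNode(SketchStatNode clusterNode) {
        this.clusterNode = clusterNode;
    }

    @Override
    void addPassRequest(int count) {
        super.addPassRequest(count);       // statistics of this context only
        clusterNode.addPassRequest(count); // statistics of the resource across all contexts
    }

    public static void main(String[] args) {
        SketchStatNode cluster = new SketchStatNode();
        SketchDefaultNode contextA = new SketchDefaultNode(cluster);
        SketchDefaultNode contextB = new SketchDefaultNode(cluster);
        contextA.addPassRequest(2);
        contextB.addPassRequest(3);
        System.out.println(contextA.totalPass() + " " + contextB.totalPass() + " " + cluster.totalPass());
    }
}
```

Each context sees only its own count, while the shared node accumulates the total for the resource.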

3. StatisticNode – Statistics node

The sliding counters add to the per-second and per-minute statistics.

    // Counter that stores its data in an array:
    // the sample window count defaults to 2, the time window length defaults to 1000 ms.
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(
        SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);

    @Override
    public void addPassRequest(int count) {
        // Sliding counters: add to the per-second and per-minute statistics.
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }

4. ArrayMetric – A metric class that uses arrays to store data

It records the per-second or per-minute statistics into the current sample window.

    @Override
    public void addPass(int count) {
        // Get the sample window for the current point in time.
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        // Add to the pass count of that sample window.
        wrap.value().addPass(count);
    }

5. LeapArray – Sample Window Array (circular array)

It gets the sample window for the current point in time. (LeapArray uses a circular array as its data structure, similar to the ring used in consistent hashing.)

  • 1. Compute the timeId of the current time and, from it, the index idx of the current window in the sample window array. (timeId keeps growing over time, but idx does not; it only alternates between 0 and 1, because the array length is 2.)
  • 2. Compute the start time of the current window from the current time, in milliseconds.
  • 3. Using the index idx, fetch the sample window old from the sample window array and handle it according to its state.

    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        // 1. Index of the sample window containing the current time in the array:
        //    (timeMillis / windowLengthInMs) % array.length()
        int idx = calculateTimeIdx(timeMillis);
        // 2. Start time of the current sample window:
        //    timeMillis - timeMillis % windowLengthInMs
        long windowStart = calculateWindowStart(timeMillis);

        while (true) {
            // 3. Fetch the sample window stored at idx.
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                // No sample window in this slot yet: create one.
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart,
                    newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) { // CAS
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                // The stored sample window is the current one: use it directly.
                return old;
            } else if (windowStart > old.windowStart()) {
                // The stored sample window is stale (the ring has wrapped to the next lap):
                // reset it while holding the lock.
                if (updateLock.tryLock()) {
                    try {
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not normally happen: the provided time is behind the stored window.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }

Assume that time starts at 0, with a 1000 ms time window split into two 500 ms sample windows:

  • 0 to 500 ms: the time window does not slide forward. The current sample window is timeId1 with start time 0; time window = timeId1 + timeId2.
  • 500 to 1000 ms: the current sample window moves forward to timeId2 and the current window start time becomes 500; time window = timeId1 + timeId2.
  • After 1000 ms: the next sample window, timeId3, becomes current and the current window start time becomes 1000. One of the windows in the array expires at this point, so time window = timeId2 + timeId3.

    import java.text.SimpleDateFormat;

    public class Test {
        public static void main(String[] args) throws InterruptedException {
            int windowLength = 500;
            int arrayLength = 2;
            calculate(windowLength, arrayLength);
            for (int i = 0; i < 3; i++) {
                Thread.sleep(100);
                calculate(windowLength, arrayLength);
            }
            for (int i = 0; i < 3; i++) {
                Thread.sleep(500);
                calculate(windowLength, arrayLength);
            }
        }

        private static void calculate(int windowLength, int arrayLength) {
            long time = System.currentTimeMillis();
            long timeId = time / windowLength;
            long currentWindowStart = time - time % windowLength;
            int idx = (int) (timeId % arrayLength);
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            System.out.println("time=" + formatter.format(time)
                + ",currentWindowStart=" + currentWindowStart
                + ",timeId=" + timeId + ",idx=" + idx);
        }
    }

Results:

    time=2021-07-15 20:06:17.822,currentWindowStart=1626350777500,timeId=3252701555,idx=1
    time=2021-07-15 20:06:17.954,currentWindowStart=1626350777500,timeId=3252701555,idx=1
    time=2021-07-15 20:06:18.054,currentWindowStart=1626350778000,timeId=3252701556,idx=0
    time=2021-07-15 20:06:18.157,currentWindowStart=1626350778000,timeId=3252701556,idx=0
    time=2021-07-15 20:06:18.658,currentWindowStart=1626350778500,timeId=3252701557,idx=1
    time=2021-07-15 20:06:19.159,currentWindowStart=1626350779000,timeId=3252701558,idx=0
    time=2021-07-15 20:06:19.662,currentWindowStart=1626350779500,timeId=3252701559,idx=1

6. BucketLeapArray – Reset sample window

When the sample window found in the array is out of date, the original window object is reset and reused in place of the old sample window.

    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        // Update the window start time; only the data needs to be replaced.
        w.resetTo(startTime);
        // Reset the statistics of every dimension.
        w.value().reset();
        return w;
    }

7. MetricBucket – Statistics encapsulation class

It increments the counter of the PASS dimension.

    public void addPass(int n) {
        // Add to the PASS dimension.
        add(MetricEvent.PASS, n);
    }

    public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }
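
The pattern of one counter per enum constant, indexed by ordinal(), can be tried out on its own with the sketch below. SketchEvent and SketchBucket are hypothetical names, and the three constants are only an illustrative subset of dimensions; the real MetricEvent and MetricBucket are not reproduced here.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical set of dimensions, for illustration only. */
enum SketchEvent { PASS, BLOCK, EXCEPTION }

/** Hypothetical bucket holding one LongAdder per dimension. */
class SketchBucket {
    private final LongAdder[] counters;

    SketchBucket() {
        SketchEvent[] events = SketchEvent.values();
        counters = new LongAdder[events.length];
        for (SketchEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
    }

    /** Adds n to the counter of the given dimension. */
    SketchBucket add(SketchEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }

    /** Current value of the given dimension. */
    long get(SketchEvent event) {
        return counters[event.ordinal()].sum();
    }

    /** Clears every dimension, as is needed when a sample window is reused. */
    void reset() {
        for (LongAdder counter : counters) {
            counter.reset();
        }
    }

    public static void main(String[] args) {
        SketchBucket bucket = new SketchBucket();
        bucket.add(SketchEvent.PASS, 2).add(SketchEvent.BLOCK, 1);
        System.out.println(bucket.get(SketchEvent.PASS) + " " + bucket.get(SketchEvent.BLOCK));
    }
}
```

Indexing by ordinal keeps all dimensions in one flat array, so adding a dimension only requires a new enum constant.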

II. Data usage (QPS example)

Core classes:

  • DefaultController – Entry
  • StatisticNode – Statistics node
  • ArrayMetric – A metric class that uses arrays to hold data
  • LeapArray – Sample window array (circular array)

1. DefaultController – Entry

It gets the statistics of the current time window.

    private int avgUsedTokens(Node node) {
        if (node == null) {
            return DEFAULT_AVG_USED_TOKENS;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
    }

2. StatisticNode – Statistics node

It gets the QPS of passed requests.

    // Counter that stores its data in an array:
    // the sample window count defaults to 2, the time window length defaults to 1000 ms.
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(
        SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);

    @Override
    public double passQps() {
        // Requests counted in the current time window / time window length (in seconds).
        return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
    }

3. ArrayMetric – A metric class that uses arrays to store data

It sums the PASS data of all sample windows.

    @Override
    public long pass() {
        // Refresh the current sample window first.
        data.currentWindow();
        long pass = 0;
        List<MetricBucket> list = data.values();

        for (MetricBucket window : list) {
            pass += window.pass();
        }
        return pass;
    }

4. LeapArray – Sample window array (circular array)

It collects the sample window instances, filtering out the ones that are out of date.

    public List<T> values(long timeMillis) {
        if (timeMillis < 0) {
            return new ArrayList<T>();
        }
        int size = array.length();
        List<T> result = new ArrayList<T>(size);

        // Walk the sample window array and collect the valid entries into result.
        for (int i = 0; i < size; i++) {
            WindowWrap<T> windowWrap = array.get(i);
            // Skip the entry if it is empty or out of date.
            if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
                continue;
            }
            result.add(windowWrap.value());
        }
        return result;
    }

    public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
        // Out of date when: current time - sample window start time > time window length.
        return time - windowWrap.windowStart() > intervalInMs;
    }
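
The staleness check is plain arithmetic, so it can be tried with concrete numbers. The helper below is a hypothetical standalone version of the comparison shown above, with the time window length passed in as a parameter.

```java
/** Hypothetical standalone version of the staleness comparison. */
class WindowDeprecationSketch {
    /** A sample window is stale when it started more than intervalMs before nowMs. */
    static boolean isDeprecated(long nowMs, long windowStartMs, long intervalMs) {
        return nowMs - windowStartMs > intervalMs;
    }

    public static void main(String[] args) {
        long intervalMs = 1000;
        // A slot last written in the sample window starting at 0 ms, read at 1600 ms.
        System.out.println(isDeprecated(1600, 0, intervalMs));
        // A slot holding the sample window starting at 1000 ms, read at 1600 ms.
        System.out.println(isDeprecated(1600, 1000, intervalMs));
    }
}
```

In the first call 1600 - 0 exceeds 1000, so that sample window would be skipped when summing. In the second call 1600 - 1000 does not, so it would still be counted.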
