There are three powerful tools you can use to protect a high-concurrency system: caching, degradation, and rate limiting. Today we'll talk about rate limiting.

Caching: the purpose of a cache is to improve access speed and increase the system's processing capacity. Degradation: when server pressure spikes, strategically degrade some services and pages based on the current service status and traffic, freeing server resources so that core tasks keep running normally. Rate limiting: cap the rate of concurrent access, or the number of requests within a time window, to protect the system; once the limit is reached, requests can be rejected, queued, made to wait, or degraded.

Basically, I use Timer for the periodic work in the implementations below; Guava's RateLimiter is worth a look as a production-grade alternative. When the pressure is high and a single timer thread can't keep up with the processing logic, results suffer; you can switch to a ScheduledThreadPoolExecutor so the scheduled work runs on a thread pool and the pressure is spread out.
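As a quick illustration of that swap, here is a minimal sketch of my own (the class name `ScheduledResetDemo` and the 50ms reset period are invented for this example, not part of the article's code) that uses `ScheduledThreadPoolExecutor` via `Executors.newScheduledThreadPool` to periodically reset a counter, the same job the `Timer` does in the limiters below:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduledResetDemo {

    // Reset a flow counter every 50ms on a small pool instead of a single Timer thread.
    // Returns the counter's value observed after several reset ticks have fired.
    static int counterAfterResets() throws InterruptedException {
        AtomicInteger flow = new AtomicInteger(0);

        // Unlike Timer, a pooled scheduler survives a task that throws an exception,
        // and one slow tick does not delay every other scheduled task.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        scheduler.scheduleAtFixedRate(() -> flow.set(0), 0, 50, TimeUnit.MILLISECONDS);

        flow.addAndGet(42);                  // simulate requests being counted
        TimeUnit.MILLISECONDS.sleep(300);    // wait past several reset ticks
        int observed = flow.get();
        scheduler.shutdownNow();
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("counter after resets: " + counterAfterResets());
    }
}
```

One thread pool can carry all of a limiter's periodic tasks, which is why it relieves pressure compared to one `Timer` thread per limiter.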

The implementations also make extensive use of queues, because most producer-consumer models need a queue's first-in, first-out behavior.
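That FIFO hand-off is the core mechanism the leaky bucket and token bucket below are built on. A minimal sketch (class and method names are mine, invented for illustration) using the same `ArrayBlockingQueue` the article's limiters use:

```java
import java.util.concurrent.ArrayBlockingQueue;

public class QueueFifoDemo {

    // Hand three requests through a bounded queue and record the order they come out.
    // put() would block if the queue were full; take() blocks while it is empty.
    static String drainInOrder() throws InterruptedException {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        // Producer side: requests arrive
        queue.put("req-1");
        queue.put("req-2");
        queue.put("req-3");

        // Consumer side: requests leave in arrival order (FIFO)
        StringBuilder order = new StringBuilder();
        while (!queue.isEmpty()) {
            order.append(queue.take()).append(' ');
        }
        return order.toString().trim();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drainInOrder()); // req-1 req-2 req-3
    }
}
```

The bounded capacity is what turns a plain queue into a limiter: a full bucket rejects (or blocks) new arrivals.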

The first section covers setting up the environment: the requirements, the interface contracts, and the test case. The four sections after it cover the basic algorithms.

1. Environment construction

We simulate the Filter#doFilter interface for testing; every limiter implements the AbstractLimiter#limit method.

The Filter interface

public interface Filter {

    default public void init() {}

    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain);

    default public void destroy() {}
}

The FilterChain interface

public interface FilterChain {

    void doFilter(ServletRequest request, ServletResponse response);
}

ServletRequest implementation

public class ServletRequest {

    private String msg;

    public ServletRequest(String msg) {
        this.msg = msg;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    @Override
    public String toString() {
        return "ServletRequest{" +
                "msg='" + msg + '\'' +
                '}';
    }
}

ServletResponse implementation

public class ServletResponse {


}

AbstractLimiter implementation

public abstract class AbstractLimiter {

    /**
     * Maximum traffic
     */
    protected final int MAX_FlOW;

    /**
     * Constructor, takes the maximum flow per second
     *
     * @param MAX_FlOW maximum flow
     */
    public AbstractLimiter(int MAX_FlOW) {
        this.MAX_FlOW = MAX_FlOW;
    }


    /**
     * The limiting method to implement
     *
     * @param request  the request
     * @param response the response
     * @param chain    the execution chain
     */
    public abstract void limit(ServletRequest request, ServletResponse response, FilterChain chain);

}

The Demo test class

public class Demo {

    @Test
    public void test() {

        // Filter
        Filter filter = new Filter() {
            AbstractLimiter limit = null;

            @Override
            public void init() {
                // Entry point; we limit to 100 requests per second
                limit = new LeakyBucketLimiter(100);
            }

            @Override
            public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) {
                limit.limit(request, response, chain);
            }
        };

        // Filter initialization
        filter.init();

        // Timer
        long start = System.currentTimeMillis();

        // Counter
        AtomicInteger integer = new AtomicInteger(0);

        ExecutorService pool = Executors.newFixedThreadPool(10);
        // Simulate 4000 requests
        IntStream.range(0, 4000).forEach(e -> {
            try {
                // Simulate request latency
                TimeUnit.MILLISECONDS.sleep(1);
            } catch (InterruptedException e1) {
                //
            }

            // Multithreaded execution
            pool.execute(() -> {
                filter.doFilter(new ServletRequest("" + e), new ServletResponse(), new FilterChain() {
                    @Override
                    public void doFilter(ServletRequest request, ServletResponse response) {
                        // Callback interface
                        integer.incrementAndGet();
                        System.out.println("Request: " + request.getMsg() + " passed, executed by thread " + Thread.currentThread().getName());
                    }
                });
            });
        });
        System.out.println("Total time: " + (System.currentTimeMillis() - start));
        System.out.println("Total passed: " + integer.get());
    }
}

2. Counter algorithm

The counter algorithm is the simplest. Say I can pass 100 requests per second: every time a request arrives I increment the counter by 1, once the counter reaches 100 I stop letting requests through, and every second the counter resets to 0. The flaw is at the window boundary: if I receive 100 requests at 999ms and the counter resets just after 1000ms, another 100 requests can come in immediately. In effect, 200 requests were processed in about 0.1s, the system is seriously overloaded, and at that point the server can't cope and all requests time out.
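That boundary flaw can be demonstrated deterministically with a small sketch of my own (the class `FixedWindowCounter` and its injectable millisecond clock are invented for this demo; the article's Timer-based implementation is separate):

```java
import java.util.function.LongSupplier;

public class FixedWindowCounter {

    private final int max;
    private final LongSupplier clock;   // injectable millisecond clock, so the demo is deterministic
    private long windowStart;
    private int count;

    FixedWindowCounter(int max, LongSupplier clock) {
        this.max = max;
        this.clock = clock;
        this.windowStart = clock.getAsLong();
    }

    synchronized boolean tryPass() {
        long now = clock.getAsLong();
        if (now - windowStart >= 1000) {    // the 1s window rolled over: reset the counter
            windowStart = now;
            count = 0;
        }
        if (count < max) {
            count++;
            return true;
        }
        return false;
    }

    // 100 requests at t=999ms, then 100 more at t=1000ms: all 200 pass within ~1ms
    static int boundaryBurst() {
        long[] now = {0};
        FixedWindowCounter limiter = new FixedWindowCounter(100, () -> now[0]);

        int passed = 0;
        now[0] = 999;
        for (int i = 0; i < 100; i++) if (limiter.tryPass()) passed++;
        now[0] = 1000;
        for (int i = 0; i < 100; i++) if (limiter.tryPass()) passed++;
        return passed;
    }

    public static void main(String[] args) {
        System.out.println("passed across the boundary: " + boundaryBurst()); // 200
    }
}
```

With a 100-per-second limit, 200 requests slip through in a single millisecond of wall time, which is exactly the overload described above.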

public class CounterLimiter extends AbstractLimiter {

    private static final Integer initFlow = 0;

    private final AtomicInteger flow;

    public CounterLimiter(int MAX_FlOW) {
        super(MAX_FlOW);

        // Initialize the counter
        flow = new AtomicInteger(initFlow);

        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                // Reset the counter every 1000ms
                flow.set(initFlow);
            }
        }, 0, 1000);
    }

    @Override
    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        // Check whether we are overloaded
        if (flow.get() < MAX_FlOW) {
            // Pass: counter +1
            flow.incrementAndGet();
            chain.doFilter(request, response);
        }
    }
}

3. Sliding window algorithm

The rolling (sliding) window algorithm is an improvement on the counter algorithm: it subdivides the counter. For example, split the 1s (1000ms) into ten 100ms slots, so we have ten counters. The 999ms/1000ms problem above can't happen, because the window is continuous: the requests counted at 999ms are still being counted at 1000ms.

The finer the granularity, the more accurate the limiting, but also the more computation it costs. Production sliding-window implementations such as Hystrix's and Sentinel's are mainly designed around keeping that computation cheap.

My implementation is not optimal, and there's no real need to use an ArrayBlockingQueue to maintain the slider: since the timer executes on a single thread there's no multithreading problem, so a LinkedList could simulate the queue just as well. There are a few other points worth looking at in the code.
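Before the full Timer-driven version, the window mechanics can be shown in a stripped-down, deterministic sketch of my own (class name `SlidingWindowSketch` and the manual `tick()` step are invented for illustration; they stand in for the Timer firing every sub-window):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class SlidingWindowSketch {

    private final int max;                 // maximum requests across the whole window
    private final int blocks;              // how many sub-windows the window is split into
    private final Deque<int[]> window = new ArrayDeque<>();
    private int[] current;                 // the sub-window currently being filled

    SlidingWindowSketch(int max, int blocks) {
        this.max = max;
        this.blocks = blocks;
        this.current = new int[1];
        this.window.addLast(current);
    }

    // Advance one sub-window; once the window is full, the oldest block falls out.
    void tick() {
        if (window.size() == blocks) {
            window.removeFirst();
        }
        current = new int[1];
        window.addLast(current);
    }

    // A request passes only if the sum over every live sub-window is under the limit.
    boolean tryPass() {
        int sum = 0;
        for (int[] block : window) {
            sum += block[0];
        }
        if (sum < max) {
            current[0]++;
            return true;
        }
        return false;
    }
}
```

With max=100 and blocks=10, a burst of 100 keeps blocking further requests for the next nine ticks, and requests only pass again once the tenth tick evicts the block that absorbed the burst; that continuity is exactly what fixes the counter's boundary flaw.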

public class RollingWindowFilter extends AbstractLimiter {

    /**
     * Our sliding window object, which contains multiple windows
     */
    private final Slider slider;

    /**
     * The only counter exposed to the program; call it the current window
     */
    private AtomicInteger counter;

    /**
     * The counter's initial value
     */
    private static final int INIT_SIZE = 0;

    /**
     * If the window is divided into 10 blocks, this holds the precomputed sum of the
     * first 9 blocks. It exists so we don't waste computing resources: most of the
     * summation would otherwise be repeated on every request.
     */
    private final AtomicInteger preCount;


    /**
     * We default the queue size to 20, which is actually very fine-grained
     * (a calculation every 50ms); you can tune it via the constructor parameter.
     *
     * @param MAX_FlOW maximum flow
     */
    public RollingWindowFilter(int MAX_FlOW) {

        super(MAX_FlOW);

        // Initialize the window (honestly, "windows" might be a better name...)
        slider = new Slider(20);


        // Initialize the object
        preCount = new AtomicInteger(INIT_SIZE);

        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {

                ArrayBlockingQueue<AtomicInteger> queue = slider.blocks;

                // Current window size
                int size = queue.size();

                // While the window is still filling up
                if (size < slider.capacity) {
                    try {
                        /*
                         * Sum the counters of the previous blocks.
                         * (Too lazy to change this, but you could instantiate fresh
                         * objects here instead of reusing a single one like I do.)
                         */
                        preCount.set(INIT_SIZE);
                        if (size > 0) {
                            queue.forEach(e -> preCount.addAndGet(e.get()));
                        }

                        // Create a new counter and add the corresponding block to the tail of the queue
                        counter = new AtomicInteger(INIT_SIZE);
                        queue.put(counter);
                    } catch (InterruptedException e) {
                        //
                    }
                }

                // Once the window length is fully initialized
                if (size == slider.capacity) {

                    try {
                        // Evict the oldest block: first in, first out
                        queue.take();

                        // Sum the counters of the remaining blocks
                        preCount.set(INIT_SIZE);
                        queue.forEach(e -> preCount.addAndGet(e.get()));


                        // Create a new counter and add the corresponding block to the tail of the queue
                        counter = new AtomicInteger(INIT_SIZE);
                        queue.put(counter);
                    } catch (InterruptedException e) {
                        //
                    }
                }
            }
        }, 0, 1000 / slider.capacity);
    }


    @Override
    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {

        int cur = counter.get();
        int pre = preCount.get();
        int sum = cur + pre;

        if (sum < MAX_FlOW) {
            counter.incrementAndGet();
            chain.doFilter(request, response);
        }
    }

    /**
     * A private static nested class: it cannot be instantiated from outside the
     * enclosing class, and the constructor has no access modifier.
     */
    private static class Slider {
        // How many counters
        private final int capacity;
        // Holds the counters
        private final ArrayBlockingQueue<AtomicInteger> blocks;

        Slider(int capacity) {
            this.blocks = new ArrayBlockingQueue<>(capacity);
            this.capacity = capacity;
        }
    }
}

4. Leaky bucket algorithm

Let's think about the leaky bucket algorithm. We have a bucket that leaks at a fixed rate. The water poured in is the incoming requests, and the water dripping out is the requests we let through. So it's a producer-consumer model: the producer is the incoming requests, and the consumer is the fixed rate at which we process them.

The leaky bucket algorithm makes the outflow rate uniform: no matter how many requests arrive, my flow rate out stays constant. When the bucket is full, new requests overflow (are discarded); when it isn't full, requests wait in the bucket to be drained.
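Those two rules, bounded capacity plus a fixed drain rate, can be captured in a deterministic sketch of my own (the class `LeakyBucketSketch` and the manual `leak(n)` step are invented for illustration; `leak` stands in for the Timer tick that drains the real bucket):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class LeakyBucketSketch {

    private final int capacity;
    private final Deque<String> bucket = new ArrayDeque<>();

    LeakyBucketSketch(int capacity) {
        this.capacity = capacity;
    }

    // Producer side: pour a request in; when the bucket is full it overflows (rejected).
    boolean pour(String request) {
        if (bucket.size() >= capacity) {
            return false;
        }
        bucket.addLast(request);
        return true;
    }

    // Consumer side: drain at most n requests per call, i.e. the fixed leak rate.
    // Returns how many requests actually flowed out.
    int leak(int n) {
        int drained = 0;
        while (drained < n && !bucket.isEmpty()) {
            bucket.removeFirst();
            drained++;
        }
        return drained;
    }

    int waterLevel() {
        return bucket.size();
    }
}
```

A burst of 120 against a capacity of 100 loses 20 to overflow, and each drain tick then lets exactly its quota through, regardless of how full the bucket is.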

Once those two paragraphs make sense, the code below will too; the comments are detailed.

public class LeakyBucketLimiter extends AbstractLimiter {

    /**
     * Our funnel
     */
    private final LeakyBucket leakyBucket;

    /**
     * Constructor, takes the maximum flow per second
     *
     * @param MAX_FlOW maximum flow
     */
    public LeakyBucketLimiter(int MAX_FlOW) {
        super(MAX_FlOW);
        this.leakyBucket = new LeakyBucket(MAX_FlOW);
    }

    @Override
    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        try {
            // 1. Get the current amount of water in the bucket
            int size = leakyBucket.bucket.size();

            // 2. Compare against the bucket's capacity
            if (size < leakyBucket.waterSize) {

                // Not full yet, so pour the water in
                leakyBucket.bucket.put(new Water(request, response, chain));
            }
        } catch (InterruptedException e) {
            //
        }
    }

    static class LeakyBucket {

        /**
         * The size of the queue
         */
        final int waterSize;

        /**
         * Our bucket of water
         */
        final ArrayBlockingQueue<Water> bucket;

        public LeakyBucket(int MAX_FlOW) {
            this.waterSize = MAX_FlOW;
            bucket = new ArrayBlockingQueue<>(this.waterSize);

            /*
             * Simulated consumption: if only 100 can pass per 1s,
             * then 10 can be consumed every 100ms, depending on your granularity.
             */
            new Timer().schedule(new TimerTask() {
                @Override
                public void run() {
                    // Drain 10 every 100ms
                    for (int i = 0; i < (waterSize / 10); i++) {
                        try {
                            // Take water out
                            Water water = bucket.take();

                            // Execute
                            water.chain.doFilter(water.request, water.response);
                        } catch (InterruptedException e) {
                            //
                        }
                    }
                }
            }, 0, 100);
        }
    }

    /**
     * Our node object; you could call it water that was successfully poured in,
     * waiting to be drained by the leaky bucket.
     */
    static class Water {

        private ServletRequest request;

        private ServletResponse response;

        private FilterChain chain;

        public Water(ServletRequest request, ServletResponse response, FilterChain chain) {
            this.request = request;
            this.response = response;
            this.chain = chain;
        }
    }
}

5. Token bucket algorithm

The token bucket algorithm is the leaky bucket's mirror image. It's also a producer-consumer model, but with the roles reversed: we control production, and the requests do the consuming. For example, with a limit of 100, we generate 10 tokens every 100ms; once the bucket holds 100 tokens we stop producing, and every arriving request removes one token.
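That refill-then-consume cycle can be sketched deterministically (the class `TokenBucketSketch` and the manual `refill(n)` step are my own invented names for illustration; `refill` stands in for the Timer tick that produces tokens):

```java
public class TokenBucketSketch {

    private final int capacity;
    private int tokens;

    TokenBucketSketch(int capacity) {
        this.capacity = capacity;
    }

    // Producer side: one refill step, e.g. capacity/10 tokens every 100ms.
    // Production stops at capacity: a full bucket accumulates no further tokens.
    void refill(int n) {
        tokens = Math.min(capacity, tokens + n);
    }

    // Consumer side: each request removes one token, or is rejected when none are left.
    boolean tryAcquire() {
        if (tokens > 0) {
            tokens--;
            return true;
        }
        return false;
    }

    int available() {
        return tokens;
    }
}
```

Right after startup only one refill has run, so a burst of 100 requests finds just 10 tokens and 90 are rejected; once refills have caught up, the bucket caps out at its capacity and can absorb a full-sized burst.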

Compare this with the leaky bucket. Say the system has just started and 100 requests arrive at once: the token bucket might reject 90 of them, because only 10 tokens have been produced so far. The leaky bucket won't: with a bucket capacity of 100 it accepts all 100 requests and drains them at the fixed rate. That's the practical difference between the two; in terms of stability they're almost indistinguishable.

Shifting your thinking into the producer-consumer model clarifies the design; choosing the right model is often the appropriate way to frame the problem.

Most token bucket write-ups online use Guava's RateLimiter. Here I'll implement the token bucket two ways: once by hand, and once with RateLimiter.

1. Self-implemented token bucket

public class TokenBucketLimiter extends AbstractLimiter {

    /**
     * Token bucket
     */
    private final TokenBucket tokenBucket;

    /**
     * Constructor, takes the maximum flow per second
     *
     * @param MAX_FlOW maximum flow
     */
    public TokenBucketLimiter(int MAX_FlOW) {
        super(MAX_FlOW);
        this.tokenBucket = new TokenBucket(MAX_FlOW);
    }


    @Override
    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        try {
            // Try to get a token, waiting at most 5ms; if none arrives in time, the request is dropped
            Token token = tokenBucket.bucket.poll(5, TimeUnit.MILLISECONDS);

            // Got a token: pass
            if (null != token) {
                chain.doFilter(request, response);
            }

        } catch (InterruptedException e) {
            //
        }
    }

    /**
     * Token bucket
     */
    private static class TokenBucket {
        /**
         * The tokens are kept in a queue
         */
        private final ArrayBlockingQueue<Token> bucket;

        /**
         * How many tokens the bucket can hold
         */
        private final int tokenSize;

        public TokenBucket(int MAX_FlOW) {
            this.tokenSize = MAX_FlOW;
            this.bucket = new ArrayBlockingQueue<>(this.tokenSize);

            new Timer().schedule(new TimerTask() {
                @Override
                public void run() {
                    for (int x = 0; x < (tokenSize / 10); x++) {
                        try {
                            if (bucket.size() < tokenSize) {
                                // Place tokens periodically
                                bucket.put(new Token());
                            }
                        } catch (InterruptedException e) {
                            //
                        }
                    }
                }
            }, 0, 100);
        }
    }

    /**
     * Token
     */
    private static class Token {
    }
}

2. Implement token bucket based on Guava RateLimiter

public class GuavaRateLimiter extends AbstractLimiter {

    /**
     * Token bucket
     */
    private final RateLimiter limiter;

    /**
     * The number of tokens required each time
     */
    private static final int ACQUIRE_NUM = 1;
    /**
     * Maximum waiting time
     */
    private static final int WAIT_TIME_PER_MILLISECONDS = 5;

    /**
     * Constructor, takes the maximum flow per second
     *
     * @param MAX_FlOW maximum flow
     */
    public GuavaRateLimiter(final int MAX_FlOW) {
        super(MAX_FlOW);
        limiter = RateLimiter.create(MAX_FlOW);
    }


    @Override
    public void limit(ServletRequest request, ServletResponse response, FilterChain chain) {
        // Try to get a token, waiting at most 5ms (actually generous; in practice it's usually under 1ms)
        boolean flag = limiter.tryAcquire(ACQUIRE_NUM, WAIT_TIME_PER_MILLISECONDS, TimeUnit.MILLISECONDS);
        if (flag) {
            chain.doFilter(request, response);
        }
    }
}