Introduction

After exploring the request-processing flow in the previous article, today we explore the Resilience4J plugin, which handles rate limiting.

Running the sample

Environment configuration

Start MySQL and Redis

docker run -dit --name redis -p 6379:6379 redis
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:latest

Soul-admin startup and related configuration

Run soul-admin and go to the admin interface: System Administration > Plug-in Management > Resilience4J, click Edit and turn it on

Add a selector and a rule, using the same matching setup as for the Divide plugin, so that interfaces under the /http prefix are rate limited (the official HTTP example is used for testing).

Rule configuration: the token filling number must be greater than 0, otherwise an error is reported.

circuit enable: selects which logic runs. 1 takes the combined path; 0 takes plain rate limiting.

Others: set the fallback URI to any path and set the remaining parameters to 1.

soul-bootstrap configuration and startup

Add the plugin dependency to soul-bootstrap:

  <!-- soul resilience4j plugin start -->
  <dependency>
      <groupId>org.dromara</groupId>
      <artifactId>soul-spring-boot-starter-plugin-resilience4j</artifactId>
      <version>${last.version}</version>
  </dependency>
  <!-- soul resilience4j plugin end -->

Start soul-bootstrap

HTTP sample startup

Launch: soul-examples –> SoulTestHttpApplication

On the divide management page, you can view information about registered interfaces

Visit: http://127.0.0.1:9195/http/order/findById?id=1111

The request succeeds with the response below, so we can start debugging the source.

{
    "id": "1111",
    "name": "hello world findById"
}

Debugging the source code

Tracing and confirming the rate-limiting execution order

From the previous article we have a clear picture of the processing flow. Debugging shows that RateLimiterPlugin extends AbstractSoulPlugin, so it first goes through the selector and rule matching shown in the code below. The rate-limiting logic in doExecute runs only after a successful match.

    // AbstractSoulPlugin
    // The selector and rule are matched first
    public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        String pluginName = named();
        final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
        if (pluginData != null && pluginData.getEnabled()) {
            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
            if (CollectionUtils.isEmpty(selectors)) {
                return CheckUtils.checkSelector(pluginName, exchange, chain);
            }
            final SelectorData selectorData = matchSelector(exchange, selectors);
            if (Objects.isNull(selectorData)) {
                if (PluginEnum.WAF.getName().equals(pluginName)) {
                    return doExecute(exchange, chain, null, null);
                }
                return CheckUtils.checkSelector(pluginName, exchange, chain);
            }
            if (selectorData.getLoged()) {
                log.info("{} selector success match , selector name :{}", pluginName, selectorData.getName());
            }
            final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
            if (CollectionUtils.isEmpty(rules)) {
                if (PluginEnum.WAF.getName().equals(pluginName)) {
                    return doExecute(exchange, chain, null, null);
                }
                return CheckUtils.checkRule(pluginName, exchange, chain);
            }
            RuleData rule;
            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
                //get last
                rule = rules.get(rules.size() - 1);
            } else {
                rule = matchRule(exchange, rules);
            }
            if (Objects.isNull(rule)) {
                return CheckUtils.checkRule(pluginName, exchange, chain);
            }
            if (rule.getLoged()) {
                log.info("{} rule success match ,rule name :{}", pluginName, rule.getName());
            }
            return doExecute(exchange, chain, selectorData, rule);
        }
        return chain.execute(exchange);
    }

    // RateLimiterPlugin
    // Rate-limiting logic, reached only after a successful match
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        // The handle holds everything that was filled in for the rule
        Resilience4JHandle resilience4JHandle = GsonUtils.getGson().fromJson(rule.getHandle(), Resilience4JHandle.class);
        // circuit enable: 1 takes the combined path, 0 takes plain rate limiting
        if (resilience4JHandle.getCircuitEnable() == 1) {
            return combined(exchange, chain, rule);
        }
        return rateLimiter(exchange, chain, rule);
    }

    // This part is a bit involved, so we keep following the call
    private Mono<Void> rateLimiter(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
        return ratelimiterExecutor.run(
                chain.execute(exchange), fallback(ratelimiterExecutor, exchange, null), Resilience4JBuilder.build(rule))
                .onErrorResume(throwable -> ratelimiterExecutor.withoutFallback(exchange, throwable));
    }
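The match-then-delegate structure above is a template method: the base class owns the matching and the subclass only supplies doExecute. Here is a minimal plain-Java sketch of that shape. The class names, the prefix-based matching, and the string results are hypothetical simplifications for illustration, not Soul's API.

```java
import java.util.Arrays;
import java.util.List;

final class TemplateMethodSketch {
    // Hypothetical rule: a path prefix plus the circuitEnable flag from the rule handle.
    static final class Rule {
        final String pathPrefix;
        final int circuitEnable;

        Rule(String pathPrefix, int circuitEnable) {
            this.pathPrefix = pathPrefix;
            this.circuitEnable = circuitEnable;
        }
    }

    // The base class owns the matching; subclasses only supply doExecute.
    abstract static class AbstractPlugin {
        private final List<Rule> rules;

        AbstractPlugin(List<Rule> rules) {
            this.rules = rules;
        }

        final String execute(String path) {
            for (Rule rule : rules) {
                if (path.startsWith(rule.pathPrefix)) {
                    return doExecute(path, rule);
                }
            }
            // Simplification: with no matching rule this plugin does nothing.
            return "not matched";
        }

        protected abstract String doExecute(String path, Rule rule);
    }

    // Mirrors the branch in doExecute: 1 means combined, anything else means rate limiting only.
    static final class LimiterPlugin extends AbstractPlugin {
        LimiterPlugin(List<Rule> rules) {
            super(rules);
        }

        @Override
        protected String doExecute(String path, Rule rule) {
            return rule.circuitEnable == 1 ? "combined" : "rateLimiter";
        }
    }

    public static void main(String[] args) {
        AbstractPlugin plugin = new LimiterPlugin(Arrays.asList(new Rule("/http", 0)));
        System.out.println(plugin.execute("/http/order/findById"));
        System.out.println(plugin.execute("/dubbo/findAll"));
    }
}
```

The point of the design is that every plugin gets the same matching behaviour for free, so a breakpoint in the base class's execute is hit for every plugin in the chain.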

The rateLimiter method is confusing at first. I do not have the reactive-programming background to follow it fully, but the general rate-limiting logic is understandable.

public class RateLimiterExecutor implements Executor {

    @Override
    public <T> Mono<T> run(final Mono<T> toRun, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf conf) {
        // Obtain the rate limiter for this rule
        RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(conf.getId(), conf.getRateLimiterConfig());
        // The rate-limiting logic is presumably triggered here
        Mono<T> to = toRun.transformDeferred(RateLimiterOperator.of(rateLimiter));
        if (fallback != null) {
            return to.onErrorResume(fallback);
        }
        return to;
    }
}

Moving on to the class above, the logic that obtains the limiter is obvious, but it is not obvious where the limiter is actually triggered before the Mono is returned. Without a grounding in reactive programming I could not locate the actual trigger code. My guess is that it is the line marked by the second comment above.
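One way to see why no explicit trigger appears in run: the method only decorates a deferred task, and the permit check happens later, when the task is finally executed. The sketch below imitates that idea in plain Java, with Supplier standing in for Mono and a hypothetical CountingLimiter standing in for the real limiter. It is an illustration under those assumptions, not how Reactor or Resilience4J are implemented.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

final class DeferredLimitSketch {
    // Hypothetical limiter: hands out a fixed number of permits and then rejects.
    static final class CountingLimiter {
        private final AtomicInteger permits;

        CountingLimiter(int permits) {
            this.permits = new AtomicInteger(permits);
        }

        boolean tryAcquire() {
            return permits.getAndDecrement() > 0;
        }
    }

    // Wraps the task without running it. The permit check happens only when get() is called,
    // which plays the role of subscribing to the Mono.
    static <T> Supplier<T> limited(Supplier<T> toRun, CountingLimiter limiter, Function<Throwable, T> fallback) {
        return () -> {
            try {
                if (!limiter.tryAcquire()) {
                    throw new IllegalStateException("rate limit exceeded");
                }
                return toRun.get();
            } catch (RuntimeException e) {
                if (fallback != null) {
                    return fallback.apply(e);
                }
                throw e;
            }
        };
    }

    public static void main(String[] args) {
        CountingLimiter limiter = new CountingLimiter(1);
        Supplier<String> call = limited(() -> "ok", limiter, error -> "fallback");
        System.out.println(call.get()); // first call gets the only permit
        System.out.println(call.get()); // second call is rejected and falls back
    }
}
```

Building the decorated supplier consumes no permit. Only the calls to get() do, which is consistent with the breakpoint in the limiter being hit later than the run method itself.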

Because the code is reactive, we cannot simply step through it, so we take another path and look at what the concrete rate-limiting logic looks like.

RateLimiter is the rate limiter, so let's look at its implementation.

It turns out to be an interface with two implementations: SemaphoreBasedRateLimiter and AtomicRateLimiter.

Without knowing which one is used, we set breakpoints on every method in both classes that might be executed.

Resend the request and step from breakpoint to breakpoint. We finally land in one limiter class, AtomicRateLimiter, roughly here:

    // AtomicRateLimiter
    public long reservePermission(final int permits) {
        long timeoutInNanos = ((AtomicRateLimiter.State)this.state.get()).config.getTimeoutDuration().toNanos();
        AtomicRateLimiter.State modifiedState = this.updateStateWithBackOff(permits, timeoutInNanos);
        boolean canAcquireImmediately = modifiedState.nanosToWait <= 0L;
        if (canAcquireImmediately) {
            this.publishRateLimiterEvent(true, permits);
            return 0L;
        } else {
            boolean canAcquireInTime = timeoutInNanos >= modifiedState.nanosToWait;
            if (canAcquireInTime) {
                this.publishRateLimiterEvent(true, permits);
                return modifiedState.nanosToWait;
            } else {
                this.publishRateLimiterEvent(false, permits);
                return -1L;
            }
        }
    }
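The return value of reservePermission above has three cases: 0 when a permit is available immediately, a positive number of nanoseconds when the caller must wait but the wait fits inside the timeout, and -1 when it does not. The sketch below reproduces only that contract with a deliberately simplified, hypothetical limiter. It tracks permits per cycle with a counter and ignores real time, so it is not the AtomicRateLimiter algorithm.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ReservePermissionSketch {
    // Hypothetical limiter: limitForPeriod permits become available in each cycle.
    static final class SimpleLimiter {
        private final int limitForPeriod;
        private final long cyclePeriodNanos;
        private final long timeoutNanos;
        // May go negative: a negative value means permits were reserved from future cycles.
        private final AtomicInteger permissions;

        SimpleLimiter(int limitForPeriod, long cyclePeriodNanos, long timeoutNanos) {
            this.limitForPeriod = limitForPeriod;
            this.cyclePeriodNanos = cyclePeriodNanos;
            this.timeoutNanos = timeoutNanos;
            this.permissions = new AtomicInteger(limitForPeriod);
        }

        // Returns 0 (go now), a wait time in nanoseconds, or -1 (rejected).
        long reservePermission() {
            int remaining = permissions.decrementAndGet();
            if (remaining >= 0) {
                return 0L;
            }
            // Number of whole cycles needed before this reservation is covered.
            long cyclesToWait = (-remaining + limitForPeriod - 1) / limitForPeriod;
            long nanosToWait = cyclesToWait * cyclePeriodNanos;
            if (nanosToWait <= timeoutNanos) {
                return nanosToWait;
            }
            permissions.incrementAndGet(); // give the reservation back
            return -1L;
        }
    }

    public static void main(String[] args) {
        SimpleLimiter limiter = new SimpleLimiter(2, 1_000L, 1_000L);
        for (int i = 0; i < 5; i++) {
            System.out.println(limiter.reservePermission());
        }
    }
}
```

With 2 permits per cycle and a timeout equal to one cycle, the first two calls return 0, the next two return the one-cycle wait, and the fifth is rejected with -1.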

The implementation logic is not our focus this time. What we care about is the order in which the plugin chain processes the request.

As in the previous articles, we can set a breakpoint in SoulWebHandler to see where the rate limiter sits in the execution order.

Debugging shows the order is basically what we expected: when execution enters RateLimiterPlugin, the breakpoint in AtomicRateLimiter is also hit, and Divide and the other plugins only start after the rate-limiting logic has completed.

Some thoughts on execution order and Mono

Let's take another look at the rate-limiting code:

public class RateLimiterExecutor implements Executor {

    @Override
    public <T> Mono<T> run(final Mono<T> toRun, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf conf) {
        // Obtain the rate limiter for this rule
        RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(conf.getId(), conf.getRateLimiterConfig());
        // The rate-limiting logic is presumably triggered here
        Mono<T> to = toRun.transformDeferred(RateLimiterOperator.of(rateLimiter));
        if (fallback != null) {
            return to.onErrorResume(fallback);
        }
        return to;
    }
}

It returns a Mono.

Now look at a plugin such as Divide, which also returns a Mono:

public class DividePlugin extends AbstractSoulPlugin {

    @Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
        final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
        if (CollectionUtils.isEmpty(upstreamList)) {
            log.error("divide upstream configuration error: {}", rule.toString());
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
        DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
        if (Objects.isNull(divideUpstream)) {
            log.error("divide has no upstream");
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        // set the http url
        String domain = buildDomain(divideUpstream);
        String realURL = buildRealURL(domain, soulContext, exchange);
        exchange.getAttributes().put(Constants.HTTP_URL, realURL);
        // set the http timeout
        exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
        exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
        return chain.execute(exchange);
    }
}

Now look at the familiar SoulWebHandler:

        public Mono<Void> execute(final ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < plugins.size()) {
                    SoulPlugin plugin = plugins.get(this.index++);
                    Boolean skip = plugin.skip(exchange);
                    if (skip) {
                        return this.execute(exchange);
                    }
                    return plugin.execute(exchange, this);
                }
                return Mono.empty();
            });
        }

In the function above, you can see that every plugin returns a Mono.

Combine this with a core concept of reactive programming, publish and subscribe. My conjecture is that these plugin Monos are published into something like a queue and, once subscribed, are taken out and executed in order.
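To make the ordering concrete, here is a plain-Java sketch of an index-based chain in the style of the execute method above. Runnable stands in for a deferred Mono and run() stands in for subscribe. All names are hypothetical, and the sketch only illustrates deferred execution and ordering. It says nothing about how Reactor schedules work internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PluginChainSketch {
    interface Plugin {
        // Does the plugin's own work, then returns the still-deferred rest of the chain.
        Runnable execute(List<String> trace, Chain chain);
    }

    static final class Chain {
        private final List<Plugin> plugins;
        private int index;

        Chain(List<Plugin> plugins) {
            this.plugins = plugins;
        }

        // Returns a deferred step: nothing happens until run() is called.
        Runnable execute(List<String> trace) {
            return () -> {
                if (index < plugins.size()) {
                    Plugin plugin = plugins.get(index++);
                    plugin.execute(trace, this).run();
                }
            };
        }
    }

    static Plugin named(String name) {
        return (trace, chain) -> {
            trace.add(name);             // the plugin's own work
            return chain.execute(trace); // hand over to the next plugin
        };
    }

    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        Chain chain = new Chain(Arrays.asList(named("global"), named("rateLimiter"), named("divide")));
        Runnable pipeline = chain.execute(trace); // assembled, but no plugin has run yet
        trace.add("assembled");
        pipeline.run();                           // plays the role of subscribe
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [assembled, global, rateLimiter, divide]
    }
}
```

In this sketch the order comes from the index and from each plugin handing control to the chain, so a plugin placed earlier in the list always finishes its own work before the next one starts.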

To find where the subscription happens, let's go back to the third article in this series, Soul Gateway source code reading (3): Request processing overview.

In the class HttpServerHandle, we find a likely candidate:

    public void onStateChange(Connection connection, State newState) {
        if (newState == HttpServerState.REQUEST_RECEIVED) {
            try {
                if (log.isDebugEnabled()) {
                    log.debug(ReactorNetty.format(connection.channel(), "Handler is being applied: {}"), new Object[]{this.handler});
                }

                HttpServerOperations ops = (HttpServerOperations)connection;
                // Publish and subscribe happen here; handler.apply(ops, ops) goes on to invoke the subsequent plugin logic
                Mono.fromDirect((Publisher)this.handler.apply(ops, ops)).subscribe(ops.disposeSubscriber());
            } catch (Throwable var4) {
                log.error(ReactorNetty.format(connection.channel(), ""), var4);
                connection.channel().close();
            }
        }
    }

The rate-limiting Mono comes before Divide's, so rate limiting is executed first. Roughly:

My guess is that fromDirect causes the plugin Monos to be queued and subscribe triggers their execution in first-in, first-out order: GlobalPlugin goes in first, so it runs first. That order matches what we saw while debugging.

I have not studied reactive programming thoroughly yet, so this could be wrong.

Open question

In the limiter-creation logic below, it looks as if a new limiter is generated for every request. Could it be reused instead? For example, the configuration could carry a field indicating whether it has been updated: if not, reuse the previous limiter; if so, generate a new one.

Of course, this optimization requires understanding how dynamic configuration updates work before we can tell whether it is feasible.

I may simply be unfamiliar with Resilience4J. Resilience4JRegistryFactory, in the code below, may already cache and reuse limiters itself.

public class RateLimiterExecutor implements Executor {

    @Override
    public <T> Mono<T> run(final Mono<T> toRun, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf conf) {
        // Obtain the rate limiter for this rule
        RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(conf.getId(), conf.getRateLimiterConfig());
        // The rate-limiting logic is presumably triggered here
        Mono<T> to = toRun.transformDeferred(RateLimiterOperator.of(rateLimiter));
        if (fallback != null) {
            return to.onErrorResume(fallback);
        }
        return to;
    }
}
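The reuse idea raised above could look like the following sketch: a registry keyed by rule id that keeps the existing limiter while the configuration is unchanged and replaces it when the configuration differs. Config, Limiter, and the registry class are hypothetical stand-ins, and whether Resilience4JRegistryFactory already behaves like this has not been checked here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class LimiterRegistrySketch {
    // Hypothetical configuration value object; equality decides whether a limiter can be reused.
    static final class Config {
        final int limitForPeriod;

        Config(int limitForPeriod) {
            this.limitForPeriod = limitForPeriod;
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof Config && ((Config) other).limitForPeriod == limitForPeriod;
        }

        @Override
        public int hashCode() {
            return Integer.hashCode(limitForPeriod);
        }
    }

    // Hypothetical limiter that only remembers the configuration it was built with.
    static final class Limiter {
        final Config config;

        Limiter(Config config) {
            this.config = config;
        }
    }

    private final Map<String, Limiter> cache = new ConcurrentHashMap<>();

    // Reuses the cached limiter for this id unless the configuration has changed.
    Limiter rateLimiter(String id, Config config) {
        return cache.compute(id, (key, existing) ->
                existing != null && existing.config.equals(config) ? existing : new Limiter(config));
    }

    public static void main(String[] args) {
        LimiterRegistrySketch registry = new LimiterRegistrySketch();
        Limiter first = registry.rateLimiter("rule-1", new Config(1));
        Limiter second = registry.rateLimiter("rule-1", new Config(1));
        Limiter third = registry.rateLimiter("rule-1", new Config(5));
        System.out.println(first == second); // same configuration, same instance
        System.out.println(first == third);  // changed configuration, new instance
    }
}
```

Comparing configurations by value avoids a separate "updated" flag: an update is detected simply because the new configuration no longer equals the cached one.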

Conclusion

This article covered the configuration of the Resilience4J plugin. Debugging verified where the rate-limiting logic runs in the plugin chain and showed that it follows the plugin order.

It also sketched a conjecture about how the plugin chain executes as a queue of Monos. We will check whether that conjecture is correct later, when we study reactive programming.

Finally, it raised a possible optimization around limiter creation, to be checked later when we analyze configuration updates.

Reference links

  • Resilience4j plug-in
  • Resilience4j source code analysis (3) : RateLimiter and common traffic limiting algorithms

Soul Gateway source code analysis article list

Github

  • Soul Gateway source code reading (1): Overview

  • Soul Gateway source code reading (2): Initial run of the code

  • HTTP request processing overview

  • Dubbo request overview

  • Soul Gateway source code reading (5): Request type exploration

  • Soul Gateway source code reading (6): Sofa request processing overview

  • HTTP parameter request error

Juejin

  • Soul Gateway source code reading (1): Overview

  • Soul Gateway source code reading (2): Initial run of the code

  • Soul Gateway source code reading (3): Request processing overview

  • Dubbo request overview

  • Soul Gateway source code reading (5): Request type exploration

  • Soul Gateway source code reading (6): Sofa request processing overview

  • HTTP parameter request error