Introduction to the

The previous article analyzed request forwarding and data synchronization, but today we’ll take a look at the resilience4J plugin

The sample run

The environment is configured to start MySQL and Redis

D: \ Software \ mysql - 5.7.31 - winx64 \ bin > mysqld -- the console D: \ good \ redis \ redis - x64-3.2.100 > redis - server. Exe redis. Windows. ConfCopy the code

Soul-admin startup and related configuration

  • Run soul-admin and go to the admin interface: System Administration > Plug-in Management > Resilience4J. Click Edit to enable it.
  • Add selectors and rules, here install the matching mode of Divide plug-in, so that the interfaces of Divide/HTTP prefix are limited (because the official HTTP test is used when testing).
  • Rule configuration: The token filling number must be greater than 0. Otherwise, an error will be reported
  • Circuit enable Indicates the logic of traffic limiting
  • Others: Set fallback URI to any path. Set other parameters to 1

The soul-bootstrap configuration starts

Enter the related dependencies in soul-Bootstrap, which look like this:

<! -- soul resilience4j plugin start--> <dependency> <groupId>org.dromara</groupId> <artifactId>soul-spring-boot-starter-plugin-resilience4j</artifactId> <version>${last.version}</version> </dependency> <! -- soul resilience4j plugin end-->Copy the code

Start the Soul – the Bootstrap

HTTP sample startup

  • Launch: soul-examples –> SoulTestHttpApplication
  • On the divide management page, you can view information about registered interfaces
  • Visit: http://127.0.0.1:9195/http/order/findById? id=1111
  • Successful run, now start source debug

Source code analysis

RateLimiterPlugin is derived from AbstractSoulPlugin. Then RateLimiterPlugin will run the logic associated with route matching and run the doExcute flow limiting logic only after the route matching succeeds. The following code looks like this:

    # AbstractSoulPlugin
    // The route is matched first
    public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        String pluginName = named();
        final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
        if(pluginData ! =null && pluginData.getEnabled()) {
            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
            if (CollectionUtils.isEmpty(selectors)) {
                return CheckUtils.checkSelector(pluginName, exchange, chain);
            }
            final SelectorData selectorData = matchSelector(exchange, selectors);
            if (Objects.isNull(selectorData)) {
                if (PluginEnum.WAF.getName().equals(pluginName)) {
                    return doExecute(exchange, chain, null.null);
                }
                return CheckUtils.checkSelector(pluginName, exchange, chain);
            }
            if (selectorData.getLoged()) {
                log.info("{} selector success match , selector name :{}", pluginName, selectorData.getName());
            }
            final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
            if (CollectionUtils.isEmpty(rules)) {
                if (PluginEnum.WAF.getName().equals(pluginName)) {
                    return doExecute(exchange, chain, null.null);
                }
                return CheckUtils.checkRule(pluginName, exchange, chain);
            }
            RuleData rule;
            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
                //get last
                rule = rules.get(rules.size() - 1);
            } else {
                rule = matchRule(exchange, rules);
            }
            if (Objects.isNull(rule)) {
                return CheckUtils.checkRule(pluginName, exchange, chain);
            }
            if (rule.getLoged()) {
                log.info("{} rule success match ,rule name :{}", pluginName, rule.getName());
            }
            return doExecute(exchange, chain, selectorData, rule);
        }
        return chain.execute(exchange);
    }

    # RateLimiterPlugin
    // Logic of limiting traffic after matching
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assertsoulContext ! =null;
        // Where can all the rules be filled out
        Resilience4JHandle resilience4JHandle = GsonUtils.getGson().fromJson(rule.getHandle(), Resilience4JHandle.class);
        // select * from 'Circle enable' where '1' = 'combined' and 'limit' = '0'
        if (resilience4JHandle.getCircuitEnable() == 1) {
            return combined(exchange, chain, rule);
        }
        return rateLimiter(exchange, chain, rule);
    }

    private Mono<Void> rateLimiter(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
        return ratelimiterExecutor.run(
                chain.execute(exchange), fallback(ratelimiterExecutor, exchange, null), Resilience4JBuilder.build(rule))
                .onErrorResume(throwable -> ratelimiterExecutor.withoutFallback(exchange, throwable));
    }
Copy the code

Continue to the RateLimiterExecutor class as follows:

public class RateLimiterExecutor implements Executor {

    @Override
    public <T> Mono<T> run(final Mono<T> toRun, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf conf) {
        // Generate current limiter
        RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(conf.getId(), conf.getRateLimiterConfig());
        // The current limiting logic should be triggered here
        Mono<T> to = toRun.transformDeferred(RateLimiterOperator.of(rateLimiter));
        if(fallback ! =null) {
            return to.onErrorResume(fallback);
        }
        returnto; }}Copy the code

We see the obvious logic of generating a current limiter. When we look at its implementation, we find that it is an interface. When we look at its implementation, we find that there are two, respectively: SemaphoreBasedRateLimiter and AtomicRateLimiter, because I don’t know in which, we might perform in these two classes function to hit a breakpoint, send the request to restart, continuous jump breakpoints, finally entered a current limiter class: AtomicRateLimiter, roughly as follows:

    # AtomicRateLimiter
    public long reservePermission(final int permits) {
        long timeoutInNanos = ((AtomicRateLimiter.State)this.state.get()).config.getTimeoutDuration().toNanos();
        AtomicRateLimiter.State modifiedState = this.updateStateWithBackOff(permits, timeoutInNanos);
        boolean canAcquireImmediately = modifiedState.nanosToWait <= 0L;
        if (canAcquireImmediately) {
            this.publishRateLimiterEvent(true, permits);
            return 0L;
        } else {
            boolean canAcquireInTime = timeoutInNanos >= modifiedState.nanosToWait;
            if (canAcquireInTime) {
                this.publishRateLimiterEvent(true, permits);
                return modifiedState.nanosToWait;
            } else {
                this.publishRateLimiterEvent(false, permits);
                return -1L; }}}Copy the code

Debug SoulWebHandler to see if the current limiter is executed in the same order as expected: When the RateLimiterPlugin is executed, the breakpoint also reaches the AtomicRateLimiter. Divide and other plugins can execute only after the flow limiter logic is completed.

conclusion

This article explores the configuration of the resilience4J plug-in. The debugging verifies the execution sequence of the current-limiting logic in the plugin chain, and finds that the execution sequence of the current-limiting logic is consistent with that of the plugin.