This is the 9th day of my participation in the August More Text Challenge.

Preface

Ribbon is an open-source project from Netflix that is now part of Spring Cloud. It is a client-side load balancer for HTTP and TCP. When Ribbon is used with Eureka, it retrieves the server list from the Eureka registry and distributes requests across those servers, for example by round-robin polling. The server list is kept up to date through a heartbeat mechanism that works together with the service registry.

What is load balancing?

Load balancing is one of the main techniques for handling high concurrency, relieving network pressure, and scaling out server capacity. When people say "load balancing" they usually mean server-side load balancing, but there are actually two kinds: server-side and client-side load balancing.

What is the difference between Ribbon and Nginx?

Ribbon is a client-side load balancer; Nginx is a server-side load balancer.

In client-side load balancing, the client itself holds the list of service instances it can call. For example, Eureka or Nacos stores information about each service instance, and Ribbon, integrated with Eureka or Nacos, picks one instance from this known list according to a load balancing policy. In other words, the load balancing decision is made on the client.

In server-side load balancing, the client does not know which server instance will handle its request. It sends the request to the load balancer (for example, Nginx), which forwards it to one of several servers according to its own load balancing algorithm. In other words, the load balancing decision is made on the server side.

The essential difference between the two is where the service list is stored: in client-side load balancing every client keeps its own list of server addresses, while in server-side load balancing the list lives on the load balancer.

Ribbon Load Balancing Policies

The following diagram shows Ribbon's seven load balancing policies:

The load balancing interface is com.netflix.loadbalancer.IRule. The abstract class AbstractLoadBalancerRule implements IRule, and each concrete policy extends this abstract class.
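To make this class hierarchy concrete, here is a minimal sketch of the same shape. SimpleLoadBalancer, SimpleRule, SimpleAbstractRule and FirstServerRule are hypothetical, simplified stand-ins in which servers are plain strings; they only mirror the idea that IRule declares choose(Object key) plus a getter and setter for the load balancer, and that concrete policies extend a shared abstract base class.

```java
import java.util.List;

// Hypothetical, simplified stand-ins; Ribbon's real types are
// com.netflix.loadbalancer.ILoadBalancer and com.netflix.loadbalancer.IRule
interface SimpleLoadBalancer {
    List<String> getAllServers();
}

interface SimpleRule {
    String choose(Object key);

    void setLoadBalancer(SimpleLoadBalancer lb);

    SimpleLoadBalancer getLoadBalancer();
}

// Plays the role of AbstractLoadBalancerRule: stores the load balancer
// and leaves the selection logic in choose() to concrete subclasses
abstract class SimpleAbstractRule implements SimpleRule {
    private SimpleLoadBalancer lb;

    @Override
    public void setLoadBalancer(SimpleLoadBalancer lb) {
        this.lb = lb;
    }

    @Override
    public SimpleLoadBalancer getLoadBalancer() {
        return lb;
    }
}

// A deliberately trivial concrete policy: always return the first server
class FirstServerRule extends SimpleAbstractRule {
    @Override
    public String choose(Object key) {
        List<String> servers = getLoadBalancer().getAllServers();
        return servers.isEmpty() ? null : servers.get(0);
    }
}
```

A rule is wired to a load balancer with setLoadBalancer and then asked for a server with choose(key); switching policies means switching the concrete subclass, which is exactly how the seven Ribbon rules below are plugged in.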

Polling policy –RoundRobinRule

Let's look at how the RoundRobinRule class is implemented in the source code:

public class RoundRobinRule extends AbstractLoadBalancerRule {

    private AtomicInteger nextServerCyclicCounter;

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        // Used to count the number of attempts by the load balancer to obtain an available server
        int count = 0;
        // Make at most 10 attempts; give up if none of them finds a live server
        while (server == null && count++ < 10) {
            // Get all reachable servers
            List<Server> reachableServers = lb.getReachableServers();
            // Get all servers
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }
            // Advance the counter with a CAS spin loop to get the index of the next server
            int nextServerIndex = incrementAndGetModulo(serverCount);
            // Fetch the server at that index
            server = allServers.get(nextServerIndex);
            // If the slot is empty, yield the CPU so other threads can run, then retry
            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }
        // Warn if no live server was found within 10 attempts
        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }
    
    // Advance the cyclic counter modulo the server count, using CAS for thread safety
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            // Get the atomic attribute value
            int current = nextServerCyclicCounter.get();
            // Take modulus
            int next = (current + 1) % modulo;
            // Update the counter with CAS; if another thread changed it first, loop and retry
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }
}

Round robin computes the index of the next server with a modulo operation and takes the server at that index from the list of all servers. The index is kept in an AtomicInteger and advanced with CAS (compare-and-set), so concurrent callers are handled safely without locks.
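The counter logic can be tried in isolation. The sketch below is a simplified, hypothetical RoundRobinSketch class over a list of server names; it keeps the AtomicInteger plus compareAndSet loop from incrementAndGetModulo but drops the liveness checks and the 10-attempt limit.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {
    // Shared cyclic counter, starting at 0 like RoundRobinRule's
    private final AtomicInteger nextServerCyclicCounter = new AtomicInteger(0);

    // Advance the counter by one modulo `modulo` and return the new value
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            // Only one thread wins the CAS for a given `current`; the others loop and retry
            if (nextServerCyclicCounter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    // Return the next server in round-robin order, or null for an empty list
    String choose(List<String> servers) {
        if (servers.isEmpty()) {
            return null;
        }
        return servers.get(incrementAndGetModulo(servers.size()));
    }
}
```

Because the counter starts at 0 and is incremented before it is used, the first call returns the server at index 1, then index 2, and then wraps back to index 0.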

Random strategy –RandomRule

The random policy selects a server instance at random. It uses ThreadLocalRandom to generate the random numbers, so concurrent threads do not contend on a single shared Random instance.

public class RandomRule extends AbstractLoadBalancerRule {
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            if (Thread.interrupted()) {
                return null;
            }
            // Obtain the reachable server and all servers
            List<Server> upList = lb.getReachableServers();
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();
            if (serverCount == 0) {
                return null;
            }
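            // Pick a random index in [0, serverCount)
            int index = chooseRandomInt(serverCount);
            // Note: the index is bounded by allList's size but applied to upList,
            // so this can throw IndexOutOfBoundsException when some servers are down
            server = upList.get(index);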

            if (server == null) {
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            server = null;
            Thread.yield();
        }

        return server;

    }

    // Return a random int in [0, serverCount) from the thread-local generator
    protected int chooseRandomInt(int serverCount) {
        return ThreadLocalRandom.current().nextInt(serverCount);
    }
}
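The sketch below shows random selection with ThreadLocalRandom, under the simplifying assumption that servers are plain strings (RandomSketch is a hypothetical name). In the Ribbon excerpt the index is drawn from allList's size but used on upList; here the index is bounded by the size of the same list it reads from, the reachable servers, so it can never be out of range.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

class RandomSketch {
    // Pick a random server from the reachable list, or null if it is empty
    static String choose(List<String> reachableServers) {
        if (reachableServers.isEmpty()) {
            return null;
        }
        // ThreadLocalRandom gives each thread its own generator, avoiding contention
        int index = ThreadLocalRandom.current().nextInt(reachableServers.size());
        return reachableServers.get(index);
    }
}
```

Every reachable server has the same chance of being chosen on each call, which is why this policy ignores differences in server load or response time.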

Retry Policy –RetryRule

RetryRule obtains a service instance through the round-robin policy. If no live instance is returned, it keeps invoking the round-robin policy until it gets one or the retry window (500 ms by default) expires.

An InterruptTask starts a daemon Timer thread that interrupts the current thread once the remaining retry time has elapsed. Until then, the rule keeps calling the round-robin policy; if no live server is obtained within the retry window, it returns null.

public class RetryRule extends AbstractLoadBalancerRule {

    public Server choose(ILoadBalancer lb, Object key) {
       long requestTime = System.currentTimeMillis();
       long deadline = requestTime + maxRetryMillis;

       Server answer = null;

       // Call the polling policy
       answer = subRule.choose(key);

       // If round robin returned no server or a dead one, and the deadline has not passed yet
       if (((answer == null) || (!answer.isAlive())) && (System.currentTimeMillis() < deadline)) {
          // Start a daemon timer that interrupts this thread when the remaining retry time runs out
          InterruptTask task = new InterruptTask(deadline
                - System.currentTimeMillis());

          // Until the thread is interrupted, keep invoking the round-robin policy
          while (!Thread.interrupted()) {
             answer = subRule.choose(key);

             if (((answer == null) || (!answer.isAlive())) && (System.currentTimeMillis() < deadline)) {
                /* pause and retry hoping it's transient */
                Thread.yield();
             } else {
                break;
             }
          }

          task.cancel();
       }

       if ((answer == null) || (!answer.isAlive())) {
          return null;
       } else {
          return answer;
       }
    }
}
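The retry loop can be sketched without Ribbon types. In this hypothetical RetrySketch, subRule is any Supplier of server names standing in for the round-robin rule, and isAlive is a Predicate standing in for Server.isAlive(). For simplicity the sketch checks the deadline on every iteration instead of using an InterruptTask timer to interrupt the thread.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

class RetrySketch {
    // Call subRule until it returns a live server or maxRetryMillis elapses;
    // mirrors the shape of RetryRule.choose() without the InterruptTask timer
    static String chooseWithRetry(Supplier<String> subRule,
                                  Predicate<String> isAlive,
                                  long maxRetryMillis) {
        long deadline = System.currentTimeMillis() + maxRetryMillis;
        String answer = subRule.get();
        while ((answer == null || !isAlive.test(answer))
                && System.currentTimeMillis() < deadline) {
            // Pause briefly and retry, hoping the failure is transient
            Thread.yield();
            answer = subRule.get();
        }
        // Same final check as RetryRule: never hand back a missing or dead server
        return (answer == null || !isAlive.test(answer)) ? null : answer;
    }
}
```

Passing 500 as maxRetryMillis matches RetryRule's default window. If the sub-rule never yields a live server, the loop simply spins until the deadline and the method returns null, just as the original does.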

Weighted response time –WeightedResponseTimeRule

The WeightedResponseTimeRule class extends the round-robin policy class RoundRobinRule.

During initialization, a timer is started that recalculates each server's weight every 30 seconds based on its average response time. The longer the response time, the lower the weight and the lower the probability of being selected; the shorter the response time, the higher the weight and the higher the probability of being selected. To choose a server, the rule draws a random number between 0 and the sum of all weights and picks the first instance whose accumulated weight is greater than or equal to that number.

public class WeightedResponseTimeRule extends RoundRobinRule {
    
    // Recalculate the weight of each server every 30 s
    public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;

    // Record the accumulated weight
    private volatile List<Double> accumulatedWeights = new ArrayList<Double>();
    
    // Initialization
    void initialize(ILoadBalancer lb) {
        if (serverWeightTimer != null) {
            serverWeightTimer.cancel();
        }
        serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
                + name, true);
        // Recalculate the weights periodically on the timer
        serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
                serverWeightTaskTimerInterval);
        // Do an initial run
        ServerWeight sw = new ServerWeight();
        sw.maintainWeights();

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                logger
                        .info("Stopping NFLoadBalancer-serverWeightTimer-" + name);
                serverWeightTimer.cancel();
            }
        }));
    }

    @Override
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            List<Double> currentWeights = accumulatedWeights;
            // Determine whether the thread is interrupted
            if (Thread.interrupted()) {
                return null;
            }
            // Get the list of servers
            List<Server> allList = lb.getAllServers();
            int serverCount = allList.size();
            if (serverCount == 0) {
                return null;
            }
            int serverIndex = 0;

            // The last element of currentWeights is the sum of all weights
            double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);

            // If the weights are not ready yet or do not match the server list, fall back to round robin
            if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
                server = super.choose(getLoadBalancer(), key);
                if (server == null) {
                    return server;
                }
            } else {
                // Draw a random weight between 0 and the sum of all weights
                double randomWeight = random.nextDouble() * maxTotalWeight;
                int n = 0;
                // Pick the first instance whose accumulated weight is >= the random weight
                for (Double d : currentWeights) {
                    if (d >= randomWeight) {
                        serverIndex = n;
                        break;
                    } else {
                        n++;
                    }
                }

                server = allList.get(serverIndex);
            }

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            // Next.
            server = null;
        }
        return server;
    }
}

// Inner class of WeightedResponseTimeRule
class ServerWeight {

    public void maintainWeights() {
        ILoadBalancer lb = getLoadBalancer();
        if (lb == null) {
            return;
        }
        
        if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
            return; 
        }
        
        try {
            logger.info("Weight adjusting job started");
            AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
            LoadBalancerStats stats = nlb.getLoadBalancerStats();
            if (stats == null) {
                // no statistics, nothing to do
                return;
            }
            double totalResponseTime = 0;
            // Sum the average response times of all service instances
            for (Server server : nlb.getAllServers()) {
                ServerStats ss = stats.getSingleServerStat(server);
                totalResponseTime += ss.getResponseTimeAvg();
            }
            // Record the accumulated weight
            Double weightSoFar = 0.0;
            
            // Store weights for all services
            List<Double> finalWeights = new ArrayList<Double>();
            for (Server server : nlb.getAllServers()) {
                ServerStats ss = stats.getSingleServerStat(server);
                // Weight of a server = sum of all average response times - this server's average response time
                // So the longer a server's response time, the smaller its weight and the less likely it is to be chosen
                double weight = totalResponseTime - ss.getResponseTimeAvg();
                weightSoFar += weight;
                finalWeights.add(weightSoFar);   
            }
            setWeights(finalWeights);
        } catch (Exception e) {
            logger.error("Error calculating server weights", e);
        } finally {
            serverWeightAssignmentInProgress.set(false);
        }
    }
}

For example, suppose there are three service instances with the following average response times:

  • A: 100 ms
  • B: 200 ms
  • C: 300 ms

The sum of the average response times is 600 ms, so the accumulated weights are:

  • A: 600-100 = 500
  • B: 500+600-200 = 900
  • C: 900+600-300 = 1200

A random number in [0, 1200) then selects the instance: a value up to 500 hits A, a value between 500 and 900 hits B, and a value between 900 and 1200 hits C. If the weights have not been calculated yet (or no longer match the server list), the rule falls back to the round-robin result.
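The arithmetic of this example can be checked with a short sketch. WeightedResponseTimeSketch is hypothetical: accumulatedWeights follows the formula in maintainWeights (each weight is the total average response time minus the server's own, accumulated as a running sum), and pick performs the "first accumulated weight >= random weight" search from choose.

```java
import java.util.ArrayList;
import java.util.List;

class WeightedResponseTimeSketch {
    // Accumulated weights: weight_i = total - avg_i, stored as a running sum
    static List<Double> accumulatedWeights(double[] avgResponseTimes) {
        double total = 0;
        for (double t : avgResponseTimes) {
            total += t;
        }
        List<Double> weights = new ArrayList<>();
        double weightSoFar = 0;
        for (double t : avgResponseTimes) {
            weightSoFar += total - t;
            weights.add(weightSoFar);
        }
        return weights;
    }

    // Index of the first server whose accumulated weight >= randomWeight
    static int pick(List<Double> accumulated, double randomWeight) {
        for (int i = 0; i < accumulated.size(); i++) {
            if (accumulated.get(i) >= randomWeight) {
                return i;
            }
        }
        return 0; // Ribbon's loop likewise leaves serverIndex at 0 if nothing matches
    }
}
```

With the response times 100, 200 and 300 ms this yields [500.0, 900.0, 1200.0]. In the rule itself the random weight is random.nextDouble() * maxTotalWeight, so A, B and C are chosen with probabilities 500/1200, 400/1200 and 300/1200 respectively.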

Best available policy –BestAvailableRule

If no load balancer statistics are available (loadBalancerStats is null), a service instance is selected with the round-robin policy.

Otherwise, the rule inspects the service instances one by one, skips those whose circuit breaker has tripped, and selects the instance with the fewest concurrent (active) requests among the rest. If no instance qualifies, it falls back to the round-robin policy.

public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {

    @Override
    public Server choose(Object key) {
        // Fall back to round robin when no load balancer statistics are available
        if (loadBalancerStats == null) {
            return super.choose(key);
        }
        // Get the list of all servers
        List<Server> serverList = getLoadBalancer().getAllServers();
        // Minimum number of concurrent connections
        int minimalConcurrentConnections = Integer.MAX_VALUE;
        long currentTime = System.currentTimeMillis();
        Server chosen = null;
        // Iterate through the list of servers
        for (Server server: serverList) {
            // Get server statistics
            ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
            // Skip instances whose circuit breaker has tripped
            if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                // Keep the instance with the fewest concurrent requests
                if (concurrentConnections < minimalConcurrentConnections) {
                    minimalConcurrentConnections = concurrentConnections;
                    chosen = server;
                }
            }
        }
        // If no instance qualified, fall back to round robin
        if (chosen == null) {
            return super.choose(key);
        } else {
            return chosen;
        }
    }
}
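The core of BestAvailableRule is a single linear scan for a minimum. In this hypothetical sketch, ServerState is a simplified snapshot standing in for Ribbon's ServerStats, and a null result is the caller's signal to fall back to round robin.

```java
import java.util.List;

class BestAvailableSketch {
    // Hypothetical snapshot of one server's statistics (stands in for ServerStats)
    static final class ServerState {
        final String id;
        final boolean circuitBreakerTripped;
        final int activeRequests;

        ServerState(String id, boolean circuitBreakerTripped, int activeRequests) {
            this.id = id;
            this.circuitBreakerTripped = circuitBreakerTripped;
            this.activeRequests = activeRequests;
        }
    }

    // Return the id of the non-tripped server with the fewest active requests,
    // or null so that the caller can fall back to round robin
    static String choose(List<ServerState> servers) {
        int minimalConcurrentConnections = Integer.MAX_VALUE;
        String chosen = null;
        for (ServerState s : servers) {
            if (!s.circuitBreakerTripped && s.activeRequests < minimalConcurrentConnections) {
                minimalConcurrentConnections = s.activeRequests;
                chosen = s.id;
            }
        }
        return chosen;
    }
}
```

A tripped server is skipped even if it currently has zero active requests, which is what keeps a failing instance from attracting traffic simply because nobody is using it.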

Availability filtering policy –AvailabilityFilteringRule

This policy extends the abstract class PredicateBasedRule.

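It first picks an instance by round robin and checks it against the filter condition. If the instance is filtered out, it picks the next one by round robin and checks again, for about ten attempts. If no instance passes, it falls back to the parent PredicateBasedRule, which filters the whole server list first and then selects an instance from the remaining ones by round robin.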

Filter condition: an instance is filtered out if its circuit breaker has tripped or its number of concurrent requests has reached the threshold.

public class AvailabilityFilteringRule extends PredicateBasedRule {  

    @Override
    public Server choose(Object key) {
        int count = 0;
        // The polling policy selects an instance
        Server server = roundRobinRule.choose(key);

        while (count++ <= 10) {
            // Return the instance if it passes the predicate (the filter condition)
            if (predicate.apply(new PredicateKey(server))) {
                return server;
            }
            // Otherwise, pick the next instance by round robin
            server = roundRobinRule.choose(key);
        }
        // If no instance passed within the loop, fall back to the superclass PredicateBasedRule policy
        return super.choose(key);
    }
}

Now look at the selection logic of the superclass PredicateBasedRule:

public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {

    @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        // After filtering according to the conditions, select instances using a polling policy
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }
    }
}

The filter condition itself is defined in the AvailabilityPredicate class:

public class AvailabilityPredicate extends  AbstractServerPredicate {

    @Override
    public boolean apply(@Nullable PredicateKey input) {
        LoadBalancerStats stats = getLBStats();
        if (stats == null) {
            return true;
        }
        return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
    }

    private boolean shouldSkipServer(ServerStats stats) {
        // Filter out an instance if either of the following conditions holds:
        // 1. Circuit breaker filtering is enabled and the circuit breaker has tripped
        // 2. The instance's concurrent requests >= the threshold
        if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) 
                || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
            return true;
        }
        return false;
    }
}
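Putting these pieces together, the flow of AvailabilityFilteringRule can be sketched as follows. AvailabilityFilteringSketch is hypothetical: servers are strings, isAvailable is any Predicate standing in for AvailabilityPredicate, and the round robin is a simple, non-thread-safe cursor rather than Ribbon's RoundRobinRule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class AvailabilityFilteringSketch {
    private int position = 0; // round-robin cursor; not thread-safe, sketch only

    // Simplified round robin: advance the cursor, then return the server at that index
    private String roundRobin(List<String> servers) {
        if (servers.isEmpty()) {
            return null;
        }
        position = (position + 1) % servers.size();
        return servers.get(position);
    }

    String choose(List<String> servers, Predicate<String> isAvailable) {
        // Fast path: like `count++ <= 10`, check at most 11 round-robin picks
        String server = roundRobin(servers);
        for (int count = 0; count <= 10; count++) {
            if (server != null && isAvailable.test(server)) {
                return server;
            }
            server = roundRobin(servers);
        }
        // Fallback (PredicateBasedRule): filter the whole list, then use round robin
        List<String> filtered = new ArrayList<>();
        for (String s : servers) {
            if (isAvailable.test(s)) {
                filtered.add(s);
            }
        }
        return roundRobin(filtered);
    }
}
```

In Ribbon the predicate would reject a server whose circuit breaker has tripped or whose active request count has reached activeConnectionsLimit. Note that the loop condition count++ <= 10 actually checks up to 11 candidates before falling back.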

Zone avoidance strategy –ZoneAvoidanceRule

This policy also extends PredicateBasedRule.

public class ZoneAvoidanceRule extends PredicateBasedRule {

    private static final Random random = new Random();

    private CompositePredicate compositePredicate;
    
    public ZoneAvoidanceRule() {
        super();
        // Two filter conditions, combined into one composite predicate
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }
}

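The two predicates are ZoneAvoidancePredicate and AvailabilityPredicate. ZoneAvoidancePredicate keeps a server only if it belongs to an available zone: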

public class ZoneAvoidancePredicate extends  AbstractServerPredicate {
    @Override
    public boolean apply(@Nullable PredicateKey input) {
        if (!ENABLED.get()) {
            return true;
        }
        String serverZone = input.getServer().getZone();
        if (serverZone == null) {
            // there is no zone information from the server, we do not want to filter
            // out this server
            return true;
        }
        LoadBalancerStats lbStats = getLBStats();
        if (lbStats == null) {
            // no stats available, do not filter
            return true;
        }
        if (lbStats.getAvailableZones().size() <= 1) {
            // only one zone is available, do not filter
            return true;
        }
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        if (!zoneSnapshot.keySet().contains(serverZone)) {
            // The server zone is unknown to the load balancer, do not filter it out
            return true;
        }
        logger.debug("Zone snapshots: {}", zoneSnapshot);
        // Obtain the available zone
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
        logger.debug("Available zones: {}", availableZones);
        if (availableZones != null) {
            return availableZones.contains(input.getServer().getZone());
        } else {
            return false;
        }
    }
}

AvailabilityPredicate is the same filter condition used by the AvailabilityFilteringRule policy:

public class AvailabilityPredicate extends  AbstractServerPredicate {

    @Override
    public boolean apply(@Nullable PredicateKey input) {
        LoadBalancerStats stats = getLBStats();
        if (stats == null) {
            return true;
        }
        return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
    }

    // Filter out an instance if either of the following conditions holds:
    // 1. Circuit breaker filtering is enabled and the circuit breaker has tripped
    // 2. The instance's concurrent requests >= the threshold
    private boolean shouldSkipServer(ServerStats stats) {        
        if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) 
                || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
            return true;
        }
        return false;
    }
}
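How the two conditions combine can be sketched as follows. ZoneAvoidanceSketch and its Instance class are hypothetical, and the set of available zones is passed in directly instead of being computed from load balancer statistics. The fallback chain (zone AND availability, then availability only, then no filter, each applied only when the previous step leaves nothing) is an assumption modeled on how createCompositePredicate configures fallback predicates in the Ribbon source; check the CompositePredicate implementation for the exact conditions.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class ZoneAvoidanceSketch {
    // Hypothetical server view: id, zone (may be null) and an availability flag
    static final class Instance {
        final String id;
        final String zone;
        final boolean available;

        Instance(String id, String zone, boolean available) {
            this.id = id;
            this.zone = zone;
            this.available = available;
        }
    }

    // Zone condition: keep servers without zone info or in an available zone
    static Predicate<Instance> zonePredicate(Set<String> availableZones) {
        return s -> s.zone == null || availableZones.contains(s.zone);
    }

    // Availability condition: stands in for AvailabilityPredicate's
    // circuit-breaker / concurrent-request check
    static final Predicate<Instance> AVAILABILITY = s -> s.available;

    // Try zone AND availability first; if nothing survives, relax to
    // availability only, then to no filtering (assumed fallback chain)
    static List<Instance> filter(List<Instance> servers, Set<String> availableZones) {
        Predicate<Instance> zoneAndAvailable = zonePredicate(availableZones).and(AVAILABILITY);
        Predicate<Instance> any = s -> true;
        List<Predicate<Instance>> chain = List.of(zoneAndAvailable, AVAILABILITY, any);
        for (Predicate<Instance> p : chain) {
            List<Instance> result = servers.stream().filter(p).collect(Collectors.toList());
            if (!result.isEmpty()) {
                return result;
            }
        }
        return List.of(); // only reached when the input list is empty
    }
}
```

ZoneAvoidanceRule then picks one instance from the filtered list by round robin, in the same way PredicateBasedRule does.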

Summary

This article analyzed Ribbon's seven load balancing strategies from the perspective of the source code. If you are interested in MySQL, Spring, and similar topics, stay tuned for more.