The cluster – LoadBalance

Objective: describe cluster load balancing in Dubbo and walk through the source code of the loadbalance package under dubbo-cluster.

Preface

Load balancing, colloquially speaking, is about holding a bowl of water level: every server should get its fair share. If we have many machines but requests always go to the same server, the other servers sit idle, which wastes resources and raises the risk that the busy server goes down under overload. Load balancing addresses this. It acts like a scale, using various strategies to give each server a load that matches its processing capacity, which avoids both overloaded servers and wasted resources. Load balancing comes in two forms, software and hardware; this article covers software load balancing. In Dubbo, consumer call requests need to be distributed so that a few service providers are not overloaded while others are idle, because excessive load leads to request timeouts. Dubbo provides four load balancing implementations:

  1. RandomLoadBalance: weighted random algorithm
  2. LeastActiveLoadBalance: least active calls algorithm
  3. ConsistentHashLoadBalance: consistent hashing
  4. RoundRobinLoadBalance: weighted round-robin (polling) algorithm

The concrete implementation is explained below.

Source code analysis

(1) AbstractLoadBalance

This class implements the LoadBalance interface. It is the abstract base class of the load balancing strategies and provides the weight calculation.

1.select

@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // Return null if invokers is empty
    if (invokers == null || invokers.isEmpty())
        return null;
    // If invokers has only one service provider, one is returned
    if (invokers.size() == 1)
        return invokers.get(0);
    // Call doSelect for selection
    return doSelect(invokers, url, invocation);
}

This method selects an invoker. The actual choice is delegated to doSelect, an abstract method that each of the four load balancing strategies above implements.

2.getWeight

protected int getWeight(Invoker<?> invoker, Invocation invocation) {
    // Get the configured service weight. The default value is 100
    int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
    if (weight > 0) {
        // Get the provider's startup timestamp
        long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
        if (timestamp > 0L) {
            // Compute how long the provider has been running
            int uptime = (int) (System.currentTimeMillis() - timestamp);
            // Get the warm-up duration. The default value is 10 minutes
            int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
            // If the uptime is shorter than the warm-up duration, recalculate the weight, that is, reduce it
            if (uptime > 0 && uptime < warmup) {
                weight = calculateWarmupWeight(uptime, warmup, weight);
            }
        }
    }
    return weight;
}

This method obtains the weight of an invoker. While the provider is still warming up, the weight is reduced by calculateWarmupWeight, which accounts for JVM warm-up.

3.calculateWarmupWeight

static int calculateWarmupWeight(int uptime, int warmup, int weight) {
    // Calculate the weight (uptime/warmup) * weight, progress percentage * weight
    int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
    // Clamp the result to the range [1, weight]
    return ww < 1 ? 1 : (ww > weight ? weight : ww);
}

This method calculates the warm-up weight. The formula is (uptime / warmup) * weight, that is, the warm-up progress as a fraction multiplied by the configured weight, with the result kept between 1 and weight.
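To make the formula concrete, here is a small sketch that repeats the calculation with sample numbers. The class name WarmupWeightSketch and the sample values (a 10-minute warm-up and a weight of 100, matching the defaults mentioned in the comments above) are assumptions for illustration; only the arithmetic mirrors calculateWarmupWeight.

```java
class WarmupWeightSketch {

    // Same arithmetic as calculateWarmupWeight above:
    // progress fraction * weight, clamped to [1, weight].
    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }

    public static void main(String[] args) {
        int warmup = 10 * 60 * 1000; // assumed warm-up duration: 10 minutes in milliseconds
        int weight = 100;            // assumed configured weight
        int[] uptimes = {1, 30_000, 120_000, 300_000, 600_000};
        for (int uptime : uptimes) {
            System.out.println(uptime + " ms -> weight "
                    + calculateWarmupWeight(uptime, warmup, weight));
        }
    }
}
```

With these sample values, a provider that is two minutes into a ten-minute warm-up is treated as having weight 20, and the weight never drops below 1.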

(2) RandomLoadBalance

For example, suppose we have a group of servers = [A, B, C] with weights = [6, 3, 1], so the total weight is 10. Lay these weights out on a one-dimensional axis and three intervals appear: A is [0, 6), B is [6, 9), and C is [9, 10). Then generate a random number in [0, 10) and see which interval it falls in; the server that owns that interval is used. The greater the weight, the greater the probability of being hit.

public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";

    /** * random number generator */
    private final Random random = new Random();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Get the service length
        int length = invokers.size(); // Number of invokers
        // Total weight
        int totalWeight = 0; // The sum of weights
        // Whether the weights are the same
        boolean sameWeight = true; // Every invoker has the same weight?
        // Iterate over each service and calculate the corresponding weight
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            // Calculate the total weight value
            totalWeight += weight; // Sum
            // If this service's weight differs from the previous one's, sameWeight is false
            if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false;
            }
        }
        // If the service weights are not all the same and the total weight is greater than 0
        if (totalWeight > 0 && !sameWeight) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
            int offset = random.nextInt(totalWeight);
            // Return a invoker based on the random value.
            // Subtract each service provider's weight from offset in turn, and return the corresponding Invoker as soon as offset drops below 0.
            // For example, we have servers = [A, B, C], weights = [6, 3, 1], offset = 7.
            // In the first iteration, offset - 6 = 1 >= 0, i.e. offset >= 6,
            // so it does not fall in the interval of server A.
            // In the second iteration, offset - 3 = -2 < 0, i.e. 6 <= offset < 9,
            // so it falls in the interval of server B
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        // If all service providers have the same weight value, then just return one randomly
        return invokers.get(random.nextInt(length));
    }
}

This algorithm is easy to understand, but RandomLoadBalance also has a drawback. When the number of calls is small, the numbers produced by Random may cluster, so most requests land on the same server. The effect evens out as the number of calls grows, so the impact is small.
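The interval lookup can be tried out in isolation. The following is a minimal sketch, assuming plain int weights instead of Invokers; the class and method names (WeightedRandomSketch, indexForOffset, pick) are made up for illustration and are not part of Dubbo.

```java
import java.util.Random;

class WeightedRandomSketch {

    // Map an offset in [0, totalWeight) to the index of the interval it falls in.
    static int indexForOffset(int[] weights, int offset) {
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset must be smaller than the total weight");
    }

    // Pick an index at random with probability proportional to its weight.
    // Assumes the total weight is greater than 0.
    static int pick(int[] weights, Random random) {
        int total = 0;
        for (int w : weights) {
            total += w;
        }
        return indexForOffset(weights, random.nextInt(total));
    }

    public static void main(String[] args) {
        int[] weights = {6, 3, 1}; // servers A, B, C from the example
        int[] hits = new int[weights.length];
        Random random = new Random();
        for (int n = 0; n < 10_000; n++) {
            hits[pick(weights, random)]++;
        }
        System.out.println("A=" + hits[0] + " B=" + hits[1] + " C=" + hits[2]);
    }
}
```

With weights [6, 3, 1], offsets 0 to 5 map to A, 6 to 8 map to B, and 9 maps to C, which matches the intervals [0, 6), [6, 9), and [9, 10).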

(3) LeastActiveLoadBalance

This load balancing strategy is based on the least active calls algorithm. A smaller number of active calls suggests that the service provider is more efficient and can handle more requests per unit of time, so such a provider should be preferred. The implementation is simple: each service provider has a counter named active. Each time a request is received, active increases by 1; each time a request completes, active decreases by 1. After the service has been running for a while, the better performing providers finish requests faster, so their active count drops faster and they get priority for new requests. In addition to the least active number, weights are also used: when several providers share the same least active number, one is chosen by weight, and if their weights are also the same, one is chosen at random.

public class LeastActiveLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "leastactive";

    /** * random number generator */
    private final Random random = new Random();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Get the service length
        int length = invokers.size(); // Number of invokers
        // Minimum active number
        int leastActive = -1; // The least active value of all invokers
        // The number of service providers (hereinafter referred to as Invoker) with the same minimum active number
        int leastCount = 0; // The number of invokers having the same least active value (leastActive)
        // leastIndexs is used to record the subscript information of invokers with the same "minimum active number" in the Invokers list
        int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)
        // Total weight
        int totalWeight = 0; // The sum of with warmup weights
        // The Invoker weight of the first minimum active number is used for comparison with other invokers with the same minimum active number.
        // To test whether "all invokers with the same minimum active number are equally weighted"
        int firstWeight = 0; // Initial value, used for comparison
        // Whether the weights are the same
        boolean sameWeight = true; // Every invoker has the same weight value?
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            // Get the number of Invoker active
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
            // Get the weight of the service
            int afterWarmup = getWeight(invoker, invocation); // Weight
            // Find a smaller active number and start again
            if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
                // Record the current minimum active number
                leastActive = active; // Record the current least active value
                // Update leastCount to 1
                leastCount = 1; // Reset leastCount, count again based on current leastCount
                // Record the current subscript value in leastIndexs
                leastIndexs[0] = i; // Reset
                totalWeight = afterWarmup; // Reset
                firstWeight = afterWarmup; // Record the weight the first invoker
                sameWeight = true; // Reset, every invoker has the same weight value?
                // If the current Invoker's active count equals the minimum active count leastActive
            } else if (active == leastActive) { // If current invoker's active value equals with leastActive, then accumulating.
                // In leastIndexs, record the index of the current Invoker in the invokers collection
                leastIndexs[leastCount++] = i; // Record index number of this invoker
                // add weights
                totalWeight += afterWarmup; // Add this invoker's weight to totalWeight.
                // If every invoker has the same weight?
                if (sameWeight && i > 0 && afterWarmup != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // assert(leastCount > 0)
        // If only one Invoker has the minimum active number, that Invoker is returned
        if (leastCount == 1) {
            // If we got exactly one invoker having the least active value, return this invoker directly.
            return invokers.get(leastIndexs[0]);
        }
        // There are multiple Invokers with the same minimum active number, but they have different weights
        if (!sameWeight && totalWeight > 0) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
            // Generate a random number
            int offsetWeight = random.nextInt(totalWeight) + 1;
            // Return a invoker based on the random value.
            // The algorithm can refer to RandomLoadBalance
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexs[i];
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0)
                    return invokers.get(leastIndex);
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        // If the weights are the same, pick one at random
        return invokers.get(leastIndexs[random.nextInt(leastCount)]);
    }
}

The first half of the method finds the invokers with the least active number, and the second half chooses among them with the weighted random strategy; see RandomLoadBalance.
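The two phases can be seen more easily without the Dubbo types. Below is a minimal sketch, assuming the active counts and weights are passed in as plain arrays of equal, non-zero length; the class name LeastActiveSketch is hypothetical, and the weighted step uses the same subtract-until-negative form as RandomLoadBalance rather than the exact offset arithmetic of the listing.

```java
import java.util.Random;

class LeastActiveSketch {

    // Returns the index of the chosen provider.
    // Assumes active and weights are non-empty and have the same length.
    static int select(int[] active, int[] weights, Random random) {
        int leastActive = -1;                 // smallest active count seen so far
        int leastCount = 0;                   // how many providers share it
        int[] leastIndexes = new int[active.length];
        int totalWeight = 0;                  // sum of weights of those providers
        boolean sameWeight = true;
        for (int i = 0; i < active.length; i++) {
            if (leastActive == -1 || active[i] < leastActive) {
                // Found a smaller active count: start over with this provider
                leastActive = active[i];
                leastCount = 0;
                leastIndexes[leastCount++] = i;
                totalWeight = weights[i];
                sameWeight = true;
            } else if (active[i] == leastActive) {
                // Tie: remember this provider as well
                leastIndexes[leastCount++] = i;
                totalWeight += weights[i];
                if (weights[i] != weights[leastIndexes[0]]) {
                    sameWeight = false;
                }
            }
        }
        if (leastCount == 1) {
            return leastIndexes[0];
        }
        if (!sameWeight && totalWeight > 0) {
            // Weighted random choice among the tied providers
            int offset = random.nextInt(totalWeight);
            for (int k = 0; k < leastCount; k++) {
                offset -= weights[leastIndexes[k]];
                if (offset < 0) {
                    return leastIndexes[k];
                }
            }
        }
        // Equal weights: plain random choice among the tied providers
        return leastIndexes[random.nextInt(leastCount)];
    }
}
```

For example, with active counts [3, 1, 2] the provider at index 1 is returned directly, while with active counts [2, 0, 0, 5] the choice is made between indexes 1 and 2 only.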

(4) ConsistentHashLoadBalance

This class implements load balancing based on consistent hashing. The consistent hashing algorithm was proposed by Karger and his collaborators at MIT in 1997 and was first used widely for load balancing in cache systems. It works like this: first, a hash is generated for each cache node from its IP or other information and projected onto the ring [0, 2^32 - 1]. When a query or write request arrives, a hash value is generated for the key of the cache item (in Dubbo, this hash is computed from the invocation arguments). The algorithm then looks for the first cache node whose hash value is greater than or equal to that hash value and queries or writes the cache item on that node. If that node is down, the next query or write for the cache item goes to the next cache node with a greater hash value. The general effect is shown below (refer to the picture on the official website).

Each cache node occupies a position on the ring. A cache item is stored on, or read from, the first cache node whose hash value is greater than or equal to the hash of the item's key. The two concepts should not be confused: a cache node corresponds to a service provider in Dubbo, of which there are many, and a cache item corresponds to a service consumer's request. For example, the cache item shown as the green dot in the figure, the service consumer, is stored on the cache-2 node. Because cache-3 is down, the cache item that would have been stored on that node is instead stored on cache-4, which means the consumer calls that service provider.

However, consistent hashing alone does not guarantee a balanced distribution. For example, if cache-3 fails, all cache items under that node move to cache-4, so some nodes end up holding far more items than others, as shown in the following figure:

How can we avoid this? To achieve balance, we introduce virtual nodes. Virtual nodes are replicas of an actual node in the hash space, and each virtual node is placed on the ring by its own hash value. Here’s an example:

As the figure shows, the nodes are evenly distributed on the ring, and each service provider has multiple virtual nodes interleaved with those of the other providers. The aim is to avoid data skew, where the nodes are not dispersed enough, so a large number of requests fall on the same node while the other nodes receive only a few, as in the second picture.
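The ring with virtual nodes can be sketched in a few lines. This is a simplified illustration, not Dubbo's implementation: nodes are plain strings, the class name HashRingSketch and the "#" suffix used to derive virtual node names are assumptions, and the ring position is taken from the first four bytes of an MD5 digest.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

class HashRingSketch {

    // Ring position -> real node name. TreeMap keeps the positions sorted.
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    HashRingSketch(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    // Place several virtual nodes for one real node on the ring.
    void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    // Remove only the positions that still belong to this node.
    void removeNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.remove(hash(node + "#" + i), node);
        }
    }

    // First node at or after the key's position, wrapping around to the start.
    // Assumes at least one node has been added.
    String nodeFor(String key) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        if (entry == null) {
            entry = ring.firstEntry();
        }
        return entry.getValue();
    }

    // Position in [0, 2^32 - 1] taken from the first four bytes of the MD5 digest.
    static long hash(String value) {
        try {
            byte[] d = MessageDigest.getInstance("MD5")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[3] & 0xFF) << 24) | ((long) (d[2] & 0xFF) << 16)
                    | ((long) (d[1] & 0xFF) << 8) | (d[0] & 0xFF);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The useful property to observe is that removing one node only moves the keys that were mapped to it; keys mapped to the surviving nodes stay where they were.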

Having seen the principle, let’s look at the code

1.doSelect

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // Get the method name
    String methodName = RpcUtils.getMethodName(invocation);
    // Get the key
    String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
    // Get the identity hash code of the invokers list
    int identityHashCode = System.identityHashCode(invokers);
    // Get a consistent hash selector from the consistent hash selector collection
    ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
    // If the selector is null or its identity hash code differs from the current one, a new consistent hash selector is created and added to the collection
    if (selector == null || selector.identityHashCode != identityHashCode) {
        selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
        selector = (ConsistentHashSelector<T>) selectors.get(key);
    }
    // The selector selects an invoker
    return selector.select(invocation);
}

The method first checks whether the invokers list has changed, creates a new ConsistentHashSelector if it has, and then calls selector.select to make the selection.
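The change check relies on System.identityHashCode, which stays the same for one list object and, in practice, differs for a newly created list. Here is a minimal sketch of that caching pattern; SelectorCacheSketch, Selector, and the builds counter are hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class SelectorCacheSketch {

    // Hypothetical stand-in for ConsistentHashSelector:
    // it only remembers which list instance it was built from.
    static final class Selector {
        final int identityHashCode;

        Selector(int identityHashCode) {
            this.identityHashCode = identityHashCode;
        }
    }

    private final ConcurrentMap<String, Selector> selectors = new ConcurrentHashMap<>();
    int builds = 0; // counts how many times a selector was built or rebuilt

    Selector selectorFor(String key, List<String> providers) {
        int identityHashCode = System.identityHashCode(providers);
        Selector selector = selectors.get(key);
        // Rebuild only when there is no selector yet or the list instance is a different one
        if (selector == null || selector.identityHashCode != identityHashCode) {
            selector = new Selector(identityHashCode);
            selectors.put(key, selector);
            builds++;
        }
        return selector;
    }
}
```

One consequence of this design is that a list mutated in place is not detected, because its identity hash code does not change. The sketch therefore assumes, as the check in doSelect suggests, that a changed provider list arrives as a new list object.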

2.ConsistentHashSelector

private static final class ConsistentHashSelector<T> {

    /** * Stores the Invoker virtual node */
    private final TreeMap<Long, Invoker<T>> virtualInvokers;

    /** * The number of virtual nodes per Invoker */
    private final int replicaNumber;

    /** * original hash value */
    private final int identityHashCode;

    /** * Value position array */
    private final int[] argumentIndex;

    ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
        this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
        this.identityHashCode = identityHashCode;
        URL url = invokers.get(0).getUrl();
        // Obtain the number of virtual nodes. The default value is 160
        this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
        // Get the indexes of the arguments that take part in the hash calculation. By default, only the first argument is hashed
        String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
        // Create the index array
        argumentIndex = new int[index.length];
        // Iterate over the configured indexes
        for (int i = 0; i < index.length; i++) {
            // Record the subscript
            argumentIndex[i] = Integer.parseInt(index[i]);
        }
        // Iterate over invokers
        for (Invoker<T> invoker : invokers) {
            String address = invoker.getUrl().getAddress();
            for (int i = 0; i < replicaNumber / 4; i++) {
                // Run md5 on address + i to get a 16-byte array
                byte[] digest = md5(address + i);
                // Hash a different part of the digest four times to produce four different non-negative longs
                for (int h = 0; h < 4; h++) {
                    // If h is 0, take the four bytes with indexes 0 to 3 in the digest and perform the bit calculation
                    // If h is 1, take the four bytes with indexes 4 to 7 in the digest and perform the bit calculation
                    // When h = 2 and h = 3, the process is the same as above
                    long m = hash(digest, h);
                    // Store the hash to Invoker mapping in virtualInvokers.
                    // virtualInvokers needs to provide efficient lookups, so TreeMap was chosen as the storage structure
                    virtualInvokers.put(m, invoker);
                }
            }
        }
    }

    /**
     * Select an invoker
     * @param invocation
     * @return
     */
    public Invoker<T> select(Invocation invocation) {
        // Convert the parameter to key
        String key = toKey(invocation.getArguments());
        // Performs md5 arithmetic on the key parameter
        byte[] digest = md5(key);
        // Hash the first four bytes of the digest array and pass the hash to the selectForKey method,
        // Find the right Invoker
        return selectForKey(hash(digest, 0));
    }

    /**
     * Convert the arguments to a key
     * @param args
     * @return
     */
    private String toKey(Object[] args) {
        StringBuilder buf = new StringBuilder();
        // Iterate over the parameter subscript
        for (int i : argumentIndex) {
            if (i >= 0 && i < args.length) {
                // Concatenate the arguments to generate a key
                buf.append(args[i]);
            }
        }
        return buf.toString();
    }

    /**
     * Select an invoker by hash
     * @param hash
     * @return
     */
    private Invoker<T> selectForKey(long hash) {
        // Go to TreeMap to find the Invoker whose first node value is greater than or equal to the current hash
        Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
        // If hash is greater than Invoker's maximum position on the circle, entry = null,
        // The TreeMap head node needs to be assigned to entry
        if (entry == null) {
            entry = virtualInvokers.firstEntry();
        }
        // Returns the selected invoker
        return entry.getValue();
    }

    /**
     * Computes the hash value
     * @param digest
     * @param number
     * @return
     */
    private long hash(byte[] digest, int number) {
        return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                | (digest[number * 4] & 0xFF))
                & 0xFFFFFFFFL;
    }

    /**
     * md5
     * @param value
     * @return
     */
    private byte[] md5(String value) {
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        md5.reset();
        byte[] bytes;
        try {
            bytes = value.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        md5.update(bytes);
        return md5.digest();
    }
}

This inner class is the consistent hash selector. Look at its fields first: it uses a TreeMap to store the Invoker virtual nodes because efficient lookups are required. The constructor performs the initialization logic, such as reading the configured number of virtual nodes and the indexes of the arguments that take part in the hash calculation. By default, only the first argument is hashed. Note that the load balancing logic of ConsistentHashLoadBalance is affected only by the argument values: requests with the same argument values are assigned to the same service provider. The select method is relatively simple: it runs MD5 on the key, hashes the digest, and finally selects the corresponding invoker.
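The bit arithmetic in hash(digest, number) is easier to follow with a known input. The sketch below copies that arithmetic into a standalone class (DigestHashSketch is a made-up name) and feeds it an artificial 16-byte array in place of a real MD5 digest, so that the four resulting ring positions can be read off byte by byte.

```java
class DigestHashSketch {

    // Same bit arithmetic as hash(digest, number) above: read the 4 bytes starting
    // at number * 4 as an unsigned little-endian 32-bit integer.
    static long hash(byte[] digest, int number) {
        return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                | (digest[number * 4] & 0xFF))
                & 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        byte[] digest = new byte[16]; // stand-in for a 16-byte MD5 digest
        for (int i = 0; i < digest.length; i++) {
            digest[i] = (byte) (i + 1); // bytes 0x01 .. 0x10
        }
        for (int h = 0; h < 4; h++) {
            System.out.println("h=" + h + " -> 0x" + Long.toHexString(hash(digest, h)));
        }
    }
}
```

Because each byte is masked with 0xFF and the result is held in a long, every position stays within [0, 2^32 - 1] even when the digest bytes are negative as Java bytes. One 16-byte digest yields four positions, which is why the constructor loops replicaNumber / 4 times per invoker.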

(5) RoundRobinLoadBalance

This class implements load balancing based on the weighted round-robin (polling) algorithm. Plain round-robin is easy to understand: the first request goes to server A, the second to server B, the third to server C, the fourth to server A again, and so on. But this only suits servers with similar performance, which is an ideal situation. More often the servers differ in performance, and if the weaker servers receive the same number of requests, they come under heavy pressure and may go down. In that case we need to weight the round-robin. For example, with weights of 6, 3, and 1 for servers A, B, and C, out of every 10 requests server A receives 6, server B receives 3, and server C receives 1, so each server receives requests in proportion to its weight.

1.Attribute

/** * Reclaim interval */
private static int RECYCLE_PERIOD = 60000;

2.WeightedRoundRobin

protected static class WeightedRoundRobin {
    /** * Weight of the service provider */
    private int weight;
    /** * Current value of the service provider, which can be regarded as a dynamic weight */
    private AtomicLong current = new AtomicLong(0);
    /** * Time of the last update */
    private long lastUpdate;
    public int getWeight() {
        return weight;
    }
    public void setWeight(int weight) {
        this.weight = weight;
        current.set(0);
    }
    public long increaseCurrent() {
        return current.addAndGet(weight);
    }
    public void sel(int total) {
        current.addAndGet(-1 * total);
    }
    public long getLastUpdate() {
        return lastUpdate;
    }
    public void setLastUpdate(long lastUpdate) {
        this.lastUpdate = lastUpdate;
    }
}

This inner class holds the weighted round-robin state of one service provider: its weight, its current value (a dynamic weight that rises by weight on each pass and drops by the total weight when the provider is selected), and the time it was last updated.

3.doSelect

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // key = fully qualified class name + "." + method name, such as com.xxx.DemoService.sayHello
    String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
    ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
    if (map == null) {
        methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
        map = methodWeightMap.get(key);
    }
    // Total weights
    int totalWeight = 0;
    // Largest current value seen so far
    long maxCurrent = Long.MIN_VALUE;
    // Get the current timestamp
    long now = System.currentTimeMillis();
    // Create the selected invoker
    Invoker<T> selectedInvoker = null;
    // Create a weighted poller
    WeightedRoundRobin selectedWRR = null;

    // This loop does several things:
    // 1. Iterate over the Invoker list and check whether each Invoker
    //    has a corresponding WeightedRoundRobin; create one if it does not
    // 2. Check whether the Invoker weight has changed; if it has,
    //    update the weight field of its WeightedRoundRobin
    // 3. Increase current by weight, i.e. current += weight
    // 4. Set the lastUpdate field, i.e. lastUpdate = now
    // 5. Find the Invoker with the largest current, together with its WeightedRoundRobin,
    //    and save them for later use
    // 6. Calculate the sum of weights
    for (Invoker<T> invoker : invokers) {
        // Get the value identify
        String identifyString = invoker.getUrl().toIdentityString();
        // Get the weighted poller
        WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
        // Calculate the weight
        int weight = getWeight(invoker, invocation);
        // If the weight is less than 0, set 0
        if (weight < 0) {
            weight = 0;
        }
        // If the weighted poller is empty
        if (weightedRoundRobin == null) {
            // Create a weighted poller
            weightedRoundRobin = new WeightedRoundRobin();
            // Set the weight
            weightedRoundRobin.setWeight(weight);
            // join the collection
            map.putIfAbsent(identifyString, weightedRoundRobin);
            weightedRoundRobin = map.get(identifyString);
        }
        // If the weight is different from the recorded weight, reset the weight
        if (weight != weightedRoundRobin.getWeight()) {
            // weight changed
            weightedRoundRobin.setWeight(weight);
        }
        // Increase current by weight
        long cur = weightedRoundRobin.increaseCurrent();
        // Update the last update time
        weightedRoundRobin.setLastUpdate(now);
        // If cur is greater than the largest current seen so far
        if (cur > maxCurrent) {
            // Record the new maximum
            maxCurrent = cur;
            // The selected selectedInvoker assignment
            selectedInvoker = invoker;
            // The selected weighted poller assignment
            selectedWRR = weightedRoundRobin;
        }
        // Accumulate the total weight
        totalWeight += weight;
    }
    // If the update lock is not held and the invokers size does not match the map size,
    // check the <identifyString, WeightedRoundRobin> entries to filter out nodes that have not been updated for a long time.
    // Such a node may be down: invokers no longer includes it, so its lastUpdate has not been refreshed for a long time.
    // If the time since the last update exceeds the threshold, the node is removed. The default threshold is 60 seconds.
    if (!updateLock.get() && invokers.size() != map.size()) {
        if (updateLock.compareAndSet(false, true)) {
            try {
                // copy -> modify -> update reference
                ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
                // Copy
                newMap.putAll(map);
                Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
                // Iterate over the copy
                while (it.hasNext()) {
                    Entry<String, WeightedRoundRobin> item = it.next();
                    // If the entry has not been updated for longer than the recycle period, recycle it
                    if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
                        // Remove from the collection
                        it.remove();
                    }
                }
                // Replace the old map with the updated copy
                methodWeightMap.put(key, newMap);
            } finally {
                updateLock.set(false);
            }
        }
    }
    // If the selected selectedInvoker is not empty
    if (selectedInvoker != null) {
        // Subtract the total weight from the selected invoker's current
        selectedWRR.sel(totalWeight);
        return selectedInvoker;
    }
    // should not happen here
    return invokers.get(0);
}

This method is the core of the selection, and the key is the bookkeeping. On each request, every provider's current value grows by its weight, the provider with the largest current is selected, and the total weight is subtracted from its current, so over time requests are distributed in proportion to the weights. A recycle period also removes nodes that have not been updated for a long time.
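Stripped of the maps, locks, and recycling, the selection step is short. The following single-threaded sketch assumes fixed int weights held in arrays and at least one provider; the class name SmoothWeightedRoundRobinSketch is made up, and the code only illustrates the current += weight, pick the maximum, subtract the total bookkeeping.

```java
class SmoothWeightedRoundRobinSketch {

    private final int[] weights;
    private final long[] current; // dynamic weight per provider, starts at 0

    SmoothWeightedRoundRobinSketch(int... weights) {
        this.weights = weights.clone();
        this.current = new long[weights.length];
    }

    // One selection round; returns the index of the chosen provider.
    int next() {
        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        int selected = -1;
        for (int i = 0; i < weights.length; i++) {
            current[i] += weights[i];        // every provider gains its weight
            if (current[i] > maxCurrent) {   // remember the largest current
                maxCurrent = current[i];
                selected = i;
            }
            totalWeight += weights[i];
        }
        current[selected] -= totalWeight;    // the chosen provider pays the total weight
        return selected;
    }

    public static void main(String[] args) {
        SmoothWeightedRoundRobinSketch rr = new SmoothWeightedRoundRobinSketch(6, 3, 1);
        StringBuilder order = new StringBuilder();
        for (int n = 0; n < 10; n++) {
            order.append("ABC".charAt(rr.next())).append(' ');
        }
        System.out.println(order.toString().trim());
    }
}
```

With weights 6, 3, and 1, every 10 consecutive rounds select A six times, B three times, and C once, and the selections of A are interleaved with the others rather than issued in one run.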

Afterword

The source code for this section is github.com/CrazyHZM/in…

This article covered the load balancing part of the cluster module. Each of these is a widely used load balancing algorithm, and I hope you take the time to digest them. Next, I’ll start on the grouping and aggregation part of the cluster module.