1 Introduction

In this article, we use some simple mathematical formulas and flow charts to walk through Dubbo's load balancing algorithms.

2 Soul-Searching Questions

  • Describe the load balancing algorithms in Dubbo and their characteristics.
  • How does the least-active algorithm count the active number?
  • Tell us what you know about consistent hashing.

3 Interface Inheritance Hierarchy

4 RandomLoadBalance

Random: picks a provider at random, with the probability set according to weight.

Collisions are likely over a short window, but the distribution becomes more uniform as the number of calls grows. It also stays fairly uniform when weights are applied as probabilities, which makes it easy to adjust provider weights dynamically.
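To see the "more calls, more uniform" behavior, here is a minimal simulation sketch. It is not Dubbo code: the class and method names are hypothetical, the weights 1, 2, 3, 4 are arbitrary, and it uses a seeded java.util.Random instead of ThreadLocalRandom so that runs are repeatable. With only 10 calls the observed shares are typically far from 0.1, 0.2, 0.3 and 0.4, while with 100,000 calls they should sit close to those values.

```java
import java.util.Arrays;
import java.util.Random;

/**
 * Sketch (not Dubbo code): draw many weighted-random picks and compare the
 * observed share of each provider with its share of the total weight.
 */
final class WeightedRandomSimulation {

    /** Picks an index with probability weights[i] / totalWeight (totalWeight must be > 0). */
    static int pick(int[] weights, int totalWeight, Random random) {
        int offset = random.nextInt(totalWeight);
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalStateException("unreachable when totalWeight > 0");
    }

    /** Returns the fraction of calls that landed on each provider. */
    static double[] simulate(int[] weights, int calls, long seed) {
        int total = 0;
        for (int w : weights) {
            total += w;
        }
        Random random = new Random(seed); // fixed seed so runs are repeatable
        int[] hits = new int[weights.length];
        for (int c = 0; c < calls; c++) {
            hits[pick(weights, total, random)]++;
        }
        double[] share = new double[weights.length];
        for (int i = 0; i < weights.length; i++) {
            share[i] = (double) hits[i] / calls;
        }
        return share;
    }

    public static void main(String[] args) {
        int[] weights = {1, 2, 3, 4};
        for (int calls : new int[] {10, 100_000}) {
            System.out.println(calls + " calls -> " + Arrays.toString(simulate(weights, calls, 42L)));
        }
    }
}
```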

This is Dubbo's default strategy. It is not plain uniform randomness, though: each provider has a weight that controls its probability of being picked. Let's look at the code.

```java
package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * This class selects a random provider from multiple providers.
 * Weights can be defined for each provider:
 * If all weights are the same, random.nextInt(number of invokers) is used.
 * If the weights differ, random.nextInt(w1 + w2 + ... + wn) is used.
 * Note: if a machine performs better than the others, you can give it a larger weight;
 * if its performance is poor, you can give it a smaller weight.
 */
public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";

    /**
     * Select one invoker randomly, based on weight.
     *
     * @param invokers   list of possible invokers
     * @param url        URL
     * @param invocation invocation
     * @param <T>
     * @return the selected invoker
     */
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        int length = invokers.size();
        // Does every invoker have the same weight?
        boolean sameWeight = true;
        // The weight of every invoker
        int[] weights = new int[length];
        // The first invoker's weight
        int firstWeight = getWeight(invokers.get(0), invocation);
        weights[0] = firstWeight;
        // The sum of weights
        int totalWeight = firstWeight;
        for (int i = 1; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            // Save for later use
            weights[i] = weight;
            // Sum
            totalWeight += weight;
            if (sameWeight && weight != firstWeight) {
                sameWeight = false;
            }
        }
        if (totalWeight > 0 && !sameWeight) {
            // If not every invoker has the same weight and at least one weight is > 0,
            // pick randomly based on totalWeight.
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            // Return an invoker based on the random value.
            for (int i = 0; i < length; i++) {
                offset -= weights[i];
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // If all invokers have the same weight or totalWeight == 0, pick one uniformly at random.
        return invokers.get(ThreadLocalRandom.current().nextInt(length));
    }
}
```

Analysis

[Figure: flow chart of RandomLoadBalance]

Suppose there are four cluster nodes A, B, C and D with weights 1, 2, 3 and 4 respectively. Then the probability that a request goes to node A is 1/(1+2+3+4) = 10%, and likewise 20%, 30% and 40% for B, C and D.

The random algorithm is easy to understand, and interviewers rarely ask about it, but if you ever need to implement something similar, this code is elegant and well worth borrowing. The idea is easiest to see from a purely mathematical angle, so let's continue with the example above. The total weight is 10 (1+2+3+4), so how do we pick randomly by weight? Draw a random integer from 0 to 9; suppose it is 2. Then subtract the weights one by one: 2 (random number) - 1 (weight of A) = 1, which is not negative, so we move on; 1 (result of the previous step) - 2 (weight of B) = -1. Since -1 < 0, B is called. Other random values work the same way.
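The subtraction loop can be checked by hand with a small sketch (hypothetical class name, not part of Dubbo). Rather than drawing a random offset, it walks every offset from 0 to totalWeight - 1 and counts which node each one selects; each node should own exactly as many offsets as its weight, which is where the 10%, 20%, 30% and 40% come from.

```java
import java.util.Arrays;

/**
 * Sketch of the "subtract weights until negative" step. Instead of drawing a
 * random offset, it walks every possible offset in [0, totalWeight) so the
 * resulting counts can be checked by hand.
 */
final class WeightedOffsetWalk {

    /** Returns the index chosen for a given offset, mirroring the loop in doSelect. */
    static int indexForOffset(int[] weights, int offset) {
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset must be in [0, totalWeight)");
    }

    /** Counts how many of the possible offsets land on each index. */
    static int[] countPerIndex(int[] weights) {
        int total = 0;
        for (int w : weights) {
            total += w;
        }
        int[] counts = new int[weights.length];
        for (int offset = 0; offset < total; offset++) {
            counts[indexForOffset(weights, offset)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] weights = {1, 2, 3, 4}; // A, B, C, D
        String names = "ABCD";
        // The worked example: 2 - 1 = 1, then 1 - 2 = -1 < 0, so B is chosen.
        System.out.println("offset 2 -> " + names.charAt(indexForOffset(weights, 2)));
        System.out.println("offsets per node: " + Arrays.toString(countPerIndex(weights)));
    }
}
```

Running main prints `offset 2 -> B` followed by `offsets per node: [1, 2, 3, 4]`.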

5 RoundRobinLoadBalance

Round robin: the round-robin ratio is set according to the weights after they are reduced by their greatest common divisor.
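A minimal sketch of the greatest-common-divisor step, assuming all weights are positive (the class and method names are made up for illustration, and the doSelect code below does not compute a GCD explicitly). Weights 100, 200 and 300 reduce to 1, 2 and 3, so a full round-robin cycle with the same ratio takes 6 calls instead of 600.

```java
import java.util.Arrays;

/**
 * Sketch: reduce a set of weights by their greatest common divisor, so that
 * weights 100, 200, 300 give the same round-robin ratio as 1, 2, 3.
 */
final class WeightGcd {

    /** Euclid's algorithm; gcd(0, x) == x. */
    static int gcd(int a, int b) {
        while (b != 0) {
            int t = a % b;
            a = b;
            b = t;
        }
        return a;
    }

    /** Divides every weight by the GCD of all weights (assumes every weight is positive). */
    static int[] reduce(int[] weights) {
        int g = 0;
        for (int w : weights) {
            g = gcd(g, w);
        }
        int[] reduced = new int[weights.length];
        for (int i = 0; i < weights.length; i++) {
            reduced[i] = weights[i] / g;
        }
        return reduced;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(reduce(new int[] {100, 200, 300}))); // [1, 2, 3]
    }
}
```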

It has the problem of slow providers accumulating requests. For example, if the second machine is slow but not down, requests get stuck whenever they are routed to it, and over time all requests pile up on the second machine.

```java
package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Round robin load balance.
 */
public class RoundRobinLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "roundrobin";

    private static final int RECYCLE_PERIOD = 60000;

    protected static class WeightedRoundRobin {
        private int weight;
        private AtomicLong current = new AtomicLong(0);
        private long lastUpdate;

        public int getWeight() {
            return weight;
        }

        public void setWeight(int weight) {
            this.weight = weight;
            current.set(0);
        }

        public long increaseCurrent() {
            return current.addAndGet(weight);
        }

        public void sel(int total) {
            current.addAndGet(-1 * total);
        }

        public long getLastUpdate() {
            return lastUpdate;
        }

        public void setLastUpdate(long lastUpdate) {
            this.lastUpdate = lastUpdate;
        }
    }

    private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap =
            new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
    private AtomicBoolean updateLock = new AtomicBoolean();

    /**
     * Get the invocation address list
     * for unit test only
     */
    protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map != null) {
            return map.keySet();
        }
        return null;
    }

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map == null) {
            methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
            map = methodWeightMap.get(key);
        }
        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        long now = System.currentTimeMillis();
        Invoker<T> selectedInvoker = null;
        WeightedRoundRobin selectedWRR = null;
        for (Invoker<T> invoker : invokers) {
            String identifyString = invoker.getUrl().toIdentityString();
            WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
            int weight = getWeight(invoker, invocation);
            if (weightedRoundRobin == null) {
                weightedRoundRobin = new WeightedRoundRobin();
                weightedRoundRobin.setWeight(weight);
                map.putIfAbsent(identifyString, weightedRoundRobin);
            }
            if (weight != weightedRoundRobin.getWeight()) {
                // weight changed
                weightedRoundRobin.setWeight(weight);
            }
            long cur = weightedRoundRobin.increaseCurrent();
            weightedRoundRobin.setLastUpdate(now);
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedInvoker = invoker;
                selectedWRR = weightedRoundRobin;
            }
            totalWeight += weight;
        }
        if (!updateLock.get() && invokers.size() != map.size()) {
            if (updateLock.compareAndSet(false, true)) {
                try {
                    // copy -> modify -> update reference
                    ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
                    newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
                    methodWeightMap.put(key, newMap);
                } finally {
                    updateLock.set(false);
                }
            }
        }
        if (selectedInvoker != null) {
            selectedWRR.sel(totalWeight);
            return selectedInvoker;
        }
        // should not happen here
        return invokers.get(0);
    }
}
```

Nginx also uses round robin as its default load balancing method.
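The core of the doSelect method above is: add each invoker's weight to its current value, pick the invoker with the largest current value, then subtract the total weight from the winner. Stripped of the per-method cache, the recycling of stale entries and the atomics, that bookkeeping can be sketched as follows. This is a single-threaded illustration with hypothetical names, not Dubbo's class. The scheme is usually called smooth weighted round robin, and it is the same idea behind Nginx's weighted round robin.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of smooth weighted round robin, the same bookkeeping as
 * increaseCurrent() / sel(totalWeight) in RoundRobinLoadBalance, minus the
 * concurrency and cache-expiry handling.
 */
final class SmoothWeightedRoundRobin {

    private final String[] names;
    private final int[] weights;
    private final long[] current; // plays the role of WeightedRoundRobin.current
    private final int totalWeight;

    SmoothWeightedRoundRobin(String[] names, int[] weights) {
        this.names = names.clone();
        this.weights = weights.clone();
        this.current = new long[weights.length];
        int total = 0;
        for (int w : weights) {
            total += w;
        }
        this.totalWeight = total;
    }

    String next() {
        int selected = -1;
        long maxCurrent = Long.MIN_VALUE;
        for (int i = 0; i < weights.length; i++) {
            current[i] += weights[i];      // increaseCurrent()
            if (current[i] > maxCurrent) { // strict '>' keeps the first node on ties
                maxCurrent = current[i];
                selected = i;
            }
        }
        current[selected] -= totalWeight;  // sel(totalWeight)
        return names[selected];
    }

    List<String> take(int n) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(next());
        }
        return out;
    }

    public static void main(String[] args) {
        SmoothWeightedRoundRobin rr =
                new SmoothWeightedRoundRobin(new String[] {"A", "B", "C"}, new int[] {5, 1, 1});
        System.out.println(rr.take(7)); // [A, A, B, A, C, A, A]
    }
}
```

With weights 5, 1 and 1, the seven picks are A, A, B, A, C, A, A. After those seven calls every current value is back to 0, so the cycle repeats. B and C are interleaved between A's picks instead of A receiving five requests in a row, which is what "smooth" refers to.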

6 LeastActiveLoadBalance (least active)

  • Least active calls: providers with the same active count are chosen at random. The active count is the difference between the counts before and after the call.
  • Slower providers receive fewer requests, because a slower provider has a larger difference between its before-call and after-call counts.

For example, suppose each provider has an active counter that starts at 0, and there are two providers, A and B. When a provider starts processing a request, its count is incremented by 1; when it finishes, its count is decremented by 1. Suppose A and B each receive a request: B finishes very quickly, while A is still processing. The counts of A and B are now 1 and 0, so when a new request comes in, B is selected because its active count is smaller. This is what the documentation means by making slower providers receive fewer requests.
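The scenario above can be replayed with a small sketch of per-provider active counters. The class and its methods are hypothetical; in Dubbo the counts are read through RpcStatus. For simplicity, ties go to the first provider here, whereas Dubbo breaks ties among equally active providers randomly by weight.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sketch of per-provider active counters: +1 when a call starts, -1 when it
 * ends. The map is filled once in the constructor and only read afterwards.
 */
final class ActiveCounters {

    private final Map<String, AtomicInteger> active = new LinkedHashMap<>();

    ActiveCounters(String... providers) {
        for (String p : providers) {
            active.put(p, new AtomicInteger());
        }
    }

    void beginCall(String provider) {
        active.get(provider).incrementAndGet();
    }

    void endCall(String provider) {
        active.get(provider).decrementAndGet();
    }

    int activeCount(String provider) {
        return active.get(provider).get();
    }

    /** Returns the provider with the fewest in-flight calls (first one wins on ties). */
    String leastActive() {
        String best = null;
        int bestCount = Integer.MAX_VALUE;
        for (Map.Entry<String, AtomicInteger> e : active.entrySet()) {
            int count = e.getValue().get();
            if (count < bestCount) {
                bestCount = count;
                best = e.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        ActiveCounters counters = new ActiveCounters("A", "B");
        counters.beginCall("A"); // A starts a slow request
        counters.beginCall("B"); // B starts a fast request...
        counters.endCall("B");   // ...and finishes it right away
        System.out.println("A=" + counters.activeCount("A") + ", B=" + counters.activeCount("B")
                + ", next -> " + counters.leastActive()); // A=1, B=0, next -> B
    }
}
```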

```java
package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcStatus;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Filters the invokers with the least number of active calls, and counts the weights and number of those invokers.
 * If there is only one such invoker, it is used directly;
 * if there are several with different weights, one is picked randomly according to the total weight;
 * if there are several with the same weight, one is picked uniformly at random.
 */
public class LeastActiveLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "leastactive";

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        int length = invokers.size();
        // The least active value of all invokers
        int leastActive = -1;
        // The number of invokers having the same least active value (leastActive)
        int leastCount = 0;
        // The indexes of invokers having the same least active value (leastActive)
        int[] leastIndexes = new int[length];
        // The weight of every invoker
        int[] weights = new int[length];
        // The sum of the weights of all the least active invokers
        int totalWeight = 0;
        // The weight of the first least active invoker, used to check whether all weights are the same
        int firstWeight = 0;
        // Does every least active invoker have the same weight?
        boolean sameWeight = true;

        // Filter out all the least active invokers
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            // Get the active number of the invoker
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
            // Get the weight of the invoker's configuration. The default value is 100.
            int afterWarmup = getWeight(invoker, invocation);
            // Save for later use
            weights[i] = afterWarmup;
            // If it is the first invoker or its active number is less than the current least active number
            if (leastActive == -1 || active < leastActive) {
                // Reset the least active number to this invoker's active number
                leastActive = active;
                // Reset the number of least active invokers
                leastCount = 1;
                // Put the first least active invoker first in leastIndexes
                leastIndexes[0] = i;
                // Reset totalWeight
                totalWeight = afterWarmup;
                // Record the weight of the first least active invoker
                firstWeight = afterWarmup;
                // Each invoker has the same weight (only one invoker here)
                sameWeight = true;
            } else if (active == leastActive) {
                // The current invoker's active value equals leastActive: record its index in leastIndexes
                leastIndexes[leastCount++] = i;
                // Accumulate the total weight of the least active invokers
                totalWeight += afterWarmup;
                // Does every invoker have the same weight?
                if (sameWeight && i > 0 && afterWarmup != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // Choose an invoker from all the least active invokers
        if (leastCount == 1) {
            // Exactly one invoker has the least active value: return it directly
            return invokers.get(leastIndexes[0]);
        }
        if (!sameWeight && totalWeight > 0) {
            // If the weights differ and the total weight is > 0, pick randomly by weight
            int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
            // Return an invoker based on the random value
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexes[i];
                offsetWeight -= weights[leastIndex];
                if (offsetWeight < 0) {
                    return invokers.get(leastIndex);
                }
            }
        }
        // If all weights are equal or the total weight is 0, pick uniformly at random
        return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
    }
}
```

Analysis

This code can be summed up in two parts:

  • Counting active numbers and weights.
  • Selecting the invoker. The indexes of the invokers with the least active number are collected into the leastIndexes array. If there is only one, that invoker is returned directly. If their weights are all the same (the same check as in the random algorithm above) or the total weight is 0, one of them is picked uniformly at random; otherwise one is picked by weight, using the same subtract-in-sequence idea as the random algorithm. Don't worry if this isn't clear yet; the flow chart and mathematical analysis below should help.

[Figure: flow chart of LeastActiveLoadBalance]

Theory

Assume the active numbers of nodes A, B, C and D are 1, 1, 2 and 3, and their weights are 1, 2, 3 and 4. The least active number is 1, so leastIndexes holds A and B. Their weights are 1 and 2, so the probability of calling A is 1/(1+2) = 1/3, and the probability of calling B is 2/(1+2) = 2/3.
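The two phases (collect leastIndexes, then pick by weight) can be sketched on plain arrays with the numbers above. This is a hypothetical illustration, not Dubbo's API: it walks every offset in [0, totalWeight) instead of drawing a random one, so the 1/3 versus 2/3 split can be checked exactly. It also omits Dubbo's shortcuts of returning directly when only one invoker is least active and picking uniformly when all weights are equal.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of the two phases of LeastActiveLoadBalance on plain arrays:
 * 1) collect the indexes with the smallest active count (leastIndexes),
 * 2) pick among them by weight using a given offset in [0, totalWeight).
 */
final class LeastActiveSketch {

    static List<Integer> leastIndexes(int[] active) {
        List<Integer> result = new ArrayList<>();
        int least = Integer.MAX_VALUE;
        for (int i = 0; i < active.length; i++) {
            if (active[i] < least) {       // new minimum: start the list over
                least = active[i];
                result.clear();
                result.add(i);
            } else if (active[i] == least) { // same minimum: append
                result.add(i);
            }
        }
        return result;
    }

    static int pickByOffset(List<Integer> candidates, int[] weights, int offset) {
        for (int index : candidates) {
            offset -= weights[index];
            if (offset < 0) {
                return index;
            }
        }
        throw new IllegalArgumentException("offset must be in [0, totalWeight)");
    }

    public static void main(String[] args) {
        int[] active = {1, 1, 2, 3};  // A, B, C, D
        int[] weights = {1, 2, 3, 4};
        List<Integer> least = leastIndexes(active); // [0, 1] -> A and B
        int total = 0;
        for (int index : least) {
            total += weights[index];                // 1 + 2 = 3
        }
        int[] hits = new int[active.length];
        for (int offset = 0; offset < total; offset++) {
            hits[pickByOffset(least, weights, offset)]++;
        }
        System.out.println("least=" + least + ", A gets " + hits[0] + "/" + total
                + ", B gets " + hits[1] + "/" + total);
    }
}
```

Running main prints `least=[0, 1], A gets 1/3, B gets 2/3`.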

The active number is changed in org.apache.dubbo.rpc.filter.ActiveLimitFilter. If the actives attribute of dubbo:reference is not configured, by default the active number is incremented by 1 before the call and decremented by 1 when the call ends. Since many people may never have used this attribute, here is a screenshot of the documentation.

[Figure: screenshot of the Dubbo documentation for the actives attribute]

In addition, if you use this load balancing algorithm with dubbo:service, you also need to configure filter="activelimit".

7 ConsistentHashLoadBalance (consistent hashing)

  • Consistent hashing: requests with the same parameters are always sent to the same provider.
  • When a provider goes down, the requests originally sent to it are spread over the other providers via virtual nodes, without drastic changes.
  • Recommended reading: http://en.wikipedia.org/wiki/Consistent_hashing

By default, only the first argument is hashed. To change this, configure:

```xml
<dubbo:parameter key="hash.arguments" value="0,1" />
```

By default, 160 virtual nodes are used. To change the number, configure:

```xml
<dubbo:parameter key="hash.nodes" value="320" />
```

```java
package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.support.RpcUtils;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;

public class ConsistentHashLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "consistenthash";

    /**
     * Hash nodes name
     */
    public static final String HASH_NODES = "hash.nodes";

    /**
     * Hash arguments name
     */
    public static final String HASH_ARGUMENTS = "hash.arguments";

    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors =
            new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

    @SuppressWarnings("unchecked")
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String methodName = RpcUtils.getMethodName(invocation);
        String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
        // Identity of the invoker list object, used to detect whether the list has been replaced
        int identityHashCode = System.identityHashCode(invokers);
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        if (selector == null || selector.identityHashCode != identityHashCode) {
            // First call, or a different invoker list: rebuild the hash ring
            selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
        return selector.select(invocation);
    }

    private static final class ConsistentHashSelector<T> {

        private final TreeMap<Long, Invoker<T>> virtualInvokers;

        private final int replicaNumber;

        private final int identityHashCode;

        private final int[] argumentIndex;

        ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            // Number of virtual nodes per provider (hash.nodes, default 160)
            this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
            // Indexes of the arguments to hash (hash.arguments, default "0")
            String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            for (Invoker<T> invoker : invokers) {
                String address = invoker.getUrl().getAddress();
                // Each 16-byte MD5 digest yields four 32-bit positions, hence replicaNumber / 4 digests
                for (int i = 0; i < replicaNumber / 4; i++) {
                    byte[] digest = md5(address + i);
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }

        public Invoker<T> select(Invocation invocation) {
            String key = toKey(invocation.getArguments());
            byte[] digest = md5(key);
            return selectForKey(hash(digest, 0));
        }

        private String toKey(Object[] args) {
            StringBuilder buf = new StringBuilder();
            for (int i : argumentIndex) {
                if (i >= 0 && i < args.length) {
                    buf.append(args[i]);
                }
            }
            return buf.toString();
        }

        private Invoker<T> selectForKey(long hash) {
            // First virtual node clockwise from the key's position
            Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
            if (entry == null) {
                // Past the last node: wrap around to the start of the ring
                entry = virtualInvokers.firstEntry();
            }
            return entry.getValue();
        }

        private long hash(byte[] digest, int number) {
            return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                    | (digest[number * 4] & 0xFF))
                    & 0xFFFFFFFFL;
        }

        private byte[] md5(String value) {
            MessageDigest md5;
            try {
                md5 = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            md5.reset();
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            md5.update(bytes);
            return md5.digest();
        }
    }
}
```

The implementation is fairly long, so we will focus on three key ideas: the principle, the impact of a node going down, and virtual nodes.

Principle

In simple terms, imagine a clock. Each server node is mapped to a time on the clock, and each key is also mapped to a time on the clock; moving clockwise from the key, the first node it meets is the server node we are looking for.

Again, suppose we have four nodes a, b, c and d (this whole article seems to be one long series of "suppose"s), and some rule converts them into the integers 0, 3, 6 and 9 respectively. Their positions on the clock are shown below.

[Figure: nodes a, b, c and d at positions 0, 3, 6 and 9 on the clock]

If a key is converted to 1 by the same rule, the first node it meets clockwise is b, so b is the node we are looking for.
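A TreeMap makes a convenient clock: ceilingEntry finds the first node at or after the key's position, and falling back to firstEntry handles wrapping past 12 o'clock. This is the same lookup as selectForKey in the ConsistentHashLoadBalance code above, but in this sketch (hypothetical class name) the positions 0, 3, 6 and 9 are assigned by hand rather than computed with a hash.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Sketch of the "clock": node positions are fixed by hand (a=0, b=3, c=6, d=9)
 * instead of being computed by a hash function.
 */
final class ClockRing {

    private final TreeMap<Long, String> ring = new TreeMap<>();

    void addNode(String name, long position) {
        ring.put(position, name);
    }

    void removeNode(long position) {
        ring.remove(position);
    }

    /** Walks clockwise from the key's position to the first node, wrapping past the end. */
    String nodeFor(long keyPosition) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(keyPosition);
        if (entry == null) {
            entry = ring.firstEntry(); // past the last node: wrap around to the start
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        ClockRing clock = new ClockRing();
        clock.addNode("a", 0);
        clock.addNode("b", 3);
        clock.addNode("c", 6);
        clock.addNode("d", 9);
        System.out.println("key at 1 -> " + clock.nodeFor(1));   // b
        System.out.println("key at 10 -> " + clock.nodeFor(10)); // a (wraps around)
    }
}
```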

You can make up this rule yourself, but note that how rarely different node names are converted to the same integer is a measure of how good the rule is; ideally, each node name maps to a unique integer. Java's CRC32 class is one option worth learning about.

This raises another question: a clock has only a limited number of positions, so what if the nodes do not fit?

In fact, the clock is just a convenient metaphor. In practice, the integers from 0 to 2^32 - 1 are laid out around the ring, which leaves room for practically any number of servers.
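As one possible rule, java.util.zip.CRC32 turns a name into an unsigned 32-bit value, which is exactly a position on a ring of 0 to 2^32 - 1. Note that the Dubbo code above uses MD5 rather than CRC32; this sketch (hypothetical class name) only illustrates the idea of hashing names onto the ring.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/**
 * Sketch: use java.util.zip.CRC32 as the "rule" that turns a node name into a
 * position on a ring of 2^32 positions, 0 .. 2^32 - 1.
 */
final class Crc32Ring {

    static final long RING_SIZE = 1L << 32; // positions 0 .. 2^32 - 1

    static long position(String name) {
        CRC32 crc = new CRC32();
        crc.update(name.getBytes(StandardCharsets.UTF_8));
        return crc.getValue(); // unsigned 32-bit checksum held in a long
    }

    public static void main(String[] args) {
        for (String node : new String[] {"a", "b", "c", "d"}) {
            System.out.println(node + " -> " + position(node));
        }
    }
}
```

For reference, the standard CRC-32 check input "123456789" maps to position 0xCBF43926 (3421780262).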

Impact of a node going down

As the figure above shows, if node b goes down, the keys that used to land on b continue clockwise to c. In other words, only one node is affected, and the other nodes are not.
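To make "only one node is affected" concrete, the sketch below assigns every key position from 0 to 11 on the clock a=0, b=3, c=6, d=9, removes b, and prints only the keys whose node changed (hypothetical class name; positions assigned by hand). Only keys 1, 2 and 3, which previously belonged to b, move, and all of them move to c.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Sketch: compare which node each key position 0..11 maps to before and after
 * node b (at position 3) is removed from the clock a=0, b=3, c=6, d=9.
 */
final class RingNodeDown {

    static String lookup(TreeMap<Long, String> ring, long key) {
        Map.Entry<Long, String> e = ring.ceilingEntry(key);
        return (e != null ? e : ring.firstEntry()).getValue(); // wrap around past the last node
    }

    static String[] assign(TreeMap<Long, String> ring, int keys) {
        String[] owners = new String[keys];
        for (int k = 0; k < keys; k++) {
            owners[k] = lookup(ring, k);
        }
        return owners;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> ring = new TreeMap<>();
        ring.put(0L, "a");
        ring.put(3L, "b");
        ring.put(6L, "c");
        ring.put(9L, "d");
        String[] before = assign(ring, 12);
        ring.remove(3L); // node b goes down
        String[] after = assign(ring, 12);
        for (int k = 0; k < before.length; k++) {
            if (!before[k].equals(after[k])) {
                System.out.println("key " + k + ": " + before[k] + " -> " + after[k]);
            }
        }
    }
}
```

Running main prints `key 1: b -> c`, `key 2: b -> c` and `key 3: b -> c`; keys on a, c and d keep their node.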

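Compare this with a simple modulo scheme (server = hash % N). Going from N servers to N - 1, a key keeps its server only when hash % N equals hash % (N - 1), which holds for only about 1/N of keys; all the others are remapped, so the more servers there are, the greater the impact.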
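The modulo case can be counted directly. The sketch below (hypothetical class name) checks every key in one full period of n(n - 1) keys, which covers all combinations of key % n and key % (n - 1). By the Chinese remainder theorem exactly n - 1 of them keep their server, so only 1/n of keys still hit the same server after the cluster shrinks.

```java
/**
 * Sketch: with modulo placement (server = key % n), count how many keys keep
 * the same server when the cluster shrinks from n to n - 1 servers.
 */
final class ModuloRemap {

    /** Keys 0 .. n*(n-1)-1 cover one full period of both "% n" and "% (n-1)". */
    static int unchangedKeys(int n) {
        int unchanged = 0;
        for (int key = 0; key < n * (n - 1); key++) {
            if (key % n == key % (n - 1)) {
                unchanged++;
            }
        }
        return unchanged;
    }

    public static void main(String[] args) {
        for (int n : new int[] {2, 4, 10}) {
            int period = n * (n - 1);
            System.out.println(n + " -> " + (n - 1) + " servers: "
                    + unchangedKeys(n) + "/" + period + " keys keep their server");
        }
    }
}
```

Running main prints 1/2, 3/12 and 9/90, that is 1/2, 1/4 and 1/10 of the keys.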

Virtual nodes

Why do we need virtual nodes? Go back to the first assumption: nodes a, b, c and d are converted to 0, 3, 6 and 9 by some rule. If they were converted to 0, 1, 2 and 3 instead, the distribution would be very uneven: on a 12-position clock, a would own 9 of the 12 positions (0 and 4 through 11). In fact, a typical hash function does not spread nodes evenly around the ring. So we need to introduce virtual nodes. What is a virtual node?

Suppose there are N real nodes. Each real node is mapped to M virtual nodes, and the M*N virtual nodes are hashed onto the ring, so the virtual nodes of different real nodes are interleaved. This way, if a real node goes down, its load is spread evenly over all the other nodes.
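A minimal sketch of virtual nodes, assuming 100 virtual nodes per real node named "a#0" to "a#99" and so on. The naming scheme, the replica count and the class are hypothetical; positions come from the first 4 bytes of an MD5 digest, like hash(digest, 0) in the Dubbo code above, although this sketch takes one position per digest while Dubbo takes four. It records which node each of 10,000 keys maps to, takes c offline, and tallies where c's keys went. Keys that were not on c should not move at all, and c's keys should end up spread across a, b and d rather than all landing on a single neighbor.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

/**
 * Sketch of a hash ring with virtual nodes: each real node is placed on the
 * ring `replicas` times, at hash("name#i").
 */
final class VirtualNodeRing {

    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int replicas;

    VirtualNodeRing(int replicas) {
        this.replicas = replicas;
    }

    /** Maps a string to a position in [0, 2^32 - 1] using the first 4 bytes of its MD5 digest. */
    static long hash(String value) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(value.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[3] & 0xFF) << 24) | ((long) (d[2] & 0xFF) << 16)
                    | ((long) (d[1] & 0xFF) << 8) | (d[0] & 0xFF);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    void addNode(String node) {
        for (int i = 0; i < replicas; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    /** Drops every virtual node that belongs to this real node. */
    void removeNode(String node) {
        ring.values().removeIf(node::equals);
    }

    String nodeFor(String key) {
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(key));
        return (e != null ? e : ring.firstEntry()).getValue(); // wrap around past the last node
    }

    public static void main(String[] args) {
        VirtualNodeRing r = new VirtualNodeRing(100);
        for (String n : new String[] {"a", "b", "c", "d"}) {
            r.addNode(n);
        }
        String[] before = new String[10_000];
        for (int k = 0; k < before.length; k++) {
            before[k] = r.nodeFor("key-" + k);
        }
        r.removeNode("c"); // node c goes down
        Map<String, Integer> moves = new TreeMap<>();
        for (int k = 0; k < before.length; k++) {
            String now = r.nodeFor("key-" + k);
            if (!now.equals(before[k])) {
                moves.merge(before[k] + "->" + now, 1, Integer::sum);
            }
        }
        System.out.println(moves); // only c->a, c->b and c->d entries are expected
    }
}
```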

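For example, the virtual nodes of a, b, c and d are a0, a1, a2, b0, b1, b2, c0, c1, c2, d0, d1 and d2, scattered around the ring. If node c goes down, the load on c0, c1 and c2 shifts to d0, a1 and b1 respectively, as shown in the figure below.

[Figure: virtual nodes of a, b, c and d on the ring, with c's load moving to d0, a1 and b1]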

References

  • https://www.jianshu.com/p/53feb7f5f5d9
  • Dubbo website

Welcome to JavaEdge