1. Related concepts

In the CAP theorem for distributed systems, A stands for availability: every request to the service receives a valid response. In a microservice environment, services are usually deployed as clusters to keep them highly available. When a service call fails, the network jitters, or a service is temporarily unavailable, the framework must tolerate the fault automatically. For local testing or service degradation, it may instead need to return mock results. The Cluster layer has four core interfaces: Cluster, Directory, Router and LoadBalance. A remote invocation in Cluster mode can be divided into the following steps:

  • 1. Generate the Invoker object. Each Cluster implementation creates and returns its own type of ClusterInvoker.
  • 2. Obtain the list of callable services. A pre-check first verifies that the invoker has not already been destroyed. Directory#list then retrieves all available services, and the Router interface filters this list according to the routing rules. The remaining list is returned.
  • 3. Load balancing. After the service list is obtained, one node must be chosen as the call target using a load balancing strategy. Based on the user's configuration, Dubbo uses ExtensionLoader to obtain the matching load balancing extension and then calls the doInvoke method implemented by the subclass. The subclass selects a callable node according to that policy.
  • 4. Make the RPC call. The selected Invoker is saved in the RPC context and the remote call is made. The result is then processed, and each fault-tolerance strategy handles success, failure and exceptions in its own way.
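Steps 2 to 4 can be condensed into a toy model. The sketch below is not Dubbo code. `SimpleInvoker`, the list-based directory, the predicate used as a router and the round-robin selection are all hypothetical simplifications. They only show the order of operations: list, route, select, invoke.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical, simplified model of the Cluster call path (these are not Dubbo's types)
class ClusterFlowSketch {

    // A provider node: an address, an availability flag, and a trivial handler
    static final class SimpleInvoker {
        private final String address;
        private final boolean available;

        SimpleInvoker(String address, boolean available) {
            this.address = address;
            this.available = available;
        }

        String address() { return address; }

        boolean isAvailable() { return available; }

        String invoke(String request) { return address + " handled " + request; }
    }

    private final List<SimpleInvoker> directory;    // all known providers
    private final Predicate<SimpleInvoker> router;  // routing rule
    private final AtomicInteger counter = new AtomicInteger();

    ClusterFlowSketch(List<SimpleInvoker> directory, Predicate<SimpleInvoker> router) {
        this.directory = directory;
        this.router = router;
    }

    // Step 2: list the available providers and keep those accepted by the router
    List<SimpleInvoker> list() {
        List<SimpleInvoker> routed = new ArrayList<>();
        for (SimpleInvoker invoker : directory) {
            if (invoker.isAvailable() && router.test(invoker)) {
                routed.add(invoker);
            }
        }
        return routed;
    }

    // Step 3: pick one node; round robin stands in for a LoadBalance extension
    SimpleInvoker select(List<SimpleInvoker> invokers) {
        if (invokers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        return invokers.get(Math.floorMod(counter.getAndIncrement(), invokers.size()));
    }

    // Step 4: call the selected node
    String invoke(String request) {
        return select(list()).invoke(request);
    }

    public static void main(String[] args) {
        ClusterFlowSketch cluster = new ClusterFlowSketch(
                List.of(new SimpleInvoker("10.0.0.1:20880", true),
                        new SimpleInvoker("10.0.0.2:20880", false),    // unavailable
                        new SimpleInvoker("10.0.0.3:20880", true)),
                invoker -> !invoker.address().equals("10.0.0.3:20880")); // route away from .3
        System.out.println(cluster.invoke("sayHello")); // 10.0.0.1:20880 handled sayHello
    }
}
```

Only one node survives both the availability check and the routing rule, so every call in this example lands on 10.0.0.1.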

2. Implementation of fault tolerance mechanism

2.1 Framework fault tolerance mechanism

Dubbo's fault tolerance mechanism makes the whole application more robust. You choose a fault-tolerance strategy through configuration, and each strategy has its own configuration items. The strategies are as follows.

  • Failover: on failure, retry on other servers. Set the number of retries with the consumer reference parameter retries=2. The retries value does not count the first call. This is Dubbo's default strategy, and every attempt goes through load balancing. It is commonly used for reads or idempotent writes. However, retries increase interface latency and put extra request pressure on downstream nodes. To disable retries, set retries to 0. Because the number of attempts is retries + 1 with a minimum of one, a value of -1 also results in a single call.
  • Failfast: fail quickly. If a request fails, an exception is returned immediately without any retry. Requests are still load-balanced. This strategy is typically used for calls to non-idempotent interfaces. The drawback is sensitivity to network jitter: a single transient glitch makes the call fail.
  • Failsafe: ignore exceptions. Requests are load-balanced, but the caller does not care whether the call succeeds and does not want an exception to disrupt the normal business flow. It suits unimportant calls such as log synchronization, where a failed call does not matter.
  • Forking: call several identical services in parallel and return as soon as one of them returns. The forks parameter (for example forks="2") sets the maximum number of parallel calls. It is usually used for calls with strict real-time requirements, at the cost of wasting resources.
  • Failback: after a request fails, it is recorded in a failure list and retried periodically in the background. This mode suits asynchronous or eventually consistent requests.
  • Broadcast: invoke all providers. If any node reports an error, the whole call reports an error. Load balancing is not needed.
  • Mock: return a predefined response when the call fails, or always force a predefined result.
  • Available: iterate through the list of service nodes, send the request to the first available node and return its result. Throw an exception if no node is available.
  • Mergeable: automatically merge the results returned by multiple nodes.
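The list above describes Forking only in words. The sketch below shows the same idea with the JDK's `ExecutorService#invokeAny`, which returns the result of a task that completed successfully. It is not Dubbo's ForkingClusterInvoker, and the providers are hypothetical `Callable` stand-ins. In this sketch, a forks value of 0 or less, or one at least as large as the list, means every provider is called.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of the Forking idea built on java.util.concurrent (not Dubbo's code)
class ForkingSketch {

    // Call up to `forks` providers in parallel and return the first successful result
    static <T> T forkingCall(List<Callable<T>> providers, int forks, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (providers.isEmpty()) {
            throw new IllegalArgumentException("No provider available");
        }
        int n = (forks <= 0 || forks >= providers.size()) ? providers.size() : forks;
        List<Callable<T>> selected = providers.subList(0, n);
        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            // invokeAny returns the result of a task that completed successfully and
            // cancels the others; it throws ExecutionException if every task failed
            return pool.invokeAny(selected, timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> providers = List.of(
                () -> { throw new IllegalStateException("node A is down"); },
                () -> { Thread.sleep(200); return "slow node B"; },
                () -> "fast node C");
        // The fastest successful provider wins; node A's failure is ignored
        System.out.println(forkingCall(providers, 3, 1000));
    }
}
```

The trade-off named in the list is visible here: three providers do work for a single answer.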

2.2 Main implementation of the Cluster layer

In a microservice environment, several nodes may provide the same service at the same time. When the upper layer calls an Invoker, there are actually multiple Invokers behind it, and only the Cluster layer can complete the fault-tolerance logic of the whole call. This logic covers obtaining the service list, routing and load balancing, using the Directory, Router, LoadBalance and related interfaces. Fault tolerance involves two interfaces, Cluster and ClusterInvoker. Cluster has many implementations, each with a join method that creates the corresponding ClusterInvoker. When a remote reference is initialized, the Invokers are wrapped in this extra layer. The resulting cluster invoker (FailoverClusterInvoker by default) is handed to the proxy. The relevant code is shown below.

public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
    // Adaptive Cluster extension; FailoverCluster is used by default
    private static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();

    private T createProxy(Map<String, String> map) {
        ...
        // Set up the cluster logic
        if (urls.size() == 1) {
            invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
        } else {
            ...
            if (registryURL != null) {
                // Add the cluster parameter to the URL if it is absent
                URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
                // Wrap the invokers in a cluster invoker
                invoker = CLUSTER.join(new StaticDirectory(u, invokers));
            } else {
                invoker = CLUSTER.join(new StaticDirectory(invokers));
            }
            ...
        }
    }
}

FailoverCluster implements the Cluster interface by extending the AbstractCluster class. It creates and returns a new FailoverClusterInvoker, which extends AbstractClusterInvoker and is therefore itself an Invoker. The main class structure of the Cluster layer is as follows.

The default Cluster implementation is FailoverCluster, which extends the AbstractCluster template class. In AbstractCluster#join, doJoin first creates the concrete ClusterInvoker, and the chain of cluster interceptors is then assembled around it. When the resulting Invoker is called, the interceptor methods run before the call, around it, after it, and when an exception occurs.

@SPI(FailoverCluster.NAME)
public interface Cluster {
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;
}

public abstract class AbstractCluster implements Cluster {

    private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
        AbstractClusterInvoker<T> last = clusterInvoker;
        // Load the activated ClusterInterceptor extensions
        List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class)
                .getActivateExtension(clusterInvoker.getUrl(), key);
        if (!interceptors.isEmpty()) {
            // Build the chain from the last interceptor back to the first
            for (int i = interceptors.size() - 1; i >= 0; i--) {
                final ClusterInterceptor interceptor = interceptors.get(i);
                final AbstractClusterInvoker<T> next = last;
                last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
            }
        }
        return last;
    }

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        // Create the concrete ClusterInvoker, then wrap it with the interceptor chain
        return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY));
    }

    protected abstract <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException;

    // One node of the interceptor chain
    protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
        ...
        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result asyncResult;
            try {
                interceptor.before(next, invocation);
                asyncResult = interceptor.intercept(next, invocation);
            } catch (Exception e) {
                // Fire the listener's onError callback
                if (interceptor instanceof ClusterInterceptor.Listener) {
                    ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                    listener.onError(e, clusterInvoker, invocation);
                }
                throw e;
            } finally {
                interceptor.after(next, invocation);
            }
            return asyncResult.whenCompleteWithContext((r, t) -> {
                if (interceptor instanceof ClusterInterceptor.Listener) {
                    ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                    if (t == null) {
                        // Fire the listener's onMessage callback
                        listener.onMessage(r, clusterInvoker, invocation);
                    } else {
                        // Fire the listener's onError callback
                        listener.onError(t, clusterInvoker, invocation);
                    }
                }
            });
        }
    }
}
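The reverse loop in buildClusterInterceptors makes the first interceptor in the list the outermost one. The following minimal sketch uses a hypothetical `Interceptor` interface and `Function`-based invokers instead of Dubbo's types to reproduce that wrapping order. Where Dubbo calls `interceptor.intercept(next, invocation)`, the sketch simply calls `next`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified model of building an interceptor chain; the types are hypothetical
class InterceptorChainSketch {

    interface Interceptor {
        void before(String invocation);
        void after(String invocation);
    }

    // Wrap `target` so that interceptors.get(0) ends up outermost
    static Function<String, String> buildChain(Function<String, String> target,
                                               List<Interceptor> interceptors) {
        Function<String, String> last = target;
        for (int i = interceptors.size() - 1; i >= 0; i--) {
            final Interceptor interceptor = interceptors.get(i);
            final Function<String, String> next = last;
            last = invocation -> {
                interceptor.before(invocation);
                try {
                    return next.apply(invocation);
                } finally {
                    interceptor.after(invocation); // runs on success and on exception
                }
            };
        }
        return last;
    }

    // An interceptor that records when its hooks fire
    static Interceptor named(String name, List<String> log) {
        return new Interceptor() {
            public void before(String invocation) { log.add(name + ".before"); }
            public void after(String invocation) { log.add(name + ".after"); }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Function<String, String> chain = buildChain(
                inv -> { log.add("invoke " + inv); return "ok"; },
                List.of(named("A", log), named("B", log)));
        chain.apply("sayHello");
        System.out.println(log);
        // [A.before, B.before, invoke sayHello, B.after, A.after]
    }
}
```

Because `after` sits in a `finally` block, it runs even when the wrapped call throws. This matches the try/finally structure in InterceptorInvokerNode#invoke.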

ConsumerContextClusterInterceptor implements both the ClusterInterceptor and ClusterInterceptor.Listener interfaces, so its before, after, onMessage and onError methods are triggered at the corresponding points. It puts the call information into the RpcContext call context. RpcContext holds the call URL, method name, array of parameter types, array of arguments, local address and remote address.

public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener {
    ...
    @Override
    public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) {
        // RpcContext is stored in an InternalThreadLocal
        RpcContext context = RpcContext.getContext();
        context.setInvocation(invocation).setLocalAddress(NetUtils.getLocalHost(), 0);
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        RpcContext.removeServerContext();
    }

    @Override
    public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
        RpcContext.removeContext(true);
    }

    @Override
    public void onMessage(Result appResponse, AbstractClusterInvoker<?> invoker, Invocation invocation) {
        RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
    }
    ...
}
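RpcContext works because the call information is stored per thread. The sketch below is a hypothetical stand-in: `CallContextSketch` is not a Dubbo class, and it uses the JDK's `ThreadLocal` rather than Dubbo's `InternalThreadLocal`. It shows why `before` can fill the context for the current thread only, and why `after` must clear it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for RpcContext: per-thread call information (not Dubbo's class)
final class CallContextSketch {

    private static final ThreadLocal<Map<String, Object>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    static Map<String, Object> get() {
        return CONTEXT.get();
    }

    // Plays the role of before(): record the call information for this thread
    static void before(String methodName, String remoteAddress) {
        Map<String, Object> ctx = CONTEXT.get();
        ctx.put("methodName", methodName);
        ctx.put("remoteAddress", remoteAddress);
    }

    // Plays the role of after(): clear it so pooled threads do not leak state
    static void after() {
        CONTEXT.remove();
    }

    public static void main(String[] args) throws InterruptedException {
        before("sayHello", "10.0.0.1:20880");
        Thread other = new Thread(() ->
                System.out.println("other thread sees: " + get())); // other thread sees: {}
        other.start();
        other.join();
        System.out.println(get().get("methodName")); // sayHello
        after();
        System.out.println(get().isEmpty());          // true
    }
}
```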

AbstractClusterInvoker implements the ClusterInvoker interface and its invoke method. The main logic calls Directory#list to obtain the Invoker list and initializes the load balancing object. It then passes the Invoker list and the LoadBalance instance to the abstract doInvoke method, which each subclass implements. Subclasses pick a target node through the shared select method. Its doSelect step reselects when the chosen node has already been tried or is unavailable.

public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {

    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        ...
        Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
        ...
        return invoker;
    }

    private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        // If the service list has only one element, return it without load balancing
        if (invokers.size() == 1) {
            return invokers.get(0);
        }
        // Let the load balancing policy choose a node
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
        // Reselect if the node was already tried, or is unavailable while availablecheck is on
        if ((selected != null && selected.contains(invoker))
                || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
            try {
                Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                if (rInvoker != null) {
                    // The reselected Invoker is not null: use it
                    invoker = rInvoker;
                } else {
                    // Otherwise take the node after the current one in the list
                    int index = invokers.indexOf(invoker);
                    try {
                        invoker = invokers.get((index + 1) % invokers.size());
                    } catch (Exception e) {
                        ...
                    }
                }
            } catch (Throwable t) {
                ...
            }
        }
        return invoker;
    }

    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        ...
        // Get the Invoker list from the Directory
        List<Invoker<T>> invokers = list(invocation);
        // Initialize the LoadBalance extension
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // Delegate to the subclass
        return doInvoke(invocation, invokers, loadbalance);
    }

    protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                       LoadBalance loadbalance) throws RpcException;
}
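The reselect rule in doSelect can be isolated as follows. This sketch makes several assumptions. Invokers are plain strings. The load balancer is any function that picks from a list. `availablecheck` is treated as always on. The real `reselect` has further fallbacks that are omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Simplified model of doSelect's reselect rule; names and types are hypothetical
class ReselectSketch {

    // `balancer` stands in for LoadBalance#select, `available` for Invoker#isAvailable
    static String doSelect(List<String> invokers, List<String> selected, Set<String> available,
                           Function<List<String>, String> balancer) {
        if (invokers.size() == 1) {
            return invokers.get(0);                   // one node: no load balancing needed
        }
        String invoker = balancer.apply(invokers);
        boolean alreadyTried = selected != null && selected.contains(invoker);
        if (alreadyTried || !available.contains(invoker)) {
            String retry = reselect(invokers, selected, available, balancer);
            if (retry != null) {
                invoker = retry;
            } else {
                // Fall back to the node after the original pick
                int index = invokers.indexOf(invoker);
                invoker = invokers.get((index + 1) % invokers.size());
            }
        }
        return invoker;
    }

    // Prefer nodes that are available and have not been tried yet
    static String reselect(List<String> invokers, List<String> selected, Set<String> available,
                           Function<List<String>, String> balancer) {
        List<String> candidates = new ArrayList<>();
        for (String invoker : invokers) {
            if (available.contains(invoker) && (selected == null || !selected.contains(invoker))) {
                candidates.add(invoker);
            }
        }
        return candidates.isEmpty() ? null : balancer.apply(candidates);
    }

    public static void main(String[] args) {
        List<String> invokers = List.of("A", "B", "C");
        Set<String> all = Set.of("A", "B", "C");
        Function<List<String>, String> first = list -> list.get(0); // deterministic "balancer"
        // A was already tried, so another node is chosen
        System.out.println(doSelect(invokers, List.of("A"), all, first));           // B
        // Every node was tried: fall back to the node after A
        System.out.println(doSelect(invokers, List.of("A", "B", "C"), all, first)); // B
    }
}
```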
In short, the commonly used Cluster implementations behave as follows:
  • Failover Cluster: switch servers automatically on failure and retry on other servers. Typically used for reads, but retries add latency. Set the number of retries, excluding the first call, with retries="2".
  • Failfast Cluster: fail fast. The call is made only once, and an error is reported immediately if it fails. Typically used for non-idempotent writes, such as inserting new records.
  • Failsafe Cluster: fail safe. Exceptions are ignored. Typically used for operations such as writing audit logs.
  • Failback Cluster: recover automatically on failure. Failed requests are recorded in the background and resent periodically. Typically used for message notifications.
  • Forking Cluster: call several servers in parallel and return as soon as one succeeds. Typically used for reads with strict real-time requirements, at the cost of more service resources. Set the maximum parallelism with forks="2".
  • Broadcast Cluster: call every provider one by one; if any provider fails, the call reports an error. Typically used to tell all providers to update local resources such as caches or logs.

2.3 Implementation of different fault tolerance strategies

2.3.1 Failover strategy

The default Cluster implementation is FailoverCluster. Its FailoverClusterInvoker first checks that the Invoker list passed in by AbstractClusterInvoker is not empty and reads the number of retries from the call URL. It then initializes the collections and objects that hold the exceptions raised during the call and record which nodes have already been called.

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
    ...
    @Override
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        // Validate the Invoker list
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
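        // Read the retries parameter from the URL; the first call is not a retry, hence + 1
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
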
        RpcException le = null;
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                checkInvokers(copyInvokers, invocation);
            }

            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                return result;
            } catch (RpcException e) {
                ...
            } catch (Throwable e) {
                ...
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        // All attempts failed: an RpcException wrapping the last exception (le) is thrown here
        ...
    }
}
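The loop above makes one first call plus `retries` further attempts, preferring providers not yet invoked. That essence fits in a small self-contained sketch. Everything in it is hypothetical: providers are strings and a call is a `Function`. `BizException` stands in for an error raised by the provider's business logic, which the sketch rethrows instead of retrying.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified model of the Failover retry loop; these types are hypothetical, not Dubbo's
class FailoverSketch {

    // Hypothetical stand-in for an error raised by the provider's business logic
    static class BizException extends RuntimeException {
        BizException(String message) {
            super(message);
        }
    }

    static String failover(List<String> providers, int retries, Function<String, String> call) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        int len = Math.max(retries + 1, 1);   // first call plus `retries` retries, at least one
        List<String> invoked = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < len; i++) {
            // Select: prefer a provider that has not been tried yet
            String provider = providers.stream()
                    .filter(p -> !invoked.contains(p))
                    .findFirst()
                    .orElse(providers.get(i % providers.size()));
            invoked.add(provider);
            try {
                return call.apply(provider);
            } catch (BizException e) {
                throw e;                      // business errors are not retried
            } catch (RuntimeException e) {
                last = e;                     // remember the failure and try again
            }
        }
        throw new IllegalStateException("Failed after " + len + " attempts on " + invoked, last);
    }

    public static void main(String[] args) {
        Function<String, String> call = p -> {
            if (!p.equals("C")) {
                throw new RuntimeException(p + " timed out");
            }
            return "result from " + p;
        };
        System.out.println(failover(List.of("A", "B", "C"), 2, call)); // result from C
    }
}
```

With retries set to 2, A and B fail and the third attempt reaches C. With retries set to 1, the call would give up after A and B.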

2.3.2 Failfast strategy

Failfast throws an exception and returns as soon as a call fails. It first verifies that the Invoker list passed in by AbstractClusterInvoker is not empty. It then calls select to choose a node through load balancing, and finally makes the remote call.

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            ...
        }
    }
}

2.3.3 Failsafe strategy

With Failsafe, any exception thrown during select or invoke is caught by a try-catch block, and an empty result with a null value is returned instead.

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
    ...
    @Override
    public Result doInvoke(Invocation invocation, 
            List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
        }
    }
}

2.3.4 Failback strategy

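Failback retries the invocation periodically after it fails. Older Dubbo versions stored failed calls in a ConcurrentHashMap and retried them all from a scheduled thread pool, removing each call from the map once its retry succeeded. The version shown below works differently. It lazily creates a HashedWheelTimer and schedules a RetryTimerTask for each failed call, which retries the call after RETRY_FAILED_PERIOD seconds, up to the configured number of retries. In both cases, the exception from the original call is caught rather than thrown, and an empty result is returned.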

public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
    ...
    private void addFailed(LoadBalance loadbalance, Invocation invocation,
                           List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
        if (failTimer == null) {
            synchronized (this) {
                if (failTimer == null) {
                    // Lazily initialize the timer (double-checked locking)
                    failTimer = new HashedWheelTimer(
                            new NamedThreadFactory("failback-cluster-timer", true),
                            1, TimeUnit.SECONDS, 32, failbackTasks);
                }
            }
        }
        // Wrap the failed call in a retry task
        RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker,
                retries, RETRY_FAILED_PERIOD);
        try {
            failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
        } catch (Throwable e) {
            logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
        }
    }

    @Override
    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                              LoadBalance loadbalance) throws RpcException {
        Invoker<T> invoker = null;
        try {
            checkInvokers(invokers, invocation);
            invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            // On failure, add a timer task that retries the call later
            addFailed(loadbalance, invocation, invokers, invoker);
            // Return an empty result
            return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
        }
    }
}
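The sketch below reproduces the Failback behavior with the JDK's `ScheduledExecutorService` instead of Dubbo's HashedWheelTimer. It makes two assumptions. A call is a hypothetical `BooleanSupplier` that returns true on success. Each failed retry schedules the next one until the retry budget is spent.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the Failback idea (not Dubbo's code): a call is modeled as a
// BooleanSupplier that returns true on success and false on failure
class FailbackSketch {

    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(runnable -> {
        Thread thread = new Thread(runnable, "failback-retry-timer");
        thread.setDaemon(true); // retries alone should not keep the JVM alive
        return thread;
    });

    // On failure, return null at once (the caller is not blocked) and retry in the background
    String invoke(BooleanSupplier call, int retries, long periodMillis) {
        if (call.getAsBoolean()) {
            return "ok";
        }
        scheduleRetry(call, retries, periodMillis);
        return null;
    }

    private void scheduleRetry(BooleanSupplier call, int remaining, long periodMillis) {
        if (remaining <= 0) {
            return; // retry budget spent: give up
        }
        timer.schedule(() -> {
            if (!call.getAsBoolean()) {
                scheduleRetry(call, remaining - 1, periodMillis);
            }
        }, periodMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        timer.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger attempts = new AtomicInteger();
        CountDownLatch succeeded = new CountDownLatch(1);
        // Fails twice, then succeeds on the third attempt
        BooleanSupplier flaky = () -> {
            if (attempts.incrementAndGet() < 3) {
                return false;
            }
            succeeded.countDown();
            return true;
        };
        FailbackSketch failback = new FailbackSketch();
        System.out.println(failback.invoke(flaky, 3, 10));        // null
        System.out.println(succeeded.await(1, TimeUnit.SECONDS)); // true
        System.out.println(attempts.get());                       // 3
        failback.shutdown();
    }
}
```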

2.3.5 Available strategy

Available iterates over the Invoker list, calls the first available node and returns its result. If no node is available, it throws an exception.

public class AvailableClusterInvoker<T> extends AbstractClusterInvoker<T> {
    ...
    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        // Iterate through the Invoker list and call the first available node
        for (Invoker<T> invoker : invokers) {
            if (invoker.isAvailable()) {
                return invoker.invoke(invocation);
            }
        }
        // No node is available
        throw new RpcException("No provider available in " + invokers);
    }
}

2.3.6 Broadcast strategy

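Broadcast calls every node in the list one by one. If the call to a node throws, the exception is recorded and the remaining nodes are still called. After the loop, the recorded exception is thrown; if several nodes failed, it is the last one.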

public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    ...
    @Override
    public Result doInvoke(final Invocation invocation, 
            List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        ...
        // Iterate through the Invoker list
        for (Invoker<T> invoker : invokers) {
            // Catch the exception raised by each call
            try {
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }
        // If the call to any node raised an exception, throw it
        if (exception != null) {
            throw exception;
        }
        return result;
    }
}

2.4 Directory implementation

The fault-tolerance process first calls Directory#list to obtain the Invoker list. Directory currently has two kinds of implementation: static and registry-based (dynamic). The static list is the Invoker list set by the user, while the dynamic list changes with the data in the registry. Directory is defined as follows. It has methods to get the interface, get the Invoker list for an invocation, and get all Invokers.

public interface Directory<T> extends Node {

    Class<T> getInterface();
    
    List<Invoker<T>> list(Invocation invocation) throws RpcException;

    List<Invoker<T>> getAllInvokers();
    
    URL getConsumerUrl();

}
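The difference between the static and the dynamic Directory can be sketched with two small classes. The names `FixedDirectory` and `DynamicDirectory` are hypothetical. The dynamic one only mimics how a registry callback such as notify replaces the list that later list() calls return.

```java
import java.util.List;

// Hypothetical sketch contrasting a fixed and a registry-driven directory (not Dubbo's classes)
class DirectorySketch {

    interface SimpleDirectory {
        List<String> list();
    }

    // Like a static directory: the provider list is fixed when the directory is created
    static final class FixedDirectory implements SimpleDirectory {
        private final List<String> invokers;

        FixedDirectory(List<String> invokers) { this.invokers = List.copyOf(invokers); }

        public List<String> list() { return invokers; }
    }

    // Like a registry-backed directory: a registry callback replaces the list at runtime
    static final class DynamicDirectory implements SimpleDirectory {
        // volatile so that callers always see the most recently published snapshot
        private volatile List<String> invokers = List.of();

        void notify(List<String> providerUrls) { invokers = List.copyOf(providerUrls); }

        public List<String> list() { return invokers; }
    }

    public static void main(String[] args) {
        DynamicDirectory directory = new DynamicDirectory();
        System.out.println(directory.list()); // []
        directory.notify(List.of("10.0.0.1:20880", "10.0.0.2:20880"));
        System.out.println(directory.list()); // [10.0.0.1:20880, 10.0.0.2:20880]
    }
}
```

Publishing a new immutable list instead of mutating the old one lets readers iterate safely while the registry pushes updates.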

Directory is the top-level interface, and AbstractDirectory is an abstract class that encapsulates the common implementation logic. Its two main implementation classes are RegistryDirectory and StaticDirectory. AbstractDirectory provides the shared operations, such as checking availability, destroying the directory and obtaining the Invoker list. It leaves the abstract doList method for subclasses to implement. The list method is the main entry point and returns all available service nodes.

public abstract class AbstractDirectory<T> implements Directory<T> {

    protected RouterChain<T> routerChain;

    public AbstractDirectory(URL url, RouterChain<T> routerChain) {
        this.url = url.removeParameter(REFER_KEY).removeParameter(MONITOR_KEY);
        this.consumerUrl = url.addParameters(StringUtils.parseQueryString(
                url.getParameterAndDecoded(REFER_KEY)))
                .removeParameter(MONITOR_KEY);
        setRouterChain(routerChain);
    }

    @Override
    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        return doList(invocation);
    }

    @Override
    public void destroy() {
        destroyed = true;
    }

    protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;

}

The default Directory implementation is RegistryDirectory, which contains two important pieces of logic. The first subscribes to the registry and dynamically updates the local Invoker list, router list, configuration and so on. The second implements the parent class's doList method. Subscription and dynamic updates mainly involve the subscribe, notify and refreshInvoker methods. RegistryProtocol#doRefer calls RegistryDirectory#subscribe, which registers the configuration listeners and subscribes to the registry. When the registry detects a change, it sends a notification event and calls RegistryDirectory#notify. That method groups the URLs by category and updates the configurators, routers and provider list.

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    ...
    // Subscribe
    public void subscribe(URL url) {
        setConsumerUrl(url);
        CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
        serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
        registry.subscribe(url, this);
    }

    // Unsubscribe
    public void unSubscribe(URL url) {
        setConsumerUrl(null);
        CONSUMER_CONFIGURATION_LISTENER.removeNotifyListener(this);
        serviceConfigurationListener.stop();
        registry.unsubscribe(url, this);
    }

    @Override
    public synchronized void notify(List<URL> urls) {
        // Group the pushed URLs by category
        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(this::judgeCategory));

        // Configurator URLs
        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

        // Router URLs
        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        toRouters(routerURLs).ifPresent(this::addRouters);

        // Service provider URLs
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
        // Let the activated AddressListener extensions adjust the provider addresses
        ExtensionLoader<AddressListener> addressListenerExtensionLoader =
                ExtensionLoader.getExtensionLoader(AddressListener.class);
        List<AddressListener> supportedListeners =
                addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
        if (supportedListeners != null && !supportedListeners.isEmpty()) {
            for (AddressListener addressListener : supportedListeners) {
                providerURLs = addressListener.notify(providerURLs, getConsumerUrl(), this);
            }
        }
        // Refresh the overrides and the local Invoker list
        refreshOverrideAndInvoker(providerURLs);
    }
}
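The grouping step at the start of notify can be tried out in isolation. In this hypothetical sketch, URLs are plain strings and the category is read from a `category` query parameter, defaulting to `providers`. Dubbo's own `judgeCategory` works on its URL class and may apply additional rules.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical sketch of the grouping step in notify(); plain strings stand in for Dubbo's URL
class NotifyGroupingSketch {

    // Read the "category" query parameter, defaulting to "providers" (an assumption here)
    static String category(String url) {
        int q = url.indexOf('?');
        if (q >= 0) {
            for (String pair : url.substring(q + 1).split("&")) {
                if (pair.startsWith("category=")) {
                    return pair.substring("category=".length());
                }
            }
        }
        return "providers";
    }

    static Map<String, List<String>> group(List<String> urls) {
        return urls.stream()
                .filter(Objects::nonNull)
                .collect(Collectors.groupingBy(NotifyGroupingSketch::category));
    }

    public static void main(String[] args) {
        Map<String, List<String>> byCategory = group(List.of(
                "dubbo://10.0.0.1:20880/DemoService?category=providers",
                "override://0.0.0.0/DemoService?category=configurators&timeout=2000",
                "route://0.0.0.0/DemoService?category=routers",
                "dubbo://10.0.0.2:20880/DemoService"));
        System.out.println(byCategory.get("providers").size()); // 2
        System.out.println(byCategory.get("routers").size());   // 1
    }
}
```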

RegistryDirectory#doList calls RouterChain#route, which applies the configured Router implementations, and returns the resulting Invoker list.

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    ...
    @Override
    public List<Invoker<T>> doList(Invocation invocation) {
        ...
        List<Invoker<T>> invokers = null;
        try {
            // Filter the Invoker list through the router chain
            invokers = routerChain.route(getConsumerUrl(), invocation);
        } catch (Throwable t) {
            ...
        }
        return invokers == null ? Collections.emptyList() : invokers;
    }
}
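A router chain applies its routers one after another, each narrowing the list produced by the previous one. The sketch below models that with hypothetical `UnaryOperator` routers over address strings. It does not use Dubbo's Router interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Hypothetical sketch of a router chain (not Dubbo's RouterChain)
class RouterChainSketch {

    private final List<UnaryOperator<List<String>>> routers = new ArrayList<>();

    RouterChainSketch addRouter(UnaryOperator<List<String>> router) {
        routers.add(router);
        return this;
    }

    // Each router receives the output of the previous one
    List<String> route(List<String> invokers) {
        List<String> result = invokers;
        for (UnaryOperator<List<String>> router : routers) {
            result = router.apply(result);
        }
        return result;
    }

    // Helper: keep only the addresses accepted by `keep`
    static List<String> filter(List<String> list, Predicate<String> keep) {
        return list.stream().filter(keep).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        RouterChainSketch chain = new RouterChainSketch()
                .addRouter(l -> filter(l, a -> !a.equals("10.0.0.3:20880"))) // blacklist rule
                .addRouter(l -> filter(l, a -> a.startsWith("10.0.0.")));     // subnet rule
        System.out.println(chain.route(List.of(
                "10.0.0.1:20880", "10.0.0.3:20880", "192.168.1.5:20880"))); // [10.0.0.1:20880]
    }
}
```

An empty chain returns its input unchanged. That mirrors doList falling back to the full list when no routing rule applies.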

3. Summary

This article described the design and implementation of cluster fault tolerance in the Dubbo framework: the main class structure of the Cluster layer, the different fault-tolerance strategies, and the concrete implementation of each strategy.