The previous articles covered the process of registering and consuming Dubbo, and today we continue with the process of downscaling (only single registries, multiple registries, just an extra step, selecting one of the registries by region or configuration).

Our call on the consumer side is the object returned by the proxy factory

@Override
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException { Set<Class<? >> interfaces =new HashSet<>();
    String config = invoker.getUrl().getParameter(INTERFACES);
    if(config ! =null && config.length() > 0) {
        String[] types = COMMA_SPLIT_PATTERN.split(config);
        for (String type : types) {
            // TODO can we load successfully for a different classloader? .interfaces.add(ReflectUtils.forName(type)); }}if (generic) {
			.........
    }

    interfaces.add(invoker.getInterface());
    interfaces.addAll(Arrays.asList(INTERNAL_INTERFACES));
    return getProxy(invoker, interfaces.toArray(newClass<? > [0]));
}

JavassistProxyFactory

@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker
       
         invoker, Class
        [] interfaces)
        {
  return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
Copy the code

Invoker.getinterface () returns the type we refer to

DubboProtocol

@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
  optimizeSerialization(url);

  // create rpc invoker.
  DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
  invokers.add(invoker);

  return invoker;
}
public Class<T> getInterface(a) {
    return type;
}
Copy the code

Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));

This is the familiar JDK dynamic proxy. The invoke InvokerInvocationHandler

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    if (method.getDeclaringClass() == Object.class) {
        returnmethod.invoke(invoker, args); } String methodName = method.getName(); Class<? >[] parameterTypes = method.getParameterTypes();if (parameterTypes.length == 0) {
        if ("toString".equals(methodName)) {
            return invoker.toString();
        } else if ("$destroy".equals(methodName)) {
            invoker.destroy();
            return null;
        } else if ("hashCode".equals(methodName)) {
            returninvoker.hashCode(); }}else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
        return invoker.equals(args[0]);
    }
    RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
    String serviceKey = invoker.getUrl().getServiceKey();
    rpcInvocation.setTargetServiceUniqueName(serviceKey);
  
    if(consumerModel ! =null) {
        rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
        rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
    }

    return invoker.invoke(rpcInvocation).recreate();
}
Copy the code

The Invoker we know from the consumer startup process is MockClusterInvoker

Downgrade circuit breaker for service

  • Normal call
  • Fusing and directly using mock related configurations
  • Mock related configurations after degradation and service provider exceptions
@Override
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;

    String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
        //no mock
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        //force:direct mock
        result = doMockInvoke(invocation, null);
    } else {
        //fail-mock
        try {
            result = this.invoker.invoke(invocation);

            //fix:#4585
            if(result.getException() ! =null && result.getException() instanceof RpcException){
                RpcException rpcException= (RpcException)result.getException();
                if(rpcException.isBiz()){
                    throw  rpcException;
                }else{ result = doMockInvoke(invocation, rpcException); }}}catch (RpcException e) {
            if (e.isBiz()) {
                throwe; } result = doMockInvoke(invocation, e); }}return result;
}
Copy the code
private Result doMockInvoke(Invocation invocation, RpcException e) {
    Result result = null;
    Invoker<T> minvoker;

    List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
    if (CollectionUtils.isEmpty(mockInvokers)) {
        minvoker = (Invoker<T>) new MockInvoker(getUrl(), directory.getInterface());
    } else {
        minvoker = mockInvokers.get(0);
    }
    try {
        result = minvoker.invoke(invocation);
    } catch (RpcException me) {
        if (me.isBiz()) {
            result = AsyncRpcResult.newDefaultAsyncResult(me.getCause(), invocation);
        } else {
            throw newRpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause()); }}catch (Throwable me) {
        throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
    }
    return result;
}
Copy the code

How did he choose Invoker

private List<Invoker<T>> selectMockInvoker(Invocation invocation) {
    List<Invoker<T>> invokers = null;
    / / TODO generic invoker?
    if (invocation instanceof RpcInvocation) {
        //Note the implicit contract (although the description is added to the interface declaration, but extensibility is a problem. The practice placed in the attachment needs to be improved)
        ((RpcInvocation) invocation).setAttachment(INVOCATION_NEED_MOCK, Boolean.TRUE.toString());
        //directory will return a list of normal invokers if Constants.INVOCATION_NEED_MOCK is present in invocation, otherwise, a list of mock invokers will return.
        try {
            invokers = directory.list(invocation);
        } catch (RpcException e) {
            if (logger.isInfoEnabled()) {
                logger.info("Exception when try to invoke mock. Get mock invokers error for service:"
                        + getUrl().getServiceInterface() + ", method:" + invocation.getMethodName()
                        + ", will construct a new mock with 'new MockInvoker()'.", e); }}}return invokers;
}
Copy the code

Here is an Attachment((RpcInvocation) Invocation).setAttachment(INVOCATION_NEED_MOCK, Boil.true. ToString ());

Let’s look again at MockInvokersSelector, invokers is our normal invoker in RegistryDireotry

@Override
public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
                                  URL url, final Invocation invocation) throws RpcException {
    if (CollectionUtils.isEmpty(invokers)) {
        return invokers;
    }

    if (invocation.getObjectAttachments() == null) {
        return getNormalInvokers(invokers);
    } else {
        String value = (String) invocation.getObjectAttachments().get(INVOCATION_NEED_MOCK);
        if (value == null) {
            return getNormalInvokers(invokers);
        } else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
            returngetMockedInvokers(invokers); }}return invokers;
}
Copy the code

Because we set INVOCATION_NEED_MOCK to true

private <T> List<Invoker<T>> getMockedInvokers(final List<Invoker<T>> invokers) {
    if(! hasMockProviders(invokers)) {return null;
    }
    List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(1);
    for (Invoker<T> invoker : invokers) {
        if(invoker.getUrl().getProtocol().equals(MOCK_PROTOCOL)) { sInvokers.add(invoker); }}return sInvokers;
}
Copy the code

Find the Invoker that uses mock as the protocol

If not, you go to the caller above and create a MockInvoker yourself. It then returns based on the configuration information

if (CollectionUtils.isEmpty(mockInvokers)) {
    minvoker = (Invoker<T>) new MockInvoker(getUrl(), directory.getInterface());	
Copy the code

If not mock, we again into the org. Apache. Dubbo. RPC. Cluster. Support. The wrapper. AbstractCluster. InterceptorInvokerNode invoke method

@Override
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult;
    try {
        interceptor.before(next, invocation);
        asyncResult = interceptor.intercept(next, invocation);
    } catch (Exception e) {
        // onError callback
        if (interceptor instanceof ClusterInterceptor.Listener) {
            ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
            listener.onError(e, clusterInvoker, invocation);
        }
        throw e;
    } finally {
        interceptor.after(next, invocation);
    }
    return asyncResult.whenCompleteWithContext((r, t) -> {
        // onResponse callback
        if (interceptor instanceof ClusterInterceptor.Listener) {
            ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
            if (t == null) {
                listener.onMessage(r, clusterInvoker, invocation);
            } else{ listener.onError(t, clusterInvoker, invocation); }}}); }Copy the code

The interceptor here will eventually call FailoverClusterInvoker’s invoker method

FailoverClusterInvoker’s parent class AbstractClusterInvoker

@Override
public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();

    // binding attachments into invocation.
    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if(contextAttachments ! =null&& contextAttachments.size() ! =0) {
        ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
    }

    List<Invoker<T>> invokers = list(invocation);
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}
Copy the code

The list here returns a list of all registered service providers by default

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
    return directory.list(invocation);
}
Copy the code

InitLoadBalance selects a load balancing policy, which is random by default

protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
    if (CollectionUtils.isNotEmpty(invokers)) {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
    } else {
        returnExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE); }}Copy the code

Then enter the FailoverClusterInvoker doInvoke

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        if (i > 0) {
            checkWhetherDestroyed();
            copyInvokers = list(invocation);
            // check again
            checkInvokers(copyInvokers, invocation);
        }
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally{ providers.add(invoker.getUrl().getAddress()); }}throw newRpcException(le.getCode()......) ; }Copy the code

Invoker invoker = select(loadbalance, invocation, copyInvokers, invoked);

This is a matter of selecting one Invoker from a bunch of invokers, where there is a retry mechanism for failure, and if it is not a business exception, then another Invoker is selected for retry

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List
       
        > invokers, List
        
         > selected)
        
        throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();
    boolean sticky = invokers.get(0).getUrl()
            .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);

    //ignore overloaded method
    if(stickyInvoker ! =null && !invokers.contains(stickyInvoker)) {
        stickyInvoker = null;
    }
    //ignore concurrency problem
    if(sticky && stickyInvoker ! =null && (selected == null| |! selected.contains(stickyInvoker))) {if (availablecheck && stickyInvoker.isAvailable()) {
            return stickyInvoker;
        }
    }
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}
Copy the code
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List
       
        > invokers, List
        
         > selected)
        
        throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    if (invokers.size() == 1) {
        return invokers.get(0);
    }
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
    //If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.
    if((selected ! =null&& selected.contains(invoker)) || (! invoker.isAvailable() && getUrl() ! =null && availablecheck)) {
        try {
            Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if(rInvoker ! =null) {
                invoker = rInvoker;
            } else {
                int index = invokers.indexOf(invoker);
                try {
                    //Avoid collision
                    invoker = invokers.get((index + 1) % invokers.size());
                } catch (Exception e) {
                }
            }
        } catch (Throwable t) {
        }
    }
    return invoker;
}
Copy the code

Selected. Contains (Invoker) selected is the invoker that has already been called and invoker is newly selected. Reselect if invoker is already selected

private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {

    //Allocating one in advance, this list is certain to be used.
    List<Invoker<T>> reselectInvokers = new ArrayList<>(
            invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

    // First, try picking a invoker not in `selected`.
    for (Invoker<T> invoker : invokers) {
        if(availablecheck && ! invoker.isAvailable()) {continue;
        }

        if (selected == null || !selected.contains(invoker)) {
            reselectInvokers.add(invoker);
        }
    }

    if(! reselectInvokers.isEmpty()) {return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }

    // Just pick an available invoker using loadbalance policy
    if(selected ! =null) {
        for (Invoker<T> invoker : selected) {
            if ((invoker.isAvailable()) // available first&&! reselectInvokers.contains(invoker)) { reselectInvokers.add(invoker); }}}if(! reselectInvokers.isEmpty()) {return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }

    return null;
}
Copy the code

Logic is also relatively simple, there is no call to use not called, no have to choose has been called but is available state

Let’s keep going down. This is part of the RegistryDiretory code, details can be read several articles

invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
Copy the code

InvokerDelegate is really just a proxy class that does nothing

@Override
public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}
Copy the code

Protocol. refer(serviceType, URL) is decorated with several Wrapper classes

ProtocolFilterWrapper calls all Filter interface methods before calling the next Invoker

ListenerInvokerWrapper is a listener that gives the invoker to see if the implementation class is basically useless

After the Wrapper, we come to the AsyncToSyncInvoker

@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
Copy the code

As the name implies, is a step to synchronization

@Override
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = invoker.invoke(invocation);

    try {
        if(InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) { asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); }}catch (InterruptedException e) {
      .....
    }
    return asyncResult;
}
Copy the code

Finally, we arrived at DubboInvoker

@Override
public Result invoke(Invocation inv) throws RpcException {
    RpcInvocation invocation = (RpcInvocation) inv;
    invocation.setInvoker(this);
    if (CollectionUtils.isNotEmptyMap(attachment)) {
        invocation.addObjectAttachmentsIfAbsent(attachment);
    }

    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
        invocation.addObjectAttachments(contextAttachments);
    }

    invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

    AsyncRpcResult asyncResult;
    try {
        asyncResult = (AsyncRpcResult) doInvoke(invocation);
    } catch (InvocationTargetException e) { // biz exception. } RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
    return asyncResult;
}
Copy the code

This communicates with the provider through Netty. currentClient.request

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = calculateTimeout(invocation, methodName);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            ExecutorService executor = getCallbackExecutor(getUrl(), inv);
            CompletableFuture<AppResponse> appResponseFuture =
                    currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
            / / save the for 2.6 x compatibility, for example, TraceFilter in Zipkin USES com. Alibaba. XXX. FutureAdapter
            FutureContext.getContext().setCompatibleFuture(appResponseFuture);
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
            result.setExecutor(executor);
            returnresult; }}catch(TimeoutException e) { ..... }}Copy the code

This is roughly the process of the entire invocation chain, which is actually the process of creating the proxy on the consumer side