0 Foreword

With the current trend toward microservices, a single call involves multiple service nodes, and the logs it generates are scattered across different servers. Although the scattered logs can be aggregated into Elasticsearch (ES) with the ELK stack, correlating the logs that belong to one call remains a key problem.

To view the full-link logs of one invocation, the usual practice is to generate a traceId at the system boundary and pass it to the downstream services in the call chain. Each downstream service prints its logs with the traceId and then passes it on to the services it calls in turn. This process is referred to as traceId passthrough (transparent transmission).
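The idea can be sketched in plain Java without any RPC framework. The class and method names below (`TraceIdPassthroughSketch`, `gateway`, `orderService`, `stockService`) are hypothetical and only illustrate the flow: the traceId is generated once at the boundary and every later hop logs with it and forwards it unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Toy call chain: gateway -> order service -> stock service, all sharing one traceId. */
final class TraceIdPassthroughSketch {

    /** Log lines are collected in memory so the result is easy to inspect. */
    static final List<String> LOG = new ArrayList<>();

    /** System boundary: the only place where a traceId is generated. */
    static String gateway(String request) {
        String traceId = UUID.randomUUID().toString().replace("-", "");
        log(traceId, "gateway", "received " + request);
        orderService(traceId, request);
        return traceId;
    }

    /** Downstream service: logs with the received traceId and forwards it unchanged. */
    static void orderService(String traceId, String request) {
        log(traceId, "order", "handling " + request);
        stockService(traceId, request);
    }

    /** Last hop: logs with the same traceId. */
    static void stockService(String traceId, String request) {
        log(traceId, "stock", "reserving for " + request);
    }

    static void log(String traceId, String service, String message) {
        LOG.add("[" + traceId + "] " + service + ": " + message);
    }

    public static void main(String[] args) {
        gateway("order-42");
        LOG.forEach(System.out::println);
    }
}
```

Searching the aggregated logs for the one traceId then returns the lines of all three services. In a real system the traceId travels in a protocol header or attachment rather than as a method parameter.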

In a system that uses HTTP as the service protocol, a shared, encapsulated HTTP client can pass the traceId through transparently. With Dubbo, traceId passthrough is somewhat more complicated. As described in the last section of “☆ Talk about Dubbo (six) : core source -Filter chain principle”, the usual approach is a custom Filter, but there are two more specialized approaches: (1) re-implementing the relevant classes inside Dubbo; (2) an implementation based on RpcContext.

1 Based on Rewriting Dubbo Classes

1.1 Source Code Analysis

Proxy is the dynamic proxy instance that Dubbo generates with Javassist for the service on the consumer side.

Implement is the service implementation instance on the provider side.

TraceId passthrough means that Proxy and Implement must see the same traceId. Dubbo is cleanly layered, and the object that is transported is RpcInvocation.

Therefore, the key logic of the rewrite is this: Proxy puts the traceId into the RpcInvocation and hands it to the Client for serialization and TCP transmission; the Server deserializes the RpcInvocation and passes the traceId to Implement.
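The flow described above can be sketched without Dubbo on the classpath. Everything here is a simplified stand-in: `ToyInvocation` plays the role of RpcInvocation, a single-thread executor plays the provider's worker thread, and the "serialization" step is omitted. It is a sketch of the idea under those assumptions, not Dubbo's actual code path.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AttachmentPassthroughSketch {

    /** Stand-in for RpcInvocation: a method name plus string attachments. */
    static final class ToyInvocation {
        final String methodName;
        final Map<String, String> attachments = new HashMap<>();
        ToyInvocation(String methodName) { this.methodName = methodName; }
    }

    /** Per-thread traceId holder, used on both sides. */
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Consumer side (business thread): copy the ThreadLocal traceId into the invocation. */
    static ToyInvocation consumerProxy(String methodName) {
        ToyInvocation inv = new ToyInvocation(methodName);
        inv.attachments.put("traceId", TRACE_ID.get());
        return inv;
    }

    /** Provider side (worker thread): restore the traceId before calling the implementation. */
    static String providerHandler(ToyInvocation inv) {
        TRACE_ID.set(inv.attachments.get("traceId"));
        try {
            return implement(inv.methodName);
        } finally {
            TRACE_ID.remove(); // worker threads are pooled, so do not leak the value
        }
    }

    /** The service implementation only reads the ThreadLocal. */
    static String implement(String methodName) {
        return methodName + " handled with traceId=" + TRACE_ID.get();
    }

    /** One simulated round trip; a separate thread plays the provider. */
    static String call(String traceId, String methodName) {
        TRACE_ID.set(traceId);
        ToyInvocation inv = consumerProxy(methodName);
        ExecutorService providerPool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> providerHandler(inv);
            return providerPool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            providerPool.shutdown();
            TRACE_ID.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(call("100001", "echo")); // echo handled with traceId=100001
    }
}
```

The consumer and provider never share a ThreadLocal; the attachment map on the invocation is the only thing that crosses the thread (and, in a real deployment, the network) boundary.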

JavassistProxyFactory for Consumer

public class JavassistProxyFactory extends AbstractProxyFactory {

    /**
     * When the Spring container starts, this proxy factory method generates the Service proxy class for the consumer.
     * invoker and interfaces are read from the Spring configuration file.
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        // Every method in the bytecode of the generated Service proxy class calls InvokerInvocationHandler.invoke(...),
        // which does the actual RpcInvocation wrapping, serialization, TCP transfer, and deserialization of the result
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO: the Wrapper class does not correctly handle class names containing '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

InvokerInvocationHandler for Consumer

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    /**
     * Invoked on every actual RPC call. The first parameter is the proxy object generated from bytecode for the Service,
     * the second is the method being called, and the third is the argument list.
     * Together they map to the provider-side implementation, and the return value is obtained from it.
     */
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // This is still the business thread on the consumer side, so the traceId must be fetched from a ThreadLocal here
        // and put into the attachments of the RpcInvocation. The provider can then extract the traceId from the RpcInvocation it receives.
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

The following is the code analysis of Provider DubboProtocol:

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // For a callback, handle the case of a newer version calling an older version
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                        return null;
                    }
                }
                // When the provider receives a message, it takes a thread from the thread pool, deserializes the RpcInvocation,
                // and calls the corresponding method of the implementation class.
                // This is therefore the thread that runs the provider implementation: fetch the traceId here and put it into a ThreadLocal.
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ":" + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

1.2 Concrete Implementation

package com.alibaba.dubbo.rpc.proxy;

/**
 * traceId utility class. This class is newly added.
 */
public class TraceIdUtil {

    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<String>();

    public static String getTraceId() {
        return TRACE_ID.get();
    }

    public static void setTraceId(String traceId) {
        TRACE_ID.set(traceId);
    }
}

/**
 * InvokerInvocationHandler. This class is modified.
 */
public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // Put the consumer-side traceId into the RpcInvocation here
        RpcInvocation rpcInvocation = new RpcInvocation(method, args);
        rpcInvocation.setAttachment("traceId", TraceIdUtil.getTraceId());
        return invoker.invoke(rpcInvocation).recreate();
    }
}

package com.alibaba.dubbo.rpc.protocol.dubbo;

/**
 * Dubbo protocol support. DubboProtocol is re-implemented.
 */
public class DubboProtocol extends AbstractProtocol {


    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // For a callback, handle the case of a newer version calling an older version
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                // Put the traceId received from the consumer into the provider's ThreadLocal
                TraceIdUtil.setTraceId(inv.getAttachment("traceId"));
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ":" + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }
    };

    // ... the rest of DubboProtocol is unchanged
}

2 Based on RpcContext

Before explaining how a custom Filter implements traceId passthrough, let’s first examine the RpcContext object. RpcContext is essentially a ThreadLocal-backed object that maintains the context information of one RPC interaction.
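Before reading the full source, the ThreadLocal idea can be shown with a much smaller stand-in. `ToyContext` below is a hypothetical, stripped-down imitation of RpcContext (only the attachment map is kept); it is meant to show that each thread gets its own context instance, so a value set on one thread is invisible on another.

```java
import java.util.HashMap;
import java.util.Map;

final class ThreadLocalContextSketch {

    /** Minimal stand-in for RpcContext: one instance per thread, holding attachments. */
    static final class ToyContext {
        private static final ThreadLocal<ToyContext> LOCAL =
                ThreadLocal.withInitial(ToyContext::new);

        private final Map<String, String> attachments = new HashMap<>();

        static ToyContext getContext() { return LOCAL.get(); }

        static void removeContext() { LOCAL.remove(); }

        ToyContext setAttachment(String key, String value) {
            if (value == null) {
                attachments.remove(key);
            } else {
                attachments.put(key, value);
            }
            return this;
        }

        String getAttachment(String key) { return attachments.get(key); }
    }

    /** Returns what a different thread sees for "traceId". */
    static String readFromOtherThread() {
        final String[] seen = new String[1];
        Thread other = new Thread(() -> seen[0] = ToyContext.getContext().getAttachment("traceId"));
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        ToyContext.getContext().setAttachment("traceId", "100001");
        System.out.println("same thread:  " + ToyContext.getContext().getAttachment("traceId")); // 100001
        System.out.println("other thread: " + readFromOtherThread());                            // null
        ToyContext.removeContext();
    }
}
```

This per-thread isolation is why the consumer's context cannot simply be read on the provider: the values have to be copied into the transported invocation.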

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.rpc;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.utils.NetUtils;

/**
 * Thread local context. (API, ThreadLocal, ThreadSafe)
 *
 * Note: RpcContext is a temporary status recorder. Its state changes whenever an RPC request is received or initiated.
 * For example, if A calls B and B then calls C, then on machine B, RpcContext records the information of A calling B
 * before B calls C, and records the information of B calling C after B calls C.
 *
 * @see com.alibaba.dubbo.rpc.filter.ContextFilter
 * @author qian.lei
 * @author william.liangf
 * @export
 */
public class RpcContext {
	
	private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
		@Override
		protected RpcContext initialValue() {
			return new RpcContext();
		}
	};

	/**
	 * get context.
	 * 
	 * @return context
	 */
	public static RpcContext getContext() {
	    return LOCAL.get();
	}
	
	/**
	 * remove context.
	 * 
	 * @see com.alibaba.dubbo.rpc.filter.ContextFilter
	 */
	public static void removeContext() {
	    LOCAL.remove();
	}

    private Future<?> future;

    private List<URL> urls;

    private URL url;

    private String methodName;

    private Class<?>[] parameterTypes;

    private Object[] arguments;

	private InetSocketAddress localAddress;

	private InetSocketAddress remoteAddress;

    private final Map<String, String> attachments = new HashMap<String, String>();

    private final Map<String, Object> values = new HashMap<String, Object>();

    // now we don't use the 'values' map to hold these objects
    // we want these objects to be as generic as possible
    private Object request;
    private Object response;

	@Deprecated
    private List<Invoker<?>> invokers;
	@Deprecated
    private Invoker<?> invoker;
	@Deprecated
    private Invocation invocation;
    
	protected RpcContext() {
	}

    /**
     * Get the request object of the underlying RPC protocol, e.g. HttpServletRequest
     *
     * @return null if the underlying protocol doesn't provide support for getting request
     */
    public Object getRequest() {
        return request;
    }

    /**
     * Get the request object of the underlying RPC protocol, e.g. HttpServletRequest
     *
     * @return null if the underlying protocol doesn't provide support for getting request or the request is not of the specified type
     */
    @SuppressWarnings("unchecked")
    public <T> T getRequest(Class<T> clazz) {
        return (request != null && clazz.isAssignableFrom(request.getClass())) ? (T) request : null;
    }


    public void setRequest(Object request) {
        this.request = request;
    }

    /**
     * Get the response object of the underlying RPC protocol, e.g. HttpServletResponse
     *
     * @return null if the underlying protocol doesn't provide support for getting response
     */
    public Object getResponse() {
        return response;
    }

    /**
     * Get the response object of the underlying RPC protocol, e.g. HttpServletResponse
     *
     * @return null if the underlying protocol doesn't provide support for getting response or the response is not of the specified type
     */
    @SuppressWarnings("unchecked")
    public <T> T getResponse(Class<T> clazz) {
        return (response != null && clazz.isAssignableFrom(response.getClass())) ? (T) response : null;
    }

    public void setResponse(Object response) {
        this.response = response;
    }

    /**
     * is provider side.
     * 
     * @return provider side.
     */
    public boolean isProviderSide() {
        URL url = getUrl();
        if (url == null) {
            return false;
        }
        InetSocketAddress address = getRemoteAddress();
        if (address == null) {
            return false;
        }
        String host;
        if (address.getAddress() == null) {
            host = address.getHostName();
        } else {
            host = address.getAddress().getHostAddress();
        }
        return url.getPort() != address.getPort() || 
                ! NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(host));
    }

    /**
     * is consumer side.
     * 
     * @return consumer side.
     */
    public boolean isConsumerSide() {
        URL url = getUrl();
        if (url == null) {
            return false;
        }
        InetSocketAddress address = getRemoteAddress();
        if (address == null) {
            return false;
        }
        String host;
        if (address.getAddress() == null) {
            host = address.getHostName();
        } else {
            host = address.getAddress().getHostAddress();
        }
        return url.getPort() == address.getPort() && 
                NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(host));
    }

    /**
     * get future.
     * 
     * @param <T>
     * @return future
     */
    @SuppressWarnings("unchecked")
    public <T> Future<T> getFuture() {
        return (Future<T>) future;
    }

    /**
     * set future.
     * 
     * @param future
     */
    public void setFuture(Future<?> future) {
        this.future = future;
    }

    public List<URL> getUrls() {
        return urls == null && url != null ? (List<URL>) Arrays.asList(url) : urls;
    }

    public void setUrls(List<URL> urls) {
        this.urls = urls;
    }

    public URL getUrl() {
        return url;
    }

    public void setUrl(URL url) {
        this.url = url;
    }

    /**
     * get method name.
     * 
     * @return method name.
     */
    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /**
     * get parameter types.
     * 
     * @serial
     */
    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    /**
     * get arguments.
     * 
     * @return arguments.
     */
    public Object[] getArguments() {
        return arguments;
    }

    public void setArguments(Object[] arguments) {
        this.arguments = arguments;
    }

    /**
     * set local address.
     * 
     * @param address
     * @return context
     */
	public RpcContext setLocalAddress(InetSocketAddress address) {
	    this.localAddress = address;
	    return this;
	}

	/**
	 * set local address.
	 * 
	 * @param host
	 * @param port
	 * @return context
	 */
    public RpcContext setLocalAddress(String host, int port) {
        if (port < 0) {
            port = 0;
        }
        this.localAddress = InetSocketAddress.createUnresolved(host, port);
        return this;
    }

	/**
	 * get local address.
	 * 
	 * @return local address
	 */
	public InetSocketAddress getLocalAddress() {
		return localAddress;
	}

	public String getLocalAddressString() {
        return getLocalHost() + ":" + getLocalPort();
    }
    
	/**
	 * get local host name.
	 * 
	 * @return local host name
	 */
	public String getLocalHostName() {
		String host = localAddress == null ? null : localAddress.getHostName();
		if (host == null || host.length() == 0) {
		    return getLocalHost();
		}
		return host;
	}

    /**
     * set remote address.
     * 
     * @param address
     * @return context
     */
    public RpcContext setRemoteAddress(InetSocketAddress address) {
        this.remoteAddress = address;
        return this;
    }
    
    /**
     * set remote address.
     * 
     * @param host
     * @param port
     * @return context
     */
    public RpcContext setRemoteAddress(String host, int port) {
        if (port < 0) {
            port = 0;
        }
        this.remoteAddress = InetSocketAddress.createUnresolved(host, port);
        return this;
    }

	/**
	 * get remote address.
	 * 
	 * @return remote address
	 */
	public InetSocketAddress getRemoteAddress() {
		return remoteAddress;
	}
	
	/**
	 * get remote address string.
	 * 
	 * @return remote address string.
	 */
	public String getRemoteAddressString() {
	    return getRemoteHost() + ":" + getRemotePort();
	}
	
	/**
	 * get remote host name.
	 * 
	 * @return remote host name
	 */
	public String getRemoteHostName() {
		return remoteAddress == null ? null : remoteAddress.getHostName();
	}

    /**
     * get local host.
     * 
     * @return local host
     */
    public String getLocalHost() {
        String host = localAddress == null ? null : 
            localAddress.getAddress() == null ? localAddress.getHostName() 
                    : NetUtils.filterLocalHost(localAddress.getAddress().getHostAddress());
        if (host == null || host.length() == 0) {
            return NetUtils.getLocalHost();
        }
        return host;
    }

    /**
     * get local port.
     * 
     * @return port
     */
    public int getLocalPort() {
        return localAddress == null ? 0 : localAddress.getPort();
    }

    /**
     * get remote host.
     * 
     * @return remote host
     */
    public String getRemoteHost() {
        return remoteAddress == null ? null : 
            remoteAddress.getAddress() == null ? remoteAddress.getHostName() 
                    : NetUtils.filterLocalHost(remoteAddress.getAddress().getHostAddress());
    }

    /**
     * get remote port.
     * 
     * @return remote port
     */
    public int getRemotePort() {
        return remoteAddress == null ? 0 : remoteAddress.getPort();
    }

    /**
     * get attachment.
     * 
     * @param key
     * @return attachment
     */
    public String getAttachment(String key) {
        return attachments.get(key);
    }

    /**
     * set attachment.
     * 
     * @param key
     * @param value
     * @return context
     */
    public RpcContext setAttachment(String key, String value) {
        if (value == null) {
            attachments.remove(key);
        } else {
            attachments.put(key, value);
        }
        return this;
    }

    /**
     * remove attachment.
     * 
     * @param key
     * @return context
     */
    public RpcContext removeAttachment(String key) {
        attachments.remove(key);
        return this;
    }

    /**
     * get attachments.
     * 
     * @return attachments
     */
    public Map<String, String> getAttachments() {
        return attachments;
    }

    /**
     * set attachments
     * 
     * @param attachment
     * @return context
     */
    public RpcContext setAttachments(Map<String, String> attachment) {
        this.attachments.clear();
        if (attachment != null && attachment.size() > 0) {
            this.attachments.putAll(attachment);
        }
        return this;
    }
    
    public void clearAttachments() {
        this.attachments.clear();
    }

    /**
     * get values.
     * 
     * @return values
     */
    public Map<String, Object> get() {
        return values;
    }

    /**
     * set value.
     * 
     * @param key
     * @param value
     * @return context
     */
    public RpcContext set(String key, Object value) {
        if (value == null) {
            values.remove(key);
        } else {
            values.put(key, value);
        }
        return this;
    }

    /**
     * remove value.
     * 
     * @param key
     * @return value
     */
    public RpcContext remove(String key) {
        values.remove(key);
        return this;
    }

    /**
     * get value.
     * 
     * @param key
     * @return value
     */
    public Object get(String key) {
        return values.get(key);
    }

    public RpcContext setInvokers(List<Invoker<?>> invokers) {
        this.invokers = invokers;
        if (invokers != null && invokers.size() > 0) {
            List<URL> urls = new ArrayList<URL>(invokers.size());
            for (Invoker<?> invoker : invokers) {
                urls.add(invoker.getUrl());
            }
            setUrls(urls);
        }
        return this;
    }

    public RpcContext setInvoker(Invoker<?> invoker) {
        this.invoker = invoker;
        if (invoker != null) {
            setUrl(invoker.getUrl());
        }
        return this;
    }

    public RpcContext setInvocation(Invocation invocation) {
        this.invocation = invocation;
        if (invocation != null) {
            setMethodName(invocation.getMethodName());
            setParameterTypes(invocation.getParameterTypes());
            setArguments(invocation.getArguments());
        }
        return this;
    }

    /**
     * @deprecated Replace to isProviderSide()
     */
    @Deprecated
    public boolean isServerSide() {
        return isProviderSide();
    }
    
    /**
     * @deprecated Replace to isConsumerSide()
     */
    @Deprecated
    public boolean isClientSide() {
        return isConsumerSide();
    }
    
    /**
     * @deprecated Replace to getUrls()
     */
    @Deprecated
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public List<Invoker<?>> getInvokers() {
        return invokers == null && invoker != null ? (List)Arrays.asList(invoker) : invokers;
    }

    /**
     * @deprecated Replace to getUrl()
     */
    @Deprecated
    public Invoker<?> getInvoker() {
        return invoker;
    }

    /**
     * @deprecated Replace to getMethodName(), getParameterTypes(), getArguments()
     */
    @Deprecated
    public Invocation getInvocation() {
        return invocation;
    }
    
    /**
     * Asynchronous call that returns a value. Even if the Future.get method is used, the call timeout problem will be handled.
     *
     * @param callable
     * @return the result is obtained through future.get().
     */
    @SuppressWarnings("unchecked")
	public <T> Future<T> asyncCall(Callable<T> callable) {
    	try {
	    	try {
	    		setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
				final T o = callable.call();
				// a local call returns the result directly.
				if (o != null) {
					FutureTask<T> f = new FutureTask<T>(new Callable<T>() {
						public T call() throws Exception {
							return o;
						}
					});
					f.run();
					return f;
				} else {
					
				}
			} catch (Exception e) {
				throw new RpcException(e);
			} finally {
				removeAttachment(Constants.ASYNC_KEY);
			}
		} catch (final RpcException e) {
			return new Future<T>() {
				public boolean cancel(boolean mayInterruptIfRunning) {
					return false;
				}
				public boolean isCancelled() {
					return false;
				}
				public boolean isDone() {
					return true;
				}
				public T get() throws InterruptedException, ExecutionException {
					throw new ExecutionException(e.getCause());
				}
				public T get(long timeout, TimeUnit unit)
						throws InterruptedException, ExecutionException,
						TimeoutException {
					return get();
				}
			};
		}
    	return ((Future<T>)getContext().getFuture());
    }
    
	/**
	 * oneway call: only sends the request and does not receive a return result.
	 *
	 * @param callable
	 */
	public void asyncCall(Runnable runable) {
    	try {
    		setAttachment(Constants.RETURN_KEY, Boolean.FALSE.toString());
    		runable.run();
		} catch (Throwable e) {
			// FIXME should the exception be placed in the future?
			throw new RpcException("oneway call error ." + e.getMessage(), e);
		} finally {
			removeAttachment(Constants.RETURN_KEY);
		}
    }
}

Note: the attachments of RpcContext are copied into the RpcInvocation object and transmitted together with it.

Therefore, some people suggest simply putting the traceId into RpcContext, which would seem to make traceId passthrough easy.

Define the Dubbo interface class:

public interface IEchoService {
    String echo(String name);
}

Write the server-side code (provider):

@Service("echoService")
public class EchoServiceImpl implements IEchoService {
 
    @Override
    public String echo(String name) {
        String traceId = RpcContext.getContext().getAttachment("traceId");
        System.out.println("name = " + name + ", traceId = " + traceId);
        return name;
    }
 
    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("spring-dubbo-test-producer.xml");
 
        System.out.println("server start");
        while (true) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
        }
    } 
}

Write the client code:

public class EchoServiceConsumer {
 
    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("spring-dubbo-test-consumer.xml");
 
        IEchoService service = (IEchoService) applicationContext
                .getBean("echoService");
 
        // *) Set the traceId
        RpcContext.getContext().setAttachment("traceId", "100001");
        System.out.println(RpcContext.getContext().getAttachments());
        // *) the first call
        service.echo("lilei");
 
        // *) the second call
        System.out.println(RpcContext.getContext().getAttachments());
        service.echo("hanmeimei");
    }
}

The result is as follows:

Server output:
name = lilei, traceId = 100001
name = hanmeimei, traceId = null

Client output:
{traceId=100001}
{}

The server output shows, somewhat surprisingly, that the traceId was passed through on the first call but not on the second. The RpcContext content printed by the client confirms this. The underlying reason is that the attachments of the RpcContext object are cleared after each RPC interaction.

Set a breakpoint in the clearAttachments method of RpcContext and reproduce the problem. The following call stack appears:

java.lang.Thread.State: RUNNABLE
    at com.alibaba.dubbo.rpc.RpcContext.clearAttachments(RpcContext.java:438)
    at com.alibaba.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:50)
    at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:91)
    at com.alibaba.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:53)
    at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:77)
    at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:227)
    at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:72)
    at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52)
    at com.alibaba.dubbo.common.bytecode.proxy0.echo(proxy0.java:-1)
    at com.test.dubbo.EchoServiceConsumer.main(EchoServiceConsumer.java:20)

The direct caller is Dubbo’s built-in ConsumerContextFilter. Let’s examine its code:

@Activate(
    group = {"consumer"},
    order = -10000
)
public class ConsumerContextFilter implements Filter {
    public ConsumerContextFilter() {
    }

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext.getContext().setInvoker(invoker).setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
        if(invocation instanceof RpcInvocation) {
            ((RpcInvocation)invocation).setInvoker(invoker);
        }
 
        Result var3;
        try {
            var3 = invoker.invoke(invocation);
        } finally {
            RpcContext.getContext().clearAttachments();
        }
 
        return var3;
    }
}

Indeed, the finally block shows that the attachments of RpcContext are cleared after every RPC call.
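The effect of that finally block can be reproduced in a few lines of plain Java. The sketch below is a simulation, not Dubbo code: `ATTACHMENTS` stands in for the thread-local attachment map of RpcContext, and `invoke` imitates a call that ships the current attachments and then clears them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ClearedAttachmentsSketch {

    /** Stand-in for the thread-local attachments kept by RpcContext. */
    static final ThreadLocal<Map<String, String>> ATTACHMENTS =
            ThreadLocal.withInitial(HashMap::new);

    /** What the simulated provider received, in call order. */
    static final List<String> RECEIVED = new ArrayList<>();

    /** Simulated RPC: ships the current attachments, then clears them in finally. */
    static void invoke(String name) {
        try {
            RECEIVED.add(name + ":" + ATTACHMENTS.get().get("traceId"));
        } finally {
            ATTACHMENTS.get().clear();
        }
    }

    public static void main(String[] args) {
        ATTACHMENTS.get().put("traceId", "100001");
        invoke("lilei");      // the traceId is sent
        invoke("hanmeimei");  // attachments were cleared, so the traceId is null
        System.out.println(RECEIVED); // [lilei:100001, hanmeimei:null]
    }
}
```

The second call sees `null` for exactly the reason observed in the test above: nothing put the traceId back after the first call cleared it.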

Now that the root cause is known, the solution is to set the traceId again before every call, like this:

// *) the first call
RpcContext.getContext().setAttachment("traceId", "100001");
service.echo("lilei");
 
// *) the second call
RpcContext.getContext().setAttachment("traceId", "100001");
service.echo("hanmeimei");

3 Based on Filter

Let’s start with a utility class:

public class TraceIdUtils {
 
    private static final ThreadLocal<String> traceIdCache
            = new ThreadLocal<String>();
 
    public static String getTraceId() {
        return traceIdCache.get();
    }
 
    public static void setTraceId(String traceId) {
        traceIdCache.set(traceId);
    }
 
    public static void clear() {
        traceIdCache.remove();
    }
}

Then we define a Filter class:

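package com.test.dubbo;

// Assumption: the original does not say which StringUtils is used; Dubbo's own utility class is imported here
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;

public class TraceIdFilter implements Filter {
 
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String traceId = RpcContext.getContext().getAttachment("traceId");
        if (!StringUtils.isEmpty(traceId)) {
            // *) A traceId arrived via RpcContext: save it in the ThreadLocal
            TraceIdUtils.setTraceId(traceId);
        } else {
            // *) No traceId in RpcContext: re-inject the saved one before the interaction to avoid losing it
            RpcContext.getContext().setAttachment("traceId", TraceIdUtils.getTraceId());
        }
        // *) the actual RPC call
        return invoker.invoke(invocation);
    }
}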

In the resources directory, create the META-INF/dubbo directory and add a file named com.alibaba.dubbo.rpc.Filter to it.

The content of the com.alibaba.dubbo.rpc.Filter file is as follows:

traceIdFilter=com.test.dubbo.TraceIdFilter

Then configure the filter on both the Dubbo provider and the Dubbo consumer:

<dubbo:service interface="com.test.dubbo.IEchoService" ref="echoService" version="1.0.0"
        filter="traceIdFilter"/>

<dubbo:reference interface="com.test.dubbo.IEchoService" id="echoService" version="1.0.0"
        filter="traceIdFilter"/>

The server side test code is changed as follows:

@Service("echoService")
public class EchoServiceImpl implements IEchoService {
 
    @Override
    public String echo(String name) {
        String traceId = TraceIdUtils.getTraceId();
        System.out.println("name = " + name + ", traceId = " + traceId);
        return name;
    }
}

The client test code snippet is:

// *) the first call
RpcContext.getContext().setAttachment("traceId", "100001");
service.echo("lilei");
 
// *) the second call
service.echo("hanmeimei");

For the same code, the test results are as follows:

Server output:
name = lilei, traceId = 100001
name = hanmeimei, traceId = 100001

Client output:
{traceId=100001}
{}

The result is as expected, and the approach is fairly elegant. The attachments of RpcContext are still cleared (ConsumerContextFilter clears them after the custom Filter has run), but the traceId is re-injected before each RPC interaction, so the trace information is passed through successfully.
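The interplay between the clearing filter and the re-injecting filter can also be simulated in plain Java. This is a sketch under stated assumptions, not Dubbo code: `ATTACHMENTS` stands in for the RpcContext attachments, `TRACE_ID` for TraceIdUtils, and the two static methods imitate the nesting of ConsumerContextFilter around the custom filter. The null check before re-injecting is an addition of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ReinjectingFilterSketch {

    /** Plays the role of the RpcContext attachments. */
    static final ThreadLocal<Map<String, String>> ATTACHMENTS =
            ThreadLocal.withInitial(HashMap::new);

    /** Plays the role of TraceIdUtils. */
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** What the simulated provider received, in call order. */
    static final List<String> RECEIVED = new ArrayList<>();

    /** Innermost step: the "wire" sees whatever attachments exist at send time. */
    static void send(String name) {
        RECEIVED.add(name + ":" + ATTACHMENTS.get().get("traceId"));
    }

    /** Same branching as TraceIdFilter: save the traceId if present, otherwise re-inject it. */
    static void traceIdFilter(String name) {
        String traceId = ATTACHMENTS.get().get("traceId");
        if (traceId != null && !traceId.isEmpty()) {
            TRACE_ID.set(traceId);
        } else if (TRACE_ID.get() != null) {
            ATTACHMENTS.get().put("traceId", TRACE_ID.get());
        }
        send(name);
    }

    /** Outer filter: clears the attachments after every call. */
    static void consumerContextFilter(String name) {
        try {
            traceIdFilter(name);
        } finally {
            ATTACHMENTS.get().clear();
        }
    }

    public static void main(String[] args) {
        ATTACHMENTS.get().put("traceId", "100001");
        consumerContextFilter("lilei");
        consumerContextFilter("hanmeimei"); // attachments were cleared; the filter restores the traceId
        System.out.println(RECEIVED);       // [lilei:100001, hanmeimei:100001]
        TRACE_ID.remove();                  // threads are often pooled, so clear the value when the request ends
    }
}
```

One design point worth keeping in mind: because the traceId lives in a ThreadLocal, it should be removed once the request is finished (TraceIdUtils.clear() in the article's utility class), otherwise a pooled thread may carry a stale traceId into an unrelated request.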