01 Service Discovery

1.1 Service Discovery Process

Overall Dubbo service consumption principle

Dubbo consumes a service in two steps. First, it builds an Invoker that holds the remote service instance; the Invoker is the core remote proxy object on the client side. Second, it wraps the Invoker in a dynamic proxy that implements the user's service interface and hands that proxy to the caller as the service reference.

[Figure: sequence diagram of the service consumer's reference initialization chain (the blue chain in the diagram)]

1.2 Source Code Analysis

Entry point of the reference: ReferenceBean's getObject method. The method is defined in Spring's FactoryBean interface, which ReferenceBean implements.

    public Object getObject() throws Exception {
        return get();
    }

    public synchronized T get() {
        if (ref == null) {
            // init() ends by calling createProxy, which generates the proxy class
            init();
        }
        return ref;
    }

Dubbo provides rich configurations for tuning and optimizing framework behavior, performance, and more. Dubbo checks and processes these configurations before referencing or exporting services to ensure correct configurations.

    private void init() {
        // Create the proxy class
        ref = createProxy(map);
    }

The init method is long: it loads and checks the configuration and then creates the proxy object for the reference. We start with createProxy. Judging by its name, createProxy only creates proxy objects, but it also calls other methods to build and merge Invoker instances. The details are as follows.

    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        // ...
        boolean isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);
        if (isJvmRefer) {
            // Local (in-JVM) reference omitted
        } else {
            if (url != null && url.length() > 0) {
                // Point-to-point call omitted
            } else {
                // Load the registry URLs
                List<URL> us = loadRegistries(false);
                if (us != null && !us.isEmpty()) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        // Add the refer parameter to the URL, then add the URL to urls
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
            }
            // A single registry or a single service provider (direct connection)
            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                // Multiple registries, multiple service providers, or a mix of both
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                // Build an Invoker for every URL
                for (URL url : urls) {
                    // refprotocol loads the Protocol implementation at run time
                    // based on the protocol header of the URL
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url;
                    }
                }
                if (registryURL != null) {
                    // A registry URL is present: use AvailableCluster
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else {
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }
        // omit irrelevant code ...
        return (T) proxyFactory.getProxy(invoker);
    }

There is a lot of code above, but the logic is clear. With a single registry (or a single directly connected provider), the Invoker instance is built directly through the Protocol adaptive extension class. With several registries or providers, one Invoker is built per URL, the Invokers are merged through the Cluster, and finally ProxyFactory is called to generate the proxy class.

(1) Create a client

On the service consumer side, Invoker is used to make remote calls. Invoker is built from the Protocol implementation class. There are many Protocol implementation classes. Here we examine DubboProtocol

    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // Create the DubboInvoker
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

The method above looks simple: it creates a DubboInvoker and passes the client objects used for remote calls in through the constructor. By default, Dubbo uses NettyClient for communication. Next, let's take a quick look at the logic of the getClients method.


    private ExchangeClient[] getClients(URL url) {
        // Whether to share a connection
        boolean service_share_connect = false;
        // Number of connections; 0 means not configured
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        // If connections is not configured, share a single connection
        if (connections == 0) {
            service_share_connect = true;
            connections = 1;
        }
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect) {
                // Get the shared client
                clients[i] = getSharedClient(url);
            } else {
                // Initialize a new client
                clients[i] = initClient(url);
            }
        }
        return clients;
    }

The getSharedClient method also calls initClient, so let’s take a look at this method.

    private ExchangeClient initClient(URL url) {
        // Get the client type
        String str = url.getParameter(Constants.CLIENT_KEY,
                url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
        // omit irrelevant code
        ExchangeClient client;
        try {
            // Read the lazy-connect setting
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                // Create a lazily connected ExchangeClient instance
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                // Create an ordinary ExchangeClient instance
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service...");
        }
        return client;
    }

The initClient method first reads the client type configured by the user, which is netty by default. Let's examine the connect method of Exchangers.

    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        // Get the Exchanger instance; the default is HeaderExchanger
        return getExchanger(url).connect(url, handler);
    }

As shown above, getExchanger loads a HeaderExchanger instance via SPI. That method is relatively simple, so take a look for yourself. Next, we analyze the connect implementation of HeaderExchanger.

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        // Several calls are chained here:
        // 1. Create a HeaderExchangeHandler object
        // 2. Create a DecodeHandler object
        // 3. Build a Client through Transporters and wrap it in a HeaderExchangeClient
        return new HeaderExchangeClient(
                Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

Several calls are chained here; let's focus on the connect method of Transporters:

    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            // With more than one handler, create a ChannelHandlerDispatcher
            handler = new ChannelHandlerDispatcher(handlers);
        }
        // Get the Transporter adaptive extension class and call connect to create the Client
        return getTransporter().connect(url, handler);
    }

As above, the getTransporter method returns an adaptive extension class that loads the specified Transporter implementation class at run time based on the client type. If the client type is not configured, the NettyTransporter is loaded by default and its connect method is invoked. As follows:

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        // Create a NettyClient object
        return new NettyClient(url, listener);
    }
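To make the selection step concrete, here is a minimal sketch of choosing an implementation by a URL parameter with a default. It is not Dubbo's SPI mechanism: the Transporter interface, the registry map, and the parameter name "client" are simplified stand-ins assumed for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for an adaptive extension: picks an implementation by URL parameter. */
final class AdaptiveTransporterSketch {

    /** Simplified transporter: returns a description of the client it would create. */
    interface Transporter {
        String connect(String address);
    }

    private static final String DEFAULT_NAME = "netty";
    private static final Map<String, Supplier<Transporter>> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("netty", () -> address -> "netty client -> " + address);
        EXTENSIONS.put("mina", () -> address -> "mina client -> " + address);
    }

    /** Looks up the extension named by the "client" parameter, falling back to the default. */
    static Transporter select(Map<String, String> urlParameters) {
        String name = urlParameters.getOrDefault("client", DEFAULT_NAME);
        Supplier<Transporter> factory = EXTENSIONS.get(name);
        if (factory == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        Map<String, String> parameters = new HashMap<>();
        System.out.println(select(parameters).connect("10.0.0.1:20880"));
        parameters.put("client", "mina");
        System.out.println(select(parameters).connect("10.0.0.1:20880"));
    }
}
```

The point of the sketch is that the caller never names a concrete class; the choice is deferred to run time and driven by configuration carried in the URL.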

(2) Registration

So here we have our NettyClient object. This concludes the analysis of the refer method for DubboProtocol. Next, analyze the logic of the refer method of RegistryProtocol.

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // Read the registry parameter and set it as the protocol header
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY))
                .removeParameter(Constants.REGISTRY_KEY);
        // Get the Registry instance
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }
        // Convert the URL query string into a Map
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        // Get the group configuration
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                // Several groups: pass a MergeableCluster to doRefer
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        // Call doRefer to continue the service reference logic
        return doRefer(cluster, registry, type, url);
    }

The code above first sets the protocol header for the URL and then loads the registry instance based on the URL parameters. It then gets the group configuration, which determines the type of the first parameter of doRefer. The focus here is on the doRefer method, as follows:

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // Create a RegistryDirectory instance
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        // Set the registry and the protocol
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        // Build the service consumer URL
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL,
                parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        // Register the service consumer: creates a new node under the consumers directory
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(
                    Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        // Subscribe to the data of the providers, configurators, and routers nodes
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));
        // A registry may hold several providers, so merge them into a single Invoker
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

As above, the doRefer method creates an instance of RegistryDirectory, then generates a service consumer link and registers it with the registry. After the registration is complete, you can subscribe to the data of providers, Configurators, and Routers. After the subscription is complete, RegistryDirectory receives information about the children under these nodes. Since a service can be deployed on multiple servers, there will be multiple nodes in providers, which requires the Cluster to merge the multiple service nodes into one and generate an Invoker.
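The idea of merging several provider nodes into one Invoker can be sketched as follows. This is a simplified illustration, not Dubbo's Cluster implementation: the Invoker interface here is a hypothetical one-method stand-in, and the merge policy (try each provider in order until one succeeds) is an assumption chosen for brevity.

```java
import java.util.Arrays;
import java.util.List;

final class ClusterJoinSketch {

    /** Hypothetical, simplified Invoker: takes a request string and returns a result string. */
    interface Invoker {
        String invoke(String request);
    }

    /** Merges several provider invokers into one that tries them in order until one succeeds. */
    static Invoker join(List<Invoker> providers) {
        return request -> {
            RuntimeException last = null;
            for (Invoker provider : providers) {
                try {
                    return provider.invoke(request);
                } catch (RuntimeException e) {
                    last = e; // remember the failure and move on to the next provider
                }
            }
            throw new IllegalStateException("All providers failed", last);
        };
    }

    public static void main(String[] args) {
        Invoker down = request -> { throw new IllegalStateException("provider down"); };
        Invoker up = request -> "echo:" + request;
        Invoker merged = join(Arrays.asList(down, up));
        System.out.println(merged.invoke("hello"));
    }
}
```

From the caller's point of view there is only one Invoker; which provider actually served the request stays hidden behind the merged object.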

(3) Create proxy objects

Once the Invoker is created, the next step is to generate a proxy object for the service interface. With the proxy object, remote calls can be made. The entry point for proxy generation is the getProxy method of ProxyFactory, which is analyzed next.

    public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        // Call the overloaded method
        return getProxy(invoker, false);
    }

    public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
        Class<?>[] interfaces = null;
        // Get the interface list
        String config = invoker.getUrl().getParameter("interfaces");
        if (config != null && config.length() > 0) {
            // Split the interface list
            String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
            if (types != null && types.length > 0) {
                interfaces = new Class<?>[types.length + 2];
                // Put the service interface class and EchoService.class into interfaces
                interfaces[0] = invoker.getInterface();
                interfaces[1] = EchoService.class;
                for (int i = 0; i < types.length; i++) {
                    // Load the interface class
                    interfaces[i + 1] = ReflectUtils.forName(types[i]);
                }
            }
        }
        if (interfaces == null) {
            interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
        }
        // Provide generic call support for the http and hessian protocols, see pull request #1827
        if (!invoker.getInterface().equals(GenericService.class) && generic) {
            int len = interfaces.length;
            Class<?>[] temp = interfaces;
            // Create a new interfaces array
            interfaces = new Class<?>[len + 1];
            System.arraycopy(temp, 0, interfaces, 0, len);
            // Append GenericService.class
            interfaces[len] = GenericService.class;
        }
        // Call the overloaded method
        return getProxy(invoker, interfaces);
    }

    public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);

As shown above, this code does nothing but assemble the array of interfaces, so let's move on. The getProxy(Invoker, Class<?>[]) method is abstract; let's look at its implementation in JavassistProxyFactory.

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        // Generate a Proxy subclass (Proxy is an abstract class) and call
        // its newInstance method to create the Proxy instance
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

The code above is short. It first obtains a Proxy subclass through Proxy's getProxy method, then creates an InvokerInvocationHandler object and passes it to newInstance to generate the Proxy instance. InvokerInvocationHandler implements the JDK's InvocationHandler interface and is used to intercept calls on the interface. Taking the org.apache.dubbo.demo.DemoService interface as an example, the generated proxy class for the interface looks roughly like this (ignoring the EchoService interface).

    package org.apache.dubbo.common.bytecode;

    public class proxy0 implements org.apache.dubbo.demo.DemoService {

        public static java.lang.reflect.Method[] methods;

        private java.lang.reflect.InvocationHandler handler;

        public proxy0() {
        }

        public proxy0(java.lang.reflect.InvocationHandler arg0) {
            handler = $1;
        }

        public java.lang.String sayHello(java.lang.String arg0) {
            Object[] args = new Object[1];
            args[0] = ($w) $1;
            Object ret = handler.invoke(this, methods[0], args);
            return (java.lang.String) ret;
        }
    }
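The same interception pattern can be tried out with the JDK's own java.lang.reflect.Proxy instead of Javassist. The sketch below is a swapped-in technique for illustration only: the one-method Invoker interface and the local DemoService are hypothetical simplifications, not Dubbo's types.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

final class ProxySketch {

    /** Local stand-in for the demo service interface. */
    interface DemoService {
        String sayHello(String name);
    }

    /** Hypothetical, simplified Invoker: receives the method name and the arguments. */
    interface Invoker {
        Object invoke(String methodName, Object[] args);
    }

    /** Wraps the invoker in a dynamic proxy that implements the given interface. */
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> type, Invoker invoker) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // toString, hashCode, and equals are answered locally, not remotely
                return method.invoke(invoker, args);
            }
            return invoker.invoke(method.getName(), args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
    }

    public static void main(String[] args) {
        // A fake "remote" invoker that only echoes what it was asked to do
        Invoker invoker = (methodName, arguments) -> methodName + ":" + arguments[0];
        DemoService service = getProxy(DemoService.class, invoker);
        System.out.println(service.sayHello("world"));
    }
}
```

The caller programs against DemoService only; every interface call is funneled into a single invoke method, which is where a real Invoker would start the remote call.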

Well, that’s the end of the proxy generation logic. The whole process is complicated, so you need to be patient.

1.3 Summary

  1. Referencing a service through the registry: when a registry is present, provider addresses are discovered through it, and the URL resolved by ReferenceConfig has the format registry://registryhost/org.apache.dubbo.registry.RegistryService?refer=URL.encode("consumerhost/com.foo.FooService?version=1.0.0").

  2. The registry:// protocol header of the URL is recognized, so the RegistryProtocol#refer() method is invoked.

  3. The provider URL, such as dubbo://service-host/com.foo.FooService?version=1.0.0, is queried and the registry is obtained.

  4. Create a RegistryDirectory instance and set up the registry and protocol

  5. Generate the consumer URL, create a node under the consumers directory, and register it with the registry

  6. After registration, subscribe to the data of the providers, configurators, and routers nodes

  7. The dubbo:// protocol header of the provider URL is recognized, so the DubboProtocol#refer() method is called; it creates an ExchangeClient and returns a DubboInvoker instance

  8. Since a service may be deployed on several servers, the providers directory holds several nodes, which produces several DubboInvoker instances. RegistryProtocol calls the Cluster to disguise the service provider nodes as a single node and returns one Invoker

  9. After Invoker is created, ProxyFactory is called to generate a proxy object for the service interface and return the provider reference

02 Network Communication

In the previous article, we analyzed consumer-side service discovery, the counterpart of provider-side service exposure, and saw that the consumer uses a built-in load-balancing algorithm to pick a suitable Invoker for the remote call. Next we look at the remote call process itself: network communication.

Network communication is located in the Remoting module:

  • The Remoting implementation is an implementation of the Dubbo protocol. If you choose the RMI protocol, the entire Remoting will not be used.

  • Remoting is divided into the Transport layer and the Exchange (information exchange) layer.

  • The Transport layer is only responsible for one-way message transmission. It is an abstraction over Mina, Netty, and Grizzly, and can also be extended with a UDP transport.

  • The Exchange layer encapsulates request-response semantics on top of the Transport layer.

Network communication problems:

  • The connection between the client and server is abnormal

  • Sticky packets and split packets (TCP message framing)

  • Asynchronous multithreading data consistency problem

2.1 Communication Protocol

Dubbo has 10 communication protocols: dubbo, rmi, hessian, http, webservice, thrift, rest, grpc, memcached, and redis. The features of each protocol are as follows.

Dubbo protocol

The default dubbo protocol uses a single long-lived connection and NIO asynchronous communication. It suits service calls with small payloads and high concurrency, where consumer machines far outnumber provider machines.

  • The default protocol uses tbremoting interaction based on Mina 1.1.7 and Hessian 3.2.1.

  • Number of connections: single connection

  • Connection mode: Long connection

  • Transport protocol: TCP

  • Transmission mode: NIO asynchronous transmission

  • Serialization: Hessian binary serialization

  • Scope of application: request and response packets are small (under 100 KB is recommended), consumers outnumber providers, and a single consumer cannot saturate a provider. Avoid using the dubbo protocol to transfer large files or very large strings.

  • Application scenario: General remote service method invocation

RMI protocol

The RMI protocol is implemented with the JDK's standard java.rmi.* classes and uses blocking short connections and JDK standard serialization.

  • Number of connections: Multiple connections

  • Connection mode: Short connection

  • Transport protocol: TCP

  • Transmission mode: Synchronous transmission

  • Serialization: Java standard binary serialization

  • Scope of application: request and response packets of mixed sizes, roughly equal numbers of consumers and providers; files can be transferred.

  • Application scenario: Regular remote service method calls interoperable with native RMI services

Hessian protocol

The Hessian protocol is used to integrate Hessian services. Hessian communicates over HTTP and exposes services through a Servlet.

By default, Dubbo embeds Jetty as the server implementation.

Dubbo's Hessian protocol interoperates with native Hessian services: the provider can expose the service with Dubbo's Hessian protocol while the consumer calls it through the standard Hessian interface, or the provider can expose the service with standard Hessian while the consumer calls it with Dubbo's Hessian protocol.

  • Number of connections: Multiple connections

  • Connection mode: Short connection

  • Transport protocol: HTTP

  • Transmission mode: Synchronous transmission

  • Serialization: Hessian binary serialization

  • Scope of application: large request and response packets, more providers than consumers, heavier load on providers; files can be transferred.

  • Application scenario: page transfer, file transfer, or interoperation with native Hessian services

HTTP protocol

A remote call protocol based on HTTP forms, implemented with Spring's HttpInvoker.

  • Number of connections: Multiple connections

  • Connection mode: Short connection

  • Transport protocol: HTTP

  • Transmission mode: Synchronous transmission

  • Serialization: Form serialization

  • Scope of application: request and response packets of mixed sizes, more providers than consumers; results can be viewed in a browser and parameters can be passed by form or URL; file transfer is not supported for now.

  • Application scenario: Services that need to be used by applications and browsers.

WebService protocol

A remote call protocol based on WebService, implemented with Apache CXF.

Can interoperate with native WebService services, that is: The provider exposes the service with Dubbo’s WebService protocol and the consumer invokes it directly with the standard WebService interface, or the provider exposes the service with the standard WebService and the consumer invokes it with Dubbo’s WebService protocol.

  • Number of connections: Multiple connections

  • Connection mode: Short connection

  • Transport protocol: HTTP

  • Transmission mode: Synchronous transmission

  • Serialization: SOAP text serialization (HTTP + XML)

  • Application scenario: System integration and cross-language invocation

Thrift protocol

The thrift protocol currently supported by Dubbo is an extension of the native thrift protocol: it adds some extra header information, such as the service name and a magic number.

REST protocol

REST call support implemented on JAX-RS 2.0 (Java API for RESTful Web Services), the standard Java REST API.

gRPC protocol

Dubbo has supported the gRPC protocol since version 2.7.5. gRPC is a good option for developers who plan to use HTTP/2 for communication or who want gRPC's streaming, backpressure, and reactive programming capabilities.

It also brings service governance to users who want to use the gRPC protocol and eases their adoption of the Dubbo ecosystem: users can define and use remote services in Dubbo's interface-based programming style.

Memcached protocol

An RPC protocol implemented on top of memcached.

Redis protocol

RPC protocol based on Redis

2.2 Serialization

Serialization converts an object into a byte stream for network transmission; deserialization restores the object from the byte stream on receipt. Serialization has advantages such as better security and cross-platform support. Dubbo uses Netty for network communication, and the relevant Netty classes appear in the NettyClient.doOpen() method:

    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });

Next, step into the NettyCodecAdapter class, which eventually reaches the encodeRequest method of the ExchangeCodec class:

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        Serialization serialization = getSerialization(channel);
        // header.
        byte[] header = new byte[HEADER_LENGTH];
        // ...
    }

Serialization is the serialization interface; its default implementation is Hessian2Serialization.

Dubbo supports the java, compactedjava, nativejava, fastjson, dubbo, fst, hessian2, kryo, and protostuff serializations, with hessian2 as the default. Of these, java, compactedjava, and nativejava are all variants of native Java serialization.

  • dubbo serialization: an efficient Java serialization implementation that Alibaba has not yet brought to maturity; Alibaba does not recommend it for production use.

  • hessian2 serialization: Hessian is an efficient cross-language binary serialization. What Dubbo ships is not native Hessian2 but Alibaba's modified version, and it is the serialization Dubbo RPC enables by default.

  • json serialization: there are two implementations, Alibaba's fastjson library and Dubbo's own simple JSON library. The latter is not particularly mature, and JSON text serialization generally performs worse than the two binary serializations above.

  • java serialization: implemented mainly with the JDK's built-in Java serialization; its performance is not ideal.
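As a small illustration of the object-to-bytes round trip, here is a sketch that uses only the JDK's built-in serialization (the "java" option in the list above). It does not use Dubbo's Serialization interface; the class and method names are made up for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;

final class SerializationSketch {

    /** Converts an object into bytes with JDK serialization. */
    static byte[] serialize(Serializable object) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(object);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Restores an object from bytes produced by serialize. */
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Cannot deserialize", e);
        }
    }

    public static void main(String[] args) {
        ArrayList<String> request = new ArrayList<>(Arrays.asList("sayHello", "world"));
        byte[] wire = serialize(request);
        Object restored = deserialize(wire);
        System.out.println(wire.length + " bytes on the wire, equal after round trip: " + request.equals(restored));
    }
}
```

Binary formats such as hessian2 follow the same serialize/deserialize contract but typically produce a more compact byte stream than the JDK format.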

2.3 Network Communication

(1) Data format in Dubbo

There are three common ways to solve the sticky-packet and split-packet problem on a socket:

Fixed-length protocol (every packet has the same length)

In a fixed-length protocol, the length of each protocol unit is fixed. For example, if the unit is 50 bytes long, the decode operation runs each time 50 bytes have been read from the network. A fixed-length protocol is efficient to read and write because the size of the data buffer is known in advance, much like an array. Its weakness is poor adaptability: in RPC scenarios, for example, it is hard to estimate a suitable fixed length.
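A fixed-length decoder can be sketched in a few lines. This is an illustration of the framing idea only, with a made-up class name; bytes that do not yet fill a whole frame are simply left out, as if the decoder were waiting for more data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FixedLengthFramingSketch {

    /** Splits the received bytes into frames of exactly frameLength bytes; a trailing partial frame is skipped. */
    static List<byte[]> decode(byte[] received, int frameLength) {
        List<byte[]> frames = new ArrayList<>();
        for (int start = 0; start + frameLength <= received.length; start += frameLength) {
            frames.add(Arrays.copyOfRange(received, start, start + frameLength));
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] received = "AAAAABBBBBCC".getBytes(); // two full 5-byte frames plus 2 leftover bytes
        for (byte[] frame : decode(received, 5)) {
            System.out.println(new String(frame));
        }
    }
}
```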

Special terminator (a special character marks the end of the data)

Compared with a fixed-length protocol, defining a special character that marks the end of each protocol unit allows variable-length communication, which balances flexibility against efficiency; the character \n is a typical choice. The drawback of the terminator approach is that it oversimplifies protocol transmission: a protocol unit can be processed only after all of its data has been read, and the data sent by the user must never contain the terminator, otherwise the framing breaks.
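The terminator approach can be sketched with a small stateful decoder that buffers incoming chunks and emits a message each time it sees \n. The class name and the use of strings instead of raw bytes are simplifications assumed for the example.

```java
import java.util.ArrayList;
import java.util.List;

final class DelimiterFramingSketch {

    /** Data received so far that has not yet been terminated by '\n'. */
    private final StringBuilder pending = new StringBuilder();

    /** Appends a received chunk and returns every message completed by it. */
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> messages = new ArrayList<>();
        int end;
        while ((end = pending.indexOf("\n")) >= 0) {
            messages.add(pending.substring(0, end));
            pending.delete(0, end + 1); // drop the message and its terminator
        }
        return messages;
    }

    public static void main(String[] args) {
        DelimiterFramingSketch decoder = new DelimiterFramingSketch();
        System.out.println(decoder.feed("hel"));        // nothing complete yet
        System.out.println(decoder.feed("lo\nwor"));    // completes "hello"
        System.out.println(decoder.feed("ld\n"));       // completes "world"
    }
}
```

The sketch also shows the weakness described above: a payload that itself contains \n would be split into two messages.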

Variable-length protocol (protocol header + payload)

This is usually a custom protocol made up of a fixed-length section and a variable-length section, where the fixed-length section states the length of the variable-length content. Dubbo uses this kind of data format.
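A minimal header-plus-payload scheme can be sketched with a 4-byte length prefix. This is a generic illustration, not Dubbo's wire format (Dubbo's header is 16 bytes); the class name and the 4-byte prefix are assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LengthPrefixedFramingSketch {

    /** Prepends a 4-byte big-endian length to the payload. */
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    /** Reads every complete frame; an incomplete trailing frame is left in the buffer. */
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("Negative frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // body not fully received yet: rewind to the start of the header
                break;
            }
            byte[] body = new byte[length];
            in.get(body);
            frames.add(body);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = encode("ab".getBytes(StandardCharsets.UTF_8));
        byte[] second = encode("cde".getBytes(StandardCharsets.UTF_8));
        ByteBuffer in = ByteBuffer.allocate(first.length + second.length);
        in.put(first).put(second).flip();
        for (byte[] frame : decode(in)) {
            System.out.println(new String(frame, StandardCharsets.UTF_8));
        }
    }
}
```

Because the receiver knows the body length before reading it, the payload may contain any byte values, which removes the terminator-collision problem.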

Dubbo packet is divided into message header and message body. The message header is used to store some meta information, such as Magic, packet type (Request/Response), and message body Length (Data Length). The message body is used to store specific invocation messages, such as method names, parameter lists, and so on. Here is a brief list of the contents of the message header.

| Offset (bits) | Field | Value |
| --- | --- | --- |
| 0 to 7 | Magic number, high byte | 0xda00 |
| 8 to 15 | Magic number, low byte | 0xbb |
| 16 | Packet type | 0 - Response, 1 - Request |
| 17 | Call mode | Only valid when bit 16 is set to 1. 0 - one-way call, 1 - two-way call |
| 18 | Event flag | 0 - the packet is a request or response packet, 1 - the packet is a heartbeat packet |
| 19 to 23 | Serializer number | 2 - Hessian2Serialization, 3 - JavaSerialization, 4 - CompactedJavaSerialization, 6 - FastJsonSerialization, 7 - NativeJavaSerialization, 8 - KryoSerialization, 9 - FstSerialization |
| 24 to 31 | Status | 20 - OK, 30 - CLIENT_TIMEOUT, 31 - SERVER_TIMEOUT, 40 - BAD_REQUEST, 50 - BAD_RESPONSE, ...... |
| 32 to 95 | Request number | 8 bytes, generated at run time |
| 96 to 127 | Message body length | Calculated at run time |
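The header layout above can be exercised with a short sketch that packs a request header into 16 bytes and reads the flags back. It uses only java.nio.ByteBuffer rather than Dubbo's Bytes utility; the class and method names are made up, while the constant values follow the ExchangeCodec constants quoted later in this article.

```java
import java.nio.ByteBuffer;

final class DubboHeaderSketch {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_TWOWAY = (byte) 0x40;
    static final byte FLAG_EVENT = (byte) 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    /** Packs a request header; ByteBuffer is big-endian by default. */
    static byte[] requestHeader(int serializationId, boolean twoWay, boolean event,
                                long requestId, int bodyLength) {
        byte flags = (byte) (FLAG_REQUEST | (serializationId & SERIALIZATION_MASK));
        if (twoWay) {
            flags |= FLAG_TWOWAY;
        }
        if (event) {
            flags |= FLAG_EVENT;
        }
        return ByteBuffer.allocate(HEADER_LENGTH)
                .putShort(MAGIC)     // bits 0 to 15: magic number
                .put(flags)          // bits 16 to 23: type, call mode, event flag, serializer number
                .put((byte) 0)       // bits 24 to 31: status, not used by a request
                .putLong(requestId)  // bits 32 to 95: request number
                .putInt(bodyLength)  // bits 96 to 127: message body length
                .array();
    }

    static boolean isRequest(byte[] header) {
        return (header[2] & FLAG_REQUEST) != 0;
    }

    static boolean isTwoWay(byte[] header) {
        return (header[2] & FLAG_TWOWAY) != 0;
    }

    static int serializationId(byte[] header) {
        return header[2] & SERIALIZATION_MASK;
    }

    static long requestId(byte[] header) {
        return ByteBuffer.wrap(header).getLong(4);
    }

    static int bodyLength(byte[] header) {
        return ByteBuffer.wrap(header).getInt(12);
    }

    public static void main(String[] args) {
        byte[] header = requestHeader(2, true, false, 42L, 128);
        System.out.printf("magic=%02x%02x request=%b twoWay=%b serializer=%d id=%d length=%d%n",
                header[0], header[1], isRequest(header), isTwoWay(header),
                serializationId(header), requestId(header), bodyLength(header));
    }
}
```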

(2) The consumer sends the request

    /**
     * proxy0#sayHello(String)
     *   --> InvokerInvocationHandler#invoke(Object, Method, Object[])
     *     --> MockClusterInvoker#invoke(Invocation)
     *       --> AbstractClusterInvoker#invoke(Invocation)
     *         --> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
     *           --> Filter#invoke(Invoker, Invocation)  // contains multiple Filter calls
     *             --> ListenerInvokerWrapper#invoke(Invocation)
     *               --> AbstractInvoker#invoke(Invocation)
     *                 --> DubboInvoker#doInvoke(Invocation)
     *                   --> ReferenceCountExchangeClient#request(Object, int)
     *                     --> HeaderExchangeClient#request(Object, int)
     *                       --> HeaderExchangeChannel#request(Object, int)
     *                         --> AbstractPeer#send(Object)
     *                           --> AbstractClient#send(Object, boolean)
     *                             --> NettyChannel#send(Object, boolean)
     *                               --> NioClientSocketChannel#write(Object)
     */

The proxy class that Dubbo generates automatically on the consumer side looks like this:

    public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {

        private InvocationHandler handler;

        public String sayHello(String string) {
            // Wrap the argument in an array and delegate to the InvocationHandler
            Object[] arrobject = new Object[]{string};
            Object object = this.handler.invoke(this, methods[0], arrobject);
            return (String) object;
        }
    }

The Invoker member variable in InvokerInvocationHandler is of type MockClusterInvoker, which internally encapsulates the service degradation logic. Here’s a quick look:

    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        // Read the mock configuration
        String value = directory.getUrl()
                .getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            // No mock logic: call the invoke method of the wrapped Invoker,
            // for example FailoverClusterInvoker
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            // force:xxx runs the mock logic directly, without a remote call
            result = doMockInvoke(invocation, null);
        } else {
            // fail:xxx runs the mock logic only after the call fails
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                // The call failed: run the mock logic
                result = doMockInvoke(invocation, e);
            }
        }
        return result;
    }
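The three branches (no mock, force mock, mock on failure) can be isolated in a small sketch. It is a simplification, not Dubbo code: the remote call and the mock are modeled as plain Supplier objects, and any RuntimeException stands in for RpcException.

```java
import java.util.function.Supplier;

final class MockDecisionSketch {

    /** Chooses between the remote call and the mock according to the mock configuration string. */
    static String invoke(String mockConfig, Supplier<String> remoteCall, Supplier<String> mock) {
        String value = mockConfig == null ? "" : mockConfig.trim();
        if (value.isEmpty() || value.equalsIgnoreCase("false")) {
            // No mock configured: always go remote
            return remoteCall.get();
        } else if (value.startsWith("force")) {
            // Forced degradation: never go remote
            return mock.get();
        } else {
            // Degrade only when the remote call fails
            try {
                return remoteCall.get();
            } catch (RuntimeException e) {
                return mock.get();
            }
        }
    }

    public static void main(String[] args) {
        Supplier<String> healthy = () -> "remote result";
        Supplier<String> broken = () -> { throw new IllegalStateException("timeout"); };
        Supplier<String> mock = () -> "mock result";

        System.out.println(invoke("false", healthy, mock));
        System.out.println(invoke("force:return null", healthy, mock));
        System.out.println(invoke("fail:return null", broken, mock));
    }
}
```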

Considering that the FailoverClusterInvoker has been analyzed in detail in the previous section, this section ignores FailoverClusterInvoker and directly analyzes DubboInvoker.

    public abstract class AbstractInvoker<T> implements Invoker<T> {

        public Result invoke(Invocation inv) throws RpcException {
            if (destroyed.get()) {
                throw new RpcException("Rpc invoker for service ...");
            }
            RpcInvocation invocation = (RpcInvocation) inv;
            // Set the Invoker
            invocation.setInvoker(this);
            if (attachment != null && attachment.size() > 0) {
                // Set the attachment
                invocation.addAttachmentsIfAbsent(attachment);
            }
            Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
            if (contextAttachments != null && contextAttachments.size() != 0) {
                // Add contextAttachments to the RpcInvocation#attachment variable
                invocation.addAttachments(contextAttachments);
            }
            if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
                // Store the async flag in the attachment of the RpcInvocation
                invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
            }
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
            try {
                // Abstract method, implemented by subclasses
                return doInvoke(invocation);
            } catch (InvocationTargetException e) {
                // ...
            } catch (RpcException e) {
                // ...
            } catch (Throwable e) {
                return new RpcResult(e);
            }
        }

        protected abstract Result doInvoke(Invocation invocation) throws Throwable;

        // omit other methods
    }

The code above comes from the AbstractInvoker class; most of it adds information to the RpcInvocation#attachment variable. After that, doInvoke is called to carry out the rest of the call. doInvoke is an abstract method that subclasses must implement; here is the implementation in DubboInvoker.

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        // Set the path and version attachments on the invocation
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);
        // Get the client connection
        ExchangeClient currentClient;
        if (clients.length == 1) {
            // Default
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            // isOneway is true: one-way call with no return value
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                // A return value is expected
                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                responseFuture.whenComplete((obj, t) -> {
                    if (t != null) {
                        asyncRpcResult.completeExceptionally(t);
                    } else {
                        asyncRpcResult.complete((AppResponse) obj);
                    }
                });
                RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
                return asyncRpcResult;
            }
        }
        // omit irrelevant code
    }
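The client selection at the top of doInvoke is a round-robin over the client array. A minimal sketch of that idea follows; clients are modeled as plain strings, and the use of Math.floorMod is an addition of this sketch that keeps the index non-negative even after the counter overflows.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinClientSketch {

    private final String[] clients;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinClientSketch(String... clients) {
        if (clients.length == 0) {
            throw new IllegalArgumentException("At least one client is required");
        }
        this.clients = clients.clone();
    }

    /** Returns the next client in rotation; safe to call from several threads. */
    String next() {
        if (clients.length == 1) {
            return clients[0];
        }
        return clients[Math.floorMod(index.getAndIncrement(), clients.length)];
    }

    public static void main(String[] args) {
        RoundRobinClientSketch selector = new RoundRobinClientSketch("client-0", "client-1", "client-2");
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.next());
        }
    }
}
```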

Eventually the call reaches the HeaderExchangeChannel#request method, which assembles the request and sends it out:

public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null,
                "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    try {
        // NettyClient
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
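The request method returns a future before any response exists, so something has to match the later response to it. A minimal sketch of that idea follows, assuming the pending futures are kept in a map keyed by request ID. All names here are hypothetical; this is not the DefaultFuture implementation.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class PendingRequestsSketch {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    /** Registers a future for a new request and returns the request ID to put in the header. */
    long register(CompletableFuture<Object> future) {
        long id = nextId.getAndIncrement();
        pending.put(id, future);
        return id;
    }

    /** Called when a response carrying this ID arrives; returns false for unknown IDs. */
    boolean complete(long id, Object result) {
        CompletableFuture<Object> future = pending.remove(id);
        return future != null && future.complete(result);
    }

    /** Counterpart of the future.cancel() call made when sending fails. */
    boolean cancel(long id) {
        CompletableFuture<Object> future = pending.remove(id);
        return future != null && future.cancel(false);
    }

    public static void main(String[] args) {
        PendingRequestsSketch requests = new PendingRequestsSketch();
        CompletableFuture<Object> future = new CompletableFuture<>();
        long id = requests.register(future);
        requests.complete(id, "pong");
        System.out.println(future.join());
    }
}
```

Removing the entry on completion or cancellation keeps the map from growing when requests fail or time out.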

How does request encoding work?

When Netty starts, a codec is installed on the channel. Encoding is done by ExchangeCodec, as follows:

public class ExchangeCodec extends TelnetCodec {
    // Header length
    protected static final int HEADER_LENGTH = 16;
    // Magic number
    protected static final short MAGIC = (short) 0xdabb;
    protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
    protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
    protected static final byte FLAG_REQUEST = (byte) 0x80;
    protected static final byte FLAG_TWOWAY = (byte) 0x40;
    protected static final byte FLAG_EVENT = (byte) 0x20;
    protected static final int SERIALIZATION_MASK = 0x1f;
    private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);

    public Short getMagicCode() {
        return MAGIC;
    }

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            // Encode a Request object
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            // Encode a Response object
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        Serialization serialization = getSerialization(channel);
        // Create the header byte array, 16 bytes long
        byte[] header = new byte[HEADER_LENGTH];
        // Set the magic number
        Bytes.short2bytes(MAGIC, header);
        // Set the packet type (Request/Response) and the serializer ID
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
        // Set the communication mode (one-way/two-way)
        if (req.isTwoWay()) {
            header[2] |= FLAG_TWOWAY;
        }
        // Set the event flag
        if (req.isEvent()) {
            header[2] |= FLAG_EVENT;
        }
        // Set the request ID, 8 bytes, starting at index 4
        Bytes.long2bytes(req.getId(), header, 4);
        // Get the current write position of the buffer
        int savedWriteIndex = buffer.writerIndex();
        // Update writerIndex, reserving 16 bytes for the header
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        // Create the serializer, such as Hessian2ObjectOutput
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            // Serialize the event data
            encodeEventData(channel, out, req.getData());
        } else {
            // Serialize the request data
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();
        // Get the number of bytes written, that is, the length of the message body
        int len = bos.writtenBytes();
        checkPayload(channel, len);
        // Write the length of the message body into the header
        Bytes.int2bytes(len, header, 12);
        // Move the buffer pointer back to savedWriteIndex in preparation for writing the header
        buffer.writerIndex(savedWriteIndex);
        // Write the header, starting at savedWriteIndex
        buffer.writeBytes(header);
        // Set the new writerIndex = original write index + header length + body length
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }

    // omit other methods
}

This is the encoding of the Request object. It begins by writing the header fields into the header array using bit operations. The data field of the Request object is then serialized, and the serialized bytes are stored in the ChannelBuffer. Once serialization is finished, len holds the serialized length of the data, which is written to its position in the header. Finally, the header byte array is written to the ChannelBuffer, and the encoding process is complete. The data field itself is serialized by the encodeRequestData method, which DubboCodec implements as follows:

public class DubboCodec extends ExchangeCodec implements Codec2 {
    protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        RpcInvocation inv = (RpcInvocation) data;
        // Serialize the dubbo version, path, and version in turn
        out.writeUTF(version);
        out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
        out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
        // Serialize the name of the method being called
        out.writeUTF(inv.getMethodName());
        // Convert the parameter types to a string and serialize it
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null)
            for (int i = 0; i < args.length; i++) {
                // Serialize the call arguments
                out.writeObject(encodeInvocationArgument(channel, inv, i));
            }
        // Serialize the attachments
        out.writeObject(inv.getAttachments());
    }
}
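To make the 16-byte header layout written by encodeRequest concrete, here is a small standalone sketch that builds the same header with java.nio.ByteBuffer instead of Dubbo's Bytes helper. The constants are copied from the ExchangeCodec listing; the class and method names are hypothetical, and byte 3 (unused for requests) is simply left at zero.

```java
import java.nio.ByteBuffer;

final class DubboHeaderSketch {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_TWOWAY = (byte) 0x40;
    static final byte FLAG_EVENT = (byte) 0x20;

    static byte[] encodeHeader(long requestId, int serializationId,
                               boolean twoWay, boolean event, int bodyLength) {
        // ByteBuffer is big-endian by default, so 0xdabb is written as 0xda, 0xbb
        ByteBuffer header = ByteBuffer.allocate(HEADER_LENGTH);
        // Bytes 0-1: magic number
        header.putShort(0, MAGIC);
        // Byte 2: request flag, two-way flag, event flag, serializer ID (low 5 bits)
        byte flag = (byte) (FLAG_REQUEST | serializationId);
        if (twoWay) {
            flag |= FLAG_TWOWAY;
        }
        if (event) {
            flag |= FLAG_EVENT;
        }
        header.put(2, flag);
        // Bytes 4-11: request ID
        header.putLong(4, requestId);
        // Bytes 12-15: length of the serialized body
        header.putInt(12, bodyLength);
        return header.array();
    }

    public static void main(String[] args) {
        byte[] header = encodeHeader(1L, 2, true, false, 5);
        StringBuilder hex = new StringBuilder();
        for (byte b : header) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim());
    }
}
```

The real encoder writes the body first and the header last because the body length is only known after serialization; the sketch sidesteps this by taking the length as a parameter.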

Now that we have analyzed the process by which the service consumer sends the request, let’s look at how the service provider receives the request.

(3) The provider receives the request

How is the request decoded?

We skip the intermediate steps here and go straight to the decoding logic for the request data, as follows:

public class ExchangeCodec extends TelnetCodec {
    @Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int readable = buffer.readableBytes();
        // Create the header byte array
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        // Read the message header data
        buffer.readBytes(header);
        // Call the overloaded method for the subsequent decoding
        return decode(channel, buffer, readable, header);
    }

    @Override
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // Check whether the magic number matches
        if (readable > 0 && header[0] != MAGIC_HIGH
                || readable > 1 && header[1] != MAGIC_LOW) {
            int length = header.length;
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            }
            for (int i = 1; i < header.length - 1; i++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            // Packets sent from the Telnet command line carry no header,
            // so they are handed to TelnetCodec's decode method
            return super.decode(channel, buffer, readable, header);
        }
        // If the readable data is shorter than the header, return NEED_MORE_INPUT
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }
        // Read the length of the message body from the header
        int len = Bytes.bytes2int(header, 12);
        // Check whether the length of the message body exceeds the limit
        checkPayload(channel, len);
        int tt = len + HEADER_LENGTH;
        // If the readable data is shorter than header + body, return NEED_MORE_INPUT
        if (readable < tt) {
            return DecodeResult.NEED_MORE_INPUT;
        }
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
        try {
            // Continue with the subsequent decoding
            return decodeBody(channel, is, header);
        } finally {
            if (is.available() > 0) {
                try {
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }
}

The above method filters out non-Dubbo packets early, such as packets sent from the Telnet command line, by checking whether the magic number in the message header equals the expected magic number. It then checks the length of the message body against the number of bytes that can be read. Finally, the decodeBody method is called for the subsequent decoding. ExchangeCodec implements decodeBody, but because the subclass DubboCodec overrides it, the decodeBody method in DubboCodec is the one called at runtime. Let’s look at the code for this method.

public class DubboCodec extends ExchangeCodec implements Codec2 {
    @Override
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        // Get the third byte of the header and extract the serializer ID with a bit mask
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // Get the request ID
        long id = Bytes.bytes2long(header, 4);
        // Check the message type from the flag bit: 0 - Response, 1 - Request
        if ((flag & FLAG_REQUEST) == 0) {
            // Decode the response to get a Response object.
            // ...
        } else {
            // Create the Request object
            Request req = new Request(id);
            req.setVersion(Version.getProtocolVersion());
            // Set the two-way flag on the Request object
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            // Check whether the event flag is set
            if ((flag & FLAG_EVENT) != 0) {
                // Mark the request as a heartbeat event
                req.setEvent(Request.HEARTBEAT_EVENT);
            }
            try {
                Object data;
                if (req.isHeartbeat()) {
                    // Decode the heartbeat data
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                } else if (req.isEvent()) {
                    // Decode the event data
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                } else {
                    DecodeableRpcInvocation inv;
                    // Decide from the URL parameter whether to decode on the IO thread
                    if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        // Perform the subsequent decoding on the current thread, the IO thread
                        inv.decode();
                    } else {
                        // Only create the DecodeableRpcInvocation;
                        // do not run the decoding logic on the current thread
                        inv = new DecodeableRpcInvocation(channel, req,
                                new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                // Set data on the Request object
                req.setData(data);
            } catch (Throwable t) {
                // If decoding fails, set the broken field to true
                // and store the exception object as the data
                req.setBroken(true);
                req.setData(t);
            }
            return req;
        }
    }
}

As shown above, decodeBody decodes some of the header fields and wraps them in a Request object. The decode method of DecodeableRpcInvocation then performs the remaining decoding, either immediately on the IO thread or later, depending on the configuration. When this is done, the name of the called method, the attachments, and the call arguments have all been parsed out.
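The header checks and the flag-byte parsing described above can be tried out in isolation. The following is a simplified sketch under stated assumptions: it works on a byte array instead of a ChannelBuffer, returns null where the real codec returns NEED_MORE_INPUT, and throws where the real codec falls back to Telnet decoding. The class, the Header type, and tryDecode are hypothetical names.

```java
import java.nio.ByteBuffer;

final class HeaderDecodeSketch {
    static final int HEADER_LENGTH = 16;
    static final byte MAGIC_HIGH = (byte) 0xda;
    static final byte MAGIC_LOW = (byte) 0xbb;
    static final int FLAG_REQUEST = 0x80;
    static final int FLAG_TWOWAY = 0x40;
    static final int FLAG_EVENT = 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    /** Parsed view of the header fields of one complete frame. */
    static final class Header {
        final boolean request;
        final boolean twoWay;
        final boolean event;
        final int serializationId;
        final long id;
        final int bodyLength;

        Header(byte flag, long id, int bodyLength) {
            this.request = (flag & FLAG_REQUEST) != 0;
            this.twoWay = (flag & FLAG_TWOWAY) != 0;
            this.event = (flag & FLAG_EVENT) != 0;
            this.serializationId = flag & SERIALIZATION_MASK;
            this.id = id;
            this.bodyLength = bodyLength;
        }
    }

    /** Returns the parsed header, or null if more input is needed. */
    static Header tryDecode(byte[] data) {
        // Same magic-number test as in ExchangeCodec#decode
        if (data.length > 0 && data[0] != MAGIC_HIGH
                || data.length > 1 && data[1] != MAGIC_LOW) {
            throw new IllegalArgumentException("not a Dubbo frame");
        }
        if (data.length < HEADER_LENGTH) {
            return null; // header incomplete
        }
        ByteBuffer buf = ByteBuffer.wrap(data);
        int len = buf.getInt(12);
        if (data.length < HEADER_LENGTH + len) {
            return null; // body incomplete
        }
        return new Header(data[2], buf.getLong(4), len);
    }
}
```

Returning early on incomplete input is what lets the decoder cope with TCP delivering a frame in several pieces: the caller keeps the bytes and retries once more data has arrived.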

Call the service

After the decoder parses the packet into a Request object, NettyHandler’s messageReceived method receives the object and passes it down. The entire call stack is as follows:

NettyServerHandler#channelRead(ChannelHandlerContext, Object)
  -> AbstractPeer#received(Channel, Object)
    -> MultiMessageHandler#received(Channel, Object)
      -> HeartbeatHandler#received(Channel, Object)
        -> AllChannelHandler#received(Channel, Object)
          -> ExecutorService#execute(Runnable) // subsequent call logic executed by the thread pool
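The last hop in this stack hands the message from the IO thread to a thread pool. A minimal sketch of that hand-off follows, assuming a handler that wraps each received message in a Runnable and submits it to an Executor. The class name, the Handler interface, and the string channel are hypothetical simplifications.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AllDispatchSketch {

    /** Simplified stand-in for the next ChannelHandler in the chain. */
    interface Handler {
        void received(String channel, Object message);
    }

    private final Executor executor;
    private final Handler next;

    AllDispatchSketch(Executor executor, Handler next) {
        this.executor = executor;
        this.next = next;
    }

    /** Called on the IO thread: wrap the message in a task and return immediately. */
    void received(String channel, Object message) {
        executor.execute(() -> next.received(channel, message));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AllDispatchSketch dispatcher = new AllDispatchSketch(pool, (channel, message) ->
                System.out.println(Thread.currentThread().getName() + " handles " + message));
        dispatcher.received("channel-1", "request-1");
        pool.shutdown();
    }
}
```

Keeping the IO thread free of business logic means a slow service method delays only its own request rather than every connection served by that thread.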

To save space, and because the logic of many of the intermediate calls is not very important, we will not analyze every method in the call stack. Instead we go straight to the logic executed by the thread pool, as follows:

public class ChannelEventRunnable implements Runnable {
    private final ChannelHandler handler;
    private final Channel channel;
    private final ChannelState state;
    private final Throwable exception;
    private final Object message;

    @Override
    public void run() {
        // Check the channel state. For request and response messages, state == RECEIVED
        if (state == ChannelState.RECEIVED) {
            try {
                // Pass channel and message to the ChannelHandler for the subsequent call
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("... operation error, channel is ... message is ...");
            }
        } else {
            // Other message types are handled by the switch
            switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("... operation error, channel is ...");
                }
                break;
            case DISCONNECTED:
                // ...
            case SENT:
                // ...
            case CAUGHT:
                // ...
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }
    }
}

Request and response messages occur far more often than other message types, so the run method checks for the RECEIVED state first, before falling through to the switch. ChannelEventRunnable is only a relay: its run method contains no call logic of its own and simply passes its arguments to another ChannelHandler for processing. That handler is of type DecodeHandler:

public class DecodeHandler extends AbstractChannelHandlerDelegate {
    public DecodeHandler(ChannelHandler handler) {
        super(handler);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            // Decode objects that implement the Decodeable interface
            decode(message);
        }
        if (message instanceof Request) {
            // Decode the data field of the Request
            decode(((Request) message).getData());
        }
        if (message instanceof Response) {
            // Decode the result field of the Response
            decode(((Response) message).getResult());
        }
        // Execute the subsequent logic
        handler.received(channel, message);
    }

    private void decode(Object message) {
        // Decodeable has two implementations:
        // DecodeableRpcInvocation and DecodeableRpcResult
        if (message != null && message instanceof Decodeable) {
            try {
                // Run the decoding logic
                ((Decodeable) message).decode();
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
                }
            }
        }
    }
}
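Because decodeBody may already have decoded the invocation on the IO thread, DecodeHandler can end up calling decode on an object that is already decoded. The sketch below shows one way to make such a second call harmless, assuming the object guards its decoding with a "has decoded" flag. That guard is an assumption about the implementation, which this article does not show, and all names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

final class LazyDecodeSketch {
    private final byte[] raw;
    private volatile boolean hasDecoded;
    private String decoded;
    private int decodeCount;

    LazyDecodeSketch(byte[] raw) {
        this.raw = raw;
    }

    /** May be called on the IO thread or later by a handler; only the first call does work. */
    void decode() {
        if (!hasDecoded) {
            try {
                decoded = new String(raw, StandardCharsets.UTF_8);
                decodeCount++;
            } finally {
                // Mark as decoded even on failure so the work is not retried
                hasDecoded = true;
            }
        }
    }

    String value() {
        return decoded;
    }

    int decodeCount() {
        return decodeCount;
    }

    public static void main(String[] args) {
        LazyDecodeSketch invocation = new LazyDecodeSketch("sayHello".getBytes(StandardCharsets.UTF_8));
        invocation.decode(); // first call decodes
        invocation.decode(); // second call is a no-op
        System.out.println(invocation.value() + " decoded " + invocation.decodeCount() + " time(s)");
    }
}
```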

DecodeHandler mainly contains decoding logic. The fully decoded Request object is then passed on to the next handler and eventually reaches the requestHandler defined in DubboProtocol:

public class DubboProtocol extends AbstractProtocol {
    public static final String NAME = "dubbo";

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                // Get the Invoker instance
                Invoker<?> invoker = getInvoker(channel, inv);
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    // Callback related, ignored
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                // Call the concrete service through the Invoker
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: ...");
        }
        // ignore other methods
    };

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        // ignore callback and local stub related logic
        // ...
        int port = channel.getLocalAddress().getPort();
        // Compute the service key, in the format groupName/serviceName:serviceVersion:port. For example:
        //   dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY),
                inv.getAttachments().get(Constants.GROUP_KEY));
        // Look up the DubboExporter in exporterMap.
        // The <serviceKey, DubboExporter> mappings are stored in exporterMap.
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
        if (exporter == null)
            throw new RemotingException(channel, "Not found exported service ...");
        // Get the Invoker object and return it
        return exporter.getInvoker();
    }

    // Ignore other methods
}
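The lookup in getInvoker boils down to building a string key and reading a map. Here is a minimal sketch of that step, following the groupName/serviceName:serviceVersion:port format given in the comment above. It is a simplification: the class is hypothetical, the map stores plain objects instead of DubboExporter instances, and any special cases the real serviceKey method may handle are ignored.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ExporterLookupSketch {
    private final Map<String, Object> exporterMap = new ConcurrentHashMap<>();

    /** Builds a key of the form group/path:version:port; group and version are optional. */
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(path);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        return key.append(':').append(port).toString();
    }

    /** Provider side, at export time: remember the invoker under its key. */
    void export(String key, Object invoker) {
        exporterMap.put(key, invoker);
    }

    /** Provider side, at request time: find the invoker or fail loudly. */
    Object lookup(int port, String path, String version, String group) {
        String key = serviceKey(port, path, version, group);
        Object invoker = exporterMap.get(key);
        if (invoker == null) {
            throw new IllegalStateException("Not found exported service: " + key);
        }
        return invoker;
    }

    public static void main(String[] args) {
        System.out.println(serviceKey(20880, "com.alibaba.dubbo.demo.DemoService", "1.0.0", "dubbo"));
    }
}
```

Since the port, path, version, and group all come from the incoming request, a mismatch in any of them between consumer and provider configuration results in the "Not found exported service" error rather than a call to the wrong implementation.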

As described in previous lessons, every exported service is saved in exporterMap. Here the exporter is looked up by serviceKey, the Invoker is obtained from the exporter, and the service logic is called through the Invoker’s invoke method:

public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        try {
            // Call doInvoke to execute the subsequent call,
            // wrap the call result in an RpcResult, and return it
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(),
                    invocation.getParameterTypes(), invocation.getArguments()));
        } catch (InvocationTargetException e) {
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method ...");
        }
    }

    protected abstract Object doInvoke(T proxy, String methodName,
            Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
}

As mentioned above, doInvoke is an abstract method that needs to be implemented by a concrete Invoker instance. The Invoker instance is created at run time using the JavassistProxyFactory with the following logic:

public class JavassistProxyFactory extends AbstractProxyFactory {
    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        final Wrapper wrapper = Wrapper.getWrapper(
                proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // Create an anonymous Invoker and implement its doInvoke method
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                    Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
                // Call invokeMethod for the subsequent call
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

Wrapper is an abstract class, and invokeMethod is one of its abstract methods. At run time Dubbo uses the Javassist framework to generate a Wrapper implementation class whose invokeMethod calls the concrete service method based on the invocation information. For DemoServiceImpl, for example, Javassist generates the following wrapper class.

/** Wrapper0 is generated at runtime */
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    public Object invokeMethod(Object object, String string, Class[] arrClass, Object[] arrobject)
            throws InvocationTargetException {
        DemoService demoService;
        try {
            // Type conversion
            demoService = (DemoService) object;
        } catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // Call the target method that matches the method name
            if ("sayHello".equals(string) && arrClass.length == 1) {
                return demoService.sayHello((String) arrobject[0]);
            }
        } catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string)
                .append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
    }
}
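The dispatch style of the generated class can be tried out without Javassist. Below is a hand-written sketch of the same pattern for a tiny interface: cast the target once, then select the method by comparing names. GreetingService and GreetingWrapperSketch are hypothetical names chosen for illustration, and the sketch is written by hand rather than generated.

```java
import java.lang.reflect.InvocationTargetException;

interface GreetingService {
    String sayHello(String name);
}

final class GreetingWrapperSketch {

    /** Dispatches by method name, mirroring the shape of the generated invokeMethod. */
    static Object invokeMethod(Object target, String method, Class<?>[] types, Object[] args)
            throws NoSuchMethodException, InvocationTargetException {
        GreetingService service;
        try {
            // Type conversion
            service = (GreetingService) target;
        } catch (Throwable t) {
            throw new IllegalArgumentException(t);
        }
        try {
            // A direct call, with no java.lang.reflect.Method lookup involved
            if ("sayHello".equals(method) && types.length == 1) {
                return service.sayHello((String) args[0]);
            }
        } catch (Throwable t) {
            // Exceptions thrown by the service are wrapped, as in the generated class
            throw new InvocationTargetException(t);
        }
        throw new NoSuchMethodException("Not found method \"" + method + "\" in class GreetingService.");
    }

    public static void main(String[] args) throws Exception {
        GreetingService impl = name -> "Hello " + name;
        Object result = invokeMethod(impl, "sayHello",
                new Class<?>[] {String.class}, new Object[] {"Dubbo"});
        System.out.println(result);
    }
}
```

Compared with calling Method#invoke, this approach replaces reflective dispatch with ordinary string comparisons and direct calls, which is presumably why the framework generates such classes at run time.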

At this point, the analysis of the entire service invocation process is complete. The full call chain is as follows:

ChannelEventRunnable#run()
  -> DecodeHandler#received(Channel, Object)
    -> HeaderExchangeHandler#received(Channel, Object)
      -> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
        -> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
          -> Filter#invoke(Invoker, Invocation)
            -> AbstractProxyInvoker#invoke(Invocation)
              -> Wrapper0#invokeMethod(Object, String, Class[], Object[])
                -> DemoServiceImpl#sayHello(String)

(4) The provider returns the call result

After the service provider invokes the specified service, it encapsulates the invocation result into a Response object and returns the object to the service consumer. The service provider also returns the Response object through the Send method of NettyChannel, so the analysis is not repeated here.

(5) The consumer receives the call result

After the service consumer receives the response data, the first step is to decode it into a Response object. This object is then passed to the next inbound handler, the NettyHandler. NettyHandler passes the object further down the chain until AllChannelHandler’s received method receives it and dispatches it to the thread pool. This process is the same as when the service provider receives a request, so the analysis is not repeated here.

03 Summary

This completes our analysis of the principles and source code of the core Dubbo process. The overall flow is not complicated, but there are a lot of details. To really understand the ideas behind it, you will still need to spend more time going through the code carefully.