Consumer demo example


      
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <context:property-placeholder/>

    <dubbo:application name="serialization-java-consumer">
        <dubbo:parameter key="qos.enable" value="true" />
        <dubbo:parameter key="qos.accept.foreign.ip" value="false" />
        <dubbo:parameter key="qos.port" value="33333" />
    </dubbo:application>

    <dubbo:registry address="zookeeper://${zookeeper.address:127.0.0.1}:2181"/>

    <dubbo:reference id="demoService" check="true" interface="org.apache.dubbo.samples.serialization.api.DemoService"/>

</beans>

As shown in the previous section, Dubbo builds its custom XML tags on Spring's custom tag mechanism, loads the beans those tags declare, and starts the Dubbo client through a listener for Spring's container-refresh-completed event. Starting the client involves service publishing and service subscription.

public class DubboConsumer {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-demo-consumer.xml");
        context.start();
        DemoService demoService = context.getBean("demoService", DemoService.class);
        String hello = demoService.sayHello("world");
        System.out.println(hello);
    }
}

Dubbo references the service via the getObject() method of ReferenceBean:

    @Override
    public Object getObject() {
        return get();
    }

ReferenceBean#getObject() gets the referenced bean

ReferenceBean extends ReferenceConfig, so calling getObject() on the ReferenceBean invokes the get() method it inherits from ReferenceConfig.

    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        // If the proxy reference is null, call init()
        if (ref == null) {
            init();
        }
        return ref;
    }

    public synchronized void init() {
        if (initialized) {
            return;
        }

        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.init();
        }
        // 1. Check the ConsumerConfig configuration, if present
        // 2. Load the invoked interface class via reflection
        // 3. Initialize ServiceMetadata
        // 4. Register the consumer
        // 5. Check ReferenceConfig, RegistryConfig, and ConsumerConfig
        checkAndUpdateSubConfigs();

        checkStubAndLocal(interfaceClass);
        // Check the mock configuration of the referenced interface
        ConfigValidationUtils.checkMock(interfaceClass, this);
        // Consumer information
        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, CONSUMER_SIDE);
        // Add runtime parameters to the map: dubbo, release, timestamp, pid
        ReferenceConfigBase.appendRuntimeParameters(map);
        // Enter this branch only if the reference is not a generic invocation
        if (!ProtocolUtils.isGeneric(generic)) {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }
            // Get the method names through the wrapper class. The wrapper is generated with Javassist and cached in WRAPPER_MAP:
            // the key is the org.apache.dubbo.samples.serialization.api.DemoService class object, the value is the wrapper instance
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                // Put the method names into the map; here the only method is sayHello
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
            }
        }
        // interface org.apache.dubbo.samples.serialization.api.DemoService
        map.put(INTERFACE_KEY, interfaceName);
        // Add additional parameters to the map
        AbstractConfig.appendParameters(map, getMetrics());
        AbstractConfig.appendParameters(map, getApplication());
        AbstractConfig.appendParameters(map, getModule());
        // remove 'default.' prefix for configs from ConsumerConfig
        // appendParameters(map, consumer, Constants.DEFAULT_KEY);
        AbstractConfig.appendParameters(map, consumer);
        AbstractConfig.appendParameters(map, this);
        MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
        if (metadataReportConfig != null && metadataReportConfig.isValid()) {
            map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
        }
        Map<String, AsyncMethodInfo> attributes = null;
        if (CollectionUtils.isNotEmpty(getMethods())) {
            attributes = new HashMap<>();
            for (MethodConfig methodConfig : getMethods()) {
                AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
                String retryKey = methodConfig.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(methodConfig.getName() + ".retries", "0");
                    }
                }
                AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
                if (asyncMethodInfo != null) {
// consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
                    attributes.put(methodConfig.getName(), asyncMethodInfo);
                }
            }
        }
        
        String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
        if (StringUtils.isEmpty(hostToRegistry)) {
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(REGISTER_IP_KEY, hostToRegistry);
        // All data is stored in the attachments of the serviceMetadata
        serviceMetadata.getAttachments().putAll(map);
        // Create the Service proxy
        ref = createProxy(map);
        // Set the Service proxy reference for ServiceMetadata
        serviceMetadata.setTarget(ref);
        serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
        ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
        consumerModel.setProxyObject(ref);
        consumerModel.init(attributes);
        // Mark the reference as initialized
        initialized = true;

        // dispatch a ReferenceConfigInitializedEvent since 2.7.4
        dispatch(new ReferenceConfigInitializedEvent(this, invoker));
    }
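
The get()/init() pair above is a lazy, synchronized initialization: the proxy is built on first access and reused afterwards. A minimal sketch of that pattern with no Dubbo dependency follows; `LazyReference`, its `Supplier`-based factory, and the `initCalls` counter are hypothetical names used only for illustration.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for ReferenceConfig: builds the reference once, on first get().
class LazyReference<T> {
    private final Supplier<T> factory; // plays the role of createProxy(map)
    private T ref;
    private boolean initialized;
    private boolean destroyed;
    int initCalls; // exposed only so the demo can observe how often init ran

    LazyReference(Supplier<T> factory) {
        this.factory = factory;
    }

    synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("The reference has already been destroyed");
        }
        // Build the reference only if it does not exist yet
        if (ref == null) {
            init();
        }
        return ref;
    }

    private synchronized void init() {
        if (initialized) {
            return;
        }
        initCalls++;
        ref = factory.get();
        initialized = true;
    }

    synchronized void destroy() {
        destroyed = true;
        ref = null;
    }

    public static void main(String[] args) {
        LazyReference<String> reference = new LazyReference<>(() -> "proxy");
        System.out.println(reference.get() + " " + reference.get() + " initCalls=" + reference.initCalls);
    }
}
```

Both methods are synchronized on the same object, so concurrent getBean calls cannot build the proxy twice.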

ReferenceConfig#createProxy() creates the service proxy

    @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map<String, String> map) {
        // Whether to refer to the service inside the same JVM (injvm protocol)
        if (shouldJvmRefer(map)) {
            URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = REF_PROTOCOL.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            urls.clear();
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) {
                            url = url.setPath(interfaceName);
                        }
                        if (UrlUtils.isRegistry(url)) {
                            urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // assemble URL from register center's configuration
                // Check the registry only if the protocol is not injvm
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                    checkRegistry();
                    // Load the registry URLs; zookeeper:// is converted to registry://
                    List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                    if (CollectionUtils.isNotEmpty(us)) {
                        for (URL u : us) {
                            // Load the monitor URL
                            URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                            // Add the monitor configuration to the map
                            if (monitorUrl != null) {
                                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            // Generate a URL based on the information in the map, for example:
                            /* registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=serialization-java-consumer&dubbo=2.0.2&pid=66793&qos.accept.foreign.ip=false&qos.enable=true&qos.port=33333&refer=application%3Dserialization-java-consumer%26check%3Dtrue%26dubbo%3D2.0.2%26init%3Dfalse%26interface%3Dorg.apache.dubbo.samples.serialization.api.DemoService%26methods%3DsayHello%26pid%3D66793%26qos.accept.foreign.ip%3Dfalse%26qos.enable%3Dtrue%26qos.port%3D33333%26register.ip%3D192.168.58.45%26release%3D2.7.7%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1636532992568&registry=zookeeper&release=2.7.7&timestamp=1636533091883 */
                            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                    }
                }
            }
            // If there is only one URL, refer to the service directly; as with export, the call passes through this chain:
            // - ProtocolListenerWrapper
            // - - ProtocolFilterWrapper
            // - - - RegistryProtocol
            if (urls.size() == 1) {
                // refer to the service via RegistryProtocol#refer
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
            } else {
                // If multiple URLs are referenced, process them in a loop
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                    if (UrlUtils.isRegistry(url)) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // for multi-subscription scenario, use 'zone-aware' policy by default
                    // Cluster processing
                    URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
                    // The invoker wrap relation would be like: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                    // Join the cluster; the cluster policy decides how invocations are handled
                    invoker = CLUSTER.join(new StaticDirectory(u, invokers));
                } else { // not a registry url, must be direct invoke.
                    invoker = CLUSTER.join(new StaticDirectory(invokers));
                }
            }
        }
        // Destroy the invoker and fail if it is not available
        if (shouldCheck() && !invoker.isAvailable()) {
            invoker.destroy();
            throw new IllegalStateException("Failed to check the status of the service "
                    + interfaceName
                    + ". No provider available for the service "
                    + (group == null ? "" : group + "/")
                    + interfaceName +
                    (version == null ? "" : ":" + version)
                    + " from the url "
                    + invoker.getUrl()
                    + " to the consumer "
                    + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        /**
         * @since 2.7.0
         * ServiceData storage, loaded through the SPI mechanism; supports both in-memory and remote storage
         */
        String metadata = map.get(METADATA_KEY);
        WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
        if (metadataService != null) {
            URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
            metadataService.publishServiceDefinition(consumerURL);
        }
        // create service proxy
        return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }
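
createProxy attaches the consumer parameters to the registry URL as a single URL-encoded `refer` parameter, and RegistryProtocol later decodes it again. Below is a rough sketch of that round trip using only the JDK. `toQueryString`, `parseQueryString`, `withRefer`, and `referOf` are simplified stand-ins written for this example, not Dubbo's own utility methods, and the sketch assumes Java 10 or later for the `Charset` overloads.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

class ReferParameterSketch {
    // Simplified stand-in: joins entries as key=value pairs separated by '&'.
    static String toQueryString(Map<String, String> map) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : map.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString();
    }

    // Simplified stand-in: splits key=value pairs back into a sorted map.
    static Map<String, String> parseQueryString(String qs) {
        Map<String, String> map = new TreeMap<>();
        for (String pair : qs.split("&")) {
            int idx = pair.indexOf('=');
            if (idx > 0) {
                map.put(pair.substring(0, idx), pair.substring(idx + 1));
            }
        }
        return map;
    }

    // Appends the encoded consumer parameters as the "refer" parameter.
    static String withRefer(String registryUrl, Map<String, String> map) {
        String encoded = URLEncoder.encode(toQueryString(map), StandardCharsets.UTF_8);
        return registryUrl + (registryUrl.contains("?") ? "&" : "?") + "refer=" + encoded;
    }

    // Extracts and decodes "refer"; assumes the parameter is present in the URL.
    static Map<String, String> referOf(String url) {
        String marker = "refer=";
        int start = url.indexOf(marker) + marker.length();
        int end = url.indexOf('&', start);
        String encoded = end < 0 ? url.substring(start) : url.substring(start, end);
        return parseQueryString(URLDecoder.decode(encoded, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Map<String, String> map = new TreeMap<>();
        map.put("side", "consumer");
        map.put("methods", "sayHello");
        String url = withRefer(
                "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?registry=zookeeper", map);
        System.out.println(url);
        System.out.println(referOf(url));
    }
}
```

Encoding the whole consumer query string into one parameter keeps the registry's own parameters and the consumer's parameters from colliding in the same URL.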

When the reference is not injvm, createProxy generates the URL from the constructed configuration (map) and changes the URL protocol from zookeeper:// to registry://. The service is then referred to through the refer method of the Protocol interface. As with publishing a service, referring to a service passes through a wrapped call chain:

- ProtocolListenerWrapper
- - ProtocolFilterWrapper
- - - RegistryProtocol
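
The nesting above is a decorator chain: each wrapper implements the same interface and delegates to the next one. Here is a small sketch with hypothetical `SimpleProtocol` types. Dubbo assembles the real chain through its SPI wrapper mechanism, which this example does not reproduce.

```java
import java.util.ArrayList;
import java.util.List;

class ProtocolChainSketch {
    // Hypothetical, much reduced version of a protocol interface.
    interface SimpleProtocol {
        String refer(String url, List<String> trace);
    }

    // Innermost protocol: does the actual work.
    static class RegistryLikeProtocol implements SimpleProtocol {
        public String refer(String url, List<String> trace) {
            trace.add("RegistryProtocol");
            return "invoker(" + url + ")";
        }
    }

    // Generic decorator that records its name and delegates inward.
    static class NamedWrapper implements SimpleProtocol {
        private final String name;
        private final SimpleProtocol next;

        NamedWrapper(String name, SimpleProtocol next) {
            this.name = name;
            this.next = next;
        }

        public String refer(String url, List<String> trace) {
            trace.add(name);
            return next.refer(url, trace);
        }
    }

    // Builds the chain from the inside out, so the listener wrapper is called first.
    static SimpleProtocol buildChain() {
        SimpleProtocol registry = new RegistryLikeProtocol();
        SimpleProtocol filter = new NamedWrapper("ProtocolFilterWrapper", registry);
        return new NamedWrapper("ProtocolListenerWrapper", filter);
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        System.out.println(buildChain().refer("registry://127.0.0.1:2181", trace));
        System.out.println(trace);
    }
}
```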

The refer process distinguishes between a single URL and multiple URLs. With multiple URLs, the resulting Invoker list is joined into a cluster, and at call time the cluster policy decides how the invocation is carried out. Cluster policies are also loaded through the SPI mechanism.
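
To make the cluster idea concrete, here is a simplified failover-style sketch: the invoker list is joined into one invoker that tries each provider in order until one succeeds. The `SimpleInvoker` interface and `FailoverSketch` class are hypothetical, and the sketch omits load balancing, routing, and retry limits.

```java
import java.util.Arrays;
import java.util.List;

class FailoverSketch {
    // Hypothetical, much reduced version of an invoker.
    interface SimpleInvoker {
        String invoke(String arg);
    }

    // Joins several invokers into one; tries them in order until one succeeds.
    static SimpleInvoker join(List<SimpleInvoker> invokers) {
        return arg -> {
            RuntimeException last = null;
            for (SimpleInvoker invoker : invokers) {
                try {
                    return invoker.invoke(arg);
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try the next provider
                }
            }
            throw new IllegalStateException("No provider available", last);
        };
    }

    public static void main(String[] args) {
        SimpleInvoker broken = arg -> { throw new RuntimeException("connection refused"); };
        SimpleInvoker healthy = arg -> "Hello " + arg;
        System.out.println(join(Arrays.asList(broken, healthy)).invoke("world"));
    }
}
```

The caller sees a single invoker either way, which is why the proxy created afterwards does not need to know how many providers exist.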

RegistryProtocol#refer refers to the service

    @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // Convert the registry URL from registry:// to zookeeper://
        url = getRegistryUrl(url);
        // Get the registry
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // Parse the reference service information from the URL
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        // Get the grouping
        String group = qs.get(GROUP_KEY);
        // If multiple groups (or the wildcard "*") are set, use the MergeableCluster policy
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        return doRefer(cluster, registry, type, url);
    }
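
The group check in refer can be isolated as a small predicate: the mergeable cluster is chosen only when the reference names several groups or the wildcard. The sketch below assumes a comma separator with optional surrounding whitespace as an approximation of COMMA_SPLIT_PATTERN; `needsMergeableCluster` is a hypothetical helper name.

```java
import java.util.regex.Pattern;

class GroupSelectionSketch {
    // Assumed approximation: a comma with optional surrounding whitespace.
    private static final Pattern COMMA_SPLIT = Pattern.compile("\\s*,\\s*");

    // True when results from several groups would have to be merged.
    static boolean needsMergeableCluster(String group) {
        if (group == null || group.isEmpty()) {
            return false;
        }
        return COMMA_SPLIT.split(group).length > 1 || "*".equals(group);
    }

    public static void main(String[] args) {
        System.out.println(needsMergeableCluster("g1,g2"));
        System.out.println(needsMergeableCluster("*"));
        System.out.println(needsMergeableCluster("g1"));
    }
}
```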

RegistryProtocol#doRefer references services

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // Get the cluster Directory, which represents a collection of invokers
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            // Register the consumer URL
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        // Subscribe; the call eventually reaches ZookeeperRegistry#doSubscribe
        directory.subscribe(toSubscribeUrl(subscribeUrl));
        // Join the cluster
        Invoker<T> invoker = cluster.join(directory);
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }

        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }

RegistryDirectory#subscribe subscribes to the service

In RegistryProtocol#doRefer, directory.subscribe passes through the following call chain and finally calls ZookeeperRegistry#doSubscribe, which registers the subscription with ZooKeeper and sets up a listener.

- RegistryDirectory
- - ListenerRegistryWrapper
- - - FailbackRegistry
- - - - ZookeeperRegistry
    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                // Initialize the listener
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                    for (String child : currentChilds) {
                        child = URL.decode(child);
                        if (!anyServices.contains(child)) {
                            anyServices.add(child);
                            subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), k);
                        }
                    }
                });
                // Create the root ZooKeeper node
                zkClient.create(root, false);
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (CollectionUtils.isNotEmpty(services)) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // Notify the listener with the URLs collected so far
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
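
The subscribe path boils down to three steps: remember a listener per path with computeIfAbsent, deliver the current children once, and push later changes through the same listener. An in-memory sketch of that idea follows. `FakeRegistry` and its methods are hypothetical and stand in for the ZooKeeper client, and the provider address in `main` is an assumed example value.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class FakeRegistry {
    private final Map<String, List<String>> children = new ConcurrentHashMap<>();
    private final Map<String, List<Consumer<List<String>>>> listeners = new ConcurrentHashMap<>();

    // Subscribe: remember the listener, then notify it with the current children.
    void subscribe(String path, Consumer<List<String>> listener) {
        listeners.computeIfAbsent(path, k -> new CopyOnWriteArrayList<>()).add(listener);
        listener.accept(snapshot(path));
    }

    // A provider registers: add the child and notify every listener on that path.
    void register(String path, String providerUrl) {
        children.computeIfAbsent(path, k -> new CopyOnWriteArrayList<>()).add(providerUrl);
        List<Consumer<List<String>>> current = listeners.getOrDefault(path, Collections.emptyList());
        for (Consumer<List<String>> l : current) {
            l.accept(snapshot(path));
        }
    }

    // Copy of the children so listeners never see later mutations.
    private List<String> snapshot(String path) {
        return new ArrayList<>(children.getOrDefault(path, Collections.emptyList()));
    }

    public static void main(String[] args) {
        FakeRegistry registry = new FakeRegistry();
        String path = "/dubbo/org.apache.dubbo.samples.serialization.api.DemoService/providers";
        registry.subscribe(path, urls -> System.out.println("providers: " + urls));
        registry.register(path, "dubbo://192.168.58.45:20880"); // assumed example address
    }
}
```

The first notification may carry an empty list, which is the situation toUrlsWithEmpty handles in the real code.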

DubboProtocol#protocolBindingRefer creates the Invoker

This method is called when referring to the service creates an Invoker. It creates the network clients through getClients, which decides whether to use shared connections and creates ExchangeClient instances according to the connections parameter. Each client is then initialized through initClient. If lazy connection is configured, initClient creates a LazyConnectExchangeClient; otherwise it connects to the service provider through Exchangers.connect. How the connection is established is not covered here and will be introduced in the network communication section.

    @Override
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);

        // Create RPC Invoker with getClients(URL)
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        // Add invoker to invokers
        invokers.add(invoker);

        return invoker;
    }

    // Create the client array
    private ExchangeClient[] getClients(URL url) {
        // whether to share connection
        boolean useShareConnect = false;

        int connections = url.getParameter(CONNECTIONS_KEY, 0);
        List<ReferenceCountExchangeClient> shareClients = null;
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
            useShareConnect = true;

            /* The XML configuration should have a higher priority than properties. */
            String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                    DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
            // Get the shared connection clients
            shareClients = getSharedClient(url, connections);
        }
        // Create the ExchangeClient array
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (useShareConnect) {
                // It is obtained from the shared client
                clients[i] = shareClients.get(i);

            } else {
                // Initialize a new client
                clients[i] = initClient(url);
            }
        }
        return clients;
    }

    // Initialize the client
    private ExchangeClient initClient(URL url) {

        // client type setting.
        String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));

        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
        // enable heartbeat by default
        url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

        // BIO is not allowed since it has severe performance issue.
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), ""));
        }

        ExchangeClient client;
        try {
            // connection should be lazy
            if (url.getParameter(LAZY_CONNECT_KEY, false)) {
                // Lazily load the client
                client = new LazyConnectExchangeClient(url, requestHandler);

            } else {
                // Use the transporter's connect (NettyTransporter here) to create a client and connect to the provider
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + ")." + e.getMessage(), e);
        }

        return client;
    }
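
The branching in getClients and initClient can be summarized in a few lines. The sketch below uses hypothetical `Client` types and a shared pool of size one as a stand-in for the shared-connection default. It mirrors only the branching, not Dubbo's reference counting or transport layer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class ClientSelectionSketch {
    // Hypothetical, much reduced version of an exchange client.
    interface Client {
        String send(String msg);
    }

    // Connects only when first used, like a lazily connected client.
    static class LazyClient implements Client {
        private final Supplier<Client> connector;
        private Client delegate;

        LazyClient(Supplier<Client> connector) {
            this.connector = connector;
        }

        public synchronized String send(String msg) {
            if (delegate == null) {
                delegate = connector.get();
            }
            return delegate.send(msg);
        }
    }

    private final List<Client> shared = new ArrayList<>();
    int connectCount; // number of real connections opened so far

    // Stand-in for opening a real connection to the provider.
    private Client connect() {
        connectCount++;
        return msg -> "echo:" + msg;
    }

    // Either defer the connection or open it immediately.
    private Client initClient(boolean lazy) {
        return lazy ? new LazyClient(this::connect) : connect();
    }

    // connections == 0 means "not configured": reuse the shared client.
    Client[] getClients(int connections, boolean lazy) {
        if (connections == 0) {
            if (shared.isEmpty()) {
                shared.add(initClient(lazy));
            }
            return new Client[] { shared.get(0) };
        }
        Client[] clients = new Client[connections];
        for (int i = 0; i < clients.length; i++) {
            clients[i] = initClient(lazy);
        }
        return clients;
    }

    public static void main(String[] args) {
        ClientSelectionSketch sketch = new ClientSelectionSketch();
        sketch.getClients(0, false);
        sketch.getClients(0, false);
        System.out.println("connections opened: " + sketch.connectCount);
    }
}
```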

Conclusion

When the Dubbo consumer starts, it references the service. ReferenceBean implements Spring's FactoryBean interface, so calling getBean on the Spring context invokes the getObject method of ReferenceBean. The service is then referenced by creating a proxy (createProxy), which in turn calls the refer method of Protocol.

Referring to a service can be roughly understood as creating ExchangeClient instances through the refer method of DubboProtocol. When the DubboInvoker is created, getClients is called to create the ExchangeClient instances, and initClient initializes each network client. During initialization the client connects to the service provider through Exchangers.connect. As on the provider side, the network client is initialized through doOpen and the connection is then established by calling doConnect.
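
The last step of createProxy, turning an Invoker into an object that implements the service interface, can be illustrated with a JDK dynamic proxy. This is a sketch under assumptions: Dubbo selects its proxy factory through SPI and does not necessarily use a JDK proxy, and the `SimpleInvoker` type, the local `DemoService` interface, and `getProxy` below are hypothetical stand-ins.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class ProxySketch {
    // Local stand-in for the sample's service interface.
    interface DemoService {
        String sayHello(String name);
    }

    // Stand-in for an Invoker: receives the method name and arguments of each call.
    interface SimpleInvoker {
        Object invoke(String methodName, Object[] args);
    }

    // Creates an object implementing the interface that forwards every call to the invoker.
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> type, SimpleInvoker invoker) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods declared on Object are answered locally instead of being forwarded
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            return invoker.invoke(method.getName(), args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] { type }, handler);
    }

    public static void main(String[] args) {
        // In Dubbo the invoker would send the call over the network; here it answers locally
        SimpleInvoker remote = (method, a) -> "Hello " + a[0] + " (via " + method + ")";
        DemoService demoService = getProxy(DemoService.class, remote);
        System.out.println(demoService.sayHello("world"));
    }
}
```

This is why the consumer code in the demo can call demoService.sayHello("world") as if it were a local object.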