This is the 24th day of my participation in the August More Text Challenge

This article uses the source address: simple-rpc

In distributed Services Framework – Underlying Communication (3) we solved three problems of Netty client invocation:

  1. Select a proper serialization protocol to solve the problem of half packet/sticky packet in Netty transmission.
  2. Use the advantage of long connection to reuse Netty Channel.
  3. Netty is an asynchronous framework. After a client initiates a service call, it waits to obtain the call result.

Now we’re going to actually make the call. We have created a dynamic proxy class for the service caller when the service is introduced. By implementing the InvocationHandler interface, the complex remote call communication logic is encapsulated in the Invoke (Object Proxy, Method Method, Object[] args) Method.

A dynamic proxy

@Override
public Object invoke(Object proxy, Method method, Object[] args) {

    // Service interface name
    String serviceItfName = targetItf.getName();
    IRegisterCenter registerCenter = RegisterCenterImpl.getInstance();
    // Get the list of services
    List<Provider> providerList = registerCenter.getServiceMetadata().get(serviceItfName);

    if (CollectionUtils.isEmpty(providerList)) {
        log.debug("can't find provider service={}, maybe need to reconnect zk server", serviceItfName);
    } else {
        // Obtain the corresponding service provider based on the load balancing policy
        LoadBalanceStrategy loadBalance = LoadBalanceStrategyEngine.getLoadBalanceStrategy(this.loadBalanceStrategy);
        Provider provider = loadBalance.select(providerList);

        Provider providerCopy = Provider.copy(provider);
        providerCopy.setServiceMethod(method);
        providerCopy.setServiceItf(targetItf);

        Request request = Request.builder().args(args)
                .invokeMethodName(method.getName())
                .invokeTimeout(timeout)
                .provider(providerCopy)
                .uniqueKey(UUID.randomUUID().toString() + "-" + Thread.currentThread().getId())
                .build();
        try {
            InetSocketAddress socketAddress = new InetSocketAddress(providerCopy.getServerIp(), providerCopy.getServerPort());

            // Initiate an asynchronous invocation request
            Future<Response> responseFuture = executorService.submit(RevokerServiceCallable.of(socketAddress, request));
            Response response = responseFuture.get(timeout, TimeUnit.MILLISECONDS);
            if(response ! =null) {
                returnresponse.getResult(); }}catch (Exception e) {
            log.error("RevokerProxyBeanFactory invoke error, request={}", request, e);
            throw new SRpcException("RevokerProxyBeanFactory invoke error", e); }}return null;
}

public Object getProxy(a) {
    return Proxy.newProxyInstance(
            Thread.currentThread().getContextClassLoader(), 
            new Class[]{targetItf}, this);
}
Copy the code

Let’s start with the getProxy() method, which uses proxy.newProxyInstance () to get the Proxy object. Here’s what invoke() does:

  1. Get the list of service providers from the service registry;
  2. Implement soft load (described below) based on load policy parameters configured on the service callerloadBalanceStrategy, obtain the specific load class, select the appropriate service provider;
  3. Assemble the service invocation requestRequest;
  4. Asynchronously submit requestsRevokerServiceCallable;
  5. The result of the request is awaited synchronously through a blocking queue mechanism.

As you can see, everything except step 4 is preparation for the service invocation. The RevokerServiceCallable class must be the core logic for a client to initiate a service call.

Client asynchronous service invocation

RevokerServiceCallable class implements Callable

and implements the call() method. If you are not familiar with it, you can review it by yourself.

@Override
public Response call(a) {
    
    RevokerResponseHolder.initResponseData(request.getUniqueKey());
    ArrayBlockingQueue<Channel> blockingQueue = NettyChannelPoolFactory.getInstance().acquire(inetSocketAddress);
    try {
        if (channel == null) {
            channel = blockingQueue.poll(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
        }
        if (channel == null) {
            log.error("can't find channel to resolve this request");
            throw new SRpcException("can't find channel to resolve this request");
        } else {
            while(! channel.isOpen() || ! channel.isActive() || ! channel.isWritable()) { log.warn("retry get new channel");
                channel = blockingQueue.poll(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
                if (channel == null) {
                    channel = NettyChannelPoolFactory.getInstance().registerChannel(inetSocketAddress);
                }
            }
            ChannelFuture channelFuture = channel.writeAndFlush(request);
            channelFuture.syncUninterruptibly();
            long invokeTimeout = request.getInvokeTimeout();
            returnRevokerResponseHolder.getValue(request.getUniqueKey(), invokeTimeout); }}catch (Exception e) {
        log.error("service invoke error", e);
        throw new SRpcException("service invoke error", e);
    } finally{ NettyChannelPoolFactory.getInstance().release(blockingQueue, channel, inetSocketAddress); }}Copy the code

The whole process steps are as follows:

  1. Initialize the return result container as the unique identifier of this callKeyStore the returned resultMap;
  2. Gets the corresponding based on the local invocation service provider addressNettychannelchannelThe queue;
  3. Gets the value of this call from the queueNettychannelchannelIf no one is available in the queueChannel, then register a new oneChannel.
  4. Encodes the service request data object into a byte array through some serialization protocolChannelSend to the server;
  5. Synchronously wait for the server to return the call result;
  6. Finally, after this call, theNettyThe passage ofChannelRe-release to the queue for reuse in the next call.

conclusion

Now that the client and server have been able to complete the communication, let’s review:

  1. In the first chapter, we introduced the basic use of Netty and TCP half packet sticky packet problem;

  2. The second chapter introduces the code implementation of the server start and the service end flow limiting scheme implementation ideas;

  3. The third chapter describes how the client to complete Netty Channel reuse and how to complete the client to initiate service call synchronization wait to obtain the call results;

  4. In the final article we completed client dynamic proxy and asynchronous request.