General call

Java handwritten RPC (01) based socket implementation from scratch

Java hand-written RPC (02) -Netty4 implements both client and server side from scratch

Java from scratch handwritten RPC (03) how to implement client call server?

Java handwritten RPC (04) – serialization from scratch

In the last article, we introduced how to implement a universal server based on reflection.

In this section, we will learn how to implement a common client.

Because there is a lot of content, it is divided into two parts.

The basic idea

All method calls are implemented based on reflection.

Core classes

In order to facilitate expansion, we adjust the core class as follows:

package com.github.houbb.rpc.client.core;

import com.github.houbb.heaven.annotation.ThreadSafe;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.context.RpcClientContext;
import com.github.houbb.rpc.client.handler.RpcClientHandler;
import com.github.houbb.rpc.common.constant.RpcConstant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/** * <p> RPC client </p> ** <pre> Created: 2019/10/16 11:21 PM </pre> * <pre> Project: RPC </pre> **@author houbinbin
 * @sinceHundreds * /
@ThreadSafe
public class RpcClient {

    private static final Log log = LogFactory.getLog(RpcClient.class);

    /** * Address information *@since0.0.6 * /
    private final String address;

    /** * listen port number *@since0.0.6 * /
    private final int port;

    /** * The handler is used to retrieve request information *@since0.0.4 * /
    private final ChannelHandler channelHandler;

    public RpcClient(final RpcClientContext clientContext) {
        this.address = clientContext.address();
        this.port = clientContext.port();
        this.channelHandler = clientContext.channelHandler();
    }

    /** * connect *@since0.0.6 * /
    public ChannelFuture connect(a) {
        // Start the server
        log.info("RPC service starts client");

        EventLoopGroup workerGroup = new NioEventLoopGroup();

        /** * channel Future message * used to write request information *@since0.0.6 * /
        ChannelFuture channelFuture;
        try {
            Bootstrap bootstrap = new Bootstrap();
            channelFuture = bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<Channel>(){
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline()
                                    / / decoding bytes = > resp
                                    .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
                                    // request=>bytes
                                    .addLast(new ObjectEncoder())
                                    // Log output
                                    .addLast(new LoggingHandler(LogLevel.INFO))
                                    .addLast(channelHandler);
                        }
                    })
                    .connect(address, port)
                    .syncUninterruptibly();
            log.info("RPC service startup client completed, listening address {}:{}", address, port);
        } catch (Exception e) {
            log.error("RPC client encountered an exception", e);
            throw new RuntimeException(e);
        }
        // Do not close the thread pool!!

        returnchannelFuture; }}Copy the code

You can flexibly specify the server address and port information.

ChannelHandler is passed in as a processing parameter.

ObjectDecoder, ObjectEncoder, and LoggingHandler are all built-in implementations of Netty, similar to servers.

RpcClientHandler

The client handler is implemented as follows:

/* * Copyright (c) 2019. houbinbin Inc. * rpc All rights reserved. */

package com.github.houbb.rpc.client.handler;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.client.invoke.InvokeService;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/** * <p> Client processing class </p> ** <pre> Created: 2019/10/1611:30pm </pre> * <pre> Project: RPC </pre> **@author houbinbin
 * @sinceHundreds * /
public class RpcClientHandler extends SimpleChannelInboundHandler {

    private static final Log log = LogFactory.getLog(RpcClient.class);

    /** * Invoke the service management class **@since0.0.6 * /
    private final InvokeService invokeService;

    public RpcClientHandler(InvokeService invokeService) {
        this.invokeService = invokeService;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcResponse rpcResponse = (RpcResponse)msg;
        invokeService.addResponse(rpcResponse.seqId(), rpcResponse);
        log.info("[Client] response is :{}", rpcResponse);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // If you don't get a response, I don't know why.
        // If it is not closed, it will always be blocked.
        ctx.flush();
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}Copy the code

Only channelRead0 has been tweaked to handle the results based on InvokeService.

InvokeService

interface

package com.github.houbb.rpc.client.invoke;

import com.github.houbb.rpc.common.rpc.domain.RpcResponse;

/** * Call the service interface *@author binbin.hou
 * @since0.0.6 * /
public interface InvokeService {

    /** * Add request information *@paramSeqId Serial number *@return this
     * @since0.0.6 * /
    InvokeService addRequest(final String seqId);

    /** * insert the result *@paramSeqId Unique identifier *@paramRpcResponse response result *@return this
     * @since0.0.6 * /
    InvokeService addResponse(final String seqId, final RpcResponse rpcResponse);

    /** * Get the result of the flag information *@paramSeqId Serial number *@returnResults *@since0.0.6 * /
    RpcResponse getResponse(final String seqId);

}
Copy the code

Mainly on the input parameter, the setting of the input parameter, as well as the access to the input parameter.

implementation

package com.github.houbb.rpc.client.invoke.impl;

import com.github.houbb.heaven.util.guava.Guavas;
import com.github.houbb.heaven.util.lang.ObjectUtil;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.client.invoke.InvokeService;
import com.github.houbb.rpc.common.exception.RpcRuntimeException;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** * Call the service interface *@author binbin.hou
 * @since0.0.6 * /
public class DefaultInvokeService implements InvokeService {

    private static final Log LOG = LogFactory.getLog(DefaultInvokeService.class);

    /** * request sequence number set * (1) Here if you want to add timeout detection, can add the corresponding timeout time. * I can change this to map *@since0.0.6 * /
    private final Set<String> requestSet;

    /** * Response result *@since0.0.6 * /
    private final ConcurrentHashMap<String, RpcResponse> responseMap;

    public DefaultInvokeService(a) {
        requestSet = Guavas.newHashSet();
        responseMap = new ConcurrentHashMap<>();
    }

    @Override
    public InvokeService addRequest(String seqId) {
        LOG.info("[Client] start add request for seqId: {}", seqId);
        requestSet.add(seqId);
        return this;
    }

    @Override
    public InvokeService addResponse(String seqId, RpcResponse rpcResponse) {
        // You can add a judgment before putting it in here.
        // If seqId must be processed in the request set, it is allowed to be added. Or simply ignore discarding.
        LOG.info([Client] get result information, seq: {}, rpcResponse: {}, seqId, rpcResponse);
        responseMap.putIfAbsent(seqId, rpcResponse);

        // Notify all waiting parties
        LOG.info("[Client] SEQ information has been placed, inform all waiting parties", seqId);

        synchronized (this) {
            this.notifyAll();
        }

        return this;
    }

    @Override
    public RpcResponse getResponse(String seqId) {
        try {
            RpcResponse rpcResponse = this.responseMap.get(seqId);
            if(ObjectUtil.isNotNull(rpcResponse)) {
                LOG.info("[Client] seq {}", seqId, rpcResponse);
                return rpcResponse;
            }

            // Enter the wait
            while (rpcResponse == null) {
                LOG.info("[Client] seq {} result null, enter wait", seqId);
                // Synchronously wait lock
                synchronized (this) {
                    this.wait();
                }

                rpcResponse = this.responseMap.get(seqId);
                LOG.info("[Client] seq {}", seqId, rpcResponse);
            }

            return rpcResponse;
        } catch (InterruptedException e) {
            throw newRpcRuntimeException(e); }}}Copy the code

Use the requestSet to store the corresponding request entry parameters.

The responseMap is used to store the corresponding request parameters, and the result is obtained by waiting in a synchronous while loop.

Here, notifyAll() and wait() are used to wait and wake up.

ReferenceConfig- Server configuration

instructions

If we want to call the server, we must first define the object to call.

ReferenceConfig is what tells the RPC framework about the server being called.

interface

package com.github.houbb.rpc.client.config.reference;

import com.github.houbb.rpc.common.config.component.RpcAddress;

import java.util.List;

/** * reference configuration class ** Later configuration: * (1) timeout Call timeout time * (2) Version Service version processing * (3) callType call mode oneWay/sync/ Async * (4) Check Whether the service must be started. * * SPI: * (1) CODEC serialization * (2) Netty network communication architecture * (3) load-balance * (4) fail-over/fail-fast * * filter: * (1) The only serviceId has a fixed interface. Can it be omitted? *@author binbin.hou
 * @since 0.0.6
 * @param<T> Interface generics */
public interface ReferenceConfig<T> {

    /** * Set the service id *@paramServiceId indicates the serviceId *@return this
     * @since0.0.6 * /
    ReferenceConfig<T> serviceId(final String serviceId);

    /** * Service unique identifier *@since0.0.6 * /
    String serviceId(a);

    /** * service interface *@since 0.0.6
     * @returnInterface information */
    Class<T> serviceInterface(a);

    /** * Set the service interface information *@paramServiceInterface indicates the serviceInterface *@return this
     * @since0.0.6 * /
    ReferenceConfig<T> serviceInterface(final Class<T> serviceInterface);

    / * * * * (1) set up the service address information individual writing: IP: port: weight * (2) cluster writing: ip1: port1: weight1, ip2: port2: weight2 * * weight weight can not write, the default is 1. * *@paramAddresses Address list information *@return this
     * @since0.0.6 * /
    ReferenceConfig<T> addresses(final String addresses);

    /** * get the corresponding reference implementation *@returnReference proxy class *@since0.0.6 * /
    T reference(a);

}
Copy the code

implementation

package com.github.houbb.rpc.client.config.reference.impl;

import com.github.houbb.heaven.constant.PunctuationConst;
import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.heaven.util.guava.Guavas;
import com.github.houbb.heaven.util.lang.NumUtil;
import com.github.houbb.rpc.client.config.reference.ReferenceConfig;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.client.core.context.impl.DefaultRpcClientContext;
import com.github.houbb.rpc.client.handler.RpcClientHandler;
import com.github.houbb.rpc.client.invoke.InvokeService;
import com.github.houbb.rpc.client.invoke.impl.DefaultInvokeService;
import com.github.houbb.rpc.client.proxy.ReferenceProxy;
import com.github.houbb.rpc.client.proxy.context.ProxyContext;
import com.github.houbb.rpc.client.proxy.context.impl.DefaultProxyContext;
import com.github.houbb.rpc.common.config.component.RpcAddress;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;

import java.util.List;

/** * references the configuration class default implementation **@author binbin.hou
 * @since 0.0.6
 * @param<T> Interface generics */
public class DefaultReferenceConfig<T> implements ReferenceConfig<T> {

    /** * Service unique identifier *@since0.0.6 * /
    private String serviceId;

    /** * service interface *@since0.0.6 * /
    private Class<T> serviceInterface;

    /** * Service address information * (1) If the value is not empty, it is directly obtained by the address. (2) If the value is empty, it is automatically discovered **TODO:It makes more sense to change this to set. * * if it is subscribe, it is automatically found and filled with this field information. *@since0.0.6 * /
    private List<RpcAddress> rpcAddresses;

    /** * Used to write information * (1) The client connects to the channel Future of the server * (2) Later performs operations such as load-balance routing. You can do it here. *@since0.0.6 * /
    private List<ChannelFuture> channelFutures;

    /** * The client processes the information *@since0.0.6 * /
    @Deprecated
    private RpcClientHandler channelHandler;

    /** * Invoke the service management class *@since0.0.6 * /
    private InvokeService invokeService;

    public DefaultReferenceConfig(a) {
        // Initialize information
        this.rpcAddresses = Guavas.newArrayList();
        this.channelFutures = Guavas.newArrayList();
        this.invokeService = new DefaultInvokeService();
    }

    @Override
    public String serviceId(a) {
        return serviceId;
    }

    @Override
    public DefaultReferenceConfig<T> serviceId(String serviceId) {
        this.serviceId = serviceId;
        return this;
    }

    @Override
    public Class<T> serviceInterface(a) {
        return serviceInterface;
    }

    @Override
    public DefaultReferenceConfig<T> serviceInterface(Class<T> serviceInterface) {
        this.serviceInterface = serviceInterface;
        return this;
    }

    @Override
    public ReferenceConfig<T> addresses(String addresses) {
        ArgUtil.notEmpty(addresses, "addresses");

        String[] addressArray = addresses.split(PunctuationConst.COMMA);
        ArgUtil.notEmpty(addressArray, "addresses");

        for(String address : addressArray) {
            String[] addressSplits = address.split(PunctuationConst.COLON);
            if(addressSplits.length < 2) {
                throw new IllegalArgumentException("Address must be has ip and port, like 127.0.0.1:9527");
            }
            String ip = addressSplits[0];
            int port = NumUtil.toIntegerThrows(addressSplits[1]);
            // Contains weight information
            int weight = 1;
            if(addressSplits.length >= 3) {
                weight = NumUtil.toInteger(addressSplits[2].1);
            }

            RpcAddress rpcAddress = new RpcAddress(ip, port, weight);
            this.rpcAddresses.add(rpcAddress);
        }

        return this;
    }

    (1) Handle all the reflection proxy information - methods can be separated and started independently. * (2) Start the corresponding long link *@returnReference proxy class *@since0.0.6 * /
    @Override
    public T reference(a) {
        // 1. Enable the connection information from the client to the server
        // 1.1 To improve performance, you can set all client=>server connections to one Thread.
        // 1.2 For initial simplicity, use synchronous loop directly.
        / / create a handler
        // Loop connection
        for(RpcAddress rpcAddress : rpcAddresses) {
            final ChannelHandler channelHandler = new RpcClientHandler(invokeService);
            final DefaultRpcClientContext context = new DefaultRpcClientContext();
            context.address(rpcAddress.address()).port(rpcAddress.port()).channelHandler(channelHandler);
            ChannelFuture channelFuture = new RpcClient(context).connect();
            // Loop synchronous wait
            // If an exception occurs, interrupt directly. Catch exception continue??
            channelFutures.add(channelFuture);
        }

        // 2. Interface dynamic proxy
        ProxyContext<T> proxyContext = buildReferenceProxyContext();
        return ReferenceProxy.newProxyInstance(proxyContext);
    }

    /** * Build the call context *@returnReference the proxy context *@since0.0.6 * /
    private ProxyContext<T> buildReferenceProxyContext(a) {
        DefaultProxyContext<T> proxyContext = new DefaultProxyContext<>();
        proxyContext.serviceId(this.serviceId);
        proxyContext.serviceInterface(this.serviceInterface);
        proxyContext.channelFutures(this.channelFutures);
        proxyContext.invokeService(this.invokeService);
        returnproxyContext; }}Copy the code

The corresponding proxy implementation is initialized based on the specified server information.

Here can also expand the specified weight, facilitate the later load balance expansion, this period will not be implemented.

ReferenceProxy

instructions

For all RPC calls, the client has only the server interface.

So how do you call a remote method as well as a local method?

The answer is dynamic proxy.

implementation

The implementation is as follows:

package com.github.houbb.rpc.client.proxy;

import com.github.houbb.heaven.util.lang.ObjectUtil;
import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.proxy.context.ProxyContext;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcRequest;
import com.github.houbb.rpc.common.support.id.impl.Uuid;
import com.github.houbb.rpc.common.support.time.impl.DefaultSystemTime;
import io.netty.channel.Channel;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/ * * * reference: https://blog.csdn.net/u012240455/article/details/79210250 * * (1) the method does not need to have the implementation class. * (2) The relevant information can be processed directly according to reflection. * (3) RPC is an implementation that forces programming against an interface. *@author binbin.hou
 * @since0.0.6 * /
public class ReferenceProxy<T> implements InvocationHandler {

    private static final Log LOG = LogFactory.getLog(ReferenceProxy.class);

    /** * Service id *@since0.0.6 * /
    private final ProxyContext<T> proxyContext;

    /** * Temporarily privatize the constructor *@paramProxyContext proxyContext *@since0.0.6 * /
    private ReferenceProxy(ProxyContext<T> proxyContext) {
        this.proxyContext = proxyContext;
    }

    /** * reflection calls *@paramThe proxy agent *@param* method method@param* args parameter@returnResults *@throwsThrowable exception *@since 0.0.6
     * @seeMethod#getGenericSignature() generic identifier that you can use to optimize your code. * /
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Reflection information processing becomes rpcRequest
        final String seqId = Uuid.getInstance().id();
        final long createTime = DefaultSystemTime.getInstance().time();
        DefaultRpcRequest rpcRequest = new DefaultRpcRequest();
        rpcRequest.serviceId(proxyContext.serviceId());
        rpcRequest.seqId(seqId);
        rpcRequest.createTime(createTime);
        rpcRequest.paramValues(args);
        rpcRequest.paramTypeNames(ReflectMethodUtil.getParamTypeNames(method));
        rpcRequest.methodName(method.getName());

        // Call remote
        LOG.info("[Client] start call remote with request: {}", rpcRequest);
        proxyContext.invokeService().addRequest(seqId);

        // Use load-balance to select channel write.
        final Channel channel = getChannel();
        LOG.info("[Client] start call channel id: {}", channel.id().asLongText());

        // There are actually strict requirements for writing information.
        // writeAndFlush is actually an asynchronous operation. Use sync() directly to see the exception message.
        // Support must be ByteBuf
        channel.writeAndFlush(rpcRequest).sync();

        // loop the result
        // Use Loop+match wait/notifyAll
        // Distributed according to redis+queue+loop
        LOG.info("[Client] start get resp for seqId: {}", seqId);
        RpcResponse rpcResponse = proxyContext.invokeService().getResponse(seqId);
        LOG.info("[Client] start get resp for seqId: {}", seqId);
        Throwable error = rpcResponse.error();
        if(ObjectUtil.isNotNull(error)) {
            throw error;
        }
        return rpcResponse.result();
    }

    /** * get the corresponding channel * (1) temporarily use the first * (2) *@returnCorresponding channel information. *@since0.0.6 * /
    private Channel getChannel(a) {
        return proxyContext.channelFutures().get(0).channel();
    }

    /** * get the proxy instance * (1) interface just for the proxy. * (2) The actual call is more concerned with serviceId *@paramProxyContext proxyContext *@param< T > generic *@returnProxy instance@since0.0.6 * /
    @SuppressWarnings("unchecked")
    public static <T> T newProxyInstance(ProxyContext<T> proxyContext) {
        finalClass<T> interfaceClass = proxyContext.serviceInterface(); ClassLoader classLoader = interfaceClass.getClassLoader(); Class<? >[] interfaces =new Class[]{interfaceClass};
        ReferenceProxy proxy = new ReferenceProxy(proxyContext);
        return(T) Proxy.newProxyInstance(classLoader, interfaces, proxy); }}Copy the code

The client initializes newProxyInstance by creating the proxy.

The client invokes a remote method, which is actually the process of invoking an invoke.

(1) Construct reflection invoke request information and add reqId

(2) Netty remotely invokes the server

(3) Synchronously obtain response information

test

The introduction of maven

<dependency>
    <groupId>com.github.houbb</groupId>
    <artifactId>rpc-client</artifactId>
    <version>0.0.6</version>
</dependency>
Copy the code

The test code

public static void main(String[] args) {
    // Service configuration information
    ReferenceConfig<CalculatorService> config = new DefaultReferenceConfig<CalculatorService>();
    config.serviceId(ServiceIdConst.CALC);
    config.serviceInterface(CalculatorService.class);
    config.addresses("localhost:9527");

    CalculatorService calculatorService = config.reference();
    CalculateRequest request = new CalculateRequest();
    request.setOne(10);
    request.setTwo(20);

    CalculateResponse response = calculatorService.sum(request);
    System.out.println(response);
}
Copy the code

Test log:

[the DEBUG] [14:16:17 2021-10-05. 534] [the main] [C.G.H.L.I.C.L ogFactory. SetImplementation] - Logging the initialized using 'class Com. Making. Houbb. Log. Integration. Adaptors. Stdout. StdOutExImpl 'adapter. [INFO] [14:16:17 2021-10-05. 625] [the main] [C.G.H.R.C.C.R pcClient. Connect] xml-rpc service to start the client... [the INFO] [14:16:19 2021-10-05. 328] [the main] [C.G.H.R.C.C.R pcClient. Connect] xml-rpc service launch the client is complete, Listening address localhost: 9527 [INFO] [14:16:19 2021-10-05. 346] [the main] [C.G.H.R.C.P.R eferenceProxy. Invoke] - [Client] start call remote with request: DefaultRpcRequest{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', createTime=1633414579339, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, Two = 20}}] [INFO] [14:16:19 2021-10-05. 347] [the main] [C.G.H.R.C.I.I.D efaultInvokeService. AddRequest] - [Client] start add request for seqId: A525c5a6196545f5a5241b2cdc2ec2c2 [INFO] [14:16:19 2021-10-05. 348] [the main] [C.G.H.R.C.P.R eferenceProxy. Invoke] - [Client] start call channel id: 00 e04cfffe360988 b9d7e1b88839d - 000017 BC - 00000000-399-5 ccc4a29 October 5, 2021 io.net ty 2:16:19 afternoon. Handler. Logging. LoggingHandler write information: [id: 0 x5ccc4a29, L: / 127.0.0.1:50596 R: localhost / 127.0.0.1:9527] the WRITE: DefaultRpcRequest{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', createTime=1633414579339, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], ParamValues =[CalculateRequest{one=10, two=20}]} October 05, 2021 io.net ty 2:16:19 afternoon. Handler. Logging. LoggingHandler flush information: [id: 0x5ccc4a29, L: / 127.0.0.1:50596 R: FLUSH localhost / 127.0.0.1:9527] [INFO] [14:16:19 2021-10-05. 412] [the main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start get resp for seqId: A525c5a6196545f5a5241b2cdc2ec2c2 [INFO] [14:16:19 2021-10-05. 413] [the main] [C.G.H.R.C.I.I.D efaultInvokeService. The method getResponse] - [Client] seq a525c5a6196545f5a5241b2cdc2ec2c2 corresponding results is empty, Waiting for October 5, 2021 io.net ty 2:16:19 afternoon. Handler. Logging. LoggingHandler channelRead information: [id: 0 x5ccc4a29, L: / 127.0.0.1:50596 R: localhost / 127.0.0.1:9527] READ: DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}} ... [the INFO] [14:16:19 2021-10-05. 505] [nioEventLoopGroup - 2-1] [C.G.H.R.C.I.I.D efaultInvokeService. AddResponse] - [Client] Get the information, seq: a525c5a6196545f5a5241b2cdc2ec2c2 rpcResponse: DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, Sum = 30}} [INFO] [14:16:19 2021-10-05. 505] [nioEventLoopGroup - 2-1] [C.G.H.R.C.I.I.D efaultInvokeService. AddResponse] - [Client] SeQ information has been added. Inform all waiting [INFO] [14:16:19 2021-10-05. 506] [nioEventLoopGroup - 2-1] [C.G.H.R.C.C.R pcClient. ChannelRead0] - [Client] response is :DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, Sum = 30}} [INFO] [14:16:19 2021-10-05. 506] [the main] [C.G.H.R.C.I.I.D efaultInvokeService. The method getResponse] - [Client] seq A525c5a6196545f5a5241b2cdc2ec2c2 corresponding results have been obtain: DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, Sum = 30}} [INFO] [14:16:19 2021-10-05. 507] [the main] [C.G.H.R.C.P.R eferenceProxy. Invoke] - [Client] start get resp for seqId: a525c5a6196545f5a5241b2cdc2ec2c2 CalculateResponse{success=true, sum=30}Copy the code

summary

Now there seems to be a small problem, the server must specify port, which is a little unreasonable, such as proxy domain name, need to be optimized later.

The startup declaration here is also fairly basic, so consider integrating with Spring in the future.

In order to facilitate learning, the above source code has been open source:

github.com/houbb/rpc

I hope this article is helpful to you. If you like it, please click to collect and forward a wave.

I am old ma, looking forward to meeting with you next time.