
Before reading this article, it is recommended that you read the following related articles first.

[1] Detailed analysis of the underlying implementation principle of network communication under distributed microservice architecture (Diagram)

[2] After working for 5 years, do you really understand Netty and why you use it? (Deep dry goods)

[3] An in-depth analysis of Netty’s core components

[4] Netty ByteBuf is a Netty ByteBuf

[5] How to solve the problem of unpacking and sticky packet in Netty through a large number of actual cases?

[6] Implementation of custom message communication protocol based on Netty (Protocol Design and analysis application Practice)

[7] The most detailed and complete network serialization technology and in-depth analysis and application practice

The previous articles covered Netty's fundamentals and implementation principles, working from the basics to the internals, so you should now have a fairly complete picture of Netty. Next, we put that knowledge into practice by hand-writing an RPC communication framework.

Why choose RPC for the practical case? Netty exists to solve communication problems, and in day-to-day work an RPC framework is the kind of communication framework most of us use most often. This exercise therefore shows how Netty is applied in practice and also exposes the underlying principles of RPC.

What is RPC

Remote Procedure Call (RPC) is a protocol for requesting a service from a program on a remote computer over the network without having to understand the underlying network technology. Put simply, it lets developers call remote services the same way they call local ones.

Since it is a protocol, it must have a protocol specification, as shown in Figure 6-1.

To let developers invoke remote services as if they were local, an RPC protocol needs to implement the remote interaction shown in Figure 6-1:

  • When a client invokes a remote service, the details of network communication are hidden behind a local dynamic proxy module. The dynamic proxy module is responsible for assembling the request parameters, method name, and other data into a packet and sending it to the target server.
  • The packet must follow the agreed message protocol and serialization protocol, and is finally converted into a binary data stream for transmission.
  • After receiving the packet, the server decodes it according to the agreed message protocol and obtains the request information.
  • The server then invokes the target service according to the request information and returns the result to the client.

Mainstream RPC frameworks in the industry

Any framework that implements the RPC protocol can be called an RPC framework. In practice, we can use mature open-source RPC frameworks to solve remote communication under a microservice architecture. Common RPC frameworks include:

  1. Thrift: Thrift is a software framework for developing extensible, cross-language services. It combines a software stack with a code generation engine to build services that work efficiently and seamlessly between C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml.
  2. Dubbo: Dubbo is a distributed services framework and SOA governance solution. Its features include high-performance NIO communication and multi-protocol integration, dynamic service addressing and routing, soft load balancing and fault tolerance, and dependency analysis and degradation. Dubbo is the core framework of Alibaba's internal SOA service governance solution. Since it was open-sourced in 2011, it has been adopted by many companies outside Alibaba.

Notes on hand-writing RPC

With that understanding of the RPC protocol, which technologies do we need to consider if we implement it ourselves? The overall process in Figure 6-1 already gives a general idea:

  • Communication protocol: an RPC framework has very high performance requirements, so the communication protocol should be as simple as possible to reduce the overhead of encoding and decoding. Most mainstream RPC frameworks build directly on TCP or HTTP.
  • Serialization and deserialization: data sent over the network has to be serialized and deserialized. As discussed earlier, serialization converts objects into binary streams, and deserialization converts binary streams back into objects. When selecting a serialization framework, we generally choose efficient, widely used options such as FastJson, Protobuf, and Hessian. These are generally faster and produce more compact output than Java's native serialization.
  • Dynamic proxy: a dynamic proxy hides the network communication details when a client invokes a remote service. Proxy classes are generated at runtime, so their generation speed and bytecode size affect the performance and resource consumption of the whole RPC framework. Common dynamic proxy technologies include Javassist, Cglib, and the JDK dynamic proxy.
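The dynamic proxy idea can be tried in isolation before any networking is involved. The following is a minimal sketch using only the JDK dynamic proxy; `GreetingService` and `ProxySketch` are hypothetical names invented for this illustration, and the handler merely describes the intercepted call where a real RPC client would serialize it and send it to the server.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical service interface, used only for this illustration.
interface GreetingService {
    String greet(String name);
}

class ProxySketch {

    // Builds a JDK dynamic proxy for the given interface.
    // The handler receives every call made through the proxy.
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> interfaceCls) {
        InvocationHandler handler = (proxy, method, args) -> {
            // A real RPC client would package these values into a request,
            // send it to the server, and return the server's answer.
            // Here we only return a description of the intercepted call.
            return method.getDeclaringClass().getSimpleName()
                    + "#" + method.getName() + Arrays.toString(args);
        };
        return (T) Proxy.newProxyInstance(
                interfaceCls.getClassLoader(),
                new Class<?>[]{interfaceCls},
                handler);
    }

    public static void main(String[] args) {
        GreetingService service = createProxy(GreetingService.class);
        // The caller sees an ordinary interface; the handler sees the call details.
        System.out.println(service.greet("Mic"));
    }
}
```

The caller only depends on the interface, which is why the network details stay hidden: everything a remote call needs (declaring class, method name, arguments) is available inside the handler.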

Implementing RPC by hand with Netty

Now that we understand the RPC protocol, we implement an RPC communication framework based on Netty.

See the attached netty-rpc-example for the code.

Dependencies to import:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.72</version>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>

Module dependencies:

  • netty-rpc-provider depends on netty-rpc-protocol and netty-rpc-api

  • netty-rpc-consumer depends on netty-rpc-protocol and netty-rpc-api

netty-rpc-api module

IUserService

public interface IUserService {

    String saveUser(String name);
}

netty-rpc-provider module

UserServiceImpl

@Service
@Slf4j
public class UserServiceImpl implements IUserService {
    @Override
    public String saveUser(String name) {
        log.info("begin saveUser:"+name);
        return "Save User Success!";
    }
}

NettyRpcProviderMain

Note: the parts marked with "case" comments below are not added at this step; they are added later.

@ComponentScan(basePackages = {"com.example.spring","com.example.service"})  //case1(add later)
@SpringBootApplication
public class NettyRpcProviderMain {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(NettyRpcProviderMain.class, args);
        new NettyServer("127.0.0.1", 8080).startNettyServer();   //case2(add later)
    }
}

netty-rpc-protocol

Now we write the communication protocol module. This module does several things:

  • Defines the message protocol
  • Defines the serialization and deserialization methods
  • Sets up Netty communication

Defining the message protocol

We covered custom message protocols earlier; here we define ours in the following format.

/*
+--------------------------------------------------------------------------+
| magic number 2byte | serialization algorithm 1byte | request type 1byte  |
+--------------------------------------------------------------------------+
| message ID 8byte   | data length 4byte                                   |
+--------------------------------------------------------------------------+
*/
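To make the layout concrete, the fields add up to 2 + 1 + 1 + 8 + 4 = 16 bytes, and the data length field starts at byte offset 12. The sketch below writes one header with the JDK's `java.nio.ByteBuffer` instead of Netty's `ByteBuf`, purely so it runs without extra dependencies; `HeaderLayoutSketch` and its helper methods are hypothetical names for this illustration.

```java
import java.nio.ByteBuffer;

class HeaderLayoutSketch {
    // magic (2) + serialization type (1) + request type (1) + message ID (8) + data length (4)
    static final int HEAD_TOTAL_LEN = 2 + 1 + 1 + 8 + 4;

    // Writes the header fields in protocol order. ByteBuffer is big-endian by default.
    static byte[] encodeHeader(short magic, byte serialType, byte reqType,
                               long requestId, int length) {
        ByteBuffer buf = ByteBuffer.allocate(HEAD_TOTAL_LEN);
        buf.putShort(magic);
        buf.put(serialType);
        buf.put(reqType);
        buf.putLong(requestId);
        buf.putInt(length);
        return buf.array();
    }

    // The message ID sits after magic + serialType + reqType, at offset 4.
    static long readRequestId(byte[] header) {
        return ByteBuffer.wrap(header).getLong(4);
    }

    // The data length sits after the message ID, at offset 12.
    static int readLength(byte[] header) {
        return ByteBuffer.wrap(header).getInt(12);
    }

    public static void main(String[] args) {
        byte[] header = encodeHeader((short) 0xca, (byte) 0, (byte) 1, 42L, 128);
        System.out.println("header bytes = " + header.length);
        System.out.println("request id   = " + readRequestId(header));
        System.out.println("data length  = " + readLength(header));
    }
}
```

Knowing that the length field lives at offset 12 and is 4 bytes wide matters later, because those two numbers are what a length-field-based frame decoder needs in order to find frame boundaries.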

Header

@AllArgsConstructor
@Data
public class Header implements Serializable {
    /*
    +--------------------------------------------------------------------------+
    | magic number 2byte | serialization algorithm 1byte | request type 1byte  |
    +--------------------------------------------------------------------------+
    | message ID 8byte   | data length 4byte                                   |
    +--------------------------------------------------------------------------+
    */
    private short magic; // Magic number - used to verify the identity of the packet (2 bytes)
    private byte serialType; // Serialization type (1 byte)
    private byte reqType; // Operation type (1 byte)
    private long requestId; // Request ID (8 bytes)
    private int length; // Data length (4 bytes)

}

RpcRequest

@Data
public class RpcRequest implements Serializable {
    private String className;
    private String methodName;
    private Object[] params;
    private Class<?>[] parameterTypes;
}

RpcResponse

@Data
public class RpcResponse implements Serializable {

    private Object data;
    private String msg;
}

RpcProtocol

@Data
public class RpcProtocol<T> implements Serializable {
    private Header header;
    private T content;
}

Define related constants

The message protocol definition above relies on several enums and constants, defined as follows.

ReqType

Message type

public enum ReqType {

    REQUEST((byte)1),
    RESPONSE((byte)2),
    HEARTBEAT((byte)3);

    private byte code;

    private ReqType(byte code) {
        this.code=code;
    }

    public byte code(){
        return this.code;
    }
    public static ReqType findByCode(int code) {
        for (ReqType msgType : ReqType.values()) {
            if (msgType.code() == code) {
                return msgType;
            }
        }
        return null;
    }
}

SerialType

Serialization type

public enum SerialType {

    JSON_SERIAL((byte)0),
    JAVA_SERIAL((byte)1);

    private byte code;

    SerialType(byte code) {
        this.code=code;
    }

    public byte code(){
        return this.code;
    }
}

RpcConstant

public class RpcConstant {
    // Total number of bytes in the header section
    public final static int HEAD_TOTAL_LEN=16;
    // Magic number
    public final static short MAGIC=0xca;
}

Define the serialization implementations

Here we demonstrate two approaches: JSON serialization and Java native serialization.

ISerializer

public interface ISerializer {

    <T> byte[] serialize(T obj);

    <T> T deserialize(byte[] data,Class<T> clazz);

    byte getType();
}

JavaSerializer

public class JavaSerializer implements ISerializer{

    @Override
    public <T> byte[] serialize(T obj) {
        ByteArrayOutputStream byteArrayOutputStream=
                new ByteArrayOutputStream();
        try {
            ObjectOutputStream outputStream=
                    new ObjectOutputStream(byteArrayOutputStream);

            outputStream.writeObject(obj);

            return  byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }

    @Override
    public <T> T deserialize(byte[] data, Class<T> clazz) {
        ByteArrayInputStream byteArrayInputStream=new ByteArrayInputStream(data);
        try {
            ObjectInputStream objectInputStream=
                    new ObjectInputStream(byteArrayInputStream);

            return (T) objectInputStream.readObject();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public byte getType() {
        return SerialType.JAVA_SERIAL.code();
    }
}

JsonSerializer

public class JsonSerializer implements ISerializer{
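    @Override
    public <T> byte[] serialize(T obj) {
        // Use an explicit charset (java.nio.charset.StandardCharsets) so that both
        // ends agree on the encoding regardless of the platform default
        return JSON.toJSONString(obj).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public <T> T deserialize(byte[] data, Class<T> clazz) {
        return JSON.parseObject(new String(data, StandardCharsets.UTF_8),clazz);
    }

    @Override
    public byte getType() {
        return SerialType.JSON_SERIAL.code();
    }
}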

SerializerManager

Implements management of the serialization mechanism

public class SerializerManager {

    private final static ConcurrentHashMap<Byte, ISerializer> serializers=new ConcurrentHashMap<Byte, ISerializer>();

    static {
        ISerializer jsonSerializer=new JsonSerializer();
        ISerializer javaSerializer=new JavaSerializer();
        serializers.put(jsonSerializer.getType(),jsonSerializer);
        serializers.put(javaSerializer.getType(),javaSerializer);
    }

    public static ISerializer getSerializer(byte key){
        ISerializer serializer=serializers.get(key);
        if(serializer==null){
            // Fall back to Java native serialization for unknown type codes
            return new JavaSerializer();
        }
        return serializer;
    }
}
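The lookup-by-type-byte idea can be exercised on its own. The following is a simplified sketch that uses only the JDK: `SketchSerializer`, `SerializerRegistrySketch`, and the plain-text serializer are hypothetical stand-ins (the text serializer replaces the FastJson one so that no external library is needed), not the classes from the project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified counterpart of ISerializer for this illustration.
interface SketchSerializer {
    byte[] serialize(Object obj) throws IOException;
    Object deserialize(byte[] data) throws IOException, ClassNotFoundException;
}

class SerializerRegistrySketch {
    static final byte TEXT = 0; // stand-in for the JSON type code
    static final byte JAVA = 1; // Java native serialization

    private static final Map<Byte, SketchSerializer> SERIALIZERS = new ConcurrentHashMap<>();

    static final SketchSerializer JAVA_SERIALIZER = new SketchSerializer() {
        public byte[] serialize(Object obj) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            return bytes.toByteArray();
        }
        public Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
                return in.readObject();
            }
        }
    };

    static final SketchSerializer TEXT_SERIALIZER = new SketchSerializer() {
        public byte[] serialize(Object obj) {
            return String.valueOf(obj).getBytes(StandardCharsets.UTF_8);
        }
        public Object deserialize(byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }
    };

    static {
        SERIALIZERS.put(TEXT, TEXT_SERIALIZER);
        SERIALIZERS.put(JAVA, JAVA_SERIALIZER);
    }

    // Unknown type codes fall back to Java native serialization.
    static SketchSerializer getSerializer(byte type) {
        return SERIALIZERS.getOrDefault(type, JAVA_SERIALIZER);
    }

    // Serializes and deserializes a value with the serializer selected by the type byte.
    static Object roundTrip(byte type, Object value) {
        try {
            SketchSerializer serializer = getSerializer(type);
            return serializer.deserialize(serializer.serialize(value));
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(JAVA, "hello"));
        System.out.println(roundTrip(TEXT, "hello"));
    }
}
```

Because the type byte travels in the message header, the receiving side can pick the matching serializer without any prior negotiation, as long as both ends register the same codes.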

Define the encoder and decoder

Because the message protocol is custom, we need to implement our own encoding and decoding. The code is as follows.

RpcDecoder

@Slf4j
public class RpcDecoder extends ByteToMessageDecoder {


    /*
    +--------------------------------------------------------------------------+
    | magic number 2byte | serialization algorithm 1byte | request type 1byte  |
    +--------------------------------------------------------------------------+
    | message ID 8byte   | data length 4byte                                   |
    +--------------------------------------------------------------------------+
    */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        log.info("==========begin RpcDecoder ==============");
        if(in.readableBytes()< RpcConstant.HEAD_TOTAL_LEN){
            // The message is not long enough to parse
            return;
        }
        in.markReaderIndex(); // Mark the reader index so it can be reset later
        short magic=in.readShort(); // Read the magic number
        if(magic!=RpcConstant.MAGIC){
            throw new IllegalArgumentException("Illegal request parameter 'magic',"+magic);
        }
        byte serialType=in.readByte(); // Read the serialization algorithm type
        byte reqType=in.readByte(); // Request type
        long requestId=in.readLong(); // Request message ID
        int dataLength=in.readInt(); // Request data length
        // The number of bytes in the readable area is smaller than the actual data length
        if(in.readableBytes()<dataLength){
            in.resetReaderIndex();
            return;
        }
        // Read the message content
        byte[] content=new byte[dataLength];
        in.readBytes(content);

        // Build the header information
        Header header=new Header(magic,serialType,reqType,requestId,dataLength);
        ISerializer serializer=SerializerManager.getSerializer(serialType);
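        ReqType rt=ReqType.findByCode(reqType);
        if(rt==null){
            // findByCode returns null for unknown codes; switching on null would throw a NullPointerException
            throw new IllegalArgumentException("Illegal request type,"+reqType);
        }
        switch(rt){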
            case REQUEST:
                RpcRequest request=serializer.deserialize(content, RpcRequest.class);
                RpcProtocol<RpcRequest> reqProtocol=new RpcProtocol<>();
                reqProtocol.setHeader(header);
                reqProtocol.setContent(request);
                out.add(reqProtocol);
                break;
            case RESPONSE:
                RpcResponse response=serializer.deserialize(content,RpcResponse.class);
                RpcProtocol<RpcResponse> resProtocol=new RpcProtocol<>();
                resProtocol.setHeader(header);
                resProtocol.setContent(response);
                out.add(resProtocol);
                break;
            case HEARTBEAT:
                break;
            default:
                break;
        }
    }
}
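The mark/reset logic in the decoder handles the case where only part of a message has arrived. The sketch below reproduces that control flow with the JDK's `java.nio.ByteBuffer` instead of Netty's `ByteBuf`, so it can run standalone; `FrameDecodeSketch`, `tryDecode`, and `frame` are hypothetical names for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class FrameDecodeSketch {
    static final int HEAD_TOTAL_LEN = 16;

    // Returns the message body if a complete frame is available.
    // Returns null and leaves the read position untouched if more bytes are needed.
    static byte[] tryDecode(ByteBuffer in) {
        if (in.remaining() < HEAD_TOTAL_LEN) {
            return null; // not even a full header yet
        }
        in.mark();                    // remember where this frame starts
        in.getShort();                // magic number
        in.get();                     // serialization type
        in.get();                     // request type
        in.getLong();                 // request ID
        int dataLength = in.getInt(); // body length
        if (in.remaining() < dataLength) {
            in.reset();               // body incomplete: rewind and wait for more data
            return null;
        }
        byte[] content = new byte[dataLength];
        in.get(content);
        return content;
    }

    // Builds one complete frame (header + body) with fixed sample header values.
    static byte[] frame(byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEAD_TOTAL_LEN + body.length);
        buf.putShort((short) 0xca).put((byte) 0).put((byte) 1)
           .putLong(1L).putInt(body.length).put(body);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] full = frame("hello".getBytes(StandardCharsets.UTF_8));

        // Only the header and part of the body have arrived.
        ByteBuffer partial = ByteBuffer.wrap(full, 0, 18);
        System.out.println("partial decoded: " + (tryDecode(partial) != null));

        // The whole frame has arrived.
        byte[] body = tryDecode(ByteBuffer.wrap(full));
        System.out.println("full decoded: " + new String(body, StandardCharsets.UTF_8));
    }
}
```

Rewinding on an incomplete body is what lets the same decode method be called again once more bytes arrive, without losing the header that was already read.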

RpcEncoder

@Slf4j
public class RpcEncoder extends MessageToByteEncoder<RpcProtocol<Object>> {

    /*
    +--------------------------------------------------------------------------+
    | magic number 2byte | serialization algorithm 1byte | request type 1byte  |
    +--------------------------------------------------------------------------+
    | message ID 8byte   | data length 4byte                                   |
    +--------------------------------------------------------------------------+
    */
    @Override
    protected void encode(ChannelHandlerContext ctx, RpcProtocol<Object> msg, ByteBuf out) throws Exception {
        log.info("=============begin RpcEncoder============");
        Header header=msg.getHeader();
        out.writeShort(header.getMagic()); // Write the magic number
        out.writeByte(header.getSerialType()); // Write the serialization type
        out.writeByte(header.getReqType());// Write request type
        out.writeLong(header.getRequestId()); // Write request ID
        ISerializer serializer= SerializerManager.getSerializer(header.getSerialType());
        byte[] data=serializer.serialize(msg.getContent()); // Serialize the content
        header.setLength(data.length);
        out.writeInt(data.length); // Write message length
        out.writeBytes(data);
    }
}

NettyServer

Build the Netty server.

@Slf4j
public class NettyServer{
    private String serverAddress; // address
    private int serverPort; // port

    public NettyServer(String serverAddress, int serverPort) {
        this.serverAddress = serverAddress;
        this.serverPort = serverPort;
    }

    public void startNettyServer() throws Exception {
        log.info("begin start Netty Server");
        EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workGroup=new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new RpcServerInitializer());
            ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, this.serverPort).sync();
            log.info("Server started Success on Port:{}", this.serverPort);
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            log.error("Rpc Server Exception",e);
        }finally{
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

RpcServerInitializer

public class RpcServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
            // The length field starts at offset 12 (after magic, serialType, reqType, requestId) and is 4 bytes long
            .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 12, 4, 0, 0))
            .addLast(new RpcDecoder())
            .addLast(new RpcEncoder())
            .addLast(new RpcServerHandler());
    }
}

RpcServerHandler

public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {
        RpcProtocol<RpcResponse> resProtocol=new RpcProtocol<>();
        Header header=msg.getHeader();
        header.setReqType(ReqType.RESPONSE.code());
        Object result=invoke(msg.getContent());
        resProtocol.setHeader(header);
        RpcResponse response=new RpcResponse();
        response.setData(result);
        response.setMsg("success");
        resProtocol.setContent(response);

        ctx.writeAndFlush(resProtocol);
    }

    private Object invoke(RpcRequest request){
        try{
            Class<?> clazz=Class.forName(request.getClassName());
            Object bean=SpringBeansManager.getBean(clazz); // Get the instance object (CASE)
            Method declaredMethod=clazz.getDeclaredMethod(request.getMethodName(),request.getParameterTypes());
            return declaredMethod.invoke(bean,request.getParams());
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}
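The reflective dispatch in `invoke` can be tried without Spring or Netty. In the sketch below, a plain map stands in for the Spring container lookup, and `EchoService`, `EchoServiceImpl`, and `ReflectiveInvokeSketch` are hypothetical names for this illustration; it assumes, as the handler above does, that the request carries the interface name, the method name, the parameter types, and the arguments.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical service interface and implementation, used only for this illustration.
interface EchoService {
    String echo(String text);
}

class EchoServiceImpl implements EchoService {
    public String echo(String text) {
        return "echo:" + text;
    }
}

class ReflectiveInvokeSketch {
    // Stand-in for the Spring container: maps an interface to its implementation.
    private static final Map<Class<?>, Object> BEANS = new ConcurrentHashMap<>();

    static {
        BEANS.put(EchoService.class, new EchoServiceImpl());
    }

    // Resolves the interface by name, looks up the bean, and calls the method reflectively.
    static Object invoke(String className, String methodName,
                         Class<?>[] parameterTypes, Object[] params) {
        try {
            Class<?> clazz = Class.forName(className);
            Object bean = BEANS.get(clazz);
            if (bean == null) {
                throw new IllegalStateException("No bean registered for " + className);
            }
            Method method = clazz.getMethod(methodName, parameterTypes);
            return method.invoke(bean, params);
        } catch (ReflectiveOperationException e) {
            // Covers ClassNotFoundException, NoSuchMethodException,
            // IllegalAccessException, and InvocationTargetException.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Object result = invoke(EchoService.class.getName(), "echo",
                new Class<?>[]{String.class}, new Object[]{"hi"});
        System.out.println(result);
    }
}
```

One design difference worth noting: this sketch surfaces failures as exceptions, whereas returning null on failure makes a failed call indistinguishable from a method that legitimately returns null.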

SpringBeansManager

@Component
public class SpringBeansManager implements ApplicationContextAware {
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringBeansManager.applicationContext=applicationContext;
    }

    public static <T> T getBean(Class<T> clazz){
        return applicationContext.getBean(clazz);
    }
}

Note that once this class is in place, add @ComponentScan to the main class of the netty-rpc-provider module and start the NettyServer from its main method.

@ComponentScan(basePackages = {"com.example.spring","com.example.service"})  // Modify this
@SpringBootApplication
public class NettyRpcProviderMain {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(NettyRpcProviderMain.class, args);
        new NettyServer("127.0.0.1", 8080).startNettyServer();  // Modify this
    }
}

netty-rpc-consumer

Next, implement the consumer side

RpcClientProxy

public class RpcClientProxy {
    
    public <T> T clientProxy(final Class<T> interfaceCls,final String host,final int port){
        return (T) Proxy.newProxyInstance
                (interfaceCls.getClassLoader(),
                        new Class<?>[]{interfaceCls},new RpcInvokerProxy(host,port));
    }
}

RpcInvokerProxy

@Slf4j
public class RpcInvokerProxy implements InvocationHandler {

    private String serviceAddress;
    private int servicePort;

    public RpcInvokerProxy(String serviceAddress, int servicePort) {
        this.serviceAddress = serviceAddress;
        this.servicePort = servicePort;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        log.info("begin invoke target server");
        // Assemble parameters
        RpcProtocol<RpcRequest> protocol=new RpcProtocol<>();
        long requestId= RequestHolder.REQUEST_ID.incrementAndGet();
        Header header=new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(), ReqType.REQUEST.code(),requestId,0);
        protocol.setHeader(header);
        RpcRequest request=new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParams(args);
        protocol.setContent(request);
        // Send the request
        NettyClient nettyClient=new NettyClient(serviceAddress,servicePort);
        // Build asynchronous data processing
        RpcFuture<RpcResponse> future=new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()));
        RequestHolder.REQUEST_MAP.put(requestId,future);
        nettyClient.sendRequest(protocol);
        return future.getPromise().get().getData();
    }
}

Define the client connection

Create NettyClient in the protocol package path of the netty-rpc-protocol module

@Slf4j
public class NettyClient {
    private final Bootstrap bootstrap;
    private final EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
    private String serviceAddress;
    private int servicePort;
    public NettyClient(String serviceAddress,int servicePort){
        log.info("begin init NettyClient");
        bootstrap=new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new RpcClientInitializer());
        this.serviceAddress=serviceAddress;
        this.servicePort=servicePort;
    }

    public void sendRequest(RpcProtocol<RpcRequest> protocol) throws InterruptedException {
        ChannelFuture future=bootstrap.connect(this.serviceAddress,this.servicePort).sync();
        future.addListener(listener->{
            if(future.isSuccess()){
                log.info("connect rpc server {} success.", this.serviceAddress);
            }else{
                log.error("connect rpc server {} failed .", this.serviceAddress);
                future.cause().printStackTrace();
                eventLoopGroup.shutdownGracefully();
            }
        });
        log.info("begin transfer data");
        future.channel().writeAndFlush(protocol);
    }
}

RpcClientInitializer

@Slf4j
public class RpcClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        log.info("begin initChannel");
        ch.pipeline()
                .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 12, 4, 0, 0))
                .addLast(new LoggingHandler())
                .addLast(new RpcEncoder())
                .addLast(new RpcDecoder())
                .addLast(new RpcClientHandler());
    }
}

RpcClientHandler

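Note that Netty handles inbound and outbound traffic separately: the request is written on the calling thread, while the response arrives later in an inbound handler. We therefore need a Future object to hand the result back to the caller.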

@Slf4j
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcResponse>> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcResponse> msg) throws Exception {
        log.info("receive rpc server result");
        long requestId=msg.getHeader().getRequestId();
        RpcFuture<RpcResponse> future=RequestHolder.REQUEST_MAP.remove(requestId);
        future.getPromise().setSuccess(msg.getContent()); // Return the result
    }
}

Implementing the Future

Add the RpcFuture implementation to the netty-rpc-protocol module.

RpcFuture

@Data
public class RpcFuture<T> {
    // Promise is a writable Future. The Future interface itself has no methods for setting a result,
    // so Netty extends Future with Promise to set the outcome of I/O operations
    private Promise<T> promise;

    public RpcFuture(Promise<T> promise) {
        this.promise = promise;
    }
}

RequestHolder

Stores the mapping between each request ID and its Future.

public class RequestHolder {

    public static final AtomicLong REQUEST_ID=new AtomicLong();

    public static final Map<Long,RpcFuture> REQUEST_MAP=new ConcurrentHashMap<>();
}
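The request-ID-to-Future correlation can be demonstrated without Netty. The sketch below swaps Netty's `Promise` for the JDK's `CompletableFuture`, which plays the same "writable future" role; `PendingRequestsSketch`, `register`, and `onResponse` are hypothetical names for this illustration, and a plain thread simulates the I/O thread that delivers the response.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequestsSketch {
    static final AtomicLong REQUEST_ID = new AtomicLong();
    static final Map<Long, CompletableFuture<String>> PENDING = new ConcurrentHashMap<>();

    // Caller side: register the future before the request is written to the channel,
    // so a fast response can never arrive before the entry exists.
    static long register(CompletableFuture<String> future) {
        long id = REQUEST_ID.incrementAndGet();
        PENDING.put(id, future);
        return id;
    }

    // Handler side: complete the future that matches the response's request ID.
    // Returns false if no caller is waiting for this ID.
    static boolean onResponse(long requestId, String result) {
        CompletableFuture<String> future = PENDING.remove(requestId);
        if (future == null) {
            return false; // unknown ID or response already handled
        }
        return future.complete(result);
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<>();
        long id = register(future);

        // Simulate the I/O thread receiving the response for this request.
        new Thread(() -> onResponse(id, "Save User Success!")).start();

        // The calling thread blocks here until the response is delivered.
        System.out.println(future.get(3, TimeUnit.SECONDS));
    }
}
```

Two choices in this sketch are deliberate: the null check guards against a response whose ID has no waiting caller, and the bounded `get` keeps the caller from blocking forever if the server never answers.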

Readers who need the source code can follow the WeChat public account [follow Mic learning structure] and reply with the keyword [RPC] to obtain it.

Copyright Notice: All articles on this blog are licensed under CC BY-NC-SA 4.0 unless otherwise stated. When reprinting, please credit "Mic to take you to learn structure".