Implement simplified RPC based on Netty

Some theory

  • When a project grows too large and the business keeps expanding, its services need to be split apart. RPC is then used for the calls between those services: calls between internal services in the system go through RPC.

  • The architecture of an RPC system consists of three parts:

  1. Registry: publishes local services as remote services, manages remote services, and makes them available to service consumers.
  2. Server (service provider): provides the definitions and implementation classes of the service interfaces.
  3. Client (service consumer): invokes remote services through remote proxy objects.
  • RPC encapsulates the following steps so that a client can invoke a remote service as if it were a local one:
  1. Receive the method call.
  2. Encapsulate the method parameters into a message body that can be transmitted over the network, serialize it, and send it to the server.
  3. Deserialize the result of the server's processing and return it to the caller.

Practice

Server code

  1. In the handler, the server parses the RPC request body, invokes the corresponding method, and returns the corresponding RPC response body.
  2. The parsing in the previous step uses Java reflection to resolve the class and the method from the names carried in the request.
RPC request message body and RPC response message body
  1. Fully qualified name of the interface being called
  2. Name of the method to call on that interface
  3. Method return type
  4. Array of method parameter types
  5. Array of method parameter values
import lombok.Data;

/**
 * RPC request body
 */
@Data
public class RpcRequestMessage extends Message {
    // Fully qualified name of the interface being called
    private String className;
    // Name of the method to call on that interface
    private String methodName;
    // Method return type
    private Class<?> returnType;
    // Method parameter types
    private Class[] parameterTypes;
    // Method parameter values
    private Object[] parameters;

    public RpcRequestMessage(int sequenceId, String className, String methodName,
                             Class<?> returnType, Class[] parameterTypes, Object[] parameters) {
        super(sequenceId);
        this.className = className;
        this.methodName = methodName;
        this.returnType = returnType;
        this.parameterTypes = parameterTypes;
        this.parameters = parameters;
    }

    @Override
    public int getMessageType() {
        return MessageConstant.RPC_REQUEST_MESSAGE;
    }
}
A handler for RPC requests
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.lang.reflect.Method;

/**
 * A handler for RPC requests. It uses reflection to create the object,
 * call the method, and return the data.
 */
public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) throws Exception {
        RpcResponseMessage rpcResponseMessage = new RpcResponseMessage();
        try {
            // Class.forName loads the Class object for the given fully qualified name
            Class<?> aClass = Class.forName(msg.getClassName());
            // Look up the method by its name and parameter types
            Method method = aClass.getMethod(msg.getMethodName(), msg.getParameterTypes());
            // Invoke the method on a new instance with the parameter values from the request
            Object invoke = method.invoke(aClass.getDeclaredConstructor().newInstance(), msg.getParameters());
            rpcResponseMessage.setReturnValue(invoke);
        } catch (Exception e) {
            e.printStackTrace();
            rpcResponseMessage.setException(new Exception(e.getMessage()));
        }
        ctx.writeAndFlush(rpcResponseMessage);
    }
}
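The reflective dispatch that the handler performs can be tried in isolation, without Netty. The following is a minimal sketch, assuming a hypothetical `HelloServiceImpl` class with a public no-argument constructor; `ReflectiveDispatch` and `dispatch` are names invented for this illustration and are not part of the article's code.

```java
import java.lang.reflect.Method;

// Hypothetical service class, used only for this illustration.
class HelloServiceImpl {
    public String sayHello(String name) {
        return "Hello, " + name;
    }
}

class ReflectiveDispatch {
    // Resolve the class by name, look up the method by name and parameter
    // types, then invoke it on a fresh instance with the given arguments.
    static Object dispatch(String className, String methodName,
                           Class<?>[] parameterTypes, Object[] parameters) throws Exception {
        Class<?> target = Class.forName(className);
        Method method = target.getMethod(methodName, parameterTypes);
        Object instance = target.getDeclaredConstructor().newInstance();
        return method.invoke(instance, parameters);
    }

    public static void main(String[] args) throws Exception {
        Object result = dispatch(HelloServiceImpl.class.getName(), "sayHello",
                new Class<?>[] {String.class}, new Object[] {"RPC"});
        System.out.println(result); // prints "Hello, RPC"
    }
}
```

The four arguments of `dispatch` correspond to the class name, method name, parameter types, and parameter values carried in the request message body.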

Client code

  1. Debugging with writeAndFlush (useful for hard-to-find problems that do not report an error): writeAndFlush returns a ChannelFuture, which can be waited on synchronously with sync() or handled asynchronously with addListener. Asynchronous listeners and Promises allow two threads to communicate.
channelFuture.addListener(promise -> {
    if (!promise.isSuccess()){
        System.out.println(promise.cause());
    }
});
  1. The client should create a single shared channel object that all other methods can use.
  2. Waiting for the channel to close should be asynchronous (a listener on closeFuture()) rather than a synchronous wait. Otherwise channel initialization blocks and the channel object can never be obtained.
  3. The singleton is implemented with double-checked locking plus volatile.
  4. The proxy pattern is used to encapsulate the request parameters and send the data, making it possible to invoke remote methods as if they were local methods.
  5. The return value of a method called through the proxy is retrieved in the main thread, but the response is processed in the NIO thread, that is, in the RpcResponseHandler. The two threads therefore need to communicate, and they do so through a Promise. A Promise is a container for exchanging a result between threads.
  6. Each method invocation corresponds to one Promise, which is stored in a Map with the sequence number of the request as the key. The Promise is removed from the Map when the response is received.
  7. A global ID variable is used as the sequence number of the message.
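The sequence-number bookkeeping in points 5 to 7 can be sketched with the JDK alone. This is a minimal illustration, not the article's implementation: it swaps Netty's Promise for `java.util.concurrent.CompletableFuture`, and the names `PendingCalls`, `register`, `complete`, and `pendingCount` are assumptions made up for the example.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PendingCalls {
    // Global counter used as the message sequence number.
    private static final AtomicInteger NEXT_ID = new AtomicInteger();
    // One pending future per in-flight call, keyed by sequence number.
    private static final Map<Integer, CompletableFuture<Object>> PENDING = new ConcurrentHashMap<>();

    // Caller side: allocate a sequence number and register the future before sending.
    static int register(CompletableFuture<Object> future) {
        int id = NEXT_ID.incrementAndGet();
        PENDING.put(id, future);
        return id;
    }

    // I/O side: remove the future for this sequence number and complete it.
    static void complete(int sequenceId, Object value) {
        CompletableFuture<Object> future = PENDING.remove(sequenceId);
        if (future != null) {
            future.complete(value);
        }
    }

    static int pendingCount() {
        return PENDING.size();
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Object> future = new CompletableFuture<>();
        int id = register(future);
        // Stand-in for the NIO thread that receives the response.
        new Thread(() -> complete(id, "pong")).start();
        // The calling thread blocks here until the other thread delivers the result.
        System.out.println(future.get(1, TimeUnit.SECONDS)); // prints "pong"
    }
}
```

Removing the entry in `complete` rather than merely reading it keeps the map from growing with every call.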
Getting the channel as a singleton
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class RpcClient {
    public static volatile Channel channel = null;
    private static final Object LOCK = new Object();

    public static Channel getChannel() {
        // Double-checked locking: only the first caller initializes the channel
        if (channel == null) {
            synchronized (LOCK) {
                if (channel == null) {
                    initChannel();
                }
            }
        }
        return channel;
    }

    /**
     * Initialize the channel
     */
    public static void initChannel() {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ClientChannelInitializer());
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8282).sync();
            channel = future.channel();
            // Handle the close asynchronously so that this thread is not blocked
            channel.closeFuture().addListener(e -> {
                group.shutdownGracefully();
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Encapsulating the request with the proxy pattern
// Pending Promises, keyed by the sequence number of the request
private static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();

public static Promise<Object> getPromise(int sequenceId) {
    return PROMISES.get(sequenceId);
}

public static <T> T getProxyService(Class<T> service) {
    ClassLoader classLoader = service.getClassLoader();
    Object o = Proxy.newProxyInstance(classLoader, service.getInterfaces(), (proxy, method, args) -> {
        // proxy: the proxy object; method: the method being invoked; args: the method's argument list
        // 1. Convert the method call into a message object.
        //    The sequence number is generated per invocation so that each call gets its own Promise.
        int sequenceId = SequenceIdGenerator.getSequenceId();
        RpcRequestMessage rpcRequestMessage = new RpcRequestMessage(
                sequenceId,
                service.getName(),
                method.getName(),
                method.getReturnType(),
                method.getParameterTypes(),
                args
        );
        // 2. Prepare a Promise to accept the result and place it in the map.
        //    It is registered before sending so that the response cannot arrive first.
        DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
        MessageConstant.putPromise(sequenceId, promise);
        // 3. Send the message and log the cause if the write fails
        getChannel().writeAndFlush(rpcRequestMessage).addListener(future -> {
            if (!future.isSuccess()) {
                System.out.println(future.cause());
            }
        });
        // 4. Wait for the NIO thread to complete the Promise
        promise.await();
        if (promise.isSuccess()) {
            return promise.getNow();
        } else {
            // Throw rather than return the cause, which would not match the method's return type
            throw new RuntimeException(promise.cause());
        }
    });
    return (T) o;
}
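To see the proxy mechanism on its own, here is a minimal JDK-only sketch in which the network is replaced by a plain function. The `Greeter` interface, the `ProxySketch` class, and the `transport` parameter are all hypothetical names introduced for this example. The sketch passes the interface itself to `Proxy.newProxyInstance`, which is one common way to use it and is not necessarily how the article's `getProxyService` is meant to be called.

```java
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical service interface, used only for this illustration.
interface Greeter {
    String greet(String name);
}

class ProxySketch {
    // Build a proxy whose every method call is packed into a string
    // "methodName[args]" and handed to the transport function, which
    // stands in for sending a request and waiting for the response.
    @SuppressWarnings("unchecked")
    static <T> T proxyFor(Class<T> iface, Function<String, Object> transport) {
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[] {iface},
                (proxy, method, args) -> transport.apply(method.getName() + Arrays.toString(args)));
    }

    public static void main(String[] args) {
        Greeter greeter = proxyFor(Greeter.class, request -> "sent " + request);
        // The caller sees an ordinary local method call.
        System.out.println(greeter.greet("Netty")); // prints "sent greet[Netty]"
    }
}
```

Every method invoked on the proxy, including `toString`, `hashCode`, and `equals`, is routed through the same handler, so a real handler usually needs to treat those separately.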

Problems encountered

  1. Stack overflow