Microservices are now a technology that every Internet developer must master, and an RPC framework is one of the most important components of a microservice architecture. While I had some free time, I read through Dubbo's source code again. Dubbo uses many design patterns and an SPI mechanism to stay flexible and decoupled, which makes its code hard to read.

Following the approach of the Barehanded Framework series, I will implement an RPC framework in a minimalist way to help you understand how an RPC framework works.

Broadly speaking, a complete RPC framework contains many components, including service discovery, service governance, remote invocation, call chain analysis, a gateway, and so on. I will implement these features step by step. This article focuses on the cornerstone of RPC: the implementation of remote calls.

After reading this article, you will be able to implement a framework that provides RPC calls on your own.

  1. Call process of RPC

We start with a figure of the RPC call process to see, at a macro level, what steps a single RPC call goes through.

When a call begins:

  • The client calls a local dynamic proxy.
  • The proxy serializes the call into a byte stream according to the protocol.
  • The byte stream is sent to the server through the Netty network framework.
  • The server deserializes the byte stream back into the original call according to the protocol and executes it.
  • If the request has a return value, the result is serialized according to the protocol and returned to the caller through Netty.

  2. Framework overview and technology selection

Take a look at the components of the framework:

  • client is the caller.
  • server is the service provider.
  • protocol defines the communication protocol.
  • common contains shared logic components.

Technology selection: the project uses Maven as the package management tool, JSON as the serialization format, Spring Boot to manage the life cycle of objects, and Netty as the NIO networking component. To follow this article, you need a basic understanding of Spring Boot and Netty.

Here’s a look at the implementation of each component:

  3. protocol

The protocol module is really the RPC protocol itself. There is only one problem to consider: how to convert a method call into a byte stream that can be transmitted over the network.

First we need to define two entities, one for the method call and one for its return value:

Request:

    @Data
    public class RpcRequest {
        // Call ID
        private String requestId;
        // Class name
        private String className;
        // Method name
        private String methodName;
        // Parameter types
        private Class<?>[] parameterTypes;
        // Parameters
        private Object[] parameters;
    }

Response:

    @Data
    public class RpcResponse {
        // Call ID
        private String requestId;
        // Exception thrown by the call, if any
        private Throwable throwable;
        // Return value
        private Object result;
    }

With the objects to be serialized defined, the next step is to settle the serialization protocol, which needs two methods: serialization and deserialization.

    public interface Serialization {
        <T> byte[] serialize(T obj);
        <T> T deSerialize(byte[] data, Class<T> clz);
    }

There are many serialization protocols to choose from, such as:

  • The JDK's built-in serialization. (Not recommended, since it makes future cross-language calls difficult.)
  • JSON, which is highly readable but slow to serialize and bulky.
  • Protobuf, Kryo, Hessian, etc., which are excellent serialization frameworks and can be selected as needed.

For simplicity and ease of debugging, we chose JSON as the serialization protocol and Jackson as the JSON parsing framework.

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.IOException;

    /**
     * @author Zhengxin
     */
    public class JsonSerialization implements Serialization {

        private ObjectMapper objectMapper;

        public JsonSerialization() {
            this.objectMapper = new ObjectMapper();
        }

        @Override
        public <T> byte[] serialize(T obj) {
            try {
                return objectMapper.writeValueAsBytes(obj);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            return null;
        }

        @Override
        public <T> T deSerialize(byte[] data, Class<T> clz) {
            try {
                return objectMapper.readValue(data, clz);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null;
        }
    }

Netty supports custom codecs, so we only need to extend two classes, ByteToMessageDecoder and MessageToByteEncoder, to solve the serialization problem:

    public class RpcDecoder extends ByteToMessageDecoder {

        private Class<?> clz;
        private Serialization serialization;

        public RpcDecoder(Class<?> clz, Serialization serialization) {
            this.clz = clz;
            this.serialization = serialization;
        }

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            // The 4-byte length prefix has not fully arrived yet.
            if (in.readableBytes() < 4) {
                return;
            }
            in.markReaderIndex();
            int dataLength = in.readInt();
            // The body is incomplete: rewind and wait for more data.
            if (in.readableBytes() < dataLength) {
                in.resetReaderIndex();
                return;
            }
            byte[] data = new byte[dataLength];
            in.readBytes(data);
            Object obj = serialization.deSerialize(data, clz);
            out.add(obj);
        }
    }

    public class RpcEncoder extends MessageToByteEncoder<Object> {

        private Class<?> clz;
        private Serialization serialization;

        public RpcEncoder(Class<?> clz, Serialization serialization) {
            this.clz = clz;
            this.serialization = serialization;
        }

        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
            if (clz != null) {
                byte[] bytes = serialization.serialize(msg);
                // Write the length prefix first, then the body.
                out.writeInt(bytes.length);
                out.writeBytes(bytes);
            }
        }
    }

At this point the protocol is implemented: we can convert method calls and their results into byte[] arrays that can be transmitted across the network.
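The encoder and decoder above agree on a simple frame layout: a 4-byte length followed by the serialized body. As a minimal sketch of that framing idea outside Netty, the following uses only java.nio.ByteBuffer. The class name LengthPrefixedFrames and its methods are hypothetical and not part of the article's code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that mirrors the framing used by RpcEncoder and RpcDecoder.
class LengthPrefixedFrames {

    // Prepend a 4-byte big-endian length to the payload.
    static byte[] encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    // Try to read one frame. Returns null and leaves the buffer position
    // unchanged when the frame has not fully arrived yet.
    static byte[] decode(ByteBuffer in) {
        if (in.remaining() < 4) {
            return null;
        }
        in.mark();
        int length = in.getInt();
        if (in.remaining() < length) {
            in.reset();
            return null;
        }
        byte[] payload = new byte[length];
        in.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] frame = encode("{\"requestId\":\"1\"}".getBytes(StandardCharsets.UTF_8));
        // Only the first 6 bytes have arrived: nothing can be decoded yet.
        ByteBuffer partial = ByteBuffer.wrap(frame, 0, 6);
        System.out.println(decode(partial) == null);
        // The whole frame has arrived: the payload comes back intact.
        ByteBuffer full = ByteBuffer.wrap(frame);
        System.out.println(new String(decode(full), StandardCharsets.UTF_8));
    }
}
```

The mark and reset calls play the same role as markReaderIndex() and resetReaderIndex() in RpcDecoder: if the body is incomplete, the length prefix is "unread" so the next attempt starts from the beginning of the frame.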

  4. server

The server is the component responsible for handling client requests. Given the high concurrency typical of Internet services, NIO's non-blocking approach makes it relatively easy to handle many concurrent connections, and Netty is an excellent NIO framework. The key code of the server is as follows:

Netty is based on the Reactor model, so two thread groups, boss and worker, need to be initialized. The boss group accepts and distributes requests, and the worker group executes the corresponding handlers:

    @Bean
    public ServerBootstrap serverBootstrap() throws InterruptedException {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup(), workerGroup())
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.DEBUG))
                .childHandler(serverInitializer);
        Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
        Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
        for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) {
            serverBootstrap.option(option, tcpChannelOptions.get(option));
        }
        return serverBootstrap;
    }

Netty processing is pipeline-based, so we need to register the codecs implemented in protocol in Netty's pipeline.

    ChannelPipeline pipeline = ch.pipeline();
    // Frame decoder that handles TCP packet sticking and splitting.
    pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4));
    pipeline.addLast(new RpcEncoder(RpcResponse.class, new JsonSerialization()));
    pipeline.addLast(new RpcDecoder(RpcRequest.class, new JsonSerialization()));
    pipeline.addLast(serverHandler);

Next, implement a concrete ServerHandler to handle the real call. ServerHandler extends SimpleChannelInboundHandler. Simply put, an inbound handler is called when data is received or when the state of the Channel changes. The channelRead0() method is invoked when this handler reads data, so that is the only method we need to override.

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(msg.getRequestId());
        try {
            // Handle the request once it has been received.
            Object handler = handler(msg);
            rpcResponse.setResult(handler);
        } catch (Throwable throwable) {
            // If an exception is thrown, put it in the response as well.
            rpcResponse.setThrowable(throwable);
            throwable.printStackTrace();
        }
        // Write the response to the Netty context; Netty takes care of sending it back.
        ctx.writeAndFlush(rpcResponse);
    }

handler(msg) actually uses cglib's FastClass. FastClass generates bytecode that calls the target method directly instead of going through java.lang.reflect, but the idea is the same as reflection: look up a method by name and parameter types, then invoke it. A solid grasp of reflection in Java lets you do almost anything.

    private Object handler(RpcRequest request) throws Throwable {
        Class<?> clz = Class.forName(request.getClassName());
        Object serviceBean = applicationContext.getBean(clz);
        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();
        // Look up the method through cglib's FastClass.
        FastClass fastClass = FastClass.create(serviceClass);
        FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);
        // Invoke the method and return the result.
        return fastMethod.invoke(serviceBean, parameters);
    }

Overall, the server implementation is not too difficult. The core points are the use of Netty channels and cglib's reflection mechanism.
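To make the dispatch step concrete without Spring or cglib, here is a minimal sketch that uses plain java.lang.reflect instead of FastClass. It is a swapped-in technique, not the article's implementation. ReflectiveDispatcher, Greeter, and DispatchDemo are hypothetical names, and a simple map stands in for the Spring application context.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the Spring context: service beans keyed by interface name.
class ReflectiveDispatcher {
    private final Map<String, Class<?>> interfaces = new HashMap<>();
    private final Map<String, Object> beans = new HashMap<>();

    <T> void register(Class<T> serviceInterface, T bean) {
        interfaces.put(serviceInterface.getName(), serviceInterface);
        beans.put(serviceInterface.getName(), bean);
    }

    // Resolve the method from the request fields and invoke it on the registered bean.
    Object dispatch(String className, String methodName,
                    Class<?>[] parameterTypes, Object[] parameters) {
        Class<?> serviceInterface = interfaces.get(className);
        if (serviceInterface == null) {
            throw new IllegalArgumentException("No service registered for " + className);
        }
        try {
            Method method = serviceInterface.getMethod(methodName, parameterTypes);
            return method.invoke(beans.get(className), parameters);
        } catch (InvocationTargetException e) {
            // The service method itself failed; expose its exception as the cause.
            throw new IllegalStateException("Service method threw an exception", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke " + methodName, e);
        }
    }
}

// Hypothetical service interface used only for this demo.
interface Greeter {
    String sayHi(String name);
}

class DispatchDemo {
    public static void main(String[] args) {
        ReflectiveDispatcher dispatcher = new ReflectiveDispatcher();
        dispatcher.register(Greeter.class, name -> "Hello " + name);
        Object result = dispatcher.dispatch(Greeter.class.getName(), "sayHi",
                new Class<?>[]{String.class}, new Object[]{"doudou"});
        System.out.println(result);
    }
}
```

The four arguments of dispatch correspond to the className, methodName, parameterTypes, and parameters fields of RpcRequest, which is why the request has to carry all of them.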

  5. client

future

In fact, for me, the client implementation was much more difficult than the server implementation. Netty is an asynchronous framework in which all results are returned through Future and callback mechanisms.

So before reading on, I strongly recommend an article I wrote earlier, Future Research. Here the classic wait and notify mechanism is used to obtain the result of a request asynchronously.

    /**
     * @author zhengxin
     */
    public class DefaultFuture {

        private RpcResponse rpcResponse;
        private volatile boolean isSucceed = false;
        private final Object object = new Object();

        public RpcResponse getResponse(int timeout) {
            synchronized (object) {
                while (!isSucceed) {
                    try {
                        // Wait up to timeout milliseconds, then check the flag again.
                        object.wait(timeout);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return rpcResponse;
            }
        }

        public void setResponse(RpcResponse response) {
            if (isSucceed) {
                return;
            }
            synchronized (object) {
                this.rpcResponse = response;
                this.isSucceed = true;
                // Wake up the waiting thread.
                object.notify();
            }
        }
    }

reuse resources

To improve client throughput, there are a couple of options:

  • Use an object pool: create multiple clients and keep them in an object pool. However, the code becomes more complex and the cost of maintaining the clients can be high.
  • Reuse Netty channels as much as possible.

You may have wondered earlier why RpcRequest and RpcResponse carry an ID. Channels in Netty are shared by multiple threads, so when a result comes back asynchronously you don't know which request it belongs to. This is where a Map from ID to Future comes in: the requesting thread can fetch the matching result using its own ID.
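The ID-to-Future mapping described above can be sketched with the JDK's CompletableFuture in place of a hand-written wait/notify future. This is an illustrative alternative, not the article's DefaultFuture. PendingRequests and its methods are hypothetical names, and plain strings stand in for RpcResponse.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that correlates responses with requests by ID.
class PendingRequests {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called by the sending thread before the request is written to the channel.
    CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Called by the I/O thread when a response arrives; returns false for unknown IDs.
    boolean complete(String requestId, String result) {
        CompletableFuture<String> future = pending.remove(requestId);
        return future != null && future.complete(result);
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) throws Exception {
        PendingRequests requests = new PendingRequests();
        CompletableFuture<String> a = requests.register("req-a");
        CompletableFuture<String> b = requests.register("req-b");
        // On a shared channel, responses may arrive in a different order than requests.
        new Thread(() -> {
            requests.complete("req-b", "result for b");
            requests.complete("req-a", "result for a");
        }).start();
        // Each caller still receives the result that matches its own ID.
        System.out.println(a.get(1, TimeUnit.SECONDS));
        System.out.println(b.get(1, TimeUnit.SECONDS));
    }
}
```

Because the timed get throws TimeoutException when no response arrives, a caller in this sketch cannot block forever on a lost response.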

    public class ClientHandler extends ChannelDuplexHandler {

        private final Map<String, DefaultFuture> futureMap = new ConcurrentHashMap<>();

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            if (msg instanceof RpcRequest) {
                RpcRequest request = (RpcRequest) msg;
                // Before sending the request, register its ID and Future in the map.
                futureMap.putIfAbsent(request.getRequestId(), new DefaultFuture());
            }
            super.write(ctx, msg, promise);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof RpcResponse) {
                RpcResponse response = (RpcResponse) msg;
                // Find the Future that belongs to this response and complete it.
                DefaultFuture defaultFuture = futureMap.get(response.getRequestId());
                defaultFuture.setResponse(response);
            }
            super.channelRead(ctx, msg);
        }

        public RpcResponse getRpcResponse(String requestId) {
            try {
                // Get the real result from the future.
                DefaultFuture defaultFuture = futureMap.get(requestId);
                return defaultFuture.getResponse(10);
            } finally {
                // Remove the entry from the map when done.
                futureMap.remove(requestId);
            }
        }
    }

Unlike the server, which extends an inbound handler, the client handler extends ChannelDuplexHandler, whose methods are triggered both when data is written and when it is read. At write time, the ID and Future are saved in the Map. When data is read, the Future is taken from the Map and the result is put into it. The corresponding ID is needed to obtain the result.

Use Transporters to encapsulate the request.

    public class Transporters {
        public static RpcResponse send(RpcRequest request) {
            NettyClient nettyClient = new NettyClient("127.0.0.1", 8080);
            nettyClient.connect(nettyClient.getInetSocketAddress());
            RpcResponse send = nettyClient.send(request);
            return send;
        }
    }

Dynamic proxy implementation

The best-known use of dynamic proxies is probably Spring AOP, an implementation of aspect-oriented programming that dynamically adds code before or after an existing method. In an RPC framework, the dynamic proxy instead completely replaces the original method and calls the remote method directly.

Proxy factory:

    public class ProxyFactory {
        @SuppressWarnings("unchecked")
        public static <T> T create(Class<T> interfaceClass) {
            return (T) Proxy.newProxyInstance(
                    interfaceClass.getClassLoader(),
                    new Class<?>[]{interfaceClass},
                    new RpcInvoker<T>(interfaceClass)
            );
        }
    }

When a method on the proxy created by ProxyFactory is called, RpcInvoker's invoke method is executed.

    public class RpcInvoker<T> implements InvocationHandler {

        private Class<T> clz;

        public RpcInvoker(Class<T> clz) {
            this.clz = clz;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            RpcRequest request = new RpcRequest();
            String requestId = UUID.randomUUID().toString();
            String className = method.getDeclaringClass().getName();
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            request.setRequestId(requestId);
            request.setClassName(className);
            request.setMethodName(methodName);
            request.setParameterTypes(parameterTypes);
            request.setParameters(args);
            return Transporters.send(request).getResult();
        }
    }

As you can see, the invoke method does three main things:

  • Generate the requestId.
  • Assemble the RpcRequest.
  • Call Transporters to send the request and get the result.

With that, the entire call chain is complete. We have finally completed an RPC call.
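The three steps above can be tried out locally with a JDK dynamic proxy and no network at all. The sketch below is only an illustration under stated assumptions: HelloApi and ProxySketch are hypothetical names, and a plain Function stands in for the transport that would really send an RpcRequest over Netty.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical service interface used only for this demo.
interface HelloApi {
    String sayHi(String name);
}

class ProxySketch {

    // Create a proxy whose every method call is turned into a textual call
    // description and handed to the transport function.
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> api, Function<String, Object> transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            String call = method.getDeclaringClass().getSimpleName()
                    + "#" + method.getName() + Arrays.toString(args);
            return transport.apply(call);
        };
        return (T) Proxy.newProxyInstance(api.getClassLoader(), new Class<?>[]{api}, handler);
    }

    public static void main(String[] args) {
        // The fake transport just echoes what it was asked to send.
        HelloApi hello = create(HelloApi.class, call -> "sent " + call);
        System.out.println(hello.sayHi("doudou")); // prints "sent HelloApi#sayHi[doudou]"
    }
}
```

The caller only sees the HelloApi interface. Everything behind it, including which class and method were requested and with which arguments, is captured inside the invocation handler, which is the place where RpcInvoker builds its RpcRequest.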

Integration with Spring

To make our client easy to use, we define a custom annotation, @RpcInterface. When our project is integrated with Spring, the annotated interfaces are scanned, a proxy object is created for each of them through our ProxyFactory, and the proxy is placed in Spring's applicationContext. This way we can inject it directly with the @Autowired annotation.

    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface RpcInterface {
    }

    @Configuration
    @Slf4j
    public class RpcConfig implements ApplicationContextAware, InitializingBean {

        private ApplicationContext applicationContext;

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            Reflections reflections = new Reflections("com.xilidou");
            DefaultListableBeanFactory beanFactory =
                    (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
            // Find the interfaces annotated with @RpcInterface.
            Set<Class<?>> typesAnnotatedWith = reflections.getTypesAnnotatedWith(RpcInterface.class);
            for (Class<?> aClass : typesAnnotatedWith) {
                // Create a proxy object and register it with the Spring context.
                beanFactory.registerSingleton(aClass.getSimpleName(), ProxyFactory.create(aClass));
            }
            log.info("afterPropertiesSet is {}", typesAnnotatedWith);
        }
    }

Finally, we have developed the simplest possible RPC framework. Now you can test it.

  6. Demo

api

    @RpcInterface
    public interface IHelloService {
        String sayHi(String name);
    }

server

Implementation of IHelloService:

    @Service
    @Slf4j
    public class TestServiceImpl implements IHelloService {
        @Override
        public String sayHi(String name) {
            log.info(name);
            return "Hello " + name;
        }
    }

Start the service:

    @SpringBootApplication
    public class Application {
        public static void main(String[] args) throws InterruptedException {
            ConfigurableApplicationContext context = SpringApplication.run(Application.class);
            TcpService tcpService = context.getBean(TcpService.class);
            tcpService.start();
        }
    }

client

    @SpringBootApplication()
    public class ClientApplication {
        public static void main(String[] args) {
            ConfigurableApplicationContext context = SpringApplication.run(ClientApplication.class);
            IHelloService helloService = context.getBean(IHelloService.class);
            System.out.println(helloService.sayHi("doudou"));
        }
    }

Output after running:

Hello doudou

With that, we have implemented a simplified RPC remote call module.