The original link

Microservices have become a technology that every Internet developer must master. RPC framework is one of the most important components of microservices. While you have time. I looked at dubbo’s source code again. Dubbo uses a lot of design patterns and SPI mechanisms to be flexible and decoupled, and it’s not easy to read dubbo’s code.

Following the steps of the Barehanded Framework series, I will implement an RPC framework in a minimalist way. To help you understand how the RPC framework works.

Broadly speaking, a complete RPC contains many components, including service discovery, service governance, remote invocation, call chain analysis, gateway, and so on. I will slowly implement these functions, this article mainly explains the cornerstone of RPC, the implementation of remote calls.

After reading this article, you will be able to implement a framework that provides RPC calls on your own.

1. RPC call procedure

Through the following figure, we will understand the call process of RPC and see what process an RPC call goes through from a macro view.

When a call begins:

  1. The client invokes the local dynamic proxy
  2. This proxy will serialize the calls through the protocol to the byte stream
  3. The byte stream is sent to the server through the Netty network framework
  4. After receiving the byte stream, the server will, according to the protocol, deserialize to the original call, using reflection principle to call the method provided by the server
  5. If the request has a return value, the result needs to be serialized according to the protocol and then returned to the caller via Netty

2. Framework overview and technology selection

Take a look at the framework’s components:

Clinet is the caller. Servive is the service provider. The protocol package defines the communication protocol. Common contains common logical components.

The technology selection project uses Maven as the package management tool, JSON as the serialization protocol, Spring Boot to manage the life cycle of objects, and Netty as the networking component of NIO. So to read this article, you need to have a basic understanding of Spring Boot and Netty.

Here’s a look at the implementation of each component:

3. protocol

In fact, as RPC protocol, only one problem needs to be considered, that is, how to turn a local method call into a byte stream that can be transmitted by the network.

We need to define the method call and return two object entities:

Request:

@Data
public class RpcRequest {
    // Call number
    private String requestId;
    / / the name of the class
    private String className;
    / / the method name
    private String methodName;
    // The data type of the request parameter
    privateClass<? >[] parameterTypes;// Request parameters
    private Object[] parameters;
}
Copy the code

Response:

@Data
public class RpcResponse {
    // Call number
    private String requestId;
    // The exception thrown
    private Throwable throwable;
    // Return the result
    private Object result;

}
Copy the code

Determine the object entity to be serialized, it is necessary to determine the protocol of serialization, to achieve two methods, serialization and deserialization.

public interface Serialization {
    <T> byte[] serialize(T obj);
    <T> T deSerialize(byte[] data,Class<T> clz);
}
Copy the code

There are many protocols available for serialization, such as:

  • The JDK’s serialization method. (Not recommended, not good for later cross-language calls)
  • Json is readable, but slow to serialize and bulky.
  • Protobuf, Kyro, Hessian, etc. are excellent serialization frameworks and can be selected on demand.

For simplicity and ease of debugging, we chose JSON as the serialization protocol and Jackson as the JSON parsing framework.

/ * * *@author Zhengxin
 */
public class JsonSerialization implements Serialization {

    private ObjectMapper objectMapper;

    public JsonSerialization(a){
        this.objectMapper = new ObjectMapper();
    }


    @Override
    public <T> byte[] serialize(T obj) {
        try {
            return objectMapper.writeValueAsBytes(obj);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public <T> T deSerialize(byte[] data, Class<T> clz) {
        try {
            return objectMapper.readValue(data,clz);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null; }}Copy the code

Because Netty supports custom coders. So only need to implement ByteToMessageDecoder and MessageToByteEncoder two interfaces. This solves the serialization problem:

public class RpcDecoder extends ByteToMessageDecoder {

    privateClass<? > clz;private Serialization serialization;

    public RpcDecoder(Class
        clz,Serialization serialization){
        this.clz = clz;
        this.serialization = serialization;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if(in.readableBytes() < 4) {return;
        }

        in.markReaderIndex();
        int dataLength = in.readInt();
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength]; in.readBytes(data); Object obj = serialization.deSerialize(data, clz); out.add(obj); }}Copy the code
public class RpcEncoder extends MessageToByteEncoder {

    privateClass<? > clz;private Serialization serialization;

    public RpcEncoder(Class
        clz, Serialization serialization){
        this.clz = clz;
        this.serialization = serialization;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        if(clz ! =null) {byte[] bytes = serialization.serialize(msg); out.writeInt(bytes.length); out.writeBytes(bytes); }}}Copy the code

At this point, protocol is implemented, and we can convert the method call and the resulting response to a string of byte[] arrays that can be transferred across the network.

4. server

The server is the component responsible for handling client requests. In the context of high concurrency on the Internet, it is relatively easy to deal with high concurrency using Nio’s non-blocking approach. Netty is an excellent Nio processing framework. Server is developed based on NetTY. The key codes are as follows:

  1. Netty is based on the Reacotr model. Therefore, two groups of thread bosses and workers need to be initialized. The boss is responsible for distributing the request, and the worker is responsible for executing the corresponding handler:
 @Bean
    public ServerBootstrap serverBootstrap(a) throws InterruptedException {

        ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap.group(bossGroup(), workerGroup())
                .channel(NioServerSocketChannel.class)
                .handler(newLoggingHandler(LogLevel.DEBUG)) .childHandler(serverInitializer); Map<ChannelOption<? >, Object> tcpChannelOptions = tcpChannelOptions(); Set<ChannelOption<? >> keySet = tcpChannelOptions.keySet();for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) {
            serverBootstrap.option(option, tcpChannelOptions.get(option));
        }

        return serverBootstrap;
    }
Copy the code
  1. Netty’s operation is pipeline-based. Therefore, we need to register several coders implemented in protocol into netty pipeline.

        ChannelPipeline pipeline = ch.pipeline();
        // Process TCP request sticky packet coder, specific role can be Google
        pipeline.addLast(new LengthFieldBasedFrameDecoder(65535.0.4));

        // Serialization and deserialization decoders implemented in protocol
        pipeline.addLast(new RpcEncoder(RpcResponse.class,new JsonSerialization()));
        pipeline.addLast(new RpcDecoder(RpcRequest.class,new JsonSerialization()));

        // The handler that processes the request is explained below
        pipeline.addLast(serverHandler);

Copy the code
  1. Implement the concrete ServerHandler to handle the real call.

ServerHandler inheritance SimpleChannelInboundHandler < RpcRequest >. Simply put, this InboundHandler is called when data is received or when the state of the Channel changes. The channelRead0() method is used when this handler reads the data, so we’ll just override it.


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(msg.getRequestId());
        try{
            // Start processing the request after receiving it
            Object handler = handler(msg);
            rpcResponse.setResult(handler);
        }catch (Throwable throwable){
            // If an exception is thrown, the exception is also stored in response
            rpcResponse.setThrowable(throwable);
            throwable.printStackTrace();
        }
        // Write to the netty context after the operation. Netty handles the return value itself.
        ctx.writeAndFlush(rpcResponse);
    }

Copy the code

Handler (MSG) actually uses cglib’s Fastclass implementation, but the basic principle is still reflection. Learning reflection well in Java can really do anything you want.

    private Object handler(RpcRequest request) throws Throwable { Class<? > clz = Class.forName(request.getClassName()); Object serviceBean = applicationContext.getBean(clz); Class<? > serviceClass = serviceBean.getClass(); String methodName = request.getMethodName(); Class<? >[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters();// The basic idea is to get the class name and method name and use reflection to implement the call
        FastClass fastClass = FastClass.create(serviceClass);
        FastMethod fastMethod = fastClass.getMethod(methodName,parameterTypes);

        // Where the actual call takes place
        return fastMethod.invoke(serviceBean,parameters);
    }
Copy the code

Overall, the server implementation is not too difficult. The core knowledge points are the use of Netty channel and cglib reflection mechanism.

5. client

future

In fact, for me, the client implementation is much more difficult than the server implementation. Netty is an asynchronous framework where all returns are based on Future and Callback mechanisms.

So before reading the following text, I strongly recommend an article I wrote earlier, Future Research. The classic WITE and Notify mechanisms are used to achieve asynchronous access to request results.

/ * * *@author zhengxin
 */
public class DefaultFuture {
	private RpcResponse rpcResponse;
	private volatile boolean isSucceed = false;
	private final Object object = new Object();
	public RpcResponse getResponse(int timeout){
		synchronized (object){
			while(! isSucceed){try {
                    //wait
					object.wait(timeout);
				} catch(InterruptedException e) { e.printStackTrace(); }}returnrpcResponse; }}public void setResponse(RpcResponse response){
		if(isSucceed){
			return;
		}
		synchronized (object) {
			this.rpcResponse = response;
			this.isSucceed = true;
            //notiyobject.notify(); }}}Copy the code

Reuse resources

To improve the client throughput, the following ideas can be provided:

  1. Using an object pool: Create multiple clients and save them in the object pool. But the complexity of the code and the cost of maintaining the client can be high.

  2. Reuse netty channels whenever possible. You may have noticed earlier why you added an ID to RpcRequest and RpcResponse. Because channels in Netty are used by multiple threads. When a result is returned asynchronously, you don’t know which thread returned it. At this point, consider using a Map to create an ID and Future mapping. So the thread that requests it can get the corresponding result by using the corresponding ID.

/ * * *@author Zhengxin
 */
public class ClientHandler extends ChannelDuplexHandler {
    // Use map to maintain the mapping between id and Future. In multi-threaded environment, use thread-safe container
    private final Map<String, DefaultFuture> futureMap = new ConcurrentHashMap<>();
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if(msg instanceof RpcRequest){
            RpcRequest request = (RpcRequest) msg;
            // Add mappings while writing data
            futureMap.putIfAbsent(request.getRequestId(),new DefaultFuture());
        }
        super.write(ctx, msg, promise);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof RpcResponse){
            RpcResponse response = (RpcResponse) msg;
            // When retrieving data, put the result into the future
            DefaultFuture defaultFuture = futureMap.get(response.getRequestId());
            defaultFuture.setResponse(response);
        }
        super.channelRead(ctx, msg);
    }

    public RpcResponse getRpcResponse(String requestId){
        try {
            // Get the real result from the future.
            DefaultFuture defaultFuture = futureMap.get(requestId);
            return defaultFuture.getResponse(10);
        }finally {
            // Remove from map when done.futureMap.remove(requestId); }}}Copy the code

Instead of inheriting InboundHandler from server, ChannelDuplexHandler is used. Methods are triggered when data is written and read. Save the ID and Future in the Map at write time. When the data is read, pull the Future out of the Map and put the result into the Future. The corresponding ID is required to obtain the results.

Use Transporters to encapsulate the request.

public class Transporters {
    public static RpcResponse send(RpcRequest request){
        NettyClient nettyClient = new NettyClient("127.0.0.1".8080);
        nettyClient.connect(nettyClient.getInetSocketAddress());
        RpcResponse send = nettyClient.send(request);
        returnsend; }}Copy the code

Implementation of dynamic proxy

The most widely known use of dynamic proxy technology is probably Spring’s Aop, a section-oriented programming implementation that dynamically adds code Before or After the original method. The function of dynamic proxy in RPC framework is to completely replace the original method and call the remote method directly.

Agent Factory:

public class ProxyFactory {
    @SuppressWarnings("unchecked")
    public static <T> T create(Class<T> interfaceClass){
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                newClass<? >[]{interfaceClass},newRpcInvoker<T>(interfaceClass) ); }}Copy the code

When a class generated by proxyFactory is called, the RpcInvoker method is executed.

public class RpcInvoker<T> implements InvocationHandler {
    private Class<T> clz;
    public RpcInvoker(Class<T> clz){
        this.clz = clz;
    }
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest request = newRpcRequest(); String requestId = UUID.randomUUID().toString(); String className = method.getDeclaringClass().getName(); String methodName = method.getName(); Class<? >[] parameterTypes = method.getParameterTypes(); request.setRequestId(requestId); request.setClassName(className); request.setMethodName(methodName); request.setParameterTypes(parameterTypes); request.setParameters(args);returnTransporters.send(request).getResult(); }}Copy the code

Looking at the invoke method, there are three main functions,

  1. Generate RequestId.
  2. Assemble RpcRequest.
  3. Call Transports to send the request and get the results.

At this point, the entire call chain is complete. We have finally completed an RPC call.

With Spring integration

In order to make our client easy to use, we need to consider a custom annotation @rpcInterface. When our project is connected to Spring, Spring scans this annotation and automatically creates a proxy object through our ProxyFactory. Put it in spring’s applicationContext. This way we can inject it directly through the @AutoWired annotation.

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcInterface {
}
Copy the code
@Configuration
@Slf4j
public class RpcConfig implements ApplicationContextAware.InitializingBean {
	private ApplicationContext applicationContext;

	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		this.applicationContext = applicationContext;
	}

	@Override
	public void afterPropertiesSet(a) throws Exception {
		Reflections reflections = new Reflections("com.xilidou");
		DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
        // Get the interface annotated @rpcInterfacSet<Class<? >> typesAnnotatedWith = reflections.getTypesAnnotatedWith(RpcInterface.class);for(Class<? > aClass : typesAnnotatedWith) {// Create a proxy object and register it with the Spring context.
			beanFactory.registerSingleton(aClass.getSimpleName(),ProxyFactory.create(aClass));
		}
		log.info("afterPropertiesSet is {}",typesAnnotatedWith); }}Copy the code

Finally, our simplest RPC framework has been developed. Now you can test it.

6. Demo

api

@RpcInterface
public interface IHelloService {
    String sayHi(String name);
}

Copy the code

server

IHelloSerivce

@Service
@Slf4j
public class TestServiceImpl implements IHelloService {

    @Override
    public String sayHi(String name) {
        log.info(name);
        return "Hello "+ name; }}Copy the code

Start the service:

@SpringBootApplication
public class Application {

    public static void main(String[] args) throws InterruptedException { ConfigurableApplicationContext context = SpringApplication.run(Application.class); TcpService tcpService = context.getBean(TcpService.class); tcpService.start(); }}Copy the code

client

@SpringBootApplication(a)public class ClientApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(ClientApplication.class);
	    IHelloService helloService = context.getBean(IHelloService.class);
        System.out.println(helloService.sayHi("doudou")); }}Copy the code

Output after running:

Hello doudou

conclusion

Finally, we realized a simplified version of RPC remote call module. It just contains the most basic remote call functionality.

If you are interested in this project, you are welcome to contact me and contribute code to the framework.

Github address: DouPpc

Free hand lift frame series article address:

Hands-free framework – Implementing IoC

Hands-free framework – Implementing Aop

Barehanded framework – Request merging in high concurrency environments

Welcome to follow my wechat official account: