Whatever you studied or whichever career you came from, you will remember expanding and contracting sentences from primary-school Chinese class. Contracting a sentence means stripping away every modifier to extract its core without losing the basic meaning. In the same spirit, let's implement a simple RPC program and explore its essence, so that complex RPC frameworks become easier to understand. A so-called complex framework enriches the simple process with additional design layers, such as Dubbo's filters, routers, load balancing, cluster fault tolerance, various Invokers, and communication protocols. That is the sentence-expansion direction.

RPC stands for remote procedure call. Suppose there are two servers, A and B. An application deployed on server A wants to call a function or method provided by an application on server B. Because the two applications do not share the same memory space, A cannot call it directly; it has to send a call request over the network and obtain the result that way.

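Both mainstream and niche RPC frameworks on the market implement these RPC semantics. Frameworks focused on service governance include Dubbo, Dubbox, and Motan. Frameworks focused on multi-language support include gRPC, Thrift, and Avro (gRPC typically uses Protocol Buffers as its interface definition and serialization format).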

【 The author recently wrote Bridge, a Java implementation of an RPC framework that also takes service mesh into consideration. You are welcome to follow it. 】

1. Principle

First, a diagram taken from the official Dubbo website briefly describes the RPC call process. It is not the simplest possible diagram, but it is still simple. Remove the Registry and the Monitor, and what remains is the simplest RPC call, which is just a network request.
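
To make the "simply a network request" point concrete, here is a minimal sketch that uses only the JDK. It is an illustration rather than the article's implementation: the class BareSocketRpc, its methods, and the two-string wire format are all hypothetical, and there is no registry, monitor, or Netty involved.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical names throughout; this is not code from the article's project.
class BareSocketRpc {

    /** The "remote" function that lives on the server side. */
    static String hello(String name) {
        return "Hello! " + name;
    }

    /** Starts a one-shot server on a free port and returns that port. */
    static int startServer() {
        final ServerSocket server;
        try {
            server = new ServerSocket(0); // port 0 lets the OS pick a free port
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        int port = server.getLocalPort();
        Thread worker = new Thread(() -> {
            try (ServerSocket listening = server;
                 Socket socket = listening.accept();
                 DataInputStream in = new DataInputStream(socket.getInputStream());
                 DataOutputStream out = new DataOutputStream(socket.getOutputStream())) {
                String method = in.readUTF();
                String argument = in.readUTF();
                // Dispatch on the method name; only one method is exposed here.
                out.writeUTF("hello".equals(method) ? hello(argument) : "unknown method: " + method);
                out.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        worker.setDaemon(true);
        worker.start();
        return port;
    }

    /** The client side: it looks like a function call but is a network request. */
    static String call(String host, int port, String method, String argument) {
        try (Socket socket = new Socket(host, port);
             DataOutputStream out = new DataOutputStream(socket.getOutputStream());
             DataInputStream in = new DataInputStream(socket.getInputStream())) {
            out.writeUTF(method);
            out.writeUTF(argument);
            out.flush();
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int port = startServer();
        System.out.println(call("127.0.0.1", port, "hello", "RPC")); // prints "Hello! RPC"
    }
}
```

Service discovery, proxies, and encoders are all layers added on top of this single request and response exchange.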

Process description:

  1. Start the provider (server), which registers the address and details of the services it exposes with the registry.
  2. Start the consumer, which subscribes to the services it needs in the registry and obtains their details.
  3. If a service changes, the registry notifies the consumer, which updates its subscribed content and service details.
  4. Using the service details, the consumer sends a network request to the provider and obtains the result.
  5. The monitor can collect, among other things, service invocation details and consumption details.
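
Steps 1 to 3 can be sketched with an in-memory stand-in for the registry. This is only an illustration under stated assumptions: InMemoryRegistry and its methods are hypothetical names, and a real registry such as ZooKeeper does the same job across processes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// In-memory stand-in for a registry; all names are hypothetical.
class InMemoryRegistry {

    private final Map<String, List<String>> addresses = new ConcurrentHashMap<>();
    private final Map<String, List<Consumer<List<String>>>> subscribers = new ConcurrentHashMap<>();

    /** Step 1: a provider registers the address of a service it exposes. */
    void register(String service, String address) {
        addresses.computeIfAbsent(service, k -> new CopyOnWriteArrayList<>()).add(address);
        notifySubscribers(service);
    }

    /** A provider goes away, which triggers step 3. */
    void unregister(String service, String address) {
        List<String> list = addresses.get(service);
        if (list != null && list.remove(address)) {
            notifySubscribers(service);
        }
    }

    /** Step 2: a consumer subscribes and immediately receives the current addresses. */
    void subscribe(String service, Consumer<List<String>> listener) {
        subscribers.computeIfAbsent(service, k -> new CopyOnWriteArrayList<>()).add(listener);
        listener.accept(snapshot(service));
    }

    /** Step 3: push the updated address list to every subscriber of the service. */
    private void notifySubscribers(String service) {
        List<String> current = snapshot(service);
        for (Consumer<List<String>> listener : subscribers.getOrDefault(service, Collections.emptyList())) {
            listener.accept(current);
        }
    }

    private List<String> snapshot(String service) {
        return new ArrayList<>(addresses.getOrDefault(service, Collections.emptyList()));
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        registry.register("HelloService", "10.0.0.1:8080");
        registry.subscribe("HelloService", list -> System.out.println("consumer sees " + list));
        registry.register("HelloService", "10.0.0.2:8080");
        registry.unregister("HelloService", "10.0.0.1:8080");
    }
}
```

The consumer callback fires once on subscription and once for every later change, which mirrors the notify-on-change behaviour described in step 3.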

That is all there is to the principle. Next, let's implement it step by step according to the description above.

2. Hands-on Practice

The following implements the above process on top of Spring Boot.

2.1 Building Modules

Create the project and its submodules, which include an API module, a Provider module, and a Consumer module.

2.2 Implementing the Server

Start with the server side.

The interface is defined in the API module, which both the Consumer and Provider modules reference. The code of the HelloService interface is as follows:

package com.glmapper.simple.api;

/**
 * service interface
 *
 * @author: Jerry
 */
public interface HelloService {

    /**
     * service function
     *
     * @param name the name to greet
     * @return the greeting text
     */
    String hello(String name);
}

Then implement the interface in the Provider module and mark the implementation with a custom annotation, @SimpleProvider:

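package com.glmapper.simple.provider.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.stereotype.Component;

/**
 * Custom service annotation
 *
 * @author Jerry
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
// meta-annotated with @Component so that Spring can scan annotated classes
@Component
public @interface SimpleProvider {
    Class<?> value();
}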

The annotation uses the @Component flag, so it can be scanned by Spring. Let’s look at the implementation class HelloServiceImpl:

package com.glmapper.simple.provider.service;

import com.glmapper.simple.api.HelloService;
import com.glmapper.simple.provider.annotation.SimpleProvider;

/**
 * service implement class
 *
 * @author: Jerry
 */
@SimpleProvider(HelloService.class)
public class HelloServiceImpl implements HelloService {

    /**
     * service function
     *
     * @param name the name to greet
     * @return the greeting text
     */
    @Override
    public String hello(String name) {
        return "Hello! " + name;
    }
}

Next, define a service configuration class, SimpleProviderProperties, so that the provider can be configured through the application.yml file:

package com.glmapper.simple.provider.property;

/**
 * provider properties
 *
 * @author: Jerry
 */
public class SimpleProviderProperties {

    /**
     * the port on which the service is exposed
     */
    private Integer port;

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }
}

With the basic classes in place, the next step is service initialization. The entry point is ProviderInitializer:

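package com.glmapper.simple.provider;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;

import com.glmapper.simple.provider.annotation.SimpleProvider;
import com.glmapper.simple.provider.handler.SimpleHandler;
import com.glmapper.simple.provider.property.SimpleProviderProperties;
import com.glmapper.simple.provider.registry.ServiceRegistry;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
// plus MapUtils (likely Apache Commons Collections) and the project's own
// SimpleDecoder, SimpleEncoder, SimpleRequest, SimpleResponse and SimpleException

/**
 * Start and register the service
 *
 * @author Jerry
 */
public class ProviderInitializer implements ApplicationContextAware, InitializingBean {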

    private static final Logger LOGGER = LoggerFactory.getLogger(ProviderInitializer.class);

    private SimpleProviderProperties providerProperties;

    /** * service registry */
    private ServiceRegistry serviceRegistry;

    /** * store interface and service implement mapping */
    private Map<String, Object> handlerMap = new HashMap<>();

    public ProviderInitializer(SimpleProviderProperties providerProperties, ServiceRegistry serviceRegistry) {
        this.providerProperties = providerProperties;
        this.serviceRegistry = serviceRegistry;
    }

    @Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        // Get the Bean annotated by SimpleProvider
        Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(SimpleProvider.class);
        if (MapUtils.isNotEmpty(serviceBeanMap)) {
            for (Object serviceBean : serviceBeanMap.values()) {
                String interfaceName = serviceBean.getClass().getAnnotation(SimpleProvider.class).value().getName();
                handlerMap.put(interfaceName, serviceBean);
            }
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            ChannelHandler channelHandler = new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline()
                            .addLast(new SimpleDecoder(SimpleRequest.class))
                            .addLast(new SimpleEncoder(SimpleResponse.class))
                            .addLast(new SimpleHandler(handlerMap));
                }
            };
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(channelHandler)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            String host = getLocalHost();
            if (null == host) {
                LOGGER.error("can't get service address,because address is null");
                throw new SimpleException("can't get service address,because address is null");
            }
            int port = providerProperties.getPort();
            ChannelFuture future = bootstrap.bind(host, port).sync();
            LOGGER.debug("server started on port {}", port);

            if (serviceRegistry != null) {
                String serverAddress = host + ":" + port;
                serviceRegistry.register(serverAddress);
            }
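            // Block until the server channel is closed
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /**
     * get service host
     *
     * @return the first non-loopback IPv4 address, or null if none is found
     */
    private String getLocalHost() {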
        Enumeration<NetworkInterface> allNetInterfaces;
        try {
            allNetInterfaces = NetworkInterface.getNetworkInterfaces();
        } catch (SocketException e) {
            LOGGER.error("get local address error,cause:", e);
            return null;
        }
        while (allNetInterfaces.hasMoreElements()) {
            NetworkInterface netInterface = allNetInterfaces.nextElement();
            Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
            while (addresses.hasMoreElements()) {
                InetAddress ip = addresses.nextElement();
                if (ip instanceof Inet4Address && !ip.isLoopbackAddress() && !ip.getHostAddress().contains(":")) {
                    return ip.getHostAddress();
                }
            }
        }
        return null;
    }
}

What this class does:

  • It implements two Spring interfaces, ApplicationContextAware and InitializingBean. While initializing the bean, the IoC container calls back setApplicationContext first and afterPropertiesSet afterwards.
    • setApplicationContext collects the beans in the container that are annotated with @SimpleProvider and stores the mapping from service interface name to service implementation in handlerMap. The value attribute of @SimpleProvider specifies which service interface the class implements. It could also be declared as an array to handle multiple interfaces.
    • afterPropertiesSet does two things:
      • It starts a Netty server whose event-loop threads listen on the exposed service port and process incoming socket requests, with SimpleHandler as the request handler.
      • It calls the register method of ServiceRegistry to register the service's address and port with ZooKeeper. No protocol information is included here; only ip:port is registered.
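
The handlerMap idea can be shown without Spring. The sketch below is a simplified illustration: the annotation @Provides, the Greeter interface, and the class HandlerMapSketch are hypothetical stand-ins, and a plain list of objects replaces the container lookup that Spring performs in the article's code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins; the article's real annotation is @SimpleProvider and Spring finds the beans.
class HandlerMapSketch {

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Provides {
        Class<?> value();
    }

    interface Greeter {
        String greet(String name);
    }

    @Provides(Greeter.class)
    static class GreeterImpl implements Greeter {
        @Override
        public String greet(String name) {
            return "Hello! " + name;
        }
    }

    /** Maps the interface name given in each bean's annotation to the bean itself. */
    static Map<String, Object> buildHandlerMap(List<Object> beans) {
        Map<String, Object> handlerMap = new HashMap<>();
        for (Object bean : beans) {
            Provides annotation = bean.getClass().getAnnotation(Provides.class);
            if (annotation != null) { // beans without the annotation are skipped
                handlerMap.put(annotation.value().getName(), bean);
            }
        }
        return handlerMap;
    }

    public static void main(String[] args) {
        Map<String, Object> map = buildHandlerMap(Arrays.<Object>asList(new GreeterImpl(), "not a service"));
        System.out.println(map.keySet());
    }
}
```

Keying the map by interface name lets the server look up an implementation using only the string that arrives in a request.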

SimpleHandler is the request handler. It extends Netty's SimpleChannelInboundHandler:

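package com.glmapper.simple.provider.handler;

import java.util.Map;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// plus CGLIB's FastClass and FastMethod and the project's own SimpleRequest and SimpleResponse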

/**
 * request handler
 *
 * @author Jerry
 */
public class SimpleHandler extends SimpleChannelInboundHandler<SimpleRequest> {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleHandler.class);

    private final Map<String, Object> handlerMap;

    public SimpleHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    @Override
    public void channelRead0(final ChannelHandlerContext ctx, SimpleRequest request) throws Exception {
        SimpleResponse response = new SimpleResponse();
        response.setRequestId(request.getRequestId());
        try {
            Object result = handle(request);
            response.setResult(result);
        } catch (Throwable t) {
            response.setError(t);
        }
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private Object handle(SimpleRequest request) throws Throwable {
        String className = request.getClassName();
        Object serviceBean = handlerMap.get(className);
        if (serviceBean == null) {
            throw new IllegalArgumentException("no service registered for " + className);
        }
        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();
        // CGLIB FastClass calls the method through generated code instead of reflection
        FastClass serviceFastClass = FastClass.create(serviceClass);
        FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
        return serviceFastMethod.invoke(serviceBean, parameters);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        LOGGER.error("server caught exception", cause);
        ctx.close();
    }
}

SimpleHandler is driven by Netty's event model, which triggers the corresponding callback for each event. When a request arrives, channelRead0 is called. It uses the interface name in the request to find the matching implementation, invokes the requested method, and writes the result back.
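
The lookup-and-invoke step can be tried in isolation. The sketch below swaps CGLIB's FastClass for plain java.lang.reflect so that it runs on the JDK alone; ReflectiveDispatchSketch, Greeter, and GreeterImpl are hypothetical names, and the dispatch parameters stand in for the fields of a request.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Uses java.lang.reflect in place of the article's CGLIB FastClass; all names are hypothetical.
class ReflectiveDispatchSketch {

    public interface Greeter {
        String greet(String name);
    }

    public static class GreeterImpl implements Greeter {
        @Override
        public String greet(String name) {
            return "Hello! " + name;
        }
    }

    /** Finds the bean registered for the interface name and invokes the named method on it. */
    static Object dispatch(Map<String, Object> handlerMap, String interfaceName,
                           String methodName, Class<?>[] parameterTypes, Object[] parameters) {
        Object serviceBean = handlerMap.get(interfaceName);
        if (serviceBean == null) {
            throw new IllegalArgumentException("no service registered for " + interfaceName);
        }
        try {
            Method method = serviceBean.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(serviceBean, parameters);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("dispatch failed", e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> handlerMap = new HashMap<>();
        handlerMap.put(Greeter.class.getName(), new GreeterImpl());
        Object result = dispatch(handlerMap, Greeter.class.getName(), "greet",
                new Class<?>[]{String.class}, new Object[]{"RPC"});
        System.out.println(result); // prints "Hello! RPC"
    }
}
```

The parameter types travel with the request so that overloaded methods can be told apart on the server.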

Next, look at ServiceRegistry. It is called from ProviderInitializer, which invokes its register method:

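package com.glmapper.simple.provider.registry;

import java.nio.charset.Charset;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// plus the project's own ZookeeperProperties and ZookeeperUtils

/**
 * connect to ZooKeeper and register the service
 *
 * @author Jerry
 */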
public class ServiceRegistry {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);

    private ZookeeperProperties zookeeperProperties;

    public ServiceRegistry(ZookeeperProperties zookeeperProperties) {
        this.zookeeperProperties = zookeeperProperties;
    }

    public void register(String data) {
        if (data != null) {
            ZooKeeper zk = ZookeeperUtils.connectServer(zookeeperProperties.getAddress(), zookeeperProperties.getTimeout());
            if (zk != null) {
                addRootNode(zk);
                createNode(zk, data);
            }
        }
    }

    /**
     * add one zookeeper root node
     *
     * @param zk
     */
    private void addRootNode(ZooKeeper zk) {
        try {
            String registryPath = zookeeperProperties.getRootPath();
            Stat s = zk.exists(registryPath, false);
            if (s == null) {
                zk.create(registryPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("zookeeper add root node error,cause:", e);
        }
    }

    private void createNode(ZooKeeper zk, String data) {
        try {
            byte[] bytes = data.getBytes(Charset.forName("UTF-8"));
            String dataPath = zookeeperProperties.getRootPath() + zookeeperProperties.getDataPath();
            String path = zk.create(dataPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            LOGGER.debug("create zookeeper node ({} => {})", path, data);
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("create zookeeper node error,cause:", e);
        }
    }
}

ServiceRegistry does a simple job: it registers the service's ip:port under the configured path in ZooKeeper.

  • Create the root node, which is a persistent node.
  • Create an ephemeral sequential child node under the root node to store the service's ip:port. If the provider goes down and its ZooKeeper session ends, ZooKeeper deletes the child node.
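
The node layout can be pictured with a toy in-memory model. This is not the ZooKeeper API: EphemeralNodeModel, its methods, and the paths /registry and /data are hypothetical. The only ZooKeeper behaviour it imitates is that sequential nodes get a zero-padded ten-digit counter appended to their name and that ephemeral nodes vanish when the session that created them ends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy in-memory model of the node layout; it is NOT the ZooKeeper client API.
class EphemeralNodeModel {

    private static final class Node {
        final String data;
        final long ownerSession;

        Node(String data, long ownerSession) {
            this.data = data;
            this.ownerSession = ownerSession;
        }
    }

    private final String rootPath;                             // plays the persistent root node
    private final Map<String, Node> children = new TreeMap<>(); // ephemeral children by path
    private int sequence = 0;

    EphemeralNodeModel(String rootPath) {
        this.rootPath = rootPath;
    }

    /** Creates a child whose name ends in a zero-padded counter, owned by the given session. */
    synchronized String createEphemeralSequential(long sessionId, String prefix, String data) {
        String path = rootPath + prefix + String.format("%010d", sequence++);
        children.put(path, new Node(data, sessionId));
        return path;
    }

    /** Ending a session removes every node that session created. */
    synchronized void closeSession(long sessionId) {
        children.values().removeIf(node -> node.ownerSession == sessionId);
    }

    /** Returns the ip:port values stored in the remaining children. */
    synchronized List<String> addresses() {
        List<String> result = new ArrayList<>();
        for (Node node : children.values()) {
            result.add(node.data);
        }
        return result;
    }

    public static void main(String[] args) {
        EphemeralNodeModel model = new EphemeralNodeModel("/registry");
        System.out.println(model.createEphemeralSequential(1L, "/data", "10.0.0.1:8080"));
        System.out.println(model.createEphemeralSequential(2L, "/data", "10.0.0.2:8080"));
        model.closeSession(1L); // provider 1 goes down
        System.out.println(model.addresses());
    }
}
```

Because each provider owns its own child node, one provider going down removes only its own address and leaves the others in place.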

2.3 Implementing the Consumer

The consumer side has little content, with only three core classes: ServiceDiscovery, ConsumerHandler, and ConsumerProxy.

Take a look at ServiceDiscovery:

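package com.glmapper.simple.consumer.discovery;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// plus the project's own ZookeeperProperties and ZookeeperUtils

/**
 * Service discovery: connect to ZK and add a watch event
 *
 * @author Jerry
 */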
public class ServiceDiscovery {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);

    private volatile List<String> nodes = new ArrayList<>();

    private ZookeeperProperties zookeeperProperties;

    public ServiceDiscovery(ZookeeperProperties zookeeperProperties) {
        this.zookeeperProperties = zookeeperProperties;
        String address = zookeeperProperties.getAddress();
        int timeout = zookeeperProperties.getTimeout();
        ZooKeeper zk = ZookeeperUtils.connectServer(address, timeout);
        if (zk != null) {
            watchNode(zk);
        }
    }

    public String discover() {
        String data = null;
        // Read the volatile field once so that the size check and the lookup use the same list
        List<String> current = nodes;
        int size = current.size();
        if (size > 0) {
            if (size == 1) {
                data = current.get(0);
                LOGGER.debug("using only node: {}", data);
            } else {
                data = current.get(ThreadLocalRandom.current().nextInt(size));
                LOGGER.debug("using random node: {}", data);
            }
        }
        return data;
    }

    private void watchNode(final ZooKeeper zk) {
        try {
            Watcher childrenNodeChangeWatcher = event -> {
                if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                    watchNode(zk);
                }
            };
            String rootPath = zookeeperProperties.getRootPath();
            List<String> nodeList = zk.getChildren(rootPath, childrenNodeChangeWatcher);
            List<String> nodes = new ArrayList<>();
            for (String node : nodeList) {
                byte[] bytes = zk.getData(rootPath + "/" + node, false, null);
                nodes.add(new String(bytes, Charset.forName("UTF-8")));
            }
            LOGGER.info("node data: {}", nodes);
            this.nodes = nodes;
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("Node monitoring error, cause:", e);
        }
    }
}

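The entry point of this class is its constructor, which connects to ZooKeeper and calls watchNode to read the provider addresses stored under the root node. watchNode also registers a watcher on the root node's children; when a child node is added or removed, the watcher calls watchNode again to reload the list. discover then returns one of the cached addresses at random. Note that a failed provider only disappears from the list after ZooKeeper deletes its ephemeral node, and the consumer does not retry, so a call made in that window can still fail.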
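
The selection logic in discover can be isolated into a few lines. The following sketch assumes a hypothetical class RandomSelectorSketch with refresh and pick methods; it keeps the address list in a volatile field that is replaced as a whole, and each pick reads that field once so that the size check and the lookup see the same list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper that mirrors the random choice made in discover().
class RandomSelectorSketch {

    // Replaced as a whole on every refresh, never modified in place.
    private volatile List<String> nodes = Collections.emptyList();

    /** Called whenever the registry reports a new set of addresses. */
    void refresh(List<String> latest) {
        this.nodes = Collections.unmodifiableList(new ArrayList<>(latest));
    }

    /** Returns a random address, or null when no provider is known. */
    String pick() {
        List<String> snapshot = nodes; // single read of the volatile field
        if (snapshot.isEmpty()) {
            return null;
        }
        return snapshot.get(ThreadLocalRandom.current().nextInt(snapshot.size()));
    }

    public static void main(String[] args) {
        RandomSelectorSketch selector = new RandomSelectorSketch();
        selector.refresh(Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        System.out.println(selector.pick());
    }
}
```

Random choice is the simplest form of load balancing; round-robin or weighted strategies would replace only the body of pick.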

Then there’s ConsumerProxy, which is a proxy factory:

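package com.glmapper.simple.consumer.proxy;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;

import com.glmapper.simple.consumer.discovery.ServiceDiscovery;
import com.glmapper.simple.consumer.handler.ConsumerHandler;
// plus the project's own SimpleRequest, SimpleResponse and SimpleException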

/**
 * ConsumerProxy
 *
 * @author Jerry
 */
public class ConsumerProxy {

    private ServiceDiscovery serviceDiscovery;

    public ConsumerProxy(ServiceDiscovery serviceDiscovery) {
        this.serviceDiscovery = serviceDiscovery;
    }

    @SuppressWarnings("unchecked")
    public <T> T create(Class<T> interfaceClass) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass}, new SimpleInvocationHandler());
    }

    private class SimpleInvocationHandler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            SimpleRequest request = buildRequest(method, args);
            String serverAddress = getServerAddress();
            String[] array = serverAddress.split(":");
            String host = array[0];
            int port = Integer.parseInt(array[1]);
            ConsumerHandler consumerHandler = new ConsumerHandler(host, port);
            SimpleResponse response = consumerHandler.send(request);
            if (response.getError() != null) {
                throw new SimpleException("service invoker error,cause:", response.getError());
            } else {
                return response.getResult();
            }
        }

        private SimpleRequest buildRequest(Method method, Object[] args) {
            SimpleRequest request = new SimpleRequest();
            request.setRequestId(UUID.randomUUID().toString());
            request.setClassName(method.getDeclaringClass().getName());
            request.setMethodName(method.getName());
            request.setParameterTypes(method.getParameterTypes());
            request.setParameters(args);
            return request;
        }

        private String getServerAddress() {
            String serverAddress = null;
            if (serviceDiscovery != null) {
                serverAddress = serviceDiscovery.discover();
            }
            if (null == serverAddress) {
                throw new SimpleException("no server address available");
            }
            return serverAddress;
        }
    }
}

The inner class SimpleInvocationHandler is the core of the generated proxy. The heart of SimpleInvocationHandler.invoke() is these two lines of code:

ConsumerHandler consumerHandler = new ConsumerHandler(host, port);
SimpleResponse response = consumerHandler.send(request);
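
To see how a dynamic proxy turns an ordinary method call into a request object, here is a JDK-only sketch. It is a simplified illustration: ProxySketch, the Call class, and the transport function are hypothetical, and the transport replaces the network round trip with an in-process function.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical names; the transport function stands in for the network round trip.
class ProxySketch {

    public interface Greeter {
        String greet(String name);
    }

    /** The data that would travel over the wire in place of a direct call. */
    static final class Call {
        final String className;
        final String methodName;
        final Object[] args;

        Call(String className, String methodName, Object[] args) {
            this.className = className;
            this.methodName = methodName;
            this.args = args;
        }
    }

    /** Creates a proxy that packs every method call into a Call and hands it to the transport. */
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> interfaceClass, Function<Call, Object> transport) {
        InvocationHandler handler = (proxy, method, args) ->
                transport.apply(new Call(method.getDeclaringClass().getName(), method.getName(), args));
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass}, handler);
    }

    public static void main(String[] args) {
        // This transport only echoes what it was asked to do.
        Greeter greeter = create(Greeter.class, call -> call.methodName + Arrays.toString(call.args));
        System.out.println(greeter.greet("RPC")); // prints "greet[RPC]"
    }
}
```

The caller only ever sees the Greeter interface, which is what makes a remote call look like a local one.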

These two lines initiate the network request. Take a look at the ConsumerHandler class:

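package com.glmapper.simple.consumer.handler;

import java.util.concurrent.CountDownLatch;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// plus the project's own SimpleDecoder, SimpleEncoder, SimpleRequest and SimpleResponse

/**
 * The client that actually performs the RPC call
 *
 * @author Jerry
 */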
public class ConsumerHandler extends SimpleChannelInboundHandler<SimpleResponse> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerHandler.class);

    private int port;

    private String host;

    private SimpleResponse response;

    private CountDownLatch latch = new CountDownLatch(1);

    public ConsumerHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, SimpleResponse response) throws Exception {
        this.response = response;
        latch.countDown();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error("client caught exception", cause);
        ctx.close();
    }

    public SimpleResponse send(SimpleRequest request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            ChannelInitializer<SocketChannel> channelHandler = new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline()
                            // Encode RPC requests (to send requests)
                            .addLast(new SimpleEncoder(SimpleRequest.class))
                            // Decode the RPC response (to process the response)
                            .addLast(new SimpleDecoder(SimpleResponse.class))
                            // This handler receives the decoded RPC response
                            .addLast(ConsumerHandler.this);
                }
            };
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(channelHandler)
                    .option(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture future = bootstrap.connect(host, port).sync();
            future.channel().writeAndFlush(request).sync();
            latch.await();
            if (response != null) {
                future.channel().closeFuture().sync();
            }
            return response;
        } finally {
            group.shutdownGracefully();
        }
    }
}

This class is similar to SimpleHandler on the server side and is also a Netty communication class.
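
ConsumerHandler uses a CountDownLatch to make the calling thread wait until the response arrives on Netty's I/O thread. The pattern is sketched below without Netty. BlockingReplySketch and its methods are hypothetical names, and the timeout is an addition for illustration: the article's code calls latch.await() with no timeout, so it waits indefinitely.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical class showing the wait-for-callback pattern on its own.
class BlockingReplySketch {

    private final CountDownLatch latch = new CountDownLatch(1);
    private String response;

    /** Called from the I/O thread when the reply arrives. */
    void onResponse(String response) {
        this.response = response;
        latch.countDown(); // releases the waiting caller
    }

    /** Called from the caller thread; returns null if no reply arrives in time. */
    String await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit) ? response : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingReplySketch reply = new BlockingReplySketch();
        new Thread(() -> reply.onResponse("pong")).start();
        System.out.println(reply.await(2, TimeUnit.SECONDS)); // prints "pong"
    }
}
```

Writing the field before calling countDown is what makes the value visible to the thread that returns from await.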

GitHub repository: simple-rpc