To all of you

I've recently been hearing from readers with questions about how to learn distributed systems, how to get project practice, and how to improve their programming skills.

How do you learn distributed systems when the topic is so hard? Why can't you master it even after reading so many books?

Open source frameworks such as Dubbo have huge code bases, and I don't know how to learn from them.

I have no project experience from school and the job hunt is imminent; how do I fill this gap? Is an “XXX management system” all you have to show, when interviewers are already tired of seeing them?

When I was at school, I had the same confusion as everyone else.

After graduation, I worked at Alibaba for several years. By taking part in real project development, I gained some experience in how to learn new things, get up to speed quickly, and apply them. Here is what I would like to share:

  • Get hands-on: write code, build demos, debug, and learn while debugging. There is no need to finish a thick book before you start writing code. Learning by doing is the fastest way.
  • When studying a framework, try starting with its earliest version. Mature open source frameworks tend to be complex, and the sheer amount of code can easily scare you off. To study the Linux kernel, for example, you can start with an early version.
  • “Build wheels.” The best way to master knowledge is to build a project and make it work. Many people recommend building an XXX management system, but I think such a project is not competitive on a resume: interviewers are tired of them, and training institutions all teach the same XXX management system on the full Spring Boot stack. You have to differentiate yourself.
  • Here I recommend a few excellent projects, which I will implement one by one: Spring IOC/AOP, an RPC framework, an MQ framework, KV storage, and a distributed lock. These projects match the technology stacks of the major Internet companies, and you implement the distributed components yourself. That is what we call “building wheels.” Why build wheels? First, to avoid becoming an engineer who only calls libraries or writes CRUD code; second, to increase your technical depth and broaden your career path.

I have written 5 articles in this series so far and will publish them one after another on this public account. Follow me so you don't miss them.

This article is worth a like and a bookmark because it contains a complete code implementation and really teaches you how to build it. Projects like these are definitely a plus for making your resume stand out to an interviewer.

GitHub address (stars welcome):

Github.com/xiajunhust/…


Why build a distributed RPC framework?

RPC stands for Remote Procedure Call. It lets us invoke remote services in a distributed environment as easily as local ones, and it is widely used in distributed applications.

One might ask, “Why implement RPC yourself when there are open source RPC frameworks?”

The basic principle of RPC is not difficult, but a real implementation has many pitfalls and touches on many topics: the thread model, communication protocol design, load balancing, dynamic proxies, and so on.

Implementing a simple RPC framework that covers the core functions of RPC tests your knowledge, teaches you to apply it flexibly in practice, and deepens your understanding. In a production environment, of course, you should use a mature open source framework.


Theoretical basis of RPC framework

In their 1984 paper “Implementing Remote Procedure Calls”, Andrew D. Birrell and Bruce Jay Nelson described five modules that are involved when a program makes an RPC call:

  • User: the application module that initiates the call. It is unaware of the underlying RPC logic and makes the call as if it were a local one.
  • User-stub: packs the call request and unpacks the result.
  • RPCRuntime: the RPC runtime, which handles remote network interaction such as packet retransmission and encryption.
  • Server-stub: unpacks the request and packs the result.
  • Server: the application module that actually provides the service.
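
To make the five roles concrete, here is a minimal in-process sketch. It assumes a toy string wire format ("method:argument") and uses a direct method call in place of the network. All class names (EchoServer, ServerStub, RpcRuntime, UserStub) are hypothetical illustrations; they come neither from the paper nor from this project.

```java
import java.util.function.Function;

// Server: the module that actually does the work.
class EchoServer {
    String greet(String name) {
        return "hello, " + name;
    }
}

// Server-stub: unpacks the request, calls the server, packs the result.
class ServerStub {
    private final EchoServer server = new EchoServer();

    String handle(String packedRequest) {
        // Assumed toy wire format: "<method>:<argument>"
        String[] parts = packedRequest.split(":", 2);
        if (parts.length < 2 || !"greet".equals(parts[0])) {
            return "ERR:unknown request " + packedRequest;
        }
        return "OK:" + server.greet(parts[1]);
    }
}

// RPCRuntime: moves packed messages between the two sides.
// Here a plain function call stands in for the network.
class RpcRuntime {
    private final Function<String, String> remoteEndpoint;

    RpcRuntime(Function<String, String> remoteEndpoint) {
        this.remoteEndpoint = remoteEndpoint;
    }

    String transmit(String packedRequest) {
        return remoteEndpoint.apply(packedRequest);
    }
}

// User-stub: packs the call and unpacks the result, so the user sees a local method.
class UserStub {
    private final RpcRuntime runtime;

    UserStub(RpcRuntime runtime) {
        this.runtime = runtime;
    }

    String greet(String name) {
        String packedResponse = runtime.transmit("greet:" + name);
        if (!packedResponse.startsWith("OK:")) {
            throw new IllegalStateException(packedResponse);
        }
        return packedResponse.substring("OK:".length());
    }
}

// User: calls the stub as if it were a local object.
class FiveModulesDemo {
    public static void main(String[] args) {
        UserStub stub = new UserStub(new RpcRuntime(new ServerStub()::handle));
        System.out.println(stub.greet("rpc")); // prints "hello, rpc"
    }
}
```

In a real framework the RpcRuntime role is played by the network layer (Netty in this article), and the two stubs are generated or implemented with proxies and reflection rather than written by hand.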

The relationship between these five parts is shown in the figure below:


Mainstream open source RPC framework

(1) Dubbo: Alibaba's RPC framework, proven in massive-scale e-commerce scenarios, with 36.7k stars on GitHub. Supports Java.

Official website: dubbo.apache.org/zh/

GitHub: github.com/apache/dubb…

(2) gRPC: Google's open source RPC framework, which supports multiple languages. 33.2k stars on GitHub.

Official website: grpc.io/

GitHub: github.com/grpc/grpc

(3) Motan: Sina's open source RPC framework, which supports only Java.

GitHub: github.com/weibocom/mo…

(4) Spring Cloud: an RPC framework open-sourced by Pivotal in 2014, which supports only Java.

(5) bRPC: Baidu's open source RPC framework, implemented in C++.

GitHub: github.com/apache/incu…


RPC Framework Design

The overall architecture is shown in the figure below. As the saying goes, it is “small as a sparrow, but with all the vital organs”: auxiliary features such as a management console are left out. Designing and implementing the core functions is enough to understand the design principles of a complete RPC framework.

Core technologies involved:

  • Registry: the service provider registers the services it publishes with the registry; the caller subscribes to the registry to obtain a service's address before invoking it.
  • Network communication: in a distributed environment, servers communicate with each other over the network (RPC client and server).
  • Encoding and decoding: network communication inevitably involves serializing and deserializing messages.
  • Local cache: to avoid querying the registry for the address on every call, the service caller also caches service information.
  • Dynamic proxy: makes remote calls transparent to the client.

Detailed design & technical implementation

01

Technology selection

  • Spring Boot: dependency management and powerful configuration capabilities. It also makes it easy to package the RPC framework as a starter, which is convenient to integrate and use.
  • netty
  • zookeeper
  • protobuf

02

RPC call flow analysis

What happens during a complete RPC call? The detailed steps are shown in the sequence diagram below:

03

Engineering module dependency

The code modules are layered as follows:

  • Util: Base utility class.
  • Model: Base domain model.
  • Annotation: provides annotations that make it easy to publish and reference RPC services.
  • Registry: the service registry, with a ZooKeeper implementation.
  • IO: encoding and decoding implementation.
  • Provider: service provider implementation.
  • Consumer: Service consumer implementation.

Code package details:

04

Code details

The spring-boot framework is adopted. The RPC framework is implemented as a starter for easy integration.

Annotations

To make the RPC framework easy to use, we define annotations that let users publish or reference a service with a single annotation.

/**
 * RPC provider annotation
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Component
public @interface SimpleRpcProvider {
    Class<?> serviceInterface() default Object.class;

    String serviceVersion() default "1.0.0";
}

/**
 * RPC consumer annotation
 *
 * @author summer
 * @version $Id: SimpleRpcProviderBean.java, v 0.1 2022-01-16 11:53 AM summer Exp $
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Component
public @interface SimpleRpcConsumer {
    /**
     * Service version
     * @return
     */
    String serviceVersion() default "1.0.0";

    /**
     * Registry type, ZooKeeper by default
     * @return
     */
    String registerType() default "zookeeper";

    /**
     * Registry address
     * @return
     */
    String registerAddress() default "127.0.0.1:2181";
}

The registry

There are many common registries, such as ZooKeeper, Eureka, Nacos, and Consul. How registries work is not the focus of this article and will not be described in detail.

ZooKeeper is used here. Interested readers can add other registries themselves; they only need to implement the ServiceRegistry interface.

/**
 * Registry service interface definition
 */
public interface ServiceRegistry {
    /**
     * Register a service
     *
     * @param serviceMetaConfig service metadata configuration
     * @throws Exception
     */
    void register(ServiceMetaConfig serviceMetaConfig) throws Exception;

    /**
     * Cancel a service registration
     *
     * @param serviceMetaConfig service metadata configuration
     * @throws Exception
     */
    void unRegister(ServiceMetaConfig serviceMetaConfig) throws Exception;

    /**
     * Service discovery
     *
     * @param serviceName service name
     * @return
     * @throws Exception
     */
    ServiceMetaConfig discovery(String serviceName) throws Exception;
}

ZooKeeper implementation (using Curator):

import com.summer.simplerpc.registry.cache.ServiceProviderCache;
import com.summer.simplerpc.registry.model.ServiceMetaConfig;
import com.summer.simplerpc.registry.ServiceRegistry;
import com.summer.simplerpc.util.ServiceUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.*;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;

public class ZkServiceRegistry implements ServiceRegistry {

    /**
     * ZooKeeper base path
     */
    private final static String ZK_BASE_PATH = "/simplerpc";

    /**
     * Lock guarding serviceProvider creation
     */
    private final Object lock = new Object();

    /**
     * Curator client
     */
    private CuratorFramework client;

    /**
     * Service discovery
     */
    private ServiceDiscovery<ServiceMetaConfig> serviceDiscovery;

    /**
     * serviceProvider cache
     */
    private ServiceProviderCache serviceProviderCache;

    /**
     * Constructor
     */
    public ZkServiceRegistry(String address, ServiceProviderCache serviceProviderCache) throws Exception {
        this.client = CuratorFrameworkFactory.newClient(address, new ExponentialBackoffRetry(1000, 3));
        this.client.start();

        this.serviceProviderCache = serviceProviderCache;

        JsonInstanceSerializer<ServiceMetaConfig> serializer = new JsonInstanceSerializer<>(ServiceMetaConfig.class);
        serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceMetaConfig.class)
                .client(client)
                .serializer(serializer)
                .basePath(ZK_BASE_PATH)
                .build();
        serviceDiscovery.start();
    }

    @Override
    public void register(ServiceMetaConfig serviceMetaConfig) throws Exception {
        ServiceInstanceBuilder<ServiceMetaConfig> serviceInstanceBuilder = ServiceInstance.builder();
        ServiceInstance<ServiceMetaConfig> serviceInstance = serviceInstanceBuilder
                .name(ServiceUtils.buildServiceKey(serviceMetaConfig.getName(), serviceMetaConfig.getVersion()))
                .address(serviceMetaConfig.getAddress())
                .port(serviceMetaConfig.getPort())
                .payload(serviceMetaConfig)
                .uriSpec(new UriSpec("{scheme}://{address}:{port}"))
                .build();

        serviceDiscovery.registerService(serviceInstance);
    }

    @Override
    public void unRegister(ServiceMetaConfig serviceMetaConfig) throws Exception {
        ServiceInstanceBuilder<ServiceMetaConfig> serviceInstanceBuilder = ServiceInstance.builder();
        ServiceInstance<ServiceMetaConfig> serviceInstance = serviceInstanceBuilder
                .name(ServiceUtils.buildServiceKey(serviceMetaConfig.getName(), serviceMetaConfig.getVersion()))
                .address(serviceMetaConfig.getAddress())
                .port(serviceMetaConfig.getPort())
                .payload(serviceMetaConfig)
                .uriSpec(new UriSpec("{scheme}://{address}:{port}"))
                .build();

        serviceDiscovery.unregisterService(serviceInstance);
    }

    @Override
    public ServiceMetaConfig discovery(String serviceName) throws Exception {
        // Read the cache first
        ServiceProvider<ServiceMetaConfig> serviceProvider = serviceProviderCache.queryCache(serviceName);

        // Cache miss: build a provider through serviceDiscovery
        if (serviceProvider == null) {
            synchronized (lock) {
                serviceProvider = serviceDiscovery.serviceProviderBuilder()
                        .serviceName(serviceName)
                        .providerStrategy(new RoundRobinStrategy<>())
                        .build();
                serviceProvider.start();

                // Update the cache
                serviceProviderCache.updateCache(serviceName, serviceProvider);
            }
        }

        ServiceInstance<ServiceMetaConfig> serviceInstance = serviceProvider.getInstance();
        return serviceInstance != null ? serviceInstance.getPayload() : null;
    }
}
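
The discovery method above delegates instance selection to Curator's RoundRobinStrategy. To illustrate the idea behind round-robin load balancing, here is a minimal standalone sketch. It is not Curator's code; RoundRobinSelector and the sample addresses are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Picks instances in rotation so that calls are spread evenly across providers.
class RoundRobinSelector<T> {
    private final AtomicInteger counter = new AtomicInteger();

    T select(List<T> instances) {
        if (instances.isEmpty()) {
            return null; // nothing registered for this service
        }
        // floorMod keeps the index non-negative even after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), instances.size());
        return instances.get(index);
    }
}

class RoundRobinDemo {
    public static void main(String[] args) {
        List<String> instances = Arrays.asList("10.0.0.1:50001", "10.0.0.2:50001", "10.0.0.3:50001");
        RoundRobinSelector<String> selector = new RoundRobinSelector<>();
        // Prints the three addresses in order, then the first one again
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.select(instances));
        }
    }
}
```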

Core domain model and local cache:

/**
 * Service metadata configuration
 */
public class ServiceMetaConfig {
    /**
     * Service name
     */
    private String name;

    /**
     * Service version
     */
    private String version;

    /**
     * Service address
     */
    private String address;

    /**
     * Service port
     */
    private Integer port;

    // getters and setters omitted here (other classes call getName(), setAddress(), and so on)
}

/**
 * Service provider cache
 *
 * @author summer
 * @version $Id: ServiceProviderCache.java, v 0.1 2022-01-16 11:41 AM summer Exp $
 */
public interface ServiceProviderCache {
    /**
     * Query the cache
     *
     * @param serviceName service name
     * @return
     */
    ServiceProvider<ServiceMetaConfig> queryCache(String serviceName);

    /**
     * Update the cache
     *
     * @param serviceName service name
     * @param serviceProvider service provider
     */
    void updateCache(String serviceName, ServiceProvider<ServiceMetaConfig> serviceProvider);
}

/**
 * Local cache implementation
 *
 * @author summer
 * @version $Id: ServiceProviderLocalCache.java, v 0.1 2022-01-16 AM summer Exp $
 */
public class ServiceProviderLocalCache implements ServiceProviderCache {
    /**
     * Local cache map
     */
    private Map<String, ServiceProvider<ServiceMetaConfig>> serviceProviderMap = new ConcurrentHashMap<>();

    @Override
    public ServiceProvider<ServiceMetaConfig> queryCache(String serviceName) {
        return serviceProviderMap.get(serviceName);
    }

    @Override
    public void updateCache(String serviceName, ServiceProvider<ServiceMetaConfig> serviceProvider) {
        serviceProviderMap.put(serviceName, serviceProvider);
    }
}
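
One detail worth noting: in the discovery method shown earlier, two threads that miss the cache at the same time can each build a ServiceProvider, because the cache is not checked again inside the synchronized block. A possible alternative, sketched below under my own assumptions, is to let ConcurrentHashMap.computeIfAbsent run the loader at most once per key. LoadingCache and the service key used in the demo are hypothetical names, not part of this project.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// A cache that creates the value for a key on first access.
class LoadingCache<V> {
    private final Map<String, V> entries = new ConcurrentHashMap<>();
    private final Function<String, V> loader;

    LoadingCache(Function<String, V> loader) {
        this.loader = loader;
    }

    V get(String serviceName) {
        // computeIfAbsent applies the loader at most once per key,
        // even when several threads ask for the same key concurrently
        return entries.computeIfAbsent(serviceName, loader);
    }
}

class LoadingCacheDemo {
    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger();
        LoadingCache<String> cache = new LoadingCache<>(name -> {
            loads.incrementAndGet(); // stands in for the expensive registry lookup
            return "provider-for-" + name;
        });

        cache.get("demo.HelloService");
        cache.get("demo.HelloService");
        System.out.println(loads.get()); // prints 1: the second call hit the cache
    }
}
```

The loader should stay short, because other updates to the same map may block while it runs.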

Service Provider

As mentioned earlier, services are published through annotations. So after a bean is initialized, we need to check whether it is annotated with SimpleRpcProvider and, if so, register its service with the registry. We also need to start the Netty server after initialization. I therefore define the service provider bean SimpleRpcProviderBean, which implements InitializingBean and BeanPostProcessor:

  • In postProcessAfterInitialization, check whether the bean carries the SimpleRpcProvider annotation; if it does, extract the service information and register it with the registry.
  • In afterPropertiesSet, start the Netty server.
  • On receiving a service invocation request, perform the actual call via reflection (CGLIB FastClass).

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.summer.simplerpc.annotation.SimpleRpcProvider;
import com.summer.simplerpc.io.RPCDecoder;
import com.summer.simplerpc.io.RPCEncoder;
import com.summer.simplerpc.registry.ServiceRegistry;
import com.summer.simplerpc.registry.model.ServiceMetaConfig;
import com.summer.simplerpc.util.ServiceUtils;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanPostProcessor;


import java.util.Map;
import java.util.concurrent.*;


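/**
 * RPC provider implementation.
 *
 * Scans beans annotated as service providers, registers their services with the registry, and starts the Netty listener.
 * Also handles the actual RPC requests.
 */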
@Slf4j
public class SimpleRpcProviderBean implements InitializingBean, BeanPostProcessor {


    /**
     * Address
     */
    private String          address;


    /**
     * Service registry
     */
    private ServiceRegistry serviceRegistry;


    /**
     * Cache map of service provider beans
     */
    private Map<String, Object> providerBeanMap = new ConcurrentHashMap<>(64);


    /**
     * Thread pool that handles the actual RPC requests
     */
    private static ThreadPoolExecutor rpcThreadPoolExecutor;


    private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("simplerpc-provider-pool-%d").build();


    /**
     * Netty event loop groups
     */
    private EventLoopGroup bossGroup   = null;
    private EventLoopGroup workerGroup = null;


    /**
     * Constructor
     *
     * @param address address
     * @param serviceRegistry service registry
     */
    public SimpleRpcProviderBean(String address, ServiceRegistry serviceRegistry) {
        this.address = address;
        this.serviceRegistry = serviceRegistry;
    }


    @Override
    public void afterPropertiesSet() throws Exception {
        // Start the Netty server listener
        new Thread(() -> {
            try {
                startNettyServer();
            } catch (InterruptedException e) {
                log.error("startNettyServer exception,", e);
            }
        }).start();
    }


    /**
     * Submit an RPC processing task
     *
     * @param task task
     */
    public static void submit(Runnable task) {
        if (rpcThreadPoolExecutor == null) {
            synchronized (SimpleRpcProviderBean.class) {
                if (rpcThreadPoolExecutor == null) {
                    rpcThreadPoolExecutor = new ThreadPoolExecutor(100, 100,
                            600L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000),
                            threadFactory);
                }
            }
        }
        rpcThreadPoolExecutor.submit(task);
    }


    /**
     * Start the Netty server listener
     *
     * @throws InterruptedException
     */
    private void startNettyServer() throws InterruptedException {
        if (workerGroup != null && bossGroup != null) {
            return;
        }


        log.info("startNettyServer begin");


        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();


        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
                        socketChannel.pipeline()
                                .addLast(new LengthFieldBasedFrameDecoder(65535,0,4,0,0))
                                .addLast(new RPCDecoder())
                                .addLast(new RPCEncoder())
                                .addLast(new SimpleRpcProviderNettyHandler(providerBeanMap))
                        ;
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 512)
                .childOption(ChannelOption.SO_KEEPALIVE, true);


        String[] array = address.split(":");
        String host = array[0];
        int port = Integer.parseInt(array[1]);


        // Start the server
        ChannelFuture future = serverBootstrap.bind(host, port).sync();


        log.info(String.format("startNettyServer,host=%s,port=%s", host, port));


        future.channel().closeFuture().sync();
    }


    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }


    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // Read the annotation on the bean
        SimpleRpcProvider simpleRpcProvider = bean.getClass().getAnnotation(SimpleRpcProvider.class);
        if (simpleRpcProvider == null) {
            // No annotation: return the original bean
            return bean;
        }


        // Save the bean in the cache
        String serviceName = simpleRpcProvider.serviceInterface().getName();
        String version = simpleRpcProvider.serviceVersion();
        providerBeanMap.put(ServiceUtils.buildServiceKey(serviceName, version), bean);


        log.info("postProcessAfterInitialization find a simpleRpcProvider[" + serviceName + "," + version + "]");


        // Register the service with the registry
        String[] addressArray = address.split(ServiceUtils.SPLIT_CHAR);
        String host = addressArray[0];
        String port = addressArray[1];


        ServiceMetaConfig serviceMetaConfig = new ServiceMetaConfig();
        serviceMetaConfig.setAddress(host);
        serviceMetaConfig.setName(serviceName);
        serviceMetaConfig.setVersion(version);
        serviceMetaConfig.setPort(Integer.parseInt(port));


        try {
            serviceRegistry.register(serviceMetaConfig);
            log.info("register service success,serviceMetaConfig=" + serviceMetaConfig.toString());
        } catch (Exception e) {
            log.error("register service fail,serviceMetaConfig=" + serviceMetaConfig.toString(), e);
        }


        return bean;
    }
}

Netty ChannelPipeline design:

  • LengthFieldBasedFrameDecoder: a frame decoder based on a length field, which solves the TCP sticky-packet and partial-packet problems.
  • RPCDecoder: decoder that deserializes the RPC request object.
  • SimpleRpcProviderNettyHandler: the actual RPC request processing logic; it receives the request and returns the RPC response.
  • RPCEncoder: encoder that serializes the RPC response before it is returned.

The core RPC processing handler, SimpleRpcProviderNettyHandler:

import com.summer.simplerpc.model.SimpleRpcRequest;
import com.summer.simplerpc.model.SimpleRpcResponse;
import com.summer.simplerpc.util.ServiceUtils;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cglib.reflect.FastClass;

import java.util.Map;

@Slf4j
public class SimpleRpcProviderNettyHandler extends SimpleChannelInboundHandler<SimpleRpcRequest> {

    /**
     * Map from service key to provider bean
     */
    private Map<String, Object> handlerMap;

    /**
     * Constructor
     *
     * @param handlerMap provider bean map
     */
    public SimpleRpcProviderNettyHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, SimpleRpcRequest simpleRpcRequest) throws Exception {
        SimpleRpcProviderBean.submit(() -> {
            log.debug("Receive rpc request {}", simpleRpcRequest.getBizNO());
            SimpleRpcResponse simpleRpcResponse = new SimpleRpcResponse();
            simpleRpcResponse.setBizNO(simpleRpcRequest.getBizNO());
            try {
                Object result = doHandle(simpleRpcRequest);
                simpleRpcResponse.setData(result);
            } catch (Throwable throwable) {
                simpleRpcResponse.setMsg(throwable.toString());
                log.error("handle rpc request error", throwable);
            }
            channelHandlerContext.writeAndFlush(simpleRpcResponse).addListener(
                    (ChannelFutureListener) channelFuture ->
                            log.info("return response for request " + simpleRpcRequest.getBizNO() + ",simpleRpcResponse=" + simpleRpcResponse));
        });
    }

    /**
     * Execute the actual RPC request via reflection
     *
     * @param simpleRpcRequest request
     * @return
     */
    private Object doHandle(SimpleRpcRequest simpleRpcRequest) throws Exception {
        String key = ServiceUtils.buildServiceKey(simpleRpcRequest.getClassName(), simpleRpcRequest.getServiceVersion());
        if (handlerMap == null || handlerMap.get(key) == null) {
            // SLF4J placeholders are "{}", not "{0}"
            log.error("doHandle,the provider {} version {} not exist", simpleRpcRequest.getClassName(), simpleRpcRequest.getServiceVersion());
            throw new RuntimeException("the provider not exist");
        }

        log.info("doHandle,simpleRpcRequest=" + simpleRpcRequest.toString());

        Object provider = handlerMap.get(key);

        // Invoke the target method through CGLIB FastClass
        FastClass fastClass = FastClass.create(provider.getClass());
        return fastClass.invoke(fastClass.getIndex(simpleRpcRequest.getMethodName(),
                simpleRpcRequest.getParamTypes()), provider, simpleRpcRequest.getParamValues());
    }
}

As mentioned earlier, the framework needs to be easy to integrate and use, so I implement it as a Spring Boot starter:

import com.summer.simplerpc.model.RpcCommonProperty;
import com.summer.simplerpc.registry.ServiceRegistry;
import com.summer.simplerpc.registry.cache.ServiceProviderCache;
import com.summer.simplerpc.registry.cache.ServiceProviderLocalCache;
import com.summer.simplerpc.registry.zk.ZkServiceRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class SimplerRpcProviderAutoConfiguration {

    @Bean
    public SimpleRpcProviderBean initRpcProvider() throws Exception {
        RpcCommonProperty rpcCommonProperty = new RpcCommonProperty();
        rpcCommonProperty.setServiceAddress("127.0.0.1:50001");
        rpcCommonProperty.setRegistryAddress("127.0.0.1:2181");

        log.info("===================SimplerRpcProviderAutoConfiguration init, rpcCommonProperty=" + rpcCommonProperty.toString());

        ServiceProviderCache serviceProviderCache = new ServiceProviderLocalCache();
        ServiceRegistry zkServiceRegistry = new ZkServiceRegistry(rpcCommonProperty.getRegistryAddress(), serviceProviderCache);

        return new SimpleRpcProviderBean(rpcCommonProperty.getServiceAddress(), zkServiceRegistry);
    }
}
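
The doHandle method looks up the provider bean by service key and then invokes the requested method by name and parameter types. The same idea can be sketched with plain java.lang.reflect, used here in place of CGLIB FastClass so that the example has no dependencies. ReflectiveDispatcher, UpperCaseService, and the service key are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// A sample provider bean.
class UpperCaseService {
    public String upper(String text) {
        return text.toUpperCase(Locale.ROOT);
    }
}

// Maps a service key to a provider bean and invokes the requested method on it.
class ReflectiveDispatcher {
    private final Map<String, Object> providers = new ConcurrentHashMap<>();

    void register(String serviceKey, Object provider) {
        providers.put(serviceKey, provider);
    }

    Object dispatch(String serviceKey, String methodName, Class<?>[] paramTypes, Object[] paramValues) {
        Object provider = providers.get(serviceKey);
        if (provider == null) {
            throw new IllegalArgumentException("no provider for " + serviceKey);
        }
        try {
            // Resolve the method by name and parameter types, then call it
            Method method = provider.getClass().getMethod(methodName, paramTypes);
            return method.invoke(provider, paramValues);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("invocation failed: " + methodName, e);
        }
    }
}

class ReflectiveDispatcherDemo {
    public static void main(String[] args) {
        ReflectiveDispatcher dispatcher = new ReflectiveDispatcher();
        dispatcher.register("demo.UpperCaseService_1.0.0", new UpperCaseService());

        Object result = dispatcher.dispatch("demo.UpperCaseService_1.0.0", "upper",
                new Class<?>[] {String.class}, new Object[] {"rpc"});
        System.out.println(result); // prints "RPC"
    }
}
```

FastClass avoids the overhead of Method.invoke by generating a class that calls methods by index, which is presumably why the handler uses it.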

IO

IO mainly covers serialization and deserialization. There are many common serialization frameworks; Hessian is used here.

The provider and the consumer each add the encoder and decoder to their Netty ChannelPipeline. For details, see the provider and consumer implementations.
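
RPCEncoder and RPCDecoder are not listed in this article. As a rough illustration of what an encoder and decoder pair has to do, the sketch below turns an object into bytes and back. It uses JDK serialization as a stand-in for Hessian so that it runs without dependencies; DemoRequest and JdkCodec are hypothetical names, and the project's real codec may look different.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// A simplified request object; the real SimpleRpcRequest carries more fields.
class DemoRequest implements Serializable {
    private static final long serialVersionUID = 1L;
    final String bizNo;
    final String methodName;

    DemoRequest(String bizNo, String methodName) {
        this.bizNo = bizNo;
        this.methodName = methodName;
    }
}

class JdkCodec {
    // Encoder side: object to bytes.
    static byte[] serialize(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Decoder side: bytes back to an object of the expected type.
    static <T> T deserialize(byte[] data, Class<T> type) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return type.cast(in.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}

class JdkCodecDemo {
    public static void main(String[] args) {
        byte[] wire = JdkCodec.serialize(new DemoRequest("req-1", "buildHelloworld"));
        DemoRequest decoded = JdkCodec.deserialize(wire, DemoRequest.class);
        System.out.println(decoded.bizNo + " " + decoded.methodName); // prints "req-1 buildHelloworld"
    }
}
```

JDK serialization is used only because it ships with the JDK. It is verbose and should not be used to deserialize untrusted input, which is one reason RPC frameworks choose formats such as Hessian or protobuf.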

Service consumer

Service consumption with this framework also works through annotations: annotating a bean field is enough to reference a service, and you can then make RPC calls as if you were using a local bean. Everything else is handled by the RPC framework:

  • Scan all beans annotated with SimpleRpcConsumer.
  • Redefine the BeanDefinition and inject a proxy class into the Spring container.
  • When an RPC call is made, get the remote service details from the local cache or the registry and initiate the network call.
  • Get the result returned by the service.

The SimpleRpcConsumer annotation:

import org.springframework.stereotype.Component;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * RPC consumer annotation
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Component
public @interface SimpleRpcConsumer {
    /**
     * Service version
     * @return
     */
    String serviceVersion() default "1.0.0";

    /**
     * Registry type, ZooKeeper by default
     * @return
     */
    String registerType() default "zookeeper";

    /**
     * Registry address
     * @return
     */
    String registerAddress() default "127.0.0.1:2181";
}

The FactoryBean that generates the proxy class:

import com.summer.simplerpc.registry.ServiceRegistry;
import com.summer.simplerpc.registry.cache.ServiceProviderCache;
import com.summer.simplerpc.registry.cache.ServiceProviderLocalCache;
import com.summer.simplerpc.registry.zk.ZkServiceRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.FactoryBean;

import java.lang.reflect.Proxy;

/**
 * FactoryBean that generates the RPC consumer proxy bean
 */
@Slf4j
public class SimpleRpcConsumerFactoryBean implements FactoryBean {

    /**
     * Service interface class
     */
    private Class<?> interfaceClass;

    /**
     * Service version
     */
    private String serviceVersion;

    /**
     * Registry type
     */
    private String registryType;

    /**
     * Registry address
     */
    private String registryAddress;

    /**
     * The actual bean
     */
    private Object object;

    /**
     * init method: generates the bean using a dynamic proxy
     *
     * @throws Exception
     */
    public void init() throws Exception {
        ServiceProviderCache serviceProviderCache = new ServiceProviderLocalCache();
        ServiceRegistry zkServiceRegistry = new ZkServiceRegistry(registryAddress, serviceProviderCache);

        // Dynamic proxy
        this.object = Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[] {interfaceClass},
                new SimpleRpcInvokeHandler<>(this.serviceVersion, zkServiceRegistry));
        log.info("SimpleRpcConsumerFactoryBean getObject {}", interfaceClass.getName());
    }

    /**
     * Returns the created bean instance
     *
     * @return
     * @throws Exception
     */
    @Override
    public Object getObject() throws Exception {
        return this.object;
    }

    /**
     * The type of the created bean instance
     *
     * @return
     */
    @Override
    public Class<?> getObjectType() {
        return interfaceClass;
    }

    /**
     * Whether the created bean is a singleton
     *
     * @return
     */
    @Override
    public boolean isSingleton() {
        return true;
    }

    public void setInterfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

    public void setServiceVersion(String serviceVersion) {
        this.serviceVersion = serviceVersion;
    }

    public void setRegistryType(String registryType) {
        this.registryType = registryType;
    }

    public void setRegistryAddress(String registryAddress) {
        this.registryAddress = registryAddress;
    }
}

SimpleRpcInvokeHandler, the handler that performs the actual network call:

import com.summer.simplerpc.model.SimpleRpcRequest;
import com.summer.simplerpc.model.SimpleRpcResponse;
import com.summer.simplerpc.registry.ServiceRegistry;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.UUID;

/**
 * Handler that performs the actual RPC call
 */
@Slf4j
public class SimpleRpcInvokeHandler<T> implements InvocationHandler {

    /**
     * Service version
     */
    private String serviceVersion;

    /**
     * Registry
     */
    private ServiceRegistry serviceRegistry;

    public SimpleRpcInvokeHandler() {
    }

    public SimpleRpcInvokeHandler(String serviceVersion, ServiceRegistry serviceRegistry) {
        this.serviceVersion = serviceVersion;
        this.serviceRegistry = serviceRegistry;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        SimpleRpcRequest simpleRpcRequest = new SimpleRpcRequest();
        simpleRpcRequest.setBizNO(UUID.randomUUID().toString());
        simpleRpcRequest.setClassName(method.getDeclaringClass().getName());
        simpleRpcRequest.setServiceVersion(this.serviceVersion);
        simpleRpcRequest.setMethodName(method.getName());
        simpleRpcRequest.setParamTypes(method.getParameterTypes());
        simpleRpcRequest.setParamValues(args);

        log.info("begin simpleRpcRequest=" + simpleRpcRequest.toString());

        SimpleRpcConsumerNettyHandler simpleRpcConsumerNettyHandler = new SimpleRpcConsumerNettyHandler(this.serviceRegistry);
        SimpleRpcResponse simpleRpcResponse = simpleRpcConsumerNettyHandler.sendRpcRequest(simpleRpcRequest);

        log.info("result simpleRpcResponse=" + simpleRpcResponse);
        return simpleRpcResponse.getData();
    }
}

SimpleRpcConsumerNettyHandler initiates the Netty network call. The client's Netty ChannelPipeline is simpler than the server's:

The core is SimpleRpcConsumerNettyHandler:

import com.summer.simplerpc.io.RPCDecoder;
import com.summer.simplerpc.io.RPCEncoder;
import com.summer.simplerpc.model.SimpleRpcRequest;
import com.summer.simplerpc.model.SimpleRpcResponse;
import com.summer.simplerpc.registry.ServiceRegistry;
import com.summer.simplerpc.registry.model.ServiceMetaConfig;
import com.summer.simplerpc.util.ServiceUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * consumer netty handler
 */
@Slf4j
public class SimpleRpcConsumerNettyHandler extends SimpleChannelInboundHandler<SimpleRpcResponse> {

    /**
     * Registry
     */
    private ServiceRegistry serviceRegistry;

    /**
     * netty EventLoopGroup
     */
    private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(4);

    /**
     * netty channel
     */
    private Channel channel;

    /**
     * rpc response
     */
    private SimpleRpcResponse rpcResponse;

    /**
     * lock
     */
    private final Object lock = new Object();

    /**
     * Constructor
     *
     * @param serviceRegistry registry
     */
    public SimpleRpcConsumerNettyHandler(ServiceRegistry serviceRegistry) {
        this.serviceRegistry = serviceRegistry;
    }

    /**
     * Send an RPC request and wait for the response
     *
     * @param simpleRpcRequest request argument
     * @return
     */
    public SimpleRpcResponse sendRpcRequest(SimpleRpcRequest simpleRpcRequest) {
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new RPCEncoder())
                                    .addLast(new RPCDecoder())
                                    // ClassName.this refers to the enclosing instance (https://www.cnblogs.com/penglee/p/3993033.html)
                                    .addLast(SimpleRpcConsumerNettyHandler.this);
                        }
                    });

            String key = ServiceUtils.buildServiceKey(simpleRpcRequest.getClassName(), simpleRpcRequest.getServiceVersion());
            ServiceMetaConfig serviceMetaConfig = this.serviceRegistry.discovery(key);
            if (serviceMetaConfig == null) {
                log.error("sendRpcRequest fail,serviceMetaConfig not found");
                throw new Exception("serviceMetaConfig not found in registry");
            }

            log.info("sendRpcRequest begin,serviceMetaConfig=" + serviceMetaConfig.toString() + ",key=" + key);
            final ChannelFuture channelFuture = bootstrap.connect(serviceMetaConfig.getAddress(), serviceMetaConfig.getPort())
                    .sync();
            channelFuture.addListener((ChannelFutureListener) args0 -> {
                if (channelFuture.isSuccess()) {
                    log.info("rpc invoke success,");
                } else {
                    // Pass the cause as the last argument so that the stack trace is logged
                    log.error("rpc invoke fail,", channelFuture.cause());
                    eventLoopGroup.shutdownGracefully();
                }
            });

            this.channel = channelFuture.channel();
            this.channel.writeAndFlush(simpleRpcRequest).sync();

            // Block until channelRead0 receives the response and calls notifyAll
            synchronized (this.lock) {
                log.info("sendRpcRequest lock.wait");
                this.lock.wait();
            }

            log.info("get rpc response=" + rpcResponse.toString());
            return this.rpcResponse;
        } catch (Exception e) {
            log.error("sendRpcRequest exception,", e);
            return null;
        } finally {
            // Close the connection
            if (this.channel != null) {
                this.channel.close();
            }
            if (this.eventLoopGroup != null) {
                this.eventLoopGroup.shutdownGracefully();
            }
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, SimpleRpcResponse simpleRpcResponse) throws Exception {
        this.rpcResponse = simpleRpcResponse;

        log.info("rpc consumer netty handler,channelRead0,rpcResponse=" + rpcResponse);

        // Wake up the thread waiting in sendRpcRequest
        synchronized (lock) {
            log.info("channelRead0 simpleRpcResponse lock.notifyAll");
            lock.notifyAll();
        }
    }
}
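
The handler above blocks with lock.wait() and has no timeout, so the caller waits forever if the response arrives before the wait begins or never arrives at all. One common alternative, sketched here under my own assumptions and not taken from this project, is to keep a map of pending requests keyed by bizNO and complete a CompletableFuture when the response comes in. PendingResponses is a hypothetical name, and the response is simplified to a String.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Correlates responses with requests by request id.
class PendingResponses {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Register before sending, so a fast response cannot be missed.
    CompletableFuture<String> register(String bizNo) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(bizNo, future);
        return future;
    }

    // Called from the I/O thread when a response arrives.
    // Returns false if nobody is waiting for this id.
    boolean complete(String bizNo, String data) {
        CompletableFuture<String> future = pending.remove(bizNo);
        return future != null && future.complete(data);
    }
}

class PendingResponsesDemo {
    public static void main(String[] args) throws Exception {
        PendingResponses pending = new PendingResponses();
        CompletableFuture<String> future = pending.register("req-1");

        // Simulate the I/O thread delivering the response
        new Thread(() -> pending.complete("req-1", "hello")).start();

        // The caller waits with a timeout instead of blocking indefinitely
        System.out.println(future.get(1, TimeUnit.SECONDS)); // prints "hello"
    }
}
```

Keying by request id also allows several in-flight requests to share one connection, instead of opening a new connection for every call.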

The starter definition:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rpc consumer starter
 */
@Configuration
@Slf4j
public class SimplerConsumerAutoConfiguration {

    @Bean
    public static BeanFactoryPostProcessor initRpcConsumer() throws Exception {
        return new SimpleRpcConsumerPostProcessor();
    }
}

05

Integration and use of RPC framework

The RPC framework code described above is packaged as a Spring Boot starter and installed into the local Maven repository. A new Spring Boot project is then created to integrate and test it.

Maven dependency:

<dependency>
    <groupId>com.summer</groupId>
    <artifactId>simplerpc-starter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

The test service is simple: it takes a String parameter, and the server builds the return value by concatenating the input with a random UUID string.

Service definition:

public interface HelloworldService {

    /**
     * example method
     *
     * @param param
     * @return
     */
    String buildHelloworld(String param);
}

Service implementation:

import com.summer.simplerpc.annotation.SimpleRpcProvider;
import com.summer.simplerpctest.consumer.HelloworldService;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;

@SimpleRpcProvider(serviceInterface = HelloworldService.class)
@Slf4j
public class HelloworldServiceImpl implements HelloworldService {

    @Override
    public String buildHelloworld(String param) {
        log.info("HelloworldServiceImpl begin");
        return param + "_" + UUID.randomUUID().toString();
    }
}

Next, we define a bean that calls the RPC service:

import com.summer.simplerpc.annotation.SimpleRpcConsumer;
import com.summer.simplerpctest.consumer.HelloworldService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;


/**
 * Example of invoking HelloworldService
 */
@Slf4j
@Component
public class ConsumerSample {


    @SimpleRpcConsumer
    @Resource
    private HelloworldService helloworldService;


    public String invokeHelloworldService() {
        String result = helloworldService.buildHelloworld("qwert");
        return result;
    }
}
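
The helloworldService field in ConsumerSample has no local implementation, and the console log further down names a SimpleRpcInvokeHandler, which suggests that the injected object is a proxy that turns each method call into an RPC request. One common way to get that effect is a JDK dynamic proxy. The sketch below shows the idea under stated assumptions: Transport and createProxy are hypothetical names invented for this example, and the transport is a local stand-in for the Netty call, so this is not the framework's actual code.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.UUID;

class ProxySketch {

    /** Same shape as the test interface in this article. */
    interface HelloworldService {
        String buildHelloworld(String param);
    }

    /** Hypothetical stand-in for the network layer (the real framework uses Netty here). */
    interface Transport {
        Object send(String className, String methodName, String version, Object[] args);
    }

    /** Hypothetical factory: every interface call on the returned proxy is forwarded to the transport. */
    static <T> T createProxy(Class<T> serviceInterface, String version, Transport transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            // toString/hashCode/equals are local concerns, not remote calls
            if (method.getDeclaringClass() == Object.class) {
                throw new UnsupportedOperationException(method.getName());
            }
            return transport.send(serviceInterface.getName(), method.getName(), version, args);
        };
        Object instance = Proxy.newProxyInstance(
                serviceInterface.getClassLoader(), new Class<?>[] {serviceInterface}, handler);
        return serviceInterface.cast(instance);
    }

    public static void main(String[] args) {
        // Fake "server": appends a random UUID, as HelloworldServiceImpl does.
        Transport fakeServer = (className, methodName, version, callArgs) ->
                callArgs[0] + "_" + UUID.randomUUID();
        HelloworldService service = createProxy(HelloworldService.class, "1.0.0", fakeServer);
        System.out.println(service.buildHelloworld("qwert"));
    }
}
```

The caller only ever sees the interface, which is why ConsumerSample can use helloworldService as if it were a local bean.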

Then we add a Controller and start the Spring Boot project so that the test can be triggered directly from the browser:

import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;

/**
 * test controller
 *
 * url: http://127.0.0.1:8004/helloworld/do
 */
@RestController
@RequestMapping("/helloworld")
public class TestController {

    @Resource
    private ConsumerSample consumerSample;

    @GetMapping(value = "/do")
    public String say() {
        String helloServiceRes = consumerSample.invokeHelloworldService();
        return helloServiceRes;
    }
}

To invoke the RPC service, enter the following URL in the browser:

http://127.0.0.1:8004/helloworld/do

Note one prerequisite:

  • ZooKeeper (zk) must be running before the project starts.
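
A quick way to confirm this prerequisite before starting the project is to check that the ZooKeeper port accepts TCP connections. The sketch below assumes ZooKeeper runs on 127.0.0.1 with its default client port 2181; adjust both if your setup differs. ZkPortCheck and isReachable are hypothetical names, and an open port only shows that something is listening, not that ZooKeeper is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class ZkPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // refused, timed out, or unresolvable host
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: local ZooKeeper on the default client port 2181.
        boolean up = isReachable("127.0.0.1", 2181, 1000);
        System.out.println(up
                ? "ZooKeeper port is open"
                : "ZooKeeper port is not reachable; start zk first");
    }
}
```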

IDEA console log output:

2022-01-25 09:27:22.581  INFO 30366 --- [nio-8004-exec-1] c.s.s.consumer.SimpleRpcInvokeHandler : begin simpleRpcRequest=SimpleRpcRequest(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, className=com.summer.simplerpctest.consumer.HelloworldService, methodName=buildHelloworld, serviceVersion=1.0.0, paramTypes=[class java.lang.String], paramValues=[qwert])
2022-01-25 09:27:22.698  INFO 30366 --- [nio-8004-exec-1] c.s.s.c.SimpleRpcConsumerNettyHandler : sendRpcRequest begin,serviceMetaConfig=ServiceMetaConfig(name=com.summer.simplerpctest.consumer.HelloworldService, version=1.0.0, address=127.0.0.1, port=50001),key=com.summer.simplerpctest.consumer.HelloworldService:1.0.0
2022-01-25 09:27:22.715  INFO 30366 --- [ntLoopGroup-4-1] c.s.s.c.SimpleRpcConsumerNettyHandler : rpc invoke success,
2022-01-25 09:27:22.759  INFO 30366 --- [nio-8004-exec-1] c.s.s.c.SimpleRpcConsumerNettyHandler : sendRpcRequest lock.wait
2022-01-25 09:27:22.771  INFO 30366 --- [provider-pool-0] c.s.s.p.SimpleRpcProviderNettyHandler : doHandle,simpleRpcRequest=SimpleRpcRequest(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, className=com.summer.simplerpctest.consumer.HelloworldService, methodName=buildHelloworld, serviceVersion=1.0.0, paramTypes=[class java.lang.String], paramValues=[qwert])
2022-01-25 09:27:22.772  INFO 30366 --- [provider-pool-0] c.s.s.provider.HelloworldServiceImpl : HelloworldServiceImpl begin
2022-01-25 09:27:22.774  INFO 30366 --- [ntLoopGroup-3-1] c.s.s.p.SimpleRpcProviderNettyHandler : return response for request 46154373-2cf7-4731-b4c0-208d6ca28b87,simpleRpcResponse=SimpleRpcResponse(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, msg=null, data=qwert_3ca010cc-cb14-49d0-b9d4-23168e7786e4)
2022-01-25 09:27:22.774  INFO 30366 --- [ntLoopGroup-4-1] c.s.s.c.SimpleRpcConsumerNettyHandler : rpc consumer netty handler,channelRead0,rpcResponse=SimpleRpcResponse(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, msg=null, data=qwert_3ca010cc-cb14-49d0-b9d4-23168e7786e4)
2022-01-25 09:27:22.775  INFO 30366 --- [ntLoopGroup-4-1] c.s.s.c.SimpleRpcConsumerNettyHandler : channelRead0 simpleRpcResponse lock.notifyAll
2022-01-25 09:27:22.775  INFO 30366 --- [nio-8004-exec-1] c.s.s.c.SimpleRpcConsumerNettyHandler : get rpc response=SimpleRpcResponse(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, msg=null, data=qwert_3ca010cc-cb14-49d0-b9d4-23168e7786e4)
2022-01-25 09:27:22.776  INFO 30366 --- [nio-8004-exec-1] c.s.s.consumer.SimpleRpcInvokeHandler : result simpleRpcResponse=SimpleRpcResponse(bizNO=46154373-2cf7-4731-b4c0-208d6ca28b87, msg=null, data=qwert_3ca010cc-cb14-49d0-b9d4-23168e7786e4)

This article took me quite a while to write. With the code laid out in this much detail, I hope you can tell that I sincerely want to teach you something ~

If you found it useful, please like, share, and bookmark it so you can find it again ~