Custom annotations enable service registration and discovery

What is RPC

Remote Procedure Call (RPC) lets a program call a remote service as if it were a local one, without the caller having to deal with the details of the network call.

RPC principle

A program that implements RPC communication consists of five parts: the RPC client, the client proxy, the socket, the server proxy, and the RPC server.

Request

  • Client: When the RPC client makes a remote call, the client proxy serializes the interface, method, parameters, and parameter types of the call, and then sends the object that encapsulates them to the server through the socket.
  • Server: The socket receives the data from the client and deserializes it, and the server proxy delegates the call to the concrete implementation object.

Response

  • Server: After the target method has executed, the server writes the result back to the socket.
  • Client: The socket receives the result and returns it to the RPC client.
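
The request and response steps above can be sketched with plain Java sockets and object streams. This is only a minimal illustration under simplifying assumptions: the class and method names (RoundTripSketch, serveOnce, call, demo) are hypothetical, the request is a plain String rather than an RpcRequest, the server answers exactly one connection, and the "concrete implementation" is a single hard-coded line.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical demo class, not part of the original project. */
class RoundTripSketch {

    /** Server side: accept one connection, read the request, write the response. */
    static void serveOnce(ServerSocket serverSocket) {
        try (Socket socket = serverSocket.accept();
             ObjectInputStream in = new ObjectInputStream(socket.getInputStream())) {
            // Deserialize the request sent by the client.
            String name = (String) in.readObject();
            // Stand-in for delegating to a concrete implementation object.
            String result = "Hello, " + name;
            // Serialize the result and send it back.
            ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
            out.writeObject(result);
            out.flush();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /** Client side: send the request, then block until the response arrives. */
    static Object call(int port, String name) throws IOException, ClassNotFoundException {
        try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), port)) {
            ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
            out.writeObject(name);
            out.flush();
            ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
            return in.readObject();
        }
    }

    /** Starts a one-shot server on a free loopback port and performs a single call. */
    static Object demo(String name) {
        try (ServerSocket serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread server = new Thread(() -> serveOnce(serverSocket));
            server.start();
            Object response = call(serverSocket.getLocalPort(), name);
            server.join();
            return response;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling RoundTripSketch.demo("RPC") returns "Hello, RPC". Note the stream order: the client writes and flushes before it opens its ObjectInputStream, because the ObjectInputStream constructor blocks until the peer's stream header arrives.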

Technologies used

  • Java
  • Spring
  • Serialization
  • Socket
  • Reflection
  • Dynamic proxy

GitHub address of the project

Github.com/autumnqfeng…

Original blog address

Autumn200.com/2020/06/21/…

Server project

The project structure

The RPC server project consists of two sub-projects: order-api and order-provider.

order-api holds the request interface and RpcRequest (an entity class carrying the class name, method name, and parameters).

order-provider holds the implementation of the request interface, together with the socket and proxy classes.
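
The article does not list RpcRequest itself, so the following is only a sketch of what such an entity might look like. The field names are inferred from the getters and setters that the client and server code call (className, methodName, types, args); the serialVersionUID and the toString method are assumptions added for illustration.

```java
import java.io.Serializable;
import java.util.Arrays;

/**
 * Sketch of the request entity. The real class in order-api may differ;
 * fields are inferred from the accessors used elsewhere in the article.
 */
class RpcRequest implements Serializable {
    private static final long serialVersionUID = 1L; // assumed value

    private String className;   // fully qualified name of the service interface
    private String methodName;  // name of the method to invoke
    private Class<?>[] types;   // parameter types of that method
    private Object[] args;      // argument values; each one must be Serializable

    public String getClassName() { return className; }
    public void setClassName(String className) { this.className = className; }

    public String getMethodName() { return methodName; }
    public void setMethodName(String methodName) { this.methodName = methodName; }

    public Class<?>[] getTypes() { return types; }
    public void setTypes(Class<?>[] types) { this.types = types; }

    public Object[] getArgs() { return args; }
    public void setArgs(Object[] args) { this.args = args; }

    /** Illustration only: renders the call as className.methodName[args]. */
    @Override
    public String toString() {
        return className + "." + methodName + Arrays.toString(args);
    }
}
```

The class has to implement Serializable because it is written with ObjectOutputStream.writeObject, which throws NotSerializableException for objects that do not.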

order-api

order-provider

The service registry

The key to calling a ServiceImpl dynamically is managing the service classes. How do we manage them?

We can model our own service registration on Spring's @Service annotation. We define an annotation, @RpcRemoteService, place it on the ServiceImpl class, and save the interface name and method name of each annotated class in a Map so that the implementation can be located later.
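
Stripped of Spring, the idea can be sketched with plain reflection: check a bean's class for a marker annotation, then store each of its methods under an "interfaceName.methodName" key. All names below (RegistrySketch, RemoteService, Route, OrderService, OrderServiceImpl) are hypothetical stand-ins, not classes from the project.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical demo class, not part of the original project. */
class RegistrySketch {

    /** Marker annotation; RUNTIME retention makes it visible to reflection. */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface RemoteService {}

    /** Hypothetical service interface and implementation. */
    interface OrderService { String queryOrder(String id); }

    @RemoteService
    static class OrderServiceImpl implements OrderService {
        @Override
        public String queryOrder(String id) { return "order-" + id; }
    }

    /** Pairs a bean with one of its methods. */
    static final class Route {
        final Object bean;
        final Method method;
        Route(Object bean, Method method) { this.bean = bean; this.method = method; }
    }

    /** Routing table: "interfaceName.methodName" -> bean and method. */
    static final Map<String, Route> ROUTING = new ConcurrentHashMap<>();

    /** Registers every declared method of an annotated bean. */
    static void register(Object bean) {
        Class<?> type = bean.getClass();
        if (!type.isAnnotationPresent(RemoteService.class)) {
            return;
        }
        // Assumes the first implemented interface is the service interface.
        String interfaceName = type.getInterfaces()[0].getName();
        for (Method method : type.getDeclaredMethods()) {
            ROUTING.put(interfaceName + "." + method.getName(), new Route(bean, method));
        }
    }

    /** Looks up a route by key and invokes it reflectively. */
    static Object invoke(String key, Object... args) {
        Route route = ROUTING.get(key);
        if (route == null) {
            throw new IllegalArgumentException("No service registered for " + key);
        }
        try {
            return route.method.invoke(route.bean, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    static Object demo() {
        register(new OrderServiceImpl());
        return invoke(OrderService.class.getName() + ".queryOrder", "7");
    }
}
```

RegistrySketch.demo() returns "order-7". In the project, Spring performs the scanning step by calling a BeanPostProcessor for every bean it creates.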

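The @RpcRemoteService annotation

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.stereotype.Component;

/**
 * Server-side service discovery annotation.
 *
 * @author: ***
 * @date: 2020/6/21 16:21
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcRemoteService {
}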
Service registration class InitialMerdiator

After the Spring container has initialized a bean, check whether its class is annotated with @RpcRemoteService and, if so, save its methods in Mediator.ROUTING.

import java.lang.reflect.Method;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

/**
 * Initializes the intermediate proxy layer object.
 *
 * @author: ***
 * @date: 2020/6/21
 */
@Component
public class InitialMerdiator implements BeanPostProcessor {

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // Beans annotated with @RpcRemoteService are published for remote calls
        if (bean.getClass().isAnnotationPresent(RpcRemoteService.class)) {
            Method[] methods = bean.getClass().getDeclaredMethods();
            for (Method method : methods) {
                // Key format: <name of the first implemented interface>.<method name>
                String routingKey = bean.getClass().getInterfaces()[0].getName() + "." + method.getName();
                BeanMethod beanMethod = new BeanMethod();
                beanMethod.setBean(bean);
                beanMethod.setMethod(method);
                Mediator.ROUTING.put(routingKey, beanMethod);
            }
        }
        return bean;
    }
}

Socket listening

The socket listens for client requests.

Socket startup class SocketServer

The socket is started after the Spring container has finished loading.

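import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

/**
 * After the Spring container has started, it publishes a ContextRefreshedEvent.
 *
 * @author: ***
 * @date: 2020/6/21 16:51
 */
@Component
public class SocketServer implements ApplicationListener<ContextRefreshedEvent> {
    private final ExecutorService executorService = Executors.newCachedThreadPool();

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(8888);
            // Note: this loop blocks the thread that delivers the event
            while (true) {
                Socket accept = serverSocket.accept();
                // Hand each accepted socket to the thread pool
                executorService.execute(new ProcessorHandler(accept));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}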
Socket handler class ProcessorHandler

Handles each accepted socket connection.

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

public class ProcessorHandler implements Runnable {

    private Socket socket;

    public ProcessorHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        ObjectInputStream inputStream = null;
        ObjectOutputStream outputStream = null;
        try {
            inputStream = new ObjectInputStream(socket.getInputStream());
            // Deserialize the request
            RpcRequest rpcRequest = (RpcRequest) inputStream.readObject();

            // The intermediate proxy executes the target method
            Mediator mediator = Mediator.getInstance();
            Object response = mediator.processor(rpcRequest);
            System.out.println("Server execution result: " + response);

            outputStream = new ObjectOutputStream(socket.getOutputStream());
            outputStream.writeObject(response);
            outputStream.flush();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeStream(inputStream, outputStream);
        }
    }

    private void closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream) {
        // Close the streams
        if (inputStream != null) {
            try {
                inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (outputStream != null) {
            try {
                outputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Server proxy

Mediator
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * The proxy layer between the server socket and the target method.
 *
 * @author: ***
 * @date: 2020/6/21
 */
public class Mediator {

    /** Stores the published service instances (the routes for service invocation) */
    public static Map<String, BeanMethod> ROUTING = new ConcurrentHashMap<>();

    /** Singleton instance of the proxy layer */
    private volatile static Mediator instance;

    private Mediator() {
    }

    public static Mediator getInstance() {
        // Double-checked locking
        if (instance == null) {
            synchronized (Mediator.class) {
                if (instance == null) {
                    instance = new Mediator();
                }
            }
        }
        return instance;
    }

    public Object processor(RpcRequest rpcRequest) {
        // Routing key
        String routingKey = rpcRequest.getClassName() + "." + rpcRequest.getMethodName();
        BeanMethod beanMethod = ROUTING.get(routingKey);
        if (beanMethod == null) {
            return null;
        }
        // Execute the target method
        Object bean = beanMethod.getBean();
        Method method = beanMethod.getMethod();
        try {
            return method.invoke(bean, rpcRequest.getArgs());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
BeanMethod

/**
 * Stores the target method and the target bean for the middle layer's reflective call.
 *
 * @author: ***
 * @date: 2020/6/21
 */
public class BeanMethod {

    private Object bean;

    private Method method;

    // Getters and setters omitted
}
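
One consequence of the routing key used by Mediator is worth spelling out: the key is built from the interface name and the method name only, so two overloads of the same method would map to the same key, with the later registration replacing the earlier one. The client does send the parameter types in RpcRequest, so one possible refinement is to append them to the key. The sketch below is not part of the original project; RoutingKeySketch, its routingKey helpers, and the OrderService interface are hypothetical.

```java
import java.lang.reflect.Method;
import java.util.StringJoiner;

/** Hypothetical demo class, not part of the original project. */
class RoutingKeySketch {

    /** Hypothetical interface with two overloads of the same method name. */
    interface OrderService {
        String query(String id);
        String query(String id, int page);
    }

    /** Builds a key such as "pkg.OrderService.query(java.lang.String,int)". */
    static String routingKey(String className, String methodName, Class<?>[] types) {
        StringJoiner params = new StringJoiner(",", "(", ")");
        for (Class<?> type : types) {
            params.add(type.getName());
        }
        return className + "." + methodName + params;
    }

    /** Convenience overload that reads all three parts from a Method. */
    static String routingKey(Method method) {
        return routingKey(method.getDeclaringClass().getName(),
                method.getName(), method.getParameterTypes());
    }

    /** Returns true when the two overloads produce different keys. */
    static boolean overloadsGetDistinctKeys() {
        try {
            Method one = OrderService.class.getMethod("query", String.class);
            Method two = OrderService.class.getMethod("query", String.class, int.class);
            return !routingKey(one).equals(routingKey(two));
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

On the server side the three-argument form would be the one to use, passing the interface name rather than the implementation class name, so that the key matches what the client derives from method.getDeclaringClass().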

Client project

The project structure

Service discovery

Service discovery is also done with annotations. Following the idea behind Spring's @Autowired, we define a custom @RpcReference annotation, place it on a field, and inject a proxy for the interface into that field.
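
The injection idea can be sketched without Spring: scan a bean's fields for a marker annotation and set each marked field to a JDK dynamic proxy of the field's interface type. The names below (ReferenceSketch, Reference, OrderService, OrderController) are hypothetical, and the handler only echoes the call instead of sending it over the network.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/** Hypothetical demo class, not part of the original project. */
class ReferenceSketch {

    /** Marker annotation for fields that should receive a proxy. */
    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Reference {}

    /** Hypothetical remote interface; no local implementation exists. */
    interface OrderService { String queryOrder(String id); }

    /** Hypothetical bean that depends on the remote service. */
    static class OrderController {
        @Reference
        private OrderService orderService;

        String show(String id) { return orderService.queryOrder(id); }
    }

    /** Sets every @Reference field of the bean to a dynamic proxy. */
    static void inject(Object bean, InvocationHandler handler) {
        for (Field field : bean.getClass().getDeclaredFields()) {
            if (!field.isAnnotationPresent(Reference.class)) {
                continue;
            }
            // The proxy implements the field's interface type.
            Object proxy = Proxy.newProxyInstance(
                    field.getType().getClassLoader(),
                    new Class<?>[]{field.getType()},
                    handler);
            try {
                field.setAccessible(true);
                field.set(bean, proxy);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    static String demo() {
        // Stand-in handler: a real one would serialize the call and send it to the server.
        InvocationHandler handler = (proxy, method, args) ->
                method.getDeclaringClass().getSimpleName() + "." + method.getName()
                        + "(" + args[0] + ")";
        OrderController controller = new OrderController();
        inject(controller, handler);
        return controller.show("7");
    }
}
```

ReferenceSketch.demo() returns "OrderService.queryOrder(7)": the controller calls an interface method, and the handler receives the Method and its arguments even though no implementation of OrderService exists on the client.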

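The @RpcReference annotation

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.stereotype.Component;

/**
 * Service injection annotation.
 *
 * @author: ***
 * @date: 2020/6/20 22:41
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcReference {
}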
Service discovery class ReferenceInvokeProxy

Before the Spring container initializes each bean, scan the bean's fields for the @RpcReference annotation.

import java.lang.reflect.Field;
import java.lang.reflect.Proxy;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

/**
 * Proxy for dynamically invoking remote services.
 *
 * @author: ***
 * @date: 2020/6/20
 */
@Component
public class ReferenceInvokeProxy implements BeanPostProcessor {

    @Autowired
    private RemoteInvocationHandler invocationHandler;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        Field[] fields = bean.getClass().getDeclaredFields();
        for (Field field : fields) {
            if (field.isAnnotationPresent(RpcReference.class)) {
                field.setAccessible(true);
                // Create a proxy for the type of the field annotated with @RpcReference
                Object proxy = Proxy.newProxyInstance(
                        field.getType().getClassLoader(),
                        new Class<?>[]{field.getType()},
                        invocationHandler);
                try {
                    // Inject the proxy into the field; its calls are handled by invocationHandler
                    field.set(bean, proxy);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            }
        }
        return bean;
    }
}

Client proxy

RemoteInvocationHandler, the InvocationHandler implementation for the client dynamic proxy

Wrap the target method name, target class name, and parameter information in an RpcRequest, and then hand it to the socket to send to the server.

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author: ***
 * @date: 2020/6/20
 */
@Component
public class RemoteInvocationHandler implements InvocationHandler {

    @Autowired
    private RpcNetTransport rpcNetTransport;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Describe the call: interface name, method name, parameter types, arguments
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setTypes(method.getParameterTypes());
        rpcRequest.setArgs(args);
        return rpcNetTransport.send(rpcRequest);
    }
}

Client socket

Network transport class RpcNetTransport
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * RPC socket network transport.
 *
 * @author: ***
 * @date: 2020/6/20
 */
@Component
public class RpcNetTransport {
    @Value("${rpc.host}")
    private String host;
    @Value("${rpc.port}")
    private int port;

    public Object send(RpcRequest rpcRequest) {
        ObjectOutputStream outputStream = null;
        ObjectInputStream inputStream = null;
        try {
            Socket socket = new Socket(host, port);
            // Send the target method information
            outputStream = new ObjectOutputStream(socket.getOutputStream());
            outputStream.writeObject(rpcRequest);
            outputStream.flush();
            // Receive the return value
            inputStream = new ObjectInputStream(socket.getInputStream());
            return inputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        } finally {
            closeStream(inputStream, outputStream);
        }
        return null;
    }

    private void closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream) {
        // Close the streams
        if (inputStream != null) {
            try {
                inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (outputStream != null) {
            try {
                outputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}