Source code repository github.com/zhshuixian/…

Remote Proceduce Call (RPC) is generally used to implement method calls between systems deployed on different machines, enabling programs to access Remote system resources over the network just as they access local system resources.

Here will use Netty to write a very simple RPC program, the project is roughly schematic as follows:

Create a new sub-project 03-Netty-RPC based on the previous project. See GitHub’s project repository for dependencies and Maven configuration for the project.

Protocol RPC request Protocol

Create a new class, rpcProtocol. Java, to define the format of the data for RPC requests. The remote procedure call encapsulates the data that must be transmitted over the Netty network and then serializes it as a binary data stream. Here the Java native serialization and Netty own object coding and decoding.

TODO uses Protobuf/kyro to serialize and encapsulate the returned data uniformly

/** * The format of the RPC request must be */
@Data
public class RpcProtocol implements Serializable {
    private static final long serialVersionUID = 5933724318897197513L;
    /** * The actual implementation class in the remote service is invoked by the server based on the interface name
    private String interfaceName;
    /** * the interface method name */
    private String methodName;
    /** * Parameter value */
    private Object[] paramValues;
    /** * Parameter type */
    privateClass<? >[] paramTypes; }Copy the code

2. Provider server

Provider The Provider server consists of three classes

  • Provider: entry class of the server to start the Netty service
  • ProviderHandler: Invokes the specific service registered with the Provider on request and returns the result written to the ChannelHandlerContext
  • ProviderRegister: Uses ConcurrentHashMap to save the provider-registered service and its object instances

ProviderRegister, which provides two main methods, addService and getService, to register a service with the Provider and get a service from a HashMap.

public class ProviderRegister {
    /** * The service name and its object */
    private static final Map<String, Object> SERVICE_MAP = new ConcurrentHashMap<>();

    /** * Add the RPC Provider service */
    public <T> void addService(T service, Class<T> clazz) {
        // getCanonicalName() gets the formatted output of the passed class defined from the Java language specification
        String serviceName = clazz.getCanonicalName();
        log.info("Add service, name is {}", serviceName);
        if(! SERVICE_MAP.containsKey(serviceName)) {// Add the service name and its object to the SERVICE_MAPSERVICE_MAP.put(serviceName, service); }}/** * Obtain the RPC Provider service */
    public Object getService(String serviceName) {
        Object service = SERVICE_MAP.get(serviceName);
        if (service == null) {
            log.debug("No PRC service found");
            return null;
        }
        log.info("Find service {}", serviceName);
        returnservice; }}Copy the code

ProviderHandler, according to the interface name in the RpcProtocol request data, obtains its corresponding service and writes the result back.

@Slf4j
public class ProviderHandler extends ChannelInboundHandlerAdapter {
    private final ProviderRegister register = new ProviderRegister();
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Object result;
        RpcProtocol rpcProtocol = (RpcProtocol) msg;
        try {
            // Find the service from provider
            Object service = register.getService(rpcProtocol.getInterfaceName());
            // Get the specific method from the service based on the method name and the type of argument passed in
            Method method = service.getClass().getMethod(rpcProtocol.getMethodName(),
                    rpcProtocol.getParamTypes());
            // Execute this method
            result = method.invoke(service, rpcProtocol.getParamValues());
            // Return the result
            ctx.writeAndFlush(result);
            log.info("Service name: {}, the method called is {}", rpcProtocol.getInterfaceName(), rpcProtocol.getMethodName());
        } catch (NoSuchMethodException | IllegalArgumentException |
                InvocationTargetException | IllegalAccessException e) {
            log.error("Service not found or service error");
        } finally{ ctx.flush(); ctx.close(); }}@Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}Copy the code

Provider, the portal class on the server side, starts the Netty service, which is almost the same as in the previous two demos, with the encoding and decoder replaced and the ProviderHandler registered to handle specific events.

@Slf4j
public class Provider {
    private final int port;
    private final String host;
    private final ProviderRegister register = new ProviderRegister();

    public Provider(String host, int port) {
        this.port = port;
        this.host = host;
    }
    /** * Start Netty service, similar to the previous demo, difference is encoder and decoder */
    public void start(a) {
        log.info("Start RPC service at {} port number: {}", host, port);
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // The connection timeout period. If the connection cannot be established after the timeout period, the connection fails
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                    // TCP enables the Nagle algorithm by default. This algorithm is used to send large data as fast as possible and reduce network transmission.
                    The TCP_NODELAY parameter controls whether the Nagle algorithm is enabled.
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    // Whether to enable the TCP underlying heartbeat mechanism
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // Indicates the maximum length of the queue used by the system to temporarily store requests that have completed the three-way handshake. If connections are established frequently, the server will be slow to create new connections.
                    // You can adjust this parameter appropriately
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // Channel binds the ChannelPipeline
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            TODO uses Protobuf or Kryo for serialization
                            // Object decoder
                            pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
                                    ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                            // The encoder of the object
                            pipeline.addLast(new ObjectEncoder());
                            pipeline.addLast(newProviderHandler()); }});// use bind to listen on host and port
            ChannelFuture future = bootstrap.bind(host, port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("Error starting service :", e);
        } finally {
            log.info("Close bossGroup and workerGroup"); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }}/** * PRC Provider registers the service **@paramService service *@paramClazz service interface defined classes *@param<T> Service concrete implementation class */
    public <T> void addService(T service, Class<T> clazz) { register.addService(service, clazz); }}Copy the code

3. Consumer client

  • Consumer: Connects to the Provider and sends requests
  • ConsumerHandler: Processes the data returned by the server
  • ConsumerProxy: Uses InvocationHandler to handle method calls to dynamic proxy objects
// Extract the result
public class ConsumerHandler extends ChannelInboundHandlerAdapter {
    private Object result;

    public Object getResult(a) {
        return result;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { result = msg; }}Copy the code

Consumer, use Netty to communicate with Provider, basically using the same method as Netty client.

public class Consumer {
    private final int port;
    private final String host;
    private final RpcProtocol protocol;

    public Consumer(String host, int port, RpcProtocol protocol) {
        this.port = port;
        this.host = host;
        this.protocol = protocol;
    }

    public Object start(a) throws InterruptedException {
        // TODO Netty connections are multiplexed to extract these services
        EventLoopGroup group = new NioEventLoopGroup();
        ConsumerHandler consumerHandler = new ConsumerHandler();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    // The connection timeout period. If the connection cannot be established after the timeout period, the connection fails
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                    // Whether to enable the TCP underlying heartbeat mechanism
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    // TCP enables the Nagle algorithm by default. This algorithm is used to send large data as fast as possible and reduce network transmission. The TCP_NODELAY parameter controls whether the Nagle algorithm is enabled.
                    .option(ChannelOption.TCP_NODELAY, true)
                    // Channel binds the ChannelPipeline
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Object parameter type decoder
                            pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
                                    ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
                            // Object parameter type encoder
                            pipeline.addLast(newObjectEncoder()); pipeline.addLast(consumerHandler); }});// Link to the server and use ChannelFuture to receive the returned data
            ChannelFuture future = bootstrap.connect(host, port).sync();
            // Send the request
            future.channel().writeAndFlush(protocol).sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
        returnconsumerHandler.getResult(); }}Copy the code

ConsumerProxy, which implements the InvocationHandler interface, actually calls its invoke() method when invoking the dynamic proxy method.

public class ConsumerProxy implements InvocationHandler {
    private final String host;
    private final int port;

    public ConsumerProxy(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clazz) {
        // This means that an instance of ConsumerProxy is passed in and the dynamic proxy object is invoked using the consumerProxy.invoke () method.
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), newClass<? >[]{clazz},this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Encapsulate a request into an RPC
        RpcProtocol protocol = new RpcProtocol();
        // Get the class name of the method
        protocol.setInterfaceName(method.getDeclaringClass().getName());
        / / the method name
        protocol.setMethodName(method.getName());
        // The type of argument passed to the method
        protocol.setParamTypes(method.getParameterTypes());
        // The method passes in the actual value of the argument
        protocol.setParamValues(args);
        Start PRC Consumer
        Consumer consumer = new Consumer(host, port, protocol);
        returnconsumer.start(); }}Copy the code

4. Use this PRC program

  • Interfaces: Interface classes defined by the client and server services
  • Provider: server-side service and implementing interfaces classes
  • Consumer: client, using the PRC program of this project to invoke the service side of the server

Interfaces: Defines interface services

public interface HelloService {
    /** returns a string */
    String hello(String username);
}
Copy the code
@Data
@AllArgsConstructor
public class Message implements Serializable {
    private static final long serialVersionUID = -4268077260739000146L;
    private int code;
    private String message;
}
public interface MessageService {
    ** returns a custom Java object */
    Message sayMessage(String name);
}
Copy the code

Provider: registers RPC services and listens to RPC requests

package org.xian.rpc.test.provider
// Implement the corresponding interface helloServiceImp.java
public class HelloServiceImpl implements HelloService {
    @Override
    public String hello(String username) {
        return "Hi "+ username; }}// MessageServiceImpl.java
public class MessageServiceImpl implements MessageService {
    @Override
    public Message sayMessage(String name) {
        return new Message(200."Your name is "+ name); }}// Start PRC server, providerApplication.java
public class ProviderApplication {
    public static void main(String[] args) {
        Provider server = new Provider("127.0.0.1".8080);
        // Register HelloService and MessageService services
        server.addService(new HelloServiceImpl(), HelloService.class);
        server.addService(new MessageServiceImpl(), MessageService.class);
        // Start RPC serverserver.start(); }}Copy the code

Consumer: Invokes the RPC remote service

package org.xian.rpc.test.consumer;
@Slf4j
public class ConsumerApplication {
    public static void main(String[] args) {
        ConsumerProxy proxy = new ConsumerProxy("127.0.0.1".8080);
        // Generate dynamic proxy objects
        HelloService helloService = proxy.getProxy(HelloService.class);
        // Invoke (Object Proxy, Method Method, Object[] args)
        String result = helloService.hello("xian");
        log.info("HelloService call results in {}", result);

        MessageService messageService = proxy.getProxy(MessageService.class);
        Message message = messageService.sayMessage("xiaoxian");
        log.info("The result of MessageService call is {}", message.toString()); }}Copy the code

In this example, a very simple RPC service was implemented using Netty, and of course there are many optimization points that need to be optimized, such as reuse of client connections, no Zookeeper or other service discovery, etc.

Reference: Netty 4 Core Principles and Writing RPC Framework Practice

github.com/Snailclimb/guide-rpc-framework