Preface

Some special scenarios in my project require external callers to reach a single Pod in Kubernetes (K8s), and HTTP cannot be used for these calls. I decided to keep a long-lived TCP connection so that the external side and the Pod inside the cluster can communicate directly, which is where Netty comes in.

What is Netty

I won't go into detail here; there are plenty of introductions online, for example: What is Netty?

Approach

The design has two parts: the server and the client.

The server

The idea is to annotate your own local methods the same way you would with Spring MVC, and then invoke the annotated methods by reflection from Netty's handler.

The client

Similar to the server: annotations plus reflection are used to invoke locally written methods.
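The annotate-and-reflect idea above can be sketched in plain Java without Netty or Spring. This is only a minimal illustration under stated assumptions: DemoMapping, GreetingService, and ReflectiveDispatcher are hypothetical names invented for this sketch, not classes from the project.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a mapping annotation such as the article's @RpcMapping.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoMapping {
    String url();
}

// Hypothetical business class with one annotated method.
class GreetingService {
    @DemoMapping(url = "/hello")
    public String hello(String name) {
        return "hello " + name;
    }
}

// Scans a bean once, then routes each incoming url to the annotated method.
class ReflectiveDispatcher {
    private final Map<String, Method> methods = new HashMap<>();
    private final Map<String, Object> targets = new HashMap<>();

    void register(Object bean) {
        for (Method m : bean.getClass().getDeclaredMethods()) {
            DemoMapping mapping = m.getAnnotation(DemoMapping.class);
            if (mapping != null) {
                m.setAccessible(true);
                methods.put(mapping.url(), m);
                targets.put(mapping.url(), bean);
            }
        }
    }

    Object dispatch(String url, Object argument) {
        Method m = methods.get(url);
        if (m == null) {
            throw new IllegalArgumentException("No mapping for " + url);
        }
        try {
            // This is the call a Netty handler would make after decoding a message.
            return m.invoke(targets.get(url), argument);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Invocation failed for " + url, e);
        }
    }
}
```

In a real handler the url and argument would come from the decoded message packet, and the beans would be discovered from the Spring context rather than registered by hand.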

The configuration class

@Data
@Slf4j
@Configuration
@EnableConfigurationProperties(value = RpcConfigProperties.class)
public class RpcAutoConfiguration {
    
    @Bean
    @ConditionalOnProperty(prefix = "com.github.zy.netty.rpc.server", name = "enable", havingValue = "true")
    public RpcServerRunner serverBootstrap(RpcConfigProperties rpcConfigProperties, ServerHandler serverHandler) {
        return new RpcServerRunner(rpcConfigProperties, serverHandler);
    }
    
    @Bean
    @ConditionalOnProperty(prefix = "com.github.zy.netty.rpc.client", name = "enable", havingValue = "true")
    public RpcClientRunner clientBootstrap(ClientHandler clientHandler, RpcClient client, RpcConfigProperties configProperties) {
        return new RpcClientRunner(clientHandler, client, configProperties);
    }
}

RpcServerRunner

The idea here is to implement CommandLineRunner. Spring Boot calls implementations of this interface once the application has started, so Netty initialization can be placed there.

@Slf4j
public class RpcServerRunner implements CommandLineRunner {

    private final RpcConfigProperties rpcConfigProperties;
    private final ServerHandler serverHandler;

    public RpcServerRunner(RpcConfigProperties rpcConfigProperties, ServerHandler serverHandler){
        this.rpcConfigProperties = rpcConfigProperties;
        this.serverHandler = serverHandler;
    }


    public void startServer() {
        if (rpcConfigProperties.getServer().isEnable()) {
            log.info("Start NettyServer...");
            int port = rpcConfigProperties.getServer().getPort();
            ServerBootstrap bootstrap = new ServerBootstrap();
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(2048, 0, 4, 0, 4));
                                ch.pipeline().addLast(new LengthFieldPrepender(4));
                                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                                ch.pipeline().addLast(new MessagePacketDecoder());
                                ch.pipeline().addLast(new MessagePacketEncoder(FastJsonSerializer.INSTANCE));
                                // Arguments are (readerIdle, writerIdle, allIdle) in seconds; 0 disables a timer,
                                // so with these values only ALL_IDLE events are fired
                                ch.pipeline().addLast(new IdleStateHandler(0, 0, rpcConfigProperties.getServer().getDisconnectInterval()));
                                ch.pipeline().addLast(new ValidatePacketHandler());
                                ch.pipeline().addLast(serverHandler);
                            }
                        });
                ChannelFuture future = bootstrap.bind(port).sync();
                log.info("NettyServer[{}] started successfully...", port);
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                log.error("Start NettyServer [" + port + "] failure...", e);
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    }

    @Override
    public void run(String... args) throws Exception {
        new Thread(this::startServer).start();
    }
}
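The first two handlers in the pipeline above frame each message with a 4-byte length prefix: LengthFieldPrepender(4) writes the length on the way out, and LengthFieldBasedFrameDecoder reads it on the way in and strips it. The sketch below shows that framing in plain Java with ByteBuffer. It is an illustration of the wire format only, not Netty's implementation, and LengthPrefixFraming is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class LengthPrefixFraming {

    // Like a 4-byte length prepender: big-endian payload length, then the payload.
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Like a decoder with offset 0, length field 4, adjustment 0, strip 4:
    // returns only complete frames, without their length prefix.
    // Bytes of an incomplete trailing frame stay in the buffer.
    static List<byte[]> decode(ByteBuffer in, int maxFrameLength) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (length < 0 || 4 + length > maxFrameLength) {
                throw new IllegalStateException("Bad frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // incomplete frame: rewind and wait for more bytes
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(payload);
        }
        return frames;
    }
}
```

Framing like this is what lets the later decoder in the pipeline receive one whole message at a time, even though TCP itself delivers an unstructured byte stream.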

RpcClientRunner

In my business scenario the client has to be initialized manually, so there is an extra start method. It blocks, and the calling thread continues only once the connection to the server is established or the maximum wait time has passed. If your scenario allows asynchronous startup, you can start the client directly on a child thread, as RpcServerRunner does.

@Slf4j
public class RpcClientRunner implements CommandLineRunner {

    private final ClientHandler clientHandler;
    private final RpcClient client;
    private final RpcConfigProperties configProperties;

    public RpcClientRunner(ClientHandler clientHandler, RpcClient client, RpcConfigProperties configProperties) {
        this.clientHandler = clientHandler;
        this.client = client;
        this.configProperties = configProperties;
    }

    public void doStart() {
        client.startClient(clientHandler);
    }


    @Override
    public void run(String... args) throws Exception {
        new Thread(this::doStart).start();
    }

    /**
     * TODO: triggered manually, with channel close listened for asynchronously. Once the
     * connection channel is established, the future should be returned so the caller can use
     * future.isSuccess() to check whether startup succeeded. (Blocks the calling thread.)
     */
    public void start() {
        // Guard against repeated calls starting the client more than once
        if (client.getChannelFuture() == null || !client.getChannelFuture().isSuccess()) {
            doStart();
            waitStart();
            // Listen for the channel close asynchronously
            syncClose();
        }
    }

    private void syncClose() {
        // Listen for the channel close asynchronously
        new Thread(() -> {
            try {
                client.getChannelFuture().channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("Abnormal client connection", e);
            }
        }).start(); // the thread has to be started, otherwise nothing waits on closeFuture
    }

    private void waitStart() {
        // Maximum time to block while waiting
        long startWaitTime = configProperties.getClient().getStartWaitTime();

        // Start timing
        long startTime = System.currentTimeMillis();
        for (; ; ) {
            long currentTime = System.currentTimeMillis() - startTime;

            // Stop waiting once the elapsed time exceeds the maximum wait time
            if (currentTime > startWaitTime) {
                break;
            }

            boolean startSuccess = false;
            if (client.getChannelFuture() != null) {
                startSuccess = client.getChannelFuture().isSuccess();
            }

            if (startSuccess) {
                log.debug("The connection to the Netty server is successful. Exit waiting.");
                break;
            } else {
                log.debug("Connection not ready, thread blocked.");
            }
            try {
                Thread.sleep(configProperties.getClient().getStartWaitIntervalTime());
            } catch (InterruptedException e) {
                log.error(e.getMessage(), e);
            }
        }
    }
}
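The waiting loop above boils down to "poll a condition at a fixed interval until it holds or a deadline passes". A standalone sketch of that pattern follows. StartupWaiter is a hypothetical helper, and in the article's code the condition would be the check that the channel future is non-null and successful.

```java
import java.util.function.BooleanSupplier;

class StartupWaiter {

    // Polls `ready` every intervalMillis until it returns true or maxWaitMillis has elapsed.
    // Returns true if the condition held in time, false on timeout or interruption.
    static boolean await(BooleanSupplier ready, long maxWaitMillis, long intervalMillis) {
        long deadline = System.nanoTime() + maxWaitMillis * 1_000_000L;
        while (true) {
            if (ready.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can see the interruption.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

Returning a boolean lets the caller tell a successful connection apart from a timeout, which the original void method cannot report.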

Key points

1: How does the server receive messages

  1. Define the @RpcService annotation to mark local business classes, similar to @Controller in Spring MVC. Its main purpose is to identify business classes.
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface RpcService {

}
  2. Define the @RpcMapping annotation to mark business methods in business classes, similar to @RequestMapping in Spring MVC.
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RpcMapping {

    String url();
}
  3. In Netty's handler, the received message is dispatched by reflection to the locally annotated method.
@Slf4j
@RequiredArgsConstructor
@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<DefaultMessagePacket> {

    private final SessionManager sessionManager;
    private final ServerMessageHandle serverMessageHandle;
    private final RpcConfigProperties configProperties;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DefaultMessagePacket msg) throws Exception {
        serverMessageHandle.handle(ctx, msg);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if(IdleState.READER_IDLE == state){
                // If no client data arrives within the configured time (a heartbeat exchange prevents this), close the client channel
                ctx.close();
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ChannelHandlerContextUtil contextUtil = ChannelHandlerContextUtil.INSTANCE;
        String ip = contextUtil.getIp(ctx);
        int port = contextUtil.getPort(ctx);
        log.debug("Establish connection with client, destination IP: {}, port: {}", ip, port);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        // Clear the session when you disconnect
        ChannelHandlerContextUtil contextUtil = ChannelHandlerContextUtil.INSTANCE;
        String ip = contextUtil.getIp(ctx);
        int port = contextUtil.getPort(ctx);
        //sessionManager.delete(SessionHelper.getSessionId(configProperties.getClient().getSystemId(), ip));
        ctx.close();
        log.error("Disconnect from client, destination IP: {}, port: {}", ip, port);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("Server exception,", cause);
    }
}

@RequiredArgsConstructor
public class ServerMessageHandle {

    private final List<ServerMessageResolverStrategy> strategies;

    public void handle(ChannelHandlerContext ctx, DefaultMessagePacket msg) {
        Optional<ServerMessageResolverStrategy> optional = strategies.stream().filter(strategy -> strategy.support(msg.getMessageType())).findFirst();
        if (optional.isPresent()) {
            optional.get().resolver(ctx, msg);
        }
    }
}

ServerMessageHandle dispatches each message to a strategy; there are strategies for client requests, pings, and message reporting. See the code on GitHub for details.
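The dispatch in ServerMessageHandle is a plain strategy lookup: the first strategy that supports the message type handles the message. The sketch below shows the same shape with simplified, hypothetical types (DemoMessageType, DemoResolverStrategy, and the two strategies are invented for this example and work on strings rather than Netty packets).

```java
import java.util.List;
import java.util.Optional;

// Hypothetical message types, loosely following the kinds the article mentions.
enum DemoMessageType { REQUEST, PING, REPORT }

interface DemoResolverStrategy {
    boolean support(DemoMessageType type);
    String resolve(String body);
}

class PingStrategy implements DemoResolverStrategy {
    public boolean support(DemoMessageType type) { return type == DemoMessageType.PING; }
    public String resolve(String body) { return "PONG"; }
}

class RequestStrategy implements DemoResolverStrategy {
    public boolean support(DemoMessageType type) { return type == DemoMessageType.REQUEST; }
    public String resolve(String body) { return "handled:" + body; }
}

class DemoMessageHandle {
    private final List<DemoResolverStrategy> strategies;

    DemoMessageHandle(List<DemoResolverStrategy> strategies) {
        this.strategies = strategies;
    }

    // The first strategy that supports the type wins; unsupported types give an empty result.
    Optional<String> handle(DemoMessageType type, String body) {
        return strategies.stream()
                .filter(s -> s.support(type))
                .findFirst()
                .map(s -> s.resolve(body));
    }
}
```

Adding a new message type then only means adding one more strategy to the list; the handler itself stays unchanged.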

2: How does the client receive messages

  1. Define the @RpcClient annotation to mark the client's processing class.
  2. Use @RpcMapping to mark local methods; Netty's handler invokes the marked methods by reflection.
@RequiredArgsConstructor
@Slf4j
@ChannelHandler.Sharable
public class ClientHandler extends SimpleChannelInboundHandler<DefaultMessagePacket> {


    private final ClientMessageHandle clientMessageHandle;
    private final RpcConfigProperties configProperties;
    private final RpcClient client;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DefaultMessagePacket packet) throws Exception {
        clientMessageHandle.handle(ctx, packet);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ClientChannelHolder.CHANNEL_REFERENCE.set(ctx.channel());
        // Carry your moduleId to the server when establishing the connection
        DefaultMessagePacket packet = MessagePacketBuilder.buildBasicReportModuleId().systemId(configProperties.getClient()
                .getSystemId()).targetIp(IPUtil.getAddress()).build();
        ctx.channel().writeAndFlush(packet);
        ChannelHandlerContextUtil contextUtil = ChannelHandlerContextUtil.INSTANCE;
        String ip = contextUtil.getIp(ctx);
        int port = contextUtil.getPort(ctx);
        log.info("Establish a connection with the server.... ip : {}, port : {}", ip, port);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();

            switch (state) {
                case READER_IDLE:
                    // Close the connection, and trigger the closed callback function
                    ctx.close();
                    break;
                case WRITER_IDLE:
                    // Set the time when there is no write operation
                    break;
                case ALL_IDLE:
                    // Send a heartbeat to the server if there is no read or write operation at the specified time
                    sendHeartbeatPacket(ctx);
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        // This method is triggered when the connection is closed and can be used to reconnect to the server
        ctx.channel().eventLoop().schedule(() -> {
            client.startClient(this);
        }, configProperties.getClient().getDisconnectRetryInterval(), TimeUnit.SECONDS);
        log.error("Disconnection from server...");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("Unknown exception occurred....", cause);
    }


    private void sendHeartbeatPacket(ChannelHandlerContext ctx) {
        DefaultMessagePacket packet = MessagePacketBuilder.buildBasicPing().build();
        ctx.writeAndFlush(packet);
    }
}
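The reconnect in channelInactive schedules a new connection attempt after a fixed delay. The sketch below shows that idea with a JDK ScheduledExecutorService instead of the channel's event loop, and it goes one step further than the handler above by retrying until an attempt succeeds. That retry behaviour is an assumption of this sketch, and Reconnector and its connect callback are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

class Reconnector {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final BooleanSupplier connect; // hypothetical: returns true once the connection is up
    private final long retryDelayMillis;

    Reconnector(BooleanSupplier connect, long retryDelayMillis) {
        this.connect = connect;
        this.retryDelayMillis = retryDelayMillis;
    }

    // Starts the retry cycle; the future completes with the number of attempts used.
    CompletableFuture<Integer> reconnect() {
        CompletableFuture<Integer> attempts = new CompletableFuture<>();
        scheduleAttempt(1, attempts);
        return attempts;
    }

    private void scheduleAttempt(int attempt, CompletableFuture<Integer> result) {
        scheduler.schedule(() -> {
            try {
                if (connect.getAsBoolean()) {
                    result.complete(attempt);
                    scheduler.shutdown();
                } else {
                    // Failed attempt: wait for the same delay and try again.
                    scheduleAttempt(attempt + 1, result);
                }
            } catch (RuntimeException e) {
                result.completeExceptionally(e);
                scheduler.shutdown();
            }
        }, retryDelayMillis, TimeUnit.MILLISECONDS);
    }
}
```

A production version would usually cap the number of attempts or back off between them; the fixed delay here simply mirrors the single configured retry interval.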

3: How does the server send messages

Because the relationship between server and clients is one-to-many, several clients may be connected to the server at the same time. The server therefore needs sessions that record the connection information for each client, so we define a SessionManager and a SessionStorage.

public interface SessionManager {

    /**
     * Gets a session by sessionId.
     *
     * @param sessionId the session id
     * @return the matching session
     */
    Session findOne(String sessionId);

    /**
     * Gets all sessions.
     *
     * @return all current sessions
     */
    List<Session> findAll();

    /**
     * Removes a session.
     *
     * @param sessionId the session id
     */
    void delete(String sessionId);

    /**
     * Removes a batch of sessions.
     *
     * @param sessionIds the session ids
     */
    void delete(List<String> sessionIds);

    /**
     * Saves a single session.
     *
     * @param session the session to save
     */
    void save(Session session);

    /**
     * Saves a batch of sessions.
     *
     * @param sessions the sessions to save
     */
    void save(Iterable<Session> sessions);

}
/**
 * Session storage strategy.
 *
 * @version 1.0 created by zy on 2020/4/26 14:50
 */
public interface SessionStorage {

    /**
     * Gets a session.
     *
     * @param sessionId systemId + ip
     * @return the matching session
     */
    Session findOne(String sessionId);

    /**
     * Gets all sessions.
     *
     * @return all current sessions
     */
    List<Session> findAll();

    /**
     * Removes a session.
     *
     * @param sessionId systemId + ip
     */
    void delete(String sessionId);

    /**
     * Removes a batch of sessions.
     *
     * @param sessionIds systemId + ip
     */
    void delete(List<String> sessionIds);

    /**
     * Saves a single session.
     *
     * @param session the session to save
     */
    void save(Session session);

    /**
     * Saves a batch of sessions.
     *
     * @param sessions the sessions to save
     */
    void save(Iterable<Session> sessions);
}
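One possible shape for a storage implementation is an in-memory map keyed by session id. The sketch below assumes a hypothetical DemoSession class (the article's Session class is not shown) and assumes the id is built as systemId + ":" + ip; the separator is an invention of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical session record; the real Session class would also hold the channel.
class DemoSession {
    final String systemId;
    final String ip;

    DemoSession(String systemId, String ip) {
        this.systemId = systemId;
        this.ip = ip;
    }

    // Assumed id format: systemId and ip joined with ':'.
    String id() {
        return systemId + ":" + ip;
    }
}

// In-memory storage; ConcurrentHashMap keeps it safe across Netty's worker threads.
class InMemorySessionStorage {
    private final Map<String, DemoSession> sessions = new ConcurrentHashMap<>();

    DemoSession findOne(String sessionId) {
        return sessions.get(sessionId);
    }

    List<DemoSession> findAll() {
        return new ArrayList<>(sessions.values());
    }

    void delete(String sessionId) {
        sessions.remove(sessionId);
    }

    void delete(List<String> sessionIds) {
        for (String id : sessionIds) {
            sessions.remove(id);
        }
    }

    void save(DemoSession session) {
        sessions.put(session.id(), session);
    }

    void save(Iterable<DemoSession> batch) {
        for (DemoSession session : batch) {
            save(session);
        }
    }
}
```

Keeping storage behind its own interface means the map could later be swapped for a shared store if several server instances need to see the same sessions.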

4: How does the client send messages

Define the @RpcRequestClient annotation to mark an interface, much like @FeignClient. The URL in @RpcMapping on each interface method must match the @RpcMapping URL on the server side.

/**
 * @version 1.0 created by zy on 2020/4/26 9:45
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RpcRequestClient {

    RpcRequestClientType targetType() default RpcRequestClientType.SERVER;

}
/**
 * @version 1.0 created by zy on 2020/5/24 17:37
 */
@RpcRequestClient
public interface ServerRequestClient {

    @RpcMapping(url = "/hello-server")
    ServerResponse helloServer(ClientRequest request);
}
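An annotated interface like this needs something to implement it at runtime. A common way to do that, and the one assumed here, is a JDK dynamic proxy that reads the mapping URL from the called method and hands it to a transport. Whether the project does exactly this is not shown in the article; DemoRoute, DemoRequestClient, and RpcProxyFactory are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Proxy;
import java.util.function.BiFunction;

// Hypothetical stand-in for a mapping annotation on client interface methods.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoRoute {
    String url();
}

// Hypothetical client interface, analogous to the annotated interface above.
interface DemoRequestClient {
    @DemoRoute(url = "/hello-server")
    String helloServer(String request);
}

class RpcProxyFactory {

    // `transport` stands in for "send url and argument over the channel, wait for the reply".
    // Only annotated methods are supported; anything else (including toString) is rejected.
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> type, BiFunction<String, Object, Object> transport) {
        return (T) Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[] {type},
                (proxy, method, args) -> {
                    DemoRoute route = method.getAnnotation(DemoRoute.class);
                    if (route == null) {
                        throw new UnsupportedOperationException(method.getName() + " has no mapping");
                    }
                    Object argument = (args == null || args.length == 0) ? null : args[0];
                    return transport.apply(route.url(), argument);
                });
    }
}
```

With this in place, calling helloServer on the proxy looks like an ordinary method call to the caller, while the proxy turns it into a URL plus a payload for the other side to dispatch.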

Usage

@RestController
public class TestController {

    @Autowired
    private ServerSendTemplate serverSendTemplate;
    @Autowired
    private SessionManager sessionManager;
    @Autowired
    private ServerRequestClient serverRequestClient;

    @GetMapping("/hello-client")
    public ClientResponse helloClient(ServerRequest serverRequest) {
        return serverSendTemplate.sendToClient(serverRequest.getSystemId(), serverRequest.getIp(), serverRequest.getUrlMapping(), serverRequest, ClientResponse.class);
    }

    @GetMapping("/hello-server")
    public ServerResponse helloServer(ClientRequest clientRequest) {
        return serverRequestClient.helloServer(clientRequest);
    }

    @GetMapping("/sessions")
    public List<Session> sessions() {
        return sessionManager.findAll();
    }
}

Then visit http://localhost:8080/hello-server to send a request to the server.

Visit http://localhost:8080/hello-client to send a request to the client.

Visit http://localhost:8080/sessions to list all current sessions through sessionManager.findAll().

Wrapping up

This article only covers the general approach and part of the code. For more details, see the code on GitHub. If you have suggestions, or spot anything I got wrong, please let me know.