Background

Last time I looked at XXL-JOB, I found that its communication mechanism is an HTTP server built on Netty. I realized I did not understand it very well, so I decided to implement a simple server that supports both the HTTP and WebSocket protocols to help me understand it.

Dependencies

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.13</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha1</version>
        </dependency>

This demo uses Netty 5.x, but I would recommend Netty 4.x for real projects, because the 5.x line seems to have been abandoned by the Netty authors. Some APIs differ between 5.x and 4.x.

Package structure

Implement WebSocketServer

  • WebSocketServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class WebSocketServer {

    // /Users/weihu/Desktop/sofe/java/netty-student/netty-websocket/src/main/resources/WebSocketServer.html
    public static void main(String[] args) throws Exception {
        // Port comes from the first argument, defaulting to 8080
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 8080;
        new WebSocketServer().run(port);
    }

    public void run(int port) throws Exception {
        // bossGroup accepts connections, workGroup handles their I/O
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("http-codec", new HttpServerCodec()) // HTTP codec
                                    // Combines the parts of an HTTP message into a single complete HTTP message
                                    .addLast("aggregator", new HttpObjectAggregator(65536))
                                    // Supports writing large streams (for example HTML5 files) to the client in chunks;
                                    // an HTTP-only service does not need this handler
                                    .addLast("http-chunked", new ChunkedWriteHandler())
                                    // Core business logic handler
                                    .addLast("handler", new WebSocketServerHandler());
                        }
                    });

            Channel channel = bootstrap.bind(port).sync().channel();
            log.info("Web socket or http server started at port: {}", port);
            log.info("open your browser and navigate to http://localhost:{}/", port);
            channel.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

The code here is essentially boilerplate, although this is also where you would tune network-related parameters. As you can see, receiving and processing requests, that is, the core business logic, all lives in the WebSocketServerHandler class.

Business handler WebSocketServerHandler

  • WebSocketServerHandler.java
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;

@Slf4j
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

    private WebSocketServerHandshaker handshaker;

    // Netty 5.x entry point; in Netty 4.x the equivalent method is channelRead0
    @Override
    public void messageReceived(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        // Traditional HTTP access
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        }
        // WebSocket access
        else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
        log.info("Processing HTTP request");
        // Return an error if HTTP decoding failed
        if (!req.getDecoderResult().isSuccess()) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                    HttpResponseStatus.BAD_REQUEST));
            return;
        }

        // WebSocket handshake request (the Upgrade header value is case-insensitive)
        if ("websocket".equalsIgnoreCase(req.headers().get("Upgrade"))) {
            WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                    "ws://localhost:8080/websocket", null, false);
            handshaker = wsFactory.newHandshaker(req);
            if (handshaker == null) {
                WebSocketServerHandshakerFactory
                        .sendUnsupportedWebSocketVersionResponse(ctx.channel());
            } else {
                handshaker.handshake(ctx.channel(), req);
            }
            return;
        }

        // Plain HTTP request: echo the method and URI back as HTML
        String uri = req.getUri();
        Map<String, String> resMap = new HashMap<>();
        resMap.put("method", req.getMethod().name());
        resMap.put("uri", uri);
        String msg = "<html><head><title>test</title></head><body>your request is: "
                + JSON.toJSONString(resMap) + "</body></html>";
        // Create the HTTP response
        FullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1,
                HttpResponseStatus.OK,
                Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
        // Set the header information
        response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
        // Write the HTML to the client, then close the connection
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

        // Check whether it is a close frame
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(),
                    (CloseWebSocketFrame) frame.retain());
            return;
        }
        // Check whether it is a Ping message
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(
                    new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // This demo only supports text messages, not binary messages
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(String.format(
                    "%s frame types not supported", frame.getClass().getName()));
        }

        // Return the reply message (flushed in channelReadComplete)
        String request = ((TextWebSocketFrame) frame).text();
        log.info("{} received {}", ctx.channel(), request);
        ctx.channel().write(
                new TextWebSocketFrame(request
                        + ", welcome to Netty WebSocket service, now: "
                        + DateUtil.now()));
    }

    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        // For non-200 responses, use the status line as the response body
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(),
                    CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            HttpHeaders.setContentLength(res, res.content().readableBytes());
        }

        // Close the connection if it is not keep-alive or the response is an error
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }
}
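
For readers curious about what the handshake call does on the wire: per RFC 6455, the server answers the upgrade request by hashing the client's Sec-WebSocket-Key header together with a fixed GUID and sending the result back as Sec-WebSocket-Accept. Below is a minimal JDK-only sketch of that single step. The class name HandshakeSketch is my own invention and this is not Netty's code; Netty's handshaker performs the equivalent work internally.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper for illustration only; not part of Netty.
final class HandshakeSketch {

    // Fixed GUID defined by RFC 6455.
    private static final String WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    // Computes the Sec-WebSocket-Accept value for a given Sec-WebSocket-Key.
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + WS_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 must be available on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Sample key from RFC 6455; prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
        System.out.println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="));
    }
}
```

The browser verifies this value before it fires onopen, which is why a plain HTTP response to the upgrade request never results in an open WebSocket.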

To make WebSocket testing easy, I wrote a simple HTML page:

  • WebSocketServer.html
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Netty WebSocket Time server</title>
</head>
<body>
<br>
<script type="text/javascript">
    var socket;
    if (!window.WebSocket)
    {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:8080/websocket");
        socket.onmessage = function(event) {
            var ta = document.getElementById('responseText');
            ta.value="";
            ta.value = event.data
        };
        socket.onopen = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = "Open WebSocket service ok, browser support WebSocket!";
        };
        socket.onclose = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = "";
            ta.value = "The WebSocket closed!";
        };
    }
    else
    {
        alert("Sorry, your browser does not support WebSocket protocol!");
    }

    function send(message) {
        if (!window.WebSocket) { return; }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        }
        else
        {
            alert("WebSocket connection failed!");
        }
    }
</script>
<form onsubmit="return false;">
    <input type="text" name="message" value="Netty Best Practices"/>
    <br><br>
    <input type="button" value="Send a WebSocket request message" onclick="send(this.form.message.value)"/>
    <hr color="blue"/>
    <h3>The reply message returned by the server</h3>
    <textarea id="responseText" style="width:500px; height:300px;"></textarea>
</form>
</body>
</html>

Test

We run WebSocketServer directly; the default port is 8080.

First, let's test HTTP request handling by visiting http://localhost:8080/index?query=1 directly.

You can see that the request succeeded. Next, let's test WebSocket: open WebSocketServer.html in the browser by entering its absolute path.

You can see that the WebSocket connection is established, so let's try sending a message. The client successfully receives the data returned by the server. Looking at the server log, you can see that the message from the client was received successfully.

How the XXL-JOB source code implements HTTP on Netty

Above we implemented a simple HTTP and WebSocket demo. Now let's take a look at how the XXL-JOB source code does it.

The core entry point is the EmbedServer class. Let's do a quick analysis.

First you can see the two standard EventLoopGroups. Then you can see that the handlers it adds are similar to those in our demo, except that it only supports HTTP, so it does not add ChunkedWriteHandler. It does add one more handler, IdleStateHandler. Netty's IdleStateHandler supports a heartbeat-style check: it fires an event when a connection has seen no reads or writes for a configured period, so the server can tell whether the remote end is still alive and deal with idle socket connections to avoid wasting resources.
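
To make the idea of idle detection concrete, here is a conceptual JDK-only sketch: remember when data was last read and compare that against a timeout. The class IdleTracker, its methods, and the 30-second value are all assumptions of mine for illustration. Netty's real IdleStateHandler is more involved (it schedules its checks on the event loop and reports idleness as an IdleStateEvent), so treat this only as the underlying principle.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical class for illustration; not part of Netty or XXL-JOB.
final class IdleTracker {
    private final long readerIdleNanos;
    private long lastReadNanos;

    IdleTracker(long readerIdleTime, TimeUnit unit, long nowNanos) {
        this.readerIdleNanos = unit.toNanos(readerIdleTime);
        this.lastReadNanos = nowNanos;
    }

    // Call whenever data arrives from the remote end.
    void onRead(long nowNanos) {
        lastReadNanos = nowNanos;
    }

    // True when nothing has been read for at least the configured period.
    boolean isReaderIdle(long nowNanos) {
        return nowNanos - lastReadNanos >= readerIdleNanos;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        IdleTracker tracker = new IdleTracker(30, TimeUnit.SECONDS, start);

        long later = start + TimeUnit.SECONDS.toNanos(31);
        System.out.println(tracker.isReaderIdle(later));     // true: 31s without a read

        tracker.onRead(later);                               // a read resets the timer
        System.out.println(tracker.isReaderIdle(later + 1)); // false
    }
}
```

Passing the clock value in as a parameter keeps the sketch deterministic; a real handler would read the clock itself and typically close the channel once it is reported idle.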

The core of its HTTP request handling lives in the EmbedHttpServerHandler class, so let's look at that class.

EmbedHttpServerHandler is a static inner class of EmbedServer. It is similar to our WebSocketServerHandler, with two differences. First, it extends SimpleChannelInboundHandler with FullHttpRequest as the generic type, which means it only handles HTTP. Second, since XXL-JOB uses Netty 4.x, the abstract method it has to implement is different too:

        protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {}

Let's take a look at the implementation of the channelRead0 method.

You can see that it is not much different from our implementation. The difference is that request processing is handed off to a thread pool, and the core processing logic is in the process method.
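
The hand-off pattern can be sketched without Netty: the I/O thread only submits the work, and a separate pool runs the potentially slow business logic, presumably so that the event loop is not blocked. The pool sizes, the class name BizPoolSketch, and the process stub below are assumptions for illustration and are not XXL-JOB's actual values or code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; names and pool sizes are illustrative assumptions.
final class BizPoolSketch {

    // Bounded business pool, separate from the threads that do network I/O.
    private final ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
            2, 8, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));

    // Called from the I/O thread: returns as soon as the work is queued.
    Future<String> handle(String requestBody) {
        Callable<String> task = () -> process(requestBody);
        return bizThreadPool.submit(task);
    }

    // Stand-in for the real business logic.
    private String process(String requestBody) {
        return "handled:" + requestBody + " on " + Thread.currentThread().getName();
    }

    void shutdown() {
        bizThreadPool.shutdown();
    }

    public static void main(String[] args) throws Exception {
        BizPoolSketch sketch = new BizPoolSketch();
        try {
            System.out.println(sketch.handle("ping").get(1, TimeUnit.SECONDS));
        } finally {
            sketch.shutdown();
        }
    }
}
```

In a real handler the task would write the response back through the channel when it finishes instead of returning a Future to the caller.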

As you can see, this is also very simple: requests that are not POST are rejected directly, then a token check is applied, and finally the request data is converted into Java objects, the business logic runs, and the result is returned.
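
The flow just described can be sketched roughly in plain Java as follows. Everything here is hypothetical: the class RequestGate, the result strings, and the way the token is passed in are my own stand-ins and are not copied from XXL-JOB.

```java
// Hypothetical sketch of the validation order; not XXL-JOB's actual code.
final class RequestGate {

    // Token configured on the server; null or empty means no check is done.
    private final String accessToken;

    RequestGate(String accessToken) {
        this.accessToken = accessToken;
    }

    String process(String httpMethod, String uri, String tokenFromRequest, String body) {
        // Step 1: only POST requests are accepted.
        if (!"POST".equals(httpMethod)) {
            return "FAIL: only POST is supported";
        }
        // Step 2: token validation, when a token is configured.
        if (accessToken != null && !accessToken.isEmpty()
                && !accessToken.equals(tokenFromRequest)) {
            return "FAIL: access token is wrong";
        }
        // Step 3: stand-in for converting the body into a Java object
        // and running the business logic.
        return "OK: " + uri + " <- " + body;
    }

    public static void main(String[] args) {
        RequestGate gate = new RequestGate("secret");
        System.out.println(gate.process("GET", "/run", "secret", "{}"));
        System.out.println(gate.process("POST", "/run", "wrong", "{}"));
        System.out.println(gate.process("POST", "/run", "secret", "{}"));
    }
}
```

Doing the cheap checks first means a rejected request never reaches deserialization or the business logic.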

That roughly completes the analysis of XXL-JOB's communication source code.

Conclusion

As you can see, if we don't need a custom protocol, building a server on Netty is easy out of the box and lets us focus on the business logic. Customizing the message body, which means adding codecs, handling half packets, and so on, is more troublesome. Implementing simple HTTP request handling is much easier.
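
To illustrate why a custom protocol takes more work, here is a minimal sketch of half-packet handling with a length-prefixed frame: bytes are buffered until a complete frame has arrived. The 4-byte length header and the class name LengthPrefixedDecoder are assumptions of mine; in Netty you would normally use a ready-made decoder such as LengthFieldBasedFrameDecoder instead of writing this by hand.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical decoder for illustration; frame = 4-byte length + UTF-8 body.
final class LengthPrefixedDecoder {

    // Bytes received so far that do not yet form a complete frame.
    private byte[] pending = new byte[0];

    // Feed whatever bytes arrived; returns the frames completed by this chunk.
    List<String> feed(byte[] chunk) {
        byte[] merged = new byte[pending.length + chunk.length];
        System.arraycopy(pending, 0, merged, 0, pending.length);
        System.arraycopy(chunk, 0, merged, pending.length, chunk.length);

        ByteBuffer buf = ByteBuffer.wrap(merged);
        List<String> frames = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length");
            }
            if (buf.remaining() < length) {
                buf.reset(); // half packet: keep the header and wait for more bytes
                break;
            }
            byte[] body = new byte[length];
            buf.get(body);
            frames.add(new String(body, StandardCharsets.UTF_8));
        }
        // Keep the unread tail for the next call.
        pending = new byte[buf.remaining()];
        buf.get(pending);
        return frames;
    }

    // Encodes a message as a 4-byte big-endian length followed by the body.
    static byte[] encode(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    public static void main(String[] args) {
        LengthPrefixedDecoder decoder = new LengthPrefixedDecoder();
        byte[] frame = encode("hello");
        // Deliver the frame in two pieces, as a TCP stream might.
        System.out.println(decoder.feed(Arrays.copyOfRange(frame, 0, 6)));            // []
        System.out.println(decoder.feed(Arrays.copyOfRange(frame, 6, frame.length))); // [hello]
    }
}
```

With HTTP, HttpServerCodec and HttpObjectAggregator already do this kind of reassembly, which is why the demo never has to think about partial messages.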

References

  • The Definitive Guide to Netty
  • XXL-JOB source code
