This article mainly writes the personal example design realization idea, if think part of the knowledge is not detailed enough to suggest to search by oneself, also can refer to my previous Web architecture Reactor thread model knowledge collate article.

preface

It’s time again, I wanted to drag Websocket to see if I can write other content, but I don’t want to write the content of the rotten street, I think it’s ok to write the content of the rotten street on my static blog, I really can’t think of any good other content to write until now. Had to pick up the end of the strategy pattern to continue to write the WebSocket chat room example. This article is mainly dry goods, supplemented by wet goods, the last article of this year, used for the last working day of the evil year to end the work (the pride of the fish party).

Introduction of WebSocket

WebSocket is a protocol for full duplex communication over a single TCP connection. In the WebSocket API, the browser and server only need to shake hands with Http 101 status code to create a persistent connection and carry out bidirectional data transmission. Image point metaphor is the difference between voice communication and voice information, voice communication as long as one of the parties does not close the two sides can always be BB, voice information is generally you sentence I sentence. The main difference between WebSocket and Http is that one is a persistent connection and the other is a one-time connection, which shows the following advantages of WebSocket:

  • Real-time: client and server can send messages in real time at any time. Common application practices are common, such as chat room and server message push
  • Reduced overhead: Much less overhead than using Http for long polling of message pushes
  • .

To save space, the WebSocket connection setup diagram for this example is omitted:It can be seen that when the WebSocket connection is established, the client will send an Http status code of 101. The Http 101 status code requests to switch the protocol, and only a more advanced protocol can be switched. In this example, the upgrade in Response Header is switched to the WebSocket protocol.

Part of the Websocket client API:

Let socket = new WebSocket("ws://localhost:9000/chat"); // Check whether the websocket status is connected by CONNECTING connections, CLOSING, or CLOSING events. Onmessage = function(event) {socket. Onmessage = function(event) {socket. Onclose = function(event) {socket function (event) { console.log("websocket close"); // Send a message to the server socket.send('hello world');Copy the code

I don’t want to embarrass you anymore because it’s front-end crap.

Netty

Netty is said to be a high performance network application framework, high performance application common options, the most common and common way of back-end WebSocket implementation, but the question is how Netty is implemented? Before this, I think I need to understand the Web request processing architecture, I/O multiplexing and Reactor(Responsive) threading model.

Netty pre-knowledge

This part is mainly from the sorting out of the knowledge found during learning. Please inform us if there is any violation.

Web request processing architecture

Every use of a Web application looks like you’re asking for a Web response, but the processing structure of the request can actually be divided into two types:

  • A thread-based architecture typically uses multiple threads to process client requests. Each time a request is received, a separate thread is started to process it. This architecture is also known as the traditional Web request processing architecture.
  • Event-driven architecture is a widely used approach that defines a series of event handlers to respond to events and separates server acceptance of connections from the processing of events. An event is a state change, such as the new incoming Connection, Ready for Read, or Ready for Write of a SOCKET in TCP.

I/O multiplexing

I/O multiplexing can be explained literally simply in terms of network programming: I/O generally refers to network I/O, multiplexing refers to multiple TCP connections, multiplexing refers to the reuse of one or a small number of threads, and linking refers to multiple (descriptor -fd)I/ OS that process multiple connections by using one thread repeatedly. General I/O multiplexing mechanisms rely on an Event Demultiplexer object that separates THE I/O events from the Event source and distributes them to the corresponding Read /write Event Handler. This event multiplexer looks familiar, and I think that the Web event-driven processing architecture and Netty are just implementations of I/O multiplexing. Linux supports I/O multiplexing system call modules such as SELECT, poll, and epoll. The reason for the length is……

Reactor threading model

The Reactor Thread model is an event-processing pattern for processing service requests that are concurrently passed by one or more inputs to a service handler, which decompresses incoming requests and dispatches them synchronously to the relevant request handlers. Reactor thread model is an implementation model of Web event-driven architecture, which is widely used in software programming based on I/O multiplexing mechanism. Netty and Redis all use this model to solve high performance concurrency problems. Reactor model is mainly divided into the following three roles:

  • Reactor: Assigns I/O events to the corresponding handler
  • Acceptor: Processes client connection events
  • Handler: Binds itself to events to handle non-blocking tasks

There are three commonly used Reactor thread models: single Reactor single thread model, single Reactor multi-thread model, master and slave multi-thread model. Netty, as the framework of this thread model, naturally provides the Settings of the above three models. The Reactor model for the primary and secondary multithreading model is shown below:

Introduction of Netty

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high-performance protocol servers and clients (fromwebsite), and combined with many protocols, greatly reducing the time developers spend on network programming. As for the implementation principle of Netty believe that look at the front of the pre knowledge have a rough outline, is a based on is widely used based onI/O multiplexing mechanismDesign and implementation of software programmingReactor thread modelNIO(non-blocking) network communication framework, commonly used in web-based event-driven architecture application implementation, such as Alibaba’s RocketMQ why so high performance? Using Netty(rocketMQ-Remoting Netty Client class):

Netty core components

Native Java NIO is NIO, but it is extremely difficult to use, developers need to understand a lot of network programming knowledge, code writing complex, and Netty through the NIO encapsulation greatly simplified and simplified network programming, so that developers can quickly start network programming. A brief introduction to the core components of Netty around the ChannelPipeline and threading model will precede the examples (see the self-source documentation).

ChannelPipelineComponents involved in event handling

component describe
Channel A connection to a network socket, or to a component that can perform READ, write, connect, and bind I/O operations, setting the appropriate parameter configuration (such as buffer) and I/O operations (such as read and write)
ChannelHandler To deal withI/OEvent or interceptionI/OAction and forward the event to its inChannelPipelineThe next handler in. Among themI/OEvents can be divided into inbound events and outbound events (such as the common inbound decoding outbound codes), and the corresponding processors are respectivelyChannelInboundHandlerwithChannelOutboundHandlerThe official recommendation is to use the adapter class
ChannelHandlerContext Responsible for allChannelHandlerinChannelPipelineTo forward events to the next handler.
ChannelPipeline Can be thought of as a netty event handling container, a container that contains the required eventsChannelHandlerList for processing or interceptionChannelInbound and outbound event operations. eachChannelThey all have their ownChannelPipelineWhen aChannelIs createdChannelPipelineIt also goes with the creation.

In order to visualize the relationship between the above components, the following sections are cutChannelPipelineAnnotated document diagram of:

Reactor thread model component representation in Netty

component describe
Channel A connection to a network socket, or to a component that can perform READ, write, connect, and bind I/O operations, setting the appropriate parameter configuration (such as buffer) and I/O operations (such as read and write)
ChannelHandler To deal withI/OEvent or interceptionI/OAction and forward the event to its inChannelPipelineThe next handler in. Among themI/OEvents can be divided into inbound events and outbound events (such as the common inbound decoding outbound codes), and the corresponding processors are respectivelyChannelInboundHandlerwithChannelOutboundHandler, the official recommendation is to implement the adapter classChannelInboundHandlerAdapterwithChannelOutboundHandlerAdapterPerform event processing
ChannelHandlerContext Responsible for allChannelHandlerinChannelPipelineTo forward events to the next handler.
ChannelPipeline Can be thought of as a netty event handling container, a container that contains the required eventsChannelHandlerList for processing or interceptionChannelInbound and outbound event operations. eachChannelThey all have their ownChannelPipelineWhen aChannelIs createdChannelPipelineIt also goes with the creation.

The following is the relationship among channels, Eventloops, threads, and EventLoopGroups (from Netty In Action) :

  • An EventLoopGroup contains one or more Eventloops
  • An EventLoop is bound to only one Thread during its lifetime
  • All I/O events processed by EventLoop will be processed on its proprietary Thread
  • A Channel registers only one EventLoop during its lifetime
  • An EventLoop may be assigned to one or more channels

There are three roles in the Reactor thread model: Reactor(I/O event distribution), Acceptor(client connection event processing), and Handler(binding event processing non-blocking task). We believe that we can find the components corresponding to the roles in the Reactor thread model through the introduction above:

  • EventLoopGroup: Based on NettyChannelA thread-pool abstraction for an operation, both a Reactor and an Acceptor
  • ChannelHandler: is equivalent to the Reactor Handler. Netty added it to facilitate interaction and event processing between handlersChannelPipeline,ChannelHandlerContextTwo roles
  • BootstrapwithServerBootstrapJust think of it as client and server

Netty – based WebSocket example

Finally code to the instance part, finally can CV operation + point design idea description finished. This example is just a simple chat room based on Netty, pure work needs so I took out a year ago to write the crap demo study and bloody wash, learning does not affect my front-end is still rotten display. This example covers the following points:

  • Spring Boot
  • Message distribution is based on the policy pattern (pseudo-no policy pattern) implemented by Spring
  • Netty

Implementation approach

Netty implementation ideas

Netty combined with many protocols, for all the event inbound and outbound processing is handed to the Handler, so when using Netty as the server WebSocket implementation we only need to understand Netty to some of the WebSocket encapsulation and inbound and outbound processing, out of the box, You don’t have to worry about implementation like Java NIO does.

Netty providesChannelInboundHandlerAdapterA simple abstract implementation class for an interfaceSimpleChannelInboundHandler<I>, which only processes messages of a specific type, generic<I>Is the type of the message. Developers who want to simply implement handlers that handle a particular message type can simply inherit this class and add it toChannelPipelineCan.

Netty providesWebSocketFrameThe abstract class serves as the base class for wrapping all WebSocket data frames and provides the following implementation classes:For simplicitySimpleChannelInboundHandlerThe message type selects a text frameTextWebSocketFrameAnd of the classtext()Methods toUTF-8To obtain the WebSocket transmission content in the format of a string. Each WebSocket interaction between the server and the client is transmitted as a string.

Message distribution implementation idea – policy pattern

Although messages are sent in strings, there has to be a distribution mechanism, otherwise messages from private chats may be sent to group chats and explode, so I decided to send them in JSON string format with the necessary parameters for message type determination and distribution. If the message type judgment processing is written with if every time it is too ugly, it is not easy to deal with more business, and it does not conform to the forced grid I coded for myself, and then came up with a set of pseudo-no policy mode implementation ideas:

  1. WebSocket message classes follow certain suffix constraints and all inherit from the same parent class
  2. The policy handling class that scans all WebSocket messages during project initialization
  3. Gets the parameter type of the policy processing class message processing method by reflection, truncating the string before the suffix as the policy name
  4. Map the policy name to the policy processing class and the policy name to the corresponding message class, exposing a policy name list API for query
  5. During WebSocket handshake, parameters are added to the address bar. When the server receives a handshake message, the parameters in the address bar are converted into the corresponding message object and sent to the policy Context. The Context obtains the corresponding policy class from the Map to process the message and then completes the handshake
  6. After the client transmits the WebSocket JSON message to the server, the Context converts the JSON message to the corresponding message class and sends it to the corresponding policy class for processing

Hence the following handshake message processing sequence diagram:Client sends message processing sequence diagram:

Code sample

WebSocket message inbound handler –ChatMsgInboundHandler

@Component @ChannelHandler.Sharable @Slf4j @AllArgsConstructor public class ChatMsgInboundHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private final WebSocketConfig webSocketConfig; private final MessageContext messageContext; @Override public void channelRead(ChannelHandlerContext ctx, Object MSG) throws Exception {// WebSocket establishes a long connection through Http handshake if (MSG instanceof FullHttpRequest) {FullHttpRequest Request = (FullHttpRequest) msg; / / address bar parameters JSONObject paramsJson = RequestUtils. UrlParamsToJson (. Request uri ()); HttpRequestHandle (CTX, request); // httpRequestHandle(CTX, request); / / will address bar parameters is converted to a json WebSocketMessage message = messageContext. ConvertJsonToMessage (paramsJson); message.setChannel(ctx.channel()); log.info("user {} is online", message.getFromUser()); messageContext.registerMessage(message); } super.channelRead(ctx, msg); } /** * handle connection requests, * * @param CTX @param Request */ private void httpRequestHandle(ChannelHandlerContext CTX, FullHttpRequest request) { String uri = request.uri(); // Check whether the webSocket contextPath is the same as the contextPath in the request address (webSocketConfig. GetContextPath (.) the equals (RequestUtils. GetBasePath (uri))) {/ / because it is possible to carry on the parameter, which the client has been unable to return to handshake packet, so after checking through, Reset request path request. SetUri (webSocketConfig getContextPath ()); } else { ctx.close(); } } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { messageContext.removeChannel(ctx.channel()); log.info("channelUnregistered: {}", ctx.channel().id().asLongText()); super.channelUnregistered(ctx); } @override public void channelRead0(ChannelHandlerContext CTX, ChannelHandlerContext CTX, TextWebSocketFrame frame) { messageContext.handleMessage(frame.text()); } /** * Handle exceptionCaught(ChannelHandlerContext) ** @param CTX * @param cause */ @override public void exceptionCaught(ChannelHandlerContext) ctx, Throwable cause) throws Exception { if (cause instanceof BusinessException) { System.out.println(ctx.channel().isOpen()); ServerResponse<? > response = ServerResponse.serverError(cause.getMessage()); log.error("netty handler exceptionCaught: {}", cause.getMessage()); ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(response))); } else { super.exceptionCaught(ctx, cause); }}}Copy the code

Method introduction:

  • channelRead():ChannelWhat happens when a message is received
  • channelRead0(): define theSimpleChannelInboundHandler<I>Class, each time received<I>This method is called whenever a message is of type
  • channelUnregistered():ChannelHandlerContextIn theChannelFrom itsEventLoopTo cancel registration in
  • exceptionCaught(): Handles exceptions thrown by handlers

Because of the policy pattern, the business complexity of different message processing is encapsulated in the corresponding policy processing class, and the scheduling of the policy class is encapsulated in MessageContext, which makes the Handler look very clean. The parent method is called at the end of the ChatMsgInboundHandler’s channelRead() method to get the message into channelRead0() for processing, General also inherited SimpleChannelInboundHandler advocates to < I > < I > class type of the message processing in channelRead0 (), mainly because of SimpleChannelInboundHandler < I > section method source code is as follows:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean release = true;
    try {
        if (acceptInboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I imsg = (I) msg;
            channelRead0(ctx, imsg);
        } else {
            release = false;
            ctx.fireChannelRead(msg);
        }
    } finally {
        if (autoRelease && release) {
            ReferenceCountUtil.release(msg);
        }
    }
}

protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
Copy the code

Policy core context –MessageContext

@Slf4j @SuppressWarnings({"rawtypes", "Unchecked "}) @Component public class MessageContext implements ApplicationContextAware {/** * Map{ Message Class} for FastJSON to deserialize to the actual WebSocket message Class */ private Map<String, Class<? extends WebSocketMessage>> msgTypeMap; /** * Map{message class name prefix: Private Map<String, MessageHandler> messageHandlerMap; private Map<String, MessageHandler> /** * initialize: Into the soul, Mapping initialization */ @override public void setApplicationContext(ApplicationContext ApplicationContext) throws BeansException {// Get all MessageHandler maps in the container <String, MessageHandler> handlerBeanMap = applicationContext.getBeansOfType(MessageHandler.class); this.messageHandlerMap = new HashMap<>(handlerBeanMap.size()); this.msgTypeMap = new HashMap<>(handlerBeanMap.size()); // Reflection gets the type of message actually processed by all policy handling classes MessageHandler, MsgTypeMap, messageHandlerMap HandlerBeanmap.values (). try { handleMsg = ReflectUtils.method(messageHandler, true, "handleMsg", method -> method.getParameterCount() == 1 && ! Objects.equals(method.getParameterTypes()[0], WebSocketMessage.class)); } catch (ClassNotFoundException | NoSuchMethodException e) { e.printStackTrace(); log.error("MessageContext message handler map init failed(handler={}",messageHandler.getClass().getSimpleName()); return; } Class<? extends WebSocketMessage> msgClass = (Class<? extends WebSocketMessage>) handleMsg.getParameterTypes()[0]; String msgClassName = msgClass.getSimpleName(); String msgType = getMsgType(msgClassName); this.messageHandlerMap.put(msgType, messageHandler); this.msgTypeMap.put(msgType, msgClass); }); log.info("websocket message context init completed, msg type: {}, handler map: {}", msgTypeMap, messageHandlerMap); } /** * Get the actual policy name from the message class.simplename, Such as PrivateChatWebSocketMessage strategy called PrivateChat * * @ param msgClassName * @ return * / private String getMsgType (String msgClassName) { return msgClassName.contains(WebSocketMessage.MSG_TYPE_SEPARATOR) ? StringUtils.substringBefore(msgClassName, WebSocketMessage.MSG_TYPE_SEPARATOR) : msgClassName; } Public ServerResponse handleMessage(String msgJson) {if (jsonValidator.from (msgJson).validate()) {// Convert the message to the corresponding message WebSocketMessage subclass JSONObject JSONObject = json.parseObject (msgJson); WebSocketMessage message = convertJsonToMessage(jsonObject); String msgType = jsonObject.getString(webSocketMessage.msg_type); MessageHandler messageHandler = getMessageHandler(msgType); return messageHandler.handleMsg(message); } else { throw new IllegalArgumentException("invalid msg json"); } } /** * @param msgJson * @param <T> * @return */ public <T extends WebSocketMessage> WebSocketMessage convertJsonToMessage(JSONObject msgJson) { String msgType = msgJson.getString(WebSocketMessage.MSG_TYPE); Assert.isTrue(msgTypeMap.containsKey(msgType), "Unknown json msgType " + msgType); return msgJson.toJavaObject(msgTypeMap.get(msgType)); } // omit part...... }Copy the code

Since both the policy name and the policy processing class are dynamically initialized in this Context, when a new message policy is created, you only need to add a new message class (named with a specific suffix) and the policy processing class. The policy name and the processing class match and give to the Context.

WebSocket Message Abstract class –WebSocketMessage

@Data @Accessors(chain = true) public abstract class WebSocketMessage implements Serializable { public static final String MSG_TYPE = "msgType"; public static final String MSG_TYPE_SEPARATOR = "WebSocket"; public static final String MSG_PATTERN = "%s: %s"; private String msgType; protected String fromUser; protected String content; @JsonIgnore protected Channel channel; public String userMsg() { return String.format(MSG_PATTERN, fromUser, content); }}Copy the code

Policy processing interface –MessageHandler

public interface MessageHandler<T extends WebSocketMessage> { /** * handle message by corresponding handler * @param msg  * @return */ ServerResponse<? > handleMsg(T msg); /** * register channel from handler * @param msg */ void registerChannel(T msg); /** * remove channel from handler * @param channel */ void removeChannel(Channel channel); }Copy the code

Group chat message policy processing class –GroupChatMessageHandler

@Component public class GroupChatMessageHandler implements MessageHandler<GroupChatWebSocketMessage>{ /** * {roomId: ChannelGroup} mapping, ChannelGroup Maintains all users who enter the chat room channel */ private final Map<String, ChannelGroup> roomChannelMap = new ConcurrentHashMap<>();  @Override public ServerResponse<? > handleMsg(GroupChatWebSocketMessage msg) { ChannelGroup roomGroup = roomChannelMap.get(msg.getRoomId()); / /... ServerResponse<String> Response = serverResponse.success (msg.usermsg ())); roomGroup.writeAndFlush(WebSocketMessageUtils.websocketFrame(msg)); return response; } @Override public void registerChannel(GroupChatWebSocketMessage msg) { roomChannelMap.compute(msg.getRoomId(), (key, group) -> { if (group == null) { group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); } group.add(msg.getChannel()); return group; }); } @Override public void removeChannel(Channel channel) { roomChannelMap.values() .forEach(channels -> channels.remove(channel)); }}Copy the code

Note: Channel.writeAndFlush (Object) must be a TextWebSocketFrame instance

Spring Netty configuration, server boot class configuration

The configuration classWebSocketConfig

@Slf4j @Configuration public class WebSocketConfig { @Getter @Value("${netty.server.port:9000}") private Integer port; @Getter @Value("${netty.websocket.path:/chat}") private String contextPath; private NioEventLoopGroup bossGroup; private NioEventLoopGroup workerGroup; @Bean public NioEventLoopGroup bossGroup() { return bossGroup = new NioEventLoopGroup(); } @Bean public NioEventLoopGroup workerGroup() { return workerGroup = new NioEventLoopGroup(); } @Bean public ServerBootstrap serverBootstrap(NioEventLoopGroup bossGroup, NioEventLoopGroup workerGroup, ChatServerInitializer chatServerInitializer) { ServerBootstrap serverBootstrap = new ServerBootstrap() // Boss is responsible for receiving TCP connection requests from the client,worker is responsible for I/O processing events from the client. Group (bossGroup, WorkerGroup) / / configure the client channel type. The channel (NioServerSocketChannel. Class). ChildHandler (chatServerInitializer) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) serverBootstrap.bind(port); log.info("netty start on port: {}", port); / / bind the I/O event handler class, defined WebSocketChildChannelHandler return serverBootstrap; } @PreDestroy public void destroy() { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); log.info("netty shutdown gracefully"); }}Copy the code

Run the classChatApplication

@SpringBootApplication(scanBasePackageClasses = WebSocketConfig.class, exclude = DataSourceAutoConfiguration.class) //@MapperScan(basePackageClasses = UserInfoMapper.class) public class ChatApplication { public static void main(String[] args) { SpringApplication.run(ChatApplication.class, args); }}Copy the code

In this example, the configuration of the database is removed. For example, when testing the database, remove the @SpringBootApplication parameter configuration and the annotation before @mapperscan to configure the local database and add their own operations. Note: To inject a singleton into a Handler, add a netty @channelHandler. Sharable annotation to the Handler to indicate that one Handler can be added to multiple ChannelPipelines. Otherwise netty will report ChannelPipelineException for injecting singletons into handlers.

Results demonstrate

Private chat demo

Group chat demo

conclusion

The main content of this article is personal based on netty chat room examples of WebSocket implementation, at the same time code some of their own knowledge of Netty simple combing and summary, so netty will not do a very comprehensive introduction (such as ByteBuf, zero copy, etc.). If you want to do real-time service push, you can refer to the group chat push implementation of the example in this paper. You can realize the message push of specified user group by grouping different user channels. As this is only a simple example, please ignore some details of the design, such as handling of handshake exceptions, Channel maintenance (in this example, traversal removal will have performance problems when there are too many users), etc.

The knowledge combed in this paper mainly comes from the following three parts:

  • This paper summarizes the network knowledge and summarizes it in the Reactor thread model for Web architecture
  • Netty In Action
  • Netty source

It’s even harder to write for the month of next January…

The attached

Code Address:Making: Wilson – He/netty – simple – chat

If you are interested in the evolution and implementation of personal strategy mode, you can read my article last month:Spring+ Policy pattern = No policy?(There’s a big difference between code written a month ago and code written a month later, and I get confused when someone asks me about code written a week ago, so don’t ask me about details of previous code.)