Sentinel communication module parsing

Overview

In Sentinel there are two places that need remote communication: between the Sentinel client and the Dashboard, and between the Sentinel client and the Token Server, handled by the Sentinel-Transport module and the Sentinel-Cluster module of the Sentinel project respectively. The Sentinel client and the Dashboard talk to each other over plain HTTP: the client exposes an HTTP interface to receive control instructions from the Dashboard, while the Dashboard exposes an HTTP interface to receive data reported by the client. The Sentinel client communicates with the Token Server over a long-lived TCP connection.

Sentinel-Transport

The Sentinel-Transport module is what the Sentinel client uses to receive control commands. Its common sub-module defines some shared base abstractions such as the request body, the response body, and the command handler. The actual communication is provided by the remaining sub-modules, which currently offer three HTTP endpoint implementations based on Netty, NIO, and Spring MVC. In general we want the Sentinel control port to be separate from the business web port and not exposed to the public, so the transport usually chosen is netty-http or simple-http. Here we focus on the Netty implementation of the HTTP transport.

The Netty-server-related functionality is implemented by the HttpServer class, and startup is handled by NettyHttpCommandCenter, which implements the CommandCenter interface. This interface defines beforeStart, start, and stop methods; by that definition CommandCenter is more of a lifecycle listener, and it declares no interface for command processing. So who calls CommandCenter's beforeStart and start? In Sentinel, InitExecutor is responsible for initializing every class that implements the InitFunc interface. Finding those implementations is not an ordinary bean lookup; instead, Sentinel's own SpiLoader loads them from the file definitions under the META-INF/services/ directory.
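Concretely, that SPI wiring is just a provider-configuration file on the classpath. A sketch of what the InitFunc entry might look like is shown below (the fully qualified names are written from memory and depend on the Sentinel version and the modules present); CommandCenterInitFunc, shown right after, is one of the InitFunc implementations loaded this way.

# META-INF/services/com.alibaba.csp.sentinel.init.InitFunc
# (illustrative contents; the actual entries depend on the modules on the classpath)
com.alibaba.csp.sentinel.transport.init.CommandCenterInitFunc
com.alibaba.csp.sentinel.transport.init.HeartbeatSenderInitFunc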

@InitOrder(-1)
public class CommandCenterInitFunc implements InitFunc {

    @Override
    public void init() throws Exception {
        CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();

        if (commandCenter == null) {
            RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
            return;
        }

        commandCenter.beforeStart();
        commandCenter.start();
        RecordLog.info("[CommandCenterInit] Starting command center: "+ commandCenter.getClass().getCanonicalName()); }}Copy the code

The NettyHttpCommandCenter initializes an HttpServer object. Notably, it does not start the Netty server by creating a thread directly; instead it submits the startup task to a single-threaded executor, which makes thread management easier.

@Override
public void start() throws Exception {
    pool.submit(new Runnable() {
        @Override
        public void run() {
            try {
                server.start();
            } catch (Exception ex) {
                RecordLog.warn("[NettyHttpCommandCenter] Failed to start Netty transport server", ex); ex.printStackTrace(); }}}); }Copy the code

There is nothing special about the Netty setup in Sentinel; the pipeline below is a standard HTTP server configuration. HttpServerHandler is the key class for handling HTTP requests.

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline p = socketChannel.pipeline();

        p.addLast(new HttpRequestDecoder());
        p.addLast(new HttpObjectAggregator(1024 * 1024));
        p.addLast(new HttpResponseEncoder());

        p.addLast(new HttpServerHandler());
    }
}

HttpServerHandler extends SimpleChannelInboundHandler, and its overall processing logic is quite intuitive: the HTTP request is parsed into a CommandRequest object, the matching CommandHandler is looked up based on the request target, and a CommandResponse object is returned once the request has been handled.

private void handleRequest(CommandRequest request, ChannelHandlerContext ctx, boolean keepAlive)
    throws Exception {
    String commandName = HttpCommandUtils.getTarget(request);
    // Find the matching command handler.
    CommandHandler<?> commandHandler = getHandler(commandName);
    if (commandHandler != null) {
        CommandResponse<?> response = commandHandler.handle(request);
        writeResponse(response, ctx, keepAlive);
    } else {
        // No matching command handler.
        writeErrorResponse(BAD_REQUEST.code(), String.format("Unknown command \"%s\"", commandName), ctx);
    }
}

The CommandHandlers are stored in HttpServer's static handlerMap variable. In CommandCenter's beforeStart method, CommandHandlerProvider likewise uses the SpiLoader to instantiate the CommandHandlers and populate that map. In particular, after all instances are obtained, the annotation on each CommandHandler implementation class is parsed to bind the command name to its handler, and as you can see, the command name is in fact the request path of the HTTP request.

@CommandMapping(name = "api", desc = "get all available command handlers")
public class ApiCommandHandler implements CommandHandler<String> {

    @Override
    public CommandResponse<String> handle(CommandRequest request) {
        ...
        return CommandResponse.ofSuccess(array.toJSONString());
    }
}
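To make that binding concrete, here is a minimal sketch of how SPI-loaded handlers could be mapped to their command names by reading the @CommandMapping annotation. It uses the JDK's ServiceLoader for brevity where Sentinel uses its own SpiLoader, the class name is invented, and the Sentinel import paths are written from memory.

import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.csp.sentinel.command.CommandHandler;
import com.alibaba.csp.sentinel.command.annotation.CommandMapping;

// Sketch only: builds a command-name -> handler map from SPI-discovered handlers.
public class CommandHandlerRegistrySketch {

    public static Map<String, CommandHandler<?>> loadHandlers() {
        Map<String, CommandHandler<?>> handlerMap = new ConcurrentHashMap<>();
        for (CommandHandler<?> handler : ServiceLoader.load(CommandHandler.class)) {
            CommandMapping mapping = handler.getClass().getAnnotation(CommandMapping.class);
            if (mapping != null) {
                // The command name ("api", "getRules", ...) doubles as the HTTP request path.
                handlerMap.put(mapping.name(), handler);
            }
        }
        return handlerMap;
    }
}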

In the writeResponse function we can see that Sentinel uses a short-lived connection for each HTTP request. Why? Because of the nature of Sentinel's own business: control commands are exchanged infrequently, so there is no need for long-lived connections to save on connection setup time. This is also one of the reasons Sentinel Transport uses HTTP rather than raw TCP for this port.

HttpResponseStatus status = response.isSuccess() ? OK : BAD_REQUEST;

FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
    Unpooled.copiedBuffer(body));

httpResponse.headers().set("Content-Type"."text/plain; charset=" + SentinelConfig.charset());
httpResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, httpResponse.content().readableBytes());
httpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);

The Transport module also contains a HeartbeatSender component that periodically registers the Sentinel client's information with the Dashboard. The scheduled thread pool in HeartbeatSenderInitFunc is responsible for the task scheduling. It is a simple remote HTTP call to the Dashboard, so I won't go into detail, but it is worth mentioning that the HeartbeatSender in the netty-http module only sends heartbeats to the first Dashboard address by default, whereas the default HeartbeatSender in the simple-http module works through the Dashboard address list in turn.

// Initialize the sender from the netty module
public HttpHeartbeatSender() {
    List<Endpoint> dashboardList = TransportConfig.getConsoleServerList();
    if (dashboardList == null || dashboardList.isEmpty()) {
        RecordLog.info("[NettyHttpHeartbeatSender] No dashboard server available");
        consoleProtocol = Protocol.HTTP;
        consoleHost = null;
        consolePort = -1;
    } else {
        consoleProtocol = dashboardList.get(0).getProtocol();
        consoleHost = dashboardList.get(0).getHost();
        consolePort = dashboardList.get(0).getPort();
        RecordLog.info("[NettyHttpHeartbeatSender] Dashboard address parsed: <{}:{}>", consoleHost, consolePort);
    }
    this.client = HttpClientsFactory.getHttpClientsByProtocol(consoleProtocol);
}
// Address selection function under the simple-HTTP module
private Endpoint getAvailableAddress() {
    if (addressList == null || addressList.isEmpty()) {
        return null;
    }
    if (currentAddressIdx < 0) {
        currentAddressIdx = 0;
    }
    int index = currentAddressIdx % addressList.size();
    return addressList.get(index);
}
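The scheduling itself is straightforward. A rough sketch of what HeartbeatSenderInitFunc does is shown below; the class name, initial delay, and error handling are illustrative, and intervalMs()/sendHeartbeat() are the HeartbeatSender methods as far as I recall, so check the source for the exact signatures.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sentinel imports (HeartbeatSender, RecordLog) omitted; exact packages depend on the version.

// Sketch of the heartbeat scheduling performed by HeartbeatSenderInitFunc.
public class HeartbeatSchedulerSketch {

    private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

    public void schedule(HeartbeatSender sender) {
        long intervalMs = sender.intervalMs(); // each sender declares its own heartbeat interval
        pool.scheduleAtFixedRate(() -> {
            try {
                sender.sendHeartbeat(); // HTTP call registering client info with the Dashboard
            } catch (Throwable e) {
                RecordLog.warn("[HeartbeatSchedulerSketch] Failed to send heartbeat", e);
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }
}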

Sentinel-Cluster

The Sentinel-Cluster module is mainly responsible for Sentinel's cluster flow control. Its common sub-module likewise defines general abstractions such as the request body, the response body, and encoders. The client and server sub-modules implement the cluster client and server respectively, while the envoy sub-module is mainly used with Kubernetes and is not analyzed here. The basic processing model of cluster flow control is defined in the sentinel-core module: both the client and the server implement the TokenService interface, the client side implementing requests to the remote server and the server side implementing the actual request handling.
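For orientation, the shape of the TokenService interface is roughly as follows. This is a simplified sketch written from memory: the real interface lives in sentinel-core, TokenResult is its result type, and newer versions add further methods, so treat the signatures as approximate.

import java.util.Collection;

// Simplified sketch of the TokenService abstraction shared by cluster client and server.
public interface TokenService {

    // Request acquireCount tokens for the flow rule identified by ruleId; prioritized
    // requests may be queued rather than rejected when the cluster threshold is reached.
    TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized);

    // Request tokens for a hot-parameter flow rule, keyed by the concrete parameter values.
    TokenResult requestParamToken(Long ruleId, int acquireCount, Collection<Object> params);
}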

Server

On the server side, the main functionality of DefaultEmbeddedTokenServer can be split into two parts: token acquisition and release, realized through TokenService, and the remote communication of the token server, realized by a ClusterTokenServer. Here we focus on the NettyTransportServer inside SentinelDefaultTokenServer.

From the following code we can see that the Cluster Server uses a length-based protocol: the first two bytes give the length of the data frame, followed by the data content. NettyRequestDecoder and NettyResponseEncoder are responsible for turning frame content into a Request object (defined in the common module) and a Response object back into frame content. LengthFieldPrepender works together with LengthFieldBasedFrameDecoder by prepending the 2-byte length field to the head of every outgoing data frame. TokenServerHandler is the ChannelInboundHandler that ultimately processes the Request.

Take a look at the TCP-related settings in Sentinel Cluster. SO_BACKLOG sets the wait queue for pending TCP connections; if a token server in a large cluster needs to handle many connections, the queue needs to be large enough to prevent connections from being rejected. ALLOCATOR sets Netty's buffer allocation mode; Netty decides whether direct buffers can be preferred and whether the buffer cleaner can be enabled based on the JDK. TCP_NODELAY is set to true so that short data frames are sent immediately rather than waiting to be batched with more packets, which is necessary for latency-sensitive operations such as token requests.

ServerBootstrap b = new ServerBootstrap();
this.bossGroup = new NioEventLoopGroup(1);
this.workerGroup = new NioEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS);
b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 128)
    .handler(new LoggingHandler(LogLevel.INFO))
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline p = ch.pipeline();
            p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
            p.addLast(new NettyRequestDecoder());
            p.addLast(new LengthFieldPrepender(2));
            p.addLast(new NettyResponseEncoder());
            p.addLast(new TokenServerHandler(connectionPool));
        }
    })
    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
    .childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
    .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
    .childOption(ChannelOption.SO_TIMEOUT, 10)
    .childOption(ChannelOption.TCP_NODELAY, true)
    .childOption(ChannelOption.SO_RCVBUF, 32 * 1024);

DefaultRequestEntityDecoder is the concrete implementation behind the NettyRequestDecoder mentioned above. The following code makes the protocol clearer: a 4-byte request id, a 1-byte request type, and then the type-specific data. An EntityDecoder is responsible for assembling that data into the corresponding POJO depending on the type; FlowRequestData, for example, consists of an 8-byte flow id, a 4-byte token request count, and a 1-byte priority flag.

    @Override
    public ClusterRequest decode(ByteBuf source) {
        if (source.readableBytes() >= 5) {
            int xid = source.readInt();
            int type = source.readByte();
            EntityDecoder<ByteBuf, ?> dataDecoder = RequestDataDecodeRegistry.getDecoder(type);
            if (dataDecoder == null) {
                RecordLog.warn("Unknown type of request data decoder: {}", type);
                return null;
            }

            Object data;
            if (source.readableBytes() == 0) {
                data = null;
            } else {
                data = dataDecoder.decode(source);
            }

            return new ClusterRequest<>(xid, type, data);
        }
        return null;
    }
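To make the byte layout concrete, here is a sketch of how the FlowRequestData described above could be decoded from the buffer (8-byte flow id, 4-byte acquire count, 1-byte priority flag). The class name is invented and the real decoder in sentinel-cluster may differ in naming and detail; Sentinel import paths are omitted because I am not certain of the exact packages.

import io.netty.buffer.ByteBuf;

// Sketch of a type-specific decoder matching the layout described in the text.
public class FlowRequestDataDecoderSketch implements EntityDecoder<ByteBuf, FlowRequestData> {

    @Override
    public FlowRequestData decode(ByteBuf source) {
        if (source.readableBytes() >= 13) {         // 8 + 4 + 1 bytes
            FlowRequestData data = new FlowRequestData();
            data.setFlowId(source.readLong());      // 8 bytes: cluster flow rule id
            data.setCount(source.readInt());        // 4 bytes: number of tokens requested
            data.setPriority(source.readBoolean()); // 1 byte: prioritized flag
            return data;
        }
        return null;
    }
}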

The processing logic in TokenServerHandler is similar: it fetches the corresponding processor based on the type in the Request and ultimately interacts with the TokenService mentioned earlier. Processors are loaded in much the same way as the CommandHandlers in the Transport module: SPI loads all implementation classes of the RequestProcessor interface, and the @RequestType annotation on each class is parsed to register the processor for its request type.

It is worth mentioning that in TokenServerHandler, every established Connection is stored in the ConnectionPool, and the last read time of the Connection corresponding to a Channel is refreshed each time a message from that Channel is processed. A scheduled task in ConnectionPool periodically cleans up idle connections. In addition to the ConnectionPool, a ConnectionManager is defined to manage the binding between connections and namespaces; in plain terms, connections belonging to the same service are grouped together through a ConnectionGroup.

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    globalConnectionPool.refreshLastReadTime(ctx.channel());
    if (msg instanceof ClusterRequest) {
        ClusterRequest request = (ClusterRequest)msg;

        // Client ping with its namespace, add to connection manager.
        if (request.getType() == ClusterConstants.MSG_TYPE_PING) {
            handlePingRequest(ctx, request);
            return;
        }

        // Pick request processor for request type.
        RequestProcessor<?, ?> processor = RequestProcessorProvider.getProcessor(request.getType());
        if (processor == null) {
            RecordLog.warn("[TokenServerHandler] No processor for request type: " + request.getType());
            writeBadResponse(ctx, request);
        } else {
            ClusterResponse<?> response = processor.processRequest(request);
            writeResponse(ctx, response);
        }
    }
}
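The channelRead above refreshes the connection's last read time; the periodic cleanup performed by the ConnectionPool then amounts to something like the sketch below. The class name, the ten-minute timeout, and the backing map are assumptions for illustration, not the actual ConnectionPool code.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import io.netty.channel.Channel;

// Sketch of an idle-connection sweep: channels that have not been read from
// within the timeout are closed and dropped from the pool.
public class IdleConnectionSweeperSketch {

    private static final long IDLE_TIMEOUT_MS = 10 * 60 * 1000;

    private final Map<Channel, Long> lastReadTime = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scanner = Executors.newSingleThreadScheduledExecutor();

    public void refreshLastReadTime(Channel channel) {
        lastReadTime.put(channel, System.currentTimeMillis());
    }

    public void start() {
        scanner.scheduleAtFixedRate(() -> {
            long now = System.currentTimeMillis();
            lastReadTime.forEach((channel, last) -> {
                if (now - last > IDLE_TIMEOUT_MS) {
                    channel.close();              // close the idle channel
                    lastReadTime.remove(channel); // and forget about it
                }
            });
        }, IDLE_TIMEOUT_MS, IDLE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }
}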

In addition, several CommandHandlers are defined in the server module that cooperate with the Transport module to provide control of, and information about, cluster flow control.

Client

The Netty settings on the client side are similar to those on the server side, so let's first look at connection management on the client side. Note that channelUnregistered in TokenClientHandler invokes a disconnectCallback, which schedules a single reconnection attempt. channelUnregistered is triggered when the channel is deregistered from its event loop, so if a connection attempt fails the callback fires again, and the reconnection delay grows as the number of consecutive failures increases.

private Runnable disconnectCallback = new Runnable() {
    @Override
    public void run() {
        if (!shouldRetry.get()) {
            return;
        }
        SCHEDULER.schedule(new Runnable() {
            @Override
            public void run() {
                if (shouldRetry.get()) {
                    RecordLog.info("[NettyTransportClient] Reconnecting to server <{}:{}>", host, port);
                    try {
                        startInternal();
                    } catch (Exception e) {
                        RecordLog.warn("[NettyTransportClient] Failed to reconnect to server", e);
                    }
                }
            }
        }, RECONNECT_DELAY_MS * (failConnectedTime.get() + 1), TimeUnit.MILLISECONDS);
        cleanUp();
    }
};

Next let's look at how the client sends a token request, as shown in the code below. First a request id is assigned to the request using an AtomicInteger, and the request is then written to the channel. Netty writes are asynchronous, but a flow-control token request is a synchronous operation; to turn the asynchronous write into a synchronous call, the await method of a ChannelPromise is used to block. Each newly created ChannelPromise is put into the TokenClientPromiseHolder keyed by the request id. When the client receives the response for that request, it fetches the corresponding ChannelPromise from the TokenClientPromiseHolder and sets the response result, and the thread blocked in await then proceeds with the response from the cluster. If no response arrives within the configured timeout, an exception is thrown and the client decides, based on configuration, whether to degrade or let the request pass locally.

@Override
public ClusterResponse sendRequest(ClusterRequest request) throws Exception {
    if (!isReady()) {
        throw new SentinelClusterException(ClusterErrorMessages.CLIENT_NOT_READY);
    }
    if (!validRequest(request)) {
        throw new SentinelClusterException(ClusterErrorMessages.BAD_REQUEST);
    }
    int xid = getCurrentId();
    try {
        request.setId(xid);

        channel.writeAndFlush(request);

        ChannelPromise promise = channel.newPromise();
        TokenClientPromiseHolder.putPromise(xid, promise);

        if (!promise.await(ClusterClientConfigManager.getRequestTimeout())) {
            throw new SentinelClusterException(ClusterErrorMessages.REQUEST_TIME_OUT);
        }

        SimpleEntry<ChannelPromise, ClusterResponse> entry = TokenClientPromiseHolder.getEntry(xid);
        if (entry == null || entry.getValue() == null) {
            // Should not go through here.
            throw new SentinelClusterException(ClusterErrorMessages.UNEXPECTED_STATUS);
        }
        return entry.getValue();
    } finally {
        TokenClientPromiseHolder.remove(xid);
    }
}
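For completeness, the receiving side works roughly like the sketch below: when the ClusterResponse with a matching id reaches the client handler, the promise stored in TokenClientPromiseHolder is completed, which wakes up the thread blocked in promise.await() above. This is a simplification of TokenClientHandler, and completePromise is my assumption about the holder's method name, so check the source for the exact API.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Sentinel imports (ClusterResponse, TokenClientPromiseHolder) omitted; exact packages vary.

// Sketch of the client-side response path; the real class is TokenClientHandler.
public class TokenClientResponseSketch extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ClusterResponse) {
            ClusterResponse<?> response = (ClusterResponse<?>) msg;
            // Setting the result completes the promise stored under the same request id
            // and unblocks the thread waiting in sendRequest().
            TokenClientPromiseHolder.completePromise(response.getId(), response);
        }
    }
}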

Finally, let's look at DefaultClusterTokenClient. Besides creating the NettyTransportClient responsible for startup, its constructor also registers a server-change observer; when the token server information changes, the changeServer function is called to close the existing connection and open a new one.

public DefaultClusterTokenClient() {
    ClusterClientConfigManager.addServerChangeObserver(new ServerChangeObserver() {
        @Override
        public void onRemoteServerChange(ClusterClientAssignConfig assignConfig) {
            changeServer(assignConfig);
        }
    });
    initNewConnection();
}

private void changeServer(/*@Valid*/ ClusterClientAssignConfig config) {
    if (serverEqual(serverDescriptor, config)) {
        return;
    }
    try {
        if (transportClient != null) {
            transportClient.stop();
        }
        // Replace with new, even if the new client is not ready.
        this.transportClient = new NettyTransportClient(config.getServerHost(), config.getServerPort());
        this.serverDescriptor = new TokenServerDescriptor(config.getServerHost(), config.getServerPort());
        startClientIfScheduled();
        RecordLog.info("[DefaultClusterTokenClient] New client created: {}", serverDescriptor);
    } catch (Exception ex) {
        RecordLog.warn("[DefaultClusterTokenClient] Failed to change remote token server", ex);
    }
}