As mentioned in the previous article, the NettyServerCnxnFactory is used to handle client connections and requests.

new ServerBootstrap().group(bossGroup, workerGroup)
  .channel(NettyUtils.nioOrEpollServerSocketChannel())
  // parent channel options
  .option(ChannelOption.SO_REUSEADDR, true)
  // child channels options
  .childOption(ChannelOption.TCP_NODELAY, true)
  .childOption(ChannelOption.SO_LINGER, -1)
  .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        ...
        // Important: the handler that processes client requests
        pipeline.addLast("servercnxnfactory", channelHandler);
    }
  });

The channelHandler processes client requests.

CnxnChannelHandler (an inner class of NettyServerCnxnFactory): see the channelActive() and channelRead() methods.

@Sharable
    class CnxnChannelHandler extends ChannelDuplexHandler {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Channel active {}", ctx.channel());
            }

            final Channel channel = ctx.channel();
            if (limitTotalNumberOfCnxns()) {
                ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
                channel.close();
                return;
            }
            InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
            if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) {
                ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
                LOG.warn("Too many connections from {} - max is {}", addr, maxClientCnxns);
                channel.close();
                return;
            }
						
            // Create a new NettyServerCnxn (roughly a server-side session) and store it in CONNECTION_ATTRIBUTE
            NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this);
            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
            ...
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            ...
            allChannels.remove(ctx.channel());
            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
            ...
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ...
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("message received called {}", msg);
                }
                try {
                    LOG.debug("New message {} from {}", msg, ctx.channel());
                    // Get the NettyServerCnxn that channelActive() stored
                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
                    if (cnxn == null) {
                        LOG.error("channelRead() on a closed or closing NettyServerCnxn");
                    } else {
                        // Process the network packet
                        cnxn.processMessage((ByteBuf) msg);
                    }
                } catch (Exception ex) {
                    LOG.error("Unexpected exception in receive", ex);
                    throw ex;
                }
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }

        // Write back to the client
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            if (LOG.isTraceEnabled()) {
                promise.addListener(onWriteCompletedTracer);
            }
            super.write(ctx, msg, promise);
        }
    }

In channelHandler, channelActive() creates a NettyServerCnxn, which is roughly the server-side equivalent of a session and handles receiving and sending protocol messages. channelRead() then calls cnxn.processMessage((ByteBuf) msg) to process each incoming message.
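
Before it creates the connection object, channelActive() rejects clients that already hold too many connections. The check can be sketched without Netty as follows. This is an illustration only, not ZooKeeper's implementation: the class ConnectionLimiterSketch and its methods are hypothetical, and addresses are plain strings for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-client connection limiter, loosely modelled on the maxClientCnxns check. */
class ConnectionLimiterSketch {
    private final int maxPerClient;                         // a value <= 0 disables the limit
    private final Map<String, Integer> open = new HashMap<>();

    ConnectionLimiterSketch(int maxPerClient) {
        this.maxPerClient = maxPerClient;
    }

    /** Returns true and counts the connection if the client is still under its limit. */
    synchronized boolean tryAccept(String addr) {
        int current = open.getOrDefault(addr, 0);
        if (maxPerClient > 0 && current >= maxPerClient) {
            return false;                                   // caller would close the channel here
        }
        open.put(addr, current + 1);
        return true;
    }

    /** Called when a connection goes away; drops the entry once the count reaches zero. */
    synchronized void release(String addr) {
        open.computeIfPresent(addr, (k, v) -> v > 1 ? v - 1 : null);
    }

    public static void main(String[] args) {
        ConnectionLimiterSketch limiter = new ConnectionLimiterSketch(2);
        System.out.println(limiter.tryAccept("10.0.0.1"));
        System.out.println(limiter.tryAccept("10.0.0.1"));
        System.out.println(limiter.tryAccept("10.0.0.1"));  // third connection from the same address
    }
}
```

Both methods are synchronized so that the check and the increment happen as one step, even if several I/O threads accept connections at the same time.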

NettyServerCnxn.java

NettyServerCnxn#processMessage(ByteBuf buf)

void processMessage(ByteBuf buf) {
        ...
        if (throttled.get()) {
            ...
        } else {
            LOG.debug("not throttled");
            if (queuedBuffer != null) {
                appendToQueuedBuffer(buf.retainedDuplicate());
                processQueuedBuffer();
            } else {
                // Receive the message
                receiveMessage(buf);
                // Have to check !closingChannel, because an error in
                // receiveMessage() could have led to close() being called.
                ...
            }
        }
}

NettyServerCnxn#receiveMessage(buf);

private void receiveMessage(ByteBuf message) {
        checkIsInEventLoop("receiveMessage");
        try {
            while (message.isReadable() && !throttled.get()) {
                if (bb != null) {
                    ...
                    if (bb.remaining() > message.readableBytes()) {
                        int newLimit = bb.position() + message.readableBytes();
                        bb.limit(newLimit);
                    }
                    message.readBytes(bb);
                    bb.limit(bb.capacity());
                    ...
                    if (bb.remaining() == 0) {
                        bb.flip();
                        packetReceived(4 + bb.remaining());

                        ZooKeeperServer zks = this.zkServer;
                        if (zks == null || !zks.isRunning()) {
                            throw new IOException("ZK down");
                        }
                        if (initialized) {
                            // TODO: if zks.processPacket() is changed to take a ByteBuffer[],
                            // we could implement zero-copy queueing.
                            // Process the message
                            zks.processPacket(this, bb);
                        } else {
                            LOG.debug("got conn req request from {}", getRemoteSocketAddress());
                            zks.processConnectRequest(this, bb);
                            initialized = true;
                        }
                        bb = null;
                    }
                } else {
                    ...
                }
            }
        } catch (IOException e) {
            ...
        }
}

In this step, the contents of Netty's ByteBuf are copied into a NIO ByteBuffer. Once a complete packet has been read, it is handed to ZooKeeperServer's processPacket(this, bb) for processing.
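
The reassembly idea can be tried out with plain java.nio, without Netty. The sketch below is a simplified illustration, not the NettyServerCnxn code: FrameAssembler and its members are hypothetical names, the incoming chunk is a ByteBuffer standing in for Netty's ByteBuf, and a 4-byte length prefix followed by the payload is assumed. Where the original narrows the limit of the destination buffer, this sketch narrows a window over the source to the same effect.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reassembler for length-prefixed frames that arrive in arbitrary chunks. */
class FrameAssembler {
    private final ByteBuffer lenBuf = ByteBuffer.allocate(4); // collects the 4-byte length prefix
    private ByteBuffer body;                                  // null until the length is known
    final List<byte[]> frames = new ArrayList<>();            // completed frames, in arrival order

    /** Feed one network chunk; a frame may span several calls. */
    void receive(ByteBuffer chunk) {
        while (chunk.hasRemaining()) {
            if (body == null) {
                copy(chunk, lenBuf);
                if (lenBuf.hasRemaining()) {
                    return;                                   // length prefix still incomplete
                }
                lenBuf.flip();
                int len = lenBuf.getInt();
                lenBuf.clear();
                if (len < 0) {
                    throw new IllegalArgumentException("negative frame length: " + len);
                }
                body = ByteBuffer.allocate(len);
            }
            copy(chunk, body);
            if (!body.hasRemaining()) {                       // frame complete: hand it on
                frames.add(body.array());
                body = null;
            }
        }
    }

    /** Copies as many bytes as both buffers allow, advancing both positions. */
    private static void copy(ByteBuffer src, ByteBuffer dst) {
        int n = Math.min(src.remaining(), dst.remaining());
        ByteBuffer window = src.duplicate();
        window.limit(src.position() + n);
        dst.put(window);
        src.position(src.position() + n);
    }

    static ByteBuffer frame(String s) {
        byte[] payload = s.getBytes(StandardCharsets.UTF_8);
        ByteBuffer b = ByteBuffer.allocate(4 + payload.length);
        b.putInt(payload.length).put(payload);
        b.flip();
        return b;
    }

    /** Sends two frames split into 3-byte chunks and returns the decoded payloads. */
    static List<String> demo() {
        ByteBuffer all = ByteBuffer.allocate(64);
        all.put(frame("hello")).put(frame("zk"));
        all.flip();
        FrameAssembler assembler = new FrameAssembler();
        while (all.hasRemaining()) {
            byte[] piece = new byte[Math.min(3, all.remaining())];
            all.get(piece);
            assembler.receive(ByteBuffer.wrap(piece));
        }
        List<String> out = new ArrayList<>();
        for (byte[] f : assembler.frames) {
            out.add(new String(f, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```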

ZooKeeperServer.java

ZooKeeperServer#processPacket()

public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        // We have the request, now process and setup for next
        // This step is mainly deserialization
        InputStream bais = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
        RequestHeader h = new RequestHeader();
        h.deserialize(bia, "header");

        // Need to increase the outstanding request count first, otherwise
        // there might be a race condition that it enabled recv after
        // processing request and then disabled when check throttling.
        //
        // Be aware that we're actually checking the global outstanding
        // request before this request.
        //
        // It's fine if the IOException thrown before we decrease the count
        // in cnxn, since it will close the cnxn anyway.
        cnxn.incrOutstandingAndCheckThrottle(h);

        // Through the magic of byte buffers, txn will not be
        // pointing
        // to the start of the txn
        incomingBuffer = incomingBuffer.slice();
        if (h.getType() == OpCode.auth) {
            ...
            return;
        } else if (h.getType() == OpCode.sasl) {
            processSasl(incomingBuffer, cnxn, h);
        } else {
            if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
                // Authentication enforcement is failed
                // Already sent response to user about failure and closed the session, lets return
                return;
            } else {
                // This is the usual path for processing a message:
                // wrap it in a Request object
                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
                int length = incomingBuffer.limit();
                if (isLargeRequest(length)) {
                    // checkRequestSize will throw IOException if request is rejected
                    checkRequestSizeWhenMessageReceived(length);
                    si.setLargeRequestSize(length);
                }
                si.setOwner(ServerCnxn.me);
                submitRequest(si);
            }
        }
}

This step deserializes the ByteBuffer and encapsulates it into a Request object, which is then submitted for further processing.
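
The header-then-slice pattern can be illustrated with a plain ByteBuffer. This is a minimal sketch under assumptions, not the ZooKeeper code: it assumes the header consists of two big-endian ints (xid, then type), and HeaderDecodeSketch and ParsedRequest are hypothetical stand-ins for the real classes. The sample values 7 and 4 are arbitrary.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration of reading a fixed header and slicing off the payload. */
class HeaderDecodeSketch {

    /** Stand-in for a request object: the header fields plus the remaining payload bytes. */
    static final class ParsedRequest {
        final int xid;
        final int type;
        final ByteBuffer payload;

        ParsedRequest(int xid, int type, ByteBuffer payload) {
            this.xid = xid;
            this.type = type;
            this.payload = payload;
        }
    }

    static ParsedRequest parse(ByteBuffer incoming) {
        int xid = incoming.getInt();    // ByteBuffer reads big-endian by default
        int type = incoming.getInt();
        // slice() returns a view that starts at the current position, so the
        // payload buffer begins right after the header and has position 0.
        return new ParsedRequest(xid, type, incoming.slice());
    }

    /** Builds a sample packet: xid = 7, type = 4, followed by a 3-byte payload. */
    static ByteBuffer sample() {
        ByteBuffer b = ByteBuffer.allocate(11);
        b.putInt(7).putInt(4).put(new byte[] {1, 2, 3});
        b.flip();
        return b;
    }

    public static void main(String[] args) {
        ParsedRequest r = parse(sample());
        System.out.println("xid=" + r.xid + " type=" + r.type + " payloadBytes=" + r.payload.remaining());
    }
}
```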

ZooKeeperServer#submitRequest()

public void submitRequest(Request si) {
        enqueueRequest(si);
}

ZooKeeperServer#enqueueRequest()

public void enqueueRequest(Request si) {
        if (requestThrottler == null) {
            synchronized (this) {
                try {
                    // Since all requests are passed to the request
                    // processor it should wait for setting up the request
                    // processor chain. The state will be updated to RUNNING
                    // after the setup.
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (requestThrottler == null) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        // Submit the request to the throttler
        requestThrottler.submitRequest(si);
}

RequestThrottler#submitRequest()

public void submitRequest(Request request) {
    if (stopping) {
        LOG.debug("Shutdown in progress. Request cannot be processed");
        dropRequest(request);
    } else {
        request.requestThrottleQueueTime = Time.currentElapsedTime();
        submittedRequests.add(request);
    }
}

This step adds the request to the submittedRequests queue, where it waits for run() in RequestThrottler to pick it up.

RequestThrottler#run()

@Override
    public void run() {
        try {
            while (true) {
                if (killed) {
                    break;
                }

                Request request = submittedRequests.take();
                ...
                // Throttling is disabled when maxRequests = 0
                ...
                // A dropped stale request will be null
                if (request != null) {
                    if (request.isStale()) {
                        ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
                    }
                    final long elapsedTime = Time.currentElapsedTime() - request.requestThrottleQueueTime;
                    ServerMetrics.getMetrics().REQUEST_THROTTLE_QUEUE_TIME.add(elapsedTime);
                    if (shouldThrottleOp(request, elapsedTime)) {
                      request.setIsThrottled(true);
                      ServerMetrics.getMetrics().THROTTLED_OPS.add(1);
                    }
                    // Execute the request
                    zks.submitRequestNow(request);
                }
            }
        } catch (InterruptedException e) {
            LOG.error("Unexpected interruption", e);
        }
        int dropped = drainQueue();
        LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped);
    }

This step take()s requests from the submittedRequests queue and passes each one to submitRequestNow(Request) in ZooKeeperServer, which processes it immediately.
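
The submit-then-take hand-off is a standard producer/consumer pattern over a blocking queue. A minimal, self-contained sketch is shown below. It is not the RequestThrottler code: QueueWorkerSketch, its methods, and the string requests are hypothetical, and throttling itself is left out. A sentinel object stops the worker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical worker thread that drains a queue of submitted requests. */
class QueueWorkerSketch extends Thread {
    // Sentinel compared by identity; a distinct String instance so it can never equal a real request.
    private static final String STOP = new String("stop");

    private final BlockingQueue<String> submitted = new LinkedBlockingQueue<>();
    final List<String> processed = Collections.synchronizedList(new ArrayList<>());

    /** Producer side: called by the I/O thread, returns immediately. */
    void submit(String request) {
        submitted.add(request);
    }

    void shutdown() {
        submitted.add(STOP);
    }

    /** Consumer side: blocks in take() until a request is available. */
    @Override
    public void run() {
        try {
            while (true) {
                String request = submitted.take();
                if (request == STOP) {
                    break;
                }
                processed.add("handled:" + request);   // stand-in for passing the request on
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits two requests, stops the worker, and returns what it processed. */
    static List<String> demo() {
        QueueWorkerSketch worker = new QueueWorkerSketch();
        worker.start();
        worker.submit("create");
        worker.submit("getData");
        worker.shutdown();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(worker.processed);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Decoupling the two sides this way lets the I/O thread return quickly while a single consumer thread decides when each request moves on.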

ZooKeeperServer#submitRequestNow()

public void submitRequestNow(Request si) {
        ...
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                setLocalSessionFlag(si);
                // Start processing from the first processor
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type {}", si.type);
                // Update request accounting/throttling limits
                requestFinished(si);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            LOG.debug("Dropping request.", e);
            // Update request accounting/throttling limits
            requestFinished(si);
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request", e);
            // Update request accounting/throttling limits
            requestFinished(si);
        }
}

This step hands the request to the first processor, the head of the processing chain mentioned in the previous article:

PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
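
The chain structure, in which each processor does its work and then passes the request to the next one, can be sketched as below. This is a simplified illustration with hypothetical names (ProcessorChainSketch, Stage) and string requests. It runs every stage synchronously on the calling thread, which the real processors need not do.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical chain-of-responsibility sketch with three stages. */
class ProcessorChainSketch {

    interface Processor {
        void process(String request, List<String> trace);
    }

    /** A stage records that it saw the request, then forwards it if there is a next stage. */
    static final class Stage implements Processor {
        private final String name;
        private final Processor next;   // null for the last stage

        Stage(String name, Processor next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void process(String request, List<String> trace) {
            trace.add(name + ":" + request);
            if (next != null) {
                next.process(request, trace);
            }
        }
    }

    /** Builds the chain back to front so each stage can hold a reference to its successor. */
    static Processor build() {
        Processor last = new Stage("final", null);
        Processor sync = new Stage("sync", last);
        return new Stage("prep", sync);
    }

    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        build().process("create", trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```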

To summarize: a network packet is deserialized from the byte stream, wrapped in a Request object, and finally handed to PrepRequestProcessor for processing.

The next article continues with the details of that processing.