As mentioned in the previous article, the NettyServerCnxnFactory is used to handle client connections and requests.

new ServerBootstrap().group(bossGroup, workerGroup)
  // parent channel options
  .option(ChannelOption.SO_REUSEADDR, true)
  // child channels options
  .childOption(ChannelOption.TCP_NODELAY, true)
  .childOption(ChannelOption.SO_LINGER, -1)
  .childHandler(new ChannelInitializer<SocketChannel>() {
    protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); .// Important, processor
        pipeline.addLast("servercnxnfactory", channelHandler); }});Copy the code

The channelHandler handler is used for client requests.

Cnxnchannelhandler. Java, see channelActive(), channelRead() methods.

    class CnxnChannelHandler extends ChannelDuplexHandler {

        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Channel active {}",;

            final Channel channel =;
            if (limitTotalNumberOfCnxns()) {
            InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
            if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) {
                LOG.warn("Too many connections from {} - max is {}", addr, maxClientCnxns);
            // Create a new NettyServerCnxn, equivalent to a Session, and set it to CONNECTION_ATTRIBUTE
            NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this);; . }@Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {... allChannels.remove(; NettyServerCnxn cnxn =; . }@Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {... }@Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("message received called {}", msg);
                try {
                    LOG.debug("New message {} from {}", msg,;
                  	// Get the NettyServerCnxn set through channelActive
                    NettyServerCnxn cnxn =;
                    if (cnxn == null) {
                        LOG.error("channelRead() on a closed or closing NettyServerCnxn");
                    } else {
                    // Process network packetscnxn.processMessage((ByteBuf) msg); }}catch (Exception ex) {
                    LOG.error("Unexpected exception in receive", ex);
                    throwex; }}finally{ ReferenceCountUtil.release(msg); }}// Write back to the client
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            if (LOG.isTraceEnabled()) {
            super.write(ctx, msg, promise); }}Copy the code

In the channelHandler handler, through channelActive(), create a NettyServerCnxn, which is equivalent to a Session on the server side, for the receive and send protocols. Call cnxn.processMessage((ByteBuf) MSG) via channelRead(); Processing protocol.

NettyServerCnxn#processMessage(ByteBuf buf)

void processMessage(ByteBuf buf) {...if (throttled.get()) {
        } else {
            LOG.debug("not throttled");
            if(queuedBuffer ! =null) {
            } else {
               // Receive the message
                // Have to check ! closingChannel, because an error in
                // receiveMessage() could have led to close() being called.. }}}Copy the code


private void receiveMessage(ByteBuf message) {
        try {
            while(message.isReadable() && ! throttled.get()) {if(bb ! =null) {...if (bb.remaining() > message.readableBytes()) {
                        intnewLimit = bb.position() + message.readableBytes(); bb.limit(newLimit); } message.readBytes(bb); bb.limit(bb.capacity()); .if (bb.remaining() == 0) {
                        packetReceived(4 + bb.remaining());

                        ZooKeeperServer zks = this.zkServer;
                        if (zks == null| |! zks.isRunning()) {throw new IOException("ZK down");
                        if (initialized) {
                            // TODO: if zks.processPacket() is changed to take a ByteBuffer[],
                            // we could implement zero-copy queueing.
                          // Process the message
                            zks.processPacket(this, bb);
                        } else {
                            LOG.debug("got conn req request from {}", getRemoteSocketAddress());
                            zks.processConnectRequest(this, bb);
                            initialized = true;
                        bb = null; }}else{... }}}catch(IOException e) { ... }}Copy the code

In this step, Netty’s ByteBuf is converted into NIO’s ByteBuffer, which is handed to ZookeeperServer’s processPacket(this, bb). Process messages.


public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        // We have the request, now process and setup for next
        // This step is mainly deserialization
        InputStream bais = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
        RequestHeader h = new RequestHeader();
        h.deserialize(bia, "header");

        // Need to increase the outstanding request count first, otherwise
        // there might be a race condition that it enabled recv after
        // processing request and then disabled when check throttling.
        // Be aware that we're actually checking the global outstanding
        // request before this request.
        // It's fine if the IOException thrown before we decrease the count
        // in cnxn, since it will close the cnxn anyway.

        // Through the magic of byte buffers, txn will not be
        // pointing
        // to the start of the txn
        incomingBuffer = incomingBuffer.slice();
        if (h.getType() == OpCode.auth) {
        } else if (h.getType() == OpCode.sasl) {
            processSasl(incomingBuffer, cnxn, h);
        } else {
            if(! authHelper.enforceAuthentication(cnxn, h.getXid())) {// Authentication enforcement is failed
                // Already sent response to user about failure and closed the session, lets return
            } else {
              // This is usually the step to process the message
              // Encapsulate it as a Request object
                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
                int length = incomingBuffer.limit();
                if (isLargeRequest(length)) {
                    // checkRequestSize will throw IOException if request is rejectedcheckRequestSizeWhenMessageReceived(length); si.setLargeRequestSize(length); } si.setOwner(; submitRequest(si); }}}Copy the code

This step deserializes the ByteBuffer and encapsulates it into a Request object, which is then submitted for further processing.


public void submitRequest(Request si) {
Copy the code


public void enqueueRequest(Request si) {
        if (requestThrottler == null) {
            synchronized (this) {
                try {
                    // Since all requests are passed to the request
                    // processor it should wait for setting up the request
                    // processor chain. The state will be updated to RUNNING
                    // after the setup.
                    while (state == State.INITIAL) {
                        wait(1000); }}catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                if (requestThrottler == null) {
                    throw new RuntimeException("Not started"); }}}/ / submit
Copy the code


public void submitRequest(Request request) {
    if (stopping) {
        LOG.debug("Shutdown in progress. Request cannot be processed");
    } else{ request.requestThrottleQueueTime = Time.currentElapsedTime(); submittedRequests.add(request); }}Copy the code

This step involves submitting requests to the submittedRequests queue and waiting for Run () in the RequestThrottler to execute them.


    public void run(a) {
        try {
            while (true) {
                if (killed) {
                    break; } Request request = submittedRequests.take(); .// Throttling is disabled when maxRequests = 0.// A dropped stale request will be null
                if(request ! =null) {
                    if (request.isStale()) {
                    final long elapsedTime = Time.currentElapsedTime() - request.requestThrottleQueueTime;
                    if (shouldThrottleOp(request, elapsedTime)) {
                    // Executezks.submitRequestNow(request); }}}catch (InterruptedException e) {
            LOG.error("Unexpected interruption", e);
        int dropped = drainQueue();"RequestThrottler shutdown. Dropped {} requests", dropped);
Copy the code

This step takes () requests from the submittedRequests queue and sends them to the submitRequestNow(Request) in ZookeeperServer. Method is executed immediately.


public void submitRequestNow(Request si) {...try {
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
              // Start processing from the first processor
                if(si.cnxn ! =null) { incInProcess(); }}else {
                LOG.warn("Received packet at server of unknown type {}", si.type);
                // Update request accounting/throttling limits
                newUnimplementedRequestProcessor().processRequest(si); }}catch (MissingSessionException e) {
            LOG.debug("Dropping request.", e);
            // Update request accounting/throttling limits
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request", e);
            // Update request accounting/throttling limitsrequestFinished(si); }}Copy the code

This step is to process the request directly from the first handler, also mentioned in the previous article

PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor processing chain.

So to summarize, we’re basically talking about a network package, deserialized from byte stream, and then packaged into a Request object,

Finally, it goes to the PrepRequestProcessor processor for processing.

The next article continues to analyze the specific processing process.