Preface

This article debugs the source code of the network connection between the ZooKeeper client and server, to find out what ZooKeeper does when a client sends a request. The previous two articles covered setting up the debugging environment and leader election, with the aim of getting familiar with the design of ZooKeeper.

  1. ZooKeeper leader election source code analysis
  2. Setting up a ZooKeeper source code debugging environment

Start with a network connection

In the leader election article, besides analyzing the election algorithm itself, we also took a close look at the network connection layer beneath it. This time we again start from the network connection layer and analyze it in more detail. Why analyze the network connection layer again? There are two reasons:

  1. I know a fair amount of theory about network connection layers but have little hands-on experience;
  2. Every server and every piece of middleware has a network connection layer, so reading this code shows how such projects solve the connection problem.

Starting the JDK NIO service

PS: The best way to read source code is to start from a question, not from the main method. I started from the main method because I was new to reading source code and wanted to see every detail along one path, that is, to read the source vertically. Once you see that the main methods of most components look alike (for example, how they start their network components), you can pick a question and read the relevant code directly. (Author's note)

NIOServerCnxnFactory is the default connection manager; it uses JDK NIO internally to implement network connections. In contrast, leader election uses blocking I/O (BIO) with long-lived connections. Here is a quick look at the division of labor inside the factory and the key fields of each class:

NIOServerCnxnFactory {

    WorkerService workerPool
    // A pool of workers dedicated to performing the actual I/O operations. Note how this differs from SelectorThread

    AcceptThread {
        // The thread that accepts new connections and creates their Channels, similar to the Netty boss thread

        private final ServerSocketChannel acceptSocket;  // NIO server Channel
        private final SelectionKey acceptKey;

        private final Collection<SelectorThread> selectorThreads;   // the workers
    }

    SelectorThread {
        // The event-handling thread: it polls its SelectionKeys and,
        // when an event is ready, wraps it in an IOWorkRequest and queues it for the WorkerService to execute

        private final Queue<SocketChannel> acceptedQueue;       // Connections this thread is responsible for
        private final Queue<SelectionKey> updateQueue;
    }

    IOWorkRequest {
        // Wraps a single I/O event as one unit of work
        // Part of the producer-consumer model
    }
}

NIOServerCnxnFactory startup process

    // NIOServerCnxnFactory.java start()
    public void start() {
        stopped = false;
        if (workerPool == null) {
            workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
        }
        for (SelectorThread thread : selectorThreads) {
            if (thread.getState() == Thread.State.NEW) {
                // Start the selector threads
                thread.start();
            }
        }
        // ensure thread is started once and only once
        if (acceptThread.getState() == Thread.State.NEW) {
            // Check the state so that acceptThread is started only once
            acceptThread.start();
        }
        if (expirerThread.getState() == Thread.State.NEW) {
            expirerThread.start();
        }
    }
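
The `getState() == Thread.State.NEW` guard used in start() can be tried in isolation. The following is a minimal sketch, not ZooKeeper code: the class `StartOnceDemo` and the helper `startIfNew` are names made up for this illustration. It shows that a second call does nothing, because a thread leaves the NEW state as soon as it has been started.

```java
public class StartOnceDemo {
    /**
     * Hypothetical helper: starts the thread only if it has never been started.
     * Returns true if this call started it.
     */
    static boolean startIfNew(Thread t) {
        if (t.getState() == Thread.State.NEW) {
            t.start();
            return true;
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> { }, "demo-worker");
        boolean first = startIfNew(worker);   // thread is NEW, so it is started
        boolean second = startIfNew(worker);  // thread is no longer NEW, so nothing happens
        worker.join();
        System.out.println(first + " " + second); // prints "true false"
    }
}
```

Note that the check and the start are not atomic, so this guard is only safe when a single thread calls it at a time.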

AcceptThread listens for new connections and hands each accepted Channel to a SelectorThread.

    // AcceptThread: run(), select() and doAccept()
    public void run() {
        try {
            while (!stopped && !acceptSocket.socket().isClosed()) {
                try {
                    select();       // Loop here
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    // Ignore all exceptions to keep the accept thread alive
                    LOG.warn("Ignoring unexpected exception", e);
                }
            }
        } finally {
            closeSelector();
            // This will wake up the selector threads, and tell the
            // worker thread pool to begin shutdown.
            if (!reconfiguring) {
                NIOServerCnxnFactory.this.stop();
            }
            LOG.info("accept thread exitted run method");
        }
    }

    private void select() {
        try {
            selector.select();

            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (!stopped && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    if (!doAccept()) {  // Accept the connection
                        // If unable to pull a new connection off the accept
                        // queue, pause accepting to give us time to free
                        // up file descriptors and so the accept thread
                        // doesn't spin in a tight loop.
                        pauseAccept(10);
                    }
                } else {
                    LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                }
            }
        } catch (IOException e) {
            LOG.warn("Ignoring IOException while selecting", e);
        }
    }

    private boolean doAccept() {
        boolean accepted = false;
        SocketChannel sc = null;        // accept() returns the SocketChannel for the new client
        try {
            // Handle the accept event
            sc = acceptSocket.accept();
            accepted = true;
            if (limitTotalNumberOfCnxns()) {
                throw new IOException("Too many connections max allowed is " + maxCnxns);
            }
            InetAddress ia = sc.socket().getInetAddress();
            int cnxncount = getClientCnxnCount(ia);

            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
            }

            LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());

            sc.configureBlocking(false);

            // Round-robin assign this connection to a selector thread
            if (!selectorIterator.hasNext()) {
                selectorIterator = selectorThreads.iterator();  // wrap around to form a cycle
            }
            SelectorThread selectorThread = selectorIterator.next();
            if (!selectorThread.addAcceptedConnection(sc)) {  // Hand the SocketChannel to a SelectorThread
                throw new IOException("Unable to add connection to selector queue"
                                      + (stopped ? " (shutdown in progress)" : ""));
            }
            acceptErrorLogger.flush();
            // At this point, Accept is done accepting the connection.
        } catch (IOException e) {
            // accept, maxClientCnxns, configureBlocking
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
            acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
            fastCloseSock(sc);
        }
        return accepted;
    }

AcceptThread:

  • Holds the ServerSocketChannel acceptSocket, which is used to accept connections
  • Holds selectorThreads; after accepting a connection, it hands the Channel to one of the SelectorThreads
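
The round-robin hand-off in doAccept() relies on one small trick: when the iterator is exhausted, a fresh iterator is taken from the same collection. A minimal sketch of that pattern follows; `RoundRobinDemo` and the string names are placeholders for this illustration and stand in for the SelectorThread objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RoundRobinDemo {
    private final List<String> targets;
    private Iterator<String> iterator;

    RoundRobinDemo(List<String> targets) {
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("need at least one target");
        }
        this.targets = new ArrayList<>(targets);
        this.iterator = this.targets.iterator();
    }

    /** Returns the next target, wrapping around to the first one at the end. */
    String next() {
        if (!iterator.hasNext()) {
            iterator = targets.iterator(); // start a new pass over the same list
        }
        return iterator.next();
    }

    public static void main(String[] args) {
        RoundRobinDemo rr = new RoundRobinDemo(Arrays.asList("selector-0", "selector-1", "selector-2"));
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            out.append(rr.next()).append(' ');
        }
        // prints "selector-0 selector-1 selector-2 selector-0 selector-1"
        System.out.println(out.toString().trim());
    }
}
```

This needs no counter or modulo arithmetic, but it assumes the collection is not modified while connections are being assigned.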

Following the usual NIO pattern, the SelectorThread polls its Channels for readiness. When a Channel is readable or writable, the SelectorThread wraps the event into a separate request and enqueues it for the WorkerPool (a thread pool). A worker then reads the data from the Buffer and splits it into individual requests according to the wire format.

// SelectorThread.java
private void select() {
    try {
        selector.select();

        Set<SelectionKey> selected = selector.selectedKeys();
        ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
        Collections.shuffle(selectedList);
        Iterator<SelectionKey> selectedKeys = selectedList.iterator();
        while (!stopped && selectedKeys.hasNext()) {
            // Standard NIO pattern: take each ready key and handle the operation it reports
            SelectionKey key = selectedKeys.next();
            selected.remove(key);

            if (!key.isValid()) {
                cleanupSelectionKey(key);
                continue;
            }
            if (key.isReadable() || key.isWritable()) {
                handleIO(key); // Debug from here
            } else {
                LOG.warn("Unexpected ops in select {}", key.readyOps());
            }
        }
    } catch (IOException e) {
        LOG.warn("Ignoring IOException while selecting", e);
    }
}

// Debugging continues here
private void handleIO(SelectionKey key) {
    // Encapsulate the IO event
    IOWorkRequest workRequest = new IOWorkRequest(this, key);
    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

    // Stop selecting this key while processing on its
    // connection
    cnxn.disableSelectable();
    key.interestOps(0);
    touchCnxn(cnxn);
    // Commit to Pool for Worker to consume
    workerPool.schedule(workRequest);
}


So far we have followed the ZooKeeper server from startup to a running NIO service. After a client connects, each request is eventually submitted to WorkerService for execution. WorkerService is only an execution pool; the request is ultimately processed by NIOServerCnxn.doIO(key). It is quite a roundabout path.
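
The hand-off from the selecting thread to the worker pool is a producer-consumer arrangement, and it can be sketched with a plain JDK ExecutorService. This is only an illustration under stated assumptions: `HandoffDemo`, `WorkRequest` and `dispatch` are hypothetical names, the pool is a standard fixed thread pool rather than the ZooKeeper WorkerService, and the "connections" are just strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class HandoffDemo {
    /** Hypothetical stand-in for IOWorkRequest: remembers which connection is ready. */
    static final class WorkRequest implements Callable<String> {
        private final String connection;

        WorkRequest(String connection) {
            this.connection = connection;
        }

        @Override
        public String call() {
            // A real worker would perform the socket read or write here.
            return "handled " + connection;
        }
    }

    /** Plays the selector role: wraps each ready connection and submits it to the pool. */
    static List<String> dispatch(List<String> readyConnections, int workerThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(workerThreads);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String connection : readyConnections) {
                pending.add(pool.submit(new WorkRequest(connection)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : pending) {
                results.add(future.get()); // collected in submission order
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "[handled conn-1, handled conn-2, handled conn-3]"
        System.out.println(dispatch(Arrays.asList("conn-1", "conn-2", "conn-3"), 2));
    }
}
```

The point of the split is that the selecting thread only detects readiness and never blocks on the work itself, so one slow request does not stall polling for the other connections.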

At this point, we assume that the network connection has been established.

Setting -Dzookeeper.serverCnxnFactory="org.apache.zookeeper.server.NettyServerCnxnFactory" selects the NettyServerCnxnFactory connection manager instead. This implementation can provide SSL services.

How NIOServerCnxn processes I/O operations

NIO network processing models are mostly alike. Here is how IOWorkRequest goes on to handle the I/O operation.

// Wrap each request to include: what to do, and who to do it to.
private class IOWorkRequest extends WorkerService.WorkRequest {

    // The IO operation is eventually passed to cnxn.doIO for processing
    public void doWork() throws InterruptedException {
        if (!key.isValid()) {
            selectorThread.cleanupSelectionKey(key);
            return;
        }
        if (key.isReadable() || key.isWritable()) {
            cnxn.doIO(key);     // Perform I/O operations

            // Check if we shutdown or doIO() closed this connection
            if (stopped) {
                cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                return;
            }
            if (!key.isValid()) {
                selectorThread.cleanupSelectionKey(key);
                return;
            }
            touchCnxn(cnxn);
        }
    }
}

// NIOServerCnxn.java
/**
 * Handles read/write IO on connection.
 */
void doIO(SelectionKey k) throws InterruptedException {
    try {
        if (!isSocketOpen()) {
            LOG.warn("trying to do i/o on a null socket for session: 0x{}", Long.toHexString(sessionId));
            return;
        }
        if (k.isReadable()) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                handleFailedRead();
            }
            if (incomingBuffer.remaining() == 0) {
                boolean isPayload;
                if (incomingBuffer == lenBuffer) { // start of next request
                    incomingBuffer.flip();
                    isPayload = readLength(k);      // Parse the length prefix; the frame layout is | len(data) | data ... |
                    incomingBuffer.clear();
                } else {
                    // continuation
                    isPayload = true;
                }
                if (isPayload) { // not the case for 4letterword
                    readPayload();      // Read the payload for the real operation. This is the entry point
                } else {
                    // four letter words take care
                    // need not do anything else
                    return;
                }
            }
        }
        if (k.isWritable()) {
            handleWrite(k);

            if (!initialized && !getReadInterest() && !getWriteInterest()) {
                throw new CloseRequestException("responded to info probe", DisconnectReason.INFO_PROBE);
            }
        }
    } catch (CancelledKeyException e) {
        // ... a series of exception handlers omitted
    }
}

// Read the data and perform the operation
private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
        if (rc < 0) {
            handleFailedRead();
        }
    }

    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        incomingBuffer.flip();
        packetReceived(4 + incomingBuffer.remaining());
        if (!initialized) {
            readConnectRequest();   // Process connection requests
        } else {
            readRequest();          // Process transaction requests
        }
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}

This is where the work of NIOServerCnxn ends. It then passes the request to ZooKeeperServer via readRequest and readConnectRequest.
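
The lenBuffer/incomingBuffer swap in doIO() and readPayload() is a length-prefixed framing scheme: first collect 4 bytes of length, then collect exactly that many payload bytes, then go back to waiting for a length. The sketch below reproduces only that idea. `FrameDecoderDemo`, `feed` and `frame` are hypothetical names, the payload is treated as UTF-8 text purely for the demo (ZooKeeper payloads are serialized records), and the four-letter-word handling and maximum-size checks of readLength are left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FrameDecoderDemo {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incoming = lenBuffer;   // currently filling: length prefix or payload
    private final List<String> frames = new ArrayList<>();

    /** Feeds bytes as they arrive; a chunk may end anywhere inside a frame. */
    void feed(byte[] chunk) {
        ByteBuffer src = ByteBuffer.wrap(chunk);
        while (src.hasRemaining()) {
            incoming.put(src.get());
            if (incoming.hasRemaining()) {
                continue;                       // current buffer not full yet
            }
            if (incoming == lenBuffer) {        // length prefix complete
                lenBuffer.flip();
                int len = lenBuffer.getInt();
                lenBuffer.clear();
                if (len <= 0) {
                    throw new IllegalArgumentException("bad frame length: " + len);
                }
                incoming = ByteBuffer.allocate(len);
            } else {                            // payload complete
                incoming.flip();
                frames.add(StandardCharsets.UTF_8.decode(incoming).toString());
                incoming = lenBuffer;           // wait for the next length prefix
            }
        }
    }

    List<String> frames() {
        return frames;
    }

    /** Builds one frame: 4-byte big-endian length followed by the payload. */
    static byte[] frame(String payload) {
        byte[] data = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + data.length).putInt(data.length).put(data).array();
    }

    public static void main(String[] args) {
        byte[] a = frame("ping");
        byte[] b = frame("create /a");
        byte[] stream = new byte[a.length + b.length];
        System.arraycopy(a, 0, stream, 0, a.length);
        System.arraycopy(b, 0, stream, a.length, b.length);

        FrameDecoderDemo decoder = new FrameDecoderDemo();
        for (int i = 0; i < stream.length; i += 3) {   // deliver in awkward 3-byte chunks
            decoder.feed(Arrays.copyOfRange(stream, i, Math.min(i + 3, stream.length)));
        }
        System.out.println(decoder.frames()); // prints "[ping, create /a]"
    }
}
```

Because a non-blocking read can return any number of bytes, the decoder has to keep its position between calls; that is the role the incomingBuffer field plays in NIOServerCnxn.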

Let us pause here. Assuming we understand the byte-level network services that NIO provides, consider the lifecycle of NIOServerCnxn and what it mainly does:

  1. When a client connects, the connection is accepted by NIOServerCnxnFactory.AcceptThread. The NIOServerCnxn is then built when the accepted Channel is registered with a SelectorThread, and it is stored as the attachment of the SelectionKey.
  2. Internally held objects include the following:
    • NIOServerCnxnFactory
    • SocketChannel, which holds the client connection
    • Selector, managed by SelectorThread
    • SelectionKey
    • incomingBuffer and outgoingBuffers
  3. Thus, NIOServerCnxn is the connection object the server uses to maintain a client. It contains the Buffer and the Socket, and provides the following services:
    • Reading data: it reads directly from the NIO Buffer below and submits the resulting request object upward to RequestThrottler
    • Writing data: data is written to outgoingBuffers, the Selector of the SelectorThread is woken up, and the WorkerService is told to perform the doIO operation
    • NIOServerCnxn also takes part in the implementation of data watches, which is not covered here
  4. An NIOServerCnxn can be closed for many reasons, which are enumerated by DisconnectReason

Afterword

  • Reading source code requires both digging into details and stepping back to summarize: analyze how the modules of the project are designed and build a tree-shaped map of the knowledge. Otherwise you will soon forget it.
  • Tackling NIO-based source code head-on is hard. Brush up on JDK NIO concepts as you go; reading the documentation on concurrency and network programming and writing a small NIO demo makes the ZooKeeper network connection code much easier to read.
  • The ZooKeeper network layer is implemented with JDK NIO, and NIOServerCnxn acts as the intermediary that passes data between the network layer and the application layer.
  • Most complex projects are designed top-down, so it is best to read the source code module by module or layer by layer.
    • For example, when reading the network layer, stay within the network layer and stop once the code goes beyond it.
  • I am not fully sure of everything here, which makes it hard to write about. I hope to keep filling in the gaps through discussion.
  • This article would be much easier to follow with a few diagrams.

Coming next

  • The RequestProcessor that ZooKeeper uses when handling a request is very interesting. It will be covered as we analyze the ZooKeeper data model and its implementation.
