Following up on the previous article, this article looks at the processing chain on the Zookeeper server.

PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor processing chain.

PrepRequestProcessor.java

Create transaction request headers, transaction bodies, session checks, ACL checks, version checks, etc.

PrepRequestProcessor#run()

		@Override
    public void run(a) {...try {
            while (true) { ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size()); Request request = submittedRequests.take(); . request.prepStartTime = Time.currentElapsedTime(); pRequest(request); }}catch (Exception e) {
            handleException(this.getName(), e);
        }
        LOG.info("PrepRequestProcessor exited loop!");
    }
Copy the code

Fetch the Request from the queue and give it to pRequest(Request);

PrepRequestProcessor#pRequest(Request request)

		protected void pRequest(Request request) throws RequestProcessorException {
        // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
        // request.type + " id = 0x" + Long.toHexString(request.sessionId));
        request.setHdr(null);
        request.setTxn(null);

        if(! request.isThrottled()) {// Generate the transaction header and body
          pRequestHelper(request);
        }

        / / transaction ID
        request.zxid = zks.getZxid();
        long timeFinishedPrepare = Time.currentElapsedTime();
        ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(timeFinishedPrepare - request.prepStartTime);
      // Pass to the next handler
        nextProcessor.processRequest(request);
        ServerMetrics.getMetrics().PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime() - timeFinishedPrepare);
    }
Copy the code

This step mainly generates the transaction header and body and passes it on to the next processor.

private void pRequestHelper(Request request) throws RequestProcessorException {
        try {
            switch (request.type) {
            case OpCode.createContainer:
            case OpCode.create:
            case OpCode.create2:
                CreateRequest create2Request = new CreateRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
                break; .// There is no need to create a transaction request
            //All the rest don't need to create a Txn - just verify session
            case OpCode.sync:
            case OpCode.exists:
            case OpCode.getData:
            case OpCode.getACL:
                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                break;
            default:
                LOG.warn("unknown type {}", request.type);
                break; }}catch (KeeperException e) {
            ...
        } catch(Exception e) { ... }}Copy the code

This step falls into two categories for requests, one is a write request, also known as a transaction request,pRequest2Txn(). The other is a read request, which is a non-transactional request.

protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
        if (request.getHdr() == null) {
            // Transaction request header
            request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                    Time.currentWallTime(), type));
        }

        switch (type) {
        case OpCode.create:
        case OpCode.create2:
        case OpCode.createTTL:
        case OpCode.createContainer: {
           / / the next step
            pRequest2TxnCreate(type, request, record, deserialize);
            break; }}Copy the code

This step creates the transaction request header.

private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
        if (deserialize) {
            ByteBufferInputStream.byteBuffer2Record(request.request, record);
        }

        int flags;
        String path;
        List<ACL> acl;
        byte[] data;
        long ttl;
        if (type == OpCode.createTTL) {
            CreateTTLRequest createTtlRequest = (CreateTTLRequest) record;
            flags = createTtlRequest.getFlags();
            path = createTtlRequest.getPath();
            acl = createTtlRequest.getAcl();
            data = createTtlRequest.getData();
            ttl = createTtlRequest.getTtl();
        } else {
            CreateRequest createRequest = (CreateRequest) record;
            flags = createRequest.getFlags();
            path = createRequest.getPath();
            acl = createRequest.getAcl();
            data = createRequest.getData();
            ttl = -1;
        }
        // Node creation mode
        CreateMode createMode = CreateMode.fromFlag(flags);
        validateCreateRequest(path, createMode, request, ttl);
        String parentPath = validatePathForCreate(path, request.sessionId);

        List<ACL> listACL = fixupACL(path, request.authInfo, acl);
        ChangeRecord parentRecord = getRecordForPath(parentPath);

        // check ACL
        zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
        int parentCVersion = parentRecord.stat.getCversion();
        if (createMode.isSequential()) {
            path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
        }
        // Verify the path
        validatePath(path, request.sessionId);
        try {
            if(getRecordForPath(path) ! =null) {
                throw newKeeperException.NodeExistsException(path); }}catch (KeeperException.NoNodeException e) {
            // ignore this one
        }
        boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
        if (ephemeralParent) {
            throw new KeeperException.NoChildrenForEphemeralsException(path);
        }
        / / version
        int newCversion = parentRecord.stat.getCversion() + 1;
        zks.checkQuota(path, null, data, OpCode.create);
        if (type == OpCode.createContainer) {
            request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
        } else if (type == OpCode.createTTL) {
            request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
        } else {
            request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
        }

        TxnHeader hdr = request.getHdr();
        long ephemeralOwner = 0;
        if (createMode.isContainer()) {
            ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
        } else if (createMode.isTTL()) {
            ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
        } else if (createMode.isEphemeral()) {
            ephemeralOwner = request.sessionId;
        }
        StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
        parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
        parentRecord.childCount++;
        parentRecord.stat.setCversion(newCversion);
        parentRecord.stat.setPzxid(request.getHdr().getZxid());
        parentRecord.precalculatedDigest = precalculateDigest(
                DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
        addChangeRecord(parentRecord);
        ChangeRecord nodeRecord = new ChangeRecord(
                request.getHdr().getZxid(), path, s, 0, listACL);
        nodeRecord.data = data;
        nodeRecord.precalculatedDigest = precalculateDigest(
                DigestOpCode.ADD, path, nodeRecord.data, s);
        setTxnDigest(request, nodeRecord.precalculatedDigest);
  			// Add to the transaction being processed
        addChangeRecord(nodeRecord);
    }
Copy the code

This step is basically the various checks to the final condition of the transaction request queue being processed.

Let’s take a look at one processor, the SyncRequestProcessor

The main job of this handler is to log requests to the transaction log.

		public void processRequest(final Request request) {
        Objects.requireNonNull(request, "Request cannot be null");

        request.syncQueueStartTime = Time.currentElapsedTime();
        // Record the request
        queuedRequests.add(request);
        ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
    }
Copy the code

run()

			@Override
   	  public void run(a) {
        try {
            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            resetSnapshotStats();
            lastFlushTime = Time.currentElapsedTime();
            while (true) {
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());

                long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
                Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
                if (si == null) {
                    /* We timed out looking for more writes to batch, go ahead and flush immediately */
                    flush();
                    si = queuedRequests.take();
                }

                if (si == REQUEST_OF_DEATH) {
                    break;
                }

                long startProcessTime = Time.currentElapsedTime();
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);

                // track the number of records written to the log
                if(! si.isThrottled() && zks.getZKDatabase().append(si)) {if (shouldSnapshot()) {
                        resetSnapshotStats();
                        // roll the log
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if(! snapThreadMutex.tryAcquire()) { LOG.warn("Too busy to snap, skipping");
                        } else {
                            new ZooKeeperThread("Snapshot Thread") {
                                public void run(a) {
                                    try {
                                        zks.takeSnapshot();
                                    } catch (Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    } finally{ snapThreadMutex.release(); } } }.start(); }}}else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read or a throttled request(which doesn't need to be written to the disk),
                    // and there are no pending flushes (writes), then just pass this to the next processor
                    if(nextProcessor ! =null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceofFlushable) { ((Flushable) nextProcessor).flush(); }}continue;
                }
              // Add to queue
                toFlush.add(si);
                if(shouldFlush()) { flush(); } ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime); }}catch (Throwable t) {
            handleException(this.getName(), t);
        }
        LOG.info("SyncRequestProcessor exited!");
    }
Copy the code

Zks.getzkdatabase ().append(si) writes a log, but has not committed it yet.

Flush () commits the transaction and passes it to the next handler

			private void flush(a) throws IOException, RequestProcessorException {
        if (this.toFlush.isEmpty()) {
            return;
        }

        ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());

        long flushStartTime = Time.currentElapsedTime();
        / / submit
        zks.getZKDatabase().commit();
        ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);

        if (this.nextProcessor == null) {
            this.toFlush.clear();
        } else {
            while (!this.toFlush.isEmpty()) {
                final Request i = this.toFlush.remove();
                long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
                // Pass to the next handler
                this.nextProcessor.processRequest(i);
            }
            if (this.nextProcessor instanceof Flushable) {
                ((Flushable) this.nextProcessor).flush();
            }
        }
        lastFlushTime = Time.currentElapsedTime();
    }
Copy the code

FinalRequestProcessor.java

The FinalRequestProcessor responds to requests and changes the state of the in-memory database

public void processRequest(Request request) {
        ProcessTxnResult rc = null;
        if(! request.isThrottled()) {// Apply transaction
          rc = applyRequest(request);
        }
        if (request.cnxn == null) {
            return;
        }
        ServerCnxn cnxn = request.cnxn;

        long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();

        String lastOp = "NA";
        // Notify ZooKeeperServer that the request has finished so that it can
        // update any request accounting/throttling limits
        zks.decInProcess();
        zks.requestFinished(request);
        Code err = Code.OK;
        Record rsp = null;
        String path = null;
        int responseSize = 0;
        try{... .case OpCode.create: {
                lastOp = "CREA";
                rsp = new CreateResponse(rc.path);
                err = Code.get(rc.err);
                requestPathMetricsCollector.registerRequest(request.type, rc.path);
                break; }}Copy the code

The important step is to apply the transaction, which is to actually modify the in-memory database logic and return the response to the client.

ApplyRequest () applies the transaction

			private ProcessTxnResult applyRequest(Request request) {
        ProcessTxnResult rc = zks.processTxn(request);

        // ZOOKEEPER-558:
        // In some cases the server does not close the connection (e.g., closeconn buffer
        // is not being queued - zookeeper-558) properly.
        // when the client closes the connection. The server should still close the session, though.
        // Calling closeSession() after losing the cnxn, results in the client close session response being dropped.
        if (request.type == OpCode.closeSession && connClosedByClient(request)) {
            // We need to check if we can close the session id.
            // Sometimes the corresponding ServerCnxnFactory could be null because
            // we are just playing diffs from the leader.
            if (closeSession(zks.serverCnxnFactory, request.sessionId)
                || closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
                returnrc; }}if(request.getHdr() ! =null) {
            /* * Request header is created only by the leader, so this must be * a quorum request. Since we're comparing timestamps across hosts, * this metric may be incorrect. However, it's still a very useful * metric to track in the happy case. If there is clock drift, * the latency can go negative. Note: headers use wall time, not * CLOCK_MONOTONIC. */
            long propagationLatency = Time.currentWallTime() - request.getHdr().getTime();
            if (propagationLatency >= 0) { ServerMetrics.getMetrics().PROPAGATION_LATENCY.add(propagationLatency); }}return rc;
    }
Copy the code

ZookeeperServer#processTxn()

// entry point for FinalRequestProcessor.java
    public ProcessTxnResult processTxn(Request request) {
        TxnHeader hdr = request.getHdr();
        processTxnForSessionEvents(request, hdr, request.getTxn());

        final booleanwriteRequest = (hdr ! =null);
        final boolean quorumRequest = request.isQuorum();

        // return fast w/o synchronization when we get a read
        if(! writeRequest && ! quorumRequest) {return new ProcessTxnResult();
        }
        synchronized (outstandingChanges) {
           // Process transactions
            ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());

            // request.hdr is set for write requests, which are the only ones
            // that add to outstandingChanges.
            if (writeRequest) {
                long zxid = hdr.getZxid();
                while(! outstandingChanges.isEmpty() && outstandingChanges.peek().zxid <= zxid) { ChangeRecord cr = outstandingChanges.remove();  ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
                    if (cr.zxid < zxid) {
                        LOG.warn(
                            "Zxid outstanding 0x{} is less than current 0x{}",
                            Long.toHexString(cr.zxid),
                            Long.toHexString(zxid));
                    }
                    if(outstandingChangesForPath.get(cr.path) == cr) { outstandingChangesForPath.remove(cr.path); }}}// do not add non quorum packets to the queue.
            if (quorumRequest) {
                getZKDatabase().addCommittedProposal(request);
            }
            returnrc; }}Copy the code

ZookeeperServer.java

		private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) {
        if (hdr == null) {
            // Direct return, non-transactional request
            return new ProcessTxnResult();
        } else {
            returngetZKDatabase().processTxn(hdr, txn, digest); }}Copy the code

ZKDataBase.java

		public ProcessTxnResult processTxn(TxnHeader hdr, Record txn, TxnDigest digest) {
        return dataTree.processTxn(hdr, txn, digest);
    }
Copy the code

DataTree.java

		public ProcessTxnResult processTxn(TxnHeader header, Record txn, TxnDigest digest) {
        ProcessTxnResult result = processTxn(header, txn);
        compareDigest(header, txn, digest);
        return result;
    }
Copy the code

DataTree.java

public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
        ProcessTxnResult rc = new ProcessTxnResult();

        try {
            rc.clientId = header.getClientId();
            rc.cxid = header.getCxid();
            rc.zxid = header.getZxid();
            rc.type = header.getType();
            rc.err = 0;
            rc.multiResult = null;
            switch (header.getType()) {
            case OpCode.create:
                CreateTxn createTxn = (CreateTxn) txn;
                rc.path = createTxn.getPath();
                // Create a node
                createNode(
                    createTxn.getPath(),
                    createTxn.getData(),
                    createTxn.getAcl(),
                    createTxn.getEphemeral() ? header.getClientId() : 0,
                    createTxn.getParentCVersion(),
                    header.getZxid(),
                    header.getTime(),
                    null);
                break; }}Copy the code

At this point, the process of creating a node ends.

To summarize

PrepRequestProcessor – Various pre-checks to create transaction request headers

-> SyncRequestProcessor — records transaction logs

-> FinalRequestProcessor — real logical processing, applying transactions, modifying in-memory databases