Tomcat series 2 – EndPoint source code parsing

In the previous article we described the overall architecture of Tomcat and learned that it is divided into two large components, a connector and a container. This article covers the EndPoint component, which belongs to the connector. It is the communication endpoint that handles TCP/IP transport. Its abstract base class, AbstractEndpoint, has three concrete implementations: AprEndpoint, Nio2Endpoint, and NioEndpoint.

  • AprEndpoint: Uses APR (Apache Portable Runtime) to handle asynchronous I/O at the operating-system level, which greatly improves the server's processing and response performance. Enabling this mode requires installing additional native libraries.
  • Nio2Endpoint: Uses Java NIO.2 to implement asynchronous I/O.
  • NioEndpoint: Uses Java NIO to implement non-blocking I/O. It is Tomcat's default and the focus of this article.

Important components in the NioEndpoint

NioEndpoint relies on the I/O multiplexing provided by the operating system (for example on Linux), which boils down to two simple steps.

  1. Create a Selector, register the channels with it, then call its select method and wait for an event of interest to occur on a Channel.
  2. When an event of interest occurs, such as a read event, read the data out of the channel.
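
The two steps above can be sketched with plain Java NIO. This is a minimal illustration rather than Tomcat code: the class SelectorDemo and the method readOneMessage are hypothetical names, and the example runs a one-shot server and client in the same process on a loopback port chosen by the operating system.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

final class SelectorDemo {

    /** Runs a tiny one-shot server and client in-process; returns what the server read. */
    static String readOneMessage(String message) {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            server.configureBlocking(false);
            // Step 1: register the channel together with the event we are interested in.
            server.register(selector, SelectionKey.OP_ACCEPT);

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                client.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));

                while (true) {
                    selector.select(); // blocks until a registered event is ready
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isAcceptable()) {
                            SocketChannel accepted = server.accept();
                            accepted.configureBlocking(false);
                            accepted.register(selector, SelectionKey.OP_READ);
                        } else if (key.isReadable()) {
                            // Step 2: a read event occurred, so read the data out of the channel.
                            SocketChannel ch = (SocketChannel) key.channel();
                            ByteBuffer buf = ByteBuffer.allocate(256);
                            int n = ch.read(buf);
                            ch.close();
                            return new String(buf.array(), 0, Math.max(n, 0), StandardCharsets.UTF_8);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling SelectorDemo.readOneMessage("ping") should hand back the short message the client wrote. A real server would keep looping instead of returning after the first read.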

NioEndpoint uses five components to implement these two steps: LimitLatch, Acceptor, Poller, SocketProcessor, and Executor.

/**
 * Threads used to accept new connections and pass them to worker threads.
 */
protected List<Acceptor<U>> acceptors;

/**
 * counter for nr of connections handled by an endpoint
 */
private volatile LimitLatch connectionLimitLatch = null;

/**
 * The socket pollers.
 */
private Poller[] pollers = null;

// SocketProcessor is a class rather than a field

/**
 * External Executor based thread pool.
 */
private Executor executor = null;

We can see the five components defined in the code. What exactly are these five components?

  • LimitLatch: the connection controller, responsible for limiting the maximum number of connections.
  • Acceptor: responsible for accepting new connections and handing the resulting Channel object to a Poller.
  • Poller: can be thought of as the NIO Selector; responsible for monitoring the state of the Channels.
  • SocketProcessor: can be thought of as an encapsulated task class.
  • Executor: Tomcat's own extended thread pool, used to execute the task classes.

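The relationship between them is as follows: a new connection first passes the LimitLatch, the Acceptor accepts it and hands the Channel to a Poller, the Poller wraps each ready event in a SocketProcessor, and the Executor runs that task.

Let's take a look at the key code in each component separately.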

LimitLatch

LimitLatch controls the maximum number of connections that Tomcat will accept. Once this number is reached, Tomcat blocks the accepting thread until other connections are released, and only then accepts the next connection. So how does LimitLatch do that? Let's look at the LimitLatch class.


public class LimitLatch {

    private static final Log log = LogFactory.getLog(LimitLatch.class);

    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            long newCount = count.incrementAndGet();
            if (!released && newCount > limit) {
                // Limit exceeded
                count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            count.decrementAndGet();
            return true;
        }
    }

    private final Sync sync;
    private final AtomicLong count;
    // Maximum connection count
    private volatile long limit;
    private volatile boolean released = false;
}

Internally it is implemented with AbstractQueuedSynchronizer (AQS), a framework whose subclasses decide when a thread is suspended and when it is released. The limit field is the maximum number of connections. AbstractEndpoint calls the countUpOrAwait method of LimitLatch to determine whether a new connection may be taken.

    public void countUpOrAwait() throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
        }
        sync.acquireSharedInterruptibly(1);
    }

How does AQS know when to block a thread, that is, when a connection cannot be obtained? That is up to the AQS subclass, which defines when acquiring succeeds and when a connection is released. The Sync class overrides the tryAcquireShared and tryReleaseShared methods. In tryAcquireShared, if the new connection count exceeds the configured maximum, -1 is returned, which tells AQS to place the thread in its wait queue.
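
To make the AQS mechanism concrete, here is a simplified latch in the same spirit. It is a sketch, not Tomcat's LimitLatch: the class name SimpleLimitLatch and the timed countUpOrAwait overload are assumptions added for illustration, and the released flag is left out.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class SimpleLimitLatch {

    private final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        @Override
        protected int tryAcquireShared(int ignored) {
            long newCount = count.incrementAndGet();
            if (newCount > limit) {
                // Over the limit: undo the increment; a negative result makes AQS queue the thread.
                count.decrementAndGet();
                return -1;
            }
            return 1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            count.decrementAndGet();
            return true; // true lets AQS wake up queued threads so they can retry
        }
    }

    private final AtomicLong count = new AtomicLong(0);
    private final long limit;
    private final Sync sync = new Sync();

    SimpleLimitLatch(long limit) {
        this.limit = limit;
    }

    /** Blocks until the count is below the limit, then increments it. */
    void countUpOrAwait() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /** Hypothetical timed variant: returns false if no slot became free in time. */
    boolean countUpOrAwait(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /** Releases one slot and returns the resulting count. */
    long countDown() {
        sync.releaseShared(0);
        return count.get();
    }

    long getCount() {
        return count.get();
    }
}
```

With a limit of 2, the first two calls to countUpOrAwait() return immediately, a third caller waits in the AQS queue, and a countDown() from another thread lets it through.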

Acceptor

Acceptors accept connections. Acceptor implements the Runnable interface, so where are the threads that execute its run method started? In the startAcceptorThreads method of AbstractEndpoint.

protected void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new ArrayList<>(count);

    for (int i = 0; i < count; i++) {
        Acceptor<U> acceptor = new Acceptor<>(this);
        String threadName = getName() + "-Acceptor-" + i;
        acceptor.setThreadName(threadName);
        acceptors.add(acceptor);
        Thread t = new Thread(acceptor, threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}

As you can see, the number of Acceptor threads is configurable; the default is one. A port can only correspond to one ServerSocketChannel, so where is this ServerSocketChannel initialized? The line Acceptor<U> acceptor = new Acceptor<>(this); passes in the endpoint itself, which suggests that the Endpoint component initializes the server socket. It does so in the initServerSocket method of NioEndpoint.

// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
protected void initServerSocket() throws Exception {
    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
        serverSock.socket().bind(addr, getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    serverSock.configureBlocking(true); //mimic APR behavior
}

There are two things we can see here:

  1. The second parameter of the bind method is the length of the operating system's wait queue. When Tomcat stops accepting connections because the maximum number of connections has been reached, the operating system can still complete connections and place them in this queue; this parameter (acceptCount) sets the size of that queue.
  2. The ServerSocketChannel is set to blocking mode, so connections are accepted in a blocking manner. This may be surprising, since in typical NIO programming channels are set to non-blocking mode. The reason is that a non-blocking channel would need a Selector that polls constantly, whereas accepting connections only needs to block on this one channel.
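
Both points can be tried out in isolation. The following is a minimal sketch, not Tomcat code: BlockingAcceptDemo and acceptOne are hypothetical names, and the loopback address with port 0 (any free port) is an assumption made so the example is self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class BlockingAcceptDemo {

    /**
     * Accepts one loopback connection. Returns true if the server channel stayed
     * in blocking mode while the accepted channel was switched to non-blocking mode.
     */
    static boolean acceptOne(int backlog) {
        try (ServerSocketChannel serverSock = ServerSocketChannel.open()) {
            // Second argument: requested length of the OS queue of pending connections.
            serverSock.socket().bind(new InetSocketAddress("127.0.0.1", 0), backlog);
            serverSock.configureBlocking(true); // accept() blocks, so no Selector is needed here

            try (SocketChannel client = SocketChannel.open(serverSock.getLocalAddress());
                 SocketChannel accepted = serverSock.accept()) {
                // Only the accepted channel becomes non-blocking,
                // because it is the one a Selector will watch afterwards.
                accepted.configureBlocking(false);
                return serverSock.isBlocking() && !accepted.isBlocking();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Note that the client connects before accept() is called: the operating system completes the handshake and parks the connection in the wait queue until the application picks it up.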

Note that for each accepted connection the Acceptor generates a PollerEvent and places it in the queue of one of the Pollers, chosen in rotation by getPoller0. The Poller's queue is a SynchronizedQueue because multiple Acceptors may add PollerEvent objects to the same Poller's queue at the same time.

public Poller getPoller0() {
    if (pollerThreadCount == 1) {
        return pollers[0];
    } else {
        int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
        return pollers[idx];
    }
}
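
The rotation in getPoller0 is a plain round-robin over an atomic counter. The sketch below isolates that pattern; RoundRobin and both method names are hypothetical. It also shows a general property of the abs-then-modulo idiom: Math.abs(Integer.MIN_VALUE) is still negative, so the index can go negative when the counter wraps around, which Math.floorMod avoids.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobin {
    private final AtomicInteger rotater;
    private final int size;

    RoundRobin(int size, int start) {
        this.size = size;
        this.rotater = new AtomicInteger(start);
    }

    /** Same shape as the excerpt: abs() of the counter, then modulo. */
    int nextWithAbs() {
        return Math.abs(rotater.incrementAndGet()) % size;
    }

    /** Alternative for illustration: floorMod never yields a negative index. */
    int nextWithFloorMod() {
        return Math.floorMod(rotater.incrementAndGet(), size);
    }
}
```

Starting from 0 with three slots, nextWithAbs() yields 1, 2, 0, and so on. Starting from Integer.MAX_VALUE, the next increment wraps to Integer.MIN_VALUE, where nextWithAbs() returns -2 and nextWithFloorMod() returns 1.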

What are operating-system-level connections? During TCP's three-way handshake, the system usually maintains two queues for each socket in the LISTEN state. One is the half-open connection queue (SYN queue): these connections have received a SYN from the client. The other is the fully established connection queue (accept queue): these connections have received the client's ACK, completed the three-way handshake, and are waiting to be picked up by the application calling accept.

All Acceptors share this one ServerSocketChannel. The important code is in the Acceptor's run method.

@Override
public void run() {
    // Loop until we receive a shutdown command
    while (endpoint.isRunning()) {
        try {
            // If the maximum number of connections is reached, the thread waits
            endpoint.countUpOrAwaitConnection();

            U socket = null;
            try {
                // Call the accept method to obtain a connection
                socket = endpoint.serverSocketAccept();
            } catch (Exception ioe) {
                // The accept failed, so decrement the connection count again
                endpoint.countDownConnection();
            }
            // Configure the socket
            if (endpoint.isRunning() && !endpoint.isPaused()) {
                // setSocketOptions() will hand the socket off to
                // an appropriate processor if successful
                if (!endpoint.setSocketOptions(socket)) {
                    endpoint.closeSocket(socket);
                }
            } else {
                endpoint.destroySocket(socket);
            }
        }
        // ... (catch block omitted in this excerpt)
    }
}

Two points stand out:

  1. While running, the Acceptor checks whether the maximum number of connections has been reached and, if so, blocks and waits. That decision is made by the LimitLatch component.
  2. The most important step is configuring the socket, in the call endpoint.setSocketOptions(socket).

protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        // Set the socket to non-blocking mode so that the Poller can use it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);

        NioChannel channel = null;
        if (nioChannels != null) {
            channel = nioChannels.pop();
        }
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        // Register the channel with a Poller
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error(sm.getString("endpoint.socketOptionsError"), t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}

The important part is that the Acceptor hands the channel to a Poller, and the two components communicate through a queue. Each Poller maintains a SynchronizedQueue; the Acceptor places PollerEvents in it, and the Poller takes the events out of the queue and consumes them.

Poller

Poller is an inner class of NioEndpoint that implements the Runnable interface. It maintains a queue and a Selector, defined as shown below, so a Poller is essentially a Selector.

private Selector selector;
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

The focus is its run method. The excerpt below is trimmed and shows only the important parts.

@Override
public void run() {
    // Loop until destroy() is called
    while (true) {
        boolean hasEvents = false;
        try {
            if (!close) {
                // Check the queue for new connections and, if there are any,
                // register their Channels with the Selector
                hasEvents = events();
                // (the selector.select call that sets keyCount is omitted from this excerpt)
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
            continue;
        }
        if (keyCount == 0) {
            hasEvents = (hasEvents | events());
        }
        Iterator<SelectionKey> iterator =
            keyCount > 0 ? selector.selectedKeys().iterator() : null;
        // Walk through the collection of ready keys and dispatch
        // any active event.
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
            // Attachment may be null if another thread has called
            // cancelledKey()
            if (socketWrapper == null) {
                iterator.remove();
            } else {
                iterator.remove();
                processKey(sk, socketWrapper);
            }
        }

        // Process timeouts
        timeout(keyCount, hasEvents);
    }

    getStopLatch().countDown();
}

The key step is the call to events(), which checks whether there are PollerEvents in the queue. If there are, it takes them out and registers their channels with the Selector. The Poller then polls all registered channels to see whether any events have occurred.
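
The queue-plus-Selector loop described above can be reduced to a small stand-alone sketch. This is not Tomcat's Poller: MiniPoller and its methods are hypothetical names, a ConcurrentLinkedQueue of channels stands in for the SynchronizedQueue of PollerEvents, and a plain Consumer<String> stands in for the hand-off to SocketProcessor.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

final class MiniPoller implements Runnable {
    private final Selector selector;
    // Stand-in for the Poller's event queue; the producers are the accepting threads.
    private final Queue<SocketChannel> events = new ConcurrentLinkedQueue<>();
    private final Consumer<String> handler;
    private volatile boolean close = false;

    MiniPoller(Consumer<String> handler) throws IOException {
        this.selector = Selector.open();
        this.handler = handler;
    }

    /** Called by the accepting thread: enqueue the channel, then wake the selector. */
    void register(SocketChannel channel) {
        events.offer(channel);
        selector.wakeup();
    }

    void stop() {
        close = true;
        selector.wakeup();
    }

    /** Drains the queue and registers every pending channel for read events. */
    private boolean events() throws IOException {
        boolean hadEvents = false;
        SocketChannel channel;
        while ((channel = events.poll()) != null) {
            hadEvents = true;
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
        }
        return hadEvents;
    }

    @Override
    public void run() {
        try {
            while (!close) {
                events();
                selector.select(1000); // returns early on wakeup() or when a key is ready
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isValid() && key.isReadable()) {
                        read((SocketChannel) key.channel(), key);
                    }
                }
            }
            // A fuller implementation would also close the registered channels here.
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private void read(SocketChannel channel, SelectionKey key) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(256);
        int n = channel.read(buf);
        if (n < 0) { // the peer closed the connection
            key.cancel();
            channel.close();
        } else if (n > 0) {
            handler.accept(new String(buf.array(), 0, n, StandardCharsets.UTF_8));
        }
    }
}
```

The wakeup() call in register matters: registration with the Selector happens on the poller thread itself, so the accepting thread only enqueues the channel and nudges the poller out of select().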

SocketProcessor

When the Poller detects an event on a Channel, it wraps the event in a task and hands it to the thread pool. That wrapper class is SocketProcessor. If we open the class, we can see that it also implements the Runnable interface; it defines the task that threads in the Executor thread pool run. So how is the byte stream in a Channel converted into the ServletRequest object that Tomcat requires? The task calls Http11Processor to convert the byte stream into the object.

Executor

Executor is Tomcat's customized thread pool. Its class definition shows that it extends Java's own Executor.

public interface Executor extends java.util.concurrent.Executor, Lifecycle


The two most important parameters of a thread pool are the core pool size and the maximum pool size. The standard Java thread pool handles a submitted task as follows:

  1. If the current number of threads is smaller than the core pool size, a new thread is created for the task.
  2. If the current number of threads has reached the core pool size, the task is put into the task queue, and the threads compete to take tasks from it.
  3. If the queue is full, temporary (non-core) threads are created.
  4. If the maximum number of threads has been reached and the queue is full, the task is rejected; by default an exception is thrown.
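
The four steps can be observed directly with java.util.concurrent.ThreadPoolExecutor. The sketch below is only an illustration: StandardPoolDemo is a hypothetical name, and the sizes (core 1, maximum 2, queue capacity 1) are chosen so that four tasks hit each step once. The tasks block on a latch so that no thread becomes free in between.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class StandardPoolDemo {

    /** Submits four blocking tasks to a pool with core=1, max=2, queue capacity=1. */
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder trace = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    trace.append("task").append(i)
                         .append(": threads=").append(pool.getPoolSize())
                         .append(" queued=").append(pool.getQueue().size())
                         .append('\n');
                } catch (RejectedExecutionException e) {
                    trace.append("task").append(i).append(": rejected\n");
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return trace.toString();
    }
}
```

StandardPoolDemo.run() returns the following trace, one line per step of the list above:

```
task1: threads=1 queued=0
task2: threads=1 queued=1
task3: threads=2 queued=1
task4: rejected
```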

Tomcat's custom thread pool is different. It overrides the execute method to implement its own task-handling logic:

  1. If the current number of threads is smaller than the core pool size, a new thread is created for the task.
  2. If the current number of threads has reached the core pool size, the task is put into the task queue, and the threads compete to take tasks from it.
  3. If the queue is full, temporary (non-core) threads are created.
  4. If the total number of threads has reached the maximum, Tomcat tries once more to put the task into the queue.
  5. If the queue is still full, an exception is thrown.

The difference lies in step 4. The standard thread pool throws an exception as soon as the maximum number of threads has been reached and the queue is full, whereas Tomcat tries once more to queue the task and throws an exception only if the queue is still full. Here is the execute logic of the custom thread pool.

public void execute(Runnable command, long timeout, TimeUnit unit) {
    submittedCount.incrementAndGet();
    try {
        super.execute(command);
    } catch (RejectedExecutionException rx) {
        if (super.getQueue() instanceof TaskQueue) {
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.force(command, timeout, unit)) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                }
            } catch (InterruptedException x) {
                submittedCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } else {
            submittedCount.decrementAndGet();
            throw rx;
        }
    }
}

In the code we can see the call submittedCount.incrementAndGet(). Why is it there? Look at the definition of this field: it counts the tasks that have been submitted to the thread pool but have not yet finished.

/**
 * The number of tasks submitted but not yet finished. This includes tasks
 * in the queue and tasks that have been handed to a worker thread but the
 * latter did not start executing the task yet.
 * This number is always greater or equal to {@link #getActiveCount()}.
 */
private final AtomicInteger submittedCount = new AtomicInteger(0);


Why is such a field needed? Tomcat's custom queue extends LinkedBlockingQueue, which is unbounded by default. A maxQueueSize argument can be passed when the queue is constructed, but by default Tomcat's task queue is unlimited, and that creates a problem: once the number of threads reaches the core pool size, tasks are added to the queue, and adding always succeeds, so no new threads would ever be created. So when is a new thread created?

A thread pool creates new threads in two situations. One is when the number of threads is below the core pool size: a thread is created for each task. The other is when the core pool size has been reached and the task queue is full: a temporary thread is created.

So how does the pool tell that the task queue is full? If a maximum queue length were set this would be easy, but Tomcat sets none by default, so the queue is unbounded. Therefore Tomcat's TaskQueue extends LinkedBlockingQueue and overrides the offer method to define when to return false.

@Override
public boolean offer(Runnable o) {
    if (parent == null) return super.offer(o);
    // If the current number of threads equals the maximum, no new thread
    // can be created, so the task can only be added to the queue
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
    // If the number of submitted tasks is less than or equal to the current
    // number of threads, a thread is free to handle it, so queue the task
    if (parent.getSubmittedCount() <= (parent.getPoolSize())) return super.offer(o);
    // If the current number of threads is below the maximum, return false
    // so that the pool creates a new thread
    if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false;
    return super.offer(o);
}

That is what submittedCount is for: it gives the thread pool a chance to create new threads even though the task queue is unbounded.
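
The effect of this trick can be reproduced with a small pool built on java.util.concurrent. The following is a simplified sketch and not Tomcat's implementation: EagerPool, EagerQueue, and demo are hypothetical names, the timed force variant is reduced to a plain insert, and afterExecute is used to decrement the counter.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class EagerPool extends ThreadPoolExecutor {

    /** Unbounded queue that reports "full" while the pool can still grow. */
    static final class EagerQueue extends LinkedBlockingQueue<Runnable> {
        private static final long serialVersionUID = 1L;
        private transient volatile EagerPool parent;

        @Override
        public boolean offer(Runnable task) {
            if (parent == null) return super.offer(task);
            if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(task);
            if (parent.submittedCount.get() <= parent.getPoolSize()) return super.offer(task);
            if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false; // forces a new thread
            return super.offer(task);
        }

        /** Bypasses the check above; used when the pool rejected the task. */
        boolean force(Runnable task) {
            return super.offer(task);
        }
    }

    // Tasks submitted but not yet finished (queued or running).
    private final AtomicInteger submittedCount = new AtomicInteger();

    EagerPool(int core, int max) {
        super(core, max, 60, TimeUnit.SECONDS, new EagerQueue());
        ((EagerQueue) getQueue()).parent = this;
    }

    @Override
    public void execute(Runnable command) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // Second chance: try to put the task straight into the queue.
            if (isShutdown() || !((EagerQueue) getQueue()).force(command)) {
                submittedCount.decrementAndGet();
                throw rx;
            }
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedCount.decrementAndGet();
    }

    /** Returns {eager pool size, plain pool size} after three blocking tasks (core=1, max=3). */
    static int[] demo() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        EagerPool eager = new EagerPool(1, 3);
        ThreadPoolExecutor plain = new ThreadPoolExecutor(
                1, 3, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            for (int i = 0; i < 3; i++) {
                eager.execute(blocker);
                plain.execute(blocker);
            }
            return new int[] { eager.getPoolSize(), plain.getPoolSize() };
        } finally {
            release.countDown();
            eager.shutdown();
            plain.shutdown();
        }
    }
}
```

EagerPool.demo() returns {3, 1}: with the same unbounded queue type, the plain pool stays at its single core thread and queues the rest, while the pool with the overridden offer grows to the maximum before it starts queuing.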

Conclusion

Part of the above is a summary of what I learned from teacher Li Double's in-depth dismantling of Tomcat, combined with reading the source code. When I had just read the articles I felt I understood them, but once I turned to the source code I found that I did not. Knowledge that is only read and never used does not become our own. Studying this small piece of the Tomcat connector showed some common knowledge in practical use: AQS, applications of locking, the points to consider when customizing a thread pool, NIO, and so on. It also taught design thinking: the modular design feels much like today's microservices, splitting one function into multiple modules so that each can easily be replaced or upgraded later.

Previous articles

How to debug the Tomcat source code with breakpoints

Tomcat Series 1 – Overall architecture

A strange journey tracking down a StackOverflowError

A simple hands-on RPC framework

A simple hands-on RPC framework (2) – Project transformation

A simple hands-on IoC implementation