An overview of the

Tomcat involves a lot of content, this paper mainly analyzes its key part of the high concurrency design.

The main content

  1. IO Model Overview
  2. Tomcat IO model implementation
  3. Concurrency control for Tomcat

IO Model Overview

Data interaction process of server from operating system level:

As shown in the figure, when the server receives the request, the user thread in userspace initiates a read call. This read call relies on two processes: copying data from the network card to kernel space, copying data from kernel space to user space, and the write call is the reverse two steps.

Does the user thread occupy the CPU or give away the CPU while processing these two time-consuming Copy operations? How can you process data more efficiently? This is where the different IO models come in.

Synchronous blocking IO

The user thread makes a read call and blocks (giving away the CPU), waiting for the read to finish (copying data into user space) and then waking up.

Synchronize non-blocking IO

The user thread does not block when it makes a read call. Instead, it continually polls the user thread, returning false until the data is copied to the kernel space, where it blocks and waits for the user thread to wake up after the data is copied from the kernel space.

IO multiplexing

The user thread reads the data in two steps, polling select to see if the data is in the kernel, and reading if it is. The read call waits for the data to be copied from kernel space to user space, which blocks. By multiplexing we mean that multiple channels can be obtained from a single select.

Asynchronous I/o

The user thread registers a callback function when it calls read, which is called to notify the user thread when data is in user space. The user thread does not block.

The Tomcat9 presented in this article is by default a high-concurrency design based on the IO multiplexing model.

Tomcat IO model implementation

Assuming we treat Tomcat as a black box and handle requests the way Spring does, the simplified process looks like this:

Tomcat is responsible for reading the kernel data, converting it into Servlet objects, and then writing the returned data through the Response object after processing the services by the Spring framework. Tomcat then writes the returned data into the network adapter through the kernel and finally returns it to the client.

Let’s zoom in on Tomcat and see what happens in the black box.

As shown in the figure, the processing of the request is divided into the following steps:

  1. Tomcat initializes a ServerSocket at startup to listen for IO requests on specified ports (such as 8080)
  2. The Acceptor thread is started and the accept method is looping to receive the IO request.
  3. Wrap ServerChannel as a PollerEvent and register it in Poller’s Event queue
  4. The Poller thread iterates through the Event queue, registering the READ operation of the Poller’s concerned ServerChannel with the Selector
  5. In the same Poller loop, the state of multiple ServerChannels can be queried using Selector, i.emultiplexing
  6. Socketprocessors are created one by one for the serverchannels corresponding to the SelectionKey. The SocketProcessor is Runnable
  7. The SocketProcessor is thrown to the worker thread for processing
  8. The following protocol parsing, Request and Response adaptation, Servlet processing, and service processing are all in the SocketProcessor process

Main code involved:

  • Acceptor thread: Acceptor. The run – > NioEndpoint. SetSocketOptions – > NioEndpoint. Register – > the Poller. AddEvent
  • The Poller thread: the Poller. The run – > the Poller. Events – > PollerEvent. The run – > the Poller. ProcessKey – > AbstractEndpoint. ProcessSocket

In the next section, we will analyze some of the source code.

Concurrency control for Tomcat

The implementation of Tomcat’s IO multiplexing model is analyzed above. In this implementation, each link has key control that affects concurrency.

The kernel – Accept the List

The kernel maintains two queues for each Socket in LISTEN state during TCP three-way handshake establishment:

  • SYN queue: These connections have received a SYN from the client
  • ACCEPT queue: These connections have received an ACK from the client, completed a three-way handshake, and are waiting to be picked up by the system ACCEPT call

Tomcat acceptors are responsible for removing connections from the ACCEPT queue. When an Acceptor fails to process an Acceptor, the connections pile up in the ACCEPT queue. This queue size is controlled by the acceptCount (default: 100). A Socket Error is received when a concurrent request is made.

/ / initialize the server port listening - NioEndpoint. InitServerSocket
protected void initServerSocket(a) throws Exception {
    if(! getUseInheritedChannel()) { serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr =new InetSocketAddress(getAddress(), getPortWithOffset());
        // Bind acceptCount, which can be resized by configuring server.tomcat.accept-count
        serverSock.socket().bind(addr,getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    serverSock.configureBlocking(true); //mimic APR behavior
}
Copy the code

Acceptor-LimitLatch

A maximum connection controller that processes requests concurrently, internally via AQS for waiting. If the number of concurrent requests exceeds maxConnections (default: 8192), the server will wait. If an Acceptor thread receives an exception, the Socket is abnormally closed, or the SocketProessor is finished processing the connection count.

// The Acceptor thread executes the logic -acceptor.run
public void run(a) {
        // Loop until we receive a shutdown command
        while (endpoint.isRunning()) {
            state = AcceptorState.RUNNING;

            try {
                //if we have reached max connections, wait
                // LimitLatch controls the maximum number of concurrent connections. If the maximum number of concurrent connections is reached, the system waits until there are no more connections available
                endpoint.countUpOrAwaitConnection();

                // Endpoint might have been paused while waiting for latch
                // If that is the case, don't accept new connections
                if (endpoint.isPaused()) {
                    continue;
                }

                U socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    // Call the accept method to extract the IO request from the kernel
                    socket = endpoint.serverSocketAccept();
                } catch (Exception ioe) {
                    // We didn't get a socket
                    // If an error occurs, the connection number is reclaimed
                    endpoint.countDownConnection();
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket
                if(endpoint.isRunning() && ! endpoint.isPaused()) {// setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
                } else{ endpoint.destroySocket(socket); }}catch (Throwable t) {
            }
        }
        state = AcceptorState.ENDED;
    }

Copy the code

So how do you know how many connections there are? You can use lsof to view files that occupy port 8080

lsof -i :8080

You can also use netstat, but note that you need to filter ESTABLISHED to see which connections are currently communicating

netstat -anp | grep 8080 | grep ESTABLISHED

Poller-PollerEvent Queue

SynchronizedQueue is a lightweight synchronization queue implemented by Tomcat itself. The default size is 128, which is automatically expanded to twice the current size. The main features are very lightweight, internal use of System. Copy to improve performance, GC friendly.

// Register PollerEvent with synchronizedQueue-nioendpoint.register
public void register(final NioChannel socket, final NioSocketWrapper socketWrapper) {
    socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
    PollerEvent r = null;
    if(eventCache ! =null) {
        r = eventCache.pop();
    }
    if (r == null) {
        / / PollerEvent packing
        r = new PollerEvent(socket, OP_REGISTER);
    } else {
        r.reset(socket, OP_REGISTER);
    }
    Add pollerEvents to poller. events (SynchronizedQueue)
    addEvent(r);
}
Copy the code
// Register the event of interest with Selector -pollerEvent.run
public void run(a) {
    if (interestOps == OP_REGISTER) {
        try {
            // Register interested data readable events with Selector
            socket.getIOChannel().register(socket.getSocketWrapper().getPoller().getSelector(), SelectionKey.OP_READ, socket.getSocketWrapper());
        } catch (Exception x) {
            log.error(sm.getString("endpoint.nio.registerFail"), x); }}}Copy the code
// Poller thread executes logic -poller.run
public void run(a) {
    while (true) {
        boolean hasEvents = false;
        try {
            if(! close) {// The events() method internally iterates through the PollerEvent queue and registers the event with the Selector
                hasEvents = events();
                if (wakeupCounter.getAndSet(-1) > 0) {
                    // If we are here, means we have other stuff to do
                    // Do a non blocking select
                    keyCount = selector.selectNow();
                } else {
                    // When the data is readable, the selector returns the number of reads from the readyList, and the data states of multiple channels can be queried at once
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0); }}catch (Throwable x) {
        }

        Iterator<SelectionKey> iterator =
            keyCount > 0 ? selector.selectedKeys().iterator() : null;
        while(iterator ! =null && iterator.hasNext()) {
            // Polling data for ready channels
            SelectionKey sk = iterator.next();
            NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
            // Attachment may be null if another thread has called
            // cancelledKey()
            if (socketWrapper == null) {
                iterator.remove();
            } else {
                iterator.remove();
            // Perform data parsing and subsequent business processingprocessKey(sk, socketWrapper); }}}}Copy the code

Execotor – ThreadPoolExecutor, TaskQueue

To maximize performance, Tomcat extends the default thread pool policy and thread pool queues. When the thread pool reaches its maximum number, it does not reject immediately. Instead, it tries to add tasks to the task queue again.

/ / thread pool extension - org. Apache. Tomcat. Util. Threads. ThreadPoolExecutor. Execute
public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // When the maximum number of threads is reached, an attempt is made to put the task on the queue
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    if(! queue.force(command, timeout, unit)) { submittedCount.decrementAndGet();throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); }}catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw newRejectedExecutionException(x); }}else {
                submittedCount.decrementAndGet();
                throwrx; }}}Copy the code

The thread pool queue used by Tomcat is LinkedBlockingQueue, which defaults to unbounded mode. This creates a problem: when the number of threads reaches the core thread count, tasks can be added to the queue indefinitely and no new threads can be created. Tomcat added a maximumPoolSize intervention in the logic for adding tasks to the queue, so that tasks cannot be added until the number of threads reaches maximumPoolSize, so that new threads can be created.

// Thread pool queue extension - taskqueue.offer
public boolean offer(Runnable o) {
    // If this method is called, the current number of threads is greater than the number of core threads
    if (parent==null) return super.offer(o);

    // When the number of threads is equal to the maximum number of threads, no new threads can be created and the task will be queued
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);

    // The current number of threads is greater than the number of core threads and smaller than the maximum number of threads

    // If the number of tasks is smaller than the number of threads, the table name has free threads, do not need to create, put into the queue
    if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);

    // If the number of tasks is greater than the number of threads and the number of threads is smaller than the maximum number of threads, a new thread should be created
    if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        
    // Queue other cases
    return super.offer(o);
}
Copy the code

conclusion

  1. IO Model Introduction
  2. Tomcat implementation of IO multiplexing
  3. Acceptor, Poller, Executor
  4. Accept List, LimitLatch, SynchronizedQueue, thread pool extension

Author: Johnny