The Tomcat Connector has three operating modes: BIO, NIO, and APR.

  1. As the name implies, BIO (blocking I/O) means that Tomcat uses traditional Java I/O operations (i.e. the java.io package and its subpackages). Tomcat runs in BIO mode by default. In general, BIO is the lowest-performing of the three modes.
  2. NIO (New I/O) is the I/O mode introduced in Java SE 1.4 (the java.nio package and its subpackages). Java NIO is a buffer-based Java API that provides non-blocking I/O operations, so NIO is also read as an acronym for non-blocking I/O. It offers better concurrency than traditional I/O (BIO). To run Tomcat in NIO mode, you only need to set the protocol attribute of the Connector node in conf/server.xml under the Tomcat installation directory to org.apache.coyote.http11.Http11NioProtocol, as shown in the snippet after this list.
  3. APR (Apache Portable Runtime) is the support library of the Apache HTTP Server. In APR mode, Tomcat uses the Apache HTTP Server's core dynamic link libraries through JNI to handle file reads and network transfers, which greatly improves Tomcat's performance with static files. APR is also the preferred mode for running highly concurrent applications on Tomcat.
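
For reference, a Connector configured for NIO mode looks like the following (a minimal sketch; the port and timeout values here are illustrative, not prescriptive):

<Connector port="8080"
           protocol="org.apache.coyote.http11.Http11NioProtocol"
           connectionTimeout="20000"
           redirectPort="8443" />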

It is easy to write a Socket server in BIO: accept a Socket, then hand it to a thread that processes the request and generates the response. This article focuses on the NIO processing logic in Tomcat.
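
For contrast, a minimal thread-per-connection BIO sketch might look like this (illustrative code, not Tomcat's; the class name and echo response are stand-ins for real request processing):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class BioServer {
    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(8080)) {
            while (true) {
                Socket socket = server.accept();           // blocks until a client connects
                new Thread(() -> handle(socket)).start();  // one thread per connection
            }
        }
    }

    private static void handle(Socket socket) {
        try (Socket s = socket;
             BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
             PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
            out.println("echo: " + in.readLine());         // process request, generate response
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Each connection pins a thread for its whole lifetime, which is exactly the scalability limit the NIO design analyzed below avoids.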

The key code is in the org.apache.tomcat.util.net.NioEndpoint class, the major component of Http11NioProtocol responsible for receiving sockets. The code is long, but reading it carefully reveals several recurring patterns, such as:

  1. Extensions or wrappers of existing JDK APIs: for example, Tomcat's ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor, NioChannel is an extension of ByteChannel, and KeyAttachment is a wrapper of NioChannel.
  2. Many objects are cached and reused rather than left for the GC, using queues built on ConcurrentLinkedQueue: for example processorCache, keyCache, eventCache, nioChannels, and the events queue inside Poller (the pattern is sketched after this list).
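
The caching pattern itself is simple. Here is a minimal standalone sketch (not Tomcat code; StringBuilder stands in for PollerEvent or KeyAttachment):

import java.util.concurrent.ConcurrentLinkedQueue;

public class ObjectCache {
    private final ConcurrentLinkedQueue<StringBuilder> cache = new ConcurrentLinkedQueue<StringBuilder>();

    public StringBuilder acquire() {
        StringBuilder sb = cache.poll();        // reuse a cached instance if one is available
        return (sb != null) ? sb : new StringBuilder();
    }

    public void release(StringBuilder sb) {
        sb.setLength(0);                        // reset state before recycling
        cache.offer(sb);                        // return to the cache instead of discarding
    }
}

Objects cycle between poll() and offer() instead of being allocated and collected per request, which keeps GC pressure low on hot paths.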

First take a look at the structure diagram of the Connector component:

If you have read the previous article on Tomcat startup, you know that starting a Connector calls the startInternal method of the Connector class, which calls protocolHandler.start(). That method calls the abstract endpoint's start() method, which in turn calls the startInternal() of the concrete endpoint class, so the code analysis starts with the startInternal of the NioEndpoint class.

1. Initialize the core components of the NioEndpoint class

/** 
 * Start the NIO endpoint, creating acceptor, poller threads. 
 */  
@Override  
public void startInternal() throws Exception {  

    if (!running) {
        running = true;
        paused = false;  

        // Create worker collection
        if (getExecutor() == null) {
            // Construct a thread pool for running SocketProcessor tasks;
            // this is the Worker shown above
            createExecutor();
        }

        initializeConnectionLatch();

        // Start poller threads
        pollers = new Poller[getPollerThreadCount()];
        for (int i = 0; i < pollers.length; i++) {  
            pollers[i] = new Poller();  
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);  
            pollerThread.setPriority(threadPriority);  
            pollerThread.setDaemon(true);
            pollerThread.start();
        }

        // Create Acceptor threads
        startAcceptorThreads();
    }
}

startAcceptorThreads calls the superclass implementation in org.apache.tomcat.util.net.AbstractEndpoint:

protected final void startAcceptorThreads() {  
    int count = getAcceptorThreadCount();  
    acceptors = new Acceptor[count];  

    for (int i = 0; i < count; i++) {
        // Calls the createAcceptor method of the subclass,
        // in this case the createAcceptor method of the NioEndpoint class
        acceptors[i] = createAcceptor();
        String threadName = getName() + "-Acceptor-" + i;
        acceptors[i].setThreadName(threadName);
        Thread t = new Thread(acceptors[i], threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}

This completes the initialization of the Acceptor, Poller, Worker, and other core components.

2. The request is received

After the core components are initialized, the Acceptor thread is responsible for accepting socket connections:

// --------------------------------------------------- Acceptor Inner Class
/**
 * The background thread that listens for incoming TCP/IP connections and
 * hands them off to an appropriate processor.
 */
protected class Acceptor extends AbstractEndpoint.Acceptor {

    @Override
    public void run() {

        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (running) {

            // Loop if endpoint is paused
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }

            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;

            try {
                // If we have reached max connections, wait
                countUpOrAwaitConnection();

                SocketChannel socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket (the accept here runs in blocking mode)
                    socket = serverSock.accept();
                } catch (IOException ioe) {
                    // We didn't get a socket
                    countDownConnection();
                    // Introduce delay if necessary
                    errorDelay = handleExceptionWithDelay(errorDelay);
                    // re-throw
                    throw ioe;
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // setSocketOptions() will add channel to the Poller
                // if successful
                if (running && !paused) {
                    if (!setSocketOptions(socket)) {
                        countDownConnection();
                        closeSocket(socket);
                    }
                } else {
                    countDownConnection();
                    closeSocket(socket);
                }
            } catch (SocketTimeoutException sx) {
                // Ignore: Normal condition
            } catch (IOException x) {
                if (running) {
                    log.error(sm.getString("endpoint.accept.fail"), x);
                }
            } catch (OutOfMemoryError oom) {
                try {
                    oomParachuteData = null;
                    releaseCaches();
                    log.error("", oom);
                } catch (Throwable oomt) {
                    try {
                        try {
                            System.err.println(oomParachuteMsg);
                            oomt.printStackTrace();
                        } catch (Throwable letsHopeWeDontGetHere) {
                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                        }
                    } catch (Throwable letsHopeWeDontGetHere) {
                        ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                    }
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        }
        state = AcceptorState.ENDED;
    }
}

3. Set Socket parameters

After the Acceptor accepts a connection, it calls setSocketOptions to configure the SocketChannel and register it with a Poller. Take a look at the implementation of setSocketOptions:

/** 
 * Process the specified connection. 
 */  
protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        // Set the SocketChannel to non-blocking mode
        //disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        // Set the socket parameter values (taken from the Connector node in
        // server.xml), such as send/receive buffer sizes and keep-alive settings
        socketProperties.setProperties(sock);

        // NioChannel is a wrapper for SocketChannel; try to reuse one from the cache queue
        NioChannel channel = nioChannels.poll();
        // Create a new NioChannel if there is none in the cache queue
        if (channel == null) {  
            // SSL setup  
            if (sslContext != null) {
                SSLEngine engine = createSSLEngine();
                int appbufsize = engine.getSession().getApplicationBufferSize();
                NioBufferHandler bufhandler = new NioBufferHandler(
                        Math.max(appbufsize, socketProperties.getAppReadBufSize()),
                        Math.max(appbufsize, socketProperties.getAppWriteBufSize()),
                        socketProperties.getDirectBuffer());
                channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);
            } else {
                // normal tcp setup
                NioBufferHandler bufhandler = new NioBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            // Associate the SocketChannel with the NioChannel taken from the cache queue
            channel.setIOChannel(socket);
            if (channel instanceof SecureNioChannel) {  
                SSLEngine engine = createSSLEngine();  
                ((SecureNioChannel) channel).reset(engine);  
            } else {
                channel.reset();
            }
        }
        // Register the channel with a Poller
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error("", t);
        } catch (Throwable tt) {  
            ExceptionUtils.handleThrowable(tt);  
        }  
        // Tell to close the socket  
        return false;  
    }  
    return true;  
}

The core call is the final getPoller0().register(channel), which wraps the configured SocketChannel as a PollerEvent and adds it to the Poller's events cache queue.

4. Register the read event

The getPoller0 method round-robins over the current Poller array and returns one Poller from it. (See step 1 above, the initialization of the NioEndpoint core components, for how the Poller array is created.)

/** 
 * Return an available poller in true round robin fashion 
 */  
public Poller getPoller0() {
    // The simplest round-robin scheduling: keep incrementing the poller
    // counter and take it modulo the length of the poller array
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];  
}

Then the register method of that Poller object is called:

public void register(final NioChannel socket) {
    // Set the socket's Poller reference for subsequent processing
    socket.setPoller(this);
    // Take a KeyAttachment from the cache queue;
    // KeyAttachment is NioChannel's wrapper class
    KeyAttachment key = keyCache.poll();
    final KeyAttachment ka = key != null ? key : new KeyAttachment(socket);
    // Reset the KeyAttachment's references to the Poller, the NioChannel
    // and its other member variables
    ka.reset(this, socket, getSocketProperties().getSoTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ka.setSecure(isSSLEnabled());
    // Take a PollerEvent from the event cache queue
    PollerEvent r = eventCache.poll();
    // Mark the read operation as the interested operation
    ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
    if (r == null) r = new PollerEvent(socket, ka, OP_REGISTER);
    else r.reset(socket, ka, OP_REGISTER);
    // Add the event to the Poller's event queue
    addEvent(r);
}

Take a look at the addEvent code in the Poller class:

        /**
         * Only used in this class. Will be made private in Tomcat 8.0.x
         * @deprecated
         */
        @Deprecated
        public void addEvent(Runnable event) {
            events.offer(event);
            if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
        }

Just two lines: the first adds the event object to the event queue, and the second wakes up the selector when it is blocked in select(). The counter is -1 only while the Poller is blocked (or about to block) in select(), so an increment that returns 0 means a wakeup is needed.
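
To make the interplay concrete, here is a standalone sketch (not Tomcat code; class and method names are illustrative) of the wakeupCounter idiom that addEvent shares with the Poller's run loop shown in the next step:

import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

public class WakeupIdiom {
    private final Selector selector = Selector.open();
    private final ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();
    private final AtomicLong wakeupCounter = new AtomicLong(0);

    public WakeupIdiom() throws IOException {}

    // Producer side: only pay for selector.wakeup() when the poller may be blocked
    public void addEvent(Runnable event) {
        events.offer(event);
        // incrementAndGet() == 0 means the counter was -1, i.e. the poller
        // thread is blocked (or about to block) in select() and needs waking
        if (wakeupCounter.incrementAndGet() == 0) selector.wakeup();
    }

    // Poller side: drain the queue, then choose between a blocking and a
    // non-blocking select based on whether events arrived in the meantime
    public void pollOnce() throws IOException {
        Runnable r;
        while ((r = events.poll()) != null) r.run();
        if (wakeupCounter.getAndSet(-1) > 0) {
            selector.selectNow();      // events are pending: don't block
        } else {
            selector.select(1000);     // nothing pending: block for up to 1s
        }
        wakeupCounter.set(0);
    }
}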

5. The Poller processing flow

At this point the socket accepted by the Acceptor has been wrapped as a PollerEvent and added to the Poller's event cache queue. Now look at the Poller class itself:

    /**
     * Poller class.
     */
    public class Poller implements Runnable {

        // Each Poller is associated with its own Selector
        protected Selector selector;
        // The Poller's event queue
        protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();

        protected AtomicLong wakeupCounter = new AtomicLong(0l);

        public Poller() throws IOException {
            // Synchronize the creation of the Selector
            synchronized (Selector.class) {
                // Selector.open() isn't thread safe
                // http://bugs.sun.com/view_bug.do?bug_id=6427854
                // Affects 1.6.0_29, fixed in 1.7.0_01
                this.selector = Selector.open();
            }
        }

        /**
         * Only used in this class. Will be made private in Tomcat 8.0.x
         * @deprecated
         */
        @Deprecated
        public void addEvent(Runnable event) {
            events.offer(event);
            // If the queue was empty, wake up the selector from its blocked state
            if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
        }

        // Processes the events in the Poller's event queue
        /**
         * @return true if some events were processed,
         *   false if queue was empty
         */
        public boolean events() {
            boolean result = false;

            Runnable r = null;
            // Take the events out of the Poller's event queue one by one and run them
            while ( (r = events.poll()) != null ) {
                result = true;
                try {
                    // Run the event-handling logic; designing events as Runnables
                    // separates the concrete handling logic from the event framework
                    r.run();
                    if ( r instanceof PollerEvent ) {
                        ((PollerEvent)r).reset();
                        // After the event is processed, return the event object
                        // to the NioEndpoint's event object cache
                        eventCache.offer((PollerEvent)r);
                    }
                } catch ( Throwable x ) {
                    log.error("",x);
                }
            }

            return result;
        }

        // Wrap the socket as a uniform PollerEvent object and add it to the pending event queue
        public void register(final NioChannel socket) {
            socket.setPoller(this);
            KeyAttachment key = keyCache.poll();
            final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
            ka.reset(this,socket,getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());
            // Take an event object from the NioEndpoint's event object cache
            PollerEvent r = eventCache.poll();
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            // Add the event to the Poller's event queue
            addEvent(r);
        }
        
        // The Poller is a thread; like the Acceptor, it listens for TCP/IP connections and hands them off to an appropriate processor
        /**
         * The background thread that listens for incoming TCP/IP connections and
         * hands them off to an appropriate processor.
         */
        @Override
        public void run() {
            // Loop until destroy() is called
            while (true) {
                try {
                    // Loop if endpoint is paused
                    while (paused && (!close) ) {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            // Ignore
                        }
                    }

                    boolean hasEvents = false;

                    // Time to terminate?
                    if (close) {
                        events();
                        timeout(0, false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString(
                                    "endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break;
                    } else {
                        // Run the event tasks in the event queue
                        hasEvents = events();
                    }
                    try {
                        if ( !close ) {
                            if (wakeupCounter.getAndSet(-1) > 0) {
                                // Setting wakeupCounter to -1 here pairs with the code in addEvent,
                                // whose increment-to-0 check is what wakes up the selector
                                //if we are here, means we have other stuff to do
                                //do a non blocking select
                                // Check the selector for ready events without blocking
                                keyCount = selector.selectNow();
                            } else {
                                // Check the selector for ready events, returning once the timeout expires
                                keyCount = selector.select(selectorTimeout);
                            }
                            wakeupCounter.set(0);
                        }
                        if (close) {
                            // Run the event tasks in the event queue
                            events();
                            timeout(0, false);
                            try {
                                selector.close();
                            } catch (IOException ioe) {
                                log.error(sm.getString(
                                        "endpoint.nio.selectorCloseFail"), ioe);
                            }
                            break;
                        }
                    } catch ( NullPointerException x ) {
                        //sun bug 5076772 on windows JDK 1.5
                        if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
                        if ( wakeupCounter == null || selector == null ) throw x;
                        continue;
                    } catch ( CancelledKeyException x ) {
                        //sun bug 5076772 on windows JDK 1.5
                        if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
                        if ( wakeupCounter == null || selector == null ) throw x;
                        continue;
                    } catch (Throwable x) {
                        ExceptionUtils.handleThrowable(x);
                        log.error("",x);
                        continue;
                    }
                    //either we timed out or we woke up, process events first
                    if ( keyCount == 0 ) hasEvents = (hasEvents | events());

                    Iterator<SelectionKey> iterator =
                        keyCount > 0 ? selector.selectedKeys().iterator() : null;
                    // Iterate over the ready keys registered with the selector and process them
                    // Walk through the collection of ready keys and dispatch
                    // any active event.
                    while (iterator != null && iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        // attachment() returns the KeyAttachment registered in the register() method;
                        // the KeyAttachment object is a wrapper of the socket
                        KeyAttachment attachment = (KeyAttachment)sk.attachment();
                        // Attachment may be null if another thread has called
                        // cancelledKey()
                        if (attachment == null) {
                            iterator.remove();
                        } else {
                            // Update the channel's last-event time to prevent it
                            // from being evicted from the selector as timed out
                            attachment.access();
                            iterator.remove();
                            // The concrete channel-processing logic
                            processKey(sk, attachment);
                        }
                    }//while

                    //process timeouts
                    // After each complete polling pass, check all channels for timeouts;
                    // timed-out channels are evicted from the multiplexer
                    timeout(keyCount,hasEvents);
                    if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();
                } catch (OutOfMemoryError oom) {
                    try {
                        oomParachuteData = null;
                        releaseCaches();
                        log.error("", oom);
                    }catch ( Throwable oomt ) {
                        try {
                            System.err.println(oomParachuteMsg);
                            oomt.printStackTrace();
                        }catch (Throwable letsHopeWeDontGetHere){
                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                        }
                    }
                }
            }//while
            synchronized (this) {
                this.notifyAll();
            }
            stopLatch.countDown();

        }

        // Handle the channel events detected by the selector
        protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
            boolean result = true;
            try {
                if ( close ) {
                    cancelledKey(sk, SocketStatus.STOP, attachment.comet);
                } else if ( sk.isValid() && attachment != null ) {
                    // Make sure the channel is not evicted for timing out
                    attachment.access();//make sure we don't time out valid sockets
                    sk.attach(attachment);//cant remember why this is here
                    NioChannel channel = attachment.getChannel();
                    // Handle the read/write events that occurred on the channel
                    if (sk.isReadable() || sk.isWritable() ) {
                        if ( attachment.getSendfileData() != null ) {
                            processSendfile(sk,attachment, false);
                        } else {
                            if ( isWorkerAvailable() ) {
                                // Deregister interest in the events that have already fired on this channel
                                unreg(sk, attachment, sk.readyOps());
                                boolean closeSocket = false;
                                // Read goes before write
                                if (sk.isReadable()) {
                                    // The concrete channel-processing logic
                                    if (!processSocket(channel, SocketStatus.OPEN_READ, true)) {
                                        closeSocket = true;
                                    }
                                }
                                if (!closeSocket && sk.isWritable()) {
                                    if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) {
                                        closeSocket = true;
                                    }
                                }
                                if (closeSocket) {
                                    // Cancel the invalid channel
                                    cancelledKey(sk,SocketStatus.DISCONNECT,false);
                                }
                            } else {
                                result = false;
                            }
                        }
                    }
                } else {
                    //invalid key
                    cancelledKey(sk, SocketStatus.ERROR,false);
                }
            } catch ( CancelledKeyException ckx ) {
                cancelledKey(sk, SocketStatus.ERROR,false);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error("",t);
            }
            return result;
        }

        // This unreg() is clever: it prevents the same event on a channel from being selected over and over
        protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) {
            //this is a must, so that we don't have multiple threads messing with the socket
            reg(sk,attachment,sk.interestOps()& (~readyOps));
        }

        // Register the interested events with the NioChannel; see the PollerEvent class description below
        protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) {
            sk.interestOps(intops);
            attachment.interestOps(intops);
            attachment.setCometOps(intops);
        }

    }

6. The PollerEvent processing flow

The core of the Poller's processing is to run the PollerEvents in its event queue, iterate over the ready keys of the Selector, and hand a key off to the processSocket method once an event of interest occurs. A PollerEvent registers or updates the socket's interested events:

    /**
     * PollerEvent, cacheable object for Poller events to avoid GC
     */
    public static class PollerEvent implements Runnable {

        // Each PollerEvent keeps a reference to a NioChannel
        protected NioChannel socket;
        protected int interestOps;
        protected KeyAttachment key;

        public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
            reset(ch, k, intOps);
        }

        public void reset(NioChannel ch, KeyAttachment k, int intOps) {
            socket = ch;
            interestOps = intOps;
            key = k;
        }

        public void reset() {
            reset(null, null, 0);
        }

        @Override
        public void run() {
            // The first time the socket is registered with the selector,
            // register its read event here
            if ( interestOps == OP_REGISTER ) {
                try {
                    socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
                } catch (Exception x) {
                    log.error("", x);
                }
            } else {
                // The socket has already been registered with the selector,
                // so update the events it is interested in
                final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
                try {
                    boolean cancel = false;
                    if (key != null) {
                        final KeyAttachment att = (KeyAttachment) key.attachment();
                        if ( att != null ) {
                            //handle callback flag
                            if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
                                att.setCometNotify(true);
                            } else {
                                att.setCometNotify(false);
                            }
                            // Remove the callback flag
                            interestOps = (interestOps & (~OP_CALLBACK));
                            // Refresh the event's last access time to prevent it from timing out
                            att.access();//to prevent timeout
                            //we are registering the key to start with, reset the fairness counter.
                            int ops = key.interestOps() | interestOps;
                            att.interestOps(ops);
                            key.interestOps(ops);
                        } else {
                            cancel = true;
                        }
                    } else {
                        cancel = true;
                    }
                    if ( cancel ) socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
                } catch (CancelledKeyException ckx) {
                    try {
                        socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, true);
                    } catch (Exception ignore) {}
                }
            }//end if
        }//run

        @Override
        public String toString() {
            return super.toString() + "[intOps=" + this.interestOps + "]";
        }
    }

7. Submit the socket to the Worker for execution

From the analysis of the Poller flow in step 5, you can see that its run method ends up calling processKey() to handle the channel events detected by the selector, and processKey in turn calls processSocket for the concrete channel-processing logic. Look at the implementation of the processSocket method:

    public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
        try {
            KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
            if (attachment == null) {
                return false;
            }
            attachment.setCometNotify(false);
            // Take a SocketProcessor from the cache; create a new one if the cache is empty
            SocketProcessor sc = processorCache.poll();
            if (sc == null) sc = new SocketProcessor(socket, status);
            else sc.reset(socket, status);
            // Dispatch the socket on which the event occurred to the Worker pool for processing
            if (dispatch && getExecutor() != null) getExecutor().execute(sc);
            else sc.run();
        } catch (RejectedExecutionException rx) {
            log.warn("Socket processing request was rejected for:"+socket,rx);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

Through the coordination of NioEndpoint, the Poller hands the socket on which the event occurred to a Worker thread for further processing. This is the end of the event framework proper; what follows is the Worker's processing.

8. Process the request from the socket

The NIO implementation in Tomcat 6 had a Worker class, which has been removed in Tomcat 7; its role is now played by the SocketProcessor class seen above.

    // ---------------------------------------------- SocketProcessor Inner Class
    /**
     * This class is the equivalent of the Worker, but will simply use in an
     * external Executor thread pool.
     */
    protected class SocketProcessor implements Runnable {

        // Each SocketProcessor keeps a reference to a NioChannel
        protected NioChannel socket = null;
        protected SocketStatus status = null;

        public SocketProcessor(NioChannel socket, SocketStatus status) {
            reset(socket, status);
        }

        public void reset(NioChannel socket, SocketStatus status) {
            this.socket = socket;
            this.status = status;
        }

        @Override
        public void run() {
            SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
            KeyAttachment ka = null;

            if (key != null) {
                ka = (KeyAttachment) key.attachment();
            }

            // Upgraded connections need to allow multiple threads to access the
            // connection at the same time to enable blocking IO to be used when
            // NIO has been configured
            if (ka != null && ka.isUpgraded() &&
                    SocketStatus.OPEN_WRITE == status) {
                synchronized (ka.getWriteThreadLock()) {
                    doRun(key, ka);
                }
            } else {
                synchronized (socket) {
                    doRun(key, ka);
                }
            }
        }

        private void doRun(SelectionKey key, KeyAttachment ka) {
            try {
                int handshake = -1;

                try {
                    if (key != null) {
                        // For STOP there is no point trying to handshake as the
                        // Poller has been stopped.
                        if (socket.isHandshakeComplete() ||
                                status == SocketStatus.STOP) {
                            handshake = 0;
                        } else {
                            handshake = socket.handshake(
                                    key.isReadable(), key.isWritable());
                            // The handshake process reads/writes from/to the
                            // socket. status may therefore be OPEN_WRITE once
                            // the handshake completes. However, the handshake
                            // happens when the socket is opened so the status
                            // must always be OPEN_READ after it completes. It
                            // is OK to always set this as it is only used if
                            // the handshake completes.
                            status = SocketStatus.OPEN_READ;
                        }
                    }
                }catch ( IOException x ) {
                    handshake = -1;
                    if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
                }catch ( CancelledKeyException ckx ) {
                    handshake = -1;
                }
                if ( handshake == 0 ) {
                    SocketState state = SocketState.OPEN;
                    // Process the request from this socket
                    if (status == null) {
                        // Key code: hand the socket to the Handler for processing
                        state = handler.process(ka, SocketStatus.OPEN_READ);
                    } else {
                        state = handler.process(ka, status);
                    }
                    if (state == SocketState.CLOSED) {
                        // Close socket and pool
                        try {
                            close(ka, socket, key, SocketStatus.ERROR);
                        } catch ( Exception x ) {
                            log.error("", x);
                        }
                    }
                } else if (handshake == -1) {
                    close(ka, socket, key, SocketStatus.DISCONNECT);
                } else {
                    ka.getPoller().add(socket, handshake);
                }
            } catch (CancelledKeyException cx) {
                socket.getPoller().cancelledKey(key, null, false);
            } catch (OutOfMemoryError oom) {
                try {
                    oomParachuteData = null;
                    log.error("", oom);
                    if (socket != null) {
                        socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
                    }
                    releaseCaches();
                }catch ( Throwable oomt ) {
                    try {
                        System.err.println(oomParachuteMsg);
                        oomt.printStackTrace();
                    }catch (Throwable letsHopeWeDontGetHere){
                        ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                    }
                }
            } catch (VirtualMachineError vme) {
                ExceptionUtils.handleThrowable(vme);
            }catch ( Throwable t ) {
                log.error("",t);
                if (socket != null) {
                    socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
                }
            } finally {
                socket = null;
                status = null;
                //return to cache
                if (running && !paused) {
                    processorCache.offer(this);
                }
            }
        }

        private void close(KeyAttachment ka, NioChannel socket, SelectionKey key,
                SocketStatus socketStatus) {
		...
        }
    }

As you can see, the SocketProcessor finds the appropriate Handler to do the final protocol processing on the socket.

The following diagram summarizes the main flow of NioEndpoint:

The Acceptor and Poller components are arrays of threads, while the Worker is an Executor thread pool.
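
To tie the whole flow together, here is a stripped-down sketch of the same Acceptor -> Poller -> Worker pipeline (illustrative only, not Tomcat code: a single acceptor, a single poller, an echo handler in place of the Handler chain, and no re-registration of read interest after processing, which Tomcat performs through the Poller):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MiniReactor {
    private final ExecutorService workers = Executors.newFixedThreadPool(8); // the Worker pool
    private final ConcurrentLinkedQueue<SocketChannel> pending = new ConcurrentLinkedQueue<SocketChannel>();
    private final Selector selector = Selector.open();

    public MiniReactor() throws IOException {}

    public void start(int port) throws IOException {
        final ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(port));
        // Acceptor: blocking accept, then hand the channel over to the poller thread
        new Thread(() -> {
            while (true) {
                try {
                    pending.offer(server.accept());
                    selector.wakeup();
                } catch (IOException e) { return; }
            }
        }, "acceptor").start();
        new Thread(this::poll, "poller").start();
    }

    // Poller: register pending channels, select, and dispatch ready keys to workers
    private void poll() {
        while (true) {
            try {
                SocketChannel ch;
                while ((ch = pending.poll()) != null) {
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ);
                }
                selector.select(1000);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    // Deregister the fired interest so no other thread touches
                    // this socket while a worker owns it (cf. unreg() above)
                    key.interestOps(key.interestOps() & ~key.readyOps());
                    final SocketChannel c = (SocketChannel) key.channel();
                    workers.execute(() -> handle(c));
                }
            } catch (IOException e) { return; }
        }
    }

    // Worker: the actual read/write, echoing bytes back as a stand-in for HTTP processing
    private void handle(SocketChannel ch) {
        try {
            ByteBuffer buf = ByteBuffer.allocate(4096);
            if (ch.read(buf) < 0) { ch.close(); return; }
            buf.flip();
            while (buf.hasRemaining()) ch.write(buf);
        } catch (IOException e) { /* drop the connection */ }
    }
}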