preface

This article will explain in detail the evolution of NIO’s functionality and pave the way for the reactor-Netty library.

About THE Java programming methodology — Reactor and Webflux video sharing, has completed Rxjava and Reactor, site B address is as follows:

Rxjava source code reading and sharing: www.bilibili.com/video/av345…

Reactor source code reading and sharing: www.bilibili.com/video/av353…

Scenario generation into the

From BIO to NIO source code, let’s take a look at NIO.

In the previous article, we can see that we are going to do asynchronous non-blocking, our own is to create A thread pool at the same time A part of the code to do A timeout docking changes to the client, but the disadvantages are also very clear, we transform the thinking, here for an example scenario, A and B class students one on one class to complete the task, Each pair of people to get the task is different, the consumption of time has long have short, task because there are so students would reward, the traditional mode, the class A and class B, not by the words, even if only A heartbeat detection task all get together, in this case, the client will have no data to be sent, just want to tell the server is still alive, this kind of circumstance, If class B comes to a student to do docking, it is very problematic, each student in class B can be regarded as a thread on the server side. So, we need a manager, and that’s where the Selector comes in, and as a manager, we tend to have to manage the state of the students, whether they’re waiting for a task, whether they’re receiving a message, whether they’re sending a message, and so on, and the Selector is more action oriented, and you just do things with those state tags, So those status tags actually need to be managed, and that’s where the SelectionKey comes in. Then we need to enhance the packaging of these students to carry such tags. Similarly, we should further free our hands for students, such as providing them with a computer, so that students can do more things, then the computer here is the existence of Buffer. Therefore, there are three main roles in NIO: Buffer Buffer, Channel Channel, Selector Selector. We have covered all of them. Next, we will analyze and interpret the source code step by step.

The Channel interpretation

Give a Channel the ability to be asynchronous and interruptible

As can be seen from above, students actually represent the existence of a Socket, so here Channel is the enhanced packaging of it, that is, the concrete implementation of Channel should have the Socket field, and then the concrete implementation class is also closely around the Socket has the function to do the article. So, we first look at the Java. Nio. Channels. The Channel interface Settings:

public interface Channel extends Closeable {

    /**
     * Tells whether or not this channel is open.
     *
     * @return {@code true} if, and only if, this channel is open
     */
    public boolean isOpen(a);

    /**
     * Closes this channel.
     *
     * <p> After a channel is closed, any further attempt to invoke I/O
     * operations upon it will cause a {@link ClosedChannelException} to be
     * thrown.
     *
     * <p> If this channel is already closed then invoking this method has no
     * effect.
     *
     * <p> This method may be invoked at any time.  If some other thread has
     * already invoked it, however, then another invocation will block until
     * the first invocation is complete, after which it will return without
     * effect. </p>
     *
     * @throws  IOException  If an I/O error occurs
     */
    public void close(a) throws IOException;

}
Copy the code

This is pretty straightforward, checking whether a Channel is open, closing a Channel, and we’ll talk more about how ClosedChannelException actually happens in code. Sometimes, a Channel may be closed and broken asynchronously, which is also desirable. So to achieve this effect we have to set up an interface that can do this effect. The effect should be that if a thread performs an IO operation on a Channel that implements the interface, another thread can call the Channel’s close method. Result, carries on the IO operations that blocking threads will receive a AsynchronousCloseException anomalies.

Also, we should consider the possibility that if a thread is performing an IO operation on a Channel that implements this interface, another thread may call the blocking thread’s interrupt method (Thread#interrupt()), causing the Channel to close. Then the blocked thread should receive ClosedByInterruptException exception, at the same time to set the interrupt status to the blocked thread.

At that time, if the interrupt status is set in the thread, this time on the Channel and invoked the IO blocking operation, so the Channel can be closed, at the same time, the thread will immediately by a ClosedByInterruptException abnormalities, Its interrupt state remains unchanged. This interface is defined as follows:

public interface InterruptibleChannel
    extends Channel
{

    /**
     * Closes this channel.
     *
     * <p> Any thread currently blocked in an I/O operation upon this channel
     * will receive an {@link AsynchronousCloseException}.
     *
     * <p> This method otherwise behaves exactly as specified by the {@link
     * Channel#close Channel} interface.  </p>
     *
     * @throws  IOException  If an I/O error occurs
     */
    public void close(a) throws IOException;

}
Copy the code

It according to the specific implementation of the above logic is in the Java nio. Channels. The spi. AbstractInterruptibleChannel, about the interpretation of this class, let’s refer to this article InterruptibleChannel and interruptible IO

Gives a Channel the ability to be multiplexed

As we said earlier, a Channel can be used by a Selector, and a Selector is assigned tasks based on the state of the Channel, so a Channel should provide a method registered with a Selector that binds to it. That is, an instance of a Channel calls register(Selector,int,Object). Note that because the Selector is managed by state values, this method returns a SelectionKey object that represents the state of the channel on the Selector. Now, the SelectionKey, it’s a bunch of things, but I won’t talk about them here.

//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException
    {
        if((ops & ~validOps()) ! =0)
            throw new IllegalArgumentException();
        if(! isOpen())throw new ClosedChannelException();
        synchronized (regLock) {
            if (isBlocking())
                throw new IllegalBlockingModeException();
            synchronized (keyLock) {
                // re-check if channel has been closed
                if(! isOpen())throw new ClosedChannelException();
                SelectionKey k = findKey(sel);
                if(k ! =null) {
                    k.attach(att);
                    k.interestOps(ops);
                } else {
                    // New registration
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
                returnk; }}}//java.nio.channels.spi.AbstractSelectableChannel#addKey
    private void addKey(SelectionKey k) {
        assert Thread.holdsLock(keyLock);
        int i = 0;
        if((keys ! =null) && (keyCount < keys.length)) {
            // Find empty element of key array
            for (i = 0; i < keys.length; i++)
                if (keys[i] == null)
                    break;
        } else if (keys == null) {
            keys = new SelectionKey[2];
        } else {
            // Grow key array
            int n = keys.length * 2;
            SelectionKey[] ks =  new SelectionKey[n];
            for (i = 0; i < keys.length; i++)
                ks[i] = keys[i];
            keys = ks;
            i = keyCount;
        }
        keys[i] = k;
        keyCount++;
    }
Copy the code

Once registered with a Selector, a Channel remains registered until it is unregistered. When unregistered, all resources assigned to a Channel by Selector are unregistered. A Channel doesn’t provide a method to unregister it directly, so let’s think about it a little bit differently. We can just unregister the Selector Key that represents its registration. You can explicitly cancel the key here by calling SelectionKey#cancel(). The Channel is then unregistered during the next Selector operation.

//java.nio.channels.spi.AbstractSelectionKey#cancel
    /** * Cancels this key. * * 

If this key has not yet been cancelled then it is added to its * selector's cancelled-key set while synchronized on that set.

*/
public final void cancel(a) { // Synchronizing "this" to prevent this key from getting canceled // multiple times by different threads, which might cause race // condition between selector's select() and channel's close(). synchronized (this) { if (valid) { valid = false; // Call the Selector cancel method again ((AbstractSelector)selector()).cancel(this); }}}//java.nio.channels.spi.AbstractSelector#cancel void cancel(SelectionKey k) { synchronized(cancelledKeys) { cancelledKeys.add(k); }}// Unregister the Channel at the next select operation //sun.nio.ch.SelectorImpl#select(long) @Override public final int select(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException("Negative timeout"); // Focus on this method return lockAndDoSelect(null, (timeout == 0)? -1 : timeout); } //sun.nio.ch.SelectorImpl#lockAndDoSelect private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout) throws IOException { synchronized (this) { ensureOpen(); if (inSelect) throw new IllegalStateException("select in progress"); inSelect = true; try { synchronized (publicSelectedKeys) { // Focus on this method returndoSelect(action, timeout); }}finally { inSelect = false; }}}//sun.nio.ch.WindowsSelectorImpl#doSelect protected int doSelect(Consumer<SelectionKey> action, long timeout) throws IOException { assert Thread.holdsLock(this); this.timeout = timeout; // set selector timeout processUpdateQueue(); // Focus on this method processDeregisterQueue(); if (interruptTriggered) { resetWakeupSocket(); return 0; }... }/** * sun.nio.ch.SelectorImpl#processDeregisterQueue * Invoked by selection operations to process the cancelled-key set * / protected final void processDeregisterQueue(a) throws IOException { assert Thread.holdsLock(this); assert Thread.holdsLock(publicSelectedKeys); Set<SelectionKey> cks = cancelledKeys(); synchronized (cks) { if(! cks.isEmpty()) { Iterator<SelectionKey> i = cks.iterator();while (i.hasNext()) { SelectionKeyImpl ski = (SelectionKeyImpl)i.next(); i.remove(); // remove the key from the selector implDereg(ski); selectedKeys.remove(ski); keys.remove(ski); // remove from channel's key set deregister(ski); SelectableChannel ch = ski.channel(); if(! ch.isOpen() && ! ch.isRegistered()) ((SelChImpl)ch).kill(); }}}}Copy the code

Here, when a Channel is closed, either by calling Channel#close or by breaking the thread, it implicitly cancels all keys for the Channel and internally calls k.canel ().

//java.nio.channels.spi.AbstractInterruptibleChannel#close
    /**
     * Closes this channel.
     *
     * <p> If the channel has already been closed then this method returns
     * immediately.  Otherwise it marks the channel as closed and then invokes
     * the {@link #implCloseChannel implCloseChannel} method in order to
     * complete the close operation.  </p>
     *
     * @throws  IOException
     *          If an I/O error occurs
     */
    public final void close(a) throws IOException {
        synchronized (closeLock) {
            if (closed)
                return;
            closed = true; implCloseChannel(); }}//java.nio.channels.spi.AbstractSelectableChannel#implCloseChannel
     protected final void implCloseChannel(a) throws IOException {
        implCloseSelectableChannel();

        // clone keys to avoid calling cancel when holding keyLock
        SelectionKey[] copyOfKeys = null;
        synchronized (keyLock) {
            if(keys ! =null) { copyOfKeys = keys.clone(); }}if(copyOfKeys ! =null) {
            for (SelectionKey k : copyOfKeys) {
                if(k ! =null) {
                    k.cancel();   // invalidate and adds key to cancelledKey set}}}}Copy the code

If the Selector itself is closed, the Channel is unregistered, and the key representing the Channel registration becomes invalid:

//java.nio.channels.spi.AbstractSelector#close
public final void close(a) throws IOException {
        boolean open = selectorOpen.getAndSet(false);
        if(! open)return;
        implCloseSelector();
    }
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector(a) throws IOException {
    wakeup();
    synchronized (this) {
        implClose();
        synchronized (publicSelectedKeys) {
            // Deregister channels
            Iterator<SelectionKey> i = keys.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                deregister(ski);
                SelectableChannel selch = ski.channel();
                if(! selch.isOpen() && ! selch.isRegistered()) ((SelChImpl)selch).kill(); selectedKeys.remove(ski); i.remove(); }assertselectedKeys.isEmpty() && keys.isEmpty(); }}}Copy the code

If multiple Ops are supported by a channel, a particular selector cannot be registered again after it is registered once. Also is in the second call Java. Nio. Channels. Spi. AbstractSelectableChannel# register method to get, will only to the change of the Ops, will not register, because registration will produce a new SelectionKey object. We can call Java nio. Channels. SelectableChannel# isRegistered method to determine whether to one or more registered channel that the Selector.

//java.nio.channels.spi.AbstractSelectableChannel#isRegistered
 // -- Registration --

    public final boolean isRegistered(a) {
        synchronized (keyLock) {
            // We call the addKey method when it registers with a Selector, which increases the keyCount each time it registers with a Selector.
            returnkeyCount ! =0; }}Copy the code

At this point, by inheriting the SelectableChannel class, the channel can be safely used by multiple concurrent threads. Here, note that the inherited AbstractSelectableChannel after this class, the newly created channel is in blocking mode. However, operations related to multiplexing a Selector must be based on non-blocking mode, so a channel must be placed in non-blocking mode before registering a Selector, and it may not return to blocking mode until unregistered. Here, we cover the blocking and non-blocking modes of channels. In blocking mode, each I/O operation invoked on a Channel blocks until it completes. In non-blocking mode, I/O operations never block and can transfer fewer bytes than requested, or none at all. We can determine whether a channel isBlocking by calling its isBlocking method.

//java.nio.channels.spi.AbstractSelectableChannel#register
 public final SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException
    {
        if((ops & ~validOps()) ! =0)
            throw new IllegalArgumentException();
        if(! isOpen())throw new ClosedChannelException();
        synchronized (regLock) {
     // If the mode is blocking, it will return true, and then an exception will be thrown
            if (isBlocking())
                throw new IllegalBlockingModeException();
            synchronized (keyLock) {
                // re-check if channel has been closed
                if(! isOpen())throw new ClosedChannelException();
                SelectionKey k = findKey(sel);
                if(k ! =null) {
                    k.attach(att);
                    k.interestOps(ops);
                } else {
                    // New registration
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
                returnk; }}}Copy the code

Therefore, we can use the following examples as a reference:

public NIOServerSelectorThread(int port)
	{
		try {
			// Open ServerSocketChannel, which listens for client connections. It is the parent channel for all client connections
			serverSocketChannel = ServerSocketChannel.open();
			// Set the pipe to non-blocking mode
			serverSocketChannel.configureBlocking(false);
			// create a ServerSocket object with ServerSocketChannel, namely ServerSocket
			serverSocket = serverSocketChannel.socket();
			// Bind a listening port to the server Socket
			serverSocket.bind(new InetSocketAddress(port));
			// Create a multiplexer
			selector = Selector.open();
			// Register ServerSocketChannel with the Selector multiplexer and listen for the ACCEPT event
			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
			System.out.println("The server is start in port: "+port);
		} catch(IOException e) { e.printStackTrace(); }}Copy the code

Due to time, I will stop here for the time being, and the rest will be explained in the next chapter.