preface

This series of articles will explain the evolution of NIO and pave the way for the reactor-Netty library.

About THE Java programming methodology — Reactor and Webflux video sharing, has completed Rxjava and Reactor, site B address is as follows:

Rxjava source code reading and sharing: www.bilibili.com/video/av345…

Reactor source code reading and sharing: www.bilibili.com/video/av353…

This series of source code interpretation based on JDK11 API details may differ from other versions, please solve JDK version issues.

The Channel interpretation

Attach a BIO to NIO source code for some things on NIO

Give channels the ability to support network sockets

Our original purpose is to enhance the Socket, based on the basic requirements, no conditions to create conditions, so in order to make the Channel has the ability to network Socket, it defines a java.nio.channels.Net workChannel interface. Without further ado, let’s look at the definition of this interface:

public interface NetworkChannel extends Channel
{
    NetworkChannel bind(SocketAddress local) throws IOException;

    SocketAddress getLocalAddress(a) throws IOException;

    <T> NetworkChannel setOption(SocketOption<T> name, T value) throws IOException;

    <T> T getOption(SocketOption<T> name) throws IOException; Set<SocketOption<? >> supportedOptions(); }Copy the code

Bind the socket to the local SocketAddress using the bind(SocketAddress) method, and return the socket bound address using the getLocalAddress() method. You can use the setOption(SocketOption,Object) and getOption(SocketOption) methods to set and query the configuration options supported by sockets.

bind

. Next let’s look at the Java nio. Channels. ServerSocketChannel class abstraction and its implementation class sun. Nio. Ch. ServerSocketChannelImpl of implementation details. First let’s look at the implementation of bind:

//sun.nio.ch.ServerSocketChannelImpl#bind
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
    synchronized (stateLock) {
        ensureOpen();
        // Determine whether bind has already been called by localAddress
        if(localAddress ! =null)
            throw new AlreadyBoundException();
        //InetSocketAddress(0) indicates all addresses bound to the local machine. The operating system selects the appropriate port
        InetSocketAddress isa = (local == null)?new InetSocketAddress(0)
                                : Net.checkAddress(local);
        SecurityManager sm = System.getSecurityManager();
        if(sm ! =null)
            sm.checkListen(isa.getPort());
        NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
        Net.bind(fd, isa.getAddress(), isa.getPort());
        If the backlog parameter is less than 1, 50 connections are accepted by default
        Net.listen(fd, backlog < 1 ? 50 : backlog);
        localAddress = Net.localAddress(fd);
    }
    return this;
}
Copy the code

Let’s take a look at how the BIND and listen methods in Net are implemented.

Net.bind
//sun.nio.ch.Net#bind(java.io.FileDescriptor, java.net.InetAddress, int)
public static void bind(FileDescriptor fd, InetAddress addr, int port)
        throws IOException
    {
        bind(UNSPEC, fd, addr, port);
    }

static void bind(ProtocolFamily family, FileDescriptor fd,
                    InetAddress addr, int port) throws IOException
{
    // If the protocol domain passed is not IPV4 and supports IPV6, IPV6 is used
    booleanpreferIPv6 = isIPv6Available() && (family ! = StandardProtocolFamily.INET); bind0(fd, preferIPv6, exclusiveBind, addr, port); }private static native void bind0(FileDescriptor fd, boolean preferIPv6,
                                    boolean useExclBind, InetAddress addr,
                                    int port)
    throws IOException;
Copy the code

Bind0 is implemented as a native method:

JNIEXPORT void JNICALL
Java_sun_nio_ch_Net_bind0(JNIEnv *env, jclass clazz, jobject fdo, jboolean preferIPv6,
                          jboolean useExclBind, jobject iao, int port)
{
    SOCKETADDRESS sa;
    int sa_len = 0;
    int rv = 0;
    // Convert Java InetAddress to C struct sockaddr
    if(NET_InetAddressToSockaddr(env, iao, port, &sa, &sa_len, preferIPv6) ! =0) {
        return;// Conversion failed, method returns
    }
    Int bind(int sockfd, struct sockaddr* addr, socklen_t addrlen)
    rv = NET_Bind(fdval(env, fdo), &sa, sa_len);
    if(rv ! =0) { handleSocketError(env, errno); }}Copy the code

Socket is the user program and the kernel interactive information hub, it has no network protocol address and port number and other information, in the network communication, you must associate a socket with an address. Most of the time, the kernel will automatically bind an address, but sometimes users need to complete the binding process themselves to meet the needs of the actual application; Typically, a server process needs to bind to a well-known address or port to wait for a client to connect. On the client side, there are many cases where the bind method is not called and is automatically bound by the kernel.

If a connection is received, a new Socket will be created, and then the server will operate the new Socket. Here we can focus on the Accept method. By the sun. Nio. Ch. ServerSocketChannelImpl# bind in the end, we know it by Net. Listen (fd, backlog < 1? 50: backlog) Enable listening. If the backlog parameter is less than 1, 50 connections are accepted by default. With this in mind, let’s focus on the details of the Net.listen method.

Net.listen
//sun.nio.ch.Net#listen
static native void listen(FileDescriptor fd, int backlog) throws IOException;
Copy the code

Net.listen is a native method.

JNIEXPORT void JNICALL
Java_sun_nio_ch_Net_listen(JNIEnv *env, jclass cl, jobject fdo, jint backlog)
{
    if (listen(fdval(env, fdo), backlog) < 0)
        handleSocketError(env, errno);
}
Copy the code

Int LISTEN (int sockfd, int backlog) returns 0 for success, -1 for failure

Let’s look at some of the other details in bind. The initial ensureOpen() method determines:

//sun.nio.ch.ServerSocketChannelImpl#ensureOpen
// @throws ClosedChannelException if channel is closed
private void ensureOpen(a) throws ClosedChannelException {
    if(! isOpen())throw new ClosedChannelException();
}
//java.nio.channels.spi.AbstractInterruptibleChannel#isOpen
public final boolean isOpen(a) {
        return! closed; }Copy the code

If the socket is closed, a ClosedChannelException is thrown.

Net#checkAddress

//sun.nio.ch.Net#checkAddress(java.net.SocketAddress)
public static InetSocketAddress checkAddress(SocketAddress sa) {
    if (sa == null)// The address is empty
        throw new NullPointerException();
        // Non-inetSocketAddress address
    if(! (sainstanceof InetSocketAddress))
        throw new UnsupportedAddressTypeException(); // ## needs arg
    InetSocketAddress isa = (InetSocketAddress)sa;
    // The address is unrecognizable
    if (isa.isUnresolved())
        throw new UnresolvedAddressException(); // ## needs arg
    InetAddress addr = isa.getAddress();
        // Non-IP4 and IP6 addresses
    if(! (addrinstanceof Inet4Address || addr instanceof Inet6Address))
        throw new IllegalArgumentException("Invalid address type");
    return isa;
}
Copy the code

As you can see from the above, bind first checks whether the ServerSocket is closed, whether the address is bound, and if neither is, then checks whether the bound SocketAddress is correct or valid. Then bind and listen of the Net utility class to complete the actual ServerSocket address binding and enable listening. If the binding is enabled and the parameter is less than 1, 50 connections are accepted by default.

In contrast to the BIO we touched on earlier in The first article, let’s look at some implementations of the Accept () method:

//sun.nio.ch.ServerSocketChannelImpl#accept()
@Override
public SocketChannel accept(a) throws IOException {
    acceptLock.lock();
    try {
        int n = 0;
        FileDescriptor newfd = new FileDescriptor();
        InetSocketAddress[] isaa = new InetSocketAddress[1];

        boolean blocking = isBlocking();
        try {
            begin(blocking);
            do {
                n = accept(this.fd, newfd, isaa);
            } while (n == IOStatus.INTERRUPTED && isOpen());
        } finally {
            end(blocking, n > 0);
            assert IOStatus.check(n);
        }

        if (n < 1)
            return null;
        // The socketChannelImpl processing channel that accepts connections is blocked by default
        // newly accepted socket is initially in blocking mode
        IOUtil.configureBlocking(newfd, true);

        InetSocketAddress isa = isaa[0];
        Build SocketChannelImpl (SocketChannelImpl)
        SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);

        // check permitted to accept connections from the remote address
        SecurityManager sm = System.getSecurityManager();
        if(sm ! =null) {
            try {
                // Check the address and port permissions
                sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
            } catch (SecurityException x) {
                sc.close();
                throwx; }}/ / return socketchannelimpl
        return sc;

    } finally{ acceptLock.unlock(); }}Copy the code

For accept(this.fd, newfd, isaa), call Accept to accept established connections in the socket, as we saw earlier in BIO, Int accept(int sockfd,struct sockaddr *addr, socklen_t *addrlen);

  • If the fd listens for a socket with no pending connections in its queue and the socket is not marked as blocking, accept() blocks until the connection appears;
  • If the socket is marked non-blocking and there is no waiting connection in the queue, accept() returns error EAGAIN or EWOULDBLOCK

Here the begin (blocking); With end(blocking, n > 0); InterruptibleChannel and Interruptible IO have all been covered in InterruptibleChannel and interruptible IO. The focus here is on waiting for a connection, which can be interrupted abnormally. If the process ends normally, the logic will continue normally. The second argument to end(blocking, n > 0) is also used to indicate that the waiting process is completed. It does not extend functionality.

supportedOptions

SupportedOptions = supportedOptions = supportedOptions = supportedOptions

//sun.nio.ch.ServerSocketChannelImpl#supportedOptions
@Override
public finalSet<SocketOption<? >> supportedOptions() {return DefaultOptionsHolder.defaultOptions;
}
//sun.nio.ch.ServerSocketChannelImpl.DefaultOptionsHolder
private static class DefaultOptionsHolder {
    static finalSet<SocketOption<? >> defaultOptions = defaultOptions();private staticSet<SocketOption<? >> defaultOptions() { HashSet<SocketOption<? >> set =new HashSet<>();
        set.add(StandardSocketOptions.SO_RCVBUF);
        set.add(StandardSocketOptions.SO_REUSEADDR);
        if (Net.isReusePortAvailable()) {
            set.add(StandardSocketOptions.SO_REUSEPORT);
        }
        set.add(StandardSocketOptions.IP_TOS);
        set.addAll(ExtendedSocketOptions.options(SOCK_STREAM));
        // Return a HashSet that cannot be modified
        returnCollections.unmodifiableSet(set); }}Copy the code

Let’s take a look at some of the above configurations:

//java.net.StandardSocketOptions
// The socket accepts the cache size
public static final SocketOption<Integer> SO_RCVBUF =
        new StdSocketOption<Integer>("SO_RCVBUF", Integer.class);
// Whether the address can be reused
public static final SocketOption<Boolean> SO_REUSEADDR =
        new StdSocketOption<Boolean>("SO_REUSEADDR", Boolean.class);
// Whether port can be reused
public static final SocketOption<Boolean> SO_REUSEPORT =
        new StdSocketOption<Boolean>("SO_REUSEPORT", Boolean.class);
// The type of service (ToS) in the Internet Protocol (IP) header.
public static final SocketOption<Integer> IP_TOS =
        new StdSocketOption<Integer>("IP_TOS", Integer.class);
Copy the code

SetOption implementation

With this configuration in mind, let’s take a look at the implementation details of setOption:

//sun.nio.ch.ServerSocketChannelImpl#setOption
@Override
public <T> ServerSocketChannel setOption(SocketOption<T> name, T value)
    throws IOException
{
    Objects.requireNonNull(name);
    if(! supportedOptions().contains(name))throw new UnsupportedOperationException("'" + name + "' not supported");
    synchronized (stateLock) {
        ensureOpen();

        if (name == StandardSocketOptions.IP_TOS) {
            ProtocolFamily family = Net.isIPv6Available() ?
                StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
            Net.setSocketOption(fd, family, name, value);
            return this;
        }

        if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
            // SO_REUSEADDR emulated when using exclusive bind
            isReuseAddress = (Boolean)value;
        } else {
            // no options that require special handling
            Net.setSocketOption(fd, Net.UNSPEC, name, value);
        }
        return this; }}Copy the code

SupportedOptions ().contains(name) supportedOptions().contains(name) supportedOptions() Net.setsocketoption is mainly implemented for Socket configuration. Here, it is only good to make Chinese annotations for its code. The whole logical process is not too complicated.

static void setSocketOption(FileDescriptor fd, ProtocolFamily family, SocketOption
        name, Object value)
    throws IOException
{
    if (value == null)
        throw new IllegalArgumentException("Invalid option value");

    // only simple values supported by this methodClass<? > type = name.type();if (extendedOptions.isOptionSupported(name)) {
        extendedOptions.setOption(fd, name, value);
        return;
    }
    // Non-integer and Boolean, throw an assertion error
    if(type ! = Integer.class && type ! = Boolean.class)throw new AssertionError("Should not reach here");

    // special handling
    if (name == StandardSocketOptions.SO_RCVBUF ||
        name == StandardSocketOptions.SO_SNDBUF)
    {
        // Determine the size of the receive and send buffers
        int i = ((Integer)value).intValue();
        if (i < 0)
            throw new IllegalArgumentException("Invalid send/receive buffer size");
    }
        // There is data in the buffer, delaying the closing of the socket
    if (name == StandardSocketOptions.SO_LINGER) {
        int i = ((Integer)value).intValue();
        if (i < 0)
            value = Integer.valueOf(-1);
        if (i > 65535)
            value = Integer.valueOf(65535);
    }
    / / UDP unicast
    if (name == StandardSocketOptions.IP_TOS) {
        int i = ((Integer)value).intValue();
        if (i < 0 || i > 255)
            throw new IllegalArgumentException("Invalid IP_TOS value");
    }
    / / UDP multicast
    if (name == StandardSocketOptions.IP_MULTICAST_TTL) {
        int i = ((Integer)value).intValue();
        if (i < 0 || i > 255)
            throw new IllegalArgumentException("Invalid TTL/hop value");
    }

    // map option name to platform level/name
    OptionKey key = SocketOptionRegistry.findOption(name, family);
    if (key == null)
        throw new AssertionError("Option not found");

    int arg;
    // Convert configuration parameter values
    if (type == Integer.class) {
        arg = ((Integer)value).intValue();
    } else {
        boolean b = ((Boolean)value).booleanValue();
        arg = (b) ? 1 : 0;
    }

    boolean mayNeedConversion = (family == UNSPEC);
    boolean isIPv6 = (family == StandardProtocolFamily.INET6);
    // Sets the value of the file descriptor and others
    setIntOption0(fd, mayNeedConversion, key.level(), key.name(), arg, isIPv6);
}
Copy the code

getOption

Next, let’s look at the getOption implementation, source code is as follows:

//sun.nio.ch.ServerSocketChannelImpl#getOption
@Override
@SuppressWarnings("unchecked")
public <T> T getOption(SocketOption<T> name)
    throws IOException
{
    Objects.requireNonNull(name);
    / / the channel support options, throw an UnsupportedOperationException
    if(! supportedOptions().contains(name))throw new UnsupportedOperationException("'" + name + "' not supported");

    synchronized (stateLock) {
        ensureOpen();
        if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
            // SO_REUSEADDR emulated when using exclusive bind
            return (T)Boolean.valueOf(isReuseAddress);
        }
        // If not, delegate to Net
        // no options that require special handling
        return(T) Net.getSocketOption(fd, Net.UNSPEC, name); }}//sun.nio.ch.Net#getSocketOption
static Object getSocketOption(FileDescriptor fd, ProtocolFamily family, SocketOption
        name)
    throws IOException
{ Class<? > type = name.type();if (extendedOptions.isOptionSupported(name)) {
        return extendedOptions.getOption(fd, name);
    }
    // Only integers and Bools are supported, otherwise an assertion error is thrown
    // only simple values supported by this method
    if(type ! = Integer.class && type ! = Boolean.class)throw new AssertionError("Should not reach here");

    // map option name to platform level/name
    OptionKey key = SocketOptionRegistry.findOption(name, family);
    if (key == null)
        throw new AssertionError("Option not found");

    boolean mayNeedConversion = (family == UNSPEC);
    // Get the option configuration described in the file
    int value = getIntOption0(fd, mayNeedConversion, key.level(), key.name());

    if (type == Integer.class) {
        return Integer.valueOf(value);
    } else {
        // We need to see the source code in the previous support configuration, which supports either Boolean or Integer types
        // Therefore, it is not surprising that the return value is Boolean.FALSE or Boolean.TRUE
        return (value == 0)? Boolean.FALSE : Boolean.TRUE; }}Copy the code

ServerSocketChannel and ServerSocket in bind

In the net.bind section, we ended with a note that each connection creates a Socket for the connection to operate on. This can be seen in the Accept method, which, after receiving the connection, New SocketChannelImpl(Provider (), newfd, ISA). If you need to bind to a Socket, you need to bind to a Socket. If you need to bind to a Socket, you need to bind to a Socket. # serversocket (int, int, java.net.InetAddress) = serversocket (int, int, java.net.InetAddress)

//java.net.ServerSocket
 public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
        setImpl();
        if (port < 0 || port > 0xFFFF)
            throw new IllegalArgumentException(
                       "Port value out of range: " + port);
        if (backlog < 1)
          backlog = 50;
        try {
            bind(new InetSocketAddress(bindAddr, port), backlog);
        } catch(SecurityException e) {
            close();
            throw e;
        } catch(IOException e) {
            close();
            throwe; }}//java.net.ServerSocket#setImpl
private void setImpl(a) {
        if(factory ! =null) {
            impl = factory.createSocketImpl();
            checkOldImpl();
        } else {
            // No need to do a checkOldImpl() here, we know it's an up to date
            // SocketImpl!
            impl = new SocksSocketImpl();
        }
        if(impl ! =null)
            impl.setServerSocket(this);
    }
Copy the code

However, our focus here is on the bind(new InetSocketAddress(bindAddr, port), backlog); The code here is as follows:

//java.net.ServerSocket
public void bind(SocketAddress endpoint, int backlog) throws IOException {
        if (isClosed())
            throw new SocketException("Socket is closed");
        if(! oldImpl && isBound())throw new SocketException("Already bound");
        if (endpoint == null)
            endpoint = new InetSocketAddress(0);
        if(! (endpointinstanceof InetSocketAddress))
            throw new IllegalArgumentException("Unsupported address type");
        InetSocketAddress epoint = (InetSocketAddress) endpoint;
        if (epoint.isUnresolved())
            throw new SocketException("Unresolved address");
        if (backlog < 1)
          backlog = 50;
        try {
            SecurityManager security = System.getSecurityManager();
            if(security ! =null)
                security.checkListen(epoint.getPort());
                // key!!
            getImpl().bind(epoint.getAddress(), epoint.getPort());
            getImpl().listen(backlog);
            bound = true;
        } catch(SecurityException e) {
            bound = false;
            throw e;
        } catch(IOException e) {
            bound = false;
            throwe; }}Copy the code

We see getImpl() and I’ve highlighted what’s going on here, and we go in:

//java.net.ServerSocket#getImpl
SocketImpl getImpl(a) throws SocketException {
    if(! created) createImpl();return impl;
}
Copy the code

CreateImpl (); createImpl(); createImpl();

//java.net.ServerSocket#createImpl
void createImpl(a) throws SocketException {
    if (impl == null)
        setImpl();
    try {
        impl.create(true);
        created = true;
    } catch (IOException e) {
        throw newSocketException(e.getMessage()); }}Copy the code

Here, because the impl was already assigned, imp.create (true) is invoked and created is set to true. And here, at last, comes the point I want to make:

//java.net.AbstractPlainSocketImpl#create
protected synchronized void create(boolean stream) throws IOException {
    this.stream = stream;
    if(! stream) { ResourceManager.beforeUdpCreate();// only create the fd after we know we will be able to create the socket
        fd = new FileDescriptor();
        try {
            socketCreate(false);
            SocketCleanable.register(fd);
        } catch (IOException ioe) {
            ResourceManager.afterUdpClose();
            fd = null;
            throwioe; }}else {
        fd = new FileDescriptor();
        socketCreate(true);
        SocketCleanable.register(fd);
    }
    if(socket ! =null)
        socket.setCreated();
    if(serverSocket ! =null)
        serverSocket.setCreated();
}

Copy the code

As you can see, socketCreate(true); , its implementation is as follows:

@Override
void socketCreate(boolean stream) throws IOException {
    if (fd == null)
        throw new SocketException("Socket closed");

    int newfd = socket0(stream);

    fdAccess.set(fd, newfd);
}
Copy the code

The local method socket0(stream) gets a file descriptor from which the Socket is created and bound accordingly. We’ll look back on sun. Nio. Ch. ServerSocketChannelImpl# the accept (), new here SocketChannelImpl object is to get connected after doing, that for the server, when binding with Socket, here, When we use ServerSocketChannel, tend to use the JDK to provide us with the unified method of the open to me, but also to reduce the complexity, we use. Here is the Java nio. Channels. ServerSocketChannel# open:

//java.nio.channels.ServerSocketChannel#open
public static ServerSocketChannel open(a) throws IOException {
    return SelectorProvider.provider().openServerSocketChannel();
}
//sun.nio.ch.SelectorProviderImpl#openServerSocketChannel
public ServerSocketChannel openServerSocketChannel(a) throws IOException {
    return new ServerSocketChannelImpl(this);
}
//sun.nio.ch.ServerSocketChannelImpl#ServerSocketChannelImpl(SelectorProvider)
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
    super(sp);
    this.fd =  Net.serverSocket(true);
    this.fdVal = IOUtil.fdVal(fd);
}
//sun.nio.ch.Net#serverSocket
static FileDescriptor serverSocket(boolean stream) {
    return IOUtil.newFD(socket0(isIPv6Available(), stream, true, fastLoopback));
}
Copy the code

As you can see, as soon as you create a New ServerSocketChannelImpl object, you get a socket and then you can bind. ServerSocketChannel#open gets the ServerSocketChannel type. When we accept a connection from a client, we should create a Socket channel between the client and the server to communicate with each other. Sun. Nio. Ch. ServerSocketChannelImpl# the accept () is a SocketChannel sc = new SocketChannelImpl (provider (), newfd, isa); The class returns an object of type SocketChannel, in which the Socket’s methods for reading and writing data can be defined.

Extended by the socket method of ServerSocketChannel

For ServerSocketChannel, we also need to touch on methods like socket():

//sun.nio.ch.ServerSocketChannelImpl#socket
@Override
public ServerSocket socket(a) {
    synchronized (stateLock) {
        if (socket == null)
            socket = ServerSocketAdaptor.create(this);
        returnsocket; }}Copy the code

We see ServerSocketAdaptor, and we can see from this class comment that this is a class that is the same as the ServerSocket call, but is implemented with ServerSocketChannelImpl underneath. The adaptation is intended to match the way we use ServerSocket, so the ServerSocketAdaptor inherits ServerSocket and overwrites its methods in sequence, so we have new options when we write this piece of code.

InterruptibleChannel and interruptible IO. This article has been involved in Java nio. Channels. Spi. AbstractInterruptibleChannel# close, here, let’s review some of these details, Which brings us to our new topic:

//java.nio.channels.spi.AbstractInterruptibleChannel#close
public final void close(a) throws IOException {
    synchronized (closeLock) {
        if (closed)
            return;
        closed = true; implCloseChannel(); }}//java.nio.channels.spi.AbstractSelectableChannel#implCloseChannel
protected final void implCloseChannel(a) throws IOException {
        implCloseSelectableChannel();

        // clone keys to avoid calling cancel when holding keyLock
        SelectionKey[] copyOfKeys = null;
        synchronized (keyLock) {
            if(keys ! =null) { copyOfKeys = keys.clone(); }}if(copyOfKeys ! =null) {
            for (SelectionKey k : copyOfKeys) {
                if(k ! =null) {
                    k.cancel();   // invalidate and adds key to cancelledKey set}}}}//sun.nio.ch.ServerSocketChannelImpl#implCloseSelectableChannel
@Override
protected void implCloseSelectableChannel(a) throws IOException {
    assert! isOpen();boolean interrupted = false;
    boolean blocking;

    // set state to ST_CLOSING
    synchronized (stateLock) {
        assert state < ST_CLOSING;
        state = ST_CLOSING;
        blocking = isBlocking();
    }

    // wait for any outstanding accept to complete
    if (blocking) {
        synchronized (stateLock) {
            assert state == ST_CLOSING;
            long th = thread;
            if(th ! =0) {
                // If the local thread is not null, the local Socket is closed
                // And notify the thread to notify closure
                nd.preClose(fd);
                NativeThread.signal(th);

                // wait for accept operation to end
                while(thread ! =0) {
                    try {
                        stateLock.wait();
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            }
        }
    } else {
        // non-blocking mode: wait for accept to complete
        acceptLock.lock();
        acceptLock.unlock();
    }

    // set state to ST_KILLPENDING
    synchronized (stateLock) {
        assert state == ST_CLOSING;
        state = ST_KILLPENDING;
    }

    // close socket if not registered with Selector
    // If the Selector is not registered, kill it
    // Close the file description
    if(! isRegistered()) kill();// restore interrupt status
    // Interrupt the thread with the interrupt method
    // Finally, set the thread state to interrupt
    if (interrupted)
        Thread.currentThread().interrupt();
}

@Override
public void kill(a) throws IOException {
    synchronized (stateLock) {
        if(state == ST_KILLPENDING) { state = ST_KILLED; nd.close(fd); }}}Copy the code
Close () application of channel

Close () is not used in InterruptibleChannel and Interruptible IO. It is used more in SocketChannel, where the client and server exchange data. It is normal to close unused channels. Here, in the sun. Nio. Ch. ServerSocketChannelImpl# the accept () in the source code:

@Override
public SocketChannel accept(a) throws IOException {...// newly accepted socket is initially in blocking mode
        IOUtil.configureBlocking(newfd, true);

        InetSocketAddress isa = isaa[0];
        SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);

        // check permitted to accept connections from the remote address
        SecurityManager sm = System.getSecurityManager();
        if(sm ! =null) {
            try {
                sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
            } catch (SecurityException x) {
                sc.close();
                throwx; }}return sc;

    } finally{ acceptLock.unlock(); }}Copy the code

Check the validity of the remote address of the received connection. If the authentication fails, close the SocketChannel created above. Another practical use of close() is to close the Socket if an exception occurs while the client is establishing a connection:

//java.nio.channels.SocketChannel#open(java.net.SocketAddress)
public static SocketChannel open(SocketAddress remote)
        throws IOException
    {
        SocketChannel sc = open();
        try {
            sc.connect(remote);
        } catch (Throwable x) {
            try {
                sc.close();
            } catch (Throwable suppressed) {
                x.addSuppressed(suppressed);
            }
            throw x;
        }
        assert sc.isConnected();
        return sc;
    }
Copy the code

Then, we will find in the implCloseSelectableChannel nd. PreClose (fd); And nd. Close (fd); , the SocketChannelImpl ServerSocketChannelImpl both for implCloseSelectableChannel implementations can be seen, what is the nd, here, In the case of ServerSocketChannelImpl, there is a static code block at the end of the class (SocketChannelImpl) that executes when the class is loaded:

/ / C: / Program Files/Java/JDK - 11.0.1 / lib/SRC. Zip! /java.base/sun/nio/ch/ServerSocketChannelImpl.java:550
static {
     // Load the niO, NET repository
        IOUtil.load();
        initIDs();
        nd = new SocketDispatcher();
    }
Copy the code

That is, when the ServerSocketChannelImpl class is loaded, the SocketDispatcher object is created. Through SocketDispatcher allowed in different platform invoke different local method to read and write operations, and then based on this class, we can in the sun. Nio. Ch. SocketChannelImpl do Socket I/O operations.

//sun.nio.ch.SocketDispatcher
class SocketDispatcher extends NativeDispatcher
{

    static {
        IOUtil.load();
    }
    / / read operation
    int read(FileDescriptor fd, long address, int len) throws IOException {
        return read0(fd, address, len);
    }

    long readv(FileDescriptor fd, long address, int len) throws IOException {
        return readv0(fd, address, len);
    }
    / / write operations
    int write(FileDescriptor fd, long address, int len) throws IOException {
        return write0(fd, address, len);
    }

    long writev(FileDescriptor fd, long address, int len) throws IOException {
        return writev0(fd, address, len);
    }
    // Preclose the file descriptor
    void preClose(FileDescriptor fd) throws IOException {
        preClose0(fd);
    }
    // Close the file description
    void close(FileDescriptor fd) throws IOException {
        close0(fd);
    }

    //-- Native methods
    static native int read0(FileDescriptor fd, long address, int len)
        throws IOException;

    static native long readv0(FileDescriptor fd, long address, int len)
        throws IOException;

    static native int write0(FileDescriptor fd, long address, int len)
        throws IOException;

    static native long writev0(FileDescriptor fd, long address, int len)
        throws IOException;

    static native void preClose0(FileDescriptor fd) throws IOException;

    static native void close0(FileDescriptor fd) throws IOException;
}
Copy the code

FileDescriptor

We’ve seen a lot of file descriptors in the previous code, so we’re going to focus on it here. An instance of the FileDescriptor class acts as an opaque processing of the underlying machine-specific structure, indicating that a file is opened, a socket is opened, or another byte source or sink is opened. The main purpose of the file descriptor is to create a FileInputStream or FileOutputStream to contain it. Note: Applications should not create their own file descriptors. Let’s take a look at some of the source code:

public final class FileDescriptor {

    private int fd;

    private long handle;

    private Closeable parent;
    private List<Closeable> otherParents;
    private boolean closed;

    /** * true, if file is opened for appending. */
    private boolean append;

    static {
        initIDs();
    }
    /** * Clean up without explicitly closing the FileDescriptor. */
    private PhantomCleanable<FileDescriptor> cleanup;

    /** * Construct an invalid FileDescriptor object. Fd or handle will be set later */
    public FileDescriptor(a) {
        fd = -1;
        handle = -1;
    }

    /** * Used for standard input, output, And error only. * For Windows the corresponding handle is initialized. * For Unix the append mode is cached. Output and errors. * For Windows, initialize the corresponding handle. * For Unix, cache attach mode. *@param fd the raw fd number (0, 1, 2)
     */
    private FileDescriptor(int fd) {
        this.fd = fd;
        this.handle = getHandle(fd);
        this.append = getAppend(fd); }... }Copy the code

In, java.lang.System#out, java.lang.System#err:

public static final FileDescriptor in = new FileDescriptor(0);
public static final FileDescriptor out = new FileDescriptor(1);
public static final FileDescriptor err = new FileDescriptor(2);
Copy the code

To test whether the file descriptor is valid, use the following methods:

//java.io.FileDescriptor#valid
public boolean valid(a) {
        return(handle ! = -1) || (fd ! = -1);
    }
Copy the code

A return value of true indicates that socket operations or other active network connections represented by the file descriptor object are valid, whereas false indicates that socket operations or other active network connections are invalid. More content, the reader can go into the source code, but more explanation here. In order to give you a better understanding of the above, we will cover more in a later section.

Unscramble the SocketChannel in the NIO packet

In the previous section, we touched on SocketChannels. Here, let’s touch on the details.

We can also create a Socket channel by calling the open method of this class. Note here:

  • Cannot be arbitrary pre-existingsocketcreatechannel.
  • The newly createdsocket channelOpened but not yet connected.
  • Try in unconnectedchannelOn the callI/OThe action will result in a throwNotYetConnectedException.
  • You can do this by callingconnectMethods the connectionsocket channel;
  • Once connected,socket channelWill remain connected until it closes.
  • Is there a connection?socket channelYou can call it by determiningisConnectedMethods.

Socket channels support non-blocking connections:

  • You can createsocket channelAnd then you can go throughconnectMethod to set up to remotesocketThe connection.
  • By calling thefinishConnectMethod to end the connection.
  • You can determine whether a connection operation is in progress by callingisConnectionPendingMethod to determine.

Socket channels support asynchronous closure, similar to the asynchronous closure operation in the Channel class.

  • ifsocketThe input is closed by one thread and another thread is heresocket channelThe read operation in the blocked thread will not read any bytes and will return- 1
  • ifsocketThe output is closed by one thread while another thread is insocket channelIf the thread is blocked because it is writing, the blocked thread will receiveAsynchronousCloseException.

Next, let’s look at the concrete implementation method.

ServerSocketChannel and SocketChannel open()

//java.nio.channels.SocketChannel#open()
public static SocketChannel open(a) throws IOException {
    return SelectorProvider.provider().openSocketChannel();
}
//java.nio.channels.SocketChannel#open(java.net.SocketAddress)
// This method saves us from calling connect again
public static SocketChannel open(SocketAddress remote)
    throws IOException
{
    / / default is blocked, the discussed the AbstractSelectableChannel
    SocketChannel sc = open();
    try {
        sc.connect(remote);
    } catch (Throwable x) {
        try {
            sc.close();
        } catch (Throwable suppressed) {
            x.addSuppressed(suppressed);
        }
        throw x;
    }
    assert sc.isConnected();
    return sc;
}
//sun.nio.ch.SelectorProviderImpl#openSocketChannel
public SocketChannel openSocketChannel(a) throws IOException {
    return new SocketChannelImpl(this);
}
//sun.nio.ch.SocketChannelImpl#SocketChannelImpl(java.nio.channels.spi.SelectorProvider)
SocketChannelImpl(SelectorProvider sp) throws IOException {
    super(sp);
     // Call the socket function. True indicates TCP
    this.fd = Net.socket(true);
    this.fdVal = IOUtil.fdVal(fd);
}
//sun.nio.ch.Net#socket(boolean)
static FileDescriptor socket(boolean stream) throws IOException {
    return socket(UNSPEC, stream);
}
//sun.nio.ch.Net#socket(java.net.ProtocolFamily, boolean)
static FileDescriptor socket(ProtocolFamily family, boolean stream)
    throws IOException {
    booleanpreferIPv6 = isIPv6Available() && (family ! = StandardProtocolFamily.INET);return IOUtil.newFD(socket0(preferIPv6, stream, false, fastLoopback));
}
//sun.nio.ch.IOUtil#newFD
public static FileDescriptor newFD(int i) {
    FileDescriptor fd = new FileDescriptor();
    setfdVal(fd, i);
    return fd;
}
static native void setfdVal(FileDescriptor fd, int value);
Copy the code

Socket (true) : socket(true) : socket(true) : socket(true) : socket(true) : socket(true) : socket(true) : socket(true)

JNIEXPORT jint JNICALL Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6, jboolean stream, jboolean reuse, jboolean ignored) { int fd; // stream is a datagram. TCP is SOCK_STREAM,UDP is SOCK_DGRAM. int type = (stream ? SOCK_STREAM : SOCK_DGRAM); Int domain = (ipv6_available() &&preferipv6)? AF_INET6 : AF_INET; // Call Linux socket function,domain represents protocol; Fd = socket(domain, type, 0); fd = socket(domain, type, 0); If (fd < 0) {return handleSocketError(env, errno); } /* Disable IPV6_V6ONLY to ensure dual-socket support */ if (domain == AF_INET6) { int arg = 0; If (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&arg, if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&arg, sizeof(int)) < 0) { JNU_ThrowByNameWithLastError(env, JNU_JAVANETPKG "SocketException", "Unable to set IPV6_V6ONLY"); close(fd); return -1; }} //SO_REUSEADDR has four uses: //1. Use this option when a socket1 with the same local address and port is in TIME_WAIT state and socket2 of the program you started wants to use that address and port. SO_REUSEADDR allows multiple instances (processes) of the same server to be started on the same port. However, the IP address bound to each instance cannot be the same. SO_REUSEADDR Allows a single process to bind the same port to multiple sockets, but the IP address bound to each socket is different. //4.SO_REUSEADDR allows repeated binding of identical addresses and ports. But this only applies to UDP multicast, not TCP; if (reuse) { int arg = 1; if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, sizeof(arg)) < 0) { JNU_ThrowByNameWithLastError(env, JNU_JAVANETPKG "SocketException", "Unable to set SO_REUSEADDR"); close(fd); return -1; } } #if defined(__linux__) if (type == SOCK_DGRAM) { int arg = 0; int level = (domain == AF_INET6) ? IPPROTO_IPV6 : IPPROTO_IP; if ((setsockopt(fd, level, IP_MULTICAST_ALL, (char*)&arg, sizeof(arg)) < 0) && (errno ! = ENOPROTOOPT)) { JNU_ThrowByNameWithLastError(env, JNU_JAVANETPKG "SocketException", "Unable to set IP_MULTICAST_ALL");  close(fd); return -1; } //IPV6_MULTICAST_HOPS is used to control the range of multicast, / / 1 said only in local networks forwarding, / / more introduction please refer to (http://www.ctt.sbras.ru/cgi-bin/www/unix_help/unix-man?ip6+4); /* By default, Linux uses the route default */ if (domain == AF_INET6 && type == SOCK_DGRAM) { int arg = 1; if (setsockopt(fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &arg, sizeof(arg)) < 0) { JNU_ThrowByNameWithLastError(env, JNU_JAVANETPKG "SocketException", "Unable to set IPV6_MULTICAST_HOPS"); close(fd); return -1; } } #endif return fd; }Copy the code

After Linux 3.9, the SO_REUSEPORT configuration was added. This configuration is very powerful, so that multiple sockets (whether listening or not, whether TCP or UDP) can be bound to the same address and port as long as the SO_REUSEPORT property is set before binding. To prevent port hijacking, there is a special restriction: all sockets wishing to share source addresses and ports must have the same effective user ID. So that one user cannot “steal” a port from another user. In addition, the kernel uses a “special trick” not used on other systems to handle the SO_REUSEPORT socket:

  • For UDP sockets, the kernel tries to forward datagrams on average;
  • For TCP listening sockets, the kernel tries to evenly distribute new client connection requests (returned by ACCEPT) to sockets that share the same address and port (server listening sockets).

For example, multiple instances of a simple server application can use the SO_REUSEPORT socket to achieve a simple load balancing because the kernel already allocates requests.

As you can see in the previous code, after the socket is successfully created, the file descriptor is created by calling ioutil.newfd. In this case, I just want to know if this Socket can input, or can read, or can have errors, refer to the standard states at the end of the FileDescriptor section, and the same thing happens here, because when we write and read into the Socket, the standard states are one of three: input, output, and error. And the Socket is bound to a SocketChannel, so just bind the FileDescriptor to it, so we can get the state of it. Since FileDescriptor provides no external method for setting fd, setfdVal is implemented via local methods:

JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_setfdVal(JNIEnv *env, jclass clazz, jobject fdo, jint val)
{
    (*env)->SetIntField(env, fdo, fd_fdID, val);
}
Copy the code

For those of you who are familiar with shell programming or commands under Linux, we know that the shell redirects the error message using 2>, that is, the error message is written from channel 2, where 0 and 1 also point to the same channel. It also represents the state, so that we can operate on the state representing the Socket, that is, change the interest OPS of the SelectionKey, that is, the SelectionKey is classified according to the input and output types, and then we have the operation of the read and write state. We’ll make a stamp here and go into more detail in the next post.

Let’s go back to SocketChannel’s open method. We can see that SelectorProvider provider () openSocketChannel () returns the SocketChannelImpl object instances. SocketChannelImpl(SelectorProvider SP) does not operate on this.state, which defaults to 0, ST_UNCONNECTED, and the Socket is blocked by default. So, in general, when using asynchronous, it is common to use the open method with no arguments, and then call configureBlocking to set non-blocking.

SocketChannel connect read

As you can see from the above, we call the connect method to connect to the remote server. The source code is as follows:

//sun.nio.ch.SocketChannelImpl#connect
@Override
public boolean connect(SocketAddress sa) throws IOException {
    InetSocketAddress isa = Net.checkAddress(sa);
    SecurityManager sm = System.getSecurityManager();
    if(sm ! =null)
        sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());

    InetAddress ia = isa.getAddress();
    if (ia.isAnyLocalAddress())
        ia = InetAddress.getLocalHost();

    try {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                int n = 0;
                boolean blocking = isBlocking();
                try {
                    Thread Interruptible Blocker is supported by setting the Interruptible Blocker property of the current thread
                    beginConnect(blocking, isa);
                    do {
                    // Call the connect function implementation. If blocking mode is used, it will wait until success or an exception occurs
                        n = Net.connect(fd, ia, isa.getPort());
                    } while (n == IOStatus.INTERRUPTED && isOpen());
                } finally {
                    endConnect(blocking, (n > 0));
                }
                assert IOStatus.check(n);
                // The connection succeeded
                return n > 0;
            } finally{ writeLock.unlock(); }}finally{ readLock.unlock(); }}catch (IOException ioe) {
        // connect failed, close the channel
        close();
        throwSocketExceptions.of(ioe, isa); }}Copy the code

About beginConnect and endConnect, is aimed at AbstractInterruptibleChannel begin () and end in an enhanced method. What we need to know here is that if a Channel is not blocked, we don’t need to worry about the connection breaking. As the name suggests, only blocking wait is necessary to consider interrupting the occurrence of the scene. The rest of the details have been fully commented out in the code for readers to see for themselves.

//sun.nio.ch.SocketChannelImpl#beginConnect
private void beginConnect(boolean blocking, InetSocketAddress isa)
    throws IOException
{   // Begin is entered only if it is blocked
    if (blocking) {
        // set hook for Thread.interrupt
        Thread Interruptible Blocker is supported by setting the Interruptible Blocker property of the current thread
        begin();
    }
    synchronized (stateLock) {
        // The default is open unless the close method is called
        ensureOpen();
        // Check the connection status
        int state = this.state;
        if (state == ST_CONNECTED)
            throw new AlreadyConnectedException();
        if (state == ST_CONNECTIONPENDING)
            throw new ConnectionPendingException();
        // Asserts whether the current state is unconnected, if so, the assignment indicates that the connection is in progress
        assert state == ST_UNCONNECTED;
        // Indicates that the connection is in progress
        this.state = ST_CONNECTIONPENDING;
        // only if the local address is unbound, that is, if the bind method is not called,
        // This method is also seen in ServerSocketChannel
        if (localAddress == null)
            NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
        remoteAddress = isa;

        if (blocking) {
            // record thread so it can be signalled if neededreaderThread = NativeThread.current(); }}}Copy the code

In the connection process, we need to pay attention to several connection states :ST_UNCONNECTED, ST_CONNECTED, ST_CONNECTIONPENDING, ST_CLOSING, ST_KILLPENDING, ST_KILLED, Because it is a public state, it can be joined by multiple threads. Thus, state is defined as a volatile variable that requires stateLock to act as a synchronized lock when changing.

//sun.nio.ch.SocketChannelImpl#endConnect
private void endConnect(boolean blocking, boolean completed)
    throws IOException
{
    endRead(blocking, completed);
    // If n>0, the connection is successful and the status is ST_CONNECTED
    if (completed) {
        synchronized (stateLock) {
            if(state == ST_CONNECTIONPENDING) { localAddress = Net.localAddress(fd); state = ST_CONNECTED; }}}}//sun.nio.ch.SocketChannelImpl#endRead
private void endRead(boolean blocking, boolean completed)
    throws AsynchronousCloseException
{   // Enter only when blocked
    if (blocking) {
        synchronized (stateLock) {
            readerThread = 0;
            // notify any thread waiting in implCloseSelectableChannel
            if(state == ST_CLOSING) { stateLock.notifyAll(); }}/ / and the begin in pairs, when a thread is interrupted, throw ClosedByInterruptException
        // remove hook for Thread.interruptend(completed); }}Copy the code

Net.connect(fd, ia, ISa.getPort ()) :

//sun.nio.ch.Net#connect
static int connect(FileDescriptor fd, InetAddress remote, int remotePort)
    throws IOException
{
    return connect(UNSPEC, fd, remote, remotePort);
}
//sun.nio.ch.Net#connect
static int connect(ProtocolFamily family, FileDescriptor fd, InetAddress remote, int remotePort)
    throws IOException
{
    booleanpreferIPv6 = isIPv6Available() && (family ! = StandardProtocolFamily.INET);return connect0(preferIPv6, fd, remote, remotePort);
}
Copy the code

This method will eventually call native methods.

JNIEXPORT jint JNICALL
Java_sun_nio_ch_Net_connect0(JNIEnv *env, jclass clazz, jboolean preferIPv6, jobject fdo, jobject iao, jint port)
{
    SOCKETADDRESS sa;
    int sa_len = 0;
    int rv;
    Struct sockaddr format
    if(NET_InetAddressToSockaddr(env, iao, port, &sa, &sa_len, preferIPv6) ! =0) {
        return IOS_THROWN;
    }
    // Pass in fd and sockaddr to establish a connection with the remote server
   // If configureBlocking(false) is set, it will not block, otherwise it will block until timeout or an exception occurs
    rv = connect(fdval(env, fdo), &sa.sa, sa_len);
    //0 indicates that the connection is successful. If the connection fails, obtain the cause by errno
    if(rv ! =0) {
        // Not blocked, connection not established (-2)
        if (errno == EINPROGRESS) {
            return IOS_UNAVAILABLE;
        } else if (errno == EINTR) {
            / / the interrupt (3)
            return IOS_INTERRUPTED;
        }
        return handleSocketError(env, errno);
    }
    // It takes time to establish a TCP connection, so unless it is a local network,
    // In general, non-blocked mode returns IOS_UNAVAILABLE more often;
    return 1;
}
Copy the code

As can be seen from the above comment, if the connection is not blocked and the connection is not immediately established, the return is -2, that is, the connection is not successfully established. According to the previous source of beginConnect, the state is ST_CONNECTIONPENDING. Then, under the non-blocking condition, When does it become ST_CONNECTED? Is there a way to query the status or wait for the connection to complete? Let us to pay attention to the sun. Nio. Ch. SocketChannelImpl# finishConnect

SocketChannelImpl finishConnect interpretation

First of all, we look back, we are involved in front of the sun. The nio. Ch. ServerSocketAdaptor usage, convenient we only Socket programming habits people use, here, we can also see the core of basic implementation logic, SocketAdaptor = ServerSocketAdaptor = SocketAdaptor = ServerSocketAdaptor = ServerSocketAdaptor = ServerSocketAdaptor = ServerSocketAdaptor = ServerSocketAdaptor

//java.net.Socket#Socket
private Socket(SocketAddress address, SocketAddress localAddr,
                boolean stream) throws IOException {
    setImpl();

    // backward compatibility
    if (address == null)
        throw new NullPointerException();

    try {
        createImpl(stream);
        if(localAddr ! =null)
            bind(localAddr);
        connect(address);
    } catch (IOException | IllegalArgumentException | SecurityException e) {
        try {
            close();
        } catch (IOException ce) {
            e.addSuppressed(ce);
        }
        throwe; }}Copy the code

Here, we can call the Java. Nio. Channels. SocketChannel# open (), and then call the resulting SocketChannel object socket () method, you can get sun. Nio. Ch. SocketAdaptor object instance. Let’s look at SocketAdaptor’s Connect implementation:

//sun.nio.ch.SocketAdaptor#connect
public void connect(SocketAddress remote) throws IOException {
    connect(remote, 0);
}

public void connect(SocketAddress remote, int timeout) throws IOException {
    if (remote == null)
        throw new IllegalArgumentException("connect: The address can't be null");
    if (timeout < 0)
        throw new IllegalArgumentException("connect: timeout can't be negative");

    synchronized (sc.blockingLock()) {
        if(! sc.isBlocking())throw new IllegalBlockingModeException();

        try {
            // If the timeout is not set, it will wait until the connection or an exception occurs
            // no timeout
            if (timeout == 0) {
                sc.connect(remote);
                return;
            }
            // If timeout is set, the Socket is set to non-blocking
            // timed connect
            sc.configureBlocking(false);
            try {
                if (sc.connect(remote))
                    return;
            } finally {
                try {
                    sc.configureBlocking(true);
                } catch (ClosedChannelException e) { }
            }

            long timeoutNanos = NANOSECONDS.convert(timeout, MILLISECONDS);
            long to = timeout;
            for (;;) {
                // Make connections by counting the timeout, loop indefinitely within the allowed time range,
                // If time out, close the Socket
                long startTime = System.nanoTime();
                if (sc.pollConnected(to)) {
                    boolean connected = sc.finishConnect();
                    // See the explanation below
                    assert connected;
                    break;
                }
                timeoutNanos -= System.nanoTime() - startTime;
                if (timeoutNanos <= 0) {
                    try {
                        sc.close();
                    } catch (IOException x) { }
                    throw newSocketTimeoutException(); } to = MILLISECONDS.convert(timeoutNanos, NANOSECONDS); }}catch (Exception x) {
            Net.translateException(x, true); }}}Copy the code

One small note: In Java, the Assert keyword was introduced in Java SE 1.4. To avoid errors caused by using the assert keyword in older Versions of Java code, Java does not enable assert checking by default (all assertion statements are ignored at this point!). ) , you need to enableassertion checking with the -enableassertions or -EA switch. Through the above source code comments, I believe everyone already know the process of roughly, about the sun. The nio. Ch. SocketChannelImpl# finishConnect what did, here, let’s explore:

//sun.nio.ch.SocketChannelImpl#finishConnect
@Override
public boolean finishConnect(a) throws IOException {
    try {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                // no-op if already connected
                if (isConnected())
                    return true;

                boolean blocking = isBlocking();
                boolean connected = false;
                try {
                    beginFinishConnect(blocking);
                    int n = 0;
                    if (blocking) {
                        do {
                            // In blocking cases, the second argument is passed true
                            n = checkConnect(fd, true);
                        } while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen());
                    } else {
                        // In non-blocking cases, false is passed as the second argument
                        n = checkConnect(fd, false);
                    }
                    connected = (n > 0);
                } finally {
                    endFinishConnect(blocking, connected);
                }
                assert(blocking && connected) ^ ! blocking;return connected;
            } finally{ writeLock.unlock(); }}finally{ readLock.unlock(); }}catch (IOException ioe) {
        // connect failed, close the channel
        close();
        throwSocketExceptions.of(ioe, remoteAddress); }}//sun.nio.ch.SocketChannelImpl#checkConnect
private static native int checkConnect(FileDescriptor fd, boolean block)
    throws IOException;
Copy the code

About beginFinishConnect and endFinishConnect and our analysis before sun. Nio. Ch. SocketChannelImpl# beginConnect with sun. Nio. Ch. SocketChannelImpl# endC Onnect process is similar, do not understand readers can look back. CheckConnect (fd, true); checkConnect(fd, true);

JNIEXPORT jint JNICALL
Java_sun_nio_ch_SocketChannelImpl_checkConnect(JNIEnv *env, jobject this,
                                               jobject fdo, jboolean block)
{
    int error = 0;
    socklen_t n = sizeof(int);
    // Get the fd in FileDescriptor
    jint fd = fdval(env, fdo);
    int result = 0;
    struct pollfd poller;
    // File descriptor
    poller.fd = fd;
    The requested event is a write event
    poller.events = POLLOUT;
    // The event returned
    poller.revents = 0;
    
    //-1 indicates that the process is blocked. 0 indicates that the process is returned immediately without blocking
    result = poll(&poller, 1, block ? -1 : 0);
    // Less than 0 indicates invocation failure
    if (result < 0) {
        if (errno == EINTR) {
            return IOS_INTERRUPTED;
        } else {
            JNU_ThrowIOExceptionWithLastError(env, "poll failed");
            returnIOS_THROWN; }}// When not blocked, 0 indicates that no connection is ready
    if(! block && (result ==0))
        return IOS_UNAVAILABLE;
    // The number of sockets ready for write or error >0
    if (result > 0) {
        errno = 0;
        result = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &n);
        / / error
        if (result < 0) {
            return handleSocketError(env, errno);
        // An error occurred
        } else if (error) {
            return handleSocketError(env, error);
        } else if((poller.revents & POLLHUP) ! =0) {
            return handleSocketError(env, ENOTCONN);
        }
        // The socket is ready to write, that is, the connection has been established
        // connected
        return 1;
    }
    return 0;
}
Copy the code

Specific process as shown in the source code comments, which is blocking the sun before we source and local method. Nio. Ch. SocketChannelImpl# finishConnect corresponding behavior. In addition, see from the above source code, the bottom is to poll the socket state, so as to determine whether the connection is established successfully; Since in non-blocking mode the finishConnect method returns immediately, according to the processing here of sun.nio.ch.SocketAdaptor#connect, which uses a loop to determine whether a connection has been established, this is not recommended in our nio programming and is a semi-finished product. Instead, it is recommended to register Selector, get the SelectionKey of the connection through ops=OP_CONNECT, and then call finishConnect to set up the connection. Can finishConnect not be called? The answer is no, because only in finishConnect is the state updated to ST_CONNECTED, and the state is judged when both read and write calls are made.

So here, we’re kind of getting into the Selector and the SelectionKey that we’re going to cover, and we’re going to cover that in more detail in the next chapter.