public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .localAddress(9000)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                        pipeline.addLast(newSomeSocketClientHandler()); }});// The Clinet client starts the process, ready to trace the CONNECT method
        ChannelFuture future = bootstrap.connect("localhost".8888).sync();
        future.channel().closeFuture().sync();
    } finally {
        if(eventLoopGroup ! =null) { eventLoopGroup.shutdownGracefully(); }}}Copy the code

Bootstrap.java

/**
 * Connects to the given host and port by wrapping them in an {@link InetSocketAddress}
 * and delegating to the address-based {@code connect} overload.
 *
 * @param inetHost remote host
 * @param inetPort remote port
 * @return the future returned by the delegated {@code connect} call
 */
public ChannelFuture connect(InetAddress inetHost, int inetPort) {
    final InetSocketAddress remoteAddress = new InetSocketAddress(inetHost, inetPort);
    return connect(remoteAddress);
}

/**
 * Validates the bootstrap and starts connecting to {@code remoteAddress}, using the local
 * address held in the bootstrap configuration.
 *
 * @param remoteAddress address to connect to; must not be {@code null}
 * @return the future returned by {@code doResolveAndConnect}
 * @throws NullPointerException if {@code remoteAddress} is {@code null}
 */
public ChannelFuture connect(SocketAddress remoteAddress) {
    // Reject a missing target address before doing anything else.
    if (null == remoteAddress) {
        throw new NullPointerException("remoteAddress");
    }

    validate();
    final SocketAddress localAddress = config.localAddress();
    return doResolveAndConnect(remoteAddress, localAddress);
}

    public Bootstrap validate(a) {
        super.validate();
        // There is no core processor, so the work cannot start at all
        if (config.handler() == null) {
            throw new IllegalStateException("handler not set");
        }
        return this;
    }


    /** The required data is verified and the connection service is processed@see #connect()
     */
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        // Channel creation, initialization, and registration
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            if(! regFuture.isSuccess()) {return regFuture;
            }
            // Resolve the server address and connect
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                    // failure.
                    Throwable cause = future.cause();
                    if(cause ! =null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586promise.registered(); doResolveAndConnect0(channel, remoteAddress, localAddress, promise); }}});returnpromise; }}Copy the code

AbstractBootstrap.java

// Initialize and register
final ChannelFuture initAndRegister(a) {
    Channel channel = null;
    try {
        / / create parentChannel
        // Create a Channel with no arguments using reflection newInstance
        channel = channelFactory.newChannel();
        // After the object is created, initialize the channel
        init(channel);
    } catch (Throwable t) {
        if(channel ! =null) { // If the condition is true, the channel was created successfully, but there was a problem during initialization
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            // Close the channel forcibly
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // There is a problem with creating a channel
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // Register parentChannel(select eventLoop from group, bind to channel, create and start the thread)
    ChannelFuture regFuture = config().group().register(channel);
    if(regFuture.cause() ! =null) {
        if (channel.isRegistered()) {
            channel.close();
        } else{ channel.unsafe().closeForcibly(); }}// If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    // i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    // added to the event loop's task queue for later execution.
    // i.e. It's safe to attempt bind() or connect() now:
    // because bind() or connect() will be executed *after* the scheduled registration task is executed
    // because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}
Copy the code

The traced init() method is abstract and is implemented separately on the client side and the server side, so that each end can handle its own start-up logic. Since we are following client start-up here, we trace the init() in Bootstrap.java.

Bootstrap.java

@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    // Get pipeline
    ChannelPipeline p = channel.pipeline();
    // Add the ChannelInitializer handler created in Bootstrap to the pipeline
    p.addLast(config.handler());

    // Initialize the channel using the options in Bootstrap
    finalMap<ChannelOption<? >, Object> options = options0();synchronized (options) {
      // Set option to the current channel
        setChannelOptions(channel, options, logger);
    }

    // Use attrs in Bootstrap to initialize a channel
    finalMap<AttributeKey<? >, Object> attrs = attrs0();synchronized (attrs) {
        for(Entry<AttributeKey<? >, Object> e: attrs.entrySet()) { channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); }}}Copy the code

AbstractBootstrap.java

static void setChannelOptions(Channel channel, Map
       
        , Object> options, InternalLogger logger)
       > {
    / / traverse the options
    for(Map.Entry<ChannelOption<? >, Object> e: options.entrySet()) {// Initialize the currently traversed option to channelsetChannelOption(channel, e.getKey(), e.getValue(), logger); }}@SuppressWarnings("unchecked")
private static void setChannelOption(Channel channel, ChannelOption
        option, Object value, InternalLogger logger) {
        try {
            // Write option to channel config
            if(! channel.config().setOption((ChannelOption<Object>) option, value)) { logger.warn("Unknown channel option '{}' for channel '{}'", option, channel); }}catch (Throwable t) {
            logger.warn(
                    "Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t); }}Copy the code

That’s it for client initialization and registration. Now go back to Bootstrap.java and continue reading the **doResolveAndConnect()** method.

/** The required data is verified and the connection service is processed@see #connect()
 */
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    // Channel creation, initialization, and registration
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (regFuture.isDone()) {
        if(! regFuture.isSuccess()) {return regFuture;
        }
        // Resolve the server address and connect
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                // failure.
                Throwable cause = future.cause();
                if(cause ! =null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586promise.registered(); doResolveAndConnect0(channel, remoteAddress, localAddress, promise); }}});returnpromise; }}// Resolve the server address and connect
    private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                               final SocketAddress localAddress, final ChannelPromise promise) {
        try {
            // Get the eventLoop bound to the channel
            final EventLoop eventLoop = channel.eventLoop();
            // Create an address resolver
            final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

            // What if the current address resolver does not support the address format, or the address has already been resolved? Hard even
            // If the connection succeeds, it succeeds; if the connection fails, it fails. Success or failure, the results are written into promises
            if(! resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {// Resolver has no idea about what to do with the specified remote address or it's resolved already.
                doConnect(remoteAddress, localAddress, promise);
                return promise;
            }

            // Handle the case where the address is not resolved and the address format is supported by the parser
            // Resolve the address asynchronously
            final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

            // Handle the case where parsing is complete
            if (resolveFuture.isDone()) {
                final Throwable resolveFailureCause = resolveFuture.cause();
                // If an exception occurs during parsing, close the channel, otherwise connect to the resolved address
                if(resolveFailureCause ! =null) {
                    // Failed to resolve immediately
                    channel.close();
                    promise.setFailure(resolveFailureCause);
                } else {
                    // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                    // getNow() gets the parse result from the asynchronous result
                    doConnect(resolveFuture.getNow(), localAddress, promise);
                }
                return promise;
            }

            // Wait until the name resolution is finished.
            // If parsing is not complete, add listeners to it
            resolveFuture.addListener(new FutureListener<SocketAddress>() {
                / / callback
                // If an exception occurs during parsing, close the channel, otherwise connect to the resolved address
                @Override
                public void operationComplete(Future<SocketAddress> future) throws Exception {
                    if(future.cause() ! =null) {
                        channel.close();
                        promise.setFailure(future.cause());
                    } else{ doConnect(future.getNow(), localAddress, promise); }}}); }catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }


private static void doConnect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

        // The exeuct from the eventLoop of the current Channel connects to the server address asynchronously
        final Channel channel = connectPromise.channel();
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run(a) {
              // If the local address is not configured, the remote connection is made
                if (localAddress == null) {
                    channel.connect(remoteAddress, connectPromise);
                } else {
                  // Prepare to tracechannel.connect(remoteAddress, localAddress, connectPromise); } connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); }}); }Copy the code

Following up on the connection code: connect() in AbstractChannel.java, which delegates to connect() in DefaultChannelPipeline.java.

    /**
     * Forwards the connect request to this channel's pipeline.
     *
     * @param remoteAddress address to connect to
     * @param localAddress  local address to bind to
     * @param promise       promise passed along with the request
     * @return whatever the pipeline's {@code connect} returns
     */
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        final ChannelFuture connectFuture = pipeline.connect(remoteAddress, localAddress, promise);
        return connectFuture;
    }


    /**
     * Forwards the connect request to {@code tail}.
     *
     * @param remoteAddress address to connect to
     * @param localAddress  local address to bind to
     * @param promise       promise passed along with the request
     * @return whatever {@code tail.connect} returns
     */
    @Override
    public final ChannelFuture connect(
            SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        // Connection processing is handed to the pipeline's tail.
        final ChannelFuture connectFuture = tail.connect(remoteAddress, localAddress, promise);
        return connectFuture;
    }
Copy the code

AbstractChannelHandlerContext.java

@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
      // Prepare to trace
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run(a) {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

    private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
              // Prepare to trace
                ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
            } catch(Throwable t) { notifyOutboundHandlerException(t, promise); }}else{ connect(remoteAddress, localAddress, promise); }}Copy the code

DefaultChannelPipeline.java

/**
 * Handler-level connect: hands the request to {@code unsafe}.
 *
 * @param ctx           handler context (not used in this method)
 * @param remoteAddress address to connect to
 * @param localAddress  local address to bind to
 * @param promise       promise passed along with the request
 */
@Override
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress,
        SocketAddress localAddress,
        ChannelPromise promise) {
    unsafe.connect(remoteAddress, localAddress, promise);
}
Copy the code

AbstractNioChannel.java

@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if(! promise.setUncancellable() || ! ensureOpen(promise)) {return;
    }

    try {
        if(connectPromise ! =null) {
            // Already a connect in process.
            throw new ConnectionPendingException();
        }

        boolean wasActive = isActive();
        // Prepare to trace
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } else {
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run(a) {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        if(connectPromise ! =null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if(connectTimeoutFuture ! =null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null; close(voidPromise()); }}}); }}catch(Throwable t) { promise.tryFailure(annotateConnectException(t, remoteAddress)); closeIfClosed(); }}Copy the code

NioSocketChannel.java

@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    // Bind the localAddress specified in bootstrap
    if(localAddress ! =null) {
        doBind0(localAddress);
    }

    boolean success = false;
    try {
        // Connect directly to the specified server address, which may or may not succeed at the first time
        // If it fails, a connection-ready event for the channel occurs, setting the stage for the next selector selection
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if(! connected) {// If the connection is not successful, it specifies the event it cares about as connection-ready
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if(! success) { doClose(); }}}private void doBind0(SocketAddress localAddress) throws Exception {
        // If the JDK version is greater than or equal to 7, use this method to connect
        if (PlatformDependent.javaVersion() >= 7) {
            SocketUtils.bind(javaChannel(), localAddress);
        } else{ SocketUtils.bind(javaChannel().socket(), localAddress); }}//end!!!
Copy the code