Netty source code interpretation

Let’s start with the Netty version

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.6.Final</version>
    </dependency>

Bind startup source

Our server starts by binding a port with the code below, so we begin with the bind() method:

ChannelFuture f = b.bind(port).sync();

Keep following the calls down to the io.netty.bootstrap.AbstractBootstrap#doBind method:

private ChannelFuture doBind(final SocketAddress localAddress) {
        // 1 Initialization and registration logic
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            // 2 Port binding logic
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.registered();
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
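
To make the two branches of doBind easier to see, here is a minimal sketch of the same "bind right away if registration is already done, otherwise bind from a listener" pattern, written with the JDK only. It is not Netty code: BindAfterRegisterSketch, bindAfter and the log list are hypothetical names made up for this illustration, and CompletableFuture merely stands in for ChannelFuture.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class BindAfterRegisterSketch {

    /**
     * Hypothetical helper: "binds" only after the registration future has completed.
     * The log records which branch performed the bind.
     */
    public static CompletableFuture<String> bindAfter(CompletableFuture<Void> registration,
                                                      List<String> log) {
        CompletableFuture<String> bindResult = new CompletableFuture<>();
        if (registration.isDone() && !registration.isCompletedExceptionally()) {
            // Registration already succeeded: bind immediately.
            log.add("bind immediately");
            bindResult.complete("bound");
        } else {
            // Registration is still pending (or failed): decide inside a callback.
            registration.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    log.add("registration failed");
                    bindResult.completeExceptionally(cause);
                } else {
                    log.add("bind from listener");
                    bindResult.complete("bound");
                }
            });
        }
        return bindResult;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();

        // Case 1: registration has already finished.
        bindAfter(CompletableFuture.completedFuture(null), log);

        // Case 2: registration finishes later, so the bind runs in the callback.
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> later = bindAfter(pending, log);
        pending.complete(null);

        System.out.println(log + " -> " + later.join());
    }
}
```

In both cases the caller gets a future back straight away, which is why b.bind(port) can be followed by sync() to wait for the result.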

1 Initialization and registration logic

Step into the initAndRegister() method and read the comments below. Does it feel strangely familiar, much like the plain Java NIO code?

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // (1) Create the NioServerSocketChannel
            channel = channelFactory.newChannel();
            // Initialize the Channel
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                channel.unsafe().closeForcibly();
            }
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // (2) Register with the selector
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

(1) Create NioServerSocketChannel

Look back at this line from the discard server code:

.channel(NioServerSocketChannel.class)

This is where the NioServerSocketChannel is instantiated; see io.netty.channel.ReflectiveChannelFactory#newChannel:

@Override
    public T newChannel() {
        try {
            // clazz here is NioServerSocketChannel.class. How it gets passed in is easy to trace yourself, so it is not covered here.
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
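
As a rough illustration of what a reflective factory does, the sketch below builds objects from a Class token using only the JDK. ReflectiveFactorySketch, Factory, ReflectiveFactory and create are hypothetical names for this example, not Netty's API. It also assumes getDeclaredConstructor().newInstance() in place of the Class#newInstance call shown above, because the latter is deprecated in newer JDKs.

```java
import java.lang.reflect.Constructor;

public class ReflectiveFactorySketch {

    /** Hypothetical stand-in for a channel factory; not Netty's interface. */
    interface Factory<T> {
        T newInstance();
    }

    /** Creates instances of the given class through its no-argument constructor. */
    static final class ReflectiveFactory<T> implements Factory<T> {
        private final Class<? extends T> clazz;

        ReflectiveFactory(Class<? extends T> clazz) {
            if (clazz == null) {
                throw new NullPointerException("clazz");
            }
            this.clazz = clazz;
        }

        @Override
        public T newInstance() {
            try {
                Constructor<? extends T> ctor = clazz.getDeclaredConstructor();
                return ctor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to create instance of " + clazz, e);
            }
        }
    }

    /** Convenience wrapper: build a factory for the type and create one instance. */
    public static <T> T create(Class<T> type) {
        return new ReflectiveFactory<T>(type).newInstance();
    }

    public static void main(String[] args) {
        // The class token plays the role of NioServerSocketChannel.class.
        StringBuilder sb = create(StringBuilder.class);
        sb.append("created reflectively");
        System.out.println(sb);
    }
}
```

The factory only ever sees a Class object, which is why passing a different class to .channel(...) is enough to change the kind of channel that gets created.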

The channel is instantiated by reflection, so let’s go straight to the NioServerSocketChannel constructor and see what happens there; see io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel():

    /**
     * Create a new instance
     */
    public NioServerSocketChannel() {
        // Two things happen here:
        // (1) newSocket(DEFAULT_SELECTOR_PROVIDER) creates the Java NIO ServerSocketChannel;
        // (2) this(...) goes on to call the constructor below.
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    // The constructor called at (2)
    public NioServerSocketChannel(ServerSocketChannel channel) {
        // Pass SelectionKey.OP_ACCEPT up to the parent constructor
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

See io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel:

/**
     * Create a new instance
     *
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
     * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
     */
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        // Keep following the call into the parent constructor
        super(parent);
        // The Java NIO ServerSocketChannel is stored in this member variable
        this.ch = ch;
        // SelectionKey.OP_ACCEPT is stored in this member variable
        this.readInterestOp = readInterestOp;
        try {
            // Set to non-blocking mode; feel familiar again?
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

See io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel):

    /**
     * Creates a new instance.
     *
     * @param parent
     *        the parent of this channel. {@code null} if there's no parent.
     */
    protected AbstractChannel(Channel parent) {
        // Store the parent channel (null here, because NioServerSocketChannel passed null to super)
        this.parent = parent;
        // Give this Channel an ID
        id = newId();
        // Instantiate the Unsafe
        unsafe = newUnsafe();
        // Each Channel is associated with one Pipeline
        pipeline = newChannelPipeline();
    }

Let’s review what creating a NioServerSocketChannel involves:

  • (1) Create the underlying Java NIO ServerSocketChannel and keep it as a member variable;
  • (2) Give the channel an ID;
  • (3) Create a Pipeline and keep it as a member variable;
  • (4) Set the channel to non-blocking mode.

(2) Register selector

Here config().group() returns the bossGroup, not the workerGroup, so the whole expression config().group().register(channel) can be read as registering our Channel on an EventLoop. For now, let’s ignore how Netty chooses which EventLoop of the bossGroup to register on. See io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise):

@Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // unsafe() returns the Unsafe that was created in the NioServerSocketChannel constructor chain. Note that the current EventLoop and, through the promise, the NioServerSocketChannel are both passed in
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

Continue following; see io.netty.channel.AbstractChannel.AbstractUnsafe#register:

 @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ...
            AbstractChannel.this.eventLoop = eventLoop;

            // We are on the main thread at this point, so we take the else branch
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    // This is where the eventLoop is started, which we'll cover in the next chapter
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            // So the registration runs on the current eventLoop
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    ...
                }
            }
        }
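
The inEventLoop() branch above can be imitated with a single-threaded executor from the JDK. The sketch below is only an approximation under stated assumptions: MiniLoop, runOnLoop and threadThatRanRegistration are hypothetical names, and MiniLoop is a toy stand-in for an EventLoop, not Netty's implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class InEventLoopSketch {

    /** Hypothetical single-threaded loop standing in for a Netty EventLoop (not its real API). */
    static final class MiniLoop {
        private volatile Thread loopThread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(task -> {
            Thread t = new Thread(task, "mini-loop");
            t.setDaemon(true);
            loopThread = t; // remember which thread belongs to this loop
            return t;
        });

        /** True only when the caller is already running on the loop's own thread. */
        boolean inEventLoop() {
            return Thread.currentThread() == loopThread;
        }

        /** Runs the task directly on the loop thread, or hands it over to the loop otherwise. */
        void runOnLoop(Runnable task) {
            if (inEventLoop()) {
                task.run();
            } else {
                executor.execute(task); // the first execute also starts the loop thread
            }
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    /** Submits a "registration" from the calling thread and reports which thread ran it. */
    public static String threadThatRanRegistration() {
        MiniLoop loop = new MiniLoop();
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        loop.runOnLoop(() -> {
            ranOn.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the loop", e);
        } finally {
            loop.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("registration ran on: " + threadThatRanRegistration());
    }
}
```

Because the caller is the main thread, the task is handed to the loop rather than run in place, which matches the else branch taken during bind.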

Continue into register0; see io.netty.channel.AbstractChannel.AbstractUnsafe#register0:

private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // Continue to follow this method
                doRegister();
                ...
}

Follow doRegister(); see io.netty.channel.nio.AbstractNioChannel#doRegister:

@Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // Think back to "From Java NIO to Netty (2)"
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...
            }
        }
    }

In this code, javaChannel() returns the Java NIO ServerSocketChannel, and register is the Java NIO register method: it registers this ServerSocketChannel with the selector of the current eventLoop. Some of you may be asking: shouldn’t the ServerSocketChannel register the OP_ACCEPT event with the selector? Yet the interest set passed here is 0. Don’t worry, read on.

2 Port binding logic

Once the NioServerSocketChannel has been created and registered, doBind0 is called with that NioServerSocketChannel passed in; see io.netty.bootstrap.AbstractBootstrap#doBind0:

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    // Bind on the eventLoop of the NioServerSocketChannel
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

Continue following; see io.netty.channel.AbstractChannel.AbstractUnsafe#bind:

@Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            ...
            boolean wasActive = isActive();
            try {
                // (1) Port binding
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        // (2) Propagate the ChannelActive event on the pipeline; this is how OP_ACCEPT ends up registered with the selector
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }

(1) Port binding

Continue; see io.netty.channel.socket.nio.NioServerSocketChannel#doBind:

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        // Does this feel familiar again? It calls the Java NIO bind to bind the port. See "From Java NIO to Netty (1)".
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

(2) Propagate the ChannelActive event on the pipeline

Continue; see io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive:

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Propagate the event to the next node in the pipeline's linked list
        ctx.fireChannelActive();
        // This is the call that matters
        readIfIsAutoRead();
    }
    private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                // Step into this call
                channel.read();
            }
    }

Keep following until you reach io.netty.channel.nio.AbstractNioChannel#doBeginRead (if you are not sure how to get there, step through it yourself in debug mode):

    @Override
    protected void doBeginRead() throws Exception {
        // The official comment makes it clear that these two methods eventually end up here
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            // Remember that creating the NioServerSocketChannel stored SelectionKey.OP_ACCEPT in readInterestOp? Here that event is added to this selectionKey's interest set
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

That completes the bind source code. Does the overall flow look familiar now? It does the same work as the plain Java NIO code below, taken from "From Java NIO to Netty (4)", except that the selector lives on the eventLoop. In the next chapter we will look at the source code of the two EventLoopGroups.

        // Open the ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // Listen on port 9999
        serverSocketChannel.socket().bind(new InetSocketAddress(9999));
        // Set to non-blocking mode
        serverSocketChannel.configureBlocking(false);
        // Create the Selector
        Selector selector = Selector.open();
        // Register the serverSocketChannel with the Selector and declare during registration that it is interested in accept events
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
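
One detail separates the snippet above from the Netty flow we traced: Netty first registers the channel with an interest set of 0 and only adds OP_ACCEPT after the bind, in doBeginRead. A minimal sketch of that order in plain Java NIO might look like this. DeferredAcceptSketch and registerThenEnableAccept are hypothetical names, and binding to the loopback address on port 0 (an OS-chosen port) is an assumption made so the example does not depend on a fixed port being free.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class DeferredAcceptSketch {

    /** Registers with no interest ops, binds, then adds OP_ACCEPT; returns the final interest set. */
    public static int registerThenEnableAccept() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);

            // Step 1: register with an empty interest set, as doRegister does with 0.
            SelectionKey key = server.register(selector, 0);

            // Step 2: bind the port, as doBind does.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // Step 3: add OP_ACCEPT if it is not set yet, as doBeginRead does.
            int interestOps = key.interestOps();
            if ((interestOps & SelectionKey.OP_ACCEPT) == 0) {
                key.interestOps(interestOps | SelectionKey.OP_ACCEPT);
            }
            return key.interestOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int ops = registerThenEnableAccept();
        System.out.println("OP_ACCEPT enabled: " + ((ops & SelectionKey.OP_ACCEPT) != 0));
    }
}
```

Splitting registration from enabling OP_ACCEPT means the selector cannot report accept events before the channel is actually bound and active.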