Address: juejin.cn/post/684490…

Using Netty

Documentation: netty.io/wiki/user-g… I used version 4.1.xx. Within the same major version the changes are generally small. Below is a Netty server demo, essentially the same as the one on the official website.

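import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.util.concurrent.ThreadFactory;

public class Main {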

    // The following is a receiver thread with three worker threads
    // With Netty's default thread factory, this parameter can be omitted
    private final static ThreadFactory threadFactory = new DefaultThreadFactory("The Netty Way to Learn");
    // Boss thread pool for receiving client connections
    private final static NioEventLoopGroup boss = new NioEventLoopGroup(1,threadFactory);
    // Worker thread pool, used to handle client operations
    private final static NioEventLoopGroup worker = new NioEventLoopGroup(3,threadFactory);
    /*
     * In the constructor, if you don't pass the number of threads, the default is 0. After super() reaches
     * MultithreadEventLoopGroup, the thread count ends up as max(1, number of CPU cores * 2):
     *
     * private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1,
     *         SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
     *
     * protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
     *     super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
     * }
     */

    public static void main(String[] args) throws Exception{
        try {
            new NettyServer(8888).start();
            // NIOTest();
            System.out.println(1<<0);
        }catch(Exception e){
            System.out.println("Netty Server startup failed");
            e.printStackTrace();
        }
    }

    static class NettyServer{

        private int port;

        NettyServer(int port){
            this.port = port;
        }

        void start() throws Exception {
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                ChannelFuture future = serverBootstrap
                        .group(boss, worker)
                        .channel(NioServerSocketChannel.class)
                        // The size of the client connection waiting queue
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        // Receive buffer
                        .option(ChannelOption.SO_RCVBUF, 32*1024)
                        // Connection timed out
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10*1000)
                        .childHandler(new ChildChannelHandle())
                        .bind(this.port)
                        .sync();
                future.channel().closeFuture().sync();

            }catch(Exception e){
                throw e;
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }

    static class ChildChannelHandle extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            // String encoding
            pipeline.addLast(new StringEncoder());
            // String decode
            pipeline.addLast(new StringDecoder());
            // Custom handler that reacts to state changes
            pipeline.addLast(new StatusHandle());
            // Custom handler that processes the messages that were read
            pipeline.addLast(new CustomHandle());
        }
    }
}
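
The default thread count described in the demo's comment can be reproduced with the plain JDK. This is only a sketch: the class name DefaultThreadCount is hypothetical, and Integer.getInteger and Runtime.availableProcessors() are used as assumed stand-ins for Netty's SystemPropertyUtil.getInt and NettyRuntime.availableProcessors().

```java
class DefaultThreadCount {

    // Same formula as in the comment above: max(1, system property or cores * 2)
    static int defaultEventLoopThreads() {
        int cores = Runtime.getRuntime().availableProcessors();
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", cores * 2));
    }

    // Mirrors the "nThreads == 0" check in the MultithreadEventLoopGroup constructor
    static int resolve(int nThreads) {
        return nThreads == 0 ? defaultEventLoopThreads() : nThreads;
    }

    public static void main(String[] args) {
        System.out.println("requested 3 -> " + resolve(3));
        System.out.println("requested 0 -> " + resolve(0));
    }
}
```

In the demo both groups pass an explicit count (1 and 3), so the default is never used there.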

The client side is simple: just connect to the server from a terminal.

The server prints the active and inactive state changes, prints the data it receives, and returns that data to the client as is.
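
If no terminal client is at hand, a plain-socket client can play the same role. The sketch below is an assumption rather than part of the original demo: the EchoClient class and its sendLine method are hypothetical, the server is assumed to be listening on localhost:8888 as in the demo, and the text is assumed to be plain ASCII so that charset differences do not matter.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class EchoClient {

    // Sends one line and returns the first line the server sends back
    static String sendLine(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            out.write(line + "\n");
            out.flush();
            return in.readLine();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumes the demo server above is already running on port 8888
        System.out.println("echo: " + sendLine("localhost", 8888, "hello netty"));
    }
}
```

Closing the socket at the end of sendLine is what makes the server print the inactive message.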

Let’s look at the code

  • CustomHandle

Processes the data received from the client.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class CustomHandle extends ChannelInboundHandlerAdapter {

    // The thread that created this handler
    private Thread thread = Thread.currentThread();

    // Called when data from the client has been read
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Simply print it and return it to the client as is
        System.out.println(thread.getName()+": channelRead content : "+msg);
        ctx.writeAndFlush(msg);
    }
}

  • StatusHandle

A handler for state changes (client online and offline events).

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class StatusHandle extends ChannelInboundHandlerAdapter {
    private Thread thread = Thread.currentThread();
    private String ip;

    // Client online event
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ip = ctx.channel().remoteAddress().toString();
        System.out.println(thread.getName()+": ["+this.ip+"] channelActive -------");
    }

    // Client offline event
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(thread.getName()+": ["+this.ip+"] channelInactive -------");
    }
}

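Two places in the demo deserve a closer look, and they are what we cover next:

1. NioServerSocketChannel plays the same role as NIO's ServerSocketChannel.
2. ChildChannelHandle extends ChannelInitializer and implements the initChannel method. This leads to an important concept: the Pipeline, Netty's event propagation mechanism.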


NioServerSocketChannel

This is Netty's server-side channel class, used for accepting client connections and so on. Those of you who have used NIO know that after opening a ServerSocketChannel you need to register the ACCEPT event in order to listen for client connections.

Here are the Java NIO events (Netty is built on NIO, so it has the same kinds of events):

  • public static final int OP_READ = 1 << 0; // Read event

  • public static final int OP_WRITE = 1 << 2; // Write event

  • public static final int OP_CONNECT = 1 << 3; // Connection ready event

  • public static final int OP_ACCEPT = 1 << 4; // New connection event
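
Because each event is a single bit, several of them can be combined in one int and tested with a bit mask. A small sketch using the JDK's SelectionKey constants follows; the NioOps class and its describe method are hypothetical helpers written only for this illustration.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

class NioOps {

    // Lists the event names contained in an interest-ops bit set
    static String describe(int ops) {
        List<String> names = new ArrayList<>();
        if ((ops & SelectionKey.OP_READ) != 0) names.add("READ");
        if ((ops & SelectionKey.OP_WRITE) != 0) names.add("WRITE");
        if ((ops & SelectionKey.OP_CONNECT) != 0) names.add("CONNECT");
        if ((ops & SelectionKey.OP_ACCEPT) != 0) names.add("ACCEPT");
        return String.join("|", names);
    }

    public static void main(String[] args) {
        System.out.println(SelectionKey.OP_READ);    // 1
        System.out.println(SelectionKey.OP_WRITE);   // 4
        System.out.println(SelectionKey.OP_CONNECT); // 8
        System.out.println(SelectionKey.OP_ACCEPT);  // 16

        int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        System.out.println(ops + " = " + describe(ops)); // 5 = READ|WRITE
    }
}
```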

Take a look at the NioServerSocketChannel’s inheritance class diagram

Starting from channel(NioServerSocketChannel.class) in the demo above, you can see that a factory is used to create the channel.

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    } else {
        return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));
    }
}
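
The idea of a reflective factory can be sketched with the plain JDK: keep a reference to the class's no-argument constructor and invoke it each time a new instance is needed. SimpleReflectiveFactory below is a hypothetical stand-in written for illustration, not Netty's ReflectiveChannelFactory.

```java
import java.lang.reflect.Constructor;
import java.util.function.Supplier;

class SimpleReflectiveFactory<T> implements Supplier<T> {

    private final Constructor<? extends T> constructor;

    SimpleReflectiveFactory(Class<? extends T> clazz) {
        try {
            // Look the constructor up once, when the factory is created
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    clazz.getSimpleName() + " has no public no-arg constructor", e);
        }
    }

    // Creates a fresh instance on every call
    @Override
    public T get() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance", e);
        }
    }

    public static void main(String[] args) {
        Supplier<StringBuilder> factory = new SimpleReflectiveFactory<>(StringBuilder.class);
        System.out.println(factory.get() != factory.get()); // true: two distinct instances
    }
}
```

This is why the demo only has to pass a class to channel(...): the instance is created later, when bind() runs.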

The constructor that ends up being called when the factory creates the NioServerSocketChannel:

public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

Continue to follow the AbstractNioChannel constructor:

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        // Note: readInterestOp is recorded here
        this.readInterestOp = readInterestOp;
        try {
            // Set it to non-blocking
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

Go back to the ServerBootstrap call chain and look at the bind(port) method. Tracing it down leads to doBind:

private ChannelFuture doBind(final SocketAddress localAddress) {
    // Initialize and register
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    promise.setFailure(cause);
                } else {
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

Now look at the initAndRegister method:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    // The registration happens here; keep reading
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

Follow config().group().register(channel) further down to AbstractChannel's register -> register0(promise). Some of the calls in between are left out because there are too many of them.

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // Perform the registration
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        // Fire channelRegistered first
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                // Fire channelActive
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

From the doRegister() call, keep tracing down to AbstractNioChannel's doRegister() method:

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // Call Java NIO's register here: ops is 0 and the Netty channel itself is the attachment
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}

Those of you who have written NIO code should be familiar with the statement above:

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

This is a call to Java NIO's register method. Why is ops 0 at registration time? Keep tracing (a number of calls are omitted here) and you eventually reach AbstractNioChannel's doBeginRead() method, which modifies the interestOps of the selectionKey. Client channels register their read event in the same way.

protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    // Check whether the event is already registered; if not, modify the ops
    if ((interestOps & readInterestOp) == 0) {
        // this.readInterestOp was set in the constructor shown earlier;
        // ops was 0 at registration time, so the event is added here
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
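
The same two-step pattern, registering with ops 0 and switching the interest on later, can be tried with the plain JDK. This is a minimal sketch under stated assumptions: DeferredInterestDemo is a hypothetical class, and a String stands in for the Netty channel that Netty passes as the attachment.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class DeferredInterestDemo {

    // Returns {ops right after registration, ops after the interest was switched on}
    static int[] registerThenEnableAccept() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);

            // Step 1: register with no interest at all (what doRegister() does)
            Object attachment = "stand-in for the Netty channel";
            SelectionKey key = server.register(selector, 0, attachment);
            int before = key.interestOps();

            // Step 2: add the remembered interest later (what doBeginRead() does)
            int readInterestOp = SelectionKey.OP_ACCEPT;
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                key.interestOps(interestOps | readInterestOp);
            }
            return new int[] {before, key.interestOps()};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] ops = registerThenEnableAccept();
        System.out.println(ops[0] + " -> " + ops[1]); // 0 -> 16
    }
}
```

Until the second step runs, the selector never reports the channel as ready, which lets the registration finish before any events are delivered.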

That covers the ACCEPT event on the server. Next comes the client connection process.

Registering the server's ACCEPT event ends in a call to NIO's register method. Registering READ also calls NIO's register, but before a SocketChannel (the client) can be registered, the server has to call accept() to obtain the client connection. Sure enough, the accept call can be found in NioServerSocketChannel.

    // 1
    protected int doReadMessages(List<Object> buf) throws Exception {
        // Accept the client; javaChannel() is the ServerSocketChannel
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                // Wrap it in a new Netty channel whose readInterestOp is OP_READ (1).
                // As with the server channel, doBeginRead later updates the interest ops
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

    // 2
    public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                    // The NIO method
                    return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }
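
For comparison, the accept-then-register-for-read sequence looks like this in plain NIO. The sketch is only an illustration: AcceptDemo is a hypothetical class, and it opens its own local client connection just so that there is something to accept.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class AcceptDemo {

    // Accepts one connection and returns the interest ops of the accepted channel
    static int acceptOneAndRegisterRead() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            // A local client, present only to trigger the ACCEPT event
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                if (selector.select(5000) == 0) {
                    throw new IOException("no connection became acceptable in time");
                }
                try (SocketChannel child = server.accept()) {
                    if (child == null) {
                        throw new IOException("accept() returned no connection");
                    }
                    // The accepted channel is registered for READ, not ACCEPT
                    child.configureBlocking(false);
                    SelectionKey childKey = child.register(selector, SelectionKey.OP_READ);
                    return childKey.interestOps();
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(acceptOneAndRegisterRead()); // 1 (OP_READ)
    }
}
```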

When a client connects, the accept() call above is triggered, and AbstractChannel's register method is then invoked for the new client channel.

AbstractChannel.this.pipeline.fireChannelRegistered(); // This call leads to the following two methods

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}

private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();
    }
}
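
The executor.inEventLoop() check that appears in these methods follows a simple rule: if the caller is already on the handler's event loop thread, run the call inline; otherwise hand it over to that thread. A toy version using a JDK single-thread executor is sketched below. SingleThreadLoop and all of its methods are hypothetical names, not Netty classes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class SingleThreadLoop {

    private volatile Thread loopThread;

    // The thread factory remembers the one thread that runs all tasks
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-event-loop");
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run inline when already on the loop thread, otherwise submit the task
    void runOnLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    void shutdown() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    // Returns the name of the thread on which a nested task ends up running
    static String whereDoesItRun() throws InterruptedException {
        SingleThreadLoop loop = new SingleThreadLoop();
        AtomicReference<String> seen = new AtomicReference<>();
        // The outer call comes from the caller's thread and is handed over;
        // the nested call is already on the loop thread and runs inline
        loop.runOnLoop(() -> loop.runOnLoop(() -> seen.set(Thread.currentThread().getName())));
        loop.shutdown();
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(whereDoesItRun()); // toy-event-loop
    }
}
```

Either way the handler code runs on one thread only, so handlers bound to a single channel do not need their own locking.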

Some calls are omitted here (Netty's call chains are a bit complicated, I have to say). In the end, the handlerAdded method of ChannelInitializer is called. All of this is tied to the pipeline.

Pipeline

A pipeline is a chain of handlers through which events and messages are passed.

Have a look at the two methods in AbstractChannelHandlerContext first

    // Find the next inbound handler (search from the current position toward the tail)
    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next; // move toward the tail
        } while (!ctx.inbound);
        return ctx;
    }

    // Find the next outbound handler (search from the current position toward the head)
    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev; // move toward the head
        } while (!ctx.outbound);
        return ctx;
    }

Why is that? Let's check the read and write methods of AbstractChannelHandlerContext.

AbstractChannelHandlerContext

read

    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        // findContextInbound() is the first of the two methods above; it finds the next inbound handler
        // Then call the method below
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

    // On a read (inbound) event, search from the current position toward the tail
    // and trigger channelRead on the next inbound handler
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

write

// On a write (outbound) event, the current handler searches toward the head
// and triggers write on the previous outbound handler
private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = this.findContextOutbound();
    Object m = this.pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        Object task;
        if (flush) {
            task = AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next, m, promise);
        } else {
            task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise);
        }

        if (!safeExecute(executor, (Runnable) task, promise, m)) {
            ((AbstractChannelHandlerContext.AbstractWriteTask) task).cancel();
        }
    }
}

A point that is easy to overlook:

The write method above belongs to AbstractChannelHandlerContext, but AbstractChannel also has a write method. So next, let's compare calling ctx.write() with calling channel.write().

ctx.write vs channel.write

AbstractChannelHandlerContext#ctx.write()

    public ChannelFuture write(Object msg, ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        } else {
            try {
                if (this.isNotValidPromise(promise, true)) {
                    ReferenceCountUtil.release(msg);
                    return promise;
                }
            } catch (RuntimeException var4) {
                ReferenceCountUtil.release(msg);
                throw var4;
            }
            // Call the method below
            this.write(msg, false, promise);
            return promise;
        }
    }

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        // Find the next outbound handler
        AbstractChannelHandlerContext next = this.findContextOutbound();
        Object m = this.pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        // The rest is the same as shown above and is omitted
    }


AbstractChannel#channel.write()

    // Reached via ctx.channel().writeAndFlush()
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        // Call the pipeline method below
        return this.pipeline.writeAndFlush(msg, promise);
    }

    public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        // tail is the last handler in the pipeline
        return this.tail.writeAndFlush(msg, promise);
    }

Conclusion: ctx.write() starts at the current handler and searches toward the head for the next outbound handler to pass the event to, so any outbound handler positioned after the current handler is skipped. channel.write() starts from the last handler (tail) of the pipeline and searches toward the head, so it passes the event through all outbound handlers.


// String encoding
pipeline.addLast(new StringEncoder()); // 1.outbound
// String decode
pipeline.addLast(new StringDecoder()); // 2.inbound
// Custom handler that reacts to state changes
pipeline.addLast(new StatusHandle()); // 3.inbound
// Custom handler that processes the messages that were read
pipeline.addLast(new CustomHandle()); // 4.inbound

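The four handlers above were added in the order out, in, in, in (the number in front of each handler is only the order in which it was added to the pipeline). An inbound event such as a read therefore passes through 2, 3, and 4 in that order, while a write issued from handler 4 travels back toward the head and passes through 1.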
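
The traversal rules can be checked with a toy model of the doubly linked handler chain. Everything in the sketch below is hypothetical (PipelineSketch, Ctx, and the path methods are not Netty classes), and it assumes that every handler forwards the event to the next one, which the demo's CustomHandle does not do for reads.

```java
import java.util.ArrayList;
import java.util.List;

class PipelineSketch {

    // One node of the chain, a stand-in for AbstractChannelHandlerContext
    static final class Ctx {
        final String name;
        final boolean inbound;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    // Assumption of this toy: head ends the outbound path, tail ends the inbound path
    final Ctx head = new Ctx("head", false, true);
    final Ctx tail = new Ctx("tail", true, false);

    PipelineSketch() {
        head.next = tail;
        tail.prev = head;
    }

    // Inserts a node just before tail
    Ctx addLast(String name, boolean inbound, boolean outbound) {
        Ctx ctx = new Ctx(name, inbound, outbound);
        Ctx last = tail.prev;
        last.next = ctx;
        ctx.prev = last;
        ctx.next = tail;
        tail.prev = ctx;
        return ctx;
    }

    // Inbound events move toward the tail and visit inbound nodes only
    List<String> inboundPathFrom(Ctx from) {
        List<String> path = new ArrayList<>();
        for (Ctx c = from.next; c != null; c = c.next) {
            if (c.inbound) path.add(c.name);
        }
        return path;
    }

    // Outbound events move toward the head and visit outbound nodes only
    List<String> outboundPathFrom(Ctx from) {
        List<String> path = new ArrayList<>();
        for (Ctx c = from.prev; c != null; c = c.prev) {
            if (c.outbound) path.add(c.name);
        }
        return path;
    }

    public static void main(String[] args) {
        PipelineSketch p = new PipelineSketch();
        p.addLast("1.StringEncoder", false, true);
        p.addLast("2.StringDecoder", true, false);
        p.addLast("3.StatusHandle", true, false);
        Ctx custom = p.addLast("4.CustomHandle", true, false);

        System.out.println("read:          " + p.inboundPathFrom(p.head));
        System.out.println("ctx.write:     " + p.outboundPathFrom(custom));
        System.out.println("channel.write: " + p.outboundPathFrom(p.tail));
    }
}
```

With the demo's four handlers both write paths are the same, because the only outbound handler sits in front of CustomHandle. If an outbound handler were added after CustomHandle, only the path that starts at tail would include it.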

ChannelInitializer

The ChildChannelHandle class above extends this class. Its initChannel method is called once a channel is registered and initializes the client channel's pipeline. When the method returns, this instance (the ChannelInitializer) is removed from the ChannelPipeline of the client channel. The original Javadoc:

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    /**
     * This method will be called once the {@link Channel} was registered. After the method returns this instance
     * will be removed from the {@link ChannelPipeline} of the {@link Channel}.
     *
     * @param ch            the {@link Channel} which was registered.
     * @throws Exception    is thrown if an error occurs. In that case it will be handled by
     *                      {@link #exceptionCaught(ChannelHandlerContext, Throwable)} which will by default close
     *                      the {@link Channel}.
     */
    protected abstract void initChannel(C ch) throws Exception;

    // ... (rest of the class omitted)
}

initChannel

Besides this abstract method, the class also has an overload that takes a ChannelHandlerContext. The actual initialization logic lives in that overload.

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
        try {
            // Step 2:
            // call our own abstract method, which adds the handlers we defined earlier to the client pipeline
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            // Step 3: remove this ChannelInitializer from the pipeline.
            // The ChannelInitializer is itself a handler, but it only exists to initialize
            // the handlers in the client pipeline, so the client pipeline does not keep it
            remove(ctx);
        }
        return true;
    }
    return false;
}

remove

// Remove this initializer from the client pipeline
private void remove(ChannelHandlerContext ctx) {
    try {
        ChannelPipeline pipeline = ctx.pipeline();
        if (pipeline.context(this) != null) {
            pipeline.remove(this);
        }
    } finally {
        initMap.remove(ctx);
    }
}
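
The add-handlers-then-remove-yourself behavior, together with the re-entrance guard, can be modeled in a few lines. The sketch below is a toy under stated assumptions: InitializerSketch and ToyPipeline are hypothetical, the pipeline is just a list of handler names, and a concurrent set stands in for initMap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class InitializerSketch {

    // Toy pipeline: an ordered list of handler names
    static final class ToyPipeline {
        final List<String> handlers = new ArrayList<>();
    }

    static final String SELF = "initializer";

    // Pipelines that are being initialized right now (stand-in for initMap)
    private final Set<ToyPipeline> initializing = ConcurrentHashMap.newKeySet();

    boolean init(ToyPipeline pipeline, Consumer<ToyPipeline> userInit) {
        if (initializing.add(pipeline)) { // guard against re-entrance
            try {
                userInit.accept(pipeline); // the user adds the real handlers
            } finally {
                pipeline.handlers.remove(SELF); // the initializer removes itself
                initializing.remove(pipeline);
            }
            return true;
        }
        return false;
    }

    // Runs one initialization and returns the handlers left in the pipeline
    static List<String> demo() {
        ToyPipeline pipeline = new ToyPipeline();
        pipeline.handlers.add(SELF);
        InitializerSketch initializer = new InitializerSketch();
        initializer.init(pipeline, p -> {
            p.handlers.add("StringEncoder");
            p.handlers.add("StringDecoder");
            p.handlers.add("StatusHandle");
            p.handlers.add("CustomHandle");
            // A nested call while initialization is in progress is rejected by the guard
            initializer.init(p, ignored -> p.handlers.add("never added"));
        });
        return pipeline.handlers;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [StringEncoder, StringDecoder, StatusHandle, CustomHandle]
    }
}
```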

This article contains a fair amount of code. If you only need to get a demo running, you don't have to spend time on it. If you want a deeper understanding of Netty, the event flow described here is a good starting point for reading the source code bit by bit.

Finally

That's all for this article. Thank you very much for reading this far!! Your reading is an affirmation of the author!! If you found the article helpful, please leave a like before you go (yes, I admit I'm fishing for likes (◒. ◒)). Every like matters a lot to the author (truly) and is a big encouragement to keep writing!!