Offer to come, dig friends take it! I am participating in the 2022 Spring Recruit Punch card activity. Click here for details

1. The introduction

You can create an EventLoopGroup by creating an executor that can be implemented using the Jdk. This is a thread pool. There is a constructor like this:

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}
Copy the code
  • NThreads: Indicates the number of EventLoopGroup threads
  • Executor: Indicates a user-defined executor

Executor We use the thread pool implemented by the JDK as the executor.

Question: What happens when the value of nThreads is greater than the maximum number of threads in the thread pool?

Here’s a code look at what happens with this problem and how to fix it.

2. Case code

Server code

public class DiscardServer {
    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run(a) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); / / (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup(3, Executors.newFixedThreadPool(2.new ThreadFactory() {
            private AtomicInteger threadNumber = new AtomicInteger();
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "Thread-mxsm-"+threadNumber.incrementAndGet()); }}));try {
            ServerBootstrap b = new ServerBootstrap(); / / (2)
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class) / / (3)
                .childHandler(new ChannelInitializer<SocketChannel>() { / / (4)
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new TimeServerInHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)          / / (5)
                .childOption(ChannelOption.SO_KEEPALIVE, true); / / (6)
            ChannelFuture f = b.bind(port).sync(); / / (7)
            f.channel().closeFuture().sync();
        } finally{ workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); }}public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        newDiscardServer(port).run(); }}@Sharable
public class TimeServerInHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(this.getClass().getSimpleName()+"--channelRead");
        final ByteBuf time = ctx.alloc().buffer(4); / / (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        ctx.writeAndFlush(time); / / (3)}}Copy the code

Client code:

public class TimeClient {
    public static void main(String[] args) throws Exception {

        for(int i = 0; i < 100; ++i){
            new Thread(new Runnable(){
                @Override
                public void run(a) {
                    try {
                        new TimeClient().test();
                    } catch(Exception e) { e.printStackTrace(); } } }).start(); }}public  void test(a) throws Exception{
        String host = "127.0.0.1";
        int port = Integer.parseInt("8080");
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); / / (1)
            b.group(workerGroup); / / (2)
            b.channel(NioSocketChannel.class); / / (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); / / (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // ch.pipeline().addLast(new TimeClientOutHandler());
                    ch.pipeline().addLast(newTimeClientInHandler()); }});// Start the client.
            ChannelFuture f = b.connect(host, port).sync(); / / (5)
            ByteBuf byteBuf = Unpooled.buffer();
            // Wait until the connection is closed.
            byteBuf.writeBytes("1111".getBytes(StandardCharsets.UTF_8));
            f.channel().writeAndFlush(byteBuf);

            f.channel().closeFuture().sync();
        } finally{ workerGroup.shutdownGracefully(); }}}public class TimeClientInHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(this.getClass().getSimpleName()+"--channelRead");
        ByteBuf m = (ByteBuf) msg; / / (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            //ctx.close();
        } finally{ m.release(); }}}Copy the code

Run the server code and the client code separately, then run the command:

jps
Copy the code

Then find the PID on the server side and run the command

jstack -l <pid>
Copy the code

Running results:

3. Result analysis

You can see from the above thread that only two threads are created in the workerGroup, meaning that the number of threads in this case is determined by the thread pool. Let’s analyze why this is the case from the source code:

Create NioEventLoopGroup constructor calls the MultithreadEventExecutorGroup constructor.

If the Executor position 1 is empty, use Netty’s custom:

Netty’s Executor, as shown above, creates a thread each time it executes a submitted task and then binds the thread to an EventLoop. If the user passes in the Executor is not empty then the incoming Executor is used.

The difference between an Executor using a custom and Netty custom implementation is shown below:

Summary: When using a custom Executor, it is best to keep the number of threads the same as the number of nThreads in the NioEventLoopGroup constructor. If the number is different, it will reduce the throughput of the workGroup

The number of Threads in the NioEventLoopGroup constructor refers to the number of Eventloops, but there is no strict requirement in Netty that one Thread must correspond to one EventLoop

4. To summarize

  • Netty Create NioEventLoopGroup You are advised to use Netty’s default Executor implementation. This avoids the above problems
  • When working with Channel data, you can use the JDK thread pool as a business thread pool for business processing
  • Use Netty directly, or set it to the same.

I am ant back elephant, the article is helpful to you like to pay attention to me, the article has incorrect place please give correct comments ~ thank you