This is the 16th day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021

Multithreaded architecture

All of the server programs shown so far run on a single thread, which both accepts connection requests from clients and handles read and write events. That is not efficient enough. Today's computers have multi-core processors, so several threads can run at the same time, and a server should take advantage of this.

The server can create several threads and divide them into two groups:

  • One thread is dedicated to handling accept events; it is called the Boss thread
  • A number of threads equal to the number of CPU cores, each with its own Selector, take turns handling read events; they are called Worker threads

The relationship between them can be understood by the following picture:

The Boss thread is responsible only for accept events, while the Worker threads handle reading and writing between the client and the server. Each thread maintains its own Selector that listens for events on its channels. When the Boss thread detects a connection request from a client, it registers the SocketChannel returned by accept with one of the Worker threads. When a read or write event later occurs on that channel, the Worker thread that owns it detects the event and processes it on its own thread. This design separates the responsibilities across threads.
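
As a small illustration of the sizing rule above (one Worker per CPU core), the sketch below asks the JVM for the core count and derives one name per Worker. The class and method names (WorkerSizing, workerCount, workerNames) are hypothetical and only serve this example; they are not part of the server code.

```java
class WorkerSizing {

    // One worker per core reported by the JVM (the value is never smaller than 1).
    static int workerCount() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Builds the names "worker-0" ... "worker-(count-1)" (hypothetical naming scheme).
    static String[] workerNames(int count) {
        String[] names = new String[count];
        for (int i = 0; i < count; i++) {
            names[i] = "worker-" + i;
        }
        return names;
    }

    public static void main(String[] args) {
        // Prints one name per available core; the result depends on the machine.
        System.out.println(String.join(", ", workerNames(workerCount())));
    }
}
```

Inside a container the reported core count may be lower than the number of physical cores, so a fixed number can also be a reasonable choice.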

Implementation of the Worker class

The analysis above shows that a Worker is a separate thread that listens for and processes read and write events. The Worker class therefore needs a Thread object to start the thread and a Selector to listen for events on the channels it manages. In addition, each Worker is given a name. All of these can be initialized in the constructor.

We then need a register method that receives a SocketChannel object and registers it with the Worker's own Selector.

Here is the complete Worker class code:

static class Worker implements Runnable {

    private final String name;
    private final Thread thread;
    private final Selector selector;
    // Registration tasks handed over by the boss thread
    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

    // Initializes the selector and the thread
    public Worker(String name) throws IOException {
        this.name = name;
        // Open the selector before starting the thread so that run() never sees a null selector
        selector = Selector.open();
        thread = new Thread(this, name);
        thread.start();
    }

    // Called on the boss thread: hands the channel over to this worker
    public void register(SocketChannel sc) throws IOException {
        // Add the registration to the queue so that it runs on the worker thread
        queue.add(() -> {
            try {
                sc.register(this.selector, SelectionKey.OP_READ, null);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
        });
        // Wake the worker up in case it is blocked in select()
        selector.wakeup();
    }

    @Override
    public void run() {
        while (true) {
            try {
                // The worker blocks here at first, so wakeup() is required before a channel can be registered
                selector.select();
                // Run every pending sc.register(this.selector, SelectionKey.OP_READ, null)
                Runnable task;
                while ((task = queue.poll()) != null) {
                    task.run();
                }
                Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isReadable()) {
                        ByteBuffer buffer = ByteBuffer.allocate(16);
                        SocketChannel channel = (SocketChannel) key.channel();
                        int read = channel.read(buffer);
                        if (read == -1) {
                            // The client closed the connection: stop listening on this channel
                            key.cancel();
                            channel.close();
                        } else {
                            buffer.flip();
                            ByteBufferUtil.debugAll(buffer); // buffer visualization
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

One more point in this code deserves attention: the call sc.register(this.selector, SelectionKey.OP_READ, null) must not get stuck behind a selector.select() that is blocking, otherwise the SocketChannel cannot be registered. The idea here is to use a message queue. A thread that wants to register a channel adds the registration to the queue as a task and then calls selector.wakeup(); the worker returns from select() and performs the registration itself. This ensures that select() is not blocking while the channel is being registered.
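
The queue-plus-wakeup idea can be tried in isolation. The following is a minimal sketch, assuming a selector with no channels registered at all: one thread blocks in select(), and another thread hands it a task through a queue and calls wakeup(). The names WakeupDemo and runQueuedTask are hypothetical and exist only for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class WakeupDemo {

    // Returns true if a task queued by the calling thread was run by the selector thread.
    static boolean runQueuedTask() {
        try (Selector selector = Selector.open()) {
            ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
            AtomicBoolean running = new AtomicBoolean(true);
            CountDownLatch ran = new CountDownLatch(1);

            Thread loop = new Thread(() -> {
                try {
                    while (running.get()) {
                        selector.select(); // blocks until an event arrives or wakeup() is called
                        Runnable task;
                        while ((task = queue.poll()) != null) {
                            task.run();    // runs on the selector thread
                        }
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, "selector-loop");
            loop.start();

            // Hand a task to the selector thread, then wake it up so the task is picked up.
            queue.add(ran::countDown);
            selector.wakeup();
            boolean ok = ran.await(2, TimeUnit.SECONDS);

            // Stop the loop before the selector is closed.
            running.set(false);
            selector.wakeup();
            loop.join(2000);
            return ok;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran on selector thread: " + runQueuedTask());
    }
}
```

If wakeup() is called while no select() is in progress, the next select() returns immediately, so the task is not lost even when it is queued before the loop thread reaches select().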

Multiple Workers

The Worker class was explained above. In the actual server code, several Workers are needed to manage the channels, so a Worker array can be used, with each channel registered with one Worker from the array. In addition, a counter can be used to assign Workers to channels in turn. It is implemented like this:

  1. Create an array of Workers and initialize each element. A workerindex counter holds the array index of the Worker that will be assigned to the next channel. Because this is a multi-threaded environment, an AtomicInteger is used instead of a plain int.
Worker[] workers = new Worker[5];
for (int i = 0; i < workers.length; i++) {
    workers[i] = new Worker("worker-" + i);
}
AtomicInteger workerindex = new AtomicInteger();

  2. Register the channel with a Worker. The counter is taken modulo the array length so that the Workers are reused in rotation, which balances the load.

workers[workerindex.getAndIncrement()% workers.length].register(sc);
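
One detail of the modulo step: getAndIncrement() eventually overflows to a negative value, and in Java the % operator then yields a negative index. The sketch below is one possible variant, not the code used in this article, that keeps the index in range with Math.floorMod. The RoundRobin class and its start parameter are hypothetical names introduced for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobin {

    private final AtomicInteger next;
    private final int size;

    // size: number of workers; start: initial counter value (only useful for testing overflow)
    RoundRobin(int size, int start) {
        this.size = size;
        this.next = new AtomicInteger(start);
    }

    // Always returns an index in [0, size), even after the counter has overflowed.
    int nextIndex() {
        return Math.floorMod(next.getAndIncrement(), size);
    }

    public static void main(String[] args) {
        RoundRobin rr = new RoundRobin(3, 0);
        for (int i = 0; i < 4; i++) {
            System.out.print(rr.nextIndex() + " "); // prints 0 1 2 0
        }
        System.out.println();
    }
}
```

The rotation may skip or repeat a position at the moment the counter overflows, but the index never leaves the bounds of the array.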

Server code

Here is all the code for the server:

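import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.extern.slf4j.Slf4j;

// ByteBufferUtil is the buffer-printing helper used by this code; import it from wherever it lives in your project
@Slf4j
public class ThreadNIOServer {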
    public static void main(String[] args) {
        try(ServerSocketChannel server = ServerSocketChannel.open()) {
            Thread.currentThread().setName("BOSS");
            server.bind(new InetSocketAddress(8080));
            server.configureBlocking(false);

            Selector boss = Selector.open();
            server.register(boss, SelectionKey.OP_ACCEPT);

            // 1. Create a fixed number of workers
            Worker[] workers = new Worker[5];
            for (int i = 0; i < workers.length; i++) {
                workers[i] = new Worker("worker-" + i);
            }
            AtomicInteger workerindex = new AtomicInteger();


            while(true)
            {
                boss.select();

                Set<SelectionKey> selectionKeys = boss.selectedKeys();
                Iterator<SelectionKey> iter = selectionKeys.iterator();
                while (iter.hasNext())
                {
                    SelectionKey key = iter.next();
                    iter.remove();

                    if(key.isAcceptable())
                    {
                        SocketChannel sc = server.accept();
                        sc.configureBlocking(false);
                        log.debug("connected... {}",sc.getRemoteAddress());
                        //2. Associate socket channel with worker
                        log.debug("before connect...");
                        workers[workerindex.getAndIncrement()% workers.length].register(sc);

                        log.debug("after connect...");
                    }
                    else if(key.isReadable())
                    {

                    }
                }
            }
        } catch (Exception e) {
            // Do not swallow the error silently
            log.error("server error", e);
        }
    }


    static class Worker implements Runnable {

        private final String name;
        private final Thread thread;
        private final Selector selector;
        // Registration tasks handed over by the boss thread
        private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

        // Initializes the selector and the thread
        public Worker(String name) throws IOException {
            this.name = name;
            // Open the selector before starting the thread so that run() never sees a null selector
            selector = Selector.open();
            thread = new Thread(this, name);
            thread.start();
        }

        // Called on the boss thread: hands the channel over to this worker
        public void register(SocketChannel sc) throws IOException {
            // Add the registration to the queue so that it runs on the worker thread
            queue.add(() -> {
                try {
                    sc.register(this.selector, SelectionKey.OP_READ, null);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });
            // Wake the worker up in case it is blocked in select()
            selector.wakeup();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    // The worker blocks here at first, so wakeup() is required before a channel can be registered
                    selector.select();
                    // Run every pending sc.register(this.selector, SelectionKey.OP_READ, null)
                    Runnable task;
                    while ((task = queue.poll()) != null) {
                        task.run();
                    }
                    Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isReadable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            SocketChannel channel = (SocketChannel) key.channel();
                            int read = channel.read(buffer);
                            if (read == -1) {
                                // The client closed the connection: stop listening on this channel
                                key.cancel();
                                channel.close();
                            } else {
                                buffer.flip();
                                ByteBufferUtil.debugAll(buffer); // buffer visualization
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
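
To try the server out, a simple blocking client is enough. The following is a minimal sketch; the TestClient class and its send method are hypothetical names introduced here, not part of the original code. It connects to the given address, writes one UTF-8 message, and closes the connection.

```java
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

class TestClient {

    // Connects to the server, writes the whole message, and returns the number of bytes written.
    static int send(SocketAddress server, String message) throws IOException {
        // SocketChannel.open(address) performs a blocking connect
        try (SocketChannel sc = SocketChannel.open(server)) {
            ByteBuffer buf = StandardCharsets.UTF_8.encode(message);
            int written = 0;
            while (buf.hasRemaining()) {
                written += sc.write(buf);
            }
            return written;
        }
    }
}
```

With the server above running, calling TestClient.send(new InetSocketAddress("localhost", 8080), "hello") should cause one of the workers to print the received buffer. Because each worker reads into a 16-byte buffer, a longer message is delivered across several read events.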