Preface

When it comes to NIO and Netty, the Reactor model is impossible to avoid. It is a classic architecture and the cornerstone of Netty, so let's take a look at it.

This article gives a first, intuitive introduction to the Reactor model.

The traditional service design model

This is the most traditional Socket service design: multiple clients connect to the server, and the server starts one thread for each client.

In most scenarios, processing a network request involves the following steps:

  1. read: reads data from the socket.
  2. decode: data travels over the network as bytes, so it must be decoded to recover the actual request.
  3. compute: the business processing, which can be whatever the application needs.
  4. encode: the reverse of decode. A socket only carries bytes, so the result must be encoded before it is sent back.
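As a rough illustration of this thread-per-connection design, the sketch below starts one blocking thread per accepted socket and runs the steps above on it. The class name `ThreadPerConnectionServer`, the line-based protocol, and the `compute` logic are assumptions made for the example, not code from this article.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the traditional design: one blocking thread per client.
class ThreadPerConnectionServer {

    // Accept loop: every accepted socket gets its own thread.
    static void serve(int port) throws IOException {
        try (ServerSocket server = new ServerSocket(port)) {
            while (!Thread.interrupted()) {
                Socket client = server.accept(); // blocks until a client connects
                new Thread(() -> {
                    try (Socket s = client) {
                        handle(s.getInputStream(), s.getOutputStream());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }

    // Handles one request; every step runs on the connection's own thread.
    static void handle(InputStream in, OutputStream out) {
        try {
            BufferedReader reader =
                    new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
            String request = reader.readLine();           // read + decode (blocking)
            if (request == null) {
                return;                                   // client closed without sending
            }
            String response = compute(request);           // compute
            out.write((response + "\n").getBytes(StandardCharsets.UTF_8)); // encode, then write out
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Placeholder business logic (an assumption for this sketch).
    static String compute(String request) {
        return "echo: " + request;
    }
}
```

Because `handle` blocks in `readLine`, a thread stays occupied for as long as its client is connected, even when the client is idle. That is the cost this design pays per connection.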

For more on the pitfalls of this model, read the previous article: An In-depth Analysis of Java IO (ii) BIO

NIO distribution model

NIO solves the problems of the traditional Socket design well:

  1. One thread can monitor many sockets, so a single blocked connection no longer holds up all the others;
  2. It is event-driven: when an event occurs, the system notifies the application, which then handles it.
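To make the two points concrete, here is a minimal sketch in which a single thread watches two channels through one `Selector`. It uses in-process `java.nio.channels.Pipe` channels instead of sockets so that it runs without a network. The class name `OneThreadManyChannels` and the messages are assumptions made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: one thread, one Selector, several channels.
class OneThreadManyChannels {

    static Set<String> demo() {
        Set<String> received = new TreeSet<>();
        try (Selector selector = Selector.open()) {
            Pipe[] pipes = { Pipe.open(), Pipe.open() };
            for (int i = 0; i < pipes.length; i++) {
                pipes[i].source().configureBlocking(false);                 // selectors need non-blocking channels
                pipes[i].source().register(selector, SelectionKey.OP_READ); // ask to be told about read-ready
                byte[] msg = ("hello from " + i).getBytes(StandardCharsets.UTF_8);
                pipes[i].sink().write(ByteBuffer.wrap(msg));                // makes the source readable
            }
            // Bounded loop so the sketch cannot hang if something goes wrong.
            for (int round = 0; round < 10 && received.size() < pipes.length; round++) {
                selector.select(1000); // wait until the system reports ready channels
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    ByteBuffer buf = ByteBuffer.allocate(64);
                    int n = ((Pipe.SourceChannel) key.channel()).read(buf);
                    if (n > 0) {
                        received.add(new String(buf.array(), 0, n, StandardCharsets.UTF_8));
                    }
                }
            }
            for (Pipe p : pipes) {
                p.sink().close();
                p.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return received;
    }
}
```

The calling thread never blocks on an individual channel. It only blocks in `select`, and then reads from whichever channels were reported as ready.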

I won’t go into too much detail here, but see my previous article: An In-depth Analysis of Java IO (3) NIO

Reactor model

The Reactor model, also known as the reactor pattern, has the following characteristics:

  • It responds to I/O events by dispatching them to the appropriate handlers.
  • Each handler performs non-blocking operations.
  • Handlers are managed by binding them to events.

The Reactor model combines the advantages of the dispatch model and the event-driven model, which makes it especially suitable for handling massive numbers of I/O events under high concurrency.

1. How Reactor processes requests

Request processing in Reactor is divided into read operations and write operations.

For a read operation, the flow is as follows:

  • The application registers a read-ready event together with its event handler.
  • The event dispatcher waits for events to occur.
  • When a read-ready event occurs, the event dispatcher calls the event handler registered in the first step.

A write operation is similar, except that the first step registers a write-ready event.
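The three steps can be sketched with a toy in-memory dispatcher. This is a simplification for illustration only: the events come from a queue rather than from the operating system, and `ToyDispatcher`, `EventType`, and `publish` are hypothetical names.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of register -> wait -> call handler, without real I/O.
class ToyDispatcher {

    enum EventType { READ_READY, WRITE_READY }

    private static final class Event {
        final EventType type;
        final String payload;

        Event(EventType type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    private final Map<EventType, Consumer<String>> handlers = new EnumMap<>(EventType.class);
    private final BlockingQueue<Event> ready = new LinkedBlockingQueue<>();

    // Step 1: the application registers an event type together with its handler.
    void register(EventType type, Consumer<String> handler) {
        handlers.put(type, handler);
    }

    // Stand-in for the operating system reporting that an event has occurred.
    void publish(EventType type, String payload) {
        ready.add(new Event(type, payload));
    }

    // Steps 2 and 3: wait for the next event, then call the registered handler.
    void dispatchOne() {
        try {
            Event event = ready.take(); // blocks until an event is available
            Consumer<String> handler = handlers.get(event.type);
            if (handler != null) {
                handler.accept(event.payload);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }
}
```

In real NIO code the `Selector` plays the role of the queue, and the handler is usually stored as the attachment of the `SelectionKey`.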

2. The three Reactor roles

The Reactor model defines three roles.

  • Reactor: listens for events and dispatches each I/O event to the corresponding Handler. Events include connection ready, read ready, write ready, and so on.
  • Acceptor: handles new client connections and dispatches requests to the handler chain.
  • Handler: binds itself to an event and performs non-blocking read/write tasks. It reads from the channel, runs the business logic, and then writes the result back to the channel. Handlers can be managed with a resource pool.

Depending on the application scenario, the Reactor model comes in three variants: the single-Reactor single-thread model, the single-Reactor multi-thread model, and the master-slave Reactor multi-thread model.

Single Reactor single thread model

In the single-Reactor single-thread design, the Reactor thread multiplexes the sockets, the Acceptor receives new connections, and requests are dispatched to Handlers, all on that one thread.

1. Message processing flow

The message processing flow of the single-Reactor single-thread model is as follows:

  • The Reactor object monitors events through select and forwards them through dispatch.
  • If the event is a connection establishment event, the Acceptor accepts the connection and creates a Handler to handle subsequent events.
  • If it is not a connection establishment event, the Reactor dispatches it to the Handler bound to that connection.
  • The Handler completes the whole read, decode, compute, encode, send sequence.
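The Handler's sequence can be sketched as a chain of plain methods. In this sketch the read and send steps are stand-ins: the caller fills the buffer (as `channel.read` would) and receives the encoded reply (which `channel.write` would send). The names `SingleThreadHandlerSketch` and `compute` are assumptions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: every step runs on the calling (Reactor) thread.
class SingleThreadHandlerSketch {

    static String decode(ByteBuffer in) {
        return StandardCharsets.UTF_8.decode(in).toString();
    }

    // Placeholder business logic. If this is slow, the whole Reactor stalls.
    static String compute(String request) {
        return "echo: " + request;
    }

    static ByteBuffer encode(String response) {
        return StandardCharsets.UTF_8.encode(response);
    }

    // readBuffer holds the bytes that a channel read has just put into it.
    static ByteBuffer handle(ByteBuffer readBuffer) {
        readBuffer.flip();                     // switch the buffer from filling to draining
        String request = decode(readBuffer);   // decode
        String response = compute(request);    // compute
        return encode(response);               // encode; the caller would write this to the channel
    }
}
```

Nothing here is asynchronous: until `handle` returns, the Reactor cannot select or dispatch any other event.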

2. Advantages and disadvantages

The single-Reactor single-thread model only separates the components in code. Everything still runs on a single thread, so it cannot make full use of the hardware, and the Handler's business processing is not asynchronous.

The model is adequate for some small-capacity applications, but it is not suitable for high-load, high-concurrency scenarios. The main reasons are as follows:

  • A single thread cannot keep up with the read, decode, compute, encode, and send work for a massive number of messages, even when the Reactor thread's CPU load reaches 100%.
  • Once the Reactor thread is overloaded, processing slows down and many client connections time out. Clients typically resend after a timeout, which loads the Reactor thread even more. Eventually messages pile up and time out in large numbers, and the thread becomes the performance bottleneck of the system.
  • If the Reactor thread dies unexpectedly or enters an infinite loop, the communication module of the whole system becomes unavailable and can no longer receive or process external messages, so the node fails.

The single-Reactor multi-thread model emerged to solve these problems.

Single Reactor multi-threaded model

The single-Reactor multi-thread design uses multiple threads (a thread pool) for the event-handling part.

1. Message processing flow

The message processing flow of the single-Reactor multi-thread model is as follows:

  • The Reactor object uses select to monitor client events and dispatches them as they arrive.
  • If the event is a connection establishment request, the Acceptor accepts the connection and creates a Handler object to handle the events that follow.
  • If it is not a connection establishment event, the Reactor dispatches it to the Handler bound to that connection.
  • The Handler only responds to events and does no business processing itself. After reading the data, it hands the data to the Worker thread pool.
  • The Worker thread pool assigns a thread to do the actual business processing and passes the result back to the Handler.
  • The Handler receives the result and sends it to the client.

Compared with the first model, the business logic is handled by the thread pool, and the Handler only sends the response back to the client. This takes load off the Reactor thread so that it can concentrate on event dispatch, which improves the throughput of the whole application.
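A minimal sketch of the hand-off to the Worker pool, assuming a `CompletableFuture` is used to carry the result back. The class name `WorkerPoolHandlerSketch`, the pool size of 4, and the `compute` logic are assumptions for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: the Handler returns at once, the pool does the work.
class WorkerPoolHandlerSketch {

    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    // Called on the Reactor thread after the data has been read.
    // It returns immediately; compute() runs on a worker thread.
    CompletableFuture<String> onRead(String request) {
        return CompletableFuture.supplyAsync(() -> compute(request), workers);
    }

    // Placeholder business logic.
    static String compute(String request) {
        return "processed " + request;
    }

    void shutdown() {
        workers.shutdown();
    }
}
```

A caller could attach the send step with `onRead(request).thenAccept(response -> ...)`. In a real server that callback would queue the response and switch the key's interest to write, rather than writing from the worker thread directly.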

2. Advantages and disadvantages

The single-Reactor multi-thread model has the following problems.

  • Sharing and accessing data across threads is complex. When a worker thread finishes the business processing and hands the result to the main (Reactor) thread for sending, the shared data needs mutual exclusion and protection.
  • The Reactor monitors and responds to all events on the main thread alone, which can become a performance bottleneck. Examples are millions of concurrent client connections, or a server that must authenticate each client during the handshake when the authentication itself is expensive.
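One common way to limit the data-sharing problem from the first point is to let worker threads publish results through a thread-safe queue that only the Reactor thread drains. The sketch below assumes that approach. `PendingWrites` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: workers enqueue responses, the Reactor thread drains them.
class PendingWrites {

    private final Queue<String> pending = new ConcurrentLinkedQueue<>();

    // Called from any worker thread. In a real server this would be
    // followed by selector.wakeup() so that the Reactor notices the new work.
    void offerFromWorker(String response) {
        pending.add(response);
    }

    // Called only from the Reactor thread, which then performs the actual writes.
    List<String> drainOnReactor() {
        List<String> drained = new ArrayList<>();
        String response;
        while ((response = pending.poll()) != null) {
            drained.add(response);
        }
        return drained;
    }
}
```

With this split, the buffers and the channel are touched by one thread only, and the queue is the single shared structure.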

The third variant, the master-slave Reactor multi-thread model, was developed to solve these performance problems.

Master-slave Reactor multithreaded model

Compared with the single-Reactor multi-thread model, the master-slave Reactor multi-thread model splits the Reactor into two parts.

  • The MainReactor (master Reactor) monitors the server socket, handles connection establishment events, and registers each established SocketChannel with a SubReactor.
  • The SubReactor (slave Reactor) handles data exchange and event processing on the established connections. The number of SubReactors is usually equal to the number of CPU cores.

Nginx, Swoole, Memcached, and Netty have all adopted this implementation.
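A minimal sketch of how a MainReactor might spread new connections over its SubReactors: size the group by CPU cores and pick the next index in round-robin order. `RoundRobinChooser` is a hypothetical helper, not an API of any of the projects named above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: round-robin choice among a fixed number of SubReactors.
class RoundRobinChooser {

    private final int size;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinChooser(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    // One SubReactor per CPU core, as suggested in the text.
    static RoundRobinChooser forCpuCores() {
        return new RoundRobinChooser(Runtime.getRuntime().availableProcessors());
    }

    // Index of the SubReactor that should receive the next connection.
    // floorMod keeps the result non-negative even after the counter overflows.
    int nextIndex() {
        return Math.floorMod(next.getAndIncrement(), size);
    }
}
```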

The message processing flow of the master-slave Reactor multi-thread model is as follows:

  • One Reactor thread is picked at random from the main thread pool to act as the Acceptor thread. It binds the listening port and receives client connections.
  • After receiving a client connection request, the Acceptor thread creates a new SocketChannel and registers it with one of the other Reactor threads in the main thread pool, which is responsible for access authentication, IP blacklist and whitelist filtering, and the handshake.
  • Once these steps are complete, the business-layer link is established. The SocketChannel is removed from the multiplexer of the main-pool Reactor thread and re-registered with a thread in the sub thread pool, and a Handler is created to handle the connection's events.
  • When a new event occurs, the SubReactor calls the Handler bound to the connection.
  • After reading the data, the Handler hands it to the Worker thread pool for business processing.
  • The Worker thread pool assigns a thread to do the actual business processing and passes the result back to the Handler.
  • The Handler receives the result and sends it to the client.

Example of master-slave Reactor multithreaded model

1. Reactor

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Reactor implements Runnable {

	private final Selector selector;
	private final ServerSocketChannel serverSocketChannel;

	public Reactor(int port) throws IOException {
		selector = Selector.open(); // Open a Selector
		serverSocketChannel = ServerSocketChannel.open(); // Create the server channel
		serverSocketChannel.socket().bind(new InetSocketAddress(port)); // Bind the service port
		serverSocketChannel.configureBlocking(false); // Channels registered with a selector must be non-blocking

		// The Reactor is the entry point. The first event registered on the channel is accept
		SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

		// Attach the Acceptor as the handler for accept events
		sk.attach(new Acceptor(serverSocketChannel));
	}

	@Override
	public void run() {
		try {
			while (!Thread.interrupted()) {
				int count = selector.select(); // Block until at least one event is ready
				if (count == 0) {
					continue;
				}
				Set<SelectionKey> selected = selector.selectedKeys(); // The events made ready by this select
				Iterator<SelectionKey> it = selected.iterator();
				while (it.hasNext()) {
					// This is where events are dispatched
					dispatch(it.next());
				}
				selected.clear();
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	void dispatch(SelectionKey k) {
		// The attached object is the Acceptor
		Runnable r = (Runnable) (k.attachment());

		// Call the previously registered callback object
		if (r != null) {
			r.run();
		}
	}
}

This class contains two core methods, select and dispatch. It listens for ready events and dispatches them. The object attached to the key, and therefore the target of the dispatch, is the Acceptor.

2. Acceptor

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Acceptor implements Runnable {

	private final ServerSocketChannel serverSocketChannel;

	private final int coreNum = Runtime.getRuntime().availableProcessors(); // Number of CPU cores

	private final Selector[] selectors = new Selector[coreNum]; // One selector per SubReactor

	private int next = 0; // Index of the SubReactor that receives the next connection

	private SubReactor[] reactors = new SubReactor[coreNum]; // The SubReactors

	private Thread[] threads = new Thread[coreNum]; // The threads that run the SubReactors

	Acceptor(ServerSocketChannel serverSocketChannel) throws IOException {
		this.serverSocketChannel = serverSocketChannel;
		// Initialization
		for (int i = 0; i < coreNum; i++) {
			selectors[i] = Selector.open();
			reactors[i] = new SubReactor(selectors[i], i); // Initialize the SubReactor
			threads[i] = new Thread(reactors[i]); // Initialize the thread that runs the SubReactor
			threads[i].start(); // Start it (this calls SubReactor.run)
		}
	}

	@Override
	public void run() {
		SocketChannel socketChannel;
		try {
			socketChannel = serverSocketChannel.accept(); // Accept the connection
			if (socketChannel != null) {
				System.out.println(String.format("accpet %s", socketChannel.getRemoteAddress()));
				socketChannel.configureBlocking(false);

				// Note: on some JDK versions register() blocks while the selector is inside select(),
				// so the SubReactor's select loop is paused first. registering(true) plus wakeup()
				// does this; see the run method of SubReactor.
				reactors[next].registering(true);
				selectors[next].wakeup(); // Makes a blocked select() return immediately
				SelectionKey selectionKey =
						socketChannel.register(selectors[next], SelectionKey.OP_READ); // Register a read event
				selectors[next].wakeup(); // Makes a blocked select() return immediately

				// Registration is done, so let the SubReactor resume its select loop
				// by setting the flag back to false (see SubReactor.run).
				reactors[next].registering(false);

				// Bind the Handler
				selectionKey.attach(new AsyncHandler(socketChannel, selectors[next], next));
				if (++next == selectors.length) {
					next = 0; // Wrap around to the first SubReactor
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

This class handles connection-ready events. It initializes a batch of SubReactors, accepts the client's SocketChannel, distributes it to one of the SubReactors, and binds a Handler so that the read and write tasks can proceed.

3. SubReactor

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;

public class SubReactor implements Runnable {
	private final Selector selector;
	// Registration flag. volatile because it is written by the Acceptor's thread
	private volatile boolean register = false;
	private int num; // Serial number: the index assigned when the Acceptor created this SubReactor

	SubReactor(Selector selector, int num) {
		this.selector = selector;
		this.num = num;
	}

	@Override
	public void run() {
		while (!Thread.interrupted()) {
			System.out.println(String.format("NO %d SubReactor waitting for register...", num));
			while (!Thread.interrupted() && !register) {
				try {
					if (selector.select() == 0) {
						continue;
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
				Set<SelectionKey> selectedKeys = selector.selectedKeys();
				Iterator<SelectionKey> it = selectedKeys.iterator();
				while (it.hasNext()) {
					dispatch(it.next());
					it.remove();
				}
			}
		}
	}

	private void dispatch(SelectionKey key) {
		Runnable r = (Runnable) (key.attachment());
		if (r != null) {
			r.run();
		}
	}

	void registering(boolean register) {
		this.register = register;
	}
}

This class selects on the channels that the Acceptor hands over to it. In this example the events it handles are the read and send operations.
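The flag-and-wakeup hand-off between the Acceptor and the SubReactor is delicate. A common alternative, sketched here under stated assumptions, is to queue each registration and let the SubReactor's own thread execute it between two `select()` calls. `QueueingSubReactor` and its `register` and `demo` methods are hypothetical names, and the self-check uses an in-process `Pipe` instead of a socket.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical alternative: registrations run on the selector's own thread.
class QueueingSubReactor implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> pendingRegistrations = new ConcurrentLinkedQueue<>();

    QueueingSubReactor() throws IOException {
        this.selector = Selector.open();
    }

    // Safe to call from any thread, for example the Acceptor's.
    void register(SelectableChannel channel, int ops, Runnable handler) {
        pendingRegistrations.add(() -> {
            try {
                channel.configureBlocking(false);
                channel.register(selector, ops, handler); // handler becomes the key's attachment
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        selector.wakeup(); // make select() return so that the queue gets drained
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                selector.select();
                Runnable registration;
                while ((registration = pendingRegistrations.poll()) != null) {
                    registration.run(); // runs on this thread, never concurrently with select()
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    Runnable handler = (Runnable) key.attachment();
                    if (handler != null) {
                        handler.run();
                    }
                }
            }
            selector.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Small self-check: returns true if the handler ran after data arrived.
    static boolean demo() {
        try {
            QueueingSubReactor reactor = new QueueingSubReactor();
            Thread thread = new Thread(reactor, "sub-reactor");
            thread.setDaemon(true);
            thread.start();

            Pipe pipe = Pipe.open();
            CountDownLatch handled = new CountDownLatch(1);
            reactor.register(pipe.source(), SelectionKey.OP_READ, () -> {
                try {
                    pipe.source().read(ByteBuffer.allocate(16)); // drain so the key stops being ready
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                handled.countDown();
            });
            pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));
            boolean ok = handled.await(5, TimeUnit.SECONDS);
            thread.interrupt();
            return ok;
        } catch (IOException | InterruptedException e) {
            return false;
        }
    }
}
```

The design choice is that only one thread ever touches the selector's key set, so no pause flag is needed. The cost is one extra wakeup per registration.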

4. AsyncHandler

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncHandler implements Runnable {

	private final Selector selector;

	private final SelectionKey selectionKey;
	private final SocketChannel socketChannel;

	private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
	private ByteBuffer sendBuffer = ByteBuffer.allocate(2048);

	private final static int READ = 0; // Ready to read
	private final static int SEND = 1; // Ready to send the response
	private final static int PROCESSING = 2; // Being processed by a worker

	// All connections start with a read. volatile because worker threads also update it
	private volatile int status = READ;

	private int num; // Number of the SubReactor

	// Thread pool for asynchronous processing, with 5 threads
	private static final ExecutorService workers = Executors.newFixedThreadPool(5);

	AsyncHandler(SocketChannel socketChannel, Selector selector, int num) throws IOException {
		this.num = num; // Marks which SubReactor triggers this Handler
		this.socketChannel = socketChannel; // The accepted client connection
		this.socketChannel.configureBlocking(false); // Set to non-blocking mode
		selectionKey = socketChannel.register(selector, 0); // Register the client channel with the selector
		selectionKey.attach(this); // Attach this Handler as the processing object
		selectionKey.interestOps(SelectionKey.OP_READ); // The connection is established, so the next step is to read
		this.selector = selector;
		this.selector.wakeup();
	}

	@Override
	public void run() {
		// While a worker is processing a task asynchronously, status is PROCESSING and run() does nothing.
		// read and send only do simple data reading and responding; business processing never blocks here
		switch (status) {
		case READ:
			read();
			break;
		case SEND:
			send();
			break;
		default:
		}
	}

	private void read() {
		if (selectionKey.isValid()) {
			try {
				readBuffer.clear();

				// When read returns, "read ready" has become "read done", which ends this ready event
				int count = socketChannel.read(readBuffer);
				if (count > 0) {
					status = PROCESSING; // Mark as being processed
					workers.execute(this::readWorker); // Process asynchronously
				} else if (count < 0) {
					// -1 means the client has closed the connection
					selectionKey.cancel();
					socketChannel.close();
					System.out.println(String.format("NO %d SubReactor read closed", num));
				}
			} catch (IOException e) {
				System.err.println("An exception occurred while processing a read service! Exception message:" + e.getMessage());
				selectionKey.cancel();
				try {
					socketChannel.close();
				} catch (IOException e1) {
					System.err.println("An exception occurred while closing the channel for read service! Exception message:" + e1.getMessage());
				}
			}
		}
	}

	void send() {
		if (selectionKey.isValid()) {
			status = PROCESSING; // Mark as being processed
			workers.execute(this::sendWorker); // Process asynchronously
			selectionKey.interestOps(SelectionKey.OP_READ); // Switch the interest back to read
		}
	}

	// Business processing after the data has been read
	private void readWorker() {
		try {

			// Simulate a time-consuming operation
			Thread.sleep(5000L);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		try {
			// Decode only the bytes that were actually read, not the whole backing array
			System.out.println(String.format("NO %d %s -> Server: %s",
					num, socketChannel.getRemoteAddress(),
					new String(readBuffer.array(), 0, readBuffer.position())));
		} catch (IOException e) {
			System.err.println("An exception occurred while processing read services asynchronously! Exception message:" + e.getMessage());
		}
		status = SEND;
		selectionKey.interestOps(SelectionKey.OP_WRITE); // Register interest in write events
		this.selector.wakeup(); // Wake up the thread blocked in select
	}

	private void sendWorker() {
		try {
			sendBuffer.clear();
			sendBuffer.put(String.format("NO %d SubReactor received %s from %s", num,
					new String(readBuffer.array(), 0, readBuffer.position()),
					socketChannel.getRemoteAddress()).getBytes());
			sendBuffer.flip();

			// When write returns, the write is complete, which ends this event
			int count = socketChannel.write(sendBuffer);

			if (count < 0) {
				// Defensive check only: write() does not normally return a negative count,
				// and a broken connection surfaces as an IOException instead
				selectionKey.cancel();
				socketChannel.close();
				System.out.println(String.format("%d SubReactor send closed", num));
			}

			// If the connection is still open, switch back to read
			status = READ;
		} catch (IOException e) {
			System.err.println("An exception occurred during asynchronous send processing! Exception message:" + e.getMessage());
			selectionKey.cancel();
			try {
				socketChannel.close();
			} catch (IOException e1) {
				System.err.println("An exception occurred when the send service was closed! Exception message:" + e1.getMessage());
			}
		}
	}
}

AsyncHandler takes over the subsequent read and write operations for a connection.
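The handler's status moves between read, processing, and send, and it is updated both by the SubReactor thread and by worker threads. A minimal sketch of one way to make those transitions explicit and thread-safe, assuming an enum with an `AtomicReference` in place of the int constants (`HandlerState` is a hypothetical name):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: explicit, atomic state transitions for a handler.
class HandlerState {

    enum State { READ, PROCESSING, SEND }

    private final AtomicReference<State> state = new AtomicReference<>(State.READ);

    // Moves from 'from' to 'to' only if the current state is 'from'.
    // Returns false if another thread has already changed the state.
    boolean tryTransition(State from, State to) {
        return state.compareAndSet(from, to);
    }

    State current() {
        return state.get();
    }
}
```

With this shape, a read event that arrives while a worker is still busy fails the `READ` to `PROCESSING` transition and can simply be ignored.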

5. MainSubReactorDemo

import java.io.IOException;

public class MainSubReactorDemo {

    public static void main(String[] args) throws IOException {
        new Thread(new Reactor(2333)).start();
    }
}

Client

1. Connector

import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class Connector implements Runnable {

	private final Selector selector;

	private final SocketChannel socketChannel;

	Connector(SocketChannel socketChannel, Selector selector) {
		this.socketChannel = socketChannel;
		this.selector = selector;
	}

	@Override
	public void run() {
		try {
			if (socketChannel.finishConnect()) {
				// The connection is complete (the TCP three-way handshake with the server has finished)
				System.out.println(String.format("connected to %s", socketChannel.getRemoteAddress()));

				// From here on, the Handler takes over the remaining work (read/write and so on)
				new Handler(socketChannel, selector);
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

2. Handler

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicInteger;

public class Handler implements Runnable {

	private final SelectionKey selectionKey;
	private final SocketChannel socketChannel;

	private ByteBuffer readBuffer = ByteBuffer.allocate(2048);
	private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);

	private final static int READ = 0;
	private final static int SEND = 1;

	private int status = SEND; // Unlike the server, the client starts by sending data

	private AtomicInteger counter = new AtomicInteger();

	Handler(SocketChannel socketChannel, Selector selector) throws IOException {
		this.socketChannel = socketChannel; // The connection to the server
		this.socketChannel.configureBlocking(false); // Set to non-blocking mode
		selectionKey = socketChannel.register(selector, 0); // Register the channel with the selector
		selectionKey.attach(this); // Attach this Handler as the processing object
		selectionKey.interestOps(SelectionKey.OP_WRITE); // The connection is established, so the next step is to write
		selector.wakeup(); // Wake up a blocked select
	}

	@Override
	public void run() {
		try {
			switch (status) {
			case SEND:
				send();
				break;
			case READ:
				read();
				break;
			default:
			}
		} catch (IOException e) {
			// If the server disconnects unexpectedly (network failure or other reasons) while the
			// client is reading or writing, the client should close the channel and exit
			System.err.println("An exception occurred while sending or reading! Exception message:" + e.getMessage());
			selectionKey.cancel();
			try {
				socketChannel.close();
			} catch (IOException e2) {
				System.err.println("An exception occurred while closing the channel! Exception message:" + e2.getMessage());
				e2.printStackTrace();
			}
		}
	}

	void send() throws IOException {
		if (selectionKey.isValid()) {
			sendBuffer.clear();
			int count = counter.incrementAndGet();
			if (count <= 10) {
				sendBuffer.put(String.format("msg is %s", count).getBytes());
				sendBuffer.flip(); // Switch to read mode so that the channel can read the data out of the buffer
				socketChannel.write(sendBuffer);

				// Switch to read to receive the response from the server
				status = READ;
				selectionKey.interestOps(SelectionKey.OP_READ);
			} else {
				selectionKey.cancel();
				socketChannel.close();
			}
		}
	}

	private void read() throws IOException {
		if (selectionKey.isValid()) {
			readBuffer.clear(); // Switch to write mode so that the channel can write its data into the buffer
			socketChannel.read(readBuffer);
			// Decode only the bytes that were actually read, not the whole backing array
			System.out.println(String.format("Server -> Client: %s",
					new String(readBuffer.array(), 0, readBuffer.position())));

			// After receiving the response from the server, continue sending data
			status = SEND;
			selectionKey.interestOps(SelectionKey.OP_WRITE); // Register interest in write events
		}
	}
}
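The comments on `flip()` and `clear()` in the Handler describe the buffer's two modes. As a small sketch of why that matters when turning bytes back into text: decoding a flipped buffer yields only what was written, while decoding the whole backing array also pulls in its unused, zero-filled tail. `BufferModes` and its two methods are hypothetical names for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of ByteBuffer write mode versus read mode.
class BufferModes {

    // Writes text into a buffer, flips it, and decodes exactly what was written.
    static String roundTrip(String text) {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        buf.clear();                                     // write mode: position 0, limit = capacity
        buf.put(text.getBytes(StandardCharsets.UTF_8));  // fill, as channel.read(buf) would
        buf.flip();                                      // read mode: limit = bytes written, position 0
        return StandardCharsets.UTF_8.decode(buf).toString();
    }

    // Decodes the whole backing array, including the unused zero bytes.
    static int wholeArrayLength(String text) {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        buf.put(text.getBytes(StandardCharsets.UTF_8));
        return new String(buf.array(), StandardCharsets.UTF_8).length();
    }
}
```

For an 8-character ASCII message, `roundTrip` returns the same 8 characters, whereas `wholeArrayLength` reports 1024 because every unused zero byte decodes to a NUL character.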

3. NIOClient

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOClient implements Runnable {

    private Selector selector;

    private SocketChannel socketChannel;

    NIOClient(String ip, int port) {
        try {
            selector = Selector.open(); // Open a Selector
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false); // Set to non-blocking mode
            socketChannel.connect(new InetSocketAddress(ip, port)); // Connect to the server

            // Entry point: the first event registered on a client channel is connect
            SelectionKey sk = socketChannel.register(selector, SelectionKey.OP_CONNECT);

            // Attach the handler class; the first one handles the connect-ready event
            sk.attach(new Connector(socketChannel, selector));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // Block until an event is ready
                selector.select();

                // Get the events made ready by this select
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext()) {
                    // This is where events are dispatched
                    dispatch(it.next());
                }
                selected.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    void dispatch(SelectionKey k) {
        // The attached object is the Connector (later replaced by the Handler)
        Runnable r = (Runnable) (k.attachment());

        // Call the previously registered callback object
        if (r != null) {
            r.run();
        }
    }
}

4. ClientDemo

public class ClientDemo {

    public static void main(String[] args) {
        new Thread(new NIOClient("127.0.0.1", 2333)).start();
        new Thread(new NIOClient("127.0.0.1", 2333)).start();
    }
}

5. Test

Run the server and then the client. The server console prints output like the following:

NO 2 SubReactor waitting for register...
NO 1 SubReactor waitting for register...
NO 3 SubReactor waitting for register...
NO 0 SubReactor waitting for register...
accpet /127.0.0.1:63223
NO 0 SubReactor waitting for register...
accpet /127.0.0.1:63226
NO 1 SubReactor waitting for register...
NO 0 /127.0.0.1:63223 -> Server: MSG1
NO 1 /127.0.0.1:63226 -> Server: MSG1
NO 0 /127.0.0.1:63223 -> Server: MSG2
NO 1 /127.0.0.1:63226 -> Server: MSG2
NO 0 /127.0.0.1:63223 -> Server: MSG3
NO 1 /127.0.0.1:63226 -> Server: MSG3

Conclusion

That concludes this introduction to the Reactor model. It should give you a working understanding of the model and a better sense of the Netty architecture. The next article digs into the Netty source code.
