preface

In a previous article on the NioEventLoop source code, we saw that Netty uses MpscQueue instead of the blocking queues provided by the JDK. Because of the length of that article, MpscQueue was not covered in detail. Today it gets its own article!

Earlier versions of Netty used a self-implemented task queue, which has since been replaced with the JCTools lock-free queue, simply because it is so efficient.

What is Mpsc? JCTools provides many queues, and you need to choose the right queue for each application scenario to avoid potential problems. Here is what “Mpsc” stands for:

  • M: Multi
  • S: Single
  • P: Producer
  • C: Consumer

So MpscQueue is simply a high-performance lock-free queue for multiple producers and a single consumer!

As mentioned in the previous article, NioEventLoop is a single-threaded thread pool, and tasks submitted to the EventLoop are executed serially by that one thread. So the production/consumption model of the EventLoop’s task queue is: multiple producers, single consumer.
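To make that model concrete, here is a minimal standalone sketch (independent of Netty) that uses JCTools directly; it assumes the jctools-core dependency is on the classpath, and the class name MpscDemo and the chunk size are arbitrary choices for illustration. Several threads act as producers while a single thread drains the queue:

import org.jctools.queues.MpscUnboundedArrayQueue;

public class MpscDemo {
    public static void main(String[] args) throws InterruptedException {
        // chunk size 1024: the size of each backing array chunk
        MpscUnboundedArrayQueue<Runnable> queue = new MpscUnboundedArrayQueue<>(1024);

        // Multiple producers: any thread may call offer()
        for (int i = 0; i < 4; i++) {
            final int id = i;
            new Thread(() -> queue.offer(() -> System.out.println("task from producer " + id))).start();
        }

        // Single consumer: only one thread is allowed to call poll()
        Thread consumer = new Thread(() -> {
            int consumed = 0;
            while (consumed < 4) {
                Runnable task = queue.poll(); // returns null when the queue is currently empty
                if (task != null) {
                    task.run();
                    consumed++;
                }
            }
        });
        consumer.start();
        consumer.join();
    }
}

Because only one thread ever polls, the consumer side needs no locking at all, which is exactly the situation the EventLoop is in.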

So this article will focus on MpscQueue; the other queues are left for you to study on your own.

MpscQueue source code analysis

MpscQueue is not implemented by Netty itself, so you will not find it in the Netty project source code.

The queue Netty uses by default is org.jctools.queues.MpscUnboundedArrayQueue, so only that one is analyzed here. MpscUnboundedArrayQueue is an unbounded “multi-producer, single-consumer” queue, meaning it has no capacity limit: you can keep submitting tasks to it even if no consumer is consuming them.

The MpscUnboundedArrayQueue class has a deep inheritance hierarchy, and the class diagram is a bit complicated, but that’s OK: we don’t have to analyze all the code, just its immediate parent, BaseMpscLinkedArrayQueue. The core logic is there; once you understand BaseMpscLinkedArrayQueue, you basically understand the implementation.

One thing to point out first: MpscQueue contains a lot of code like the following:

byte b000,b001,b002,b003,b004,b005,b006,b007;// 8b
byte b010,b011,b012,b013,b014,b015,b016,b017;// 16b
byte b020,b021,b022,b023,b024,b025,b026,b027;// 24b

The reader doesn’t have to pay much attention to these fields. They are not used for anything other than byte padding, which relates to CPU cache lines.

In addition to the computer’s main memory, each CPU core has its own caches, divided into an L1 cache, an L2 cache, and so on; the closer a cache is to the core, the faster it is. Why does the CPU have caches? To bridge the speed gap between the CPU and main memory, CPUs were given multi-level caches: the CPU pulls data from main memory into its cache, computes against the cache, and writes the results back to main memory at some later point.

The smallest unit of data cached by a CPU is the Cache Line, which on most CPUs is 64 bytes. If any data in a Cache Line is invalidated, the entire Cache Line is considered invalid and must be reloaded from main memory.

Therefore, if Java objects/fields used by different threads sit on the same Cache Line, one thread modifying part of that line will invalidate it for the other threads, forcing them to reload from main memory and causing unnecessary overhead.

To sum up, MpscQueue uses padding to place frequently read and written data on different Cache Lines, avoiding this kind of interference (false sharing).
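As an illustration of the same idea outside JCTools, here is a minimal sketch of the padding trick. The class PaddedCounters below is hypothetical; its layout simply mirrors the byte-padding fields shown above, keeping two hot counters written by different threads on separate Cache Lines:

// Hypothetical example: pad two hot fields so they land on different cache lines.
// 15 longs of padding (120 bytes) comfortably exceeds a 64-byte cache line.
public class PaddedCounters {
    // written only by producer threads
    volatile long producerIndex;
    // padding between the two hot fields
    long p01, p02, p03, p04, p05, p06, p07, p08;
    long p09, p10, p11, p12, p13, p14, p15;
    // written only by the consumer thread
    volatile long consumerIndex;
}

On newer JDKs a similar effect can be achieved with the @Contended annotation (which needs -XX:-RestrictContended for non-JDK classes), but manual padding like the above works everywhere.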

So, back to MpscUnboundedArrayQueue, let’s talk about how it is implemented.

The basic data structure of MpscUnboundedArrayQueue is an “array + linked list”. It has two buffer pointers, producerBuffer and consumerBuffer, which point to the array the producers write to and the array the consumer reads from, respectively. It has two index fields, producerIndex and consumerIndex, which represent the producer’s write position and the consumer’s read position; both are incremented in steps of 2. It also has a producerLimit field for the producers, which is the upper bound up to which they may produce. The queue is expanded by creating a new array of the same length and making the last element of the old array point to the new array, forming a singly linked list.

The author has drawn a diagram to describe how the data structure of MpscQueue changes:

With the overall flow covered, let’s start analyzing the source code, beginning with a few important fields:

// Producer index
private volatile long producerIndex;
// When producerIndex == producerLimit, the queue may need to be expanded
private volatile long producerLimit;
protected long producerMask;
// The array the producer currently points to
protected E[] producerBuffer;

// Consumer index
private volatile long consumerIndex;
protected long consumerMask;
// The array the consumer currently points to
protected E[] consumerBuffer;

// When producers fill up an array, a JUMP element is placed in it to signal expansion;
// when the consumer encounters JUMP, it moves on to consume the next array.
private static final Object JUMP = new Object();

// After the consumer has consumed a complete array, its last element is set to BUFFER_CONSUMED.
private static final Object BUFFER_CONSUMED = new Object();

MpscQueue is backed by arrays. chunkSize is the size of each array and must be a power of two.

public MpscUnboundedArrayQueue(int chunkSize) {
    super(chunkSize);
}

In the constructor of the parent class, we evaluate the mask, initialize an array, point both producerBuffer and consumerBuffer to the same array, and then set producerLimit based on the mask.

Assuming initialCapacity is 8, the length of the array is 9, because the last element is used to hold the reference to the next (expanded) array, forming a linked list. Each array also reserves a slot for the JUMP element, which indicates that the queue has been expanded; when the consumer meets a JUMP element, it finds the expanded array through the last element and continues consuming there. Therefore, at most seven real elements can be stored in an array.

/**
 * Initialize.
 * @param initialCapacity the array capacity, which must be a power of 2
 */
public BaseMpscLinkedArrayQueue(final int initialCapacity) {
    // initialCapacity must be greater than or equal to 2
    RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity");

    // Make sure the capacity is a power of 2: round initialCapacity up to the next power of 2
    int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);

    // mask = (capacity - 1) * 2
    long mask = (p2capacity - 1) << 1;
    // need extra element to point at next array
    // An extra element is required to link to the next array
    E[] buffer = allocateRefArray(p2capacity + 1);
    // Producer and consumer buffers point to the same array
    producerBuffer = buffer;
    producerMask = mask;
    consumerBuffer = buffer;
    consumerMask = mask;
    // set producerLimit = mask
    soProducerLimit(mask);
}
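To make the numbers concrete, here is a small worked example of the constructor math above. It is a simplified restatement for illustration only: the slot formula mirrors what modifiedCalcCircularRefElementOffset computes, but this is not the JCTools helper itself.

int chunkSize = 8;                    // requested chunk size (already a power of 2)
long mask = (chunkSize - 1) << 1;     // 14; producerLimit also starts at 14
int bufferLength = chunkSize + 1;     // 9: 8 data slots + 1 slot for the next-array reference
System.out.println("buffer length = " + bufferLength + ", mask = " + mask);

// Indices advance in steps of 2, so the logical slot is (index & mask) >> 1.
for (long pIndex = 0; pIndex < mask; pIndex += 2) {
    long slot = (pIndex & mask) >> 1; // 0, 1, 2, ..., 6
    System.out.println("pIndex=" + pIndex + " -> slot " + slot);
}
// Seven elements fit (slots 0..6); at pIndex == 14 the producer hits producerLimit and
// takes the slow path, which either raises the limit or resizes the queue.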

Once the queue is initialized, data is continuously produced and consumed, so next I’ll focus on the offer() and poll() methods.

offer() analysis

offer(e) adds element e to the queue, i.e. it produces data. MpscQueue cannot hold null elements, so a non-null check comes first. The thread then tries to increment producerIndex by a step of 2 via CAS; CAS guarantees that only one thread succeeds. If the CAS succeeds, the thread may write element e into the corresponding slot of the array. If the CAS fails, there was a concurrent producer, and the thread spins and retries.

Before the CAS, offer() checks whether producerIndex has reached producerLimit. If it has, the queue may need to be expanded. The offerSlowPath() method decides whether expansion is really needed; if it is, only one thread is allowed to perform it. This is done with another CAS, in which the expanding thread increments producerIndex by 1.

Therefore, offer(e) also checks whether producerIndex is odd; an odd value means the queue is being expanded. Because MpscQueue expands very quickly (it does not migrate elements, it only creates a new array and links it to the old one), there is no need to suspend other threads: when a thread finds that the queue is expanding, it simply spins and retries until the expansion is done.

/**
 * Add element e to the queue, i.e. produce data.
 * @param e
 * @return
 */
@Override
public boolean offer(final E e) {
    if (null == e) { // Non-null check
        throw new NullPointerException();
    }

    long mask;
    E[] buffer;  // The array the producer points to
    long pIndex; // Production index

    while (true) {
        long producerLimit = lvProducerLimit();
        // Get the producer index
        pIndex = lvProducerIndex();
        // The production index is incremented in steps of 2, so it is normally even;
        // offerSlowPath() sets it to an odd value while resizing
        if ((pIndex & 1) == 1) {
            // An odd value indicates expansion: spin and wait for the expansion to complete
            continue;
        }

        mask = this.producerMask;
        buffer = this.producerBuffer;
        // If the production index has reached producerLimit, expansion may be needed
        if (producerLimit <= pIndex) {
            int result = offerSlowPath(mask, pIndex, producerLimit);
            switch (result) {
                case CONTINUE_TO_P_INDEX_CAS:
                    // producerLimit was reached, but some data in the current array has already been consumed
                    break;
                case RETRY: // CAS failed, try again
                    continue;
                case QUEUE_FULL: // Queue full, offer failed
                    return false;
                case QUEUE_RESIZE: // Capacity needs to be expanded
                    resize(mask, buffer, pIndex, e, null);
                    return true;
            }
        }

        if (casProducerIndex(pIndex, pIndex + 2)) {
            // CAS succeeded in incrementing producerIndex
            break;
        }
    }
    final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
    // Write e into the slot of the buffer array, addressed not by subscript but by the
    // slot's memory offset, via Unsafe
    soRefElement(buffer, offset, e); // release element e
    return true;
}

offerSlowPath() tells the thread whether the queue is full, needs to be expanded, or needs a spin retry. producerIndex reaching producerLimit does not necessarily mean the queue must be expanded: if the consumer has already consumed some elements of the array the producer points to, there are still slots in that array that can be reused.

/**
 * @param mask
 * @param pIndex the producer index
 * @param producerLimit the current producerLimit
 * @return
 */
private int offerSlowPath(long mask, long pIndex, long producerLimit) {
    // Consumer index
    final long cIndex = lvConsumerIndex();
    // The capacity of the current buffer, (length - 1) * 2
    long bufferCapacity = getCurrentBufferCapacity(mask);

    // Consumer index + current buffer capacity > producer index means some elements of the
    // current array have already been consumed; the freed slots can be reused without expanding.
    if (cIndex + bufferCapacity > pIndex) {
        if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) {
            // CAS failed, spin and retry
            return RETRY;
        } else {
            // Go back and retry the CAS on the production index
            return CONTINUE_TO_P_INDEX_CAS;
        }
    }
    // Check whether the queue is full based on the producer and consumer indexes.
    // Unbounded queues are never full.
    else if (availableInQueue(pIndex, cIndex) <= 0) {
        return QUEUE_FULL;
    }
    // grab index for resize -> set lower bit
    // CAS increments producerIndex by 1; an odd value indicates resizing
    else if (casProducerIndex(pIndex, pIndex + 1)) {
        return QUEUE_RESIZE;
    } else {
        // Resize CAS failed, try again
        return RETRY;
    }
}

If expansion is required, the thread CASes producerIndex to an odd number so that other threads can tell the queue is expanding; threads that want to produce data spin first and continue once the expansion is complete.

resize() is the core method for expansion. It first creates a new array of the same length, points producerBuffer at the new array, and places element e into the new array; the last element of the old array is set to point at the new array, forming a linked list. It also fills the old array’s slot for the current index with the JUMP element, signalling that the queue has been expanded.

// Expand capacity: create a new E[] and link oldBuffer to newBuffer.
private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s) {
    assert (e != null && s == null) || (e == null || s != null);
    // The length of the next buffer; MpscQueue builds a buffer of the same length
    int newBufferLength = getNextBufferSize(oldBuffer);
    final E[] newBuffer;
    try {
        // Create a new E[]
        newBuffer = allocateRefArray(newBufferLength);
    } catch (OutOfMemoryError oom) {
        assert lvProducerIndex() == pIndex + 1;
        soProducerIndex(pIndex);
        throw oom;
    }

    // The producer buffer now points to the new E[]
    producerBuffer = newBuffer;
    // Compute the new mask. If the buffer length is unchanged, the mask is unchanged too
    final int newMask = (newBufferLength - 2) << 1;
    producerMask = newMask;

    // Offset of the JUMP element in oldBuffer
    final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
    // With the mask unchanged, the JUMP position in oldBuffer corresponds to the same
    // consumption position in newBuffer
    final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);

    // Put element e into the new array
    soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);
    // The last element of the old array holds the reference to the new array
    soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);

    // Consumer index
    final long cIndex = lvConsumerIndex();
    // Check how much room is left based on the consumer and producer indexes.
    // For unbounded queues this returns Integer.MAX_VALUE: they are never full.
    final long availableInQueue = availableInQueue(pIndex, cIndex);
    RangeUtil.checkPositive(availableInQueue, "availableInQueue");

    // Set the new producerLimit
    soProducerLimit(pIndex + Math.min(newMask, availableInQueue));

    /* During expansion, producerIndex is set to pIndex + 1; an odd value means expansion is in
       progress, and non-expanding threads spin and wait for it to finish. Now that the element
       has been enqueued, set producerIndex to pIndex + 2 to let other threads know that the
       expansion is complete. */
    soProducerIndex(pIndex + 2);

    /* Set the specified position of the old array to JUMP. The consumer will then know the queue
       has been expanded and will look for the array linked as next. */
    soRefElement(oldBuffer, offsetInOld, JUMP);
}

The main flow of offer() is this: producers grab slots via CAS, ensuring that only a single thread can produce into each slot, and threads whose CAS fails spin and retry. If the queue needs to be expanded, producerIndex is set to an odd number and other threads spin until the expansion is complete; afterwards producerIndex is set back to an even number to let the other threads continue producing.
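To isolate that coordination trick from the rest of the queue, here is a tiny standalone model of it. This is my own simplification, not JCTools code: producerIndex normally advances in steps of 2, a CAS to pIndex + 1 claims the exclusive right to resize, and everyone else spins while the index is odd.

import java.util.concurrent.atomic.AtomicLong;

// Simplified model of the odd/even producerIndex protocol described above (illustration only).
class OddIndexResizeModel {
    private final AtomicLong producerIndex = new AtomicLong();

    /** Claim one production slot, or perform a (simulated) resize when the limit is hit. */
    long claimSlot(long producerLimit) {
        while (true) {
            long pIndex = producerIndex.get();
            if ((pIndex & 1) == 1) {
                continue; // someone is resizing: spin until the index is even again
            }
            if (pIndex >= producerLimit) {
                // Try to become the single resizing thread by setting the low bit
                if (producerIndex.compareAndSet(pIndex, pIndex + 1)) {
                    // ... the new array would be created and linked here ...
                    producerIndex.set(pIndex + 2); // publish: resize done, slot claimed
                    return pIndex;
                }
                continue; // lost the race, retry
            }
            if (producerIndex.compareAndSet(pIndex, pIndex + 2)) {
                return pIndex; // slot at pIndex claimed, safe to write the element
            }
        }
    }
}

The real offerSlowPath() additionally raises producerLimit when the consumer has already freed slots; this model omits that case for brevity.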

poll() analysis

Elements are produced so that poll() can be called to consume them.

poll() first finds the array currently pointed to by consumerBuffer, calculates the memory offset of the element to be consumed relative to that array from the consumption index consumerIndex, and reads the element at that offset.

If the element is null, it does not necessarily mean the queue is empty. If consumerIndex differs from producerIndex, then producerIndex must be the greater one, meaning a producer is already producing: producerIndex has been moved forward, but the element has not yet been written into the array. Since the producer increments producerIndex before filling the slot, there is a very short time window between the two steps; if the consumer happens to read within that window, it spins and waits for the producer to fill the slot.

If the element is JUMP, the queue has been expanded. The consumer must find the new array through the last element of the current array and consume elements from the new array.

Since there is only a single consumer, poll() needs no concurrency control:

@Override
public E poll() {
    /* consumerBuffer and producerBuffer are both set in the queue's constructor and initially
       point to the same array. As producers keep producing data and the queue expands,
       producerBuffer gradually moves on to newer arrays. */
    final E[] buffer = consumerBuffer;
    // Consumer index
    final long index = lpConsumerIndex();
    final long mask = consumerMask;

    // Compute the address offset of the element the consumer needs to consume
    final long offset = modifiedCalcCircularRefElementOffset(index, mask);
    // Fetch element e at that offset
    Object e = lvRefElement(buffer, offset);
    if (e == null) {
        if (index != lvProducerIndex()) {
            /* The producer first moves producerIndex and only then writes the element, so there
               is a brief gap between the two steps; spin here until the element becomes visible. */
            do {
                e = lvRefElement(buffer, offset);
            }
            while (e == null);
        } else {
            // The queue is empty
            return null;
        }
    }

    if (e == JUMP) { // The queue has been expanded
        /* Use the last element of the current array to find the next array to consume; the
           consumer also marks the last element as BUFFER_CONSUMED, meaning the current array
           has been fully consumed. */
        final E[] nextBuffer = nextBuffer(buffer, mask);
        // Consume the element from the new array
        return newBufferPoll(nextBuffer, index);
    }

    // After fetching the element, set the original slot back to null
    soRefElement(buffer, offset, null);
    // Increment consumerIndex
    soConsumerIndex(index + 2);
    return (E) e;
}

If the queue has been expanded, nextBuffer() finds the new array and sets the last element of the old array to BUFFER_CONSUMED, meaning the old array has been fully consumed and is dropped from the list.

// Find the next array
private E[] nextBuffer(final E[] buffer, final long mask) {
    // Compute the address offset of the last element of the array
    final long offset = nextArrayOffset(mask);
    // Load the next array
    final E[] nextBuffer = (E[]) lvRefElement(buffer, offset);
    // The consumer buffer now points to the new array
    consumerBuffer = nextBuffer;
    // Recalculate the mask
    consumerMask = (length(nextBuffer) - 2) << 1;
    // Set the last element of the old array to BUFFER_CONSUMED
    soRefElement(buffer, offset, BUFFER_CONSUMED);
    return nextBuffer;
}

After getting the new array, newBufferPoll() is called to consume data from the new array:

// Consume data from the expanded array, using the same index
private E newBufferPoll(E[] nextBuffer, long index) {
    // Compute from consumerIndex the memory offset of the element to be consumed, relative to the array
    final long offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
    // Fetch the element at that offset
    final E n = lvRefElement(nextBuffer, offset);
    if (n == null) { // offer() never enqueues null, so this must not happen
        throw new IllegalStateException("new buffer must have at least one element");
    }
    // Set the slot back to null after the element is fetched
    soRefElement(nextBuffer, offset, null);
    // Increment consumerIndex
    soConsumerIndex(index + 2);
    return n;
}

After the consumer takes out an element, the original slot in the array is set back to null, meaning the slot is no longer in use and can be reused.

At this point the analysis of the poll() consumption process is complete. As you can see, no thread is ever suspended during the whole process; at most there is a short spin wait.
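To see the whole offer()/poll() cycle, including the resize and JUMP path, from the caller’s point of view, here is a minimal end-to-end sketch. It assumes jctools-core is on the classpath; MpscResizeDemo is a made-up class name, and the deliberately tiny chunk size of 2 forces the queue to link several arrays:

import org.jctools.queues.MpscUnboundedArrayQueue;

public class MpscResizeDemo {
    public static void main(String[] args) {
        // Tiny chunks so that resizing (and therefore JUMP) happens after just a few offers
        MpscUnboundedArrayQueue<Integer> queue = new MpscUnboundedArrayQueue<>(2);

        // Single-threaded here for brevity; any number of threads may offer
        for (int i = 0; i < 10; i++) {
            queue.offer(i);
        }

        // The single consumer drains the queue in FIFO order, transparently
        // following the linked arrays whenever it hits a JUMP element
        Integer e;
        while ((e = queue.poll()) != null) {
            System.out.print(e + " "); // prints 0 1 2 ... 9
        }
    }
}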

conclusion

MpscQueue is a “multi-producer, single-consumer” high-performance lock-free queue, which matches the task consumption model of Netty’s EventLoop. It uses a lot of CAS operations; wherever concurrency control is needed, only one thread succeeds and the threads whose CAS fails spin and retry, all without locking or blocking. These operations, whether expanding the queue or waiting for an element to be filled in, are extremely fast, so a short spin is more efficient than suspending and then waking a thread. MpscQueue consists of a series of arrays, with the last element of each array pointing to the next array, forming a singly linked list. When an array is expanded, the JUMP element is placed in the original slot, so the consumer knows to look for a new array to consume from.

MpscQueue is lock-free and non-blocking, and it outperforms the synchronized blocking queues provided by the JDK, which is why later versions of Netty replaced their task queues with JCTools.