
The basic concept

  • Disruptor is a high-performance asynchronous processing framework. It can be thought of as a lightweight, in-memory message queue (filling a role similar to JMS, but inside a single JVM) that supports concurrent queue operations without locking
  • Disruptor uses a circular array to implement queue-like functionality and is a bounded queue. It is typically applied to producer-consumer scenarios
  • Disruptor is an implementation of the Observer pattern
  • Disruptor solves performance problems through the following three design choices:
    • Ring (circular) array structure:
      • To avoid garbage collection, use arrays instead of linked lists
      • Arrays are more cache-friendly to the processor
    • Element location:
      • The array length is 2^n, so an element's position can be computed with a fast bitwise AND instead of a modulo operation
      • Element sequence numbers only ever increase
      • The sequence number is a long, so there is no need to worry about it overflowing
    • Lockless design:
      • Each producer or consumer thread first claims the position(s) in the array that it may operate on. Once the claim succeeds, it writes or reads data directly at that position without taking a lock
  • Comparison of Disruptor and BlockingQueue:
    • BlockingQueue: a FIFO queue. When a producer publishes an event to the queue, a waiting consumer is notified. If the queue is empty, the consumer blocks until the producer publishes a new event
    • Disruptor can do more than BlockingQueue:
      • The same event can be delivered to multiple consumers. They either process it in parallel or form a dependency graph in which some consumers run only after others have finished
      • Disruptor can pre-allocate memory space for storing event content
      • Disruptor uses a highly optimized, lock-free design to achieve very high performance
  • Disruptor is typically a good choice whenever data must be handed off between two or more independent processing threads. It acts as a high-performance concurrent queue between them
  • Benefits of Disruptor:
    • Lock-free queue operations give very high concurrent performance
    • Every participant records its own sequence number, which lets multiple producers and consumers share the same data structure
    • Sequence numbers are tracked in each component, including RingBuffer, WaitStrategy, Producer, and Consumer. Cache line padding is applied, so there is no false sharing or unexpected contention
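This sketch illustrates the power-of-two point above in plain Java, with no Disruptor dependency. It shows how an ever-increasing long sequence maps to an array slot using a bitwise AND. The class name RingIndex is hypothetical. Disruptor's RingBuffer applies the same idea with an index mask of bufferSize - 1.

```java
// Hypothetical helper illustrating power-of-two slot indexing (not Disruptor's API)
final class RingIndex {
    private final long indexMask;

    RingIndex(int bufferSize) {
        // A positive power of two has exactly one bit set
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
    }

    // Same result as sequence % bufferSize for non-negative sequences, but a single AND instruction
    int slotFor(long sequence) {
        return (int) (sequence & indexMask);
    }

    public static void main(String[] args) {
        RingIndex ring = new RingIndex(8);
        for (long seq = 6; seq <= 10; seq++) {
            // Prints slots 6, 7, 0, 1, 2: the sequence keeps growing, only the slot wraps
            System.out.println("sequence " + seq + " -> slot " + ring.slotFor(seq));
        }
    }
}
```

The sequence itself never wraps; only the slot index does. Producers and consumers can therefore compare sequences directly to tell who is ahead.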

Disruptor application

Define events

  • Event: events. Data types exchanged in Disruptor queues
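An Event is usually a plain, mutable holder class, because each instance lives in a ring slot and is overwritten again and again. This is a minimal sketch that assumes a single String payload. The field name detail matches the property mentioned later in this article, but its type is an assumption.

```java
// Hypothetical event type: one mutable instance per ring slot, reused for many messages.
// The field name "detail" follows the article; the String type is an assumption.
final class Event {
    private String detail; // null until a producer fills the slot

    void setDetail(String detail) {
        this.detail = detail;
    }

    String getDetail() {
        return detail;
    }

    // Optional: drop the payload reference after consumption so the payload can be garbage collected
    void clear() {
        this.detail = null;
    }

    @Override
    public String toString() {
        return "Event{detail=" + detail + "}";
    }
}
```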

Define the event factory

  • Event Factory: defines how an Event is instantiated. It must implement the interface com.lmax.disruptor.EventFactory<T>
  • Disruptor uses the EventFactory to pre-create Event instances in the RingBuffer:
    • An Event instance acts like a data slot
    • Before publishing, the Producer obtains an Event instance from the RingBuffer
    • The Producer then fills the Event instance with data and publishes it to the RingBuffer
    • Finally, the Consumer gets the Event instance and reads the data in the instance
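The lifecycle above can be sketched without the Disruptor library. In this hypothetical PreallocatedRing, java.util.function.Supplier plays the role of EventFactory<T>, whose single method is newInstance(). Every slot is created once in the constructor, and producers and consumers then work on those same instances.

```java
import java.util.function.Supplier;

// Hypothetical sketch of pre-allocation: Supplier stands in for com.lmax.disruptor.EventFactory<T>
final class PreallocatedRing<T> {
    private final Object[] entries;
    private final int indexMask;

    PreallocatedRing(int bufferSize, Supplier<T> factory) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        entries = new Object[bufferSize];
        indexMask = bufferSize - 1;
        // Fill every slot once, up front; these objects are reused for the ring's lifetime
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.get();
        }
    }

    @SuppressWarnings("unchecked")
    T get(long sequence) {
        return (T) entries[(int) (sequence & indexMask)];
    }

    // Tiny mutable event used only for this demo
    static final class LongEvent {
        long value;
    }

    public static void main(String[] args) {
        PreallocatedRing<LongEvent> ring = new PreallocatedRing<>(4, LongEvent::new);
        ring.get(0).value = 42;                          // producer fills the pre-created slot in place
        System.out.println(ring.get(0).value);           // consumer reads the same instance: 42
        System.out.println(ring.get(4) == ring.get(0));  // sequence 4 reuses slot 0's object: true
    }
}
```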

Define the event handling implementation

  • Define the concrete event handling logic by implementing the interface com.lmax.disruptor.EventHandler<T>
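EventHandler<T> declares a single callback, onEvent(T event, long sequence, boolean endOfBatch). The sketch below uses a local interface with the same shape, so it compiles without the Disruptor jar. SimpleEventHandler, TextEvent and CollectingHandler are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in with the same shape as com.lmax.disruptor.EventHandler<T> (hypothetical name)
interface SimpleEventHandler<T> {
    void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}

// Hypothetical event used only for this example
final class TextEvent {
    String detail;
}

// Hypothetical handler that records what it saw and counts batches
final class CollectingHandler implements SimpleEventHandler<TextEvent> {
    final List<String> seen = new ArrayList<>();
    int batches = 0;

    @Override
    public void onEvent(TextEvent event, long sequence, boolean endOfBatch) {
        // Copy the data out: the same event object is reused for later sequences
        seen.add(sequence + ":" + event.detail);
        if (endOfBatch) {
            batches++; // e.g. flush buffered I/O once per batch
        }
    }
}
```

The endOfBatch flag tells the handler that it has reached the last event currently available. That is a natural place to flush buffered work.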

Define the thread pool for event handling

  • Disruptor runs each Consumer's event handling on threads from a java.util.concurrent.ExecutorService
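One practical consequence is that each consumer's event loop keeps its thread for as long as the Disruptor runs. The pool therefore needs at least one thread per EventHandler, otherwise some handlers never start. This is a minimal JDK-only sketch of that effect, with a sleeping task standing in for the event loop; the class name is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: every consumer loop occupies one pool thread until shutdown
final class ConsumerPoolDemo {
    // Starts handlerCount long-running "consumer loops" on poolSize threads and reports
    // whether all of them were running at the same time within one second
    static boolean allConsumersRun(int handlerCount, int poolSize) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        CountDownLatch started = new CountDownLatch(handlerCount);
        for (int i = 0; i < handlerCount; i++) {
            executor.execute(() -> {
                started.countDown();
                try {
                    Thread.sleep(5_000); // stand-in for an event loop that never returns on its own
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // shutdownNow() ends the "loop"
                }
            });
        }
        try {
            return started.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

allConsumersRun(2, 2) returns true. allConsumersRun(2, 1) returns false, because the second "handler" is still waiting in the queue. Newer Disruptor versions take a ThreadFactory instead of an Executor, which sidesteps this sizing question.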

Specify wait policy

  • Disruptor uses the strategy pattern to define how a Consumer waits for events. A strategy is an implementation of the com.lmax.disruptor.WaitStrategy interface
  • There are three common implementations of WaitStrategy, each with different performance characteristics, advantages and disadvantages. Choosing the right strategy for the CPU hardware of the actual runtime environment, together with suitable JVM startup parameters, yields different performance gains:
    • BlockingWaitStrategy:
      • Lowest performance
      • Lowest CPU consumption
      • Provides more consistent performance across different deployment environments
    • SleepingWaitStrategy:
      • Performance and CPU consumption are similar to BlockingWaitStrategy
      • Has the least impact on the producer thread
      • Well suited to asynchronous logging scenarios
    • YieldingWaitStrategy:
      • Highest performance
      • Suitable for low-latency systems
      • Recommended for scenarios that require very high performance and where the number of event processing threads is smaller than the number of logical CPU cores
      • For example, when hyperthreading is enabled on the CPU
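This sketch makes the trade-off concrete for the two spinning strategies. It is written against a plain AtomicLong cursor instead of Disruptor's Sequence and SequenceBarrier, and the class names are hypothetical. The retry counts (100 spins; 200 retries with a 100 ns park) are assumed to roughly match Disruptor's defaults, so treat them as illustrative. BlockingWaitStrategy, which waits on a lock and condition, is omitted.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Simplified stand-ins for the spinning wait strategies (not Disruptor's classes).
// Each waits until the published cursor reaches the requested sequence and returns the cursor value.
interface SimpleWaitStrategy {
    long waitFor(long sequence, AtomicLong cursor);
}

// Roughly the YieldingWaitStrategy idea: busy-spin for a while, then yield the CPU between checks
final class YieldingWait implements SimpleWaitStrategy {
    private static final int SPIN_TRIES = 100; // assumed value

    @Override
    public long waitFor(long sequence, AtomicLong cursor) {
        int counter = SPIN_TRIES;
        long available;
        while ((available = cursor.get()) < sequence) {
            if (counter == 0) {
                Thread.yield();
            } else {
                counter--; // pure busy spin: lowest latency, highest CPU use
            }
        }
        return available;
    }
}

// Roughly the SleepingWaitStrategy idea: spin, then yield, then park briefly, trading latency for CPU
final class SleepingWait implements SimpleWaitStrategy {
    private static final int RETRIES = 200;        // assumed value
    private static final long SLEEP_NANOS = 100L;  // assumed value

    @Override
    public long waitFor(long sequence, AtomicLong cursor) {
        int counter = RETRIES;
        long available;
        while ((available = cursor.get()) < sequence) {
            if (counter > 100) {
                counter--;
            } else if (counter > 0) {
                counter--;
                Thread.yield();
            } else {
                LockSupport.parkNanos(SLEEP_NANOS);
            }
        }
        return available;
    }
}
```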

Start the Disruptor

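import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// EventFactoryImpl and EventHandle are user-defined classes; the Executor constructor is deprecated in newer versions
ExecutorService executor = Executors.newCachedThreadPool();
EventFactory<Event> eventFactory = new EventFactoryImpl();
int ringBufferSize = 1024 * 1024; // must be a power of 2
Disruptor<Event> disruptor = new Disruptor<>(eventFactory, ringBufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy());
EventHandler<Event> eventHandler = new EventHandle();
disruptor.handleEventsWith(eventHandler);
disruptor.start();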

Publish event

  • Publishing an Event in Disruptor is a two-phase commit: the producer first claims and fills a slot (steps 1 and 2), then commits it (step 3):
    • Step 1: Get the sequence number of the next writable slot from the RingBuffer
    • Step 2: Get the Event object for that sequence number and write the data into it
    • Step 3: Publish (commit) the event to the RingBuffer
  • Disruptor requires ringBuffer.publish() to be called for every claimed sequence, even if an exception occurs while the event is being filled (typically by calling it in a finally block). Otherwise consumers would wait forever on that slot. Because an incompletely filled event may still be published, the event handling code must check the correctness and integrity of the data it carries
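The claim, fill and publish steps, and the always-publish rule, can be sketched with a tiny single-producer ring. MiniRing and its valid flag are hypothetical, and the wrap-around check is omitted for brevity. The method names next(), get(long) and publish(long) mirror the ones on Disruptor's RingBuffer.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-producer ring (no wrap-around check) showing claim -> fill -> publish
final class MiniRing {
    static final class Event {
        String detail;
        boolean valid; // lets consumers detect an event the producer failed to fill
    }

    private final Event[] entries;
    private final int mask;
    private long nextValue = -1;                           // last claimed sequence (producer thread only)
    private final AtomicLong cursor = new AtomicLong(-1);  // last published sequence

    MiniRing(int size) { // size must be a power of 2
        entries = new Event[size];
        mask = size - 1;
        for (int i = 0; i < size; i++) {
            entries[i] = new Event();
        }
    }

    long next() { return ++nextValue; }                                  // phase 1: claim
    Event get(long sequence) { return entries[(int) (sequence & mask)]; }
    void publish(long sequence) { cursor.set(sequence); }                // phase 2: commit
    long cursor() { return cursor.get(); }

    // Producer: always publishes the claimed sequence, even if filling the event fails
    void produce(String detail) {
        long sequence = next();
        Event event = get(sequence);
        try {
            event.valid = false;
            if (detail == null) {
                throw new IllegalArgumentException("no data");
            }
            event.detail = detail;
            event.valid = true;
        } finally {
            publish(sequence); // skipping this would leave consumers stuck waiting on this slot
        }
    }
}
```

After produce(null) throws, the slot is still published but marked invalid. The consumer is therefore not blocked and can skip it.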

Close the Disruptor

  • disruptor.shutdown(): shuts down the Disruptor. The method blocks until all events have been processed
  • executor.shutdown(): shuts down the thread pool used by the Disruptor. If the pool needs to be shut down, you must do it manually, because Disruptor does not shut down the thread pool it uses
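This is a minimal sketch of the shutdown order using only the JDK. The Runnable parameter stands in for disruptor.shutdown(), so the example runs without the library. ShutdownOrder is a hypothetical name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: stop the Disruptor first (draining events), then the pool it ran on
final class ShutdownOrder {
    static boolean shutdown(Runnable stopDisruptor, ExecutorService executor) {
        stopDisruptor.run();  // stand-in for disruptor.shutdown(), which blocks until events are processed
        executor.shutdown();  // Disruptor does not shut down the pool for you
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
            return false;
        }
    }
}
```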

Disruptor principle

The core concept

RingBuffer

  • RingBuffer: the ring buffer
  • Since version 3.0, RingBuffer is responsible only for storing and updating the event data exchanged through Disruptor
  • In advanced Disruptor use cases, RingBuffer can be replaced with a user-defined implementation

Sequence

  • Disruptor manages the event data exchanged through it with monotonically increasing sequence numbers (Sequence), and always processes events in increasing sequence order
  • A Sequence tracks the processing progress of a particular component, such as the RingBuffer or a Consumer
  • Because each Sequence is padded, tracking progress with Sequence avoids false sharing between the CPU cache lines of different Sequences

Sequencer

  • Sequencer is at the heart of Disruptor: it implements the concurrency algorithm that passes data quickly and correctly between producers and consumers
  • The Sequencer interface has two implementation classes:
    • SingleProducerSequencer
    • MultiProducerSequencer
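The main difference between the two implementations is how a sequence is claimed. In this simplified sketch (capacity checks omitted, class names hypothetical), a single producer can use a plain increment. Several producers must instead race for sequences with a compare-and-set loop, which is the idea behind MultiProducerSequencer.

```java
import java.util.concurrent.atomic.AtomicLong;

// Single producer: only one thread ever claims, so an unsynchronized field increment is enough
final class SingleProducerClaim {
    private long nextValue = -1;

    long next(int n) {
        nextValue += n;
        return nextValue; // highest claimed sequence
    }
}

// Multiple producers: threads compete for sequences, so claiming uses compare-and-set
final class MultiProducerClaim {
    private final AtomicLong cursor = new AtomicLong(-1);

    long next(int n) {
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + n;
        } while (!cursor.compareAndSet(current, next)); // retry if another producer won the race
        return next;
    }
}
```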

Sequence Barrier

  • Holds references to the RingBuffer's published Sequence and to the Sequences of any Consumers that the current Consumer depends on
  • The Sequence Barrier contains the logic that determines whether the Consumer has more events it can process
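The core rule a barrier enforces fits in a few lines. In this hypothetical, non-blocking SimpleBarrier, plain AtomicLong values stand in for Sequence objects. The consumer may advance to the smaller of the producer's published cursor and the sequences of the consumers it depends on.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, non-blocking simplification of a sequence barrier
final class SimpleBarrier {
    private final AtomicLong cursor;        // producer's published sequence
    private final AtomicLong[] dependents;  // upstream consumers; may be empty

    SimpleBarrier(AtomicLong cursor, AtomicLong... dependents) {
        this.cursor = cursor;
        this.dependents = dependents;
    }

    // Highest sequence this consumer can safely process right now:
    // never past the producer, never past any consumer it depends on
    long highestAvailable() {
        long min = cursor.get();
        for (AtomicLong d : dependents) {
            min = Math.min(min, d.get());
        }
        return min;
    }
}
```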

WaitStrategy

  • WaitStrategy defines the strategy for the Consumer to wait for events

Event

  • The data exchanged between producers and consumers in Disruptor is called an Event
  • The Event type is not defined by Disruptor; it is defined by the user of Disruptor

EventProcessor

  • The EventProcessor holds the Sequence of the specified Consumer and provides an EventLoop for invoking the event-processing implementation
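The EventProcessor's loop works like this: read the next sequence, find out how far it may go, hand each event to the handler, then update its own Sequence once per batch. MiniEventProcessor below is a hypothetical, single-pass simplification of that idea (no waiting, no exception handling), in the spirit of Disruptor's BatchEventProcessor.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongFunction;

// Hypothetical single-pass event loop in the spirit of BatchEventProcessor
final class MiniEventProcessor<T> {
    // Same shape as the EventHandler callback (local stand-in)
    interface Handler<E> {
        void onEvent(E event, long sequence, boolean endOfBatch);
    }

    private final AtomicLong sequence = new AtomicLong(-1); // this consumer's progress
    private final AtomicLong publishedCursor;                // highest sequence the producer has published
    private final LongFunction<T> ring;                      // maps a sequence to its event slot
    private final Handler<T> handler;

    MiniEventProcessor(AtomicLong publishedCursor, LongFunction<T> ring, Handler<T> handler) {
        this.publishedCursor = publishedCursor;
        this.ring = ring;
        this.handler = handler;
    }

    // One pass of the loop; returns how many events were handled (0 if none were available)
    int processAvailable() {
        long next = sequence.get() + 1;
        long available = publishedCursor.get();
        if (available < next) {
            return 0; // a real processor would wait here via its barrier and wait strategy
        }
        for (long s = next; s <= available; s++) {
            handler.onEvent(ring.apply(s), s, s == available);
        }
        // Advance this consumer's Sequence once per batch, not once per event
        sequence.set(available);
        return (int) (available - next + 1);
    }

    long sequence() {
        return sequence.get();
    }
}
```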

EventHandler

  • EventHandler is the event handling interface defined by Disruptor. The Consumer implements it with its concrete event handling logic, so it is the real implementation of the Consumer

Producer

  • Producer: generally refers to the code that calls Disruptor to publish events. Disruptor does not define a specific interface or type for it

Memory preallocation

  • RingBuffer stores its elements in an array, Object[] entries:
    • When the RingBuffer is initialized, every element of entries is set to a concrete Event instance whose detail property is null
    • When a producer writes a message to the RingBuffer, the RingBuffer does not point the array element at a new Event object. Instead, it obtains the existing Event object and changes its detail property
    • When consuming, the consumer likewise reads the Event from the RingBuffer and then reads its detail property
    • Thus, during production and consumption the entries array itself never changes and no temporary objects are created. The array elements stay alive until the RingBuffer itself is discarded
  • This minimizes how often the JVM has to run garbage collection (GC) and improves performance
private void fill(EventFactory<E> eventFactory) {
	for (int i = 0; i < bufferSize; i++) {
		// Create each event once via the factory; BUFFER_PAD skips the padding slots at the start of entries
		entries[BUFFER_PAD + i] = eventFactory.newInstance();
	}
}

Eliminating false sharing

  • False sharing in Disruptor: if two independent variables that are updated concurrently sit on the same cache line, a write to one invalidates the other's cached copy on other CPU cores. This hurts the performance of concurrent operations
  • How Disruptor eliminates false sharing:
    • Sequence.java is padded with multiple long variables, so that a sequence value has a cache line to itself. The same technique is applied to other hot fields, as in the following padding class, which holds the producer's nextValue and cachedValue:
private static class Padding {
	public long nextValue = Sequence.INITIAL_VALUE, cachedValue = Sequence.INITIAL_VALUE, p2, p3, p4, p5, p6, p7; 
}
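The same padding idea can be applied to a sequence value. This sketch is modeled on the class-hierarchy layout (left padding, value, right padding) that Disruptor 3.x uses in Sequence. The class names here are hypothetical and prefixed to avoid clashes. HotSpot places superclass fields before subclass fields, which is why the padding is split across classes. Field layout is ultimately up to the JVM, so treat this as best effort.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// 7 longs (56 bytes) of padding on each side of the hot value, assuming 64-byte cache lines
class SeqLhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class SeqValue extends SeqLhsPadding {
    protected volatile long value;
}

class SeqRhsPadding extends SeqValue {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

// Hypothetical padded sequence counter
final class PaddedSequence extends SeqRhsPadding {
    private static final AtomicLongFieldUpdater<SeqValue> VALUE =
            AtomicLongFieldUpdater.newUpdater(SeqValue.class, "value");

    PaddedSequence(long initial) {
        this.value = initial;
    }

    long get() {
        return value; // volatile read
    }

    void set(long v) {
        VALUE.lazySet(this, v); // ordered store, cheaper than a full volatile write
    }

    boolean compareAndSet(long expected, long v) {
        return VALUE.compareAndSet(this, expected, v);
    }
}
```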

Eliminate locks and CAS operations

  • Disruptor avoids locks, and in the single-producer case also CAS operations, by combining SequenceBarriers and Sequences to coordinate the processing relationship between consumers and producers
  • In Disruptor, each consumer and producer has its own Sequence, and these Sequences must satisfy the following conditions:
    • Condition 1: A consumer's Sequence must not exceed the producer's published Sequence
    • Condition 2: A consumer's Sequence must not exceed the Sequence of any consumer it depends on
    • Condition 3: The producer's Sequence must not get more than one full ring (bufferSize) ahead of the slowest consumer's Sequence. This prevents the producer from overwriting events that have not been consumed yet
  • Conditions 1 and 2 are implemented in the SequenceBarrier's waitFor() method:
/**
 * Waits until the given sequence is available to the consumer.
 *
 * @param sequence the next sequence value the consumer expects to obtain
 * @return the highest sequence available to the consumer
 */
public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException {
	checkAlert();

	/*
	 * Ask the wait strategy for the available sequence (availableSequence). How it relates to the
	 * requested sequence depends on the WaitStrategy in use:
	 * - YieldingWaitStrategy: spins 100 times, then calls Thread.yield() between checks until the sequence is available
	 * - BlockingWaitStrategy: blocks until the sequence is available
	 * The returned value may be greater than the requested sequence. The barrier does not assume it is
	 * at least the requested sequence, hence the check below.
	 */
	long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

	// If the available sequence is still less than the requested one, return it; the calling EventProcessor keeps waiting
	if (availableSequence < sequence) {
		return availableSequence;
	}

	// Batch: return the highest sequence actually published between sequence and availableSequence
	return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
  • Condition 3 acts as a barrier for producers. It is checked when a producer claims the next available slot from the RingBuffer, which delegates the claim to the Sequencer. The code below is the single-producer implementation:
@Override
public long next(int n) {
	if (n < 1) {
		throw new IllegalArgumentException("n must be > 0");
	}
	long nextValue = this.nextValue;
	// The next sequence is the current sequence plus the number of slots requested
	long nextSequence = nextValue + n;

	// The slot for nextSequence was last used by sequence wrapPoint, one full lap (bufferSize) earlier
	long wrapPoint = nextSequence - bufferSize;

	/*
	 * cachedValue caches the minimum consumer sequence. It is not the current minimum: it was assigned
	 * the last time this method entered the if block below. This avoids calling
	 * Util.getMinimumSequence() on every claim; the minimum is only refetched when the producer is
	 * about to get a full bufferSize ahead of the cached value.
	 */
	long cachedGatingSequence = this.cachedValue;

	/*
	 * wrapPoint > cachedGatingSequence: the producer would be more than one bufferSize ahead of the
	 * slowest consumer as last cached, so the real minimum must be refetched to avoid overwriting
	 * events that have not been consumed yet.
	 * cachedGatingSequence > nextValue: consumers can never legitimately be ahead of the producer,
	 * so a cached value larger than nextValue is stale and is refreshed as well.
	 */
	if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
		cursor.setVolatile(nextValue); // StoreLoad fence
		long minSequence;
		// Util.getMinimumSequence() returns the smallest gating (consumer) sequence, i.e. the slowest consumer
		while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
			// The producer waits for consumers to catch up, until no overwrite would occur
			LockSupport.parkNanos(1L);
		}
		this.cachedValue = minSequence;
	}
	this.nextValue = nextSequence;
	return nextSequence;
}
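The overwrite check in next() boils down to one comparison. This hypothetical WrapCheck helper isolates it so the numbers can be checked by hand. Claiming up to nextSequence is safe only if the slowest consumer has already processed wrapPoint, the sequence that used the same slot one lap earlier.

```java
// Hypothetical helper isolating the producer's overwrite (wrap) check
final class WrapCheck {
    static boolean canClaim(long nextValue, int n, int bufferSize, long slowestConsumer) {
        long nextSequence = nextValue + n;
        long wrapPoint = nextSequence - bufferSize; // sequence that last used the target slot
        return wrapPoint <= slowestConsumer;
    }

    public static void main(String[] args) {
        // bufferSize 8, producer has claimed up to 10, slowest consumer has finished 3
        System.out.println(canClaim(10, 1, 8, 3)); // true: 11 - 8 = 3, and sequence 3 is consumed
        System.out.println(canClaim(10, 2, 8, 3)); // false: 12 - 8 = 4, and sequence 4 is not consumed yet
    }
}
```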

Batch effect

  • When producers are faster than consumers, consumers can catch up with producers through batch processing
    • Consumers can take multiple consecutive events from the RingBuffer at once and process them together, which improves consumption efficiency
/**
 * Waits until the given sequence is available to the consumer.
 *
 * @param sequence the next sequence value the consumer expects to obtain
 * @return the highest sequence available to the consumer
 */
public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException {
	checkAlert();
	long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
	if (availableSequence < sequence) {
		return availableSequence;
	}

	/*
	 * Return the highest sequence the consumer can process, so that it can handle a whole batch at once:
	 * - when availableSequence > sequence, this is the highest contiguous published sequence in [sequence, availableSequence]
	 * - the smallest possible result is sequence - 1 (sequence itself is not yet published)
	 */
	return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
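For a single producer, getHighestPublishedSequence() can simply return availableSequence. With several producers, slots may be published out of order, so the sequencer has to find the highest contiguous published sequence. This sketch assumes an availability array that records the ring "lap" last published in each slot, similar in spirit to MultiProducerSequencer's available buffer. AvailabilityBuffer is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical availability tracking for out-of-order publishing by multiple producers
final class AvailabilityBuffer {
    private final AtomicIntegerArray availableBuffer; // per slot: lap number last published there
    private final int indexMask;
    private final int indexShift; // log2(bufferSize)

    AvailabilityBuffer(int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        availableBuffer = new AtomicIntegerArray(bufferSize);
        for (int i = 0; i < bufferSize; i++) {
            availableBuffer.set(i, -1); // nothing published yet
        }
        indexMask = bufferSize - 1;
        indexShift = Integer.numberOfTrailingZeros(bufferSize);
    }

    // Called by a producer after it has filled the event for this sequence
    void setAvailable(long sequence) {
        availableBuffer.set((int) (sequence & indexMask), (int) (sequence >>> indexShift));
    }

    boolean isAvailable(long sequence) {
        return availableBuffer.get((int) (sequence & indexMask)) == (int) (sequence >>> indexShift);
    }

    // Scan [lowerBound, availableSequence] and stop just before the first unpublished sequence
    long getHighestPublishedSequence(long lowerBound, long availableSequence) {
        for (long s = lowerBound; s <= availableSequence; s++) {
            if (!isAvailable(s)) {
                return s - 1;
            }
        }
        return availableSequence;
    }
}
```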