Using the Disruptor

1. Introduction

The LMAX Disruptor is a high performance inter-thread messaging library. It grew out of LMAX’s research into concurrency, performance and non-blocking algorithms and today forms a core part of their Exchange’s infrastructure.


— Quoted from the GitHub project introduction

Disruptor is a high-performance asynchronous processing framework. It is often described as one of the fastest messaging frameworks (a lightweight JMS), and can also be viewed as an implementation of the observer or event-listener pattern. The project wiki is here:

https://github.com/LMAX-Exchange/disruptor/wiki

2. Disruptor Design

Disruptor addresses the slowness of traditional queues with the following design choices:

Ring-array structure. The buffer is an array rather than a linked list, which avoids garbage collection of queue nodes and is friendlier to the CPU cache, since array elements sit contiguously in memory.

Fast element location. The array length is a power of two (2^n), so an element's slot can be computed from its sequence number with a single bit-mask operation instead of a modulo. Sequence numbers only ever increase, and overflow is not a practical concern: the sequence is a long, and even at 1,000,000 events per second it would take roughly 300,000 years to exhaust it.

Lock-free design. Each producer or consumer thread claims the position in the array it may operate on, then writes or reads that slot directly. Setting aside the ring shape for a moment, the claim itself is made thread-safe with atomic CAS operations rather than locks.
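As a minimal sketch of the "power-of-two size plus bit mask" indexing idea (the class and names below are illustrative, not Disruptor's internal code):

// Illustrative only: how a sequence maps to a slot when the buffer size is 2^n.
public class RingIndexDemo {

    private static final int BUFFER_SIZE = 8;               // must be a power of two
    private static final int INDEX_MASK  = BUFFER_SIZE - 1; // 0b0111

    // sequence & (size - 1) is equivalent to sequence % size, but cheaper.
    static int slotFor(long sequence) {
        return (int) (sequence & INDEX_MASK);
    }

    public static void main(String[] args) {
        for (long sequence = 0; sequence < 20; sequence++) {
            System.out.println("sequence " + sequence + " -> slot " + slotFor(sequence));
        }
    }
}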

3. Disruptor Implementation Features

Another key to Disruptor's low latency is its use of lock-free algorithms: memory visibility and correctness rely on memory barriers and CAS operations rather than on locks. Using CAS for thread safety is significantly cheaper than the locking used by most concurrent queues. CAS is a CPU-level instruction; it is lightweight and needs no operating-system support the way a lock does, so there is no switch between user and kernel mode and no context switch per invocation. The one place a lock is used is BlockingWaitStrategy, which relies on a Condition to park consumers until a new event arrives. Many low-latency systems busy-wait instead, to avoid the jitter that Condition introduces, but busy waiting can degrade performance noticeably when CPU resources are scarce, for example on web servers running in virtualized environments.
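To illustrate the CAS idea in isolation, here is a simplified sketch using java.util.concurrent.atomic; it is not Disruptor's actual Sequence implementation, just the same claim-by-CAS pattern:

import java.util.concurrent.atomic.AtomicLong;

// Simplified sketch: two producer threads claim strictly increasing
// sequence numbers with compareAndSet instead of taking a lock.
public class CasClaimDemo {

    private static final AtomicLong cursor = new AtomicLong(-1);

    // Spin until we win the CAS; no kernel-level lock is ever taken.
    static long claimNext() {
        long current, next;
        do {
            current = cursor.get();
            next = current + 1;
        } while (!cursor.compareAndSet(current, next));
        return next;
    }

    public static void main(String[] args) throws InterruptedException {
        Runnable producer = () -> {
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName()
                        + " claimed sequence " + claimNext());
            }
        };
        Thread t1 = new Thread(producer, "producer-1");
        Thread t2 = new Thread(producer, "producer-2");
        t1.start(); t2.start();
        t1.join(); t2.join();
    }
}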

4. Disruptor Producer-Consumer Example

Following the original author's demo, we build a producer-consumer example that passes long values through the Disruptor as LongEvent objects. The relevant code is as follows:

Maven dependency
<dependencies>
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <!-- any 3.x release provides the API used in this demo -->
        <version>3.4.2</version>
    </dependency>
</dependencies>
LongEvent
// The event: the type of data exchanged through the Disruptor.
public class LongEvent {

    private Long value;

    public Long getValue() {
        return value;
    }

    public void setValue(Long value) {
        this.value = value;
    }
}
LongEventFactory
import com.lmax.disruptor.EventFactory;

// Factory used by the Disruptor to pre-allocate events in the ring buffer.
public class LongEventFactory implements EventFactory<LongEvent> {

    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}
LongEventHandler
import com.lmax.disruptor.EventHandler;

// Consumer: called by the Disruptor for each published event.
public class LongEventHandler implements EventHandler<LongEvent> {

    @Override
    public void onEvent(LongEvent longEvent, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("Consumers get data :" + longEvent.getValue());
    }
}
LongEventProducer
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

// Producer: claims a slot in the ring buffer, fills it, then publishes it.
public class LongEventProducer {

    private RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer byteBuffer) {
        // Claim the next sequence (slot index) in the ring buffer
        long sequence = ringBuffer.next();
        try {
            // Get the pre-allocated event sitting in that slot
            LongEvent longEvent = ringBuffer.get(sequence);
            // Fill the event with data
            longEvent.setValue(byteBuffer.getLong(0));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("Producers send data....");
            // Always publish the claimed sequence, even if filling it failed
            ringBuffer.publish(sequence);
        }
    }
}
MainTest
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MainTest {

    public static void main(String[] args) {

        // 1. Create a thread pool
        ExecutorService executor = Executors.newCachedThreadPool();

        // 2. Create the event factory
        LongEventFactory longEventFactory = new LongEventFactory();

        // 3. Choose the ring buffer size (must be a power of two)
        int ringBufferSize = 1024 * 1024;

        // 4. Create the Disruptor
        Disruptor<LongEvent> longEventDisruptor = new Disruptor<>(
                longEventFactory, ringBufferSize, executor,
                ProducerType.MULTI, new YieldingWaitStrategy()
        );

        // 5. Connect the consumer
        longEventDisruptor.handleEventsWith(new LongEventHandler());

        // 6. Start the Disruptor
        longEventDisruptor.start();

        // 7. Get the RingBuffer from the Disruptor
        RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer();

        // 8. Create the producer
        LongEventProducer longEventProducer = new LongEventProducer(ringBuffer);

        // 9. Publish ten events
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (int i = 0; i < 10; i++) {
            byteBuffer.putLong(0, i);
            longEventProducer.onData(byteBuffer);
        }

        executor.shutdown();
        longEventDisruptor.shutdown();
    }
}
The result is as follows:

Producers send data....
Producers send data....
Producers send data....
Producers send data....
Consumers get data :0
Producers send data....
Consumers get data :1
Producers send data....
Consumers get data :2
Producers send data....
Consumers get data :3
Producers send data....
Consumers get data :4
Producers send data....
Consumers get data :5
Producers send data....
Consumers get data :6
Consumers get data :7
Consumers get data :8
Consumers get data :9
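One caveat about the constructor used in MainTest: later 3.x releases deprecate the Executor-based Disruptor constructor in favour of a ThreadFactory-based one. If your version complains about it, the setup step can be written roughly like this (a sketch assuming the same factory, buffer size, and wait strategy as above):

import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

// Same wiring as MainTest, but letting the Disruptor create its own
// daemon threads instead of passing in an ExecutorService.
public class MainTestAlternative {

    public static void main(String[] args) {
        Disruptor<LongEvent> disruptor = new Disruptor<>(
                new LongEventFactory(), 1024 * 1024, DaemonThreadFactory.INSTANCE,
                ProducerType.MULTI, new YieldingWaitStrategy()
        );
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.start();
        // ... publish events exactly as in MainTest, then:
        disruptor.shutdown();
    }
}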