Recently, I participated in the design of a random number distribution platform, considering how to achieve high concurrency performance of the platform. In the selection of technical implementation, I first referred to Baidu’s UID-Generator, which adopts the realization form of double RingBuffer. The uID-Generator’s dual RingBuffer is a reference to the Disruptor implementation. Therefore, in this series of articles, we will explore and learn about the Disruptor framework, which won Duke’s Application Framework Innovation Award in 2011.

1 introduction

Martin Fowler wrote an article on his website about the LMAX architecture, a new retail financial trading platform running on the JVM that can generate a large number of trades with very low latency. A single thread achieves a TPS of 6 million orders per second, which is incredibly high even though the business logic is a pure memory operation. So what is it about LMAX that allows a single thread to process 6 million orders per second? Disruptor.

Disruptor is an open source concurrency framework that won Duke’s Application Framework Innovation Award in 2011. Disruptor uses an event-driven approach that enables network Queue concurrency without locking.

Disruptor Framework Overview

Disruptor framework Disruptor Framework uses the Ring Buffer, a Ring array, as the core data structure for asynchronous event processing. The JDK’s BlockingQueue is a BlockingQueue that internally synchronizes threads between producers and consumers through a locking mechanism. Like BlockingQueue, Disruptor framework is designed to exchange data between producers and consumers around the Ring Buffer, but with higher performance. I have tested the Disruptor framework against ArrayBlockingQueue in the same environment and found that the Disruptor framework handled data several times faster than ArrayBlockingQueue.

Why does the Disruptor Framework perform better? It has the following characteristics:

  1. Preloaded memory can be understood as using the memory pool;
  2. Unlocked,
  3. Single thread to write
  4. Eliminating pseudo sharing
  5. Use memory barriers
  6. Serial number fence mechanism

3 Related Concepts

Disruptor: is a core class that uses the Disruptor framework and holds references to RingBuffer, consumer thread pool, consumer collection ConsumerRepository, and consumer ExceptionHandler ExceptionHandler;

RingBuffer: Located at the center of the Disruptor framework, RingBuffer is an array of Ring objects that are created using preloading mechanisms and can be reused as a bridge for data exchange between producers and consumers, holding Sequencer references.

Sequencer: Sequencer is the core of the Disruptor framework and implements all concurrent algorithms for fast and correct data transfer between producers and consumers. It has two implementation classes, SingleProducerSequencer and MultiProducerSequencer.

Sequence:Sequence is used to identify the processing progress of the Ring Buffer and the consumer Event Processor. Each consumer Event Processor and the Ring Buffer maintain a Sequence respectively, supporting concurrent operations and sequential writes. It also improves performance by populating cached rows to eliminate pseudo-sharing.

Sequence barriers :Sequence barriers coordinate the data exchange progress between producers and consumers by tracking the cursorSequence of producers and the Sequence of each consumer (EventProcessor). Its implementation class ProcessingSequenceBarrier WaitStrategy held waiting strategy is the core to realize serial number barrier.

Wait Strategy:Wait Strategy is a strategic way to determine how consumers Wait for producers. When consumers consume too fast, should they be asked to Wait? Should consumers Wait by locking or without locking?

EventProcessor: The EventProcessor can be understood as the consumer thread, which will always get data from the Ring Buffer to consume data. It has two core implementation classes: BatchEventProcessor and WorkProcessor.

Event Handler: An Event Handler can be understood as a Handler that the consumer implements the business logic, referenced by the BatchEventProcessor class, In an infinite loop of the BatchEventProcessor thread, data is continually fetched from the Ring Buffer for consumption by the Event Handler.

Producer: Producer, usually ringbuffer.publishevent is used to produce data.

Four introductory DEMO

// LongEvent.java
public class LongEvent
{
    private long value;

    public void set(long value)
    {
        this.value = value;
    }

    public long get(a) {
        return this.value; }}Copy the code
// LongEventFactory.java
public class LongEventFactory implements EventFactory<LongEvent>
{
    @Override
    public LongEvent newInstance(a)
    {
        return newLongEvent(); }}Copy the code
// LongEventHandler.java
public class LongEventHandler implements EventHandler<LongEvent>
{
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println(new Date() + ":Event-"+ event.get()); }}Copy the code
// LongEventTranslatorOneArg.java
public class LongEventTranslatorOneArg implements EventTranslatorOneArg<LongEvent.ByteBuffer> {
    @Override
    public void translateTo(LongEvent event, long sequence, ByteBuffer buffer) {
        event.set(buffer.getLong(0)); }}Copy the code
// LongEventMain.java
public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        int bufferSize = 1024;
        final Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
                new LongEventFactory(),
                bufferSize,
                Executors.newSingleThreadExecutor(),
                ProducerType.SINGLE,
                new YieldingWaitStrategy()
        );

        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.start();


        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent(new LongEventTranslatorOneArg(), bb);
            Thread.sleep(1000); }}}Copy the code

Output result:

Reference: lmax – exchange. Making. IO/disruptor/u…

If you feel good, please mercilessly forward and like it!

Github address:

Github.com/yuanmabiji/…