Scalable Open Financial Architecture Stack (SOFAStack) is a financial-grade cloud-native architecture developed in-house by Ant Financial. It contains the components needed to build financial-grade cloud-native systems and embodies best practices proven in financial scenarios.

SOFATracer is a component for tracing calls in distributed systems. It uses a unified TraceId to log every network call along an invocation chain, which makes those calls visible. The resulting link data can be used for fault discovery and service governance.

SOFATracer:github.com/sofastack/s…

Disruptor overview

Disruptor aims to provide low-latency, high-throughput work queues for asynchronous event processing architectures. It ensures that any piece of data is owned by only one thread for write access, which reduces write contention compared with other structures. Several well-known projects use it for its performance, including Apache Storm, Camel and Log4j 2.

SOFATracer uses Disruptor's high-performance lock-free ring buffer to write logs to local disk asynchronously. It prints two kinds of logs: digest logs, written to disk for every invocation, and statistics logs, generated at regular intervals. For either log type, SOFATracer must keep logging fast so that it adds as little as possible to overall business latency.

For a theoretical analysis of Disruptor, see "Disruptor: A High Performance Inter-thread Messaging Library".

Example

Let's start with a small Disruptor example, beginning with its constructor:

public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final ThreadFactory threadFactory,
        final ProducerType producerType,
        final WaitStrategy waitStrategy)
{
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}
  • EventFactory: the factory that creates the events stored in the ring buffer;
  • ringBufferSize: the size of the ring buffer, which must be a power of 2;
  • ThreadFactory: creates the threads for the event processors;
  • ProducerType: the producer type, which lets the RingBuffer be created with the correct sequencer and publisher. It is an enum with two values, SINGLE and MULTI, corresponding to SingleProducerSequencer and MultiProducerSequencer;
  • WaitStrategy: the strategy consumers use to wait for new events.
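Why must ringBufferSize be a power of 2? Disruptor maps an ever-increasing sequence number to a slot with a bit mask (sequence & (bufferSize - 1)) rather than a modulo, and that mapping only works when the size is a power of 2. The sketch below shows the check and the mapping. RingIndex and its methods are hypothetical names, not part of the Disruptor API.

```java
// Hypothetical helper (not Disruptor API) illustrating why the ring
// buffer size must be a power of 2.
final class RingIndex {
    private RingIndex() {}

    // A positive int is a power of 2 exactly when it has a single set bit.
    static boolean isPowerOfTwo(int size) {
        return size > 0 && Integer.bitCount(size) == 1;
    }

    // Maps an ever-increasing sequence number onto a slot index.
    // For a power-of-2 size and a non-negative sequence,
    // (sequence & (size - 1)) equals sequence % size but avoids a division.
    static int slotFor(long sequence, int size) {
        if (!isPowerOfTwo(size)) {
            throw new IllegalArgumentException("size must be a power of 2: " + size);
        }
        return (int) (sequence & (size - 1));
    }
}
```

In a buffer of size 8, sequences 1, 9 and 17 all land in slot 1. This is how the ring wraps around.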

Building a Disruptor requires these components. Because an EventFactory is required, we also need a concrete event class to carry message data. (The example below is a lightly modified version of the official example.)

LongEvent, the message event: a data carrier that consumers process

public class LongEvent {
    private long value;
    public void set(long value) {
        this.value = value;
    }
    public long getValue() {
        return value;
    }
}

Create a factory for the message event

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

ConsumerThreadFactory

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class ConsumerThreadFactory implements ThreadFactory {
    private final AtomicInteger index = new AtomicInteger(1);
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "disruptor-thread-" + index.getAndIncrement());
    }
}

Create the Disruptor:

int ringBufferCapacity = 8;
// Factory that produces message events
LongEventFactory longEventFactory = new LongEventFactory();
// Factory for the threads that run the event handlers
ConsumerThreadFactory consumerThreadFactory = new ConsumerThreadFactory();
// Wait strategy for the ring buffer
WaitStrategy waitStrategy = new BlockingWaitStrategy();

// Build the disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(
    longEventFactory,
    ringBufferCapacity,
    consumerThreadFactory,
    ProducerType.SINGLE,
    waitStrategy);

The Disruptor is now ready and can be started with start():

// Start the disruptor
disruptor.start();

At this point the Disruptor is built. But how do we use it to publish and consume messages?

Publishing events

The following for loop publishes five events:

RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
for (long l = 0; l < 5; l++)
{
    long sequence = ringBuffer.next();
    LongEvent event = ringBuffer.get(sequence);
    event.set(l);
    System.out.println("publish event :" + l);
    ringBuffer.publish(sequence);
    Thread.sleep(1000);
}

With messages being published, the Disruptor now needs a consumer. We already have LongEvent and its EventFactory. Within Disruptor, messages are consumed through an EventHandler.

Writing consumer code

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("Event: " + event.getValue() + " -> " + Thread.currentThread().getName());
        Thread.sleep(2000);
    }
}

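Add the event handler to the Disruptor's processing chain. Handlers must be registered before disruptor.start() is called, because adding one after start throws an IllegalStateException: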

// The event handler that will process the event -> the handler that consumes the event
LongEventHandler longEventHandler = new LongEventHandler();
disruptor.handleEventsWith(longEventHandler);

Run results:

publish event :0
Event: 0 -> disruptor-thread-1
-------------------------------->
publish event :1
Event: 1 -> disruptor-thread-1
-------------------------------->
publish event :2
Event: 2 -> disruptor-thread-1
-------------------------------->
publish event :3
Event: 3 -> disruptor-thread-1
-------------------------------->
publish event :4
Event: 4 -> disruptor-thread-1
-------------------------------->

Basic concepts and principles

Disruptor

Disruptor is the overall container: a producer-consumer framework built on top of RingBuffer. Its main fields are:

private final RingBuffer<T> ringBuffer;
private final Executor executor;
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();
private final AtomicBoolean started = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<>();
  • ringBuffer: the RingBuffer to which events are published inside the Disruptor;
  • executor: runs the threads that consume events;
  • consumerRepository: a repository that associates each EventHandler with its EventProcessor;
  • started: whether this Disruptor has been started;
  • exceptionHandler: handles exceptions not caught in the BatchEventProcessor event loop.

RingBuffer

A ring buffer plays a role similar to a BlockingQueue. Its slots are allocated once and then reused, so in many scenarios it avoids costly operations such as memory allocation, garbage collection and resizing.
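The slot-reuse idea can be sketched in plain Java. The sketch assumes a hypothetical, single-threaded class, PreallocatedRing, which is not Disruptor's RingBuffer. It only shows that a factory creates the event objects once, after which they are overwritten in place, so publishing never allocates.

```java
import java.util.function.Supplier;

// Hypothetical, single-threaded illustration of slot reuse (not the
// Disruptor RingBuffer): all event objects are created up front by a
// factory, and later "publishing" only overwrites their fields.
final class PreallocatedRing<E> {
    private final Object[] slots;
    private final int mask;

    PreallocatedRing(int size, Supplier<E> eventFactory) {
        if (size <= 0 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2: " + size);
        }
        this.slots = new Object[size];
        this.mask = size - 1;
        for (int i = 0; i < size; i++) {
            slots[i] = eventFactory.get(); // the only allocations ever made
        }
    }

    // Sequences i and i + size map to the same, reused event object.
    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) slots[(int) (sequence & mask)];
    }
}
```

For example, with `new PreallocatedRing<>(8, () -> new long[1])`, the calls `get(3)` and `get(11)` return the same array object.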

public final class RingBuffer<E> extends RingBufferFields<E>
        implements Cursored, EventSequencer<E>, EventSink<E>
  • E: the type of data stored in the buffer and shared while events are exchanged or processed in parallel, i.e. the message event.

Sequencer

Sequencer is the top-level producer interface used by RingBuffer. Its two concrete implementations are SingleProducerSequencer and MultiProducerSequencer, corresponding to the SINGLE and MULTI enum values.
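The sketch below makes the producer side concrete with a simplified, hypothetical version of single-producer claiming. SimpleSingleProducerSequencer is not the real class, and it ignores caching and memory-ordering details. The key rule is that the producer may claim sequence n only if the slowest consumer (the gating sequence) has moved past n - bufferSize. Otherwise the producer would overwrite an unconsumed event. The real RingBuffer.tryNext() throws InsufficientCapacityException when the buffer is full, whereas this sketch returns -1.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified single-producer claim logic (not the real
// SingleProducerSequencer). Not thread-safe on the producer side: it
// assumes exactly one producer thread, as ProducerType.SINGLE does.
final class SimpleSingleProducerSequencer {
    static final long INITIAL = -1L;

    private final int bufferSize;
    private final AtomicLong[] gatingSequences; // consumer progress
    private long nextValue = INITIAL;           // last claimed sequence

    SimpleSingleProducerSequencer(int bufferSize, AtomicLong... gatingSequences) {
        this.bufferSize = bufferSize;
        this.gatingSequences = gatingSequences;
    }

    // Non-blocking claim: returns the claimed sequence, or -1 if full.
    long tryNext() {
        long next = nextValue + 1;
        long wrapPoint = next - bufferSize;
        // Claiming `next` reuses the slot of `wrapPoint`; that event must
        // already have been consumed by the slowest consumer.
        if (wrapPoint > minimumGating(nextValue)) {
            return -1L;
        }
        nextValue = next;
        return next;
    }

    // With no gating sequences, the minimum falls back to defaultValue.
    private long minimumGating(long defaultValue) {
        long min = defaultValue;
        for (AtomicLong s : gatingSequences) {
            min = Math.min(min, s.get());
        }
        return min;
    }
}
```

With no gating sequences at all, the minimum falls back to the producer's own position, so this sketch never reports the buffer as full.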

EventHandler

The event handler interface. Implement it to provide concrete consumption logic, as LongEventHandler does in the demo above.

// Callback interface for processing events available in {@link RingBuffer}
public interface EventHandler<T> {
    void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
  • event: the event published to the RingBuffer;
  • sequence: the sequence number of the event being processed;
  • endOfBatch: whether this is the last event of the current batch read from the RingBuffer.
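A common use of endOfBatch is to defer expensive work, such as flushing I/O, until the last event of a batch. The sketch assumes a hypothetical BatchingLogHandler whose onEvent takes the same parameters as EventHandler.onEvent. It does not implement the real interface, so it compiles without the Disruptor dependency. A List stands in for the disk.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handler: same onEvent parameters as Disruptor's EventHandler,
// but it does not implement that interface, so no library is needed.
final class BatchingLogHandler {
    private final StringBuilder pending = new StringBuilder();
    // Stands in for the log file: one entry per flush
    final List<String> flushed = new ArrayList<>();

    void onEvent(String event, long sequence, boolean endOfBatch) {
        pending.append(event).append('\n');
        // Flush once per batch instead of once per event
        if (endOfBatch) {
            flushed.add(pending.toString());
            pending.setLength(0);
        }
    }
}
```

Whether this pays off depends on the workload. The SOFATracer Consumer discussed later takes a different approach and calls flush() after every event.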

SequenceBarrier

The sequence barrier determines how far a consumer may advance through the ring buffer. In effect, the barrier is a lock in disguise.

final class ProcessingSequenceBarrier implements SequenceBarrier {
    // Wait strategy used when the requested sequence is not yet available
    private final WaitStrategy waitStrategy;
    // Sequence of the consumers this consumer depends on (dependent consumption).
    // For example, with two consumers A and B where B must run after A,
    // B's barrier tracks A's sequence.
    private final Sequence     dependentSequence;
    private volatile boolean   alerted = false;
    // Write pointer (cursor) of the RingBuffer
    private final Sequence     cursorSequence;
    // Sequencer of the RingBuffer
    private final Sequencer    sequencer;
    // methods omitted
}

The WaitStrategy determines how the consumer waits when the sequence it needs is not yet available.
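Conceptually, a barrier answers one question: up to which sequence may this consumer read? The sketch below bounds that sequence by the producer cursor and, when the consumer depends on others, by the slowest of them. SimpleBarrier is hypothetical. The real ProcessingSequenceBarrier also waits, handles alerts and checks which sequences have actually been published.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of what a sequence barrier computes (not
// ProcessingSequenceBarrier): the highest sequence a consumer may process.
final class SimpleBarrier {
    private final AtomicLong cursor;       // last sequence published by the producer
    private final AtomicLong[] dependents; // upstream consumers; empty if none

    SimpleBarrier(AtomicLong cursor, AtomicLong... dependents) {
        this.cursor = cursor;
        this.dependents = dependents;
    }

    long availableSequence() {
        long available = cursor.get();
        // A dependent consumer may not overtake the slowest upstream consumer
        for (AtomicLong upstream : dependents) {
            available = Math.min(available, upstream.get());
        }
        return available;
    }
}
```

If B depends on A, B's barrier is built with A's sequence, so B can never overtake A.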

WaitStrategy

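Its Javadoc describes it as the "Strategy employed for making EventProcessors wait on a cursor Sequence", i.e. the wait strategy used by EventProcessors. Disruptor 3.x ships eight implementations: BlockingWaitStrategy, BusySpinWaitStrategy, LiteBlockingWaitStrategy, LiteTimeoutBlockingWaitStrategy, PhasedBackoffWaitStrategy, SleepingWaitStrategy, TimeoutBlockingWaitStrategy and YieldingWaitStrategy.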

These wait strategies differ mainly in how they implement waitFor.
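Here is a rough sketch of how waitFor implementations can differ. It assumes a simplified, hypothetical interface, SimpleWaitStrategy. The real WaitStrategy.waitFor also takes a dependent sequence and a SequenceBarrier, and supports alerts and timeouts. All three variants poll the cursor. They differ only in what they do between polls, trading CPU usage against latency. Thread.onSpinWait requires Java 9 or later.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Hypothetical, simplified wait strategy: wait until the cursor reaches
// `sequence`, then return the cursor value observed.
interface SimpleWaitStrategy {
    long waitFor(long sequence, AtomicLong cursor);
}

// Burns CPU but reacts fastest: the idea behind BusySpinWaitStrategy.
final class SpinWait implements SimpleWaitStrategy {
    public long waitFor(long sequence, AtomicLong cursor) {
        long available;
        while ((available = cursor.get()) < sequence) {
            Thread.onSpinWait(); // CPU hint, Java 9+
        }
        return available;
    }
}

// Gives up the CPU between checks: the idea behind YieldingWaitStrategy.
final class YieldWait implements SimpleWaitStrategy {
    public long waitFor(long sequence, AtomicLong cursor) {
        long available;
        while ((available = cursor.get()) < sequence) {
            Thread.yield();
        }
        return available;
    }
}

// Parks briefly between checks: roughly the final stage of SleepingWaitStrategy.
final class SleepWait implements SimpleWaitStrategy {
    public long waitFor(long sequence, AtomicLong cursor) {
        long available;
        while ((available = cursor.get()) < sequence) {
            LockSupport.parkNanos(TimeUnit.MICROSECONDS.toNanos(100));
        }
        return available;
    }
}
```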

EventProcessor

The event processor can be thought of as the skeleton of a consumer. It implements the run method of Runnable and wraps the event loop and its checks. The interface has three implementations: BatchEventProcessor, WorkProcessor and NoOpEventProcessor. The first two are described below.

1. BatchEventProcessor

public final class BatchEventProcessor<T> implements EventProcessor {
    private final AtomicBoolean           running          = new AtomicBoolean(false);
    private ExceptionHandler<? super T>   exceptionHandler = new FatalExceptionHandler();
    private final DataProvider<T>         dataProvider;
    private final SequenceBarrier         sequenceBarrier;
    private final EventHandler<? super T> eventHandler;
    private final Sequence                sequence         = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final TimeoutHandler          timeoutHandler;
    // methods omitted
}
  • exceptionHandler: the exception handler;
  • dataProvider: the data source, i.e. the RingBuffer;
  • eventHandler: the callback object that handles events;
  • sequenceBarrier: the corresponding sequence barrier;
  • timeoutHandler: null by default. To set one, have the associated EventHandler also implement the TimeoutHandler interface.

When you register an EventHandler, Disruptor wraps it in a BatchEventProcessor by default. Each BatchEventProcessor corresponds one-to-one to an EventHandler and runs on a single thread.

If a RingBuffer has multiple BatchEventProcessors, each one runs on its own thread.
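The core of a batch processor's loop can be sketched as follows. Given the last processed sequence and the highest available one, the loop hands every event in between to the handler and sets endOfBatch on the last one. This is a hypothetical simplification, not BatchEventProcessor's code. It omits waiting on the SequenceBarrier, alerts, timeouts and exception handling, and its Handler interface drops the checked exception of the real EventHandler.

```java
import java.util.function.LongFunction;

// Hypothetical simplification of one pass of a batch event processor
// (not BatchEventProcessor itself).
final class BatchLoop {
    private BatchLoop() {}

    // Same parameters as EventHandler.onEvent, minus the checked exception.
    interface Handler<T> {
        void onEvent(T event, long sequence, boolean endOfBatch);
    }

    // Processes every sequence in (processedSequence, availableSequence] and
    // returns the last sequence processed, i.e. the consumer's new position.
    static <T> long processBatch(long processedSequence, long availableSequence,
                                 LongFunction<T> dataProvider, Handler<T> handler) {
        long next = processedSequence + 1;
        while (next <= availableSequence) {
            T event = dataProvider.apply(next);
            // endOfBatch is true only for the last available event
            handler.onEvent(event, next, next == availableSequence);
            next++;
        }
        return next - 1;
    }
}
```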

2. WorkProcessor

public final class WorkProcessor<T> implements EventProcessor {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final RingBuffer<T> ringBuffer;
    private final SequenceBarrier  sequenceBarrier;
    private final WorkHandler<? super T> workHandler;
    private final ExceptionHandler<? super T> exceptionHandler;
    private final Sequence workSequence;

    private final EventReleaser eventReleaser = new EventReleaser() {
        @Override
        public void release() {
            sequence.set(Long.MAX_VALUE);
        }
    };
    private final TimeoutHandler timeoutHandler;
}

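WorkProcessor is largely similar to BatchEventProcessor, except that a WorkHandler callback handles the events. In addition, the workers of a pool share the workSequence field, so only one of them processes each event.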
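The work-sharing behavior can be sketched with a shared AtomicLong. SharedWorkSequence is a hypothetical class modeled on the idea behind WorkProcessor's workSequence field. Workers claim the next sequence with compareAndSet, so each sequence goes to exactly one worker. By contrast, each BatchEventProcessor tracks its own sequence and sees every event.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the work-sharing idea behind WorkProcessor: all
// workers share one workSequence and claim sequences with compareAndSet,
// so every sequence is handed to exactly one worker.
final class SharedWorkSequence {
    private final AtomicLong workSequence = new AtomicLong(-1L);

    // Claims the next sequence if it has been published (<= cursor);
    // returns -1 when there is nothing left to claim.
    long claim(long cursor) {
        while (true) {
            long current = workSequence.get();
            long next = current + 1;
            if (next > cursor) {
                return -1L;
            }
            if (workSequence.compareAndSet(current, next)) {
                return next; // this worker now owns sequence `next`
            }
            // Another worker won the race; retry with the updated value
        }
    }
}
```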

Schematic diagram

In the absence of consumers, producers keep producing, but remainingCapacity remains constant.

While writing the demo, I wanted to observe how the RingBuffer's available capacity changes when there is only a producer and no consumer. The results did not match my expectations:

publish event :0 bufferSize:8 remainingCapacity:8 cursor:0
-------------------------------->
publish event :1 bufferSize:8 remainingCapacity:8 cursor:1
-------------------------------->
publish event :2 bufferSize:8 remainingCapacity:8 cursor:2
-------------------------------->
publish event :3 bufferSize:8 remainingCapacity:8 cursor:3
-------------------------------->
publish event :4 bufferSize:8 remainingCapacity:8 cursor:4
-------------------------------->
publish event :5 bufferSize:8 remainingCapacity:8 cursor:5
-------------------------------->
publish event :6 bufferSize:8 remainingCapacity:8 cursor:6
-------------------------------->
publish event :7 bufferSize:8 remainingCapacity:8 cursor:7
-------------------------------->
publish event :8 bufferSize:8 remainingCapacity:8 cursor:8
-------------------------------->
publish event :9 bufferSize:8 remainingCapacity:8 cursor:9
-------------------------------->

I expected remainingCapacity to decrease as more events were published, but it did not change at all.

Let's look at the ringBuffer.remainingCapacity() method:

/**
 * Get the remaining capacity for this ringBuffer.
 *
 * @return The number of slots remaining.
 */
public long remainingCapacity()
{
    return sequencer.remainingCapacity();
}

It delegates the calculation to sequencer.remainingCapacity(). The example above uses ProducerType.SINGLE, so let's look at the implementation of remainingCapacity in SingleProducerSequencer:

@Override
public long remainingCapacity()
{
    // The sequence most recently claimed by the producer
    long nextValue = this.nextValue;
    // The smallest sequence consumed so far (minimum over gatingSequences)
    long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
    // The sequence produced so far
    long produced = nextValue;
    return getBufferSize() - (produced - consumed);
}

To explain what this code means:

Suppose the ring buffer's bufferSize is 8 and the most recently claimed sequence is 5, meaning the producer has produced up to sequence 5. If the consumed sequence is 3, the remaining capacity is 8 - (5 - 3) = 6.

Since bufferSize and produced are known here, the result of remainingCapacity depends on what getMinimumSequence returns:

public static long getMinimumSequence(final Sequence[] sequences, long minimum)
{
    for (int i = 0, n = sequences.length; i < n; i++)
    {
        long value = sequences[i].get();
        minimum = Math.min(minimum, value);
    }
    return minimum;
}

This method returns the smallest value in the sequences array. If the array is empty, it returns minimum unchanged. Going back one step, let's see where this array comes from and where its values are set:

long consumed = Util.getMinimumSequence(gatingSequences, nextValue);

gatingSequences is a member variable of AbstractSequencer, the parent class of SingleProducerSequencer:

protected volatile Sequence[] gatingSequences = new Sequence[0];

gatingSequences is populated by the following method:

/**
 * @see Sequencer#addGatingSequences(Sequence...)
 */
@Override
public final void addGatingSequences(Sequence... gatingSequences)
{
    SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}

Tracing this method's call stack leads to two places: WorkerPool, which manages multiple consumers, and the handleEventsWith method, which also registers consumers. In the test above, however, we set only a producer and no consumer so that we could observe how full the ring queue gets. As a result, gatingSequences stays empty and getMinimumSequence returns produced itself as the minimum, so every calculation is equivalent to:

return getBufferSize() - (produced - produced); // == getBufferSize()

This explains why remainingCapacity stays constant when no consumer is set.
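The whole experiment reduces to a few lines of arithmetic. CapacityExperiment is a hypothetical helper, not Disruptor code. It mirrors remainingCapacity together with getMinimumSequence: the minimum over consumer sequences defaults to produced when no consumer is registered.

```java
// Hypothetical helper mirroring SingleProducerSequencer.remainingCapacity()
// together with Util.getMinimumSequence(); not Disruptor code.
final class CapacityExperiment {
    private CapacityExperiment() {}

    static long remainingCapacity(int bufferSize, long produced, long... consumerSequences) {
        // getMinimumSequence starts from `produced`, so with no consumers
        // the minimum is produced itself.
        long consumed = produced;
        for (long s : consumerSequences) {
            consumed = Math.min(consumed, s);
        }
        return bufferSize - (produced - consumed);
    }
}
```

With bufferSize 8, produced 5 and one consumer at 3, it returns 6. With no consumers it returns 8 for any produced value, which matches what the demo printed.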

Disruptor practices in SOFATracer

SOFATracer wraps Disruptor in AsyncCommonDigestAppenderManager, which handles the Tracer logs of external components. This part walks through the AsyncCommonDigestAppenderManager source to show how SOFATracer uses Disruptor.

SOFATracer uses two event models: StringEvent, used internally by SOFATracer, and SofaTracerSpanEvent, used by external extensions. This analysis is based on SofaTracerSpanEvent. The Disruptor for the StringEvent model is wrapped by the AsyncCommonAppenderManager class.

SofaTracerSpanEvent ( -> LongEvent)

This defines the message event model. SofaTracerSpanEvent has the same basic structure as LongEvent in the earlier demo. Only the message data it holds differs: LongEvent holds a long value, while SofaTracerSpanEvent holds a SofaTracerSpan.

public class SofaTracerSpanEvent {
    private volatile SofaTracerSpan sofaTracerSpan;
    public SofaTracerSpan getSofaTracerSpan() {
        return sofaTracerSpan;
    }
    public void setSofaTracerSpan(SofaTracerSpan sofaTracerSpan) {
        this.sofaTracerSpan = sofaTracerSpan;
    }
}

Consumer ( -> LongEventHandler)

Consumer is an inner class of AsyncCommonDigestAppenderManager. It acts as the consumer by implementing the EventHandler interface.

AsyncCommonAppenderManager contains a similar Consumer. Personally, I think these could be extracted into standalone classes, which would make the AsyncCommonDigestAppenderManager and AsyncCommonAppenderManager code cleaner.

private class Consumer implements EventHandler<SofaTracerSpanEvent> {
    // Set of log types. Log types outside this set will not be processed
    protected Set<String> logTypes = Collections.synchronizedSet(new HashSet<String>());

    @Override
    public void onEvent(SofaTracerSpanEvent event, long sequence, boolean endOfBatch)
                                                                                throws Exception {
        // Get the message data: the sofaTracerSpan
        SofaTracerSpan sofaTracerSpan = event.getSofaTracerSpan();
        // If there is no data, do nothing
        if (sofaTracerSpan != null) {
            try {
                String logType = sofaTracerSpan.getLogType();
                // Check that the current log type can be consumed by this consumer
                if (logTypes.contains(logType)) {
                    // Get the encoder for this log type
                    SpanEncoder encoder = contextEncoders.get(logType);
                    // Get the appender
                    TraceAppender appender = appenders.get(logType);
                    // Encode the data
                    String encodedStr = encoder.encode(sofaTracerSpan);
                    if (appender instanceof LoadTestAwareAppender) {
                        ((LoadTestAwareAppender) appender).append(encodedStr,
                            TracerUtils.isLoadTest(sofaTracerSpan));
                    } else {
                        appender.append(encodedStr);
                    }
                    // Flush the buffer to write out the log
                    appender.flush();
                }
            } catch (Exception e) {
                // Exception handling omitted
            }
        }
    }

    public void addLogType(String logType) {
        logTypes.add(logType);
    }
}
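The constructor shown under "Build the Disruptor" registers each Consumer through its own handleEventsWith call, so every consumer receives every published event. The logTypes check decides which consumer actually writes a given event. The sketch below models that routing in plain Java. LogTypeRouting, its broadcast method and the log type names in the test are hypothetical. The sketch also assumes that each log type is added to exactly one consumer, because we have not shown how SOFATracer assigns log types to consumers.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of broadcast-plus-filter routing (not SOFATracer code).
final class LogTypeRouting {
    private LogTypeRouting() {}

    static final class Consumer {
        final Set<String> logTypes = new HashSet<>();
        final List<String> written = new ArrayList<>(); // stands in for appenders

        void onEvent(String logType, String encoded) {
            // Ignore events whose log type belongs to another consumer
            if (logTypes.contains(logType)) {
                written.add(encoded);
            }
        }
    }

    // Delivers one event to every consumer, as independent handlers would see it.
    static void broadcast(List<Consumer> consumers, String logType, String encoded) {
        for (Consumer c : consumers) {
            c.onEvent(logType, encoded);
        }
    }
}
```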

SofaTracerSpanEventFactory (-> LongEventFactory)

A Factory used to generate message events.

public class SofaTracerSpanEventFactory implements EventFactory<SofaTracerSpanEvent> {
    @Override
    public SofaTracerSpanEvent newInstance() {
        return new SofaTracerSpanEvent();
    }
}

ConsumerThreadFactory (-> LongEventThreadFactory )

The Factory used to generate the consuming thread.

public class ConsumerThreadFactory implements ThreadFactory {
    private String workName;
    public String getWorkName() {
        return workName;
    }
    public void setWorkName(String workName) {
        this.workName = workName;
    }
    @Override
    public Thread newThread(Runnable runnable) {
        Thread worker = new Thread(runnable, "Tracer-AsyncConsumer-Thread-" + workName);
        worker.setDaemon(true);
        return worker;
    }
}

Build the Disruptor

The Disruptor is built in the AsyncCommonDigestAppenderManager constructor.

public AsyncCommonDigestAppenderManager(int queueSize, int consumerNumber) {
    // Round queueSize up to a power of 2 (the smallest power of 2 greater than or equal to queueSize)
    int realQueueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1));
    // Build the Disruptor with ProducerType.MULTI;
    // the wait strategy is BlockingWaitStrategy
    disruptor = new Disruptor<SofaTracerSpanEvent>(new SofaTracerSpanEventFactory(),
        realQueueSize, threadFactory, ProducerType.MULTI, new BlockingWaitStrategy());
    // Consumer list
    this.consumers = new ArrayList<Consumer>(consumerNumber);
    
    for (int i = 0; i < consumerNumber; i++) {
        Consumer consumer = new Consumer();
        consumers.add(consumer);
        // Set the exception handler
        disruptor.setDefaultExceptionHandler(new ConsumerExceptionHandler());
        // Bind the consumer
        disruptor.handleEventsWith(consumer);
    }

    // Whether discarding is allowed can be obtained from the configuration file
    this.allowDiscard = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
        SofaTracerConfiguration.TRACER_ASYNC_APPENDER_ALLOW_DISCARD, DEFAULT_ALLOW_DISCARD));
    
    if (allowDiscard) {
        // Whether to record the number of lost logs
        this.isOutDiscardNumber = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
            SofaTracerConfiguration.TRACER_ASYNC_APPENDER_IS_OUT_DISCARD_NUMBER,
            DEFAULT_IS_OUT_DISCARD_NUMBER));
        // Whether to record TraceId and RpcId of lost logs
        this.isOutDiscardId = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
            SofaTracerConfiguration.TRACER_ASYNC_APPENDER_IS_OUT_DISCARD_ID,
            DEFAULT_IS_OUT_DISCARD_ID));
        // When the number of lost logs reaches the threshold, logs are generated once
        this.discardOutThreshold = Long.parseLong(SofaTracerConfiguration.getProperty(
            SofaTracerConfiguration.TRACER_ASYNC_APPENDER_DISCARD_OUT_THRESHOLD,
            DEFAULT_DISCARD_OUT_THRESHOLD));
        if (isOutDiscardNumber) {
            this.discardCount = new PaddedAtomicLong(0L);
        }
    }
}
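The expression that computes realQueueSize can be checked in isolation. QueueSizing is a hypothetical wrapper around the same expression. The range check is our addition and is not in the constructor: for inputs above 2^30 the shift would overflow into a negative int, and values below 1 are not meaningful sizes.

```java
// Hypothetical wrapper around the rounding expression from the constructor.
final class QueueSizing {
    private QueueSizing() {}

    // Smallest power of 2 >= queueSize.
    static int realQueueSize(int queueSize) {
        // Range check added for the sketch: above 2^30 the shift below would
        // produce a negative int, and values < 1 are not meaningful sizes.
        if (queueSize < 1 || queueSize > (1 << 30)) {
            throw new IllegalArgumentException("queueSize out of range: " + queueSize);
        }
        // 32 - numberOfLeadingZeros(queueSize - 1) is the number of bits
        // needed to represent queueSize - 1.
        return 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1));
    }
}
```

For example, a configured queueSize of 1000 becomes 1024, while 1024 stays 1024.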

Start the Disruptor

Starting the Disruptor is delegated to the start method of AsyncCommonDigestAppenderManager:

public void start(final String workerName) {
    this.threadFactory.setWorkName(workerName);
    this.ringBuffer = this.disruptor.start();
}

Let’s see where SOFATracer calls start:

  • CommonTracerManager: holds a singleton AsyncCommonDigestAppenderManager and calls its start method in a static initializer block. This instance prints ordinary logs;
  • SofaTracerDigestReporterAsyncManager: also holds a singleton AsyncCommonDigestAppenderManager and exposes it through getSofaTracerDigestReporterAsyncManager, which calls start. This instance outputs digest logs.

Publish event

In the earlier demo, a for loop published the events. In SOFATracer, an event is published whenever a Tracer log needs to be written. This corresponds to the log append operation, which appends the log to the ring buffer:

public boolean append(SofaTracerSpan sofaTracerSpan) {
    long sequence = 0L;
    // Whether discarding is allowed
    if (allowDiscard) {
        try {
            // Discarding allowed: tryNext() does not wait for space; it throws
            // InsufficientCapacityException when the buffer is full
            sequence = ringBuffer.tryNext();
        } catch (InsufficientCapacityException e) {
            // Whether to output TraceId and RpcId of discarded logs
            if (isOutDiscardId) {
                SofaTracerSpanContext sofaTracerSpanContext = sofaTracerSpan
                    .getSofaTracerSpanContext();
                if (sofaTracerSpanContext != null) {
                    SynchronizingSelfLog.warn("discarded tracer: traceId["
                                              + sofaTracerSpanContext.getTraceId()
                                              + "]; spanId[" + sofaTracerSpanContext.getSpanId()
                                              + "]");
                }
            }
            // Whether to output the number of discarded logs
            if ((isOutDiscardNumber) && discardCount.incrementAndGet() == discardOutThreshold) {
                discardCount.set(0);
                if (isOutDiscardNumber) {
                    SynchronizingSelfLog.warn("discarded " + discardOutThreshold + " logs");
                }
            }
            return false;
        }
    } else {
        // Discarding not allowed: next() waits until a slot is available
        sequence = ringBuffer.next();
    }

    try {
        SofaTracerSpanEvent event = ringBuffer.get(sequence);
        event.setSofaTracerSpan(sofaTracerSpan);
    } catch (Exception e) {
        SynchronizingSelfLog.error("fail to add event");
        return false;
    }
    // Publish the sequence so that consumers can see the event
    ringBuffer.publish(sequence);
    return true;
}
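The discard policy can be modeled without Disruptor. In this hypothetical sketch, a java.util.concurrent.ArrayBlockingQueue stands in for the RingBuffer. offer() plays the role of tryNext() by failing immediately when the buffer is full, and put() plays the role of next() by waiting for space. Discards are counted and reported once every discardOutThreshold drops, mirroring the counter logic in append(). A counter replaces the SynchronizingSelfLog warning.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of append()'s discard policy (not SOFATracer code).
//   offer() ~ tryNext(): fails immediately when full
//   put()   ~ next():    waits until a slot is free
final class DiscardingAppender<T> {
    private final BlockingQueue<T> buffer;
    private final boolean allowDiscard;
    private final long discardOutThreshold;
    private final AtomicLong discardCount = new AtomicLong();
    // Counts threshold reports; stands in for SynchronizingSelfLog.warn(...)
    final AtomicLong thresholdReports = new AtomicLong();

    DiscardingAppender(int capacity, boolean allowDiscard, long discardOutThreshold) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
        this.allowDiscard = allowDiscard;
        this.discardOutThreshold = discardOutThreshold;
    }

    boolean append(T item) {
        if (!allowDiscard) {
            try {
                buffer.put(item); // back-pressure: the caller waits for space
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        if (buffer.offer(item)) {
            return true;
        }
        // Buffer full: drop the item and report once every discardOutThreshold drops
        if (discardCount.incrementAndGet() == discardOutThreshold) {
            discardCount.set(0);
            thresholdReports.incrementAndGet();
        }
        return false;
    }

    // Drops counted since the last report
    long pendingDiscards() {
        return discardCount.get();
    }
}
```

This illustrates the trade-off. When discarding is allowed, business threads never block on a slow disk, at the cost of losing some logs under pressure. When it is not allowed, logging back-pressure slows the caller down.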

Tracing the call chain for event publishing shows that SOFATracer publishes a message event when finish is called on the current span, or when reportSpan is called.

Summary

This article briefly analyzed how SOFATracer uses Disruptor for logging. For more internal details, read the SOFATracer source. As a relatively low-level middleware component, SOFATracer is largely invisible in day-to-day business development, but there is still a lot to learn from it.

SOFATracer:github.com/sofastack/s…

If you are interested in middleware, you are welcome to join our team. If you are interested in the SOFA technology stack, follow the SOFAStack community:

Financial Class Distributed Architecture (Antfin_SOFA)