So bold: what is the story behind the Disruptor, the self-proclaimed high-performance queue?

Concurrency Framework Disruptor

1. Disruptor overview

1.1 Background

Disruptor is a high-performance queue developed by LMAX, a British foreign exchange trading company. It was originally built to solve the latency of in-memory queues: LMAX's performance tests showed that this latency was on the same order of magnitude as I/O operations. A system built on the Disruptor can process 6 million orders per second on a single thread. It attracted industry attention after LMAX presented it at QCon in 2010. In 2011 Martin Fowler, the enterprise application software expert, wrote a long article introducing it. The same year it won Oracle's official Duke's Choice Award.

Many well-known projects, including Apache Storm, Apache Camel and Log4j 2, use the Disruptor to achieve high performance.

Note that the queue discussed here is an in-memory queue inside a single process. It is not a distributed queue like Kafka.

In one phrase: a bounded, lock-free, high-concurrency queue.

1.2 What is a Disruptor

Disruptor is a message queue for passing data between threads within one JVM. Its role is similar to that of ArrayBlockingQueue, but it offers more features and much better performance. Consider using the Disruptor instead of ArrayBlockingQueue when a large amount of data passes between threads or when performance requirements are high.

The official project also compares the performance of the Disruptor and ArrayBlockingQueue in different application scenarios, and the improvement is roughly 5 to 10 times.

1.3 Why use Disruptor

Traditional blocking queues use locks to ensure thread safety. When a lock is contended, the operating system kernel suspends the waiting threads until the lock is released, and this requires context switches.

Each context switch can also discard the data and instructions the thread had cached in the CPU, so the thread runs slowly when it resumes. Producers and consumers rarely run at exactly the same speed, so the queue is almost always near full or near empty. This leads to heavy write contention.
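To make the lock-and-suspend behaviour concrete, here is a minimal sketch of a lock-based bounded queue in the style of ArrayBlockingQueue. Like ArrayBlockingQueue, it guards both ends with a single ReentrantLock and two Conditions. LockingBoundedQueue is a hypothetical, simplified class and is not the JDK source. Every put and take competes for the same lock. A thread that finds the queue full or empty is parked on a Condition until another thread signals it, which is the suspend-and-wake cycle described above.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified lock-based bounded queue in the style of ArrayBlockingQueue.
public class LockingBoundedQueue<E> {
    private final Object[] items;
    private int putIndex, takeIndex, count;
    // One lock guards both ends, so producers and consumers contend on it.
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    public LockingBoundedQueue(int capacity) {
        items = new Object[capacity];
    }

    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await(); // producer is parked until a slot frees up
            }
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal(); // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await(); // consumer is parked until data arrives
            }
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal(); // wake one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockingBoundedQueue<Integer> q = new LockingBoundedQueue<>(2);
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    System.out.println("took " + q.take());
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int i = 0; i < 5; i++) {
            q.put(i); // blocks whenever the consumer falls 2 items behind
        }
        consumer.join();
    }
}
```

Producers and consumers share one lock. A queue that hovers near full or near empty therefore keeps both sides contending for it, and threads are repeatedly parked and woken.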

1.3.1 The traditional queue problem

The queues discussed here are limited to the in-process queues that ship with Java in java.util.concurrent.

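| Queue | Boundedness | Locking | Data structure | Queue type |
|---|---|---|---|---|
| ArrayBlockingQueue | bounded | lock | array | blocking |
| LinkedBlockingQueue | optionally bounded | lock | linked list | blocking |
| ConcurrentLinkedQueue | unbounded | lock-free | linked list | non-blocking |
| LinkedTransferQueue | unbounded | lock-free | linked list | blocking |
| PriorityBlockingQueue | unbounded | lock | heap | blocking |
| DelayQueue | unbounded | lock | heap | blocking |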
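The boundedness column has a practical consequence that a few lines of JDK code can show. A bounded queue such as ArrayBlockingQueue rejects new elements once it is full, so offer returns false. An unbounded queue such as ConcurrentLinkedQueue keeps accepting them, so a producer that outpaces its consumers can grow it until memory runs out. QueueBoundednessDemo and offerMany are illustrative names.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueBoundednessDemo {
    // Offers n elements without blocking and returns how many the queue accepted.
    public static int offerMany(Queue<Integer> queue, int n) {
        int accepted = 0;
        for (int i = 0; i < n; i++) {
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Bounded, lock-based, array-backed: capacity is fixed at construction.
        Queue<Integer> bounded = new ArrayBlockingQueue<>(4);
        // Unbounded, lock-free, linked-list-backed: offer always succeeds.
        Queue<Integer> unbounded = new ConcurrentLinkedQueue<>();

        System.out.println("ArrayBlockingQueue accepted " + offerMany(bounded, 10));       // 4
        System.out.println("ConcurrentLinkedQueue accepted " + offerMany(unbounded, 10)); // 10
    }
}
```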
1.3.2 Disruptor Application Scenarios

Here are some frameworks that use the Disruptor.

1.3.2.1 log4j2

Log4j2's asynchronous loggers use the Disruptor. Logging usually goes through a buffer that is written to the file when it fills up, and incremental appends to a file (combined with NIO) are fast. Whether events are processed by an EventHandler or a WorkHandler, the processing latency stays small and only a few files are written to. That makes logging a good fit.

1.3.2.2 Jstorm

In stream processing, data is exchanged between threads, and much of the computation happens in memory. Data moves in and out of a stream quickly, so the Disruptor should be a good choice.

1.3.2.3 Baidu uid-generator

Baidu's uid-generator uses ideas such as a ring buffer and padding against false sharing to cache generated UIDs. It appears to borrow in part from the Disruptor.

1.4 Core Concepts of Disruptor

To understand how the Disruptor works, start with its core concepts. Each concept below is both a domain object and a core class in the code.

1.4.1 Ring Buffer

A data structure in the Disruptor that stores the data produced by the producer

As the name suggests, it is a ring buffer. The RingBuffer used to be the main object in the Disruptor. Since version 3.0, its responsibilities have been reduced to storing and updating the data (events) exchanged through the Disruptor. In some more advanced scenarios, the RingBuffer can be completely replaced by a user-defined implementation.
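The storage side of a ring buffer can be sketched in a few lines. The sketch assumes nothing about the real RingBuffer internals beyond what is described here. Every slot is allocated once, up front, by a factory (the role the EventFactory plays later in this article). An ever-increasing sequence number is mapped to a slot with sequence & (size - 1). SimpleRingBuffer and its methods are hypothetical names, and the sketch has no concurrency control; in the Disruptor that is the job of the Sequencer and the barriers described below.

```java
import java.util.function.Supplier;

// Hypothetical, single-threaded illustration of a pre-allocated ring buffer.
public class SimpleRingBuffer<E> {
    private final Object[] entries;
    private final int mask;

    public SimpleRingBuffer(int size, Supplier<E> factory) {
        if (Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        entries = new Object[size];
        mask = size - 1;
        // Pre-allocate every slot once; slots are reused, never replaced.
        for (int i = 0; i < size; i++) {
            entries[i] = factory.get();
        }
    }

    // Map an ever-increasing sequence number onto a slot index.
    public int indexOf(long sequence) {
        return (int) (sequence & mask);
    }

    @SuppressWarnings("unchecked")
    public E get(long sequence) {
        return (E) entries[indexOf(sequence)];
    }

    public static void main(String[] args) {
        SimpleRingBuffer<StringBuilder> rb = new SimpleRingBuffer<>(4, StringBuilder::new);
        // Sequence 5 wraps around to slot 1 and reuses the object already stored there.
        System.out.println(rb.indexOf(5));          // 1
        System.out.println(rb.get(1) == rb.get(5)); // true
    }
}
```

In this sketch, slots are reused rather than replaced, so publishing in the steady state allocates no new objects.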

1.4.2 Sequence

Sequence numbers are everywhere in the Disruptor framework.

Sequence numbers determine where the producer places data in the RingBuffer and where a consumer should read next. They also determine which data sits at a given position in the RingBuffer. A sequence can be thought of simply as an AtomicLong. It pads the value with extra fields to eliminate false sharing of CPU cache lines.
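The padding idea can be sketched in plain Java. The sketch follows the same pattern as the Disruptor 3.x Sequence: a volatile long is surrounded by unused long fields declared in a class hierarchy. This relies on HotSpot's practice of laying out superclass fields before subclass fields. The class names are hypothetical, and the amount of padding is an assumption about typical hardware: 7 longs on each side, so the value gets a 64-byte cache line to itself. The atomic operations use AtomicLongFieldUpdater, which is also an assumption; the library itself uses lower-level APIs. The JVM does not guarantee field layout, so treat this as a best-effort technique.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Padding before the hot field: 7 longs = 56 bytes.
class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

// The value that threads actually read and write.
class Value extends LhsPadding {
    protected volatile long value;
}

// Padding after the hot field, so neighbouring data cannot share its cache line.
class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

// Hypothetical sketch of a padded, AtomicLong-like sequence counter.
public class PaddedSequence extends RhsPadding {
    private static final AtomicLongFieldUpdater<Value> VALUE =
            AtomicLongFieldUpdater.newUpdater(Value.class, "value");

    public PaddedSequence(long initial) {
        value = initial;
    }

    public long get() {
        return value; // volatile read
    }

    public void set(long newValue) {
        value = newValue; // volatile write
    }

    public boolean compareAndSet(long expected, long newValue) {
        return VALUE.compareAndSet(this, expected, newValue);
    }

    public long incrementAndGet() {
        return VALUE.incrementAndGet(this);
    }

    public static void main(String[] args) {
        PaddedSequence seq = new PaddedSequence(-1); // Disruptor sequences also start at -1
        System.out.println(seq.incrementAndGet());   // 0
        System.out.println(seq.compareAndSet(0, 5)); // true
        System.out.println(seq.get());               // 5
    }
}
```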

1.4.3 Sequencer

Sequence number generator: this class mainly coordinates producers.

When the producer produces data, the Sequencer will generate an available sequence number (Sequence), and then the producer will know where the data is placed in the circular queue.

Sequencer is the real core of Disruptor. This interface has two implementation classes SingleProducerSequencer and MultiProducerSequencer, which define concurrent algorithms for fast and correct data transfer between producers and consumers.
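The central check a single-producer sequencer performs can be sketched as follows, in the spirit of SingleProducerSequencer but much simpler. The producer may claim the next sequence only if doing so would not overwrite a slot the slowest consumer has not processed yet. In other words, next - bufferSize must not be beyond the consumer's sequence. ToySingleProducerSequencer, claimNext, publish and consumerProcessed are hypothetical names. To keep the sketch single-threaded and testable, claimNext returns -1 instead of waiting when the buffer is full.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified single-producer sequencer.
public class ToySingleProducerSequencer {
    private final int bufferSize;
    // Highest sequence published so far, visible to consumers.
    private final AtomicLong cursor = new AtomicLong(-1);
    // Highest sequence the (single, slowest) consumer has finished with.
    private final AtomicLong consumerSequence = new AtomicLong(-1);
    // Only the producer thread touches this, so it needs no atomic operation.
    private long nextValue = -1;

    public ToySingleProducerSequencer(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Claims the next sequence, or returns -1 if that would overwrite
    // a slot the consumer has not processed yet.
    public long claimNext() {
        long next = nextValue + 1;
        long wrapPoint = next - bufferSize; // the sequence that last used the same slot
        if (wrapPoint > consumerSequence.get()) {
            return -1; // buffer full: the consumer is a whole lap behind
        }
        nextValue = next;
        return next;
    }

    // Makes a claimed (and filled) slot visible to consumers.
    public void publish(long sequence) {
        cursor.set(sequence);
    }

    public long cursor() {
        return cursor.get();
    }

    // Called by the consumer after it has processed everything up to sequence.
    public void consumerProcessed(long sequence) {
        consumerSequence.set(sequence);
    }

    public static void main(String[] args) {
        ToySingleProducerSequencer seq = new ToySingleProducerSequencer(4);
        for (int i = 0; i < 5; i++) {
            long s = seq.claimNext();
            System.out.println("claimed " + s); // 0, 1, 2, 3, then -1 (full)
            if (s >= 0) {
                seq.publish(s);
            }
        }
        seq.consumerProcessed(1); // consumer frees the slots used by 0 and 1
        System.out.println("claimed " + seq.claimNext()); // claimed 4
    }
}
```

Only one thread ever claims sequences here, so nextValue is a plain field. A multi-producer sequencer has to claim with an atomic operation such as CAS, which is one reason ProducerType.SINGLE is faster.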

1.4.4 Sequence Barrier

Sequence number barrier

When consumers consume data, they need to know which position to read. A consumer cannot simply read whatever it wants. It may only read data that has already been published and, if it depends on other consumers, data that those consumers have already processed. The SequenceBarrier acts as this "fence". When a consumer wants to consume data, the barrier gives it a sequence number (Sequence), and the consumer reads the data up to that position. If no data is available yet, the consumer waits.
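The barrier's answer can be sketched as follows. A consumer asks for sequence n, and the barrier returns the highest sequence that is safe to read. That is the smaller of the producer's published cursor and the progress of any upstream consumers this one depends on. The method name mirrors SequenceBarrier.waitFor, but ToySequenceBarrier is a hypothetical, simplified class. The real barrier hands the waiting itself to the configured WaitStrategy, whereas this sketch just spins with Thread.onSpinWait() (Java 9+).

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified sequence barrier.
public class ToySequenceBarrier {
    private final AtomicLong cursor;       // producer's published cursor
    private final AtomicLong[] dependents; // upstream consumers this one must not overtake

    public ToySequenceBarrier(AtomicLong cursor, AtomicLong... dependents) {
        this.cursor = cursor;
        this.dependents = dependents;
    }

    // Highest sequence that is currently safe to consume.
    public long available() {
        long min = cursor.get();
        for (AtomicLong d : dependents) {
            min = Math.min(min, d.get());
        }
        return min;
    }

    // Spins until `sequence` is safe to consume, then returns the highest safe
    // sequence, which may be larger, so the caller can process a whole batch.
    public long waitFor(long sequence) {
        long avail;
        while ((avail = available()) < sequence) {
            Thread.onSpinWait(); // a real WaitStrategy decides how to wait here
        }
        return avail;
    }

    public static void main(String[] args) {
        AtomicLong cursor = new AtomicLong(9);   // producer has published 0..9
        AtomicLong upstream = new AtomicLong(6); // an upstream consumer has finished 0..6
        ToySequenceBarrier barrier = new ToySequenceBarrier(cursor, upstream);
        System.out.println(barrier.waitFor(3));  // 6: limited by the upstream consumer
    }
}
```

Returning the highest available sequence, rather than exactly n, lets the consumer process everything that has accumulated in one batch.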

1.4.5 Wait Strategy

The Wait Strategy determines how a consumer waits for a producer to put an Event into the Disruptor.

Imagine producers that are very slow and consumers that are very fast: the consumers will inevitably run out of data. How should a consumer wait in that situation? WaitStrategy exists to answer that question.
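As a rough illustration, here is a wait strategy loosely modeled on the spin-then-yield idea behind the Disruptor's YieldingWaitStrategy. It spins for a fixed number of tries, then calls Thread.yield() on each further check. SpinThenYieldWaitStrategy is a hypothetical class, and the spin count of 100 is an assumed value. The built-in strategies make different trade-offs along the same axis:
- Busy spinning gives the lowest latency but occupies a core.
- Yielding and sleeping give up some latency to save CPU.
- BlockingWaitStrategy uses a lock and condition, which uses the least CPU but is the slowest to react.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical wait strategy loosely modeled on the spin-then-yield idea.
public class SpinThenYieldWaitStrategy {
    private static final int SPIN_TRIES = 100; // assumed value

    // Waits until the cursor reaches `sequence` and returns the cursor value seen.
    public long waitFor(long sequence, AtomicLong cursor) {
        int counter = SPIN_TRIES;
        long available;
        while ((available = cursor.get()) < sequence) {
            if (counter > 0) {
                counter--;      // phase 1: busy-spin for the lowest latency
            } else {
                Thread.yield(); // phase 2: let other threads use the CPU
            }
        }
        return available;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong cursor = new AtomicLong(-1);
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50); // a slow producer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            cursor.set(0); // publish sequence 0
        });
        producer.start();
        long got = new SpinThenYieldWaitStrategy().waitFor(0, cursor);
        System.out.println("consumer saw sequence " + got); // consumer saw sequence 0
        producer.join();
    }
}
```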

1.4.6 Event

The data passed from the producer to the consumer is called Event. It is not a specific type defined by the Disruptor, but is defined and specified by the user of the Disruptor.

1.4.7 EventHandler

EventHandler is the event-handling interface defined by the Disruptor. Users implement it to process events, so it is where the consumer's real logic lives.
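To show how an EventHandler is driven, here is a simplified consumer loop in the spirit of the Disruptor's BatchEventProcessor. Handler and ToyBatchProcessor are hypothetical stand-ins, not com.lmax.disruptor types. The loop takes every event from the next unprocessed sequence up to the highest available one and passes each to the handler with its sequence. It sets endOfBatch only on the last event, so a handler can, for example, flush buffered I/O once per batch instead of once per event.

```java
import java.util.ArrayList;
import java.util.List;

public class ToyBatchProcessor {
    // Hypothetical stand-in mirroring the shape of EventHandler.onEvent.
    public interface Handler<T> {
        void onEvent(T event, long sequence, boolean endOfBatch);
    }

    // Processes sequences nextSequence..availableSequence from a slot array and
    // returns the last processed sequence (the consumer's new progress).
    public static <T> long processBatch(T[] slots, long nextSequence, long availableSequence,
                                        Handler<T> handler) {
        int mask = slots.length - 1; // slots.length is assumed to be a power of 2
        for (long seq = nextSequence; seq <= availableSequence; seq++) {
            T event = slots[(int) (seq & mask)];
            handler.onEvent(event, seq, seq == availableSequence);
        }
        return availableSequence;
    }

    public static void main(String[] args) {
        String[] slots = {"a", "b", "c", "d"};
        List<String> log = new ArrayList<>();
        long done = processBatch(slots, 1, 3,
                (e, seq, end) -> log.add(e + "@" + seq + (end ? " (end of batch)" : "")));
        System.out.println(log);  // [b@1, c@2, d@3 (end of batch)]
        System.out.println(done); // 3
    }
}
```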

1.4.8 Producer

The producer is the user code that calls the Disruptor to publish events. The Disruptor does not define a specific interface or type for it.

1.5 Disruptor Features

The Disruptor works much like a queue and passes data between threads. It also offers some features that other queues lack, such as:

  • The same "event" can have multiple consumers, which can be processed in parallel or depend on each other to form a processing sequence (forming a dependency graph);
  • Pre-allocated memory space for storing event content;
  • A heavily optimized, lock-free design aimed at extremely high performance.

2. Getting Started with Disruptor

Let's use a simple example to try out the Disruptor. The producer passes a value of type long to the consumer, and the consumer prints the value when it receives it.

2.1 Add dependencies

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>

2.2 Disruptor API

The Disruptor's API is simple, and using it takes the following steps.

2.2.1 Defining Events

First create a LongEvent class. Its instances are placed in the ring buffer as the message content.

An Event is the data type exchanged through the Disruptor.

public class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }

    public long getValue() {
        return value;
    }
}
2.2.2 Defining an event factory

For the Disruptor to preallocate events in memory, we need to define an EventFactory.

The EventFactory defines how to instantiate the Event defined in the previous step and must implement the interface com.lmax.disruptor.EventFactory<T>.

The Disruptor uses the EventFactory to pre-create the Event instances in the RingBuffer.

Each Event instance actually serves as a "data slot". The publisher first obtains an Event instance from the RingBuffer, fills it with data and publishes it. The consumer then obtains that Event instance and reads the data from it.

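import com.lmax.disruptor.EventFactory;

// Creates the LongEvent instances that pre-fill the ring buffer's slots
public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}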
2.2.3 Define the specific implementation of event processing

For consumers to process these events, we define an event handler that simply prints each event.

The event handling logic is implemented via the interface com.lmax.disruptor.EventHandler<T>.

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
        // Print the consuming thread, the event's value and its sequence number
        System.out.println("consumer:" + Thread.currentThread().getName() + " Event: value=" + event.getValue() + ",sequence=" + sequence);
    }
}
2.2.4 Specify a Wait Strategy

The Disruptor defines the com.lmax.disruptor.WaitStrategy interface to abstract how a Consumer waits for new events. This is an application of the Strategy pattern.

// Other built-ins: BlockingWaitStrategy (the DSL default), SleepingWaitStrategy, BusySpinWaitStrategy
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
2.2.5 Start Disruptor

Note that the ring buffer size (bufferSize) must be a power of 2.

// Specify the event factory
LongEventFactory factory = new LongEventFactory();

// Ring buffer size as a number of slots (events), not bytes; must be a power of 2
int bufferSize = 1024;

// Single-producer mode (ProducerType.SINGLE) for extra performance
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory,
                                                          bufferSize, Executors.defaultThreadFactory(),
                                                          ProducerType.SINGLE,
                                                          new YieldingWaitStrategy());
// Register the event handler (the consumer)
disruptor.handleEventsWith(new LongEventHandler());

// Start the consumer thread
disruptor.start();
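The power-of-2 rule for bufferSize exists so that mapping a sequence number to a slot can use a cheap bit mask, sequence & (bufferSize - 1), instead of a division-based sequence % bufferSize. The two expressions only agree when bufferSize is a power of 2, and the hypothetical helper class below checks this directly. As far as I know, the Disruptor itself rejects other sizes with an IllegalArgumentException.

```java
public class PowerOfTwoIndexing {
    // True when n is a positive power of 2 (exactly one bit set).
    public static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    // True if masking gives the same slot as modulo for every
    // sequence in [0, limit) with the given buffer size.
    public static boolean maskMatchesModulo(int bufferSize, long limit) {
        long mask = bufferSize - 1;
        for (long seq = 0; seq < limit; seq++) {
            if ((seq & mask) != seq % bufferSize) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isPowerOfTwo(1024));              // true
        System.out.println(maskMatchesModulo(1024, 10_000)); // true
        System.out.println(isPowerOfTwo(1000));              // false
        System.out.println(maskMatchesModulo(1000, 10_000)); // false
    }
}
```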
2.2.6 Publishing events using Translators

Disruptor 3.0 added a rich Lambda-style API to simplify things for developers. Since version 3.0, the preferred way to publish events is therefore through the Event Publisher/Event Translator API.

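import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

public class LongEventProducerWithTranslator {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    // Copies the published value into the pre-allocated event in the claimed slot
    private static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, Long>() {
                @Override
                public void translateTo(LongEvent event, long sequence, Long data) {
                    event.set(data);
                }
            };

    // Claims the next slot, applies TRANSLATOR to it, then publishes the slot
    public void onData(Long data) {
        ringBuffer.publishEvent(TRANSLATOR, data);
    }
}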
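Conceptually, publishEvent wraps the claim, fill and publish steps from 2.2.2, so user code can never forget to publish a claimed slot. The single-threaded sketch below is a hypothetical stand-in with these assumptions:
- MiniRingBuffer is not a library class.
- java.util.function.BiConsumer plays the role of EventTranslatorOneArg, without the sequence argument.
- There is no check against overwriting unconsumed slots.

The important detail is the finally block. The slot is published even if the translator throws, because a sequence that is claimed but never published would stall every consumer behind it.

```java
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical, single-threaded stand-in showing what publishing via a translator involves.
public class MiniRingBuffer<E> {
    private final Object[] slots;
    private final int mask;
    private long nextSequence = 0; // next sequence to claim
    private long published = -1;   // highest published sequence (the "cursor")

    public MiniRingBuffer(int size, Supplier<E> factory) {
        slots = new Object[size]; // size assumed to be a power of 2
        mask = size - 1;
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get(); // pre-allocated "data slots"
        }
    }

    @SuppressWarnings("unchecked")
    public E get(long sequence) {
        return (E) slots[(int) (sequence & mask)];
    }

    // Claim a slot, let the translator fill it, and always publish it.
    public <A> void publishEvent(BiConsumer<E, A> translator, A arg) {
        long sequence = nextSequence++;            // 1. claim (no wrap check in this sketch)
        try {
            translator.accept(get(sequence), arg); // 2. fill the pre-allocated event
        } finally {
            published = sequence;                  // 3. publish, even if filling threw
        }
    }

    public long cursor() {
        return published;
    }

    public static void main(String[] args) {
        MiniRingBuffer<long[]> rb = new MiniRingBuffer<>(4, () -> new long[1]);
        rb.publishEvent((event, value) -> event[0] = value, 42L);
        System.out.println(rb.cursor() + " -> " + rb.get(0)[0]); // 0 -> 42
    }
}
```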
2.2.7 Shut down the Disruptor

disruptor.shutdown(); // Waits until all published events are processed, then halts the consumers; stop publishing first

2.3 Code Integration

2.3.1 LongEventMain

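This startup class wires the producer and consumer together. It constructs the Disruptor object and calls start() to launch the consumer thread. Constructing the Disruptor requires an event factory, the ring buffer size, a thread factory, the producer type and a WaitStrategy.

  • ByteBuffer: a java.nio byte buffer that the official getting-started example uses to wrap each message. The version below passes a Long through the translator instead, so it does not need one.
  • ProducerType.SINGLE declares that only one thread publishes events, which lets the Disruptor use its faster single-producer sequencer.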
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.Executors;

public class LongEventMain {
    public static void main(String[] args) {
        // Specify the event factory
        LongEventFactory factory = new LongEventFactory();

        // Ring buffer size as a number of slots (events), not bytes; must be a power of 2
        int bufferSize = 1024;

        // Single-producer mode (ProducerType.SINGLE) for extra performance
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory,
                bufferSize, Executors.defaultThreadFactory(),
                ProducerType.SINGLE,
                new YieldingWaitStrategy());

        // Register the event handler (the consumer)
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the consumer thread
        disruptor.start();
        // Get the ring buffer, which receives the events published by the producer
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        // Create an event producer for the ring buffer
        LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
        // Publish 100 events
        for (int i = 0; i < 100; i++) {
            // Get a random number between 1 and 1,000,000
            long value = (long) ((Math.random() * 1000000) + 1);
            // Publish the value
            producer.onData(value);
        }
        // Wait until all events are processed, then stop the consumer thread
        disruptor.shutdown();
    }
}
2.3.2 Running the test

Test Results

consumer:pool-1-thread-1 Event: value=579797,sequence=0
consumer:pool-1-thread-1 Event: value=974942,sequence=1
consumer:pool-1-thread-1 Event: value=978977,sequence=2
consumer:pool-1-thread-1 Event: value=398080,sequence=3
consumer:pool-1-thread-1 Event: value=867251,sequence=4
consumer:pool-1-thread-1 Event: value=796707,sequence=5
consumer:pool-1-thread-1 Event: value=786555,sequence=6
consumer:pool-1-thread-1 Event: value=182193,sequence=7
.....

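In each line, value is the data received by the consumer and sequence is the event's sequence number in the ring buffer. The event's slot index is sequence & (bufferSize - 1). Here the index equals the sequence itself, because only 100 events are published into 1,024 slots.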

Posted by OriginalSixRules on Tue, 20 Sep 2022 02:12:07 +0930