LMAX Disruptor: A high-performance alternative to bounded queues for exchanging data between concurrent threads
Disclaimer: please be aware that this post is quite long. It is an introductory write-up in which I collect and interpret the LMAX Disruptor as I understand it.
The disruptor pattern is a high-performance, low-latency messaging framework that facilitates the exchange of data between threads in a concurrent system. It was developed by the LMAX Exchange, a financial trading platform.
For applications where high throughput is paramount, the traditional bounded queue often falls short. This is where the LMAX Disruptor shines: it was engineered as an alternative way to exchange data between threads with much higher throughput, which makes it a good fit for systems where performance and latency are critical.
LMAX Group is the leading independent operator of multiple institutional execution venues for FX and cryptocurrency trading.
In trading platforms, achieving very low latency and high throughput is crucial. Therefore, many trading systems opt for a single large server to process core logic in a single thread, minimizing network latency, serialization, and I/O delays.
However, as the server must handle additional processing before executing the core logic, the need arises for multi-threading to perform concurrent tasks. To facilitate communication between these threads, queuing mechanisms are commonly employed. For instance, the LMAX system, developed in Java, utilized the ArrayBlockingQueue. Despite expectations, performance testing revealed suboptimal system performance. Conducting analytics on each component identified the bottleneck, revealing the queue as the primary culprit. Below is a high-level diagram depicting the system architecture, along with the processing time of each component.
They use multiple queues in their system, but the queues perform poorly even though the other components of the system are well optimized. Hence they concluded that they needed a faster middleman - the thing that stands between threads so they can communicate with each other. That's how the LMAX Disruptor was born.
Before coming to how LMAX Disruptor was built, it’s essential to understand the complexity of concurrency as well as why the queues are bad.
Locks are mechanisms used to synchronize access to shared resources in a multithreaded environment. They prevent multiple threads from modifying a resource at the same time, which can lead to inconsistent or corrupted data. When a thread acquires a lock, it gains exclusive access to a protected section of code or data, while other threads are blocked from accessing it until the lock is released.
2 threads want to manipulate the same variable Source: Dissecting the Disruptor: Why it's so fast (part one) - Locks Are Bad - Trisha Gee
With a lock, thread 2 locks the shared resource until it is done with its operations. Source: Dissecting the Disruptor: Why it's so fast (part one) - Locks Are Bad - Trisha Gee
However, they can be costly. When contention occurs, arbitration is required, typically involving the operating system kernel. The kernel must switch context between multiple threads, suspending those waiting for locks, and resuming those where locks have been released. Context switching negatively impacts performance due to the overhead of saving the current thread's state, loading the next thread's state, and potential cache misses of the new thread.
Here is a benchmark using a lock. All we instructed the program to do is increment a counter variable, and you can see that even in a single thread, the locked version is about 15x slower than the one without a lock. Things get worse with 2 threads, which is ~77x slower.
LockBenchmark.singleThreadIncrement          1116080.320 ± 441440.834 ops/ms
LockBenchmark.singleThreadIncrementWithLock    72622.450 ±  21373.710 ops/ms
LockBenchmark.twoThreadsIncrementWithLock      14420.572 ±   2109.659 ops/ms
The benchmark code can be found at: link
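As a rough sketch of what such a benchmark measures (the class and method names here are hypothetical, not the actual benchmark code), the locked variant pays for lock acquisition on every single increment:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the two counters the benchmark compares:
// the same increment, with and without a lock.
class Counters {
    private long plain;    // no synchronization at all
    private long locked;
    private final ReentrantLock lock = new ReentrantLock();

    void incrementPlain() { plain++; }

    void incrementWithLock() {
        lock.lock();       // under contention this is where kernel arbitration happens
        try { locked++; } finally { lock.unlock(); }
    }

    long plainValue()  { return plain; }
    long lockedValue() { return locked; }
}
```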
Besides, locks are bad because they can lead to many other problems such as deadlock, livelock, and priority inversion (a lower-priority thread holding a lock needed by a higher-priority thread).
One other way to deal with the contention when updating to the same resource is CAS.
CAS stands for Compare-And-Swap (or Compare-And-Set), a low-level atomic operation used in multithreading and concurrent programming. CAS is typically supported directly by modern CPU architectures. The idea is really simple: it takes 3 operands - the variable it wants to change, the expected current value of that variable, and the new value. Before updating the variable to the new value, it first checks whether the current value is the same as the expected current value. If they are not equal, which means another thread changed the variable in the meantime, the CAS fails.
This CAS approach is significantly more efficient than locks because it does not require a context switch to the kernel for arbitration.
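A minimal illustration of the CAS retry loop in Java, using the standard AtomicLong (a sketch of the idea, not the Disruptor's own implementation):

```java
import java.util.concurrent.atomic.AtomicLong;

// Lock-free increment via a CAS retry loop.
class CasCounter {
    private final AtomicLong value = new AtomicLong(0);

    long increment() {
        long current, next;
        do {
            current = value.get();   // read the expected current value
            next = current + 1;
        } while (!value.compareAndSet(current, next)); // retry if another thread won the race
        return next;
    }

    long get() { return value.get(); }
}
```

If compareAndSet fails, no lock is taken and no kernel arbitration happens; the thread just reads the fresh value and tries again.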
Here is the benchmark when using CAS
LockBenchmark.singleThreadIncrement          1372254.769 ± 129369.016 ops/ms
LockBenchmark.singleThreadIncrementWithCAS    140081.727 ±  10568.910 ops/ms
LockBenchmark.singleThreadIncrementWithLock    78202.644 ±   1326.388 ops/ms
LockBenchmark.twoThreadsIncrementWithCAS       56764.961 ±   2351.039 ops/ms
LockBenchmark.twoThreadsIncrementWithLock      19364.467 ±    675.668 ops/ms
The benchmark code can be found at: link
As you can see, the CAS benchmark shows a big improvement over the lock-based code (~2x in a single thread, ~3x with two threads). Of course, it cannot beat singleThreadIncrement.
But there's no free lunch: developing a program with locks is difficult, and developing a lock-free program with CAS operations is even harder. In the benchmark code it's just an increment of a counter variable, but in real life the shared resource we want to manipulate is generally not that simple.
Modern CPUs employ performance optimizations that can result in out-of-order execution. This means the CPU can reorganize your instructions, with the guarantee that a single thread sees the same result before and after the reordering.
One example is:

Thread #1, Core #1:

    if (B == 1)
        System.out.println(A);

Thread #2, Core #2:

    A = 1;
    B = 1;

There is no guarantee that the instructions in Thread #2 execute in the order they appear in the code; the CPU can reorder them like this:

    B = 1;
    A = 1;

Without reordering, we expect that if B is set to 1, then A must already be 1. However, due to the reordering, Thread #1 might see B = 1 while A still equals 0.
That’s why we need the memory barrier.
Memory barriers serve two primary purposes:
- Ordering Memory Accesses: They ensure that specific memory operations occur in a definite order. Without memory barriers, the compiler or the CPU may reorder instructions for optimization purposes, which can lead to unexpected behavior in concurrent environments.
- Visibility of Memory Writes: They ensure that changes made to shared variables by one thread are visible to other threads in a predictable manner.
In Java, we can use the volatile keyword to achieve this.
CPU cache
In modern computers, one CPU has multiple cores; each CPU core has its own cache layers (L1 and L2). The CPU has a shared L3 cache layer, and finally there is main memory.
Here is the latency of each cache layer
When we run multiple threads, the OS schedules each thread on a CPU core. When a CPU core performs an operation, it first looks in L1 for the data it needs, then L2, then L3, and finally, if the data is not in any of the caches, it has to be fetched all the way from main memory. The further it has to go, the longer the operation takes. So if you're doing something very frequently, you want to make sure that data is in the L1 cache.
Note that when the data is found, it is populated back into the closer cache layers. For example, if it was not found in L1 or L2 but was found in L3, that data is replicated into L2 and then L1.
Cache line
Let's dive deeper: within each cache layer, data is stored in units called cache lines. A cache line is a power-of-2 number of contiguous bytes, typically 32-256; the most common cache line size is 64 bytes. For example, a Java long is 8 bytes, so a single cache line can hold 8 long variables.
Cache coherence
Because each CPU core has its own cache, there can be many copies of shared data: one copy in main memory and one in the local cache of each core that requested it. When one of the copies changes, the other copies must reflect that change. Cache coherence is the discipline that ensures changes to the values of shared operands (data) are propagated throughout the system in a timely fashion. Because the cache line is the smallest unit of coherence, when a single variable in a cache line is invalidated, all the other variables in the same cache line must be invalidated as well.
Okay, now we have a basic understanding of how data is stored and shared between multiple threads. This brings us to a very interesting problem called False Sharing.
False sharing is a performance issue that arises in multithreaded programs, especially on multicore processor systems. It occurs when threads on different cores inadvertently access data lying on the same cache line, regardless of whether they are accessing different variables. This results in inefficient cache usage and can considerably impair performance.
Example:
Imagine a scenario where we have two variables, x and y. These exist within the same cache line and are replicated across multiple CPU core caches. For simplicity, we'll overlook the cache layer and focus solely on the cache line.
Now, if CPU core 1 changes the value of variable x, it needs to reflect this change in CPU core 2 due to cache coherence. As detailed in the cache coherence section, this reflection invalidates the entire cache line containing the variable x.
Conversely, if CPU core 2 updates the value of variable y, it must invalidate the cache line containing variable y in CPU core 1.
Do you see the issue? Each time a single variable is invalidated, the CPU must invalidate the whole cache line in every core that holds a copy of it. But what if CPU core 2 only cares about the value of y and has no interest in variable x? It still has to invalidate its cache unnecessarily.
You might wonder why it would cache a variable it doesn't care about. A good example of this could be if x and y are the head and tail pointers of a single queue, thus, they are always stored together.
In the past, the Netflix team encountered a False Sharing issue that had a huge impact on the performance of their app. You can read the details at:
One solution to this is to add padding between data elements that would otherwise end up in the same cache line.
One common way to communicate between multiple threads is to use queues.
Most of the time, queues use either linked lists or arrays as the underlying storage for elements, and to avoid the catastrophe of exhausting memory, queues are commonly bounded in size.
Key Components:
- Producers: entities that add items to the queue. They produce data, tasks, or messages that need to be processed.
- Consumers: entities that take items out of the queue for processing. They consume and act on the data, tasks, or messages from the queue.
Problem with Queue
A queue usually has 2 pointers called head and tail: the producer(s) manipulate the tail, while the consumer(s) manipulate the head. That leads to write contention on those variables, and to guarantee integrity, a lock is generally used on them. And you already know how bad locks are.
Moreover, the head and tail generally occupy the same cache line, which can lead to the False Sharing problem I mentioned in the previous section.
In Java, there is another problem that comes from the garbage collector. With a queue backed by a linked list, the memory allocated for new items and reclaimed for processed items is a significant cost, especially when the add/take operations are frequent.
Okay, now to the main part of this blog. :)) I hope you didn't give up after the really long and seemingly unrelated sections above. But believe me, all those components connect at this point.
We've talked a lot about the problems of concurrency, shared memory, and queues. So how does the LMAX Disruptor resolve them?
Here is the overview of LMAX Disruptor
Let’s break down the components of Disruptor
This is where the events are stored.
Characteristic:
- Fixed-size circular array; all the memory is allocated when we initialize the ring buffer
- Elements are overwritten over time (when we reach the end of the array, we start over from the beginning and the data in the first slot is overwritten) → the garbage collector has nothing to do, since items are mutated in place and none are added or removed
- The size of the ring buffer may vary, but it must be a power of 2
- Sequential access: since it is backed by an array, elements are accessed sequentially, which is much faster than random access (a queue backed by a linked list); it is also very cache-friendly, since neighboring elements often share a cache line
In the Problem with Queue section, we talked about the write contention that occurs on the head and tail. With the Disruptor, that problem is eliminated because the concern has been teased out into the ProducerSequencer (producer side) and the SequenceBarrier (consumer side).
Definition
A Sequence is essentially a holder for a value that represents a particular point in the ring buffer. This value is used to track the progress of both producers and consumers as they read from or write to the buffer.
The value of a Sequence corresponds to an index in the ring buffer
- On the producer side, the value indicates the highest slot that has been published by the producer.
- On the consumer side, the value tracks the highest slot that has been processed by the consumer.
The value of a Sequence always increases, but it is easy to convert that value back to an index in the ring buffer.
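Since the buffer size is a power of 2, the conversion is a single bit mask rather than a modulo. A small sketch (the helper name is mine):

```java
// Map an ever-increasing sequence onto a ring buffer slot.
class RingIndex {
    static int toIndex(long sequence, int bufferSize) {
        // bufferSize must be a power of 2; then this is equivalent to
        // sequence % bufferSize, but a single AND instruction
        return (int) (sequence & (bufferSize - 1));
    }
}
```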
Lock-free head and tail
By using Sequence, Disruptor gets rid of write contention with the head and tail - the problem of traditional queue
Each consumer has its own Sequence → No write contention with the “tail”
All the producers share the same Sequence (the head). You might think: oh, there's write contention here. Not really. If we only have one producer, there is no write contention at all. If we have multiple producers, the write contention can be resolved cheaply by using CAS.
→ We have lock-free head and tail
Resolve False Sharing
So what’s about the False Sharing problem?
Let’s see how Disruptor implement the Sequence
They add 7 long variables before and 7 long variables after a volatile long variable. The reason is that common cache line sizes are 32, 64, and 128 bytes. By doing so, they guarantee that the Sequence value occupies its own cache line and does not share it with any other variable → False Sharing cannot occur.
One interesting note is that they do not use the built-in AtomicLong but implement their own CAS using Unsafe. IMO, the reason is that they want more control over the memory barriers, and they want to avoid object wrapping: AtomicLong wraps the value in an object, which adds overhead in terms of memory and access.
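A hedged sketch of the padding idea (this is not the real Sequence source; it substitutes AtomicLongFieldUpdater for Unsafe to do the CAS):

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Sketch: 7 longs on each side of the hot volatile field, so on a 64-byte
// cache line the value never shares its line with another hot variable.
class PaddedSequence {
    protected long p1, p2, p3, p4, p5, p6, p7;        // left padding
    protected volatile long value = -1L;               // the actual sequence value
    protected long p9, p10, p11, p12, p13, p14, p15;   // right padding

    private static final AtomicLongFieldUpdater<PaddedSequence> UPDATER =
        AtomicLongFieldUpdater.newUpdater(PaddedSequence.class, "value");

    long get() { return value; }                       // volatile read = load barrier

    boolean compareAndSet(long expected, long next) {  // lock-free update
        return UPDATER.compareAndSet(this, expected, next);
    }
}
```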
On the producer side, the Disruptor uses ProducerSequencer (previously referred to as a producer barrier).
This is what is stored in the ProducerSequencer:
- bufferSize - the size of ringBuffer
- waitStrategy used by the EventProcessor (the consumer)
- cursor - the sequence of the producer
- gatingSequences - refers to the sequences of the consumers
The ProducerSequencer is responsible for tracking which slots in the ring buffer are available for writing. It ensures that producers do not overwrite unprocessed events (by checking the minimum sequence of the gatingSequences) and manages the available capacity of the buffer. It is also responsible for notifying the consumers via the waitStrategy when a new event is published.
There are 2 types of Sequencer:
- SingleProducerSequencer: used when we only have one producer
- MultiProducerSequencer: used when we have multiple producers.
There are 3 main components on the consumer side:
- Wait Strategies: the Wait Strategy determines how a consumer will wait for events to be placed into the Disruptor by a producer. There are several Wait Strategies available; choose carefully based on your needs and your server conditions.
- Wait Strategy List
- BlockingWaitStrategy
- Pros: Low CPU utilization, good for low-throughput scenarios.
- Cons: Higher latency, especially in environments where low latency is critical.
- SleepingWaitStrategy
- Pros: Lower CPU usage than busy spinning, suitable for light loads.
- Cons: Latency is higher than more aggressive strategies.
- YieldingWaitStrategy
- Pros: Balances CPU utilization and latency, good for medium-load scenarios.
- Cons: Can incur thread scheduling overhead.
- BusySpinWaitStrategy
- Pros: Lowest possible latency, best for extremely high-performance requirements.
- Cons: Highest CPU utilization, not suitable for all environments.
- LiteBlockingWaitStrategy
- Pros: Lower CPU usage than blocking, with moderate latency.
- Cons: Can be slower in scenarios where low-latency is a priority.
- TimeoutBlockingWaitStrategy
- Pros: Allows for timeouts, useful for scenarios where you need to take action if no events occur within a certain time.
- Cons: Similar to BlockingWaitStrategy, with the added complexity of timeout handling.
- PhasedBackoffWaitStrategy
- Pros: Combines different strategies (e.g., spinning, yielding, sleeping) based on phases of wait, adaptive to different scenarios.
- Cons: More complex, requires tuning for optimal performance.
- EventProcessor and Sequence: Each consumer (or EventProcessor) in the Disruptor maintains its own Sequence. This Sequence tracks up to which point in the ring buffer the consumer has processed events. It ensures that consumers only process events that are fully published and ready.
- SequenceBarrier: In scenarios with multiple consumers, the Disruptor can track dependencies between different stages of consumers. For example, one consumer might need to wait for another to complete before it can process an event. This is managed through the SequenceBarrier
In order to publish a new event, there are 3 steps:
- The ProducerSequencer checks for the next available slot(s) (we can publish more than one event at a time). If there are not enough available slots, it waits
- After the desired slots are claimed successfully, write the new events into those slots
- Update the sequence of the ProducerSequencer, then notify the EventProcessors
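The three steps can be sketched with a deliberately simplified single-producer ring buffer (the method names mirror the Disruptor API, but this is an illustration, not the real implementation - it omits the capacity wait and the memory barriers):

```java
// Minimal sketch of the claim → write → publish protocol.
class MiniRingBuffer {
    private final long[] entries;
    private final int mask;
    private long producerSeq = -1;   // last published sequence (the "cursor")
    private long claimedSeq  = -1;   // last claimed, not yet published

    MiniRingBuffer(int size) {       // size must be a power of 2
        entries = new long[size];
        mask = size - 1;
    }

    long next() {                    // step 1: claim the next slot
        return ++claimedSeq;         // real code also waits for consumer space here
    }

    void set(long seq, long value) { // step 2: write the event into the claimed slot
        entries[(int) (seq & mask)] = value;
    }

    void publish(long seq) {         // step 3: make the event visible to consumers
        producerSeq = seq;           // real code issues a store barrier here
    }

    long get(long seq)  { return entries[(int) (seq & mask)]; }
    long cursor()       { return producerSeq; }
}
```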
Let's denote:
- the current seq of producer is producerSeq
- The number of desired slots is slots
- The size of entries is bufferSize
- The minimum sequence of all consumers is minConsumerSeq
If producerSeq + slots - bufferSize > minConsumerSeq → we do not have enough slots
For example:
- producerSeq = 7
- slots = 5
- bufferSize = 8
- minConsumerSeq = 3
- producerSeq + slots - bufferSize = 4 > (3 = minConsumerSeq)
Here is the visualization of above case
It's clear that if it claims 5 slots, it will have to overwrite slots that have not been processed yet (slot 4).
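The check above can be written as a small predicate (a sketch; in the real Sequencer this logic lives inside next()/hasAvailableCapacity):

```java
// Would claiming `slots` more slots wrap past an unprocessed consumer slot?
class CapacityCheck {
    static boolean hasAvailableCapacity(long producerSeq, int slots,
                                        int bufferSize, long minConsumerSeq) {
        // not enough room when producerSeq + slots - bufferSize > minConsumerSeq
        return producerSeq + slots - bufferSize <= minConsumerSeq;
    }
}
```

With the numbers from the example (producerSeq = 7, slots = 5, bufferSize = 8, minConsumerSeq = 3), the predicate is false, so the producer must wait.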
Because the entries form a circular array, after reaching the end of the array it goes back to the beginning.
For example, we have entries of size 16. The producer wants to publish a single new event; its current sequence is 4, and the current sequence of the consumer (let's assume we only have 1 consumer for now) is 3.
Here is the simple flow.
- The producer calls next() to get the available slot in entries for the new event
- The ProducerSequencer gets the current sequence of the consumer - in this case 3
- The desired producer sequence is 4 (current) + 1 (claimed) = 5; it is available → the ProducerSequencer returns slot 5 to the producer
- The producer updates the item at slot 5 in entries
- The ProducerSequencer updates its sequence to 5 and notifies the SequenceBarrier of the consumer
It's very similar to Single producer - Single Event, but in this case the producer calls next(number_of_slots) to request number_of_slots slots from the RingBuffer.
The following steps are the same as the Single producer - Single Event
Note: under the hood, the next() method in Single producer - Single Event just calls next(1).
The ProducerSequencer (in this case a MultiProducerSequencer) is in charge of requesting and allocating the desired slot(s) for each producer; it uses CAS to deal with the write contention on the producer sequence.
The following steps are the same as above.
The EventProcessor continuously checks for new items via the SequenceBarrier. If there are new events (meaning the current sequence number of the producer(s) is larger than its own sequence number), it retrieves all items up to the current sequence of the producer(s) and then processes them.
Let’s analyze an example here:
The sequence of the consumer is 8; it now waits for a new event in slot 9.
When it asks the ProducerSequencer for the available sequence, it gets back the value 12 - indicating that the current sequence of the producer(s) is 12.
So in that case, it will consume all the events from 9 to 12 and update its own sequence to 12 as well.
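That batching behavior can be sketched as follows (a simplified illustration, not the real BatchEventProcessor):

```java
import java.util.ArrayList;
import java.util.List;

// Consumer-side batching: take every sequence from our own position up to
// the available sequence returned by the barrier, then advance in one step.
class BatchConsumer {
    private long consumerSeq;

    BatchConsumer(long startSeq) { consumerSeq = startSeq; }

    // availableSeq is what the SequenceBarrier returned (the producer cursor)
    List<Long> consumeUpTo(long availableSeq) {
        List<Long> processed = new ArrayList<>();
        for (long seq = consumerSeq + 1; seq <= availableSeq; seq++) {
            processed.add(seq);      // real code would read the event at seq and handle it
        }
        consumerSeq = availableSeq;  // advance our own sequence once for the whole batch
        return processed;
    }

    long sequence() { return consumerSeq; }
}
```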
There may be instances when we need consumers to process events in a specific order. The disruptor supports this scenario.
In this case, when a consumer gets the sequence of the slot to process, it also needs to check the minimum processed sequence of all the consumers it depends on. Following is the diagram for this case.
I know it might be confusing at first, but let's take a look at the following example; hopefully it will help you understand.
For example, suppose Consumer3 (EventProcessor3) must run after Consumer1 (EventProcessor1) and Consumer2 (EventProcessor2). When it gets the sequence number to process, apart from asking the ProducerSequencer, it also needs the sequences of Consumer1 and Consumer2 - in this case 10 and 11 respectively. Even though the ProducerSequencer returns 12, Consumer1 has only consumed up to sequence 10, so Consumer3 cannot process any event beyond 10. Therefore, it processes events 9 and 10 and updates its sequence to 10.
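The gating rule is essentially a minimum: a dependent consumer may only advance to the minimum of the producer cursor and all upstream consumer sequences (the helper name here is mine, a sketch of what the SequenceBarrier computes):

```java
// The sequence a dependent consumer is allowed to advance to.
class DependencyBarrier {
    static long availableFor(long producerSeq, long... dependencySeqs) {
        long min = producerSeq;
        for (long seq : dependencySeqs) {
            min = Math.min(min, seq);   // cannot pass any upstream consumer
        }
        return min;
    }
}
```

In the example above: availableFor(12, 10, 11) is 10, so Consumer3 stops at sequence 10.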
It's been a long way to get here, and everything we've talked about means little without a benchmark showing how much efficiency the Disruptor brings to the table.
We have some scenarios to test it out
Here is the benchmark between Disruptor and ArrayBlockingQueue
You can find the benchmark code from the official repo at: disruptor/src/perftest/java/com/lmax/disruptor at v3.3 · LMAX-Exchange/disruptor (github.com)
In the latency setting, I will publish 1 million events
Benchmark ran on MacBook Pro M1, 16 GB Ram, 8-core CPU
You can find the benchmark code at: commit
Okay, LMAX built the Disruptor to meet the expectations of their own system. Let's see how they use it and break down their system.
Here’s the overview of their system before adopting Disruptor
And here’s the overview of their system with Disruptor
They use one input disruptor and multiple output disruptors. I do not know enough about what the output disruptors do, so I'll focus only on the input disruptor.
Before doing the business logic processing, three things must be done (so we have 3 consumers before the Business Logic Processor):
- Journaler: persists the events to a file - the event-sourcing pattern - so that when the system crashes, it can read all the events back from the file after restart and continue processing them.
- Replicator: even though the restart time of an LMAX server is quite small (less than one minute), for a trading platform that is still not acceptable. That's why they decided to run multiple servers at the same time: one master node receives the events, and standby nodes receive replicated events from the master. All of those servers process the events, but only the output from the master node is accepted. That's why we have the Replicator here.
- Un-marshaller: the event from the receiver is raw, so it needs to be un-marshalled. This means that when the Un-marshaller consumer consumes an event in the Ring Buffer, it mutates fields of the event, which is not best practice, but LMAX guarantees that each field of an item in the Ring Buffer is only manipulated by one parallel consumer.
The Journaler and Replicator can be executed in parallel, followed by the Un-marshaller, and finally the Business Logic Processor is the last consumer.
With the Disruptor, LMAX can handle about 6M TPS - an impressive number for a single system to handle.
Here is the comparison of the system performance when using ArrayBlockingQueue and Disruptor
Ref: Arcturus — Inventory Processing System - Tiki Engineering
Xây dựng kiến trúc chịu tải lớn ở Tiki - YouTube (Vietnamese)
Arcturus is the service in charge of processing inventory for Tiki (one of the largest e-commerce platforms in Vietnam).
The challenge of inventory processing system at Tiki
- Inventory data consistency: inventory data is updated frequently by customers and warehouse operations, and it's vital to guarantee consistency (e.g. no item ends up with a negative quantity), since inventory is a crucial part of the system.
- The service must be able to handle a large number of transactions with low latency, especially at peak hours on sale days or for hot items.
The most important requirement of this service is to guarantee the correctness and ordering of transactions, followed by the throughput of the system. For example, if there are one million requests to buy the single remaining iPhone, there must be only one successful order, and it must (or maybe should) be the earliest request that satisfies all the conditions of the business logic.
So how did Tiki engineers manage to build a system that assures the order of requests while maintaining low latency and high throughput at the same time?
This is their high level architecture
Components:
- API Gateways for HTTP Requests
- Command Queue - Kafka to receive write request - act as a persistent command layer and assure the ordering
- Leader Node Inventory Processor - single server handles all the traffic for inventory system, this is the one we’ll dive deep in the next section - which utilizes Disruptor
- Inventory Database: persistence layer
The high level flow is:
Clients or other integrated services make requests to the Arcturus API. Inside Arcturus, it needs to call some external services to gather the needed data, then process its main business logic (check available quantity, update counts, ...); finally, it persists those changes asynchronously to the database and responds to the clients/integrated services.
The tough part here is that gathering the needed data from external services involves I/O operations, which are slow when processed sequentially, while at the same time those external calls need to happen in a specific order. To resolve this issue, they turned to the Disruptor.
Let's see how they did that.
The heart of Arcturus is the Leader Node Inventory Processor, a single service that handles all the traffic. The reason: nothing is faster than processing everything in memory on a single server.
In order to deal with the ordering I/O operations, Tiki engineers decided to apply Disruptor into their system.
Here’s the high level architecture of Leader Node Inventory Processor
The consumer pulls messages from Kafka, then uses multiple threads to handle the calls to external services. Here's the interesting part: before delegating calls to the external services, the consumer claims slots in a RingBuffer and passes each claimed slot to a thread; when a thread is done with its I/O operation, it writes the result into its claimed slot on the RingBuffer. Since the items in the RingBuffer are ordered, the written results are ordered as well. So by using the RingBuffer, they can have multiple threads handle I/O operations at the same time while still guaranteeing their ordering.
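A rough sketch of that pattern in plain Java (a plain array stands in for the RingBuffer slots; the names are mine, not Tiki's code): each task writes into the slot claimed for it up front, so results come out in claim order even though the calls may complete out of order:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Claim ordered slots first, fan the I/O out to a pool, read back in order.
class OrderedFanOut {
    static long[] fetchInOrder(int n) {
        long[] slots = new long[n];                 // stand-in for RingBuffer slots
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            final int slot = i;                     // slot claimed before the call starts
            pool.submit(() -> {
                slots[slot] = slot * 10L;           // simulated external-service result
                done.countDown();
            });
        }
        try {
            done.await();                           // wait for all calls to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return slots;                               // ordered by claim, not by completion
    }
}
```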
The core of the system lies in a single-threaded business logic processor, which consumes data from the input RingBuffer and performs all necessary operations in memory. This setup is remarkably efficient, capable of handling up to 1 million transactions per second on an average server.
After processing the business logic, the system uses two separate output RingBuffers: one for data persistence and another for responding to client requests.
Now, one might ask about the risk of data loss in case of server failure, given the reliance on in-memory operations. This concern is mitigated by Arcturus' strategic use of Kafka: when persisting data to the database, the system also logs the Kafka offset at which processing occurred. In the event of a server outage, upon restoration the system simply retrieves the last offset from the database and resumes processing from that point onward.
Here’s their benchmark
disruptor-docs Archives - Trisha Gee
LMAX - How to Do 100K TPS at Less than 1ms Latency - InfoQ
Mechanical Sympathy: Memory Barriers/Fences (mechanical-sympathy.blogspot.com)
High-Performance Concurrency with LMAX Disruptor (youtube.com)
Blogs And Articles · LMAX-Exchange/disruptor Wiki (github.com)