Saturday 31 December 2011

Concurrent Sequencing

A few weeks ago one of the users of the Disruptor posted some worrying benchmarks:

ThreePublisherToOneProcessorSequencedThroughputTest
  run 0: BlockingQueue=645,161 Disruptor=1,772 ops/sec
  run 1: BlockingQueue=1,250,000 Disruptor=20,000,000 ops/sec
  run 2: BlockingQueue=1,250,000 Disruptor=56 ops/sec

It appears that under heavy contention, with fewer available cores than busy threads, the Disruptor can perform terribly. After a bit of investigation I managed to isolate the problem. One of the most complex parts of the Disruptor is the multi-threaded claim strategy. It is the only place in the Disruptor where - out of necessity - we break the single-writer principle.

The approach that we used was very simple. Each thread claims a slot in the ring buffer using AtomicLong.incrementAndGet(). This ensures that each claim will return a unique, sequential value. The complexity arrives when multiple threads try to publish their sequences. We require that all events placed in the ring buffer are made available to event processors in strictly sequential order. To ensure this behaviour we have a method called serialisePublishing(). Our simple implementation would have the publishing thread busy spin until the last published sequence (the cursor) is one less than the value being published.
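In outline, the original approach looked something like the following. This is a simplified sketch; the class and field names are illustrative rather than the actual Disruptor source.

import java.util.concurrent.atomic.AtomicLong;

// Simplified sketch of the original multi-threaded claim strategy.
public class BusySpinSequencer {
    private final AtomicLong claimSequence = new AtomicLong(-1);
    private final AtomicLong cursor = new AtomicLong(-1);

    public long claim() {
        // Each thread receives a unique, strictly ascending sequence.
        return claimSequence.incrementAndGet();
    }

    public void serialisePublishing(final long sequence) {
        // Busy spin until the previous sequence has been published.
        while (cursor.get() != sequence - 1) {
            // spin
        }
        cursor.set(sequence);
    }
}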

This works because each published sequence is unique and strictly ascending. For example, if one thread wants to publish the value 8, it will spin until the cursor value reaches 7. Because no other thread will be trying to publish the value 8, it can make progress, ensuring the sequential publishing behaviour in the process. However, this busy spin loop causes problems when there are more threads than cores. The threads that need to wait for the prior sequences to be published can starve out the thread that should be updating the cursor. This leads to the unpredictable results shown above.

We need a better solution. In an ideal world there would be a Java API that compiled down to the Intel MONITOR/MWAIT instructions; unfortunately they're limited to ring 0, so they require a little kernel assistance to be useful. Another option (also unavailable from Java) would be the Intel PAUSE instruction, which could be used in the middle of the spin loop. One of the problems with busy loops on modern processors is that, in order to keep the pipeline full, the CPU may speculatively execute the condition at the top of the loop, causing an unnecessarily high number of instructions to fill the CPU pipeline. This can starve other logical threads of CPU resources. The PAUSE instruction on hyper-threaded Intel processors can improve this situation.

Java has neither of those, so we need to go back and address the shape of the serialisePublishing method. For the redesign I drew some inspiration from Cliff Click's non-blocking hash map. There are 2 aspects of his design that are very interesting:

  • No locks
  • No CAS retries

While the first is obvious, the second is trickier. Anyone familiar with CAS operations will have seen the traditional loop-until-success approach to handling concurrent updates. For example, the incrementAndGet method inside AtomicLong in Oracle's JVM uses a similar loop. It could look something like1:
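import java.util.concurrent.atomic.AtomicLong;

// A sketch of the loop-until-success pattern; not the exact Oracle
// source, but semantically the same.
public long incrementAndGet(final AtomicLong value) {
    long current;
    long next;
    do {
        current = value.get();
        next = current + 1;
    } while (!value.compareAndSet(current, next));
    return next;
}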

While there are no locks here, it is not necessarily wait-free. It is theoretically possible, if a number of threads are trying to increment the value, for one (or more) of the threads to get stuck, unable to make progress because other threads are constantly winning the race to the atomic operation. For an algorithm to be wait-free, all calls must complete within a fixed number of operations. One way to get closer to a truly wait-free algorithm is to design it such that the failure of a CAS operation is a signal to exit rather than to retry the operation.

Cliff Click's approach was to model the algorithm as a state machine, where all states are valid and a transition between states is typically a CAS operation. E.g. imagine a state machine with 3 states {A, B, C} and 2 transitions {A->B, B->C}. If an instance of the state machine is in state A and 2 threads try to apply the transition A->B, only one will succeed. For the thread whose CAS operation fails, retrying makes no sense: the instance has already transitioned to state B. In fact the failure of the CAS operation is an indication that the instance is already in the desired state. The thread can exit if B is the desired state, or try to apply the B->C transition if that is what's required.
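As a hypothetical illustration (the class and method names below are invented for this example):

import java.util.concurrent.atomic.AtomicReference;

// The {A, B, C} state machine described above. A failed CAS means
// another thread has already applied the transition, so there is
// nothing to retry.
public class ThreeStateMachine {
    enum State { A, B, C }

    private final AtomicReference<State> state = new AtomicReference<State>(State.A);

    public void moveToB() {
        // If this CAS fails the machine is already in B (or beyond),
        // which is the outcome we wanted, so we simply return.
        state.compareAndSet(State.A, State.B);
    }

    public void moveToC() {
        // Apply A->B in case no one has yet, then B->C. A failure at
        // either step means another thread has done the work for us.
        state.compareAndSet(State.A, State.B);
        state.compareAndSet(State.B, State.C);
    }
}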

How does this apply to our concurrent sequencing problem? We could allow threads to continue to make progress while waiting for other threads to catch up by maintaining a list of sequences that are pending publication. If a thread tries to publish a sequence that is more than 1 higher than the current cursor (i.e. it would need to wait for another thread to publish its sequence), it could place that sequence into the pending list and return. The thread that is currently running behind would publish its own sequence, then check the pending list and publish those sequences before exiting.

To represent this as a state machine we would have 3 states {unpublished, pending, published} and 2 transitions {unpublished->pending, pending->published}. In recognition of the fact that computing resources are finite, we have a guard condition on the unpublished->pending transition, i.e. a limit on the number of sequences we allow in the pending state. Because each sequence is unique, the transition unpublished->pending does not require a CAS operation. The pending list is represented as an AtomicLongArray and the transition is a simple AtomicLongArray.set(), where the index is the sequence modulo the size of the pending list2. The final transition, pending->published, is where the CAS operation comes in. The thread will first try to publish its own sequence number. If that succeeds, the thread will try to publish the next value from the pending list. If the CAS fails, the thread leaves the method: the failure means that the value is already published, or will be by some other thread.
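Putting this together, the new serialisePublishing could be sketched along the following lines. This is a simplification (single-sequence publishing, a plain busy-wait guard) and the names are assumptions, not the exact 2.8 source:

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Simplified sketch of the pending-publication approach.
public class PendingPublicationSequencer {
    private final AtomicLong cursor = new AtomicLong(-1);
    private final AtomicLongArray pendingPublication = new AtomicLongArray(1024);
    private final int pendingMask = pendingPublication.length() - 1;

    public void serialisePublishing(final long sequence) {
        // Guard condition: bound the number of sequences in the
        // pending state to the size of the pending list.
        while (sequence - cursor.get() > pendingPublication.length()) {
            // spin until there is room in the pending list
        }

        // unpublished -> pending: no CAS required, each sequence is
        // unique so no other thread writes to this slot.
        pendingPublication.set((int) sequence & pendingMask, sequence);

        // pending -> published: only the thread that is next in line
        // wins this CAS; everyone else simply leaves.
        if (!cursor.compareAndSet(sequence - 1, sequence)) {
            // Already published, or will be by the trailing thread.
            return;
        }

        // We are the trailing thread: publish any pending sequences
        // that directly follow the one we just published.
        long next = sequence + 1;
        while (pendingPublication.get((int) next & pendingMask) == next
            && cursor.compareAndSet(next - 1, next)) {
            next++;
        }
    }
}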

Running the multi-publisher performance test on my 2-core laptop (where the test requires at least 4 busy threads, so the cores are oversubscribed):

ThreePublisherToOneProcessorSequencedThroughputTest
  run 0: BlockingQueue=5,832,264 Disruptor=6,399,590 ops/sec
  run 1: BlockingQueue=5,521,506 Disruptor=6,470,816 ops/sec
  run 2: BlockingQueue=5,373,743 Disruptor=6,931,928 ops/sec

Sanity restored.

This update will be included in the 2.8 release of the Disruptor as the default implementation of the MultiThreadedClaimStrategy. The old implementation will still be available as MultiThreadedLowContentionClaimStrategy. Those who have plenty of cores, and whose publishers aren't often contended, may find the old implementation faster - as it should be, since it is simpler and requires fewer memory barriers. I'm going to continue to revise and work on this code. While improved, it is not truly wait-free: it is possible for one of the threads to get stuck doing all of the publishing.

1 AtomicLong.getAndIncrement() does use a slightly different loop structure, but the semantics are the same.
2 Actually it's a bitwise AND of the sequence and the pending list size minus 1, which is equivalent when the size of the pending list is a power of 2.
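For example, with a hypothetical pending list size of 8:

// 13 % 8 == 5 and 13 & 7 == 5 (0b1101 & 0b0111 == 0b0101)
int index = (int) (sequence & (pendingListSize - 1));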

6 comments:

Olivier Deheurles said...

Hi Mike,

Just a couple of remarks/questions:
- don't you think the array can cause false sharing issues? With this algorithm 2 or more threads will be updating array elements next to each other, often on the same cache line. AtomicPaddedLongArray? A lot of wasted cache space
- I talked about this some time ago with Martin: instead of having a single ring buffer, imagine you have one per producer thread. One mediator thread is used 'in the middle' to read from all those ring buffers and publish to the main ring buffer.

P1 -> RB1 ->
P2 -> RB2 -> Mediator -> MAIN RING BUFFER -> CONSUMERS
P3 -> RB3 ->

This setup respects the single writer principle: each producer commits to its own ring buffer, only the mediator thread reads from multiple ones, which is not an issue (as long as you don’t have too many producer buffers).

I’m not saying this is the way to go but maybe it could be just an additional strategy?

Olivier

Michael Barker said...

Hi Olivier,

1. False Sharing:

Yes, I think it can. I did try padding out the array some time ago, but didn't see much difference. I should retest it. I think the cost of the CAS instruction outweighs the cost of the false sharing.

2. Multiple ring buffers

The original poster tried something similar (https://groups.google.com/d/msg/lmax-disruptor/w8CJB2nL9bs/32JWjcsi7nIJ) without much success. I think it is worth revisiting. If you wanted to prototype it in C# I can port it back if it works well.

satch said...

Hi Mike - how's it going? I like the blog.

I downloaded the disruptor - it looks good (though I don't profess to understand much of it at this stage!).

I looked at this post and it intrigued me as it is a good example of thread starvation. I had a quick go at measuring the difference in speed between the two approaches, and I got a surprising result. On my quad-core Intel Core i3 350M I found that the old solution was quicker than your new solution by quite a bit:

OLD

ThreePublisherToOneProcessorSequencedThroughputTest run 0: BlockingQueue=4,367,956 Disruptor=12,572,290 ops/sec
ThreePublisherToOneProcessorSequencedThroughputTest run 1: BlockingQueue=4,362,240 Disruptor=12,109,469 ops/sec
ThreePublisherToOneProcessorSequencedThroughputTest run 2: BlockingQueue=4,343,293 Disruptor=12,210,012 ops/sec

NEW

ThreePublisherToOneProcessorSequencedThroughputTest run 0: BlockingQueue=4,465,880 Disruptor=7,716,049 ops/sec
ThreePublisherToOneProcessorSequencedThroughputTest run 1: BlockingQueue=4,399,665 Disruptor=7,774,840 ops/sec
ThreePublisherToOneProcessorSequencedThroughputTest run 2: BlockingQueue=4,421,452 Disruptor=7,726,781 ops/sec

It looks like the old solution is quite a bit better when you have enough cores for the threads.

So I did a couple more experiments and found that if I just insert a Thread.yield() into the while loop of the old solution, then I get roughly equivalent performance to the new solution:

ThreePublisherToOneProcessorSequencedThroughputTest run 0: BlockingQueue=4,312,203 Disruptor=7,299,802 ops/sec
ThreePublisherToOneProcessorSequencedThroughputTest run 1: BlockingQueue=4,305,705 Disruptor=7,989,773 ops/sec
ThreePublisherToOneProcessorSequencedThroughputTest run 2: BlockingQueue=4,373,114 Disruptor=8,105,698 ops/sec

If I then add 1000 RETRIES before yielding (as you did for the new solution) then I got almost equivalent performance to the no-yield case (presumably 1000 cycles is enough for the other threads to have updated the sequence most of the time):


ThreePublisherToOneProcessorSequencedThroughputTest run 0: BlockingQueue=4,443,457 Disruptor=10,262,725 ops/sec
ThreePublisherToOneProcessorSequencedThroughputTest run 1: BlockingQueue=4,471,671 Disruptor=11,036,309 ops/sec
ThreePublisherToOneProcessorSequencedThroughputTest run 2: BlockingQueue=4,581,061 Disruptor=10,498,687 ops/sec

Anyhow, just a couple of experiments. I've got a flat in London now - fancy a beer some time?

Doug

Michael Barker said...

Hi Doug,

That's pretty consistent with our experience. If you have sufficient cores, a simple spin loop works great. When starved of cores, you have to pay a cost to ensure that all threads get their fair share of CPU time.

I assume that you've got 4 physical cores. On my 2-core + HT laptop (4 logical cores) the new version runs faster. That may be because the heavy spin loop is filling up the pipeline, starving out the other hyper-threads.

Definitely up for a beer. Drop me an email to sort out a time.

Mike.

satch said...

Well, I thought it was a quad-core, but it looks like it is a dual-core with HT. So now I'm not sure why your laptop and mine are giving such different results.

BTW - did you try using the old solution, but just inserting a yield like so:

final long expectedSequence = sequence - batchSize;
int counter = RETRIES;
while (expectedSequence != cursor.get()) {
    if (--counter == 0) {
        Thread.yield();
        counter = RETRIES;
    }
}
cursor.set(sequence);

Michael Barker said...

We've tried that in the past with limited success. The results tend to be less predictable: sometimes it works well, sometimes it's just as bad as a busy spin. Relying on consistent forward progress, rather than spinning (even with a yield) when contended, should give more predictable results in more cases.

More than anything it highlights the complexity involved and the performance trade-offs that need to be made when the single writer principle is violated.

Cliff Click explains why yield can cause a lot of pain for the OS scheduler. (http://www.azulsystems.com/blog/cliff/2011-09-23-a-pair-of-somebody-elses-concurrency-bugs). The new approach should be friendlier in this regard as it allows more progress to be made within each thread's quantum.