Disruptor: a high-performance queue

Background

These are my study notes on Disruptor.

Disruptor example

import java.util.concurrent.ThreadFactory
import com.lmax.disruptor.dsl.{Disruptor, ProducerType}
import com.lmax.disruptor.{BlockingWaitStrategy, EventFactory, EventHandler, EventTranslatorOneArg}

object DisruptorTest {

  val disruptor = {
    // Creates the placeholder Event that pre-fills every ring buffer slot
    val factory = new EventFactory[Event] {
      override def newInstance(): Event = Event(-1)
    }

    val threadFactory = new ThreadFactory() {
      override def newThread(r: Runnable): Thread = new Thread(r)
    }

    // Ring buffer size must be a power of 2
    val disruptor = new Disruptor[Event](factory, 4, threadFactory, ProducerType.SINGLE, new BlockingWaitStrategy())

    // ThenHandler only sees an event after TestHandler has processed it
    disruptor.handleEventsWith(TestHandler).`then`(ThenHandler)

    disruptor
  }

  // Copies the argument into the pre-allocated Event for the claimed slot
  val translator = new EventTranslatorOneArg[Event, Int]() {
    override def translateTo(event: Event, sequence: Long, arg: Int): Unit = {
      event.id = arg
      println(s"translator: ${event}, sequence: ${sequence}, arg: ${arg}")
    }
  }

  def main(args: Array[String]): Unit = {
    disruptor.start()
    (0 until 100).foreach { i =>
      disruptor.publishEvent(translator, i)
    }
    disruptor.shutdown()
  }
}

case class Event(var id: Int) {
  override def toString: String = s"event: ${id}"
}

object TestHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
  }
}

object ThenHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
  }
}

Reading the source

Disruptor initialization

Start with the Disruptor constructor:

public Disruptor(final EventFactory<T> eventFactory,
                 final int ringBufferSize,
                 final ThreadFactory threadFactory,
                 final ProducerType producerType,
                 final WaitStrategy waitStrategy) {
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
         new BasicExecutor(threadFactory));
}

Next, look at RingBuffer.create. It ends up calling fill, which pre-populates every slot of the ringBuffer with a default object from eventFactory.newInstance():

public static <E> RingBuffer<E> create(ProducerType producerType,
        EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
    switch (producerType) {
        case SINGLE:
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
    }
}

public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize,
        WaitStrategy waitStrategy) {
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

    return new RingBuffer<E>(factory, sequencer);
}

RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
    this.sequencer = sequencer;
    this.bufferSize = sequencer.getBufferSize();

    if (bufferSize < 1) {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    this.indexMask = bufferSize - 1;
    this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
    fill(eventFactory);
}

private void fill(EventFactory<E> eventFactory) {
    for (int i = 0; i < bufferSize; i++) {
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}
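To make the power-of-2 check and indexMask above more concrete, here is a minimal sketch of how a pre-filled array plus a bit mask turns an ever-growing sequence into a slot index. RingIndexSketch and slotFor are hypothetical names for illustration, not library classes, and the BUFFER_PAD padding from the real code is left out.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for the pre-filled ring array; not the library's RingBuffer. */
final class RingIndexSketch<E> {
    private final Object[] entries;
    private final int indexMask;

    RingIndexSketch(Supplier<E> factory, int bufferSize) {
        // Same validation idea as RingBufferFields: exactly one bit set means power of 2
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        this.entries = new Object[bufferSize];
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.get(); // allocate every slot once, up front
        }
    }

    /** Maps a sequence onto a slot; the mask works as a cheap modulo for power-of-2 sizes. */
    int slotFor(long sequence) {
        return (int) (sequence & indexMask);
    }

    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) entries[slotFor(sequence)];
    }

    public static void main(String[] args) {
        RingIndexSketch<StringBuilder> ring = new RingIndexSketch<>(StringBuilder::new, 4);
        for (long seq = 0; seq < 6; seq++) {
            System.out.println("sequence " + seq + " -> slot " + ring.slotFor(seq));
        }
        // Sequences 1 and 5 share a slot, so they return the very same object
        System.out.println(ring.get(1) == ring.get(5));
    }
}
```

With a buffer size of 4, sequences 4 and 5 wrap back to slots 0 and 1, and the object in a slot is reused rather than reallocated.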

Consuming events

Start with disruptor.start(), the entry point for event consumption:

private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();

public RingBuffer<T> start() {
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository) {
        consumerInfo.start(executor);
    }

    return ringBuffer;
}

consumerRepository is populated by disruptor.handleEventsWith(TestHandler), which also builds the event-processing chain:

public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
    return createEventProcessors(new Sequence[0], handlers);
}

EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) {
    checkNotStarted();

    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
        final EventHandler<? super T> eventHandler = eventHandlers[i];

        final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null) {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }

    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}

Back in disruptor.start(), look at consumerInfo.start(executor).
Here executor = new BasicExecutor(threadFactory). Every call to BasicExecutor.execute creates a new thread, so each consumer gets a thread of its own.

public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final ThreadFactory threadFactory,
        final ProducerType producerType,
        final WaitStrategy waitStrategy) {
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}

private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
    this.ringBuffer = ringBuffer;
    this.executor = executor;
}

// From EventProcessorInfo, the ConsumerInfo implementation used for event handlers
@Override
public void start(final java.util.concurrent.Executor executor) {
    // EventProcessor extends Runnable
    // executor = BasicExecutor
    executor.execute(eventprocessor);
}

public final class BatchEventProcessor<T> implements EventProcessor {
    @Override
    public void run() {
        if (running.compareAndSet(IDLE, RUNNING)) {
            sequenceBarrier.clearAlert();

            notifyStart();
            try {
                if (running.get() == RUNNING) {
                    processEvents();
                }
            } finally {
                notifyShutdown();
                running.set(IDLE);
            }
        } else {
            if (running.get() == RUNNING) {
                throw new IllegalStateException("Thread is already running");
            } else {
                earlyExit();
            }
        }
    }

    private void processEvents() {
        T event = null;
        long nextSequence = sequence.get() + 1L;

        while (true) {
            try {
                // Blocks (or spins) until at least nextSequence is available
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null) {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }

                // Drain everything up to availableSequence as one batch
                while (nextSequence <= availableSequence) {
                    event = dataProvider.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }

                sequence.set(availableSequence);
            } catch (final TimeoutException e) {
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                if (running.get() != RUNNING) {
                    break;
                }
            } catch (final Throwable ex) {
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }
}

executor.execute, that is BasicExecutor.execute(eventProcessor), runs the event processor asynchronously on a new thread, which means BatchEventProcessor.run is invoked there.
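As a rough illustration of the "one new thread per execute call" behaviour described above, here is a minimal sketch of such an executor. ThreadPerTaskExecutorSketch and distinctThreadsFor are made-up names for this note. This is not the BasicExecutor source, only the same idea under that assumption.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;

/** Hypothetical executor that starts a fresh thread for every task it is given. */
final class ThreadPerTaskExecutorSketch implements Executor {
    private final ThreadFactory factory;

    ThreadPerTaskExecutorSketch(ThreadFactory factory) {
        this.factory = factory;
    }

    @Override
    public void execute(Runnable command) {
        Thread thread = factory.newThread(command);
        if (thread == null) {
            throw new IllegalStateException("ThreadFactory returned null");
        }
        thread.start(); // returns immediately; the task runs asynchronously
    }

    /** Runs taskCount tasks and reports how many distinct threads executed them. */
    static int distinctThreadsFor(int taskCount) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(taskCount);
        Executor executor = new ThreadPerTaskExecutorSketch(Thread::new);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                seen.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct threads: " + distinctThreadsFor(2));
    }
}
```

Two handlers therefore mean two threads, each looping inside its own run method, which is why the ordering between them has to be enforced some other way.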

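The question, then: since execution is asynchronous, how do multiple eventHandlers process events in order?

Let's walk through the logic of processEvents:

  1. First read BatchEventProcessor.sequence and add 1 to get nextSequence

  2. Call sequenceBarrier.waitFor, which delegates to WaitStrategy.waitFor, to obtain the availableSequence

  3. Here is the BlockingWaitStrategy.waitFor implementation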

    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence,
            SequenceBarrier barrier)
            throws AlertException, InterruptedException {
        long availableSequence;
        if (cursorSequence.get() < sequence) {
            lock.lock();
            try {
                while (cursorSequence.get() < sequence) {
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            }
            finally {
                lock.unlock();
            }
        }

        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
        }

        return availableSequence;
    }

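    If cursorSequence (the ring buffer's published index) < sequence (the index the batchEventProcessor wants next), the batchEventProcessor blocks and waits.
    Otherwise it spins until dependentSequence has reached sequence, then returns dependentSequence as the availableSequence.
    The batchEventProcessor then processes everything up to availableSequence in one batch and updates its own sequence.

  4. dependentSequence is initialized in the ProcessingSequenceBarrier constructor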

    final class ProcessingSequenceBarrier implements SequenceBarrier {
        private final WaitStrategy waitStrategy;
        private final Sequence dependentSequence;
        private volatile boolean alerted = false;
        private final Sequence cursorSequence;
        private final Sequencer sequencer;

        ProcessingSequenceBarrier(final Sequencer sequencer, final WaitStrategy waitStrategy,
                final Sequence cursorSequence, final Sequence[] dependentSequences) {
            this.sequencer = sequencer;
            this.waitStrategy = waitStrategy;
            this.cursorSequence = cursorSequence;
            if (0 == dependentSequences.length) {
                dependentSequence = cursorSequence;
            } else {
                dependentSequence = new FixedSequenceGroup(dependentSequences);
            }
        }
    }

    The ProcessingSequenceBarrier is created in Disruptor.createEventProcessors:
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences)
    createEventProcessors is called only by Disruptor.handleEventsWith and EventHandlerGroup.handleEventsWith

    public class Disruptor<T> {
        public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
            return createEventProcessors(new Sequence[0], handlers);
        }

        EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
                final EventHandler<? super T>[] eventHandlers) {
            checkNotStarted();

            final Sequence[] processorSequences = new Sequence[eventHandlers.length];
            final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

            for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
                final EventHandler<? super T> eventHandler = eventHandlers[i];

                final BatchEventProcessor<T> batchEventProcessor =
                        new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

                if (exceptionHandler != null) {
                    batchEventProcessor.setExceptionHandler(exceptionHandler);
                }

                consumerRepository.add(batchEventProcessor, eventHandler, barrier);
                processorSequences[i] = batchEventProcessor.getSequence();
            }

            updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

            return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
        }
    }

    public class EventHandlerGroup<T> {
        private final Disruptor<T> disruptor;
        private final ConsumerRepository<T> consumerRepository;
        private final Sequence[] sequences;

        EventHandlerGroup(final Disruptor<T> disruptor, final ConsumerRepository<T> consumerRepository,
                final Sequence[] sequences) {
            this.disruptor = disruptor;
            this.consumerRepository = consumerRepository;
            this.sequences = Arrays.copyOf(sequences, sequences.length);
        }

        public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
            return disruptor.createEventProcessors(sequences, handlers);
        }

        public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers) {
            return handleEventsWith(handlers);
        }
    }

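    EventHandlerGroup keeps a copy of the array holding its batchEventProcessors' sequences (the Sequence objects themselves are shared, not cloned).
    In the demo, disruptor.handleEventsWith(TestHandler).then(ThenHandler)
    uses then to pass TestHandler's sequence to ThenHandler, where it becomes the dependentSequence of ThenHandler's barrier.
    ThenHandler therefore depends on TestHandler and only processes an event after TestHandler has finished with it.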
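The ordering mechanism from steps 1 to 4 above can be sketched with plain AtomicLong counters: each consumer runs on its own thread but only advances up to the sequence it depends on. This is a simplified model under my own assumptions: GatedConsumersSketch, consumer and run are hypothetical names, the wait is a simple yield loop rather than a real WaitStrategy, and there is no ring buffer or alert handling.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical two-stage pipeline: "second" is gated on the progress of "first". */
final class GatedConsumersSketch {
    static final long EVENTS = 8;

    /** Like the second loop of waitFor: spin until the dependency has reached next. */
    static long waitFor(long next, AtomicLong dependent) {
        long available;
        while ((available = dependent.get()) < next) {
            Thread.yield();
        }
        return available;
    }

    static Thread consumer(String name, AtomicLong dependsOn, AtomicLong own, List<String> log) {
        return new Thread(() -> {
            long next = own.get() + 1;
            while (next < EVENTS) {
                long available = waitFor(next, dependsOn);
                while (next <= available) {   // process the whole batch
                    log.add(name + ":" + next);
                    next++;
                }
                own.set(available);           // publish progress after the batch
            }
        });
    }

    static List<String> run() {
        AtomicLong cursor = new AtomicLong(-1); // producer position
        AtomicLong first = new AtomicLong(-1);  // progress of the first handler
        AtomicLong second = new AtomicLong(-1); // progress of the second handler
        List<String> log = new CopyOnWriteArrayList<>();
        Thread a = consumer("first", cursor, first, log);  // depends on the producer
        Thread b = consumer("second", first, second, log); // depends on "first"
        b.start();
        a.start();
        for (long seq = 0; seq < EVENTS; seq++) {
            cursor.set(seq); // "publish" one event
        }
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The interleaving of log entries varies from run to run, but "second:n" can never appear before "first:n", because the second consumer reads the first consumer's sequence instead of the producer cursor.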

Producing events

Next, look at disruptor.publishEvent(translator, i), which delegates to the RingBuffer and puts data into the ringBuffer:

public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) {
    final long sequence = sequencer.next();
    translateAndPublish(translator, sequence, arg0);
}

private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {
    try {
        translator.translateTo(get(sequence), sequence, arg0);
    } finally {
        sequencer.publish(sequence);
    }
}

public E get(long sequence) {
    return elementAt(sequence);
}

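get(sequence) looks up the object in the ring buffer array by sequence (the ring buffer index).
The translator then overwrites that object's fields in place, so the slot now holds the new values, and publish advances the cursor to mark the slot as readable.
waitStrategy.signalAllWhenBlocking() then notifies any consumers that are blocked waiting so they can continue consuming.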

// cursor is declared in AbstractSequencer; publish below is the single-producer version
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

@Override
public void publish(long sequence) {
    cursor.set(sequence);
    waitStrategy.signalAllWhenBlocking();
}
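Putting the claim, translate and publish steps together, a single-producer publish path might look roughly like the sketch below. PublishSketch and its Event class are hypothetical and only model the flow described above. One deliberate omission: the real sequencer.next() also waits for the slowest consumer before handing out a slot, so unconsumed events are never overwritten, and this sketch skips that gating as well as the consumer wake-up.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical single-producer publish path; no consumer gating or signalling. */
final class PublishSketch {
    static final class Event {
        int id = -1;
    }

    private final Event[] entries;
    private final int indexMask;
    private long nextValue = -1;                          // private to the single producer
    private final AtomicLong cursor = new AtomicLong(-1); // visible to consumers

    /** bufferSize is assumed to be a power of 2 (not validated in this sketch). */
    PublishSketch(int bufferSize) {
        entries = new Event[bufferSize];
        indexMask = bufferSize - 1;
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = new Event();
        }
    }

    /** Claims the next sequence. */
    long next() {
        return ++nextValue;
    }

    Event get(long sequence) {
        return entries[(int) (sequence & indexMask)];
    }

    void publishEvent(int arg) {
        long sequence = next();
        try {
            get(sequence).id = arg; // the "translator" step: mutate the slot in place
        } finally {
            cursor.set(sequence);   // publish: consumers may now read up to sequence
        }
    }

    long cursor() {
        return cursor.get();
    }

    public static void main(String[] args) {
        PublishSketch ring = new PublishSketch(4);
        for (int i = 0; i < 5; i++) {
            ring.publishEvent(i);
        }
        System.out.println("cursor=" + ring.cursor() + ", slot 0 id=" + ring.get(0).id);
    }
}
```

After five publishes into four slots the cursor is at 4 and slot 0 has been overwritten with id 4, without any new Event being allocated.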

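Summary

With the flow clear, here are the key concepts:

  • ringbuffer
    • Memory utilization is high: the buffer causes no fragmentation and wastes almost nothing. At any given moment during processing, the memory being accessed is concentrated in a small region.
      This lets it behave well on different systems and achieve high performance. The physical memory layout is simple and uniform, so out-of-bounds accesses and dangling pointers are unlikely, and problems are easy to analyze and debug at the memory level.
      Systems built on it tend to stay robust.
  • cpu cache
    • A CPU stalls while it waits for main memory, which leaves compute resources idle and lowers overall CPU throughput.
      Because memory accesses cluster around hot spots, placing a layer of faster but more expensive (relative to main memory) storage between the CPU and memory as a cache is very cost-effective.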

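Common problems

  • Disruptor causing high CPU usage

    • When consumers consume faster than producers publish, they have to wait, and the WaitStrategy decides how. Strategies that spin keep a core busy while waiting, so the strategy chosen when the disruptor is constructed directly affects CPU usage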

      new Disruptor[Event](factory, 4, threadFactory, ProducerType.SINGLE, new BlockingWaitStrategy())

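      Below is a brief summary of some WaitStrategy implementations, based on the comments on each class

      • BusySpinWaitStrategy
        • Busy-spins, polling the ring buffer in a loop. This avoids thread switches at the cost of a full CPU core, and works best when threads can be bound to specific CPU cores
      • PhasedBackoffWaitStrategy
        • Spins first, then yields the CPU, and finally waits using the configured fallbackStrategy
      • BlockingWaitStrategy
        • Waits on a lock; when a message arrives in the ring buffer, signalAll wakes the waiting consumers
      • TimeoutBlockingWaitStrategy
        • Waits on a lock with a timeout; once the timeout elapses it throws an exception
      • SleepingWaitStrategy
        • Spins first, then yields the CPU, and finally sleeps for short intervals (LockSupport.parkNanos) instead of taking a lock, so the producer pays no cost for waking consumers up
      • YieldingWaitStrategy
        • Spins first, then yields the CPU while waiting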
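To show what "spin, then yield, then sleep" means in practice, here is a minimal sketch of a phased wait loop in the spirit of the strategies listed above. PhasedWaitSketch, its thresholds and the park interval are my own assumptions for illustration, not the library's implementation or defaults.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical phased wait: busy spin, then Thread.yield, then short parks. */
final class PhasedWaitSketch {
    static final int SPIN_TRIES = 100;      // assumed threshold
    static final int YIELD_TRIES = 100;     // assumed threshold
    static final long PARK_NANOS = 100_000; // assumed sleep interval (0.1 ms)

    /** Waits until cursor >= sequence and returns the cursor value that was seen. */
    static long waitFor(long sequence, AtomicLong cursor) {
        int attempts = 0;
        long available;
        while ((available = cursor.get()) < sequence) {
            attempts++;
            if (attempts <= SPIN_TRIES) {
                // Phase 1: busy spin. Lowest latency, but burns a CPU core.
            } else if (attempts <= SPIN_TRIES + YIELD_TRIES) {
                Thread.yield();                   // Phase 2: let other threads run
            } else {
                LockSupport.parkNanos(PARK_NANOS); // Phase 3: sleep, CPU stays idle
            }
        }
        return available;
    }

    /** A slow producer publishes sequence 3 after about 5 ms; the consumer waits for 0. */
    static long demo() {
        AtomicLong cursor = new AtomicLong(-1);
        Thread producer = new Thread(() -> {
            LockSupport.parkNanos(5_000_000L);
            cursor.set(3);
        });
        producer.start();
        return waitFor(0, cursor);
    }

    public static void main(String[] args) {
        System.out.println("available sequence: " + demo());
    }
}
```

If the consumer stayed in phase 1 forever it would behave like a pure busy spin and pin a core at 100%, which is the high-CPU symptom described above. The later phases trade some latency for lower CPU usage.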

References

https://github.com/LMAX-Exchange/disruptor/wiki/Introduction