Disruptor 高性能队列
背景
记录一下 disruptor 的学习笔记
Disruptor 例子
1 | import java.util.concurrent.ThreadFactory |
源码阅读
disrutpor 初始化
先看 Disruptor 构造方法
1 | public Disruptor(final EventFactory<T> eventFactory, |
在看 RingBuffer.create, 最终通过 fill 方法 将 eventFactory.newInstance() 作为默认值,塞到 ringBuffer 里面
1 | public static <E> RingBuffer<E> create(ProducerType producerType, |
消费事件消息
首先看 disruptor.start(): 消费事件消息入口
1 | private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>(); |
consumerRepository 类型由 disruptor.handleEventsWith(TestHandler) 初始化, 并构造事件消息处理链
1 | public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) { |
回头看 disruptor.start() 中的 consumerInfo.start(executor)
executor = new BasicExecutor(threadFactory),BasicExecutor 在每次 execute 任务时, 每个 consumer 被分配一个 new thread
1 | public Disruptor( |
executor.execute 也就是 BasicExecutor.execute(eventHandler) 会异步的执行 eventHandler, 也就是调用 BatchEventProcessor.run 方法
问题来了,既然是异步执行,多个 eventHandler 是怎么按照顺序去处理事件消息的?
我们看 processEvents 方法执行逻辑
先获取 BatchEventProcessor.sequence 并 +1
通过 sequenceBarrier.waitFor 也就是 WaitStrategy.waitFor 获取到可用的 availableSequence
先看下 BlockingWaitStrategy.waitFor 的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence,
SequenceBarrier barrier)
throws AlertException, InterruptedException {
long availableSequence;
if (cursorSequence.get() < sequence) {
lock.lock();
try {
while (cursorSequence.get() < sequence) {
barrier.checkAlert();
processorNotifyCondition.await();
}
}
finally {
lock.unlock();
}
}
while ((availableSequence = dependentSequence.get()) < sequence) {
barrier.checkAlert();
}
return availableSequence;
}如果 cursorSequence(ringbuffer 的索引) < sequence(batchEventProcessor 的索引) 则batchEventProcessor挂起等待
否则 就用 dependentSequence 作为 availableSequence 返回
然后 batchEventProcessor 会将 availableSequence 索引之前的数据一次性处理完,并更新自身的 sequence 索引值dependentSequence 由 ProcessingSequenceBarrier 构造方法初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19final class ProcessingSequenceBarrier implements SequenceBarrier {
private final WaitStrategy waitStrategy;
private final Sequence dependentSequence;
private volatile boolean alerted = false;
private final Sequence cursorSequence;
private final Sequencer sequencer;
ProcessingSequenceBarrier(final Sequencer sequencer, final WaitStrategy waitStrategy,
final Sequence cursorSequence, final Sequence[] dependentSequences) {
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length) {
dependentSequence = cursorSequence;
} else {
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
}在 Disruptor.createEventProcessors 中的, 进行了初始化 ProcessingSequenceBarrier
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences)
createEventProcessors 仅会被 Disruptor.handleEventsWith 和 EventHandlerGroup.handleEventsWith1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52public class Disruptor<T> {
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
return createEventProcessors(new Sequence[0], handlers);
}
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers) {
checkNotStarted();
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
final EventHandler<? super T> eventHandler = eventHandlers[i];
final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null) {
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
processorSequences[i] = batchEventProcessor.getSequence();
}
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}
}
public class EventHandlerGroup<T> {
private final Disruptor<T> disruptor;
private final ConsumerRepository<T> consumerRepository;
private final Sequence[] sequences;
EventHandlerGroup(final Disruptor<T> disruptor, final ConsumerRepository<T> consumerRepository,
final Sequence[] sequences) {
this.disruptor = disruptor;
this.consumerRepository = consumerRepository;
this.sequences = Arrays.copyOf(sequences, sequences.length);
}
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
return disruptor.createEventProcessors(sequences, handlers);
}
public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers) {
return handleEventsWith(handlers);
}
}EventHandlerGroup 会拷贝一份 batchEventProcessor 中的 sequence
demo 例子中 disruptor.handleEventsWith(TestHandler).then
(ThenHandler)
通过 then 方法将 TestHandler 中的 sequence 传递给 ThenHandler
这样 ThenHandler 就依赖了 TestHandler, ThenHandler 就会在 TestHandler 后执行
生产事件消息
接着看 disruptor.publishEvent(translator, i)
就是往 ringBuffer 里面放数据,
1 | public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) { |
get(sequence) 根据 sequence [ringbuffer 索引] 获取 ringbuffer 数组里的对象
translator 将其处理替换完后,ringbuffer 数组的的值将是新的值,publish 将会更新索引的标记位
waitStrategy.signalAllWhenBlocking() 会通知阻塞等待的消费者去继续消费消息
1 | protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); |
总结
流程理清楚了,我们看看 知识点
- ringbuffer
- 内存使用率很高,不会造成内存碎片,几乎没有浪费。业务处理的同一时间,访问的内存数据段集中。
可以更好的适应不同系统,取得较高的性能。内存的物理布局简单单一,不太容易发生内存越界、悬空指针等 bug,出了问题也容易在内存级别分析调试。
做出来的系统容易保持健壮。
- 内存使用率很高,不会造成内存碎片,几乎没有浪费。业务处理的同一时间,访问的内存数据段集中。
- cpu cache
- CPU 访问内存时会等待,导致计算资源大量闲置,降低 CPU 整体吞吐量。
由于内存数据访问的热点集中性,在 CPU 和内存之间用较为快速而成本较高(相对于内存)的介质做一层缓存,就显得性价比极高了
- CPU 访问内存时会等待,导致计算资源大量闲置,降低 CPU 整体吞吐量。
常见问题
distruptor 框架导致 CPU 高
当消费者的消费速度大于生产者时,消费者有不同的 WaitStrategy 进行等待, 构建 disruptor 的时候,可以指定等待策略
1
new Disruptor[Event](factory, 4, threadFactory, ProducerType.SINGLE, new BlockingWaitStrategy())
下面是 WaitStrategy 的一些实现类的简介 对应类上的comment
- BusySpinWaitStrategy
- 将线程绑定在 CPU 核上,减少线程切换,轮训获取 ringbuffer 上的数据
- PhasedBackoffWaitStrategy
- 通过先自旋,后 yield 让出 CPU,最后通过自定义的 fallbackStrategy 来实现当代
- BlockingWaitStrategy
- 通过 lock 来实现等待,ringbuffer中有消息后 会通过 signalAll 来唤起锁进行消费
- TimeoutBlockingWaitStrategy
- 通过带有超时时间的 lock 来实现等待,过了超时时间会 throw exception
- SleepingWaitStrategy
- 通过先自旋,后 yield 让出 CPU,最后 lock 的方式来进行等待,这个锁可以减少锁的唤起带来的损耗
- YieldingWaitStrategy
- 通过先自旋,后 yield 让出 CPU 的方式来进行等待
- BusySpinWaitStrategy
参考资料
https://github.com/LMAX-Exchange/disruptor/wiki/Introduction