面向 CTRL C + V 的编码工具人。
说到队列,大家都很熟悉,像生活中不管是吃饭还是买东西基本上都会遇到排队,先排队的人先付款,不允许插队!先进先出,这就是典型的“队列”。
简单回顾 jdk 里的队列
1. 阻塞队列:
ArrayBt Q j Alocky r PingQueue: Object[] + count + lock.condition (notEmpty、nL ) 9 @otFQ a lull)。
- 入队:
- 不阻塞:add、offer 满了直接报错。
- 阻塞:put 满F , ? / T j q ~ g了:notFu( ` 4 U - 1l! f :l.await()(当出队和删除元素时唤醒 put 操作)。
- 出队:
- take():当空时,notEmp@ w s _ @ Zty.aj x J R + +wait()(当有元素入队时唤醒)。
- poll():当空时直接返回 null。
LinkedBlockin{ H 3 e a ygQueue:Node 实现、加锁(读锁、写锁分离)、可选的有界队列。需要考虑实际使用中的内存问题,防止溢) & C出。
应用:
EexcutoK K i xrs 默h t C 8认是使用 LinkedBlockin8 L 9 -gQueue,但是在实际应用中,更应该手动创建B @ K b : D 0线程池使用有界@ P ! G @队列,防止生产者生产过快,导致内存溢出。
2. 延迟队列:
DelayQl o k P _ S 6ueue : PE / d j F b 7 priorityQueue (优先级队列) + Lock.condition (延迟等待) + leader (避免不必要的空等待)。
主要方法:
- getDelay() 延迟时间。
- compareTo() 通过该方法比较从 PriS C q u :orityQueue 里取值。
- 入n ] s x ! f n {队:
- add、put、offer:入队时会将换唤醒等待中的线程,进, { 3 d P B U c行一次出队处理。
- 出队:
- 如果队列里无数据,元素入队时会被唤醒。
- 有数据,会阻塞至时间满足。
- take()阻塞:
- poll():满足队列有数据并且 delay 时间不大于0会取出元素,否则立即返回 nE H r @ f , $ t xull 可能会抢占成为 leader。
还有优先级队列等就不一一细说,有兴趣的同学可以去看一下。
应用:
延时任务:设置任务延迟多久执行;需要设置过期值的处理,例如缓存过期。
实现方式:每次 getDealy() 方法提供一个缓存创建时间与当前时间的差值,出队时 compareTo() 方法取差值最小的。每次入队时都会q 6 T [ o重新取出队列O , L里差值最小的值进行处理。
我们使用队列的,更多的是像生产E E C C !者、消费者这种场景。这种场景大多数情况又对处理速度有着要求,所以我们会使用多_ I L线程技术。使用多线程就可能会出现并发,为了避免出错,我们会选择线程安全的队列。例如 ArrayBlockingQueue、Linke( @ $ R M ) l P 5d! a 8 ` . l 9 uBlockingQueue 或者是 ConcurrentLinkedQueue。前俩者是通过加锁取实现,后面一种是通过 cas 去实现线程安全。但是又要考虑到生产者过快可能造出的内存溢出的问题,所以看起来 ArrayBlockingQueue 是最符& R + Q } r合要求的。但是恰恰加锁效率又会变慢,所以就引出了我们今天讨论的主题:Disruptor !
Disruptor
介绍
Martin Fowler 在自己网站上写了一篇 LMAX 架构的文章,在文章中他介绍了 LMB _ w IAX 是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易。这个系统是建立在 JVMj q 8 + k 平台上,其核心是9 ^ | v s v一个S Y 3 U S W f业务逻辑处理器,它能够在一个线程里每秒处理 600 万订单。使用事件源驱动方式,业务逻辑处理器的核心是 Disruptor 。w k U } . w 7 e
为什么说 Disruptor 的性能要更优于 ArrayBlockingQuc Z Heue,有什么根据吗?先不探究原理,先看一段代码。
比f @ p ~ ( ; z c u较:ArrayBlockingQueue VS Disruptor
上面俩幅图分别是比较 100000000 条数据下,ArrayBlockingQueue 和 Disruptor 的存取效率。 同样我也比较了:
- 当数据为 10000000 条时,Disruptor 为 1101ms,ArrayBlockingQueue 为 2782ms;
- 当数据为 50000000 条时,Disruptor 为 5002ms,ArrayBlockingQueue 为 13770ms;
代码O & 4 L使用的都是单生产者、单消费者。当使用多线程的时候,Disruptor 还会更快。所以可以看出,Disruptor 在性能上是优于 ArrayBlockingQueue 的。
接下来我们来看一下 DisrupQ Y 7 ktor 是如何做到 无阻塞、多生产、多消费 的。
上图简单的画了一下构建 Diw N ` ~sruptor 的各h = r个参数以及 ringBufV ! ! @ p #fer 的构造,下面简单的说一下。
- EventFactory:创建事件(任务)的工厂类。(这里任务会创建好,保存在内存中,可以看做是一个空任务)。
- ringBufferSize:容器的长度。( Disruptor 的核心容器是 ri* & E r ? 7 S ;ngBuffer,环转数组,有F 7 I ! C t j i 0限长度)。
- Execuk r 8tor:消费者线程池? $ q r 8,执行任务的线程6 A v T H d N。(每一个消费者都需要从线程池里获得线程去消费任务)。
- Prod- 2 IuctType:生产者类型:单生产者、多生产者。
- WaitStrategy:等待策略。(当队列里的数据都被消费完之后,消费者和生产者之间的等待策略)。
- RingBuffer:存放数据的容器。
- Even~ s n * F A 8 AtHandler:事件处理器。
Show me the code
下面简单看一下 Dc I 0 $isrupt3 n g ; 3or 的示例代码。
- 主类:定义Disruptor
p# S K 5 dublic class TMainDisruptor {
public static void main Q n(Strin- f / U Cg[] args) throws Interrup2 r 0 G = 4 c ctedException {
//创建生产者工厂
TEventFactory eventFactory =t , w _ n 5 G ! new TEventFactory(F - . # M b)。
int ringbuffersize = 1024 * 1024。
ExecutorService executorService = Executors.newFixedThreadPool$ b j s 0(2)。
/**
* 实例化一个 Disruptor,_ & wDisruptor 本身并不做为生产者或是消费者,它更多像是一个包装器,将真正核心的生产者、消费者以及生产消费的动作以及容器串起来
*
* 1、消息工厂i r [ 9 ; T对象
* 2、容 * j S % o . D J器的长度
*u t W 3、线程池# E A # @ g D @ t
* 4、生产者模式
* 5、等待策略
*/
Disruptor<OrderEvent> disruptor = new Disruptor(eventFactory, ringbuffersize, executorService, ProducerType.SINGLE,
new BlockingWaitStrategy())。
//这里定义了俩个相同事件
TEventHandler t1 = new TEventHandler()。
TEventHandler t2 = new TEventHandler()。
//跟消费者建立关系--监听
disruptor.handleEventsWith(t1)。
//顾名思义:执行完t1后执行t2。(对同一个任务线性执行)
disruptor.} Q 1 fafP } X / G nte7 / a Fr(t1).han~ E P Y h q b 6dleEf 9 ^ M 7 KventsWith(t2)。
//启动
disruptor.start()。
//数据存储工具
RingBuffer ringBuffer = disruptor.getRin2 e ! ; c O u 9 wgBuffer()。
//创建生产者
TEventProducer producere l J O c _ = new TEventProducer(ringBuffer)。
//投递数据
for(lm F G S 8ong i=0。i<10000。i++) {
producer.sendData(i)。
}
executorServicW o me.shutdown()。
disruptor.shutdown()。
}
}
- 实例工厂
public class TEventFacm y ytory implemene b ) c o p ? qt^ r os EventFactory<OrderEvent> {
@Overr# D Zide
public OrderEvent newInstance() {
//实例化数据(建好空数据,等后面取的时候可以直接用)
rx P _eturn new OrderEvent()。
}
}
- 对象
@Data
public class OrderEvent {
private Loj z - 8 nng id。
private String price。
private String finalPrice。
}
- 消费者执行事件:任务执行体
public( L a M O v j @ class TEventHandler implements EvenE U + ? U S 2 u ctHandler<OrderEvent> {
/**
* 事件驱动监听--消费者消费的主体
*/
@+ L M ~ aOverride
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
//简单打印一下当前事件ID和执行线程的名称
System.out.println(event.getId() + \" \" +Thread.currn / U % v gentThread().getName())。
}
}
- 生产者
@Data
@AllArgsConstructor
public class TEv3 x [ % # D T 4entProducer {
private RingBuffer&la : ct;OrderEvent>v 8 R V W ringBuffer。
public void sendData(long id) {
//获取下一个可用序号
long sequence = ringBuffer.next()。
try {
//获取一个空对象(没有填充值)
Or T 0 = G 0 U mrderEvent orderEvent = ringBuffer.get(sequence)。
//赋值
orderEvent.sets | R Q nId(id)。
}finally {
//提交
ringBuc 3 J I * ! yffer.publish(sequence)。
}
}
}
以上代码就是一个简单的 Disruptor 的 demo 示例。运行代码就可以看到 handler 打印数据。demo 跑起来后,就可以进行程序员最爱的 debug 大法了。
启动过程分析之消费者
在 TMainDisruptor 类的 main 方法里,定义完 Disruptor 并关联好任务处理事件后,| @ % 7 T v就调用了 disruptor.start() 方法,可以z ? q i P W l看出在调用了f 4 7 h Y X # g r start() 方法后,消费者线程就已经开启。
- startu 6 G S Q n() -- 开启 Disruptor,运行事件处理器。
public RingBuffer<k U ) n y NT> startO r c - ; v a()
{
checkOnlyi J P E k L @StartedOnce(x * X Y s a)。
//G ! w =在前面 handleEventsWith() 方法里添加的 handler 对象会加入到 consumerRepository 里,这里遍历 consumerRepository 开启消费者线程
for (final ConsumerInfo consumerI+ z _nfo : consumerRepository)
{s _ M
//从线程池中获取一个线程来开启消费事件处理器。(消费者开启监听,一旦有生产者投递,即Q , { L : V可消费)
//这里开启的线程对象为 BatchEventProcessor 的实例
consumerInfo.start(executor)。
}
return ringBuffer。
}
- hL ] E P I DandleEventsWith()--> createEvS & N Ven0 L ptProcessors(; k x @ e j # j) -- 调用的核心方法,作用是创建事件处理器。
@Safe, w U i } d A v rVarargs
pub^ S s h ) j 4 | +lic final EventHandlerGrj : $ W C _ ~ 8 coup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers)。
}
EventHandlerGroup<T> createEventProcessors(
final Sequence[: p R A I | : [] barrierSequences,
fX , ) 0 9 !inal EventHandler<? super T>[] eventHandlers)
{
...
final Sequence[] processorSequences = new Sequence[eventHandlers.length]。R E 8 , N ( , : N
//创建 sequence 序号栅栏
final SequenceBarrier barrier = ringBuffer.newk 0 { zBarrier(barrierSeque= q / 3 z 0 2 inces)。
for (int i = 0, eventHandlersLength = eD J ! v L Uvent= 4 a - , v r cHandlers.length。i < eventHandlersLength。i++)
{
final EventHandler<? super T> eventHandler = eventHandlers[i]^ v ? # 9 w o $。
final BatchEven| _tProcessor<T> batcZ ) ^ D %hEvk , 5 o I v * | ;entProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler)。
...
//这里将消费者加入到 consumerReposito? s # k { M C ! ry 中---ConsuD A bmerRepository
consumerRepository.add(batchEventProcessor, eventHandler, barrier)。
processorSequences[i] = batchEventProcessor.getSequence()。
}
...
}
在看上面的 handleEventsWith() 方法中,可以看到构建了一个 BatchEventProc. y g n N @ % %essor(继承了 Runnable 接口)对象,start()方法启动的同样也是这个对象的实例。这个对象继承自 EventProcessor ,EvW u DentProcessor 是 Disruptor 里非常核心的一1 3 ? 0 3个接口,它的实现Q x . X类的作l w N用是轮询接收 RingBuffer 提供的事件,并在没有可处理事件是实现等待策M ^ ~ A S ^ 8略。这个接口的实现类必须要关联一个线程去执行,通常我R 8 c Q ^ m们不需要自己去实现它。这里主要说一下它的默认实现类 BatchEventProcessor 类。
BatchEventProcessor:主要事件循环,处理 Disruptor 中的 event,拥有消费者的 Sp I F h L 7 T Mequence。它的核心主要主要包含以下:
- 核心私有成员变量
- SeZ B O Tquence sequence :维护当前消费者消费的 ID。
- SequenceBarr$ : n M Xier sequence( J 4 o z dBarrier :序号屏障,协调消费者的消费 ID,主要作用是获取消e K K U g费者的可用序号,并提供等待策略的执行。
- EventHandler<? super T>| d 8 & u; eventHandler :消费者的消费逻辑(也就是我们实现的业务逻辑)。
- DataProvider dx F 5 4 1ataProvider :获取消费对象。RingBuffer 实现了此接口,主要 u $ s m @是提供业务对象(例如上面代码中的 OrderEvent )。
- 核心方法
- processEvents():由于 BatchEb l ? h 7 DventProcessor 继承自 Runnable 接口,所3 - ! 5 7 g以在前面启动它后,run() 方法会执行,2 3 7 ( m g而 run(e l 8 I f A S l W) 方法内部则会调用此方法。
private void p` U 8 c * k X i IrocessEvents()
{
T event = null。
////获取当前消费者维护的序号中并+1,即下一个消费序号
long nextSequence = sequenc^ h 2 s o Je.get() + 1L。
while (true)
{
try
{
//获取可执行的最大的任务 ID,如果没有,waitFor() 方法内会进行等待
final long a` G .vailableSequence =- / W = 2 % i R seque$ ! f UnceBarrier.waitFor(nextSequence)。
if (batchStartAware != nu~ I H j $ d Y Kll &&H # | , @ M D h; availableSequence >T N 5 q ) c= nextSequence)
{
batchStarD _ R }tAware.onBatchStart(availableSequence - nextSequence + 1)。
}
//不断获取对应位置的任务进行消费 直到上面查 s 询到的 availableSequence 消费完
w7 & % r q i % # hile (nextSequence <= availableSeq- _ Auence)
{
event = dataProvider.get(nextSequence)。
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequeD M S [ $ ] B lnce)。
nextSequence++。
}
sequen7 x D vce.set(availableSequence)。
}
...
}
}
以上代码片段中消费者事件处理器的核心代码,sequenceBarrier.waitFor(nextSequence) 方法B l k 7 &内部,会比较当前消费者序号与可用序号的大小,当可用序号(availableSequence)大于当前消费者序号(nextSequence),再获取到当前可用的最大的事件序号 IQ 7 RD(waitFot()方法内部调用 sequencer.getHighestPublishedSeq] E 2 | f ? I ~uence(sequenceU ; 9 a ! q 2, availableSequence)),进行循环g H i B b Z I ]消费。可用序号是维护在 ProcessingSequenceBarrier 里的,ProcessingSequeJ Z u k =nceBarrier 是通过 ringBuffer.newBarrier() 创建出来的。请看下图:
由图可以看出,在获得可用序号时,SequenceBarrier 在 EventProcessorH Q H 3 A 6 和 RingBuffer 中充当协调的角色。多消费事件和单消费事件在 depeK % , r U z #ndentSequence 上的处理/ - # , @ e o {有一些不同,可以看下 ProcessingSequenceBarrier 的 dependentSequence 的赋值以及 get() 方法 (Util.getMinimumSequence(sequences)) 这里细节就不再x R W G i !展开说明了。
启动过程分析之生产者
在上面生产者的代码中,可以看到我们首先调用了b r Y N j N U ringBuffer.next() 方法,获取可用序号,再# x p y j w {获取到该序号下事先通过 factox c A i D kry 创建好的空事件对象,在我们对空事件对象进行赋值后,再调用 publish 方法将事件发布,则消费者就可以获取进行消费了。
生产者这里的核心代码如下m : ~ +,这里我截取的是多生产者模式下的代码:
@Override
public lonU O 6 S sg next(int n)
{
if (n < 1 || n > bufferSize)
{
throw new IllegalArgumentException(\"n must be > 0 and < bufferSize\")。
}
long current。
long next。
do
{
//cursor 为生产者维护的 sequence 序列,获取到当前可投递的的下标,即当前投递到该位置
current = cursor.get()。
//再+n获取下一个下表,即下一次投R 7 q y 3递的位置。
next = current + n。
long wrapH $ HPoint = next - bufferS8 & `ize。
//目的:也是实现快读的读写。gatingSequenceCache 独占缓存行
long cachedGatingSequence = g6 C D ) Q e t batingSequenceCache.get()。
if (wrapPoz ^ T # p Bint > cachedGatingSequ@ R I T Z k s (ence || cachedGatingSequence > current)
{
//获取消费者最小序号
long gatingSequq S 0 D o Pence = Util.% Y _ r [getMinimumSequence(gatingSequences, cur- ^ Jrent)。
if (wrapPoint > gatingSequence)
{
//如果不符合,则阻塞线r ] n程 1ns(paF W ( . 5 P @ c grk()不会有死锁的问题)
LockSupport.parkNak w q S } gnos(1)。P g f |// TODO, should we spin based on the wait stram % = H { /tegy?
continue。
}
gatingSequenceCache.set(gatingSequence)。
}
//多个生产者时要保证线程安全(这里更新的 cursor 同时也是等待策略里的 waitFor() 方法的 cursor 参数,因此这里更新成功后,则等待策略会通过,表示有新的任务进来,就会消费)
else if (cursoK e Xr.compareAndSet(current, next))
{
break。
}
}
while (true)。
return next。
}
这里主要讲一下 cursor 对b c m L } m m 6象和 Util.getMinimumSequence(gatingSequenc& y F = C _es, current) 方法。
cursor 对象是生产者维护的一个生产者序号,标示当前生产者已经生产到哪一个位置以及下一个位置。它是 Sequv ? y - { t b e Mence 类的一个实P Y J例化对象。下图是 Sequence 类= t 2 c的类图。从图里可以看出,Sequence 继承以及间接继承了 RhsPaddK # #ing 和 LhsPadding 类,而这俩个类都各定义了 7 个 long 类型的成员变量。而 SeqH 8 o f i Nuence 的 get() 方法返回的也是一个 long 类型的值 value。这个是 Disruptor 的 核心设计之一--填充T 4 _ W e ?缓存行,, & f ] W !消除伪共享 。
在 64 位的计算机中,单个缓存行一般占 64 个字节,当 cpu 从换存里取数据时,会将该相关数据的其它数据取出来填满一个缓存行,R ] 0 这时如T b t {果其它数据更新,则缓存行缓存的该数据也会失效,当下次需要使用该数据时又需要重新从内存中提取数据。ArrayBlockingQueue 获取数据时,很容易碰到伪共享导致缓存行失效,而 Disruptor 这里当在 vaule 的左右各填充 7 个 long 类型的数据时,每次取都能确保该数据独占缓存行,也不会有其他的数据更新导致该数据失效。避免了伪共享的问题( jdk 的[ T h k l并发包下也有一些消除伪共享的设计)。
在讲 Util.getMinimumSequence(gatingSequed e 9 + S - p [ ,nces, current) 方法之前我们先说一下 RingBuffer。
RingBuffer :它是一个首尾相接的环状的容器,用来在多线程中传递数据。第一张图里面创建r V e | V Disruptor 的多个参数其实都是用来创建 RingBuffer 的,比如生产者类型(单 or 多)、实例化工厂、容器长度、等待策略等。结构简单如下图:
单生产者单消费者模式下很好理解,每次都从 ringBuffer 中直接获取下一个可用序号。那么如果是多生产者多消费怎么办呢?
多生@ e z u产多消费模型
简单分析,多个生产者同时向 ringbuffer 投递数据,假设此时俩个生产者将 ringbuffer 已经填满,因为 sequence 的序号是自增+1(若不b J K N =满足获取条件则循环挂起当前线程),所以生产的时候能保证线程安全,只需要一个 sequence 即可。当多消费者来= _ ~ | .消费的时候,因为消费速度不同,例如消费者 1 来消费 0、1,消费者 2 消费 2、4,消费者 3 消费 3。当消费者消费完T T . G Z q } 0 后,消费者 2 消费完 2 后,o e d i P T C 消费者 3 消费完 3M } @ 后,生产者再往队列投递数据时,其他位置还未被消费,会投递到第 0 个位置, 此时再想投递数据时,虽然消费 2 的第二个位置空缺、消费者 3m Z B 的第三个位置空缺,消费者还在消费 1 时,无法继续投递。因为是通过比较消费者自身维护的 sequence 的最小的序号,来进行比较。
所以这里 Util.getMin# | =imumSequence(gatingSequences, current) 方法也i X J y 5 L s 7 1就无需再多说,它就是为了获取到多个消费者的最小序号,判X / 7 1 _ w H o 7断当前 ringBuff C ` A e UeK X R q ( I E f :r 中的剩余可用序号是否大于消费者最小序号,是的话,则不能投递,需要阻塞当前线程(LockSupport.parkNanos(1))。注意:这 f K n x f q里没有用到锁。
上面说到 ringBuffer 有定义长度,说明是R M a M - i一个) * T有界的队列,那么可能会出现以下俩种情况:当W s & p消费者消费速度大于生产者生产者速度,生产者还未来得及往队列写入,或者生产者生产速g - I度大于消费者消费速度,此时怎么办呢?而且上面也多次提到没有满足条件的消费事件时,消费者会等待,接下来说一下消费者的等待策略。
常用的WaitStz _ a ? k # / = Irategy等待策略(消费者h p ) o z {等待)
- BlocX W a & O / ? KkingWaitStrategy 使用了锁,低效的策略。
- SleepingWaitStrategy 对生产者线程的影响最小,适合用于异步日志类似的场景。(w ] ] . ! Y G V =不加锁空等)
- YieldingWaitStrategy 性能最好,适合用于低延迟的系统,在要求极高性能且之间处理线数小于 cpu 逻辑核心数的场景中,推荐使用。(无锁策略。主要是使用了 Thr1 t $ 9 : Wead.yield() 多线程交替执行)
这里着重介绍一下 Yieldp B |ingWaitStrZ ^ d , y _ %ategy 策略,因为这k j } - +个是性能最高的,当我们的业务场景需要极速处理生Q _ q k ^ M G q产- f 0 S j f (消费时,选它准没错! 上面提到 BatchEventProcessor 的 processEvents 方法里调用了 waitFor() 方法,waitFo/ ? }r() 方法是 WaitStrategy 接口的定义的方法,所有的等待策略的实现类都实现了它。看下 YieldingWaitStrategy 的实现方法:
@Override
public long waitFor(
final long sequ: Z D Kence, Sequence cursor, final Sequence dependentSequence, final Seque} r F K =nceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence。
int counter = SPIN_TRIES。//1$ P ^00
while ((availableSequence = dependentSequence.get()) < sequence)
{
counter = applyWaitMethod(barrier, counter)。
}
ret; p - @ o $urn availableSequence。
}
private intE } C ~ ` applyWaitMethod(final SequenceBarrier barri- s n g M @ I @ Yer, int counN u n 7teq K E 8 W o [ nr)
throws AlertException
{
barrier.checkAlert()。
if (0 == counter)
{
Thread.yie3 ~ K K G 9ld()。
}
else
{z E * | ! / 2
--counter。
}
retur| j /n counter。
}
上面也提到过,当 availableSequi A ) j L :ence 小于 sequence 时,会等待,直到 ava{ k } M u /ilP H J S Y , ] 0 1ableSequence 不小于 sequence,waitFor() 方法才会返回告知消费者有可以消费的消费序号。这里主要是 applyWaitMethod 方法,它会首先进行一个 100 次的循环,C Y [ b Q不断去尝试当前返回条件是否满足,当 counter 减为 0 时,不断的挂起当前线程。yield() 方法是使当前线程交出执行权,再加入到竞争行列中,所以你可以测试,当你消费者没有可消费的数据时,会不停的在这里执行,直到有可用的消费事件。注意:这里也没有用到锁。这句_ Y 1 h X T话有点熟悉。。。因为在前面讲到多生产者多消费模型时,我提到生产者2 9 Z )的等待也没有用到锁。所以这是 Disruptor 快的另外一个O 5 { g ! #原因,多生产( 7 5 3 # ; 1 ~多消费场景下,无锁。但是有同学肯定可以想到,这里当没有可用消费w & o Q | Q 1 `事件的时候消费者线程还在不停竞争 cpu 在执行、不断的转,所以这也是 Disruptor 做为一款工业级产品时,对cpu的极致压榨以换取性能,包括前面的填充缓存行,用更多的缓存空间去换取更快的e N ] : !效率。
当然 Disruptor 也提供了 BlockingWaitStrategy 的锁等待通知以及 SleepingWaitStrategy 的空等策略以及一些其它的策略,来供我们在不同的业务场- _ k A G N T景选取不同的策略搭配使用。所以它也并不是无脑的干你的 cpu,你可以选~~~
Show me the code
以下是多生产V G *多消费的部分代A n ` J ? i 6 A Q码:
public class TMainWorker {
pub* { W * c 2 ) G klic static void main(String[] args) throws InterruptedException {
System.out.println(g e ! f\"begin\")。
Ra d U Y N u M n uingb k x P $ B % 7 PBuffer ringBuffer = RingBuffer.create(Producej } n ; * *rType.MULTI, new TEventFactory(), 1024*1024,
new YieldingWaitStrategy())。
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier()。
//多消费者
TWorkHanler[] workHanlers = new TWorkHanler[5]。
for(int i=0。i<5。i++) {
workHanlers[i] = new TWorks 3 x f c # 3Hanler(\"h\N ) G D S" + i, neo @ a Pw AtomicInteger(0))。
}
WorkerPool workerPool = new WorkerPool(ringBuffer, sequenceBarrier, new TexceptionHandler(), workHanlers)。
//获得消费池里的工作 sequence 的序号,遍历他们,找到最小的供使用。
ringBuffer.addGating- [ f 0 NSequences(workerPool.getWorkJ ` @erSequences())。
//创建消费者工厂
Execf s O C & D rutorService executorService = new ThreadPool- i K 7 1Executor(3, 3,
0L, Tig / S ) # { tmeUnit.MILLISECONDS,
new LinkedBlockingQueue<>())。
workerPool.start(executorService)。
CountDownLatch cod e B ? s SuntDownLatch = new CountDownLatch(1)。
for(iQ 5 m % i f F g rnt i=0。i<3。i++) {
new Thread(()-> {
TEventProducer producer = new TED z 2 n ! 9veJ B + ` X ontProducer(ringBuffer)。
try {
countDownLatch.await()。
} catch (InterruptedException e) {
e.printStackTrace()。
}
for(int a=0。a<10000。af W 5 - 8 1++) {
producer.sendData(a)。
}
}).start(7 d -)。
}
countDownLatch.countDown()O $ W J | . P [ R。
}
至此,Disruptor 的基本核心概念已经介绍完毕!接下来介绍一下 Disruptor 的好玩的地方。
Disruptor多边形操作
上图是? c I S P Q @ Disruptor 的官方文档列出的 Disruptor 可以做的一些操作示例。
简单看一下如何实现文中第一张图里的多边形操作?
d! k w f + . J H isruptor.handleEventsWith(E1, E2)。
disruptor.a_ % e * e 2 p _ Ofter(E1).handleEventsWith(E3# ^ - + L 3 T ;)。
disrupt| s Y M Q Dor.after(E2).haE u H G E inc E ! q a o ( QdleEventsWith(q : 0E4)。
disruptor.after(V Y & ( S Z cE3, E4).handleH ~ #EventsWith(E5)。
借B [ ; b _助 Di9 A W ysruptor 里的强大语义,可以组合出各种多边p o B T & X形(骚)操作。
public EventHaU | J J B HndlerGroup<T> handleEventsWith(final EventProcessor... processors)
{
...
}
@SafeVarargs
@SuppressWarnings(m m * f + W +\"varargs\")
public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorU % % @ / r (kHandler<T>... workHaU D q h c j undlers)
{
...
}
public EventHandlerGroue C m / o P 6 R {p<T> after(final EventProcessor... processors)
{
...
}
上面的方法入参都是...,你懂的吧!
应用:
- Apache Storm、Camel、Log4j2
Log4j2 example:
使用了实现 EventTranslatoM f r P K Tr 的提交机制(文中并未介绍。。。有兴趣的同学可以了解学习以下。。。)。
可参考美团文章:https://tech.meituan.com/2016/11/18/disruptor.html 中指出:美团在公司内部统一推行日志接入规范,要求必须使用 Log4j2,使普通单机 QPS 的上限不再只停8 ) & y Z留在几千,极高地提升了服务性能。