当前位置: 首页 > 后端技术 > Java

这么嚣张,号称高性能队列的Disruptor到底是什么来头?

时间:2023-04-01 18:06:00 Java

并发框架Disruptor1。Disruptor概述1.1背景Disruptor是由英国外汇交易公司LMAX开发的高性能队列。数量级),基于Disruptor开发的系统单线程每秒可支持600万个订单。2010年在QCon发表演讲后,获得业界关注。2011年,企业应用软件专家MartinFowler写了一篇很长的介绍。同年还获得了甲骨文官方杜克奖。目前包括ApacheStorm、Camel、Log4j2在内的众多知名项目都应用了Disruptor来获得高性能。需要指出的是,这里所说的队列是系统内部的内存队列,而不是像Kafka这样的分布式队列。有界无锁高并发队列1.2什么是DisruptorDisruptor是JVM中多线程间使用的消息队列。当线程间传输大量数据或对性能有较高要求时,可以考虑使用Disruptor替代ArrayBlockingQueue。官方也对比了Disruptor和ArrayBlockingQueue在不同应用场景下的性能,可视化性能仅为5左右提高到10倍。1.3为什么使用Disruptor传统的阻塞队列使用锁来保证线程安全,而锁是通过操作系统内核上下文切换来实现的,会暂停线程等待锁,直到锁被释放。执行这样的上下文切换会丢失之前保存的数据和说明。由于消费者和生产者之间的速度差异,队列总是接近于满或空,这会导致高水平的写入争用。1.3.1传统队列问题首先这里说的队列仅限于Java内部的消息队列queue队列有界锁结构队列类型ArrayBlockingQueue有界锁数组阻塞LinkedBlockingQueue可选锁链表阻塞ConcurrentLinkedQueue无界无锁链表非阻塞LinkedTransferQueue无界无锁链表阻塞PriorityBlockingQueue无界锁堆阻塞DelayQueue无界锁堆阻塞1.3.2Disruptor应用场景参考一些使用disruptor的框架。1.3.2.1log4j2Log4j2异步日志使用disruptor,日志一般都有buffer,而且是full文件写完了才写,结合NIO增量追加文件应该会更快,所以不管是EventHandler还是WorkHandler,处理延迟应该比较小,写入的文件不多,比较适合这种场景。1.3.2.2Jstorm流处理中不同线程的数据交换,数据计算可能是内存中的大量计算,流计算快进快出,disruptor应该是个不错的选择。1.3.2.3百度uid-generator部分使用了ringbuffer和de-falsesharing的思想来缓存生成的uid,应该也参考了disruptor。1.4Disruptor的核心概念让我们从了解Disruptor的核心概念开始,了解它是如何工作的。下面介绍的概念模型不仅仅是领域对象,更是映射到代码实现的核心对象。1.4.1RingBufferDisruptor中的数据结构用于存储生产者生产的数据顾名思义,它是一个环形缓冲区。RingBuffer曾经是Disruptor中最重要的对象,但从3.0版本开始,它的职责被简化为只存储和更新通过Disruptor交换的数据(事件)。在一些更高级的应用场景中,RingBuffer完全可以被用户自定义实现替代。1.4.2序号,在Disruptor框架中,任何地方都有一个序号生产者生产的数据放在RingBuffer中的地方,消费者应该消费数据的地方,在某个位置的数据是什么RingBuffer,这些都是由这个序号决定的。这个序号可以简单的理解为一个AtomicLong类型的变量。它使用填充的方法来消除缓存的虚假共享问题。1.4.3Sequencer序列号生成器,这个类主要用来协调producers当producer生产数据时,Sequencer会生成一个可用的序列号(Sequence),然后producer就知道数据放到了环形队列中了就可以了.Sequencer是Disruptor真正的核心。该接口有两个实现类SingleProducerSequencer和MultiProducerSequencer,它们定义了并发算法,用于在生产者和消费者之间快速正确地传递数据。1.4.4SequenceBarrier我们都知道,消费者在消费数据的时候,需要知道在哪里消费数据。消费者无法选择自己想要哪种数据消费,想消费哪种数据就拿多少。这个SequencerBarrier充当这样的“栅栏”屏障。如果你的consumer要消费数据,对,我告诉你一个序号(Sequence),你就可以在那个位置消费数据。如果没有数据,就等着吧。1.4.5WaitStrategyWaitStrategy决定了消费者如何等待生产者将事件(Event)放入Disruptor。想象一下这样一种情况,生产者生产速度非常慢,而消费者消费速度非常快。那么难免会出现数据不足的情况。这个时候消费者还怎么等?WaitStrategy就是为解决问题而生的。1.4.6事件从生产者传递给消费者的数据称为事件。它不是Disruptor定义的特定类型,而是由Disruptor的用户定义和指定的。1.4.7EventHandlerDisruptor定义的事件处理接口,由用户实现处理事件,是Consumer的真正实现。1.4.8Producer即生产者,泛指调用Disruptor发布事件的用户代码。Disruptor不定义特定的接口或类型。1.5Disruptor特性Disruptor其实就像一个队列,用来在不同线程之间迁移数据,但是Disruptor也实现了一些其他队列没有的特性,比如:同一个“事件”可以有多个消费者,消费他们可以在处理并行或相互依赖形成一个处理序列(形成依赖图);预分配内存空间用于存储事件内容;针对极高的性能目标进行极度优化和无锁设计;2.Disruptor介绍下面通过一个简单的例子来体验一下Disruptor。生产者会传一个long值给消费者,消费者收到这个值后打印出来。2.1添加依赖com.lmaxdisruptor3.4.22.2DisruptorAPIDisruptor的API很简单,主要如下步骤2.2.1定义事件首先创建一个LongEvent类,作为消息内容放入环形队列。事件(Event)是通过Disruptor交换的数据类型。公共类LongEvent{私有长值;publicvoidset(longvalue){this.value=value;}publiclonggetValue(){返回值;}}2.2.2定义事件工厂为了使用Disruptor的内存预分配事件,我们需要定义一个EventFactory事件工厂(EventFactory)定义如何实例化上一步1中定义的事件(Event),并且需要实现接口com.lmax.disruptor.EventFactory\。Disruptor通过EventFactory预先在RingBuffer中创建了Event实例。一个Event实例实际上是作为一个“数据槽”使用的。在发布之前,发布者首先从RingBuffer中获取一个Event实例,然后向Event实例中填充数据,然后发布到RingBuffer中,然后Consumer获取Event实例并从中读取数据。公共类LongEventFactory实现EventFactory{publicLongEventnewInstance(){returnnewLongEvent();}}2.2.3定义事件处理的具体实现为了让消费者能够处理这些事件,我们在这里定义了一个事件处理器,负责打印事件,通过实现接口com.lmax.disruptor定义了事件处理的具体实现。事件处理程序\。publicclassLongEventHandlerimplementsEventHandler{publicvoidonEvent(LongEventevent,longsequence,booleanendOfBatch){//CommonUtils.accumulation();System.out.println("consumer:"+Thread.currentThread().getName()+"Event:value="+event.getValue()+",sequence="+sequence);}}2.2.4指定等待策略Disruptor定义了com.lmax.disruptor.WaitStrategy接口来抽象Consumer如何等待newEvent,这就是策略模式WaitStrategyYIELDING_WAIT=newYieldingWaitStrategy();的应用;2.2.5启动Disruptor注意ringBufferSize的大小必须是2的N次方//指定事件工厂LongEventFactoryfactory=newLongEventFactory();//指定环形缓冲区的字节大小,必须是2的N次方intbufferSize=1024;//单线程模式,以获得额外的性能Disruptordisruptor=newDisruptor(factory,bufferSize,Executors.defaultThreadFactory(),ProducerType.SINGLE,newYieldingWaitStrategy());//设置事件业务处理器---消费者disruptor.handleEventsWith(newLongEventHandler());//启动disruptor线程disruptor.start();2.2.6在Disruptor3.0版本中使用Translators发布事件,由于加入了丰富的Lambda风格的API,可以帮助开发者简化流程,3.0版本之后优先使用EventPublisher/EventTranslator来发布事件。publicclassLongEventProducerWithTranslator{privatefinalRingBufferringBuffer;publicLongEventProducerWithTranslator(RingBufferringBuffer){this.ringBuffer=ringBuffer;}privatestaticfinalEventTranslatorOneArgTRANSLATOR=newEventTranslatorOneArg(){publicvoidtranslateTo(LongEventevent,longsequence,Longdata){event.set(data);}};publicvoidonData(Longdata){ringBuffer.publishEvent(TRANSLATOR,data);}}2.2.7ShutdownDisruptordisruptor.shutdown();//关闭disruptor,该方法将被阻塞,直到所有事件处理完毕2.3代码整合2.3.1LongEventMainconsumer-producer启动类,依赖于构造Disruptor对象并调用start()方法完成启动线程。Disruptor需要ringbuffer环、消费者数据处理工厂、WaitStrategy等ByteBuffer类字节缓冲区,用来包装消息。ProducerType.SINGLE是单线程的,可以提高性能publicclassLongEventMain{publicstaticvoidmain(String[]args){//指定事件工厂//指定环形缓冲区字节大小,必须是2的N次方intbufferSize=1024;//单线程模式,获得额外性能//设置事件业务处理器---consumerdisruptor.handleEventsWith(newLongEventHandler());//启动disruptor线程disruptor.start();//获取ringbufferring接收生产者产生的事件RingBufferringBuffer=disruptor.getRingBuffer();//为环形缓冲区指定事件生产??者LongEventProducerWithTranslatorproducer=newLongEventProducerWithTranslator(ringBuffer);//循环for(inti=0;i<100;i++){//得到一个随机数长值=(long)((Math.random()*1000000)+1);//发布数据producer.onData(value);}//停止disruptor线程disruptor.shutdown();}}2.3.2运行测试测试结果consumer:pool-1-thread-1Event:value=579797,sequence=0consumer:pool-1-thread-1Event:value=974942,sequence=1consumer:pool-1-thread-1事件:value=978977,sequence=2consumer:pool-1-thread-1事件:value=398080,sequence=3consumer:pool-1-thread-1事件:value=867251,sequence=4consumer:pool-1-thread-1事件:value=796707,sequence=5consumer:pool-1-thread-1事件:value=786555,sequence=6consumer:pool-1-thread-1事件:value=182193,sequence=7.....Event:value=forconsumptionreader接收到的数据,sequence是数据在ringbuffer环中的位置。本文参与技术征文征集思考,正在阅读的欢迎加入。如果本文对您有帮助,欢迎关注点赞`,您的支持是我坚持创作的动力。转载请注明出处!