当前位置: 首页 > 科技观察

SpringBoot使用Disruptor作为内部高性能消息队列_0

时间:2023-03-12 21:18:29 科技观察

工作中遇到一个使用Disruptor作为消息队列的项目。你没看错,不是Kafka,也不是rabbitmq。Disruptor最大的优势之一就是速度快。还有一点就是它是开源的。下面做一个简单的记录。Disruptor简介Disruptor是英国外汇交易公司LMAX开发的一款高性能队列。研发的初衷是为了解决内存队列的延迟问题(在性能测试中发现与I/O操作在同一个数量级)。基于Disruptor开发的系统单线程可以支持每秒600万个订单。2010年在QCon发表演讲后,获得业界关注。Disruptor是一个开源Java框架,旨在实现最高可能的吞吐量(TPS)和最低可能的生产者-消费者问题(PCP)延迟。从功能上看,Disruptor实现了“队列”的功能,它是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合。Disruptor是LMAX在线交易平台的关键组成部分。使用这个框架,LMAX平台可以以600万TPS的速度处理订单。除金融领域外,Disruptor可用于其他通用应用,可带来显着的性能提升。事实上,Disruptor与其说是一个框架,不如说是一种设计思想。对于具有“并发、缓冲、生产者-消费者模型、事务处理”等元素的程序,Disruptor提出了一个实质性的提升性能??(TPS)的方案。Disruptor的github主页:https://github.com/LMAX-Exchange/disruptorDisruptor的核心概念首先要了解Disruptor的核心概念才能理解它是如何工作的。下面介绍的概念模型不仅仅是领域对象,更是映射到代码实现的核心对象。RingBuffer,顾名思义,就是一个环形缓冲区。RingBuffer曾经是Disruptor中最重要的对象,但从3.0版本开始,它的职责被简化为只存储和更新通过Disruptor交换的数据(事件)。在一些更高级的应用场景中,RingBuffer完全可以被用户自定义实现替代。SequenceDisruptor通过顺序递增的序号来管理交换的数据(事件),数据(事件)的处理总是沿着序号一个接一个递增。Sequence用于跟踪特定事件处理程序(RingBuffer/Consumer)的处理进度。虽然一个AtomicLong也可以用来标记进度,但是定义一个Sequence来负责这个问题还有另外一个目的,就是防止不同Sequence之间的CPU缓存错误共享(FalseSharing)问题。(注:这是Disruptor实现高性能的关键点之一,关于虚假分享网上已经有很多介绍,这里不再赘述)。SequencerSequencer是Disruptor真正的核心。该接口有两个实现类SingleProducerSequencer和MultiProducerSequencer,它们定义了并发算法,用于在生产者和消费者之间快速正确地传递数据。SequenceBarrier用于保存对RingBuffer主要发布的Sequence和Consumer所依赖的其他Consumer的Sequences的引用。SequenceBarrier还定义了确定Consumer是否有更多事件要处理的逻辑。WaitStrategy定义了Consumer如何等待下一个事件的策略。(注:Disruptor定义了多种不同的策略,针对不同的场景提供不同的性能)Event在Disruptor的语义中,生产者和消费者之间交换的数据称为事件(Event)。它不是Disruptor定义的特定类型,而是由Disruptor的用户定义和指定的。EventProcessorEventProcessor持有特定消费者(Consumer)的Sequence,提供事件循环(EventLoop)调用事件处理实现。EventHandlerDisruptor定义的事件处理接口是用户实现的,用来处理事件,是Consumer的真正实现。Producer,生产者,只是指调用Disruptor发布事件的用户代码。Disruptor不定义特定的接口或类型。Case-demo通过以下8个步骤,就可以把Disruptor弄回家:1.添加pom.xml依赖com.lmaxdisruptor3.3.42.MessageBodyModel/***MessageBody*/@DatapublicclassMessageModel{privateStringmessage;}3.构造EventFactory}}4。ConstructEventHandler-consumer@Slf4jpublicclassHelloEventHandlerimplementsEventHandler{@OverridepublicvoidonEvent(MessageModelevent,longsequence,booleanendOfBatch){try{//这里停止1000ms是为了确认消费者消息是异步的Thread.sleep(1000);log.info("消费者开始处理消息");if(event!=null){log.info("消费者消费的信息为:{}",event);}}catch(Exceptione){log.info("消费者处理消息失败");}log.info("消费者处理消息结束");}}5。构造BeanManager/***获取实例化对象*/@ComponentpublicclassBeanManagerimplementsApplicationContextAware{privatestaticApplicationContextapplicationContext=null;@OverridepublicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{this.applicationContext=applicationContext;}publicstaticApplicationContextgetApplicationContext(){returnapplicationContext;}publicstaticObjectgetBean(Stringname){returnapplicationContext.getBean(name);}publicstaticTgetBean(Classclazz){returnapplicationContext.getBean(clazz);}}6.ConstructMQManager@ConfigurationpublicclassMQManager{@Bean("messageModel")publicRingBuffermessageModelRingBuffer(){//事件处理线程池的定义,Disruptor通过java.util.concurrent.ExecutorSerivceExecutorService执行器提供的线程触发消费者事件处理=执行rs.newFixedThreadPool(2);//指定事件工厂HelloEventFactoryfactory=newHelloEventFactory();//指定ringbuffer字节大小,必须是2的N次方(可以将模运算转为位运算提高效率),否则会影响效率intbufferSize=1024*256;//单线程模式,获得额外性能Disruptordisruptor=newDisruptor<>(factory,bufferSize,executor,ProducerType.SINGLE,newBlockingWaitStrategy());//设置Event业务处理器---consumerdisruptor.handleEventsWith(newHelloEventHandler());//启动disruptor线程disruptor.start();//获取ringbufferring接收producer产生的事件RingBufferringBuffer=disruptor.getRingBuffer();返回环形缓冲区;}7.构造Mqservice和实现类-producerpublicinterfaceDisruptorMqService{/***message*@parammessage*/voidsayHelloMq(Stringmessage);}@Slf4j@Component@ServicepublicclassDisruptorMqServiceImplimplementsDisruptorMqService{@AutowiredprivateRingBuffermessageeModelRingBuffer;@OverridepublicvoidsayHelloMq(Stringmessage){log.info("记录消息:{}",message);//获取下一个Event槽的下标longsequence=messageModelRingBuffer.next();try{//用数据填充事件MessageModelevent=messageModelRingBuffer.get(sequence);event.setMessage(消息);log.info("向消息队列添加一条消息:{}",event);}赶上(异常e){日志。error("无法将事件添加到messageModelRingBufferfor:e={},{}",e,e.getMessage());}finally{//发布Event,激活观察者消费,将序列传递给消费者//注意最后的publish方法一定要放在finally中,保证一定要调用;如果一个请求的sequence没有提交,会阻塞后续的发布操作或者其他producermessageModelRingBuffer.publish(sequence);}}}8。构造测试类和方法@Slf4j@RunWith(SpringRunner.class)@SpringBootTest(classes=DemoApplication.class)publicclassDemoApplicationTests{@AutowiredprivateDisruptorMqServicedisruptorMqService;/***项目内部使用Disruptor作为消息队列*@throwsException*/@TestpublicvoidsayHelloMqTest()throwsException{disruptorMqService.sayHelloMq("消息已到,Helloworld!");log.info("消息队列已经发送完毕");//这里之所以停2000ms是为了保证消息的处理是异步的Thread.sleep(2000);}}试运行结果:2020-04-0514:31:18.543INFO7274---[main]c.e.u.d.d.s.Impl.DisruptorMqServiceImpl:记录消息:消息已到,世界您好!2020-04-0514:31:18.545INFO7274---[main]c.e.u.d.d.s.Impl.DisruptorMqServiceImpl:添加消息到消息队列:MessageModel(message=消息已到达,Helloworld!)2020-04-0514:31:18.545INFO7274---[main]c.e.utils.demo.DemoApplicationTests:消息队列已发送2020-04-0514:31:19.547INFO7274---[pool-1-thread-1]c.e.u.d.disrupMq.mq.HelloEventHandler:Consumerprocessingmessagestarted2020-04-0514:31:19.547INFO7274---[pool-1-thread-1]c.e.u.d.disrupMq.mq.HelloEventHandler:Consumer消耗的信息是:MessageModel(message=消息已到达,世界您好!)2020-04-0514:31:19.547INFO7274---[pool-1-thread-1]c.e.u.d.disrupMq.mq.HelloEventHandler:Consumer处理消息结束总结其实generator->consumer模式很常见,通过一些消息队列可以轻松实现上面的效果上的区别在于Disruptor是作为内存中的队列实现的,并且是无锁的。这就是Disruptor高效的原因。