当前位置: 首页 > 后端技术 > Java

构建高性能内存队列:Disruptor一直滴滴神~

时间:2023-04-01 15:24:05 Java

Java中的队列有哪些ArrayBlockingQueue使用ReentrantLockLinkedBlockingQueue使用ReentrantLockConcurrentLinkedQueue使用CAS等我们知道使用锁性能比较低,尝试使用一个免锁设计。接下来,让我们了解一下Disruptor。Disruptor简单使用github地址:github.com/LMAX-Exchan...简介:Disruptor是一个开源的并发框架,获得2011杜克程序框架创新奖[Oracle],可以实现无锁网络。队列并发操作。英国外汇交易公司LMAX开发的高性能队列号称单线程每秒可支持600万笔订单。日志框架Log4j2的异步模式使用Disruptor来处理限制。它是一个内存队列,这意味着它不能支持分布式场景。.数据传输对象的简单使用@DatapublicclassEventData{privateLongvalue;}复制代码consumerpublicclassEventConsumerimplementsWorkHandler{/***consumercallback*@parameventData*@throwsException*/@OverridepublicvoidonEvent(EventDataeventData)throwsException{Thread.sleep(5000);System.out.println(Thread.currentThread()+",eventData:"+eventData.getValue());}}拷贝代码生产者publicclassEventProducer{privatefinalRingBufferringBuffer;publicEventProducer(RingBufferringBuffer){this.ringBuffer=ringBuffer;}publicvoidsendData(Longv){//casboothlongnext=ringBuffer.next();尝试{EventDataeventData=ringBuffer。得到(下一个);eventData.setValue(v);}finally{//通知等待的消费者System.out.println("EventProducer发送成功,sequence:"+next);ringBuffer.publish(下一步);}}}复制代码测试类publicclassDisruptorTest{publicstaticvoidmain(String[]args){//2的n次方intbufferSize=8;Disruptordisruptor=newDisruptor(()->newEventData(),//事件工厂bufferSize,//环形数组大小Executors.defaultThreadFactory(),//线程池工厂ProducerType.MULTI,//支持多个事件发布者newBlockingWaitStrategy());//等待策略//设置消费者disruptor.handleEventsWithWorkerPool(newEventConsumer(),newEventConsumer(),newEventConsumer(),newEventConsumer());破坏者.start();RingBufferringBuffer=disruptor.getRingBuffer();EventProducereventProducer=newEventProducer(ringBuffer);长我=0;对于(;;){i++;eventProducer.sendData(i);尝试{Thread.sleep(1500);}catch(InterruptedExceptione){e.printStackTrace();}}}}复制代码核心组件基于上面的简单例子,其实很简单,Disruptor已经帮我们封装好了生产消费模型的实现,我们来看一下基础有哪些核心组件支撑了一个高性能的无锁队列?RingBuffer:循环数组,底层使用数组项,初始化时填充数组,避免不断创建新对象带来的开销。后续操作只会更新条目。写入进度Sequencecursor所有消费者的进度数组Sequence[]gatingSequencesMultiProducerSequencer可用区availableBuffer[使用空间提高查询效率]Sequence:是一个序号,用来标记处理进度,也可以看做是一个atomicInteger;还有一个特性,为了解决虚假共享问题而引入:cachelinepadding。这个后面会介绍。workProcessor:处理事件循环,在循环中获取Disruptor事件,然后将事件分配给各个handlerEventHandler:负责业务逻辑的handler,自己实现。WaitStrategy:消费者等待事件的策略。定义了以下策略。sleepingWaitStrategy:spin+yield+sleepBlockingWaitStrategy:lock,适用于CPU资源受限(不需要切换线程),以及系统吞吐量没有要求的情况策略解析有问题的代码?1、多个生产者如何保证消息生产不会互相覆盖。【如何实现互斥效果】每个线程获取不同的一段数组空间,然后通过CAS判断这段空间是否已经分配。接下来我们看多生产类MultiProducerSequencer中的next方法【获取生产序列号】//消费者最后一次消费的最小序列号//第二点会讲到privatefinalSequencegatingSequenceCache=newSequence(Sequencer.INITIAL_CURSOR_VALUE);//当前进度的序号protectedfinalSequencecursor=newSequence(Sequencer.INITIAL_CURSOR_VALUE);//所有消费者的序号//第二点会讲protectedvolatileSequence[]gatingSequences=newSequence[0];publiclongnext(intn){if(n<1){thrownewIllegalArgumentException("n必须>0");}长电流;接下来很久;do{//当前进度的序号,Sequence的值是有可见性的,更保证线程间可以申请的最新值是可以感知的current=cursor.get();//申请的序号空间:最大序号next=current+n;longwrapPoint=next-bufferSize;//最小的消费者序列号longcachedGatingSequence=gatingSequenceCache.get();//大于一个圆||最小消耗序列号>当前进度if(wrapPoint>cachedGatingSequence||cachedGatingSequence>current){longgatingSequence=Util.getMinimumSequence(门控序列,当前);//表示超过1圈,没有额外的空间可以申请if(wrapPoint>gatingSequence){LockSupport.parkNanos(1);//TODO,我们是否应该基于等待策略进行自旋?继续;}//将最小值更新为序列的值gatingSequenceCache.set(gatingSequence);}//CAS成功后更新当前Sequence的值elseif(cursor.compareAndSet(current,next)){break;}}而(真);returnnext;}复制代码2.生产者向序号员申请写入序号。如果序列号正在被消费,Sequencer怎么知道哪些序列号可以写入呢?【未消费覆盖怎么处理】从gatingSequences中获取最小的序号,producer最多可以写入这个序号的最后一位通俗地说,申请的序列号不能大于最小消费者序列号【申请最大序列号-buffersize必须小于/等于最小消费者序列号】,可以申请当前写入的序列号numberpublicfinalEventHandlerGrouphandleEventsWithWorkerPool(finalWorkHandler...workHandlers){returncreateWorkerPool(newSequence[0],workHandlers);}EventHandlerGroupcreateWorkerPool(finalSequence[]barrierSequences,finalWorkHandler[]workHandlers){finalSequenceBarriersequenceBarrier=ringBuffer.newBarrier(barrierSequences);最终WorkerPoolworkerPool=newWorkerPool<>(ringBuffer,sequenceBarrier,exceptionHandler,workHandlers);consumerRepository.add(workerPool,sequenceBarrier).getWorkPoolworkSequence[]);updateGatingSequencesForNextInChain(barrierSequences,workerSequences);返回新的EventHandlerGroup<>(this,consumerRepository,workerSequences);}privatevoidupdateGatingSequencesForNextInChain(finalSequence[]barrierSequences,finalSequenceSe[]processqueues){if(processorSequences.length>0){//消费者启动后,所有的消费者都会存储在AbstractSequencergatingSequencesringBuffer.addGatingSequences(processorSequences);for(finalSequencebarrierSequence:barrierSequences){ringBuffer.removeGatingSequence(barrierSequence);}consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);}}复制代码3.在多个生产者的情况下,生产者申请一个可以写入的序号,然后写入这些序号,那么消费者如何感知哪个可以写入序号消耗多少?【借用问题1来说明】这个前提是在多生产者的情况下,第一点我们说每个线程获取的是不同的一段数组空间,所以现在单纯传序号是不行的。MultiProducerSequencer使用int数组[availableBuffer]来识别当前序号是否可用当producer成功生产事件后,会将availableBuffer中的当前序号设置为1,表示可以读取。这样消费者能读到的最大序号就是我们availableBuffer中第一个不可用的序号-1。初始化availableBuffer进程publicMultiProducerSequencer(intbufferSize,finalWaitStrategywaitStrategy){super(bufferSize,waitStrategy);//初始化可用数组availableBuffer=newint[bufferSize];indexMask=bufferSize-1;indexuffialShift=Util.log2(bufferiseBufferAize);();}//初始化默认的availableBuffer为-1privatevoidinitializeAvailableBuffer(){for(inti=availableBuffer.length-1;i!=0;i--){setAvailableBufferValue(i,-1);}setAvailableBufferValue(0,-1);}//生产者成功产生事件,并将可用区域数组设置为1index,intflag){longbufferAddress=(index*SCALE)+BASE;UNSAFE.putOrderedInt(availableBuffer,bufferAddress,flag);}复制代码Consumer消费流程WorkProcessor类中消费run方法publicvoidrun(){booleanprocessedSequence=true;longcachedAvailableSequence=Long.MIN_VALUE;longnextSequence=sequence.get();T事件=空;而(真){try{//先通过cas获取消费事件的占有if(processedSequence){processedSequence=false;做{nextSequence=workSequence.get()+1L;sequence.set(nextSequence-1L);}while(!workSequence.compareAndSet(nextSequence-1L,nextSequence));}//数据准备好被消费了if(cachedAvailableSequence>=nextSequence){event=ringBuffer.get(nextSequence);//触发回调函数workHandler.onEvent(event);处理序列=true;}else{//获取可读取的下标cachedAvailableSequence=sequenceBarrier.waitFor(nextSequence);}}//....省略}notifyShutdown();running.set(false);}publiclongwaitFor(finallongsequence)抛出AlertExceptionn,InterruptedException,TimeoutException{checkAlert();//通过这个值得到的当前write下标在这里可以看作是全局消费下标,区别于每个segment的write1和write2下标longavailableSequence=waitStrategy.waitFor(sequence,cursorSequence,dependentSequence,this);如果(可用序列<序列){返回可用序列;}//通过availableBuffer-1过滤掉第一个不可用的序号returnsequencer.getHighestPublishedSequence(sequence,availableSequence);}publiclonggetHighestPublishedSequence(longlowerBound,longavailableSequence){//从当前读下标开始,循环到当前写,如果availableBuffer为-1,直接返回for(longsequence=lowerBound;sequence<=availableSequence;sequence++){if(!isAvailable(sequence)){returnsequence-1;}}returnavailableSequence;}复制代码解决伪共享问题什么是伪共享问题?为了提高CPU的速度,CPU有一个高速缓存Cache。缓存的最小单位是缓存行CacheLine,它是从主存中复制的Cache的最小单位,通常为64字节。Javalong类型是8个字节,所以8个long类型变量可以存储在缓存行中。如果你访问一个longs数组,当数组中的一个值加载到缓存中时,它会另外加载另外7个值。所以你可以非常快速地迭代数组。关注公众号:码猿科技专栏每天定时推送更多精彩内容。伪共享问题是指当多个线程共享某个数据时,线程1可能拉取给线程2的数据在它的cacheline中。这时线程1修改了数据,当线程2取回数据时,又得从内存中拉取。两个线程相互影响,导致数据每次都从内存中拉取,即使是在cacheline中。Disruptor是如何解决的?该值前后统一添加7个Long类型进行填充。当线程拉取时,无论如何都会填满整个缓存。回顾总结:为什么Disuptor可以称为高性能的无锁队列框架?缓存行填充,避免缓存频繁失效。【java8中也引入了@sun.misc.Contended注解,避免虚假共享】无锁竞争:通过CAS【两阶段提交】环形数组:数据被覆盖,避免GC底层更多使用位操作提高效率