当前位置: 首页 > 后端技术 > Java

Disruptor生产消费模式及高级应用详解(并行模式)

时间:2023-04-01 14:57:29 Java

大家好,昨天的文章带走了大家Disruptor华丽的外衣。最重要的是,我们知道了Disruptor高性能的原因。一个重要的原因是引入环形数组结构:数组元素不会被回收,避免频繁GC,无锁设计:使用CAS无锁模式保证线程安全属性填充:通过添加额外的无用信息,避免定位伪共享问题的元素位置:使用与一致性哈希相同的方法,一个索引,自增。本文在上一篇文章的基础上提供了一些实际应用。研究Disruptor的生产和消费模式,以及高级应用。至此,关于Disruptor的系列文章就结束了。我已经尽力了。如果有什么达不到大家的需求,以及文章的内容,大家有什么其他的意见,也欢迎大家在评论区留言。毕竟我是一个懵懂的人,希望大家多多指点。前两篇文章在这里:好嚣张,号称高性能队列的Disruptor是什么来头?Disruptor测试结果计算1亿次,耗时5503ms,吞吐量18171000/s,所以我剥离了Disruptor的高性能生产消费模式。基于上面的环形结构,我们来详细分析一下Disruptor的工作原理。Disruptor不像传统的队列,分为队列头指针和队列尾指针,而是只有一个下标(上面的seq),那么这如何保证生产的消息不会覆盖未消费的消息。在Disruptor中,生产者分为单生产者和多生产者,而消费者不区分。在单生产者的情况下,普通生产者放置数据RingBuffer中,消费者获取最大可消费位置并消费。当有多个生产者时,多了一个和RingBuffer一样大小的Buffer,称为AvailableBuffer。在多生产者中,每个生产者先通过CAS竞争获得可写空间,然后慢慢往里面放入数据。如果此时消费者要消费数据,每个消费者都需要获取最大的Consumable下标,这个下标是从AvailableBuffer中获取的最长的连续序列下标。5.1单生产者生产数据生产者单线程写入数据的过程比较简单:申请写入m个元素;如果有m个元素可以输入,则返回最大的序号。这里主要判断未读元素是否会被覆盖;如果返回正确,则生产者开始写入元素。5.2多生产者生产数据在多生产者的情况下,你会遇到“如何防止多个线程重复写入同一个元素”的问题。Disruptor的解决方案是每个线程获取不同的一块数组空间进行操作。这很容易通过CAS实现。分配元素的时候只需要判断这个空间是否已经通过CAS分配好了。但是又会出现一个新的问题:如何防止读取还没有写入的元素。在有多个生产者的情况下,Disruptor会引入一个与RingBuffer大小相同的缓冲区:availableBuffer。当某个位置写入成功后,设置可用Buffer对应位置,标记为写入成功。读取时会遍历可用的Buffer,判断元素是否就绪。5.3.1生产过程申请写m个元素;如果有m个元素可以写入,则返回最大的序号。每个生产者将被分配一个专属空间;生产者写入元素,同时在可用Buffer中设置相应位置,标记哪些位置写入成功。如下图,Writer1和Writer2两个线程写入数组,都申请可写数组空间。Writer1分配下标3到下表5的空间,Writer2分配下标6到下标9的空间,Writer1写入下标3位置的元素,同时设置可用Buffer对应位置为标记写入成功,后移一位,开始写入下标4位置的元素。Writer2同理。最后,所有的写入都完成了。5.3.2CAS检测空间占用,防止不同生产者写入同一个空间,如下图:通过do/while循环的条件cursor.compareAndSet(current,next)判断每次请求的空间是否有已完成被其他生产者占用。如果已经被占用,则函数返回失败,重新执行While循环申请写空间。publiclongtryNext(intn)throwsInsufficientCapacityException{if(n<1){thrownewIllegalArgumentException("n必须>0");}长电流;接下来很久;做{current=cursor.get();下一个=当前+n;如果(!hasAvailableCapacity(gatingSequences,n,current)){throwInsufficientCapacityException.INSTANCE;}}while(!cursor.compareAndSet(current,next));returnnext;}5.4多生产者消费数据绿色表示写好的OK数据假设三个生产者都在写,没有设置AvailableBuffer,那么消费者能获取到的消费下标只能获取6,然后在生产者写入之后OK,消费者收到通知,消费者继续重复以上步骤。5.4.1消费进程应用读取流水号n;如果writercursor>=n,仍然无法确定最大连续可读下标。从读取器游标中读取可用的Buffer,直到找到第一个不可用的元素,然后返回最大的连续可读元素的位置;consumer读取元素如下图,reader线程读取下标为2的元素,Writer1/Writer2/Writer3三个线程正在向RingBuffer对应位置写入数据,最大元素索引赋值给写线程为11,读线程申请读取下标从3到11的元素,判断写游标>=11。然后开始读取availableBuffer,从3开始,往回读,发现下标为7的元素没有成功产生,于是WaitFor(11)返回6。然后,消费者一共读取了4个下标为3的元素到6。6.高级使用6.1Parallel模式6.1.1Singlewriter模式在并发系统中提高性能的最佳方式之一是singlewriter原则,这同样适用于Disruptor。如果您的代码中只有一个事件生产者,您可以将其设置为单生产者模式以提高系统性能。publicclasssingleProductorLongEventMain{publicstaticvoidmain(String[]args)throwsException{//.....//使用SingleProducerSequencerDisruptordisruptor=newDisruptor(factory,bufferSize,ProducerType.SINGLE,//Singlewritermode,executor);//.....}}6.1.2串行消费例如:现在触发一个注册事件需要一个Handler来存储信息,一个Handler来发送邮件等等。/***线程依次执行*
*p-->c11-->c21*@paramdisruptor*/publicstaticvoidserial(Disruptordisruptor){disruptor.handleEventsWith(newC11EventHandler()).then(newC21EventHandler());破坏者.start();}6.1.3菱形方式执行publicstaticvoiddiamond(Disruptordisruptor){disruptor.handleEventsWith(newC11EventHandler(),newC12EventHandler()).then(newC21EventHandler());破坏者.start();}6.1.4链式并行计算publicstaticvoidchain(Disruptordisruptor){disruptor.handleEventsWith(newC11EventHandler()).then(newC12EventHandler());disruptor.handleEventsWith(newC21EventHandler()).then(newC22EventHandler());破坏者.start();}6.1.5相互隔离模式publicstaticvoidparallelWithPool(Disruptordisruptor){disruptor.handleEventsWithWorkerPool(newC11EventHandler(),newC11EventHandler());disruptor.handleEventsWithWorkerPool(新的C21EventHandler(),newC21EventHandler());破坏者.start();}6.1.6通道模式/***串行执行,C11和C21分别有2个实例*
*p-->c11-->c21*@paramdisruptor*/publicstaticvoidserialWithPool(Disruptordisruptor){disruptor.handleEventsWithWorkerPool(newC11EventHandler(),newC11EventHandler()).then(newC21EventHandler(),newC21EventHandler())entHand;破坏者.start();}本文参与思维技术征文征文,如果您正在阅读本文,欢迎您的加入。转载请注明出处!