RocketMQ就做到了这一点,经过压测他们的杰作,性能提升了30%。根据RocketMQ4.9.1的更新日志,我们提取了关于消息发送性能优化的[Issues:2883]。详细链接如下:具体优化点见截图:首先尝试对以上优化点做一个简单的介绍:是优化WaitNotifyObject(item2)中的锁移除HAService中的锁(item3)移除lockinGroupCommitService(item4)消除HA中不必要的arraycopy(item5)调整消息发送的几个参数的默认值(item7)sendMessageThreadPoolNumsuseReentrantLockWhenPutMessageflushCommitLogTimedendTransactionThreadPoolNums减少琐碎影响的范围(item8-12)看了上面的变化,它得出结论,优化方法主要有以下三点:去除不必要的锁,降低锁的粒度(范围)修改消息发送相关参数接下来结合源码,从中选取有代表性的函数进行详细分析,领悟Java的魅力高并发编程在一起。1.去除不必要的锁本次性能优化主要针对RocketMQ同步复制场景。先简单介绍一下RocketMQ主从同步的编程技巧。RocketMQ主节点将消息写入内存后,如果使用同步复制,需要等待从节点写入消息成功后,再将消息返回给客户端。代码编写也非常熟练,如下图所示:温馨提示:消息发送从RocketMQ4.7开始优化,同步消息发送模型引入jdk的CompletableFuture实现消息的异步发送。核心步骤解读:消息发送线程调用Commitlog的aysncPutMessage方法写入消息。Commitlog调用submitReplicaRequest方法,将任务提交给GroupTransferService,获取一个Future实现异步编程。值得注意的是,这里需要等待数据成功写入从节点(基于CompletableFuture机制的内部线程池ForkJoin)。在GroupTransferService中依次对提交的任务进行判断,判断对应的请求是否已经同步到从节点。如果已经复制到slave节点,通过Future唤醒,将结果返回给消息发送者。GroupTransferService代码如下图所示:为了让大家更容易理解接下来的优化点,先总结提炼一下GroupTransferService的设计理念:首先介绍两个List的组合,分别命名为读写链表分别。外部调用GroupTransferService的putRequest请求会存储在写入列表(requestWrite)中。GroupTransferService的run方法从requestRead链表中获取任务,判断这些任务对应的请求数据是否成功写入从节点。每当requestRead中没有数据可读时,两个队列交互实现读写分离,减少锁竞争。新版本的优化点主要包括:更改putRequest的锁类型,synchronized替换为spinlock,doWaitTransfer方法中去掉多余的锁1.1synchronized替换为spinlock如下图,GroupTransferService提供接口putRequest给outsideworldfor接受外部同步任务,需要对ArrayList进行加锁保护。向ArrayList中添加数据是内存操作,操作耗时较少。所以不需要用synchronized,也就是synchronized,可以用自旋锁。自旋锁的实现非常轻量,如下图所示:整个锁的实现只需要引入一个AtomicBoolean来加锁和释放锁。它们都是基于CAS操作,非常轻量级,自旋锁不会发生线程切换。1.2去除多余的锁“锁”的滥用是一个非常普遍的现象。在多线程环境中编程是一个非常复杂的交互过程。在写代码的过程中,我们可能会觉得无法预测这段代码是否会被多个线程并发使用。对于执行,为了谨慎起见,直接加锁简单粗暴,自然会造成性能损失。这里要解除锁,我们需要结合这个类型的调用链来判断是否需要加锁。在整个多线程环境下运行的GroupTransferService中,需要保护的主要是requestRead和requestWrite集合。引入锁的目的是为了保证这两个集合在多线程环境下可以安全访问,所以我们应该先梳理一下。GroupTransferService核心方法的运行过程:doWaitTransfer方法操作的主要对象是requestRead链表,该方法只会被GroupTransferService线程调用,在swapRequest中会修改requestRead中的方法,但这两个方法是串行执行的,并且在同一个线程中,不需要引入锁,可以去掉。但是由于去掉了锁,所以在swapRequests中加了锁,因为requestWrite队列会被多个线程访问,优化后的代码如下:从这个角度来看,其实主要是更换锁类型从同步到更轻量级的自旋锁。2.缩小锁的范围。被锁包裹的代码块是串行执行的,即不能并发。如果无法避免锁,减少锁的代码块可以有效提高并发性。示意图如下:如果多个线程区域访问lock1、lock2,lock1中的doSomeThing1和domSomeThing2这两个方法必须串行执行,而多个线程同时访问lock2方法,doSomeThing1可以同时被多个线程执行,只有doSomething2需要串行执行,其整体并发效果肯定是lock2,基于这个理论:得到一个关于锁使用的最佳实践:被锁包裹的代码块越少越好。在旧版本中,消息写入和加锁的代码块比较大,一些可以并发执行的动作也被锁包裹起来,比如生成offsetMsgId。新版本采用了函数式编程的思想,只定义了获取msgId的方法。写入消息时不会执行,降低锁的粒度,从而并行化生成offsetMsgId。编程方法巧妙,值得学习。3.调整消息发送相关参数sendMessageThreadPoolNumsBroker端消息发送线程池数,4.9.0版本前该值默认为1,新版本调整为操作系统的CPU核数,且不小于4。这个参数的调整有利也有弊。提高了消息发送的并发性,但同时会导致消息顺序的紊乱。示例图如下,同步发送下不会有顺序问题,大家可以放心修改。在顺序消费场景下,不建议修改该参数。在实际过程中,需要对RocketMQ集群进行管理,对于顺序消费场景应该使用专用集群。useReentrantLockWhenPutMessage将MQ消息写入用于锁定内存的锁类型。低版本之前默认为false,表示默认使用自旋锁;新版本使用ReentrantLock。自旋的主要优点是没有线程切换成本,但是自旋容易造成CPU浪费。大多数情况下,内存写入是非常快的,但是RocketMQ更多的依赖于pagecache。如果出现缓存抖动,造成的CPU浪费是不值得的。sendMessageThreadPoolNums设置大于1后,锁类型用ReentrantLock更稳定。flushCommitLogTimed首先我们通过观察源码来理解这个参数的含义:它的主要作用是控制flush线程阻塞等待的方式。低版本flushCommitLogTimed为false,默认使用CountDownLatch,而高版本直接使用Thread.sleep。猜测的原因是闪烁线程相对独立,不需要和其他线程直接交互,所以没必要使用CountDownLatch这种专门用于线程协作的“洋和尚”。endTransactionThreadPoolNums主要用于设置交易消息线程池的大小。新版本主要通过调整发送线程池来动态调整交易消息的值。可以根据压测结果动态调整。
