如果我问你,如何提高kafka队列中的消息消费速度?答案很简单,把topic分成几个片段,然后用消费组(ConsumerGroup)来消费topic。如果加个条件,对同一个对象的操作请求必须严格顺序处理?答案并不难。Topic分片后,生产者自定义分发策略,保证同一个对象的操作请求分发到同一个分片中,让每个消费者按顺序消费各自分片中的数据。~如果加上一些条件:这个consumer的消费速度极慢,慢到处理一条消息需要100ms,即使把topic分成100个slice,也不能满足要求;每个对象的操作请求数量严重倾斜,有的分片消息量大,有的分片消息量少,有的分片可能已经积压,有的分片空闲;请求操作非常重要,需要保证每个请求都被可靠消费。保证交易的最终一致性;几十年的老系统,复杂的业务,项目方不允许涉及业务逻辑和整体架构的大改动……当上述条件叠加时,需要显着提升消费性能。如果是你,怎么破局?前段时间应业务部门的要求,为他们的一个在线历史系统做了并发性能提升方案,遇到了上述需求叠加的棘手情况。先简单说一下遇到的业务场景:一个互动论坛的帖子评论处理场景,要求每个帖子的评论请求操作必须严格按照一定的顺序进行(比如可能有删除评论、引用评论、回复评论等对于操作,所以请求顺序必须严格顺序处理),将帖子评论的操作请求发送给Kafka,然后评论服务消费Kafka来处理每个请求。这个评论消费者服务消费太慢,并发效率有待提高。增加分片数和消费者数,正式开始整改优化。首先是常规调整:根据Kafka自身的机制,将topic进行分片,拆分为N个分片,然后增加一个消费者组,在消费者组中部署与分片数量相等的消费者服务节点,使得每个consumer可以处理一个shard,所以整个review的消费性能会提升N倍。那么,这里为什么要强调消费者组中服务节点的数量必须等于主题分片的数量呢?这里是Kafka中ConsumerGroup的消费者数量和topicfragment数量的相关逻辑。看一下不同消费者数量和主题分片数量对应的处理消费场景:因此,消费组中的消费者数量并不是越多越好,而是受到主题分片数量的限制:consumer如果consumer数量太少,一个consumer需要消费多个fragment的数据,会增加某个consumer的消费压力;如果消费者数量过多,部分消费者不会消费任何数据,浪费部署资源。同样基于此,在我们上面的解决方案中,计划的消费者组中的消费者数量与主题分片的数量是一致的,这样可以保证每个消费者消费1个分片,以达到最大的效率协调。再补充一个知识点:为什么Kafka会限制每个分片最多只能被一个消费组中的消费者处理?因为消费者拉取消息需要提供offset和limit。如果offset放在broker端,会产生额外的通信开销;如果offset放在consumer端,如果一个group有多个consumers,就需要一个coordinator来集中管理,解决锁冲突。如果不解决冲突,必然会出现重复消费和无用消费,造成资源浪费。因此,在性能和复杂度的权衡上,Kafka采用了相对简单的解决策略。保证shard中的写入顺序通过上一章的方法,增加了topic分区的数量和consumergroup中的consumer数量,提高了Kafka中消息并行消费的效率,但是问题来了再次:顺序问题!前面提到,由于业务明确要求顺序消费,所以Kafka只保证一个分片内的消费顺序是固定的,而不能保证不同分片之间的消费顺序。分析业务后发现,业务需要的顺序处理其实是有条件的顺序处理。即对同一个帖子的所有评论相关操作必须同步处理,对不同帖子的评论相关操作没有顺序要求。那么问题就简单了,只要保证同一个post的所有评论相关的操作请求都分发到同一个topic分区即可!当生产者向kafka主题写入消息时,kafka会根据不同的策略将数据分发到不同的分区:轮询分区策略随机分区策略根据key分区分发策略自定义分区策略这里使用自定义分区策略,因为每个评论operationrequest带有原始postID字段,所以分发策略也很简单。直接postID%分片数分发消息,这样相同postID的评论都可以到同一个shard,这样顺序的问题就解决了。因此,优化上一环节给出的初步方案,补充生产者端自定义分发策略的要求,保证同一个帖子的评论操作,都会去到同一个话题段:这里的方案设计似乎已经解决了并发消费的问题。但经过实际压力测试,结果却让人大跌眼镜。单消费者提速根据上面给出的方案,部署DEMO环境进行压测(拆分成4个分片,部署4个消费者),最终发现集群消费速度确实提升了4倍,但是整体并发量还是低得可怜,4台机器最终消耗的还不到100!?被暴击后,我分析了单个消费节点的运行情况,发现压测时整机的CPU、IO、MEM、线程数都很低,没有任何波动。向业务方索取代码权限,下载代码,阅读Consumer服务的代码逻辑,才发现其中的奥秘。其实业务整体的交互逻辑其实很简单。从Kafka获取消息,然后使用它。但是这个消费逻辑需要依次调用10多个外围系统的HTTP接口!难怪CPU、内存、IO都很低。整个流程处理业务只有一个线程,这个线程大部分时间都处于IO等待状态。因此,如果要提升整个集群的消费能力,要么无限扩展机器,要么提升单个节点的消费能力——显然前者是不可能的,只能选择后者。对于单线程、多IO的操作场景,要提升并发性能,首先想到的就是改成多线程并发处理。但是当多个线程并发时,就会涉及到如何保证顺序消费的问题。针对之前的方案进行优化,给出如下方案:在之前方案的基础上,主要调整了消费者端的实现逻辑:在消费者内部,区分了ConsumerThread和WorkThread,由ConsumerThread负责用于从Kafka中拉取消息,而WorkThread负责真正的消费逻辑处理。单机内存中维护若干个队列,每个队列对应一个WorkThread,负责消费队列中的数据;ConsumerThread根据affinity分布策略对消息进行重分布,保证相同postID的请求分布到不同的内部队列中间。然后进行压测,设置单个消费服务的WorkThread数为100,集群中有4个消费服务,整体消费速度达到7000,单节点消费性能从原来的20提升了达到1700家,增长近80倍!如何保证消息不丢失将单机消费模式改成多线程后,解决了当前消费并发性能的问题,但是可靠性问题又出现了。以前消费者是从kafka拉取一条消息,消费完成后给kafka一个ack响应,然后再拉取下一条消息,这样即使消费者中途崩溃了,kafka依然可以分发消息给下一条一个可用的消费者来处理,可以保证请求消息不会丢失。之前的方案中,消费服务从Kafka拉取消息后,并没有等待处理完成,而是继续从Kafka拉取消息并缓存到本地内存中,等待工作线程慢慢消费.这时候如果机器宕机,所有缓存的消息都会丢失!为了解决以上问题,考虑将kafka的响应机制改为手动提交ack。但是由于多线程之间对Kafka上数据的乱序处理,导致每个线程处理过的偏移值不一样。示意图如下:为了保证消息可靠不丢失,采用如下策略:定期手动提交当前偏移量信息,提交的偏移量值,选择当前节点处理的最小偏移量值(对于上面的示意图,提交1002的偏移值),你可以通过在内存中缓存处理后的偏移列表来实现,如下。实现策略如下:正常情况下,提交的offset值不会有任何作用和影响,但是一旦出现异常情况,当前节点进程不可用,kafka重启时平衡分配当前分片给另一个消费者进行消费,另一个消费者会从上次提交的offset位置继续消费。解决了数据丢失的问题,保证了数据的可靠性。然而,另一个问题出现了:双重消费。好在这个业务系统虽然是十多年前搭建的,但至少分布式消费者应该具备的一个关键特性还是有的,那就是幂等性,所以这个问题不需要考虑。数据积压不可控,场景覆盖到这里。应该没问题吧?是和不是。正常情况下是没有问题的,但作为“核心”系统,需要考虑极端异常情况下的保命策略。比如一个帖子突然火了,这个帖子的评论数远远超过了其他帖子的评论数,甚至远远超过了整个系统的额定最大负载请求量,就会出现一个问题:kafka数据的某些shard卷积压严重,其余shard闲置。该热门帖子的相关评论请求阻塞了与该帖子分配到同一分区的其他帖子的评论处理。原计划是创建一个动态可扩展的sharding分布策略,但考虑到这种场景过于极端,现有系统实施起来不划算,所以本着合理设计的原则,放弃并改变了原计划到简单人工处理+补偿服务方式如下:一旦发生意外异常,系统积压已经超出正常处理范围,已经远远超出系统正常恢复的极限。为了保证现有业务尽快恢复正常,可以先跳过积压的请求,先保证新的请求正常处理,然后再启动补偿流程慢慢消耗之前积压的消息。有一点要说的是:这个地方是整个方案中我不太满意的一个实现。这是一个适应现实的妥协方案。在写这篇文档的时候,我还是打算在不久的将来按照一个更好的计划来实现这部分内容。如果你也有兴趣了解或者有更好的建议,欢迎联系我,我们一起分解。总结至此,为了解决Kafka消费者消费能力过慢的场景,集群并发性能提升方案设计完成,满足业务需求的各种需求和约束,最终实现了业务逻辑没有改变。在这种情况下,整个集群的性能提升了数百倍。总体变化如下:
