消息队列线程池模型如何保证重启时消息不丢失?转载请联系CaféLatte公众号。背景今天在脉脉上看到一个帖子,挺有意思的:这个帖子的意思是:在使用Kafka的时候,我们设置了多个partition,如何提高消费能力?如果我们使用线程池来改进如何保证重启时消息不丢失。这个问题其实问了两点。一是如何提高消费能力,二是选择线程池如何保证消息不丢失。先解释一下这两个问题是怎么回事。在很多消息队列中,有一个概念叫做partition,代表分区。分区是提高消息队列消费的关键。我们的消费者消费渠道是从每个分区出发,一个分区只能持有一个消费者,如下图:有点像银行排队,排队的人越多,排队的时间越短,当然也可以采用异步的方式来处理,比如线程池,将所有的消息都丢到线程池中执行,这就引出了作者的第二个问题。首先我们来看看为什么同步消费不会丢消息?如果我们使用同步模型。当我们消费的时候,我们会返回偏移量ack。如果我们重启后offset失败,那么这部分数据又会被消费掉。如果我们使用线程池进行消费,那我们怎么ack呢?,比如我们用线程池消费10、11、12三个消息,如果先消费12,那我们是不是就ack13?如果我们这样做,这个时候重启,kafka会认为你已经处理了10和11的消息,这个时候消息就丢失了,发这个帖子的同学对这个比较迷茫。来看看部分网友的回答:网友A:这位网友的回答实质是使用线程池,作者也回复了,并没有解决线程池的问题。网友B:这个方法类似于银行排队。只要队列多,处理速度就会加快。它确实是第一个问题的解决方案之一。网友C:这个类主要是解决第二个问题。通过在外部维护偏移量,例如将偏移量存储在仓库中,我们可以找到正确的应该被消费的偏移量。这个比较复杂,需要用一个MQ来配数据库。如果我使用MQ服务,完全没有数据库,就得单独申请。网友D:另一种观点是,如果代码写的好,消费的速度提高了,那么消费力自然就变大了。这确实是很重要的一点,通常被别人忽略。有时消费更贵。慢,可能很多人一上来就想怎么设置中间件,往往会忽略自己的代码。看了这么多帖子的回复,感觉没有一个真正让我满意的答案。我说说我的一些想法吧。我的想法关于第一个问题,如何提高消费能力?这道题其实可以归纳为三种方法:如果每台消费机的消费线程是固定的,那么我们可以扩大消费机和分区,类似于银行队列增加排队窗口一样。如果机器和partition是固定的,增加消费线程是更好的方法,但是如果是顺序消费,就不能通过增加线程数来提高消费能力,因为顺序消费的每个partition都是一个单独的线程,它只能用第一种方法解决。增加自己代码的消费能力,你想想如果是银行办理业务,如果柜员的办理效率可以提高非常高,那么整个排队的速度肯定是非常快的。对于第二个问题,如果我们使用线程池模型,如何解决消息丢失的问题。这里我推荐RocketMQ中的做法。我们之前说过,使用数据库保存偏移量比较复杂,性能还是比较差。在RocketMQ中,TreeMap结构用于执行我们上面提到的数据库操作:privatefinalTreeMap
