当前位置: 首页 > 后端技术 > Java

我用Kafka两年踩过的一些不寻常的坑

时间:2023-04-01 14:48:44 Java

前言我的博客我上一家公司是做餐饮系统的。每天中午和晚上的用餐高峰期,系统的并发量不容小觑。为保险起见,公司规定各部门必须在用餐时间轮流值班,以防上网出现问题得到及时处理。我在后厨展示系统团队,属于订单的下游业务。用户下单后,订单系统会向我们的系统发送一条kafka消息。系统读取消息后,进行业务逻辑处理,将订单和菜品数据持久化,然后展示给点餐客户端。这样,厨师就知道哪些菜要准备哪些菜,哪些菜做好了,就可以通过系统上菜了。系统自动通知服务员上菜。如果服务员上完菜,修改菜品的上菜状态,用户就会知道哪些菜已经上了,哪些菜还没有上。该系统可以大大提高从厨房到用户的效率。事实证明,这一切的关键在于消息中间件:kafka,如果它出现问题,将直接影响厨房显示系统的功能。接下来跟大家说说我在使用Kafka的这两年踩过哪些坑?顺序问题1.为什么要保证消息的顺序?一开始,我们系统里的商户很少。为了快速实现功能,我们也没有想太多。由于是通过消息中间件Kafka进行通信,因此订单系统在发送消息时会将详细的订单数据放在消息体中。我们的厨房展示系统只要订阅主题,就可以获取到相关的消息数据,然后处理自己的业务。但是这个方案有一个关键因素:必须保证消息的顺序。为什么?订单有很多状态,比如:下单、支付、完成、取消等,不看订单的消息是不可能不看的,所以先看支付或取消的消息。这样的话,数据就不会混乱了。?嗯,看来保证消息顺序是很有必要的。2、如何保证消息的顺序?我们都知道Kafka的topic是无序的,但是一个topic包含多个partition,每个partition内部都是有序的。![Image]()这样思路就清晰了:只要生产者写入消息,按照一定的规则写入同一个分区,不同的消费者读取不同分区的消息,就可以保证生产和消费者消息的顺序。这就是我们一开始所做的。同一个商户号的消息写入同一个分区。在topic中创建四个partition,然后部署四个consumer节点组成一个consumergroup。一个分区对应一个消费者节点。理论上,该方案可以保证消息的顺序。一切都计划得“天衣无缝”,我们“顺风顺水”上线。3.发生了事故。这个功能上线有一段时间了,一开始还挺正常的。然而好景不长,用户反映部分订单和菜品在点菜客户端看不到,无法下单。我找到了原因。那段时间公司网络经常不稳定,业务接口时不时报超时,业务请求时不时连接不上数据库。这种情况对顺序消息的影响可以说是毁灭性的。你为什么这么说?假设订单系统发送了三个消息:“Order”、“Payment”和“Complete”。但是由于网络原因,我们的系统未能处理“订单”消息,后面两条消息的数据也无法存入数据库,因为只有“订单”消息的数据才是完整的数据,其他类型的消息只会更新状态。另外我们当时没有实现失败重试机制,放大了这个问题。问题就变成了:一旦“订单”消息的数据入库失败,用户将永远看不到订单和菜品。那么如何解决这个紧迫的问题呢?4.解决过程一开始我们的想法是:消费者处理消息时,如果处理失败,立即重试3-5次。但是,如果某些请求需要第6次才能成功怎么办?不可能一直重试。这种同步重试机制会阻塞其他商户订单消息的读取。显然,使用上面的同步重试机制会严重影响消息消费者的消费速度,在出现异常时降低其吞吐量。从这个角度来看,我们不得不使用异步重试机制。如果使用异步重试机制,失败的消息必须存储在重试表中。但是一个新的问题马上出现了:如何保证只有一条消息的顺序?保存消息并不能保证顺序。如果“order”消息失败,就没有时间异步重试。这个时候消费的是“支付”消息,肯定不能正常消费。此时“支付”消息应该一直在等待,每隔一段时间判断一下,之前的消息是否已经消费完了?如果真的这样做,会出现两个问题:“支付”消息前面有“订单”消息,这种情况比较简单。但是如果某类消息前面有N种消息,那么有多少种消息呢?timesdoyouneedtojudge?这种判断跟订单系统耦合度太强,相当于把他们系统的部分逻辑搬到了我们的系统中。影响了消费者的消费速度,这个时候就出现了一个更简单的解决方案:消费者处理消息时,首先判断订单号在重试表中是否有数据,如果有则直接将当前消息保存到重试表中,如果没有则进行业务处理,如果出现异常则保存消息到重试表,后面我们使用elastic-job建立失败重试机制,如果7次重试还是失败,消息的状态会被标记为失败,并通过邮件通知开发者。终于解决了因网络不稳定导致用户在点菜客户端看不到部分订单和菜品的问题。现在商家最多只能偶尔延迟看到菜品,总比一直看不到菜品要好很多。留言积压随着销售团队的营销推广,我们系统中的商户越来越多。随之而来的是消息越来越多,导致消费者无法处理,经常出现消息积压的情况。对商家的影响非常直观。点菜客户端上的订单和菜品可能要等到半小时后才能看到。一两分钟我还能忍,半条消息的延迟,我受不了一些脾气暴躁的商家,立马投诉。那段时间,我们经常接到商家的投诉,订单和菜品延迟。虽然增加服务器节点可以解决问题,但是按照公司省钱的做法,还是先做系统优化,于是我们开始了解决消息积压问题的旅程。1、邮件正文过大。Kafka虽然号称支持百万级TPS,但是从producer向broker发送消息需要一次网络IO,而broker向磁盘写入数据需要一次磁盘IO(写操作)。消费者首先从代理处获取消息。磁盘IO(读操作),然后是网络IO。![Picture]()一个简单的从生产到消费的过程,需要2倍的网络IO和2倍的磁盘IO。如果消息体过大,必然会增加IO的时间消耗,影响Kafka生产和消费的速度。由于消费者太慢,会出现消息积压。除了上述问题外,如果消息体过大,还会浪费服务器的磁盘空间。如果不小心,磁盘空间可能会不足。至此,我们就到了需要优化消息体过大的问题了。如何优化呢?我们对业务进行了重组,不需要知道订单的中间状态,只需要一个终态即可。那么好,我们可以这样设计:订单系统发送的消息体只需要包含id、status等关键信息即可。厨房显示系统消费消息后,通过id调用订单系统的订单详情查询接口获取数据。后厨展示系统判断数据库中是否有该订单的数据,如果没有则存储,如果有则更新。![Picture]()果然经过这次调整,很久没有再出现消息积压问题。2.不要为不合理的路由规则而高兴。一天中午,一位商户抱怨订单和菜品都被耽误了。我们查看kafka的topic时,又出现了消息积压的情况。但是这次有点奇怪,不是所有分区都有消息积压,只有一个。一开始以为是消费分区消息的节点出了问题。但经过排查,并没有发现异常。这就奇怪了,问题出在哪里?后来查了日志和数据库,发现有几个商户的订单量特别大。恰好这些商户被分配到同一个分区,导致这个分区的消息量比其他分区大很多。这时我们才意识到发送消息时根据商户ID路由partition的规则是不合理的,可能会导致有的partition消息太多消费者处理不了,而有的partition消息太少消费者空闲。为了避免这种分布不均,我们需要调整发送消息的路由规则。我们想了想,用订单号做路由相对来说比较统一,不会出现单个订单发送过多消息的情况。除非是有人不停加菜的情况,但是加菜是要花钱的,所以其实同一个订单的消息并不多。调整后,根据序号路由到不同分区,相同序号的消息每次发送到同一个分区。调整后,很久没有再出现消息积压的问题。这段时间我们商户的数量增长非常快,而且越来越多。3、批量操作带来的连锁反应在高并发场景下,消息积压问题可以说是如影随形,实在是没有办法从根本上解决。表面上已经解决了,过一会又会弹出来,比如这次:一天下午,产品来了说:好几个商家投诉了,说是菜品延迟了,赶紧查原因。这次出现的问题有点奇怪。你为什么这么说?首先,这个时间点有点奇怪。通常,如果有问题,不都是中午或晚上的用餐高峰时间吗?为什么这次问题出现在下午?根据以往积累的经验,直接看Kafka的topic数据。果然上面有消息积压,但是这次每个partition都积压了10万多条消息没有消费,比之前的压压消息多了几百倍。次。这次的消息积压极为不寻常。我赶紧查看服务监控,看消费者有没有挂掉,还好没有。又查看了服务日志,没有发现异常。这时候我有点懵了,就碰碰运气问订单组下午怎么了?他们说下午有促销活动,跑了个JOB,批量更新部分商家的订单信息。这时,我突然如梦初醒。问题是他们在JOB中批量发送消息引起的。你为什么不通知我们?这太糟糕了。虽然我们知道问题的原因,但是面对积压在我们面前的几十万条消息,我们应该怎么办呢?此时,直接增加分区数量是行不通的。历史消息存储在4个固定分区中,只有新消息才会发送到新分区。我们需要关注的是现有分区。直接添加服务节点是不行的,因为Kafka允许同组的多个分区被一个消费者消费,但不允许同组的多个消费者消费一个分区,可能造成资源浪费。看来只能用多线程了。为了紧急解决问题,改成使用线程池处理消息,核心线程和最大线程数都配置为50,调整后果然消息积压数持续减少.但是这时候,更严重的问题出现了:收到了报警邮件,订单系统的两个节点宕机了。很快,订单组的一个同事来找我说,我们系统调用他们??的订单查询接口的并发量突然变大了,超出了预期数倍,导致两个服务节点挂掉了。他们把查询功能集成到一个服务中,部署了6个节点,挂了2个节点。如果他们不处理,其他4个节点也会挂掉。订单服务可以说是公司的核心服务。如果失败了,公司将损失惨重,情况十分紧急。为了解决这个问题,我们只能先减少线程数。好在可以通过zookeeper动态调整线程数。我调整了核心线程数为8,核心线程数为10,后来运维重启了链接订单服务的两个节点,恢复正常。为了以防万一,又添加了两个节点。为确保订单服务不会出现问题,将维持目前的消费速度。后厨显示系统消息积压1小时后恢复正常。后来我们开了个审核会,得出的结论是,订单系统的批量操作一定要提前通知下游系统团队。下游系统团队在多线程调用订单查询接口时需要进行压力测试。这次为订单查询服务敲响了警钟。作为公司的核心服务,对于高并发场景的处理还不够好,需要优化。监控消息积压。顺便说一句,对于需要严格保证消息顺序的场景,线程池可以改为多个队列,每个队列由一个线程处理。4.桌子太大。为了防止消息积压再次发生,消费者一直在使用多线程来处理消息。但是一天中午,我们还是收到了很多告警邮件,提醒我们Kafka主题消息积压了。我们正在排查原因,这时候产品过来了,说:又有商家投诉菜品延误了,请看一下。她这次显得有些不耐烦,确实优化了很多次,但还是出现了同样的问题。从外行的角度看:为什么同样的问题没有解决?事实上,他们并不知道技术内心深处的痛苦。从表面上看,问题的征兆都是一样的,上菜有延迟,他们知道是消息积压造成的。但深层次的原因他们并不清楚,消息积压的原因其实有很多。这可能是使用消息中间件的通病。我默不作声,只能硬着头皮查找原因。后来查看日志,发现消费者消费一条消息最多需要2秒。以前是500毫秒,现在怎么变成2秒了?奇怪,消费者的代码没有做大的调整,为什么会出现这种情况?查了一下网上的菜表,单表的数据量已经上千万了。其他表格菜单也是如此。现在单个表中保存的数据太多了。我们的团队整理了业务。实际上客户端上只能显示最近3天的菜品。这很容易处理。我们在服务器端存储冗余数据,所以最好将表中的冗余数据归档。所以DBA给我们做了数据归档,只保留了最近7天的数据。经过这样的调整,新闻积压问题得到解决,恢复了往日的平静。不要对主键冲突太兴奋,还有其他问题,例如:报警邮件经常报告数据库异常:Duplicateentry'6'forkey'PRIMARY',表示主键冲突。出现这种问题一般是因为有两条或两条以上主键相同的sql,同时插入数据。第一次插入成功后,第二次插入时会报主键冲突。表的主键是唯一的,不允许重复。仔细查看代码,发现代码逻辑会先根据主键从表中查询订单是否存在,存在则更新状态,不存在则插入数据,没问题。这种判断在并发量不大的时候很有用。但是如果在高并发场景下,两个请求同时发现顺序不存在,一个请求先插入数据,另一个请求再插入数据,就会出现主键冲突的异常。解决这个问题最常见的方法是:锁定。我一开始就是这么想的。给数据库加悲观锁肯定不行,太影响性能了。添加数据库乐观锁,根据版本号判断,一般用于更新操作,这种插入操作基本不用。剩下的只能使用分布式锁了。我们的系统使用的是redis,可以添加基于redis的分布式锁来锁定订单号。但是仔细一想:加分布式锁也可能会影响消费者的消息处理速度。消费者依赖redis。如果redis出现网络超时,我们的服务就悲剧了。因此,我不打算使用分布式锁。相反,选择使用mysql的INSERTINTO...ONDUPLICATEKEYUPDATE语法:INSERTINTTOtable(column_list)VALUES(value_list)ONDUPLICATEKEYUPDATEc1=v1,c2=v2,...;它会尝试先将数据插入表中,如果主键冲突,则更新字段。改造之前的insert语句后,就不会再有主键冲突了。数据库主从延迟不久后的一天,接到商家投诉,下单后在点菜客户端可以看到订单,但是看到的菜品不全,有时订单和菜品连在一起看不到数据。这个问题与前面的问题不同。根据以往的经验,首先查看Kafka主题中是否有消息积压,这次没有积压。再次查看服务日志,发现订单系统接口返回的数据有的是空的,有的只返回订单数据,没有返回菜品数据。这就很奇怪了,直接去找订单群里的同事。他们仔细检查了服务,没有发现任何问题。这时候我们都同时想到,可能是数据库有问题,一起去找DBA。果然,DBA发现数据库的主库向从库同步数据,由于网络原因偶尔会出现延迟,有时延迟3秒。如果我们的业务流程从发送消息到消费消息的时间小于3秒,那么在调用订单详情查询接口的时候可能会查不到数据或者查不到最新的数据。这个问题很严重,会直接导致我们的数据出错。为了解决这个问题,我们还增加了重试机制。调用接口查询数据时,如果返回数据为空,或者只返回订单没有菜品,添加重试表。调整后,商家投诉的问题已得到解决。Kafka重复消费消息时支持三种模式:atmostoncemodeatmostonce。确保每条消息在消费前都已成功提交。消息可能会丢失,但不会重复。至少一次模式至少一次。在提交之前确保每条消息都已成功处理。消息不会丢失,但可能会重复。exactlyonce模式只传递一次。使用offset作为唯一id同时处理消息,保证处理的原子性。消息只处理一次,既不会丢失也不会重复。但这样做很难。Kafka默认的模式是至少一次,但是这种模式可能会造成重复消费,所以我们的业务逻辑必须是幂等的。在我们的业务场景中,使用INSERTINTO...ONDUPLICATEKEYUPDATE语法来保存数据,不存在时插入,存在时更新,天然支持幂等性。多环境消费问题当时我们的线上环境分为:pre(预发布环境)和prod(生产环境)。两个环境共享同一个数据库,同一个kafka集群。需要注意的是,在配置kafka的topic时,要加一个前缀来区分不同的环境。pre环境以pre开头,如:pre_order,生产环境以prod开头,如:prod_order,防止不同环境的消息串在一起。但是pre环境有个运维切换节点,在配置topic的时候配置错了,配置成了prodtopic。就在那天,我们在前置环境上有了一个新的功能。结果是一场悲剧。prod部分消息被前置环境的消费者消费,由于调整了消息体,导致前置环境的消费者无法处理消息。结果是在生产环境中丢失了一些消息。幸运的是,最终生产环境中的消费者通过重置偏移量并重新读取那部分消息解决了问题,没有造成太大损失。后记除了以上问题,我还遇到了:Kafka的consumer使用了自动确认机制,导致CPU占用100%。kafka集群中某个broker节点挂了,重启后一直挂。这两个问题说起来有点复杂,就不一一列举了。有兴趣的朋友可以关注我的公众号,加我微信私聊我。非常感谢那两年使用消息中间件Kafka的经历。虽然遇到了很多问题,踩了很多坑,走了很多弯路,但积累了很多宝贵的经验,成长很快。其实kafka是一个非常好的消息中间件。我遇到的大部分问题都不是kafka本身引起的(除了100%的cpu使用率是它里面的一个bug引起的)。文章来自:苏三说科技