当前位置: 首页 > 科技观察

记录一个Kafka消费的生产故障处理过程

时间:2023-03-20 14:18:32 科技观察

大家好,欢迎来到Tlog4J课堂,我是Jensen。记录今天发生的一次生产故障和排除故障的全过程。问题背景需求背景如下:产品需要下单后,资金平台需要对这些订单进行结算,并将虚拟资产记入下单客户的虚拟账户。因为我们按照业务领域拆分了多个微服务,为了解耦订单和资金平台,我们选择了MQ异步消息的方式来传输业务数据。流程简化为:【订单中心】查询过往售后订单->发送MQ消息给财务中心。【金融中心】接收MQ消息->验证客户交易数据->调用基金平台进行积分结算。【资金平台】结算积分->虚拟资产入账。其中,金融中心的MQ消费使用了基于Kafka二次封装的组件。默认使用应用中的线程池异步消费消息进行业务处理(因为需要在多处消费)。这第二个组件也已经使用了一年。时间比较稳定。OK,目前还没发现问题。那么,如果不出意外,很快就会发生意外。早上6:00触发P1级警报。由于应用中的线程池被炸毁,应用拒绝策略737次,触发SQL慢查询10秒(刚好验证客户的交易数据操作使用非索引列查库)。之后进行排查,分析生产者和消费者的代码,发现问题如下:消费者金融中心对应的消费方式使用了默认的异步方式处理消息,使用的线程数默认200个线程。如果短时间内接收到多个MQ,但无法快速执行释放线程。如果线程数达到200,必然会通过拒绝策略报错,甚至会影响其他异步执行MQ(共享同一个线程池)的消费者方法。订单中心同时批量修改过去的售后订单,将发送MQ的方法包裹在for循环中。这意味着如果同时发送大量的MQ消息,由于第一个消费者的隐患,发送的MQ消息将无法正常消费。分析完处理过程中的问题,基本上就可以确定如何解决了。分为三步:第一步:对于线上消费异常的数据,根据代码逻辑再次运行SQL修复对应的数据。这件事要尽快做,不能因为程序问题影响客户体验。第二步:MQ组件异步消费的消息堆积能力受线程池大小的影响。消息堆积的问题应该交给专业的MQ自己解决,所以暂时关闭题目的异步执行,不使用线程池,改成同步的。MQ组件后续优化将不再提供异步执行的方法,比如使用类似@KafkaListener(topic="xxx",groupId="appName.beanName.methodName")的方法,但是需要动态创建KafkaListener和使用MQ本身的消费者组的功能,防止消息在应用程序线程池中堆积。第三步:通过业务回避,合理评估需求。对于确定的场景,可以结合MQ请求、SQL请求、Feign接口调用请求,比如上面提到的for循环发送订单售后通知,验证客户交易。在记录数据和资本平台点数的场景下,进行识别,通过批量合并请求(以空间换频率)解决频繁请求可能出现的问题。合并请求后,需要评估合并请求的大小限制,进一步切割请求。比如订单合并后有10万条数据,放在一个request里是不合理的。在发送请求之前,应根据一定的订单量切割请求。这里有一个leader(技术总监)用了8年的需求分析方法:总结出这种生产问题,分三步解决:第一,尽快修复生产数据,避免影响客户体验。二、治标不治本——找出问题的根源,有针对性地解决。三是治标不治本——合理评估需求。