当前位置: 首页 > 科技观察

用了这么久的RabbitMQ异步编程,其实是错误的!

时间:2023-03-22 14:48:56 科技观察

优秀的项目辅以三种处理模式:同步、异步和定时任务。异步编程充满了陷阱。1适用场景1.1服务于主流程的支流程在注册流程中,向DB写入数据是主流程,注册后给用户发送优惠券或欢迎短信是支流程,时效性不强。1.2用户无需实时查看结果。例如,外卖下单后的配送配送流程可以异步处理。每个阶段处理完成后,向用户发送推送或短信,让用户知道。1.3MQ任务缓存分发、流量调峰、服务解耦和消息广播。当然,异步处理不只是通过MQ实现,还有其他方式比如开一个新的线程执行,返回Future和各种异步框架,比如Vertx,是通过回调实现的2异步处理的坑2.1异步处理的需求做消息补偿闭环RabbitMQ可以将消息发送到磁盘,即使MQ异常消息数据也不会丢失,但是异步过程可能会在消息发送、传输、处理等环节造成消息丢失。所有的MQ都不能保证100%的可用性,业务设计需要考虑异步流程在不可用时如何继续。因此,对于异步处理过程,必须考虑补偿或建立主备双活过程。2.1.1案例用户注册后异步发送欢迎信息。用户注册和登陆DB是一个同步过程。会员服务收到消息后发送欢迎信息。这是一个异步过程。蓝色线为MQ异步处理(主线),消息可能丢失(虚线代表异步调用)。主线上消息丢失补偿考虑极端MQ中间件故障场景,要求备线上的处理吞吐量达到主线上的性能代码示例UserController注册+发送异步消息。注册方式,一次注册10个用户,用户注册消息发不出去的概率为50%。MemberService会员服务监听用户注册成功消息,发送欢迎短信。使用ConcurrentHashMap存储那些发过短信的用户ID,实现幂等性,避免同一用户发短信时重复发短信,因为稳定性和稳定性等原因而重复自动赔付。比如这个例子,同一条消息可能会同时经过MQ和补偿,肯定会出现重复。而且,考虑到高内聚,补偿作业本身不会去重人工补偿,重复的消息会累积,异步处理流程必须延迟。如果提供补偿功能,当处理遇到延迟时,很可能先手动进行补偿。一段时间后,处理程序再次收到消息并重复处理。出现了MQ故障,MQ中积累了数十万条资金分发消息,导致业务无法及时处理。运行以为是程序出错,先通过后台手动处理。结果,MQ系统恢复后,又重新处理消息,导致大量Fund重复发行。异步处理必须考虑消息重复的可能性,因此处理逻辑必须是幂等的,以防止重复处理。然后定义补偿作业,即线准备操作。对于定时任务,每5秒补偿一次,因为作业不知道哪些用户注册消息可能丢失,所以是全补偿。补偿逻辑每5秒补偿一次,一次补偿5个用户,依次进行。下一次补偿操作从上次补偿的最后一个用户ID开始,将补偿任务提交给线程池进行“异步”处理,提高处理能力实现高内聚,主线和备线处理消息,最好使用相同的方法。在这种情况下,MemberService监听MQ消息和CompensationJob补偿,并调用welcome。这里的补偿逻辑很简单,只是一个demo。实际生产代码必须:考虑配置补偿的频率、每次处理的数量、补偿线程池的大小为合适的值以满足补偿的吞吐量考虑备份补偿数据的适当延迟例如,对注册时间在30s之前的用户进行补偿,方便与主线MQ的实时进程错开,避免当前补偿哪个用户的offset数据等冲突。它需要登陆数据库。补偿作业本身必须是高可用的,你可以使用类似xxl-Task的系统,比如job或ElasticJob。运行程序,执行registration方法注册10个用户,查看日志,一共有10个用户。发送MQ成功的用户有4个:1、5、7、8,补偿任务第一次运行补偿用户2、3、4,第二次运行补偿用户6、9,第三次运行操作补充用户10.最高标准的消息补偿闭环,可以达到补偿全量数据的吞吐量。也就是说,如果补偿和备份线路足够完善,即使直接关闭MQ,处理的时效性也会受到一点影响,但至少可以保证流程的正常执行。小结实际开发应该考虑异步进程丢失消息或者处理中断的场景。异步过程需要有后备线进行补偿,比如这里的全量补偿方式,即使异步过程完全失效,业务也可以通过补偿继续进行。2.2RabbitMQ广播,工作队列模式坑消息模式广播或者工作队列消息广播同一条消息,不同的消费者可以分别消费队列模式不同的消费者在同一个队列中共享消费数据,同一条消息只能被某个消费者消费一次。比如同一个用户的注册消息需要被会员服务监听发送欢迎短信,营销服务需要被监听给新用户送小礼物。但是,会员和营销服务可能存在多个实例,业务需要将同一用户的消息同时广播到不同的服务(广播方式),但是对于同一个服务的不同实例(比如会员服务1和会员服务2),不管哪个实例处理,只需要处理一次(工作队列模式):在实现代码的时候,一定要确认MQ系统的机制,保证消息的路由工作正如预期的那样。RocketMQ对类似功能的实现比较简单明了:如果消费者属于一个群组,则消息只会被同一个群组的一个消费者消费;如果消费者属于不同的组,则每个组可以消费一次消息。RabbitMQ的消息路由方式采用队列+交换机,队列是消息的载体,交换机决定消息如何路由到队列。step1:会员服务——监听用户服务发送的新用户注册消息。如果激活了两个会员服务,则同一用户的注册消息只能被其中一个实例消费。分别实现RabbitMQ队列、交换机、绑定三件套。该队列使用匿名队列。交换机使用DirectExchange。交换机绑定到匿名队列的路由键是一个空字符串。收到消息后,打印实例使用的端口。消息发布者、消费者、MQ的配置使用12345和45678两个端口启动两个程序实例,发送消息,输出日志显示同一个会员服务的两个实例都收到了消息:所以问题是即RabbitMQ直接exchange和队列之间的绑定关系RabbitMQ的直接exchange根据routingKey路由消息。并且每次程序启动时,都会创建一个匿名(随机命名)的队列,因此每个成员服务实例对应一个独立的队列,直接绑定到一个空的routingKeyexchange。用户服务发送消息时,也设置了一个空的routingKey,所以directexchange收到消息后,发现匹配到两个队列,于是都转发消息,修复成员服务,而不是使用匿名队列,但使用相同的队列。把上面代码中的匿名队列换成普通队列:privatestaticfinalStringQUEUE="newuserQueue";@BeanpublicQueuequeue(){returnnewQueue(QUEUE);}这样两个实例只有一个可以接收相同的消息但是,不同的消息被轮询并发送到不同的实例。当前交换和队列关系step2:用户服务-向会员广播消息,营销服务期待会员,营销服务可以接收到广播消息,但是会员/营销服务中的每个实例只需要接收一次消息。声明一个队列和一个FanoutExchange,然后模拟两个用户服务和两个营销服务:注册四个用户。日志发现一条用户注册的消息,不是会员服务接收的就是营销服务接收的,这不是广播。能用的明明是FanoutExchange,为什么不行呢?因为广播交换机忽略了routingKey,将消息广播给所有绑定的队列。在这种情况下,两个会员服务和两个营销服务绑定到同一个队列,所以四个服务只能接收一次消息:修复拆分队列,会员和营销服务使用独立队列绑定到broadcastexchange当前的交换器和队列结构可以从日志输出中验证。对于每条MQ消息,会员服务和营销服务分别接收一次,同时向两个服务广播一条消息,在每个服务的两个实例中通过轮询接收:一次异步消息路由模式配置不当,可能导致消息重复处理,也可能导致重要服务收不到消息,导致业务逻辑错误。总结一个微服务场景下不同服务的多个实例监听消息的情况。一般不同的服务需要同时接收同一条消息,而同一个服务的多个实例只需要轮询接收消息即可。我们需要确认MQ的消息路由配置是否符合要求,避免消息重复或丢失的问题。2.3死信阻塞MQ的坑一直无法处理的死信消息可能导致MQ阻塞。如果线程池的任务队列没有上限,最终可能会导致OOM。类似MQ也要注意任务堆积的问题。对于突发流量造成的MQ堆积,问题不大,应该通过适当调整消费者的消费能力来解决。但是在很多情况下,消息队列的堆积是阻塞的,因为有大量的消息无法处理。2.3.1用户注册后用户服务发送消息,会员服务监听消息并分发优惠券给用户,但由于用户未保存成功,会员服务始终处理消息失败,消息重新进入队列,然后处理仍然失败。在MQ中回显的同一消息是死信。随着MQ中死信越来越多,消费者需要花费大量时间反复处理死信,阻碍了正常消息的消费,最终MQ可能会因为数据量过大而崩溃。定义一个queue和一个directexchange,然后将queue绑定到exchangesendMessage向MQ发送消息,一次提交一条消息,使用自增标识作为消息内容,直接NPE模拟报错调用sendMessage接口发送两条消息,然后来到RabbitMQ管理控制台,可以看到这两条消息一直在队列中,不断的重新投递,导致重新投递的QPS为1063。还可以看到一个日志中有很多异常信息。最简单的方案就是解决死信无限重复入队列的问题。当程序处理错误时,直接抛出AmqpRejectAndDontRequeueException,避免消息重新入队thrownewAmqpRejectAndDontRequeueException("error");偶尔因为网络问题导致的消息处理失败,如果还是失败,消息会投递到专门设置的DLX。对于来自DLX的数据,可能只是记录日志和发送告警,即使有异常也不会重复投递。逻辑如下为了解决这个问题,我们来看下SpringAMQPDefinedeadletterexchangeranddeadletterqueues的简单解决方案。其实都是普通的交换机和队列,只是专门用来处理死信消息的。通过RetryInterceptorBuilder构建一个RetryOperationsInterceptor来处理失败时的重试。策略是最多尝试5次(重试4次);并采用指数退避重试,第一次重试延迟1秒,第二次2秒,以此类推,最大延迟10秒;如果第4次重试仍然失败,则使用RepublishMessageRecoverer将消息重新输入到DLX定义的死信队列处理程序中。本案例只记录日志代码执行程序,发送两条消息,查看日志:msg2的4次重试间隔分别为1秒、2秒、4秒、8秒,加上第一次失败,所以最大尝试次数为5次重试后,RepublishMessageRecoverer将消息发送给DLX。死信处理程序输出gotdeadmessagemsg2。虽然两条消息几乎同时发送,但是msg1的四次重试全部结束后msg2才开始处理,因为默认的SimpleMessageListenerContainer只有一个消费者线程。您可以通过添加消费者线程来避免性能问题:直接将concurrentConsumers参数设置为10以增加到10个工作线程。您还可以设置maxConcurrentConsumers参数,让SimpleMessageListenerContainer动态调整消费者线程数。总结一般遇到消息处理失败,可以设置重试。如果重试还是失败,可以将消息丢到专门的死信队列中处理,以免死信影响正常的消息处理。