当前位置: 首页 > 科技观察

领导看了我写的关闭超时命令让我出去左转!

时间:2023-03-20 01:39:47 科技观察

大家好,我是阿Q!几天前,领导突然宣布重启几年前停产的电商项目。我怀着复杂的心情仔细阅读了自己“童年”的代码,只有我能体会到内心的悲伤。这不,我昨天被领导叫进了“小黑屋”,让我重构代码,升级。看到这么“可爱”的代码,我的心里面有一万匹“xx骏马”奔腾而过。最让我反感的是,它居然用定时任务实现了“关闭加班单”的功能。现在想想,真是哭笑不得。我们先来分析一下为什么大家都抵制使用定时任务来实现这个功能。定时任务关闭超时订单是在订单创建后的一段时间内未完成支付就关闭订单的操作。该功能一般要求每笔订单的超时时间一致。如果我们使用定时任务来执行这个操作,那么定时任务轮询的时间间隔是很难把握的:时间间隔小到可以在误差允许范围内实现我们说的时间一致性问题,但是要频繁扫描数据库和执行定时任务,会造成网络IO和磁盘IO的消耗,对实时事务有一定的影响;时间间隔比较大,因为每个订单的创建时间不一致,所以上面的一致性要求很难达到,例如如下:假设30分钟订单超时自动关闭,定时任务的执行间隔30分钟:我们在第5分钟下单;时间到了第30分钟,定时任务执行了一次,但是我们的订单不满足条件,没有执行;当时间来到第35分钟时,订单达到成交条件,但没有执行定时任务,故不执行;时间来到第60分钟,我们开始执行我们的平仓操作,此时错误达到25分钟;这样一来,我们就需要放弃这个方法了。延迟队列为了满足领导的需求,我接触到了消息队列:RabbitMQ。虽然它本身不提供延迟队列的功能,但是我们可以利用它的生存时间和死信开关的特性来间接实现它。首先我们简单介绍一下什么是生存时间?什么是死信交换?生存时间的全称是TimeToLive,简称TTL。它支持同时设置消息本身(延迟队列的关键)和队列(队列中的所有消息具有相同的过期时间)。消息本身设置:即使消息过期,也不会立即从队列中删除,因为每条消息是否过期在传递给消费者之前就已经确定了;设置队列:一旦消息过期,就会从队列中移除并擦除。如果同时使用这两种方式,则以到期时间较短的值为准。当消息到了过期时间还没有被消费,那么这个消息就“死了”,我们称之为死信消息。消息成为死信的条件:消息被拒绝(basic.reject/basic.nack),requeue=false;消息的过期时间到期;队列达到最大长度。队列设置注意事项该属性在队列中的设置只有在队列第一次声明时才有效。如果队列一开始就已经存在,不具备该属性,则必须删除队列,重新声明。队列的ttl只能设置为固定值。一旦设置,不能更改,否则会抛出异常;死信交换死信交换的全称是Dead-Letter-Exchange,简称DLX。当消息在队列中成为死信时,如果消息所在队列设置了x-dead-letter-exchange参数,则将其发送到对应x-dead-letter-exchange值的交换机.这个开关叫做死信开关,这个死信开关绑定的队列就是死信队列。x-dead-letter-exchange:死信发生后重新发送死信到指定的exchange;x-dead-letter-routing-key:发生死信后根据指定的routing-key重新发送死信,如果不设置默认使用消息本身的routing-key。死信队列和普通队列的区别在于,它的RoutingKey和Exchange需要作为参数绑定到普通队列上。实践教学先放一张图感受下我们的整体思路:生产者将带有ttl的消息发送到交换机,路由到延迟队列;为延迟队列中的死信转发绑定死信开关和routing-key;延迟队列中的消息到达延迟时间后,成为死信,被转发到死信交换机,路由到死信队列;最后,它被消费者消费。配置类@ConfigurationpublicclassDelayQueueRabbitConfig{publicstaticfinalStringDLX_QUEUE="queue.dlx";//死信队列publicstaticfinalStringDLX_EXCHANGE="exchange.dlx";//死信交换publicstaticfinalStringDLX_ROUTING_KEY="routingkey.dlx";//绑定死信队列和死信交换的routing-keypublicstaticfinalStringORDER_QUEUE="queue.order";//订单延迟队列publicstaticfinalStringORDER_EXCHANGE="exchange.order";//顺序switchpublicstaticfinalStringORDER_ROUTING_KEY="routingkey.order";//routing-key绑定延迟队列和顺序switch/***定义死信队列**/@BeanpublicQueuedlxQueue(){returnnewQueue(DLX_QUEUE,真的);}/***定义死信交换**/@BeanpublicDirectExchangedlxExchange(){returnnewDirectExchange(DLX_EXCHANGE,true,false);}/***死信队列和死信交换Binding*设置路由键:routingkey.dlx**/@BeanBindingbindingDLX(){returnBindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);}/***OrderDelayQueue*设置队列中的死信转发到的DLX名称*设置转发时死信携带的routing-key名称**/@BeanpublicQueueorderQueue(){Mapparams=newHashMap<>();params.put("x-dead-letter-exchange",DLX_EXCHANGE);params.put("x-dead-letter-routing-key",DLX_ROUTING_KEY);返回新队列(ORDER_QUEUE,真,假,假,参数);}/***订单交换**/@BeanpublicDirectExchangeorderExchange(){returnnewDirectExchange(ORDER_EXCHANGE,true,false);}/***Together绑定订单队列和订单交换**/@BeanpublicBindingorderBinding(){returnBindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);}}消费消息@Component@RabbitListener(queues=DelayQueueRabbitConfig.DLX_QUEUE)//监听队列名publicclassOrderMQReciever{@RabbitHandlerpublicvoidprocess(Stringmessage){System.out.println("OrderMQReciever收到的消息是:“+消息);}}测试通过了调用接口,发现需要10秒的消费Message:Problemupgrade由于开发环境和测试环境使用相同的交换机和队列,所以发送延迟时间为30分钟。但是为了方便测试同学在测试环境中进行测试,手动将测试环境的时间改为1分钟。问题重现然后问题来了:延迟1分钟的消息并没有立即被消费,而是在消费了30分钟的消息后才被消费。至于原因,下面我们来分析一下,先用代码为大家复现一下问题。@GetMapping("/sendManyMessage")publicStringsendManyMessage(){send("延迟消息休眠10秒",10000+"");send("延迟消息休眠2秒",2000+"");send("延迟消息休眠5秒",5000+"");return"ok";}privatevoidsend(Stringmsg,StringdelayTime){rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY,msg,message->{message.getMessagePropertiesProperties().setExpiration(delayTime);返回消息;});}执行结果如下:OrderMQReciever收到的消息是:延迟消息休眠10秒OrderMQReciever收到的消息是:延迟消息休眠2秒OrderMQReciever收到的消息是:延迟消息休眠的原因5秒是延迟队列也满足了队列的先进先出特性。当10秒消息未出队时,后续消息无法成功出队,导致后续消息阻塞,无法实现精准延时。.问题解决我们可以使用x-delay-message插件来解决这个问题。消息延迟范围是Delay>0,Delay=args=newHashMap();args.put("x-delayed-type","direct");返回新的CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,args);}/***队列和延迟交换绑定**/@BeanpublicBindingorderBinding(){returnBindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();}}发送消息@RestController@RequestMapping("/delayed")publicclassDelayedSendMessageController{@AutowiredprivateRabbitTemplaterabbitTemplate;@GetMapping("/sendManyMessage")publicStringsendManyMessage(){send("延迟消息休眠10秒",10000);send("延迟消息休眠2秒",2000);send("延迟消息休眠5秒",5000);返回“确定”;}privatevoidsend(Stringmsg,IntegerdelayTime){//携带路由key的消息rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE,XDelayedMessageConfig.ROUTING_KEY,msg,message->{message.getMessageProperties().setDelay(delayTime);返回信息;});}}消费消息@Component@RabbitListener(queues=XDelayedMessageConfig.DIRECT_QUEUE)//监听队列名publicclassDelayedMQReciever{@RabbitHandlerpublicvoidprocess(Stringmessage){System.out.println("DelayedMQReciever收到的消息是:"+message);}}测试DelayedMQReciever收到的消息是:Delayedmessagesleepfor2secondsDelayedMQReciever收到的消息是:Delayedmessagesleepfor5secondsDelayedMQReciever收到的消息是:DelayedMessagessleepfor10seconds所以我们的问题顺利解决LimitationsDelayed消息存储在Mnesia表中,当前节点上只有一个磁盘副本,它们将在节点重启后继续存在。虽然触发预定交付的计时器不会持久存在,但它将在节点启动时插件激活期间重新初始化。显然,集群中只有一个计划消息副本意味着丢失该节点或禁用其上的插件将丢失驻留在该节点上的消息。插件目前的设计不适用于大量延迟消息(例如数万或数百万)的场景,插件的另一个可变性来源是对Erlang计时器的依赖,它使用一定数量的长时间计时器后,它们开始竞争调度程序资源,时间漂移不断累积。