当前位置: 首页 > 后端技术 > Java

java培训定时任务理解与实战教学

时间:2023-04-01 21:41:07 Java

定时任务关闭超时订单是在订单创建后的一段时间内没有完成支付就关闭订单的操作。该功能一般要求每笔订单的超时时间一致。如果我们使用定时任务来执行这个操作,那么定时任务轮询的时间间隔是很难把握的:时间间隔小到可以在误差允许范围内实现我们说的时间一致性问题,但是要频繁扫描数据库和执行定时任务,会造成网络IO和磁盘IO的消耗,对实时事务有一定的影响;时间间隔比较大,因为每个订单的创建时间不一致,所以上面的一致性要求很难达到,例如如下:假设30分钟订单超时自动关闭,定时任务的执行间隔30分钟:我们在第5分钟下单;时间到了第30分钟,定时任务执行了一次,但是我们的订单不满足条件,没有执行;当时间来到第35分钟时,订单达到成交条件,但是定时任务没有执行,所以没有执行;时间到了第60分钟,我们的平仓操作开始了,而此时,错误达到了25分钟。毕竟,我们需要放弃这种方法。延迟队列为了满足领导的需求,我接触到了消息队列:RabbitMQ。虽然它本身不提供延迟队列的功能,但是我们可以利用它的生存时间和死信开关的特性来间接实现它。首先我们简单介绍一下什么是生存时间?什么是死信交换?生存时间的全称是TimeToLive,简称TTL。它支持同时设置消息本身(延迟队列的关键)和队列(队列中的所有消息具有相同的过期时间)。消息本身设置:即使消息过期,也不会立即从队列中删除,因为每条消息是否过期在传递给消费者之前就已经确定了;设置队列:消息一旦过期,将从队列中移除如果两种方法同时使用,以过期时间较短的值为准。当消息到了过期时间还没有被消费,那么这个消息就“死了”,我们称之为死信消息。消息成为死信的条件:消息被拒绝(basic.reject/basic.nack),requeue=false;消息的过期时间已过;队列达到最大长度;队列中该属性的设置应该是该设置仅在队列第一次声明时有效。如果队列一开始就已经存在,没有这个属性,则必须删除队列,重新声明;queue的ttl只能设置为一个固定值,一旦设置之后就不能再更改,否则会抛出异常;死信交换死信交换的全称是Dead-Letter-Exchange,简称DLX。当消息在队列中成为死信时,如果消息所在队列设置了x-dead-letter-exchange参数,则将其发送到对应x-dead-letter-exchange值的交换机.这个开关叫做死信开关,这个死信开关绑定的队列就是死信队列_北京java培训。x-dead-letter-exchange:死信发生后重新发送死信到指定的exchange;x-dead-letter-routing-key:发生死信后根据指定的routing-key重新发送死信,如果不设置默认使用消息本身的routing-key死信队列和普通的区别queue的不同之处在于它的RoutingKey和Exchange需要作为参数,绑定到普通的queue上。实践教学先来张图感受下我们的整体思路。生产者用ttl发送消息,放入交换机路由到延迟队列;在延迟队列中绑定dead-letterswitch和routing-key进行dead-letter转发;etc.延迟队列中的消息到达延迟时间后,成为死信,被转发到死信交换机,路由到死信队列;最后,它被消费者消费。我们基于以上实现代码:配置类@ConfigurationpublicclassDelayQueueRabbitConfig{publicstaticfinalStringDLX_QUEUE="queue.dlx";//死信队列publicstaticfinalStringDLX_EXCHANGE="exchange.dlx";//死信交换publicstaticfinalStringDLX_ROUTING_KEY="routingkey.dlx";//死信队列和死信交换绑定的routing-keypublicstaticfinalStringORDER_QUEUE="queue.order";//订单延迟队列publicstaticfinalStringORDER_EXCHANGE="exchange.order";//订单交换publicstaticfinalStringORDER_ROUTING_KEY="routingkey.order";//绑定延迟队列和订单交换的routing-key/**定义死信队列**/@BeanpublicQueuedlxQueue(){returnnewQueue(DLX_QUEUE,true);}/**定义死信交换**/@BeanpublicDirectExchangedlxExchange(){returnnewDirectExchange(DLX_EXCHANGE,true,false);}/**死信队列和死信交换绑定设置路由键:routingkey.dlx**/@BeanBindingbindingDLX(){returnBindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);}/**OrderDelayQueue设置名称队列中死信转发到的DLX设置死信转发时携带的routing-key名称**/@BeanpublicQueueorderQueue(){Mapparams=newHashMap<>();params.put("x-dead-letter-exchange",DLX_EXCHANGE);params.put("x-dead-letter-routing-key",DLX_ROUTING_KEY);returnnewQueue(ORDER_QUEUE,true,false,false,params);}/**OrderExchange**/@BeanpublicDirectExchangeorderExchange(){returnnewDirectExchange(ORDER_EXCHANGE,true,false);}/**绑定订单队列与订单交换**/@BeanpublicBindingorderBinding(){returnBindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);}}发送消息@RequestMapping("/order")公共类OrderSendMessageController{@AutowiredprivateRabbitTemplaterabbitTemplate;@GetMapping("/sendMessage")publicStringsendMessage(){StringdelayTime="10000";//将消息转换为路由keyrabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY,"Sendmessage!",message->{message.getMessageProperties().setExpiration(delayTime);returnmessage;});return"ok";}}consumermessage@Component@RabbitListener(queues=DelayQueueRabbitConfig.DLX_QUEUE)//监听队列名称公共类OrderMQReciever{@RabbitHandlerpublicvoidprocess(Stringmessage){System.out.println("OrderMQReciever接收到的消息为:"+message);}}测试通过调用接口,发现消息消费后10秒。由于开发环境升级问题,测试环境使用相同的交换机和队列,所以发送延迟时间为30分钟。但是为了方便测试同学在测试环境中进行测试,将测试环境的时间手动改为1分钟。问题重现然后问题来了:延迟1分钟的消息并没有立即被消费,而是在消费了30分钟的消息后才被消费。至于原因,下面我们来分析一下,先用代码为大家复现一下问题。@GetMapping("/sendManyMessage")publicStringsendManyMessage(){send("延迟消息休眠10秒",10000+"");send("延迟消息休眠2秒",2000+"");send(“延迟消息睡眠5秒”,5000+“”);返回“ok”;}privatevoidsend(Stringmsg,StringdelayTime){rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY,msg,message->{message.getMessageProperties().setExpiration(delayTime);returnmessage;});}执行结果如下:OrderMQReciever收到的消息是:delayedmessagesleepsfor10secondsOrderMQReciever收到的消息是:delayedmessagesleepsfor2secondsOrderMQReciever收到的消息是:延迟消息之所以休眠5秒,是因为延迟队列也满足了队列的先进先出特性。当10秒消息未出队时,后续消息无法成功出队,导致后续消息阻塞,无法实现精准延时。.问题解决我们可以使用x-delay-message插件来解决问题。消息的延迟范围是Delay>0,Delay=args=newHashMap();args.put("x-delayed-type","direct");returnnewCustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,args);}/**队列和延迟交换绑定**/@BeanpublicBindingorderBinding(){returnBindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();}}发送消息@RestController@RequestMapping("/delayed")publicclassDelayedSendMessageController{@AutowiredprivateRabbitTemplaterabbitTemplate;@GetMapping("/sendManyMessage")publicStringsendManyMessage(){send("延迟消息休眠10秒",10000);send("延迟消息休眠2秒",2000);send("Delayedmessagesleepsfor5secondsseconds",5000);return"ok";}privatevoidsend(Stringmsg,IntegerdelayTime){//携带路由键值的消息rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE,XDelayedMessageConfig.ROUTING_KEY,msg,message->{message.getMessageProperties().setDelay(delayTime);returnmessage;});}}消费消息@Component@RabbitListener(queues=XDelayedMessageConfig.DIRECT_QUEUE)//监听队列名public类DelayedMQReciever{@RabbitHandlerpublicvoidprocess(Stringmessage){System.out.println("DelayedMQReciever收到的消息是:"+message);}}测试DelayedMQReciever收到的消息是:Delayedmessagesleepsfor2secondsThemessagereceived通过DelayedMQReciever是:延迟消息休眠5秒DelayedMQReciever收到的消息是:延迟消息休眠10秒这样我们的问题就顺利解决了局限性延迟消息存储在当前节点上只有一个磁盘副本的Mnesia表中,它们将在节点重启后存活下来。虽然触发预定交付的计时器不会持久存在,但它将在节点启动时插件激活期间重新初始化。显然,集群中只有一个计划消息副本意味着丢失该节点或禁用其上的插件将丢失驻留在该节点上的消息。插件目前的设计不适用于大量延迟消息(例如数万或数百万)的场景,插件的另一个可变性来源是对Erlang计时器的依赖,它使用一定数量的长时间计时器后,它们开始竞争调度程序资源,时间漂移不断累积。文章转载自Java编程

猜你喜欢