当前位置: 首页 > 后端技术 > Java

Redis实现延迟任务

时间:2023-04-02 00:19:01 Java

1。30分钟内未付款,订单将自动取消。2.30分钟内无回复,会议结束。对于上面的任务,我们给一个专业的名字来形容,那就是延迟任务一什么是延迟任务?延迟任务不同于常规计划任务。延迟任务是在事件触发后的未来某个时间执行,没有重复执行周期。2.延迟任务和定时任务有什么区别?定时任务有明确的触发时间。延迟任务没有定时任务就没有执行周期。延迟任务在事件触发后的一段时间内执行。没有执行周期的常规任务一般是多个任务分批执行,而延时任务一般是单任务处理。3、技术对比本文主要讲解Redis实现延迟任务的Zset实现。其他解决方案仅作介绍。1.数据库轮询通过定时组件扫描数据库,使用时间判断是否有超时命令,然后进行更新或删除等操作。优点:简单易行缺点:服务器内存消耗大,时间间隔小,数据库数据内存状态丢失巨大,不可靠如果任务负载过大,给数据库造成很大压力。频繁查询数据库带来性能影响2.JDK的延迟队列是使用JDK自带的DelayQueue实现的。这是一个无界阻塞队列。队列只有在延迟期满时才能从中获取元素,需要将其放入DelayQueue中。实现延迟接口。优点:实现简单,效率高,任务触发延时小。缺点:服务器重启后,所有数据都会消失。怕内存条件限制导致宕机。比如没有付款的订单太多,很容易出现OOM异常的数据内存状态,不可靠。3、时间轮算法时间TimingWheel是一种高效低延迟的调度数据结构。底层使用数组实现一个循环队列,用于存储任务列表。原理图如下:时间轮时间轮算法可以类比为时钟,如上图所示。频率旋转,每跳动一次,称为一次跳动。可以看出,计时轮由3个重要的属性组成,ticksPerWheel(一个回合的tick数)、tickDuration(一个tick的持续时间)和timeUnit(时间单位),例如当ticksPerWheel=60时,tickDuration=1,timeUnit=second,这就完全类似于现实中秒针不停的走动。如果当前指针指向1,我有一个任务需要在4秒后执行,那么执行线程回调或者消息会放在5。常用命令ZADD命令:向有序集中添加一个或多个成员元素及其分值,或更新已有成员的分值ZCARD命令:获取有序集的成员个数ZRANGEBYSCORE:返回指定区间的orderedsetbyscoreMembersinZREM:删除有序集中的一个或多个成员。java中操作简介1.add(Kkey,Vvalue,doublescore)给一个变量添加一个元素,同时指定该元素的分数。redisTemplate.opsForZSet().add("zSetValue","A",1);2.rangeByScore(Kkey,doublemin,doublemax)根据设定的分数得到区间值。zSetValue=redisTemplate.opsForZSet().rangeByScore("zSetValue",1,2);3.rangeByScore(Kkey,doublemin,doublemax,longoffset,longcount)根据设置的score和给定的长度,从给定的下标中获取区间值,得到最终值。zSetValue=redisTemplate.opsForZSet().rangeByScore("zSetValue",1,5,1,3);4.rangeWithScores(Kkey,longstart,longend)获取RedisZSetCommands.Tuples的区间值。Set>typedTupleSet=redisTemplate.opsForZSet().rangeWithScores("typedTupleSet",1,3);Iterator>iterator=typedTupleSet.iterator();while(iterator.hasNext()){ZSetOperations.TypedTupletypedTuple=iterator.next();对象值=typedTuple.getValue();双倍分数=typedTuple.getScore();}5.删除成员redisTemplate.opsForZSet().remove("myZset","a","b");下面的代码可以直接使用——基于SpringBoot项目5.3DelayQueueFactoryCode中的注释@Autowired私有ThreadPoolTask??ExecutorasyncTaskExecutor;/***插入任务id**@paramjobId任务id(队列中唯一)*@paramtime延迟时间(单位:毫秒)*@return是否插入成功*/publicbooleanaddJob(StringjobId,Integertime){日历实例=Calendar.getInstance();//增加延迟时间获取最终触发时间instance.add(Calendar.MILLISECOND,time);longdelayMillisecond=instance.getTimeInMillis();log.info("延迟队列添加问题{}",jobId);returnredisUtil.zAdd(setDelayQueueName(),delayMillisecond,jobId);}/***删除任务id**@paramjobId任务id(队列中唯一)*/publicbooleanremoveJob(StringjobId){Longnum=redisUtil.zRemove(setDelayQueueName(),jobId);如果(num>0)返回true;返回假;}/***延迟队列机器开始运行*/privatevoidstartDelayQueueMachine(){log.info("延迟队列{}启动",setDelayQueueName());//监听redis队列while(true){try{//获取当前时间之前的任务列表Set>tuples=redisUtil.zRangeByScore(setDelayQueueName(),0,System.currentTimeMillis());//如果任务不为空if(!CollectionUtils.isEmpty(tuples)){log.info("延迟任务开始:{}",JSONUtil.toJsonStr(tuples));Iterator>iterator=tuples.iterator();while(iterator.hasNext()){ZSetOperations.TypedTupletypedTuple=iterator.next();StringquestionId=Convert.toStr(typedTuple.getValue());//移除缓存,如果移除成功说明当前线程已经处理完延迟任务,则执行延迟任务//延迟任务只有在删除成功后才会执行,否则不执行,可以防止分布式系统多次执行延迟任务Longnum=redisUtil.zRemove(setDelayQueueName(),questionId);//如果移除成功,则执行if(num>0){asyncTaskExecutor.execute(()->invoke(questionId));}}}}catch(Exceptione){log.error("处理延迟时间任务出现异常,异常原因为{}",e.getMessage(),e);}finally{//每()分钟执行一次//根据业务场景设置对应的时间try{TimeUnit.MINUTES.sleep(5);}catch(InterruptedExceptione){e.printStackTrace();}}}}/***最后执行任务的方法**@paramjobId任务id*/publicabstractvoidinvoke(StringjobId);/***实现延迟队列的名称*/publicabstractStringsetDelayQueueName();//SpringBoot初始化时启动一个线程运行@PostConstructpublicvoidinit(){newThread(this::startDelayQueueMachine).start();}}addJob方法添加任务id和延迟时间(毫秒)redisUtil.zRangeByScore::根据设置的分数获取区间值@PostConstruct注意:是做一些事情之后Bean的初始化完成,比如注册一些listeners..(初始化实现方案有很多选择)为什么要先delete再执行业务逻辑呢?延迟任务只有在删除成功后才会执行,否则不执行,避免分布式系统多次执行延迟任务5.4RedisUtil工具类@Component@Slf4jpublicclassRedisUtil{@AutowiredprivateRedisTemplate<字符串,对象>redisTemplate;QueueName--延迟队列名称也是zset中的key值/***Testdelayqueue**/@Slf4j@ComponentpublicclassDelayQueueextendsAbstractDelayQueueMachineFactory{@AutowiredprivateZnjExpertConsultQuestionRecordMapperquestionRecordMapper;/***处理业务逻辑*/@Overridepublicvoidinvoke(StringjobId){IntegerquestionId=Convert.toInt(jobId);ZnjExpertConsultConsultService.whetherEnd(questionRecordEntity)questionRecordEntity=questionRecordMapper.selectById(questionId);布尔标志=znjExpertConsultService.whetherEnd(questionRecordEntity)*/@OverridepublicStringsetDelayQueueName(){return"expert_consult:delay_queue";}}操作成功。当Redis中有任务时,就可以执行任务。可行实时性:在一定时间内允许出错(可按时间设置)高可用:支持单机,支持集群消息可靠性:保证消息至少被消费一次持久性:基于消息的持久化特性Redis本身,上面的消息可靠性是基于Redis的持久化,所以如果redis数据丢失,就意味着延迟消息丢失,但是可以作为master备份和集群保障