当前位置: 首页 > 科技观察

Redis延迟队列,这次给大家彻底的了解

时间:2023-03-12 01:32:00 科技观察

所谓延迟队列就是延迟消息队列。说说一些业务场景,实践场景,订单支付失败,时不时提醒用户用户的并发,可以给用户发短信,延时2分钟,我们来看看Redis对普通消息队列的实现。我们知道,对于专业的消息队列中间件,比如Kafka、RabbitMQ,消费者在消费消息之前,要经过一系列繁琐的流程。比如在RabbitMQ发送消息之前,需要创建一个Exchange,然后创建一个Queue,通过一些规则将Queue和Exchange绑定。发送消息时,需要指定路由键,控制头信息。但是在大多数情况下,虽然我们的消息队列中只有一组消费者,但是还是需要经过上面的一些过程。有了Redis,对于那些只有一组消费者的消息队列,Redis可以非常方便的使用。Redis的消息队列不是专业的消息队列,没有很多高级特性,也没有ack保证。如果对消息的可靠性有极致的追求,那么不适合使用异步消息队列,基本实现Redis的列表(list)数据结构,常用作异步消息队列,使用rpush/lpush操作入队,并使用lpop和rpop退出队列>rpushqueueMoonwithFlyingFish1MonthwithFlyingFish2MonthwithFlyingFish3(integer)3>lpopqueue"MonthwithFlyingFish1">llenqueue(integer)2问题1:如果队列为空,客户端得到消息通过队列的pop操作,然后进行处理。处理完后,再次获取消息,再进行处理。如此循环下去,就是客户端作为队列消费者的生命周期。但是如果队列为空,客户端就会陷入pop的死循环,不断pop,没有数据,再pop,也没有数据。那是浪费生命的空轮询。空轮询不仅会增加客户端的CPU,还会增加redis的QPS。如果有几十个client进行这样的空轮询,Redis的慢查询可能会明显增加。通常我们使用sleep来解决这个问题,让线程休眠一段时间,就休眠1s。不仅可以降低客户端的CPU,还可以降低Redis的QPS。问题2:队列延迟可以使用上面的sleep方法解决。同时,如果只有一个消费者,那么延迟就是1s。如果有多个消费者,这个延迟会减少,因为每个消费者的休眠时间是不同的。有什么方法可以显着减少延迟吗?那就是blpop/brpop。这两条指令的前缀字符b代表blocking,即阻塞读。当队列中没有数据时,阻塞读会立即进入休眠状态,一旦有数据到来,会立即唤醒。消息的延迟几乎为零。用blpop/brpop代替之前的lpop/rpop完美解决了上面的问题。问题三:空??闲连接自动断开其实还有一个问题需要解决——空闲连接的问题。如果线程一直阻塞,Redis客户端连接就变成空闲连接。如果空闲时间过长,服务器通常会主动断开连接,减少空闲资源的占用。这时候blpop/brpop会抛出异常。所以写客户端消费者的时候要小心,注意捕获异常,再试。分布式锁冲突处理客户端在处理请求时加分布式锁失败怎么办。加锁失败的处理策略一般有3种:1、直接抛出异常,通知用户稍后重试;2.睡一会再试;3.将请求转入延迟队列,稍后重试;直接抛出特定类型的异常更适合用户直接发起的请求。用户看到错误对话框后,会先阅读对话框内容,然后点击重试,可以达到人为延迟的效果。如果考虑用户体验,前端代码可以替换用户自己的延迟重试控件。本质上是放弃当前请求,由用户决定是否发起新的??请求。sleepsleep会阻塞当前的消息处理线程,会延迟队列后续的消息处理。如果碰撞频繁或者队列中的消息很多,sleep可能不合适。如果因为某些死锁键导致加锁失败,线程将被完全阻塞,后续消息将永远得不到及时处理。延迟队列的方式更适合异步消息处理,将当前有冲突的请求丢到另一个队列进行延迟处理,避免冲突。延迟队列的实现,我们可以使用zset命令,将设置的时间戳作为score进行排序,使用zaddscore1value1...命令在内存中不断产生消息。然后使用zrangebysocre查询所有满足条件的pending任务,循环执行队列任务。也可以通过zrangebyscorekeyminmaxwithscoreslimit01查询最早的任务来消费privateJedisjedis;publicvoidredisDelayQueueTest(){Stringkey="delay_queue";//实际开发建议使用业务ID和随机生成的唯一ID作为value,随机生成uniqueID可以保证消息的唯一性,业务ID可以避免valueStringorderId1=UUID.randomUUID().toString();jedis.zadd(queueKey,System.currentTimeMillis()+5000,orderId1携带的信息过多);StringorderId12=UUID.randomUUID().toString();jedis.zadd(queueKey,System.currentTimeMillis()+5000,orderId2);newThread(){@Overridepublicvoidrun(){while(true){SetresultList;//只获取第一条数据,只取数据不会被移除resultList=jedis.zrangebyscore(key,System.currentTimeMillis(),0,1);if(resultList.size()==0){try{Thread.sleep(1000);}catch(InterruptedExceptione){e.printStackTrace();break;}}else{//取出获取到的数据if(jedis.zrem(key,resultList.get(0))>0){StringorderId=resultList.get(0);log.info("orderId={}",resultList.get(0));this.handleMsg(orderId);}}}}}.start();}publicvoidhandleMsg(Tmsg){System.out.println(msg);}上面的实现在多线程逻辑上没有任何问题假设有两个线程T1,T2等多个线程,处理逻辑如下,保证多线程情况下只有一个线程处理相应的消息:1.T1,T2等多个线程调用zrangebyscore获取一个消息A2.T1准备删除消息A,因为是原子操作,T2等更多线程等待T1执行zrem删除消息A,再执行zrem删除消息A3.T1删除消息A,返回删除成功标记1,处理消息A4.T2其他更多线程启动zrem删除消息A,因为消息A已经删除,所以全部删除失败,放弃对消息A的处理。同时要注意handle_msg异常捕获,避免个别任务处理问题导致循环异常退出。进一步优化上述算法中,同一个任务可能被多个进程获取,然后使用zrem进行竞争。那些没有抢到任务的进程就白拿了。太浪费了可以考虑使用lua脚本来优化这个逻辑,将zrangebyscore和zrem移到server端进行原子操作,这样在多个进程竞争任务时就不会出现这种浪费。使用Luascripts进一步优化LuaScript,如果有超时消息则删除并返回此消息,否则返回空字符串:StringluaScript="localresultArray=redis.call('zrangebyscore',KEYS[1],0,ARGV[1],'limit',0,1)\n"+"if#resultArray>0then\n"+"ifredis.call('zrem',KEYS[1],resultArray[1])>0then\n"+"returnresultArray[1]\n"+"else\n"+"return''\n"+"end\n"+"else\n"+"return''\n"+"end";jedis.eval(luaScript,ScriptOutputType.VALUE,newString[]{key},String.valueOf(System.currentTimeMillis()));Redis延迟队列的优点Redis用于实现延迟队列具有以下优点:1.Rediszset支持高性能分数排序。2、Redis是在内存上运行的,速度非常快。3、Redis可以搭建集群。当消息较多时,我们可以使用集群来提高消息处理速度和可用性。4、Redis有持久化机制。当发生故障时,可以通过AOF和RDB恢复数据,保证了数据的可靠性。Redis延迟队列的缺点是Redis实现的延迟消息队列也有数据持久化。,没有消息可靠性问题的重试机制——没有消息处理异常的重试机制,这些都需要自己实现,包括重试次数的实现等。没有ACK机制——比如在在获取消息和删除消息的情况下,当客户端在处理消息时崩溃,正在处理的消息将丢失。MQ需要显式返回一个值给MQ,才认为消息被正确消费了。如果消息可靠性高,建议使用MQ实现Redission实现延迟队列。基于Redis的Redisson分布式延迟队列结构的RDelayedQueueJava对象在实现RQueue接口的基础上,提供了延迟加入队列的功能。该函数可用于实现消息传输延迟按几何增长或几何衰减的发送策略RQueuedistinationQueue=...RDelayedQueuedelayedQueue=getDelayedQueue(distinationQueue);//将消息发送到指定队列after10secondsdelayedQueue.offer("msg1",10,TimeUnit.SECONDS);//一分钟后发送消息到指定队列delayedQueue.offer("msg2",1,TimeUnit.MINUTES);当对象不再需要时,应该主动销毁。只有当相关的Redisson对象也需要关闭时才不需要主动销毁。RDelayedQueuedelayedQueue=...delayedQueue.destroy();是不是很方便……本文转载自微信公众号《月亮与飞鱼》,可以使用以下二维码关注。转载本文请联系月版飞语公众号。