所谓延迟队列就是延迟消息队列。说说一些业务场景,实践场景,订单支付失败,时不时提醒用户用户的并发,可以给用户发短信,延时2分钟,我们来看看Redis对普通消息队列的实现。我们知道,对于专业的消息队列中间件,比如Kafka、RabbitMQ,消费者在消费消息之前,要经过一系列繁琐的流程。比如在RabbitMQ发送消息之前,需要创建一个Exchange,然后创建一个Queue,通过一些规则将Queue和Exchange绑定。发送消息时,需要指定路由键,控制头信息。但是在大多数情况下,虽然我们的消息队列中只有一组消费者,但是还是需要经过上面的一些过程。有了Redis,对于那些只有一组消费者的消息队列,Redis可以非常方便的使用。Redis的消息队列不是专业的消息队列,没有很多高级特性,也没有ack保证。如果对消息的可靠性有极致的追求,那么不适合使用异步消息队列,基本实现Redis的列表(list)数据结构,常用作异步消息队列,使用rpush/lpush操作入队,并使用lpop和rpop退出队列>rpushqueueMoonwithFlyingFish1MonthwithFlyingFish2MonthwithFlyingFish3(integer)3>lpopqueue"MonthwithFlyingFish1">llenqueue(integer)2问题1:如果队列为空,客户端得到消息通过队列的pop操作,然后进行处理。处理完后,再次获取消息,再进行处理。如此循环下去,就是客户端作为队列消费者的生命周期。但是如果队列为空,客户端就会陷入pop的死循环,不断pop,没有数据,再pop,也没有数据。那是浪费生命的空轮询。空轮询不仅会增加客户端的CPU,还会增加redis的QPS。如果有几十个client进行这样的空轮询,Redis的慢查询可能会明显增加。通常我们使用sleep来解决这个问题,让线程休眠一段时间,就休眠1s。不仅可以降低客户端的CPU,还可以降低Redis的QPS。问题2:队列延迟可以使用上面的sleep方法解决。同时,如果只有一个消费者,那么延迟就是1s。如果有多个消费者,这个延迟会减少,因为每个消费者的休眠时间是不同的。有什么方法可以显着减少延迟吗?那就是blpop/brpop。这两条指令的前缀字符b代表blocking,即阻塞读。当队列中没有数据时,阻塞读会立即进入休眠状态,一旦有数据到来,会立即唤醒。消息的延迟几乎为零。用blpop/brpop代替之前的lpop/rpop完美解决了上面的问题。问题三:空??闲连接自动断开其实还有一个问题需要解决——空闲连接的问题。如果线程一直阻塞,Redis客户端连接就变成空闲连接。如果空闲时间过长,服务器通常会主动断开连接,减少空闲资源的占用。这时候blpop/brpop会抛出异常。所以写客户端消费者的时候要小心,注意捕获异常,再试。分布式锁冲突处理客户端在处理请求时加分布式锁失败怎么办。加锁失败的处理策略一般有3种:1、直接抛出异常,通知用户稍后重试;2.睡一会再试;3.将请求转入延迟队列,稍后重试;直接抛出特定类型的异常更适合用户直接发起的请求。用户看到错误对话框后,会先阅读对话框内容,然后点击重试,可以达到人为延迟的效果。如果考虑用户体验,前端代码可以替换用户自己的延迟重试控件。本质上是放弃当前请求,由用户决定是否发起新的??请求。sleepsleep会阻塞当前的消息处理线程,会延迟队列后续的消息处理。如果碰撞频繁或者队列中的消息很多,sleep可能不合适。如果因为某些死锁键导致加锁失败,线程将被完全阻塞,后续消息将永远得不到及时处理。延迟队列的方式更适合异步消息处理,将当前有冲突的请求丢到另一个队列进行延迟处理,避免冲突。延迟队列的实现,我们可以使用zset命令,将设置的时间戳作为score进行排序,使用zaddscore1value1...命令在内存中不断产生消息。然后使用zrangebysocre查询所有满足条件的pending任务,循环执行队列任务。也可以通过zrangebyscorekeyminmaxwithscoreslimit01查询最早的任务来消费privateJedisjedis;publicvoidredisDelayQueueTest(){Stringkey="delay_queue";//实际开发建议使用业务ID和随机生成的唯一ID作为value,随机生成uniqueID可以保证消息的唯一性,业务ID可以避免valueStringorderId1=UUID.randomUUID().toString();jedis.zadd(queueKey,System.currentTimeMillis()+5000,orderId1携带的信息过多);StringorderId12=UUID.randomUUID().toString();jedis.zadd(queueKey,System.currentTimeMillis()+5000,orderId2);newThread(){@Overridepublicvoidrun(){while(true){Set
