当前位置: 首页 > 科技观察

Redis使用List实现消息队列的优缺点

时间:2023-03-15 23:25:57 科技观察

分布式系统中必备的一个中间件就是消息队列。通过消息队列,我们??可以异步解耦服务,降低流量峰值,实现最终一致性。目前市面上有RabbitMQ、RochetMQ、ActiveMQ、Kafka等。可能有人会问:“Redis适合做消息队列吗?”在回答这个问题之前,我们先思考一下本质:消息队列提供了哪些特性?队列?是否满足接入要求?今天码哥结合消息队列的特点,一步步分析使用Redis的List作为消息队列的实现原理,分享如何将SpringBoot和Redission集成到项目中。什么是消息队列消息队列是一种异步的服务间通信方式,适用于分布式和微服务架构。消息存储在队列中,直到它们被处理和删除。每条消息只能由用户处理一次。消息队列可用于分离重量级处理、缓冲或批处理工作,并减轻峰值工作负载。消息队列Producer:消息生产者,负责生成消息并发送给Broker;Broker:消息处理中心。负责消息的存储、确认、重试等,一般包含多个队列;Consumer:消息消费者,负责从Broker获取消息并进行相应的处理;消息队列的使用场景有哪些?消息队列在实际应用中包括以下四种场景:应用耦合:发送方系统和接收方系统不需要相互认识,只需要知道消息。多个应用通过消息队列处理同一条消息,避免调用接口失败导致整个流程失败;异步处理:多个应用处理消息队列中的同一条消息,应用并发处理消息,相比串行处理减少了处理时间;限流调峰:广泛应用于秒杀或抢购活动,避免因流量过大导致应用系统挂掉的情况;消息驱动系统:系统分为消息队列、消息生产者、消息消费者,生产者负责生成消息,消费者(可能有多个)负责处理消息;消息队列满足什么特性?消息有序性消息是异步处理的,但是消费者需要按照生产者发送消息的顺序进行消费,避免后发送消息先处理消息的情况。重复消息处理Producers可能会因为网络问题导致消息重传而收到多条重复消息。如果同一条消息重复多次,可能会导致一个业务逻辑执行多次,需要保证如何避免重复消费的问题。可靠性保证消息传递一次。如果在发送消息时收件人不可用,则消息队列会保留该消息,直到成功传递为止。消费者重启后,可以继续读取消息进行处理,防止消息遗漏。List实现了消息队列。Redis的列表(List)是一种线性有序结构,可以按照元素入栈的顺序存储元素,可以满足“先进先出”的要求。这些元素可以是文本数据,也可以是二进制数据。LPUSH生产者使用LPUSHkeyelement[element...]将消息插入到队列的头部。如果键不存在,将创建一个空队列并插入消息。如下,生产者依次向队列中插入“Java”、“CodeByte”、“Go”,返回值表示插入队列的消息条数。>LPUSHqueueJavacodebyteGo(integer)3RPOP消费者使用RPOPkey顺序读取队列的消息,先进先出,所以“Java”会先读取消费:>RPOPqueue“Java”>RPOPqueue“codebyte”>RPOPqueue《Go》List队列实时消费问题65师兄:实现起来这么简单吗?别高兴得太早,LPUSH和RPOP都有性能风险。当生产者向队列中插入数据时,List不会及时主动通知消费者消费。我们需要写一个while(true)来不断调用RPOP指令,当有新消息时,就返回消息,否则返回空。程序在执行消费逻辑之前需要不断轮询判断是否为空,这会导致即使没有新消息写入队列,消费者也会不断调用RPOP命令占用CPU资源。65兄:如何避免循环调用造成的CPU性能损失?Redis提供了BLPOP和BRPOP阻塞读命令。当读取队列中没有数据时,消费者会自动阻塞,直到有新消息写入队列。会继续读取新的消息来执行业务逻辑。BRPOPqueue0参数0表示阻塞等待时间无限重复消费。消息队列为每条消息生成一个“全局ID”;生产者为每条消息创建一个“全局ID”,消费者记录一个处理过的消息ID。判断是否重复。其实这就是幂等性。对于同一条消息,消费者收到后处理一次的结果与多次的结果是一致的。消息可靠性65大哥:消费者从List中读取消息,如果消息在处理消息的过程中宕机了,消息不会被处理,但是数据还没有保存到List中。我应该怎么办?本质是消费者在处理消息的时候崩溃了,消息已经无法恢复,缺乏消息确认机制。Redis提供了两条指令,RPOPLPUSH和BRPOPLPUSH(阻塞),意思是从List读取消息的同时将消息复制到另一个List(备份),是一个原子操作。我们可以在业务流程处理正确后删除队列消息,实现消息确认机制。如果在处理消息的时候崩溃了,重启并从备份列表中读取消息进行处理。LPUSHredisMQ公众号代码兄弟字节BRPOPLPUSHredisMQredisMQBack生产者使用LPUSH将消息插入到redisMQ队列中,消费者使用BRPOPLPUSH读取消息“公众号”,消息将被插入到“redisMQBack”队列中。如果消费成功,删除“redisMQBack”消息即可。如果异常,可以继续从“redisMQBack”再次读取消息进行处理。需要注意的是,redis的消息确认机制是,如果producer消息发送很快,而consumer处理速度慢,消息就会堆积起来,给Redis的内存带来过大的压力。Redission实战在Java中,我们可以使用Redission封装的API来快速实现队列。下面码哥给大家介绍一下基于SpringBoot2.1.4版本的集成和实战。详细的API文档可以在:https://github.com/redisson/redisson/wiki/7.-Distributed-collections添加依赖org.redissonredisson-spring-boot-starter3.16.7添加Redis配置,码哥的Redis是没有密码的,可以根据实际情况配置。spring:application:name:redissionredis:host:127.0.0.1port:6379ssl:falseJava代码实战RBlockingDeque继承java.util.concurrent.BlockingDeque,在使用过程中,我们可以根据接口选择合适的API实现业务逻辑文档。主要方法如下:码哥以双端队列为例@Slf4j@ServicepublicclassQueueService{@AutowiredprivateRedissonClientredissonClient;privatestaticfinalStringREDIS_MQ="redisMQ";/***向队头发送消息**@parammessage*/publicvoidsendMessage(Stringmessage){RBlockingDequeblockingDeque=redissonClient.getBlockingDeque(REDIS_MQ);try{blockingDeque.putFirst(message);log.info("将消息:{}插入队列。",message);}catch(InterruptedExceptione){e.printStackTrace();}}/***阻塞从队尾读取消息,如果没有消息,线程会阻塞等待新消息插入防止CPU空转*/publicvoidonMessage(){RBlockingDequeblockingDeque=redissonClient.getBlockingDeque(REDIS_MQ);while(true){try{Stringmessage=blockingDeque.takeLast();log.info("读取消息来自队列{}:{}.",REDIS_MQ,message);}catch(InterruptedExceptione){e.printStackTrace();}}}单元测试@RunWith(SpringRunner.class)@SpringBootTest(classes=RedissionApplication.class)publicclassRedissionApplicationTests{@AutowiredprivateQueueServicequeueService;@TestpublicvoidtestQueue()throwsInterruptedException{newThread(()->{for(inti=0;i<1000;i++){queueService.sendMessage("message"+i);}}).start();newThread(()->queueService.onMessage()).start();Thread.currentThread().join();}}总结可以使用List数据结构实现消息队列,满足先进先出。为了实现消息的可靠性,Redis提供了BRPOPLPUSH命令作为解决方案。Redis是一个非常轻量级的key-value数据库,部署一个Redis实例就是启动一个进程,部署一个Redis集群,也就是部署多个Redis实例。Kafka和RabbitMQ的部署涉及额外的组件。比如Kafka的运行需要部署ZooKeeper。与Redis相比,Kafka和RabbitMQ一般被认为是重量级的消息队列。需要注意的是,我们要避免生产者太快,消费者太慢,占用Redis内存造成的消息堆积。在消息量不大的时候使用Redis作为消息队列可以给我们带来高性能的消息读写,看起来是一个不错的消息队列方案。本文转载自微信公众号《码哥字节》