当前位置: 首页 > 后端技术 > Java

RabbitMQ、RocketMQ、Kafka延迟队列实现

时间:2023-04-02 01:40:20 Java

延迟队列在实际项目中有很多应用场景。最常见的有未支付订单、超时取消订单、创建订单时发送延迟消息等。到达延迟时间后,消费者收到消息,如果未支付订单,则取消订单。那么,今天我们要聊的问题是,延迟队列在RabbitMQ、RocketMQ、Kafka中是如何实现的,它们对应的实现原理是什么?RabbitMQRabbitMQ本身没有延迟队列的概念。在RabbitMQ中,延迟队列是通过DLX死信开关和TTL消息过期来实现的。TTL(TimetoLive)过期时间TTL有两种设置方式。通过队列属性设置,让队列中的所有消息都会有相同的过期时间。分别为消息设置过期时间,这样每条消息的过期时间可以不同。如果同时设置呢?这将花费两次中较小的时间。队列的方法由参数x-message-ttl设置。Mapargs=newHashMap();args.put("x-message-ttl",6000);channel.queueDeclare(queueName,durable,exclusive,autoDelete,args);对于消息方法由setExpiration设置。AMQP.BasicPropertiesproperties=newAMQP.BasicProperties();Properties.setDeliveryMode(2);properties.setExpiration("60000");channel.basicPublish(exchangeName,routingKey,mandatory,properties,"message".getBytes());DLX(DeadLetterExchange)死信交换中消息变成死信消息有三种情况:消息被拒绝,比如调用了reject方法,requeue需要设置为false。消息过期队列达到最大长度,可以通过参数dead-letter-exchange设置死信exchange也可以通过参数dead-letter-exchange指定一个RoutingKey(如果不指定,原队列的RoutingKey将使用)。Mapargs=newHashMap();args.put("x-dead-letter-exchange","exchange.dlx");args.put("x-dead-letter-路由密钥","路由密钥");channel.queueDeclare(queueName,durable,exclusive,autoDelete,args);原理当我们为消息设置TTL和DLX时,当消息正常发送时,通过Exchange到达Queue后,由于设置的TTL过期时间已经过去,消息还没有被消费(订阅死信队列)。到达过期时间后,消息会被转移到绑定的DLX死信队列中。这样的话,就相当于通过DLX和TTL间接实现了消息延时的功能。在实际使用中,我们可以根据不同的延迟级别,绑定设置不同延迟时间的队列来实现不同的延迟时间。RocketMQRocketMQ和RabbitMQ不同的是它有延迟队列的功能,但是开源版本只能支持固定延迟时间的消息,不支持任意时间精度的消息(这个好像只有阿里云版本有).他默认的时间间隔分为18级,基本可以满足大部分场景的需求。默认延迟级别:1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h。使用起来也很简单,直接通过setDelayTimeLevel设置延迟级别即可。setDelayTimeLevel(level)的实现原理比较简单。Broker会根据不同的延迟级别创建多个不同级别的队列。当我们发送延迟消息时,会根据不同的延迟级别将其发送到不同的队列。同时,Broker内部使用一个定时器轮询这些队列(RocketMQ会为每个延迟级别创建一个定时任务)。如果消息到达发送时间,则直接将消息发送到主题队列。RocketMQ的这种实现是在服务器端完成的,同时它的好处是延迟时间相同的消息可以保证顺序。说到这里顺便提一下消息消费重试的原理,本质上是一样的。消费失败需要重试的消息,实际上会被丢进延迟队列的topic中,然后转发到真正的topic中。对于Kafka,Kafka本身并不支持延迟队列功能,需要我们手动实现。这里我提供一个基于RocketMQ设计的实现思路。在这个设计中,我们同样不支持任意时间精度的延迟消息,只支持固定级别的延迟,因为对于大多数延迟消息场景来说已经足够了。只创建了一个topic,但是为这个topic创建了18个partition,每个partition对应不同的delaylevel。这和RocketMQ一样的好处是可以对延迟时间相同的消息进行排序。原理首先,为延迟队列创建一个单独的主题,同时创建18个分区。发送不同延迟级别的消息时,根据延迟参数将消息发送到延迟主题对应的分区。对应的key是延迟时间,将原topic保存到header中。ProducerRecordproducerRecord=newProducerRecord<>("delay_topic",delayPartition,delayTime,data);producerRecord.headers().add("origin_topic",topic.getBytes(StandardCharsets.UTF_8));EmbeddedConsumer单独设置一个ConsumerGroup来消费延迟的topic消息。消费消息后,如果没有达到延迟时间,则暂停,然后寻找到当前ConsumerRecord的偏移位置,使用定时器轮询延迟的TopicPartition,达到延迟时间后继续。如果resume到达延迟时间,则获取header中的真实topic,直接转发。为什么要在这里暂停和恢复?因为如果不是,如果超时没有达到max.poll.interval.ms(默认300s)的最大时间,就会触发Rebalance。