当前位置: 首页 > 后端技术 > Java

我要摊牌!真正的灰度队列实现!您无法在整个Internet上找到它!

时间:2023-04-01 23:47:58 Java

背??景目前公司的Dubbo、Feign等RPC调用已经可以支持基于灰度的调用,但是MQ还不具备支持灰度的能力,导致测试和测试中业务验证和消息隔离体验较差生产环境。因此,基于RabbitMQ和Kafka,我们实现了消息灰度的能力。灰度场景在大多数场景下,MQ的灰度没有RPC那么严格,但是我们需要确认消费场景,即如果不存在灰度消费者,消息是否应该被普通消费者消费。1.灰度消息只被灰度节点消费,大家可能都想要这种严格的消息灰度隔离策略,证明是真正的消息灰度方案,但是这个方案需要考虑具体的场景问题。比如有的时候作为灰度节点的发送者,它的功能变化点不在MQ中,但是它发送的消息是灰度消息,而消息的消费者可能没有任何功能变化,也不会有对应的灰度消费标识。在这种情况下,如果我们丢弃灰度消息,最终的数据将是不完整的。2.灰度消息可以被普通节点消费。因此,让我们考虑第二种选择。如果灰度消费节点不存在,消息会被普通节点消费。当有灰度节点时,消息会被灰度节点消费,普通节点在灰度节点不存在时,只为底线消费灰度消息。那么,这种场景下可能还是会出现问题。例如,当消费节点的消费逻辑发生变化时,正常节点的消费可能会导致业务出错。对于这个问题,我们可以默认假设,如果消费者的逻辑发生变化,那么灰度节点一定是大概率存在的。如果某些异常情况导致异常或停机场景,仍然可以通过人工或通过告警来判断。简而言之,这个问题不算问题。灰度方案我们从MQ的特点和一些常见的处理方式出发,讨论RabbitMQ和Kafka的灰度实现方式。常规方案:ShadowQueue/Topic这是实现MQ灰度最常见的方案。为每个Queue/Topic创建对应的灰度Queue/Topic。producer层会为要发送的消息动态修改Queue/Topic/RoutingKey,让他发送到灰度或者普通的Queue/Topic。在消费者层面,只需要在应用启动时,根据自身的灰度标记,动态切换到灰度Queue/Topic进行监听即可。但是对于我们现在的系统状态,这个方案存在三个问题:第一,由于我们现在的系统测试环境的灰度标签是可以自定义的,所以每一个上线的功能都可能有对应的灰度标签,所以问题是Queue/Topic的数量会随着灰度标记的增加呈指数增长。不管是哪种MQ,过多的Queue/Topic都会造成MQ本身处理能力的一定降低。另外,我们的灰度标签可以根据启动的实例随意修改,也就是说相应的整套Queue/Topic也必须根据灰度标识任意创建。这样一来,手动跟随创建显然是不现实的,而我们在生产环境中创建Queue/Topic需要经过申请流程,这与我们目前的情况是相悖的。此外,即使我们可以根据生产者的灰度ID动态创建Queue/Topic,但至少我们需要考虑在灰度生产者实例正常下线时销毁创建的Queue/Topic。该行还需要手动访问以定期清理Queue/Topic。最后,如果是针对Kafka或者RocketMQ,这个方案实现起来还是比较简单的。对于RabbitMQ来说,Exchange和Queue之间多了一层绑定关系,不同的生产模式也需要做自己的适配。因此,为了RabbitMQ和Kafka的一致性,我们决定不实施该方案。RabbitMQ对于RabbitMq,我们使用重入特性来实现灰度队列。通过重排队这个特性,我们可以在生产者发送消息的时候,在消息头上标记灰度标识,在发送的时候一起发送。消费者在消费一条消息时,会根据消费者自己的标记来决定是否消费这条消息。如果消费者自身不满足灰度消费规则,则灰度消息由Requeue处理。经过轮询,这条消息最终会流向灰度标识的消费者进行消费。实现思路Producer在发送消息前获取当前实例的灰度标记,在消息头中添加灰度标记,并为消费者添加监听器。灰度节点消费根据灰度标记判断灰度消息的消费,普通节点根据灰度标记判断灰度消息的消费开关决定是消费还是执行Requeue生产过程。当生产者启动时,我们使用自动组装来注册RabbitTemplate。SetBeforePublishPostProcessors添加一个预处理器,在发送消息之前在消息的Header中添加一个灰度标记。消费流程首先,在消费时,重写exe??cuteListener方法,通过监听SimpleMessageListenerContainer进行消息处理。未开启灰度开关时,执行正常消费逻辑。当灰度机直接匹配灰度消息时,可以直接消费。通过监听Eureka本地缓存刷新事件,不断刷新灰度实例的缓存。普通节点消费灰度消息时,如果灰度实例不存在,可以直接消费。如果存在灰度实例,正常节点消费灰度消息,考虑两种可能。一是正常轮询到正常节点,二是灰度节点prefetch_count达到阈值,阻塞队列满,灰度消息处于正常状态。节点被连续轮询。为了解决第二种场景,增加了一层Bloomfilter。当再次匹配到相同的消息时,当前节点会休眠一小段时间。如果以上场景都不符合,则执行Requeue操作。Kafka在Kafka的消费概念中有一个消费组的概念,每个消费者都有一个对应的消费组。当一条消息发布到一个主题时,它只会被传递给订阅它的每个消费者组中的一个消费者,两个消费者组不会互相影响。借助这个消费特性,可以将同一个消费组中的灰度消费者分离出来,组成一个特殊的消费组,使每个消费组接收到相同的消息。在正常的消费群中,遇到带有灰度标识的消息,我们只是卖空消费,并不真正执行业务逻辑。灰度消费组中的消费者只处理符合灰度规则的消息,以及其他News短消费。实现思路生产者在生产灰度消息时,在消息头添加灰度标记。灰度消费者和普通消费者设置不同的GroupId。灰度消费者和普通消费者在收到消息后判断是否有灰度标记。判断配置中心是否开启了消息灰度。如果启用,它将消耗灰度节点。如果不启用,则不会消耗生产过程。生产者在启动时会动态组装所有的拦截器。组装方法是在BeanPostProcessor后处理器中获取到KafkaTemplate对象后,只要将我们拦截器类的全限定名设置到config中即可,无论用户创建的是Factory对象还是KafkaTemplate对象,都可以支持拦截器的组装.消费过程也是如此。如果当前节点是灰度节点,则修改当前group.id为灰度,最后通过拦截器执行消费逻辑。