大家好,我是君哥,今天来聊聊RocketMQ的灰度方案。灰度发布是指黑白之间平滑过渡的发布方式。在流量较大的系统中,如果升级的范围比较大,或者影响的内容不确定,升级一般会以停产的方式进行,这样可以减少生产变更的影响。如上图,升级ServiceA服务,使用灰度发布,先升级Server5,如果一周后没有问题,再升级Server4和Server3,再运行一周没有问题,再升级剩下的两个节点。上面的例子是一个RPC调用。但是如果你使用消息队列呢?使用消息队列不使用网关进行流量转发。这里需要根据不同的场景进行分析。1.只升级消费者。这是最简单的情况。比如只有消费者修改消费逻辑,类似于RPC调用。我们只需要将消费者灰度释放即可。如下图所示:2.生产者也升级了。下面是一个订单的实体类。我们添加了一个新属性,即订单生成时间:publicclassOrder{privateLongid;私人长用户名;私人长产品编号;私人整数计数;私有BigDecimalpayAmount;/**订单状态:0:创建中;1:已完成*/私有整数状态;/**新增属性,订单生成时间*/privateStringcreateTime;}消费者的改造需要修改属性createTime处理。(1)消费者过滤器在生产者的Order类中添加createTime属性。如果我们直接使用createTime属性来过滤,消费者是做不到灰度的,因为所有的消费者都可能拉到带有createTime属性的订单。信息。RocketMQ中Message的定义如下:publicclassMessageimplementsSerializable{privateStringtopic;私有int标志;私有Map属性;私有字节[]主体;privateStringtransactionId;}可以在properties属性中添加灰度例如生产者发送消息时,封装如下:Messagemsg=buildMessage(topic);msg.putUserProperty("gray","true");注意:也可以定义在SendMessageHook钩子函数中。这样就可以在consumer端增加一个灰度ConsumerGroup来消费灰度消息。如下图所示:对于灰度ConsumerGroup,在判断灰色属性为true时进行消费,对于普通ConsumerGroup,在判断灰色属性不等于true时进行消费。这里可以使用RocketMQ客户端的FilterMessageHook,代码如下:=Listmssagecontext.getMsgList();context.setMsgList(messages.stream().filter(m->StringUtils.equals(m.getProperty("gray"),"true")).collect(Collectors.toList()));}});但是这种方式会有两个问题。灰度和普通两个ConsumerGroup相当于广播组:两个组都要拉取所有消息。流量,但实际上所有的流量都被切掉了,只是根据属性判断而已。这给整个消费端带来了两倍的压力;因为两个消费群体都要去Broker拉消息,Broker的压力也成倍增加。(2)Broker过滤使用tag过滤。如果一个Consumer没有订阅一个Topic中的所有消息,可以通过Tag进行过滤。比如一个Consumer订阅了TopicA中的Tag1和Tag2,那么这个Consumer的订阅关系是这样的:SubscriptionData封装了Topic、tag以及订阅的tag的hashcode集合。当Consumer发送拉取消息请求时,会将订阅关系传递给Broker(Broker将其解析为一个SubscriptionData对象)。Broker在使用consumequeue获取消息时,首先判断最后一个8字节的taghashcode是否在SubscriptionData的codeSet中,如果不在则跳过,如果有则将消息返回给Consumer。如下图所示:这样就可以在GrayscaleProducer发送消息的时候添加Tags,代码如下:Messagemsg=newMessage();msg.setBody("Test");msg.setTopic("Topic");msg.setTags("灰色");灰度消费者订阅灰色标签。这样就避免了2.1节中的全消息获取问题。使用SQL92过滤使用SQL92过滤来应对更复杂的场景,不仅要过滤Tags,还要过滤UserProperty。例如,下面是生产者的代码:Messagemsg=newMessage();msg.setTopic("测试主题");msg.setTags("tag1");msg.putUserProperty("灰色","真");像这样消费初始化的时候可以定义SQL92过滤,代码如下:consumer.subscribe("testTopic",MessageSelector.bySql("(TAGSisnotnullandTAGSinTAGS='''''tag1''''')"+"and(grayisnotnullgray='true')"));下面是bySql的源码:publicstaticMessageSelectorbySql(Stringsql){returnnewMessageSelector(ExpressionType.SQL92,sql);}3.本文小结介绍了如何使用RocketMQ灰度消息,场景比较简单。全链路的复杂灰度场景,可以参考阿里的微服务引擎MSE。