本文在发送消息的时候主要涉及:RocketMQkeyRocketMQtagRocketMQmsgId顺序消息的队列选择机制在很多业务场景中,需要保证顺序消息的处理顺序messages,比如orders消息会在流向不同状态变化时发送到同一个topic,但是消费者在消费的时候希望按照order变化的顺序进行处理。如果他们不控制,消息会被发送到主题中不同的队列中,这样消费者就不会方法是顺序消费。本文首先只分析Producer是如何发送顺序消息的,后面再分析Consumer的处理过程。我们知道RocketMQ在队列层面是支持顺序消息的,所以在发送消息的时候,只需要将需要顺序消费的消息按顺序发送到一个队列中即可。RocketMQ在发送消息时提供了自定义的队列加载机制。消息发送的默认队列加载机制是轮询。如何选择队列?RocketMQ提供了如下API(这里只是举例其中一种API):SendResultsend(finalMessagemsg,finalMessageQueueSelectorselector,finalObjectarg)throwsMQClientException,RemotingException,MQBrokerException,InterruptedException;用法示例:publicstaticvoidmain(String[]args)throwsUnsupportedEncodingException{try{MQProducerproducer=newDefaultMQProducer("please_rename_unique_group_name");生产者.start();String[]tags=newString[]{"TagA","TagB","TagC","TagD","TagE"};for(inti=0;i<100;i++){intorderId=i%10;Messagemsg=newMessage("TopicTestjjj",tags[i%tags.length],"KEY"+i,("HelloRocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResultsendResult=producer.send(msg,newMessageQueueSelector(){@OverridepublicMessageQueueselect(Listmqs,Messagemsg,Objectarg){Integerid=(Integer)arg;intindex=id%mqs.size();返回mqs.get(索引);}},订单号);System.out.printf("%s%n",sendResult);}producer.shutdown();}catch(MQClientException|RemotingException|MQBrokerException|InterruptedExceptione){e.printStackTrace();届时我们指定选择队列的实现,传入引用arg(可以是orderId或者userId等)然后对整个队列取模,这样就可以将同一个arg发送到同一个队列。需要特别注意这一点:如果我们使用MessageQueueSelector,消息发送的重试机制将失效,即RocketMQ客户端不会重试,消息发送的高可用需要业务方保证。一种方式是在消息发送失败后将消息存入数据库,然后定时调度,最后将消息发送给MQ。当使用异步发送方式并重试时(RocketMQ本身异步时不会重试,业务方可能会重试),比如有1、2、3三个消息,但是消息2发送失败再重试的话,可能在消息3之后重发成功,这样的话消息的顺序就达不到了。所以我们在使用顺序消息时不使用异步发送而是使用同步发送。当Broker宕机重启时,由于partitionrebalancing动作,生产端根据keyhash取模得到的partition会发生变化,瞬时消息序列会不一致。针对这个问题,如果业务方不能容忍短期的顺序一致性,要么集群故障后集群立即不可用,要么将topic做成一个partition,但这会大大牺牲高可用集群和单个分区也会引起额外的问题。集群性能大大降低。RocketMQkey的使用RocketMQ提供了丰富的消息查询机制,比如使用消息偏移量、消息全局唯一msgId、消息Key。RocketMQ在发送消息时,可以为消息设置一个索引。比如上面的例子,我们指定了“KEY”+序号作为消息的Key,这样我们就可以通过索引Key来查询消息。如果需要为消息指定Key,只需要在构造Message时传入Key参数即可,例如如下API:publicMessage(Stringtopic,Stringtags,Stringkeys,byte[]body)RocketMQtag的使用RocketMQ可以为Topic设置Tag(标签),让消费者可以根据Tag过滤Topic中的消息,即有选择地处理Topic中的消息。比如一个订单的整个生命过程:订单创建、待支付、支付完成、商户审核、商户发货、买家发货,订单的每一次状态变化都会向同一个主题order_topic发送消息,只是不同的下游系统而已请注意,并非所有消息都需要在订单流的某些阶段进行处理。我们可以为上面的每一种状态指定不同的标签,消费者在订阅消息的时候也指定对应的标签,这样消费者就只能消费指定标签的消息。API同指定消息key:publicMessage(Stringtopic,Stringtags,Stringkeys,byte[]body)消费者订阅时指定tagAPI:voidsubscribe(finalStringtopic,finalStringsubExpression)throwsMQClientException;在控制台我们可以看到不满足订阅的Tag的消费状态显示为CONSUMED_BUT_FILTERED(已消费但被过滤掉)。RocketMQmsgId当我们用RocketMQ发送信息的时候通常都会返回如下信息:SendResult[sendStatus=SEND_OK,msgId=0A42333A0DC818B4AAC246C290FD0000,offsetMsgId=0A42333A00002A9F000000000134F1F5,messageQueue=MessageQueue[topic=topicTest1,brokerName=mac.local,queueId=3],queueOffset=4]Fortheclient,themsgIdisgeneratedbytheclientproduceritself,andtheoffsetMsgIdisgeneratedbytheserverbroker,wheretheoffsetMsgIdisthemessageIdwedirectlyinputintothequeryintherocketMQconsole.我们先看一下生成msgId的代码:publicstaticStringcreateUniqID(){StringBuildersb=newStringBuilder(LEN*2);sb.append(FIX_STRING);sb.append(UtilAll.bytes2string(createUniqIDBuffer()));返回sb.toString();}publicstaticStringcreateUniqID(){StringBuildersb=newStringBuilder(LEN*2);sb.append(FIX_STRING);sb.append(UtilAll.bytes2string(createUniqIDBuffer()));返回sb.toString();}privatestaticbyte[]createUniqIDBuffer(){ByteBufferbuffer=ByteBuffer.allocate(4+2);长电流=System.currentTimeMillis();if(current>=nextStartTime){setStartTime(current);}缓冲。位置(0);buffer.putInt((int)(System.currentTimeMillis()-startTime));buffer.putShort((短)COUNTER.getAndIncrement());返回buffer.array();}FIX_STRING的内容是什么?在MessageClientIDSetter类的静态代码块中有:static{LEN=4+2+4+4+2;ByteBuffertempBuffer=ByteBuffer.allocate(10);tempBuffer.position(2);tempBuffer.putInt(UtilAll.getPid());tempBuffer.position(0);尝试{临时缓冲区。放(UtilAll.getIP());}catch(Exceptione){tempBuffer.put(createFakeIP());}tempBuffer.position(6);tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());FIX_STRING=UtilAll.bytes2string(tempBuffer.array());setStartTime(System.currentTimeMillis());计数器=新原子整数(0);}组成部分是:客户端发送IP,支持IPV4和IPV6进程PID(2字节)类加载器的hashcode(4字节)当前系统时间戳和启动时间戳的差值(4字节)自增序列(2字节)对于每个生产者实例都是唯一的,所以不同的生产者生成的msgId不会重复。生产者单实例的区分因素是:时间+计数器。在应用不重启的情况下可以保证msgId的唯一性,应用重启后只要系统时钟不变,msgId也是唯一的。所以只要系统时钟不回拨,我们就可以保证msgId的全局唯一性。上述组件中的时间戳差异是当前时间戳与上个月时间戳的差异。如果应用程序运行一个月然后重新启动,则会重复msgId。从算法上讲是的!但是MQ的消息是有时间限制的,有效期是72小时或者3天。RocketMQ会在每天凌晨4:00清除过期消息。所以msgId也保证了全局唯一。最后offsetMsgId.offsetMsgId是指消息所在Broker的物理偏移量,即commitlog文件中的偏移量。由以下两部分组成:commitlog中Broker的IP和端口号的物理偏移量publicstaticStringcreateMessageId(finalByteBufferinput,finalByteBufferaddr,finallongoffset){input.limit(MessageDecoder.MSG_ID_LENGTH);输入.put(地址);input.putLong(偏移量);返回UtilAll.bytes2string(input.array());}我们可以在不知道消息的Topic等信息的情况下,根据offsetMsgId定位到具体的消息。