大家好,我是君哥,最近有位读者在接受采访时被问到一个问题。如果一个消费者拉取了一批消息,比如100条,第100条消息消费成功,但是第50条消息消费失败,那么offset将如何更新呢?考虑到这一点,我们来谈谈如果一批消息消费失败时如何保存偏移量。1拉取消息1.1封装拉取请求以RocketMQ推模式为例,RocketMQ消费者启动代码如下:publicstaticvoidmain(String[]args)throwsInterruptedException,MQClientException{DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("CID_JODIE_1");消费者.subscribe("TopicTest","*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setConsumeTimestamp("20181109221800");consumer.registerMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(Listmsgs,ConsumeConcurrentlyContextcontext){try{System.out.printf("%sReceiveNewMessages:%s%n",Thread.currentThread().getName(),msgs);}catch(Exceptione){returnConsumeConcurrentlyStatus.RECONSUME_LATER;}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}上面的DefaultMQPushConsumer是一个push模式的consumer,启动方法是start。消费者启动后,会触发再平衡线程(RebalanceService)。这个线程的任务就是在死循环中不断rebalance,最后将拉取消息的请求封装到pullRequestQueue中。该过程涉及的UML类图如下:1.2处理PullRequests拉取消息的请求PullRequest封装后,RocketMQ会不断从pullRequestQueue中获取消息pullrequest进行处理。UML类图如下:拉取消息的入口方法是一个死循环,代码如下://PullMessageServicepublicvoidrun(){log.info(this.getServiceName()+"servicestarted");while(!this.isStopped()){try{PullRequestpullRequest=this.pullRequestQueue.take();this.pullMessage(pullRequest);}catch(InterruptedExceptionignored){}catch(Exceptione){log.error("PullMessageServiceRunMethodexception",e);}}log.info(this.getServiceName()+"服务端");}这里拉取消息后,提交给回调函数PullCallback处理。获取到的消息先放到ProcessQueue中的msgTreeMap中,再封装到线程类ConsumeRequest中进行处理。简化代码后,ConsumeRequest的处理逻辑如下:ConsumeConcurrentlyContextcontext=newConsumeConcurrentlyContext(messageQueue.nullus);消费逻辑,这里的逻辑定义在文章开头的代码中status=listener.consumeMessage(Collections.unmodifiableList(msgs),context);}catch(Throwablee){}if(!processQueue.isDropped()){//2.处理消费结果ConsumeMessageConcurrentlyService.this.processConsumeResult(status,context,this);}else{log.warn("processQueue在没有进程消耗结果的情况下被丢弃。messageQueue={},msgs={}",messageQueue,msgs);}}2处理ConsumeResult2.1ConsumeConcurrentlyConcurrently处理消费结果的代码简化如下:etAckIndex();开关(状态){caseCONSUME_SUCCESS:if(ackIndex>=consumeRequest.getMsgs().size()){ackIndex=consumeRequest.getMsgs().size()-1;}intok=ackIndex+1;intfailed=consumeRequest.getMsgs().size()-好的;休息;案例RECONSUME_LATER:中断;默认值:中断;}switch(this.defaultMQPushConsumer.getMessageModel()){案例广播:for(inti=ackIndex+1;imsgBackFailed=newArrayList(consumeRequest.getMsgs().size());对于(inti=ackIndex+1;i=0&&!consumeRequest.getProcessQueue().isDropped()){this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(),offset,true);}}}从上面的代码可以看出,如果处理消息的逻辑是串行的,比如文章开头的代码使用了for循环来处理消息,如果某条消息处理失败,直接退出循环,将ConsumeConcurrentlyContext的ackIndex变量赋值给失败消息在消息列表中的位置,这样失败消息之后的消息将不再处理,发送给Broker等待重新获取代码如下:publicstaticvoidmain(String[]args)throwsInterruptedException,MQClientException{DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("CID_JODIE_1");consumer.subscribe("TopicTest","SU*");consumer.setConsumeRSETFromWhereW(Consume;consume.setConsumeTimestamp("20181109221800");consumer.registerMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(Listmsgs,ConsumeConcurrentlyContextcontext){for(inti=gs.i++){try{System.out.printf("%sReceiveNewMessages:%s%n",Thread.currentThread().getName(),msgs);}catch(Exceptione){context.setAckIndex(i);返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}将消费成功的消息从ProcessQueue中的msgTreeMap中移除,返回msgTreeMap中的最小offset(firstKey)进行update。注意:集群模式的offset存储在Broker端,更新offset需要向Broker发送消息,而广播模式的offset存储在Consumer端,只需要更新本地的offset即可。如果处理消息的逻辑是并行的,在消息处理失败后给ackIndex赋值是没有意义的,因为可能有多个消息失败,给ackIndex变量赋值是不准确的。最好的办法是给ackIndex赋0值,整批消息会被重新消费,可能会引起其他问题。2.2顺序消息对于顺序消息,从msgTreeMap中取出消息后,必须先放到consumingMsgOrderlyTreeMap中。更新偏移量时,从consumingMsgOrderlyTreeMap中取出最大的消息偏移量(lastKey)。3小结回到开头的问题,如果一批消息按顺序消费,不可能第100条消息消费成功,第50条消息消费失败,因为当第50条消息消费失败时,循环应该退出。不要继续食用。如果是并发消费,出现这种情况,建议重新消费整批消息,也就是给ackIndex赋0,这样就必须考虑ghosts的问题了。