当前位置: 首页 > 科技观察

RocketMQ的详细用法,你学会了吗?

时间:2023-03-18 14:50:29 科技观察

大家好,我是北军。消息中间件是我们工作中使用频率最高的一类中间件。它具有低耦合、可靠传递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。现在市面上主流的消息中间件有很多,比如老牌的ActiveMQ、RabbitMQ、现在流行的Kafka、阿里巴巴自研的RocketMQ等等,今天智贝君就来详细说说使用RocketMQ生产者和消费者的一些注意事项。1.Producer1.1发送消息注意事项1)消息大小建议消息大小不超过512K。2)异步发送默认发送是同步发送,send方法会一直阻塞,等待broker的响应。如果担心性能问题,可以通过send(msg,callback)发起异步调用。3)producergroup一般情况下,producergroup是没有用的,但是在发送交易消息的时候,如果producer中途意外崩溃,broker会主动回调producer组中的任意一台机器来确认交易状态。(目前开源版本不支持事务性消息)。4)线程安全问题生产者实例是线程安全的,在应用中只需要实例化一次。5)性能问题如果想在一个jvm进程中使用多个producer实例来提高发送性能,我们推荐:使用异步发送,每个producer调用setInstanceName只需要3到5个producer实例,区别在于不同的producer。6)发送超时时间当客户端向broker发送超时请求时,客户端会抛出RemotingTimeoutException,默认超时时间为3秒。可以通过调用send(msg,timeout)来设置超时。建议超时时间不要设置得太小,因为broker可能需要时间刷新磁盘或者同步数据到slave。7)同一个应用,最好只使用一个Topic。消息的子类型可以通过标签来识别,标签可以由应用程序自由设置。当发送的消息设置标签后,消费者在订阅消息时可以使用标签在broker上过滤消息。注意,虽然这里的命名是复数,但一条消息只能有一个标签。8)keys字段可以设置消息在业务层面的唯一标识,方便根据keys定位消息。broker会为每条消息创建一个索引(hashindex),应用可以通过topic和key查询消息的内容(MessageExt),谁消费了消息(MessageTrack,精确到消费者组)。由于是哈希索引,请尽量保证key的唯一性,避免潜在的哈希冲突。9)无论消息发送成功还是失败,都要打印消息日志,日志内容必须包含sendResult和key字段。10)对于消息不能丢失的应用,必须有消息重发机制。例如,如果消息发送失败,可以将该消息存储在数据库中,然后可以通过定时程序或手动触发重发。11)调用send同步发送消息时,假设此时设置了isWaitStoreMsgOK=true(默认为true),只要不抛出异常,则表示发送成功,但是当isWaitStoreMsgOK=false时,发送将始终返回SEND_OK。但是发送“成功”有多种状态,在SendStatus中定义如下:FLUSH_DISK_TIMEOUT如果broker设置FlushDiskType=SYNC_FLUSH,当broker刷盘超时时返回该状态(MessageStoreConfig.syncFlushTimeout,默认5秒).此时消息仍然保存在内存中,只有当broker宕机时消息才会丢失。FLUSH_SLAVE_TIMEOU如果broker的角色是SYNC_MASTER,当slave同步数据的时间超过MessageStoreConfig.syncFlushTimeout(默认5秒)时会返回这个状态。这个时候只有master和slave都down了,而且master还没有刷新磁盘,消息才会丢失。SLAVE_NOT_AVAILABLE如果broker的角色是SYNC_MASTER,此时slave不可用,会返回这个状态。SEND_OK发送成功。为了保证消息不丢失,需要配置SYNC_MASTER或者SYNC_FLUSH。12)消息重复发送消息时返回FLUSH_DISK_TIMEOUT/FLUSH_SLAVE_TIMEOUT。如果不幸的代理也崩溃了,消息就会丢失。如果此时什么都不做,消息可能会丢失,如果重新发送消息,消息可能会重复。通常我们建议发送者重发消息,消费者保证消息消费的幂等性。1.2消息发送失败如何处理Producer的send方法原生支持内部重试,重试逻辑如下:最多重试3次。如果发送失败,它将转向下一个代理。该方法总耗时不超过sendMsgTimeout,默认3秒,所以如果发送消息已经产生超时异常,则不会重试。以上策略仍然不能保证消息发送成功。为了保证消息发送成功,建议应用这样做:如果调用send同步发送失败,尝试将消息存入db,后台线程会定时重试,保证消息到达经纪人。1.3Oneway发送形式对于可靠性要求不高的应用,可以使用oneway发送形式,oneway形式不等待响应。1.4发送顺序消息顺序消息分为分区有序和全局有序。有序分区需要生产者在发送时传入MessageQueueSelector的实现类,最终将某类消息发送到同一个队列。但是,一旦出现通信异常、broker重启等情况,由于队列总数的变化,位于hash取模之后的队列会发生变化,导致暂时的顺序不一致。如果业务能够容忍集群出现异常(如broker崩溃或重启)时短时间的乱序消息,使用partitionorder更为合适。全局严格排序的消息即使在异常情况下也能保证消息的顺序,但是是以牺牲分布式故障转移特性为代价的,即如果broker集群只有一台机器不可用,则整个集群不可用,服务可用性会大大减少。顺序消息的缺点:发送顺序消息不能利用集群的FailOver特性来消费顺序消息。顺序消息的并行度取决于队列的数量。队列热点问题。个别队列由于哈希不均匀,消息过多,消费速度跟不上,导致消费堆积问题。如果遇到消费失败的消息,不能跳过,需要挂起当前队列。5、暂不支持发送交易消息。2.消费者2.1消费者组,订阅不同的消费者组可以独立消费同一个主题,类似于ActiveMQ的虚拟主题。另外,对于同一个消费组,需要保证组内的消费者订阅消息的规则是一致的!MQ中的ConsumerGroup代表一个Consumer实例组。对于大多数分布式应用,多个Consumer实例通常挂载在一个ConsumerGroup下。一致的订阅关系是指同一个ConsumerGroup下的所有Consumer实例的处理逻辑必须完全一致。一旦订阅关系不一致,就会导致消息消费逻辑混乱,甚至导致消息丢失。由于MQ订阅关系主要由Topic+Tag组成,保持订阅关系一致意味着同一个ConsumerGroup下的所有实例必须在以下两个方面保持一致:订阅的Topic必须一致;Tag中订阅的Topic必须一致。技术架构>消费者最佳实践>image2017-11-1515:50:13.png2.2MessageListener1)顺序消费MessageListenerOrderly消费者会锁队列保证消息顺序消费,但是这样也会造成一定的性能损失。当消费出现异常时,建议不要抛出异常,而是返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,暂停消费一段时间。暂停时间由context.setSuspendCurrentQueueTimeMillis方法指定。2)并发消费并发消费是推荐的消费方式。在这种模式下,消息将被并发消费。消费异常不建议抛异常,直接返回ConsumeConcurrentlyStatus.RECONSUME_LATER即可。为了保证消息至少被消费一次,在延迟后的某个时间点,消息会被重新发送回broker(主题不是原来的主题而是本消费组的RETRY主题)(默认10秒,业务可以设置,通过delayLevelWhenNextConsume和MessageStoreConfig.messageDelayLevel设置),会再次投递给这个ConsumerGroup,如果重复消费一定次数后继续失败(默认16次),DefaultMQPushConsumer.maxReconsumeTimes),它会被投递到DLQ队列中。应用程序可以监视死信队列以进行手动干预。3)当并行消费返回状态时,可以返回RECONSUME_LATER告诉Consumer当前不能消费这条消息,等待一个延时再消费,但此时消费不会停止,可以继续消费其他消息。但是在顺序消费中,因为必须保证消费的顺序,不能跳过失败的消息。这时候可以通过返回SUSPEND_CURRENT_QUEUE_A_MOMENT告诉Consumer暂停一下。4)阻塞不建议阻塞Listener,因为这样会阻塞线程池,也可能导致消费者线程终止。2.3线程数消费者内部通过一个ThreadPoolExecutor来消费消息,线程池的大小可以通过setConsumeThreadMin和setConsumeThreadMax改变。2.4ConsumeFromWhere当一个新的实例启动时,PushConsumer会得到这个消费组的broker记录的消费进度(consumeroffset),并根据这个进度发起自己的第一个Pull请求。如果Broker中没有存储消费进度,则证明这是一个全新的消费组。这个时候客户端有几种策略可以选择:CONSUME_FROM_LAST_OFFSET//默认策略是从队尾开始消费,也就是跳过历史消息。CONSUME_FROM_FIRST_OFFSET//从队列最开始消费,即消费所有历史消息(仍然保存在broker中)。CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,与setConsumeTimestamp()配合使用,默认半小时前注意:这些配置只对新消费组有效,老消费组根据消费进度继续已存储消费。对于想要跳过历史消息的老消费群,可以采用以下方法:1)判断消息的发送时间,如果消息太旧直接返回CONSUME_SUCCESS。2)判断消息的offset与MAX_OFFSET的差距,如果落后太多,可以直接。返回CONSUME_SUCCESS。3)消费者开始前,先调整消费组的消费进度,然后开始消费。您可以手动使用命令resetOffsetByTimeStamp,详细信息请参见ResetOffsetByTimeCommand.java。2.5消息幂等性由于RocketMQ无法避免重复消费,如果业务对消息的重复消费非常敏感,则必须在业务层面进行去重。2.6慢消费的处理方法1)增加消费并行度。大多数消息消费行为都是IO密集型业务。适当提高并发量可以显着提高消费吞吐量。2)批量消费默认情况下,消费者的consumeMessageBatchMaxSize为1,即一次只消费一条消息。如果应用可以批量消费消息,消费吞吐量可以得到很大的提升。3)跳过不重要的信息。当消息堆积严重时,可以丢弃不重要的消息。4)优化消息消费流程2.7打印消费日志建议在消费入口方法中打印消息,方便后续排查,消费失败时也可以打印失败日志。2.8使用broker过滤消息,避免冗余消息传输3.总结好了,RocketMQ生产者和消费者的使用已经总结了。相信大家对RocketMQ的使用应该更有信心了。