本文作者:丁伟-中通快递资深架构师,《RocketMQ技术内幕》作者,ApacheRocketMQ社区首席布道师,公众号“中间件维护者”兴趣圈”。01物流行业的业务特点物流行业具有三大业务特点:业务量大、效率容忍度高、业务复杂度极高。作为快递行业的龙头企业,中通的日均订单量已达5000万单。双11期间,日均订单量可达2亿+,日均新闻发行量超万亿。在快递行业的实际日常业务中,比如上午10点下单,可能需要下午2点取件,所以我们可以容忍几分钟甚至几小时的延误。对于截包、路由变更等场景,需要保证一定的时效性,但大多数情况下,只需要尽可能保证即可。中通快递是加盟制,其转运中心、配送中心和网点不在同一家公司,业务逻辑、管控、结算等都比较复杂。快递行业的业务系统结算、下单、运单代收,对解耦的要求很高。另外,双十一期间的流量可能是平时的3-4倍,所以应对突发流量的能力也是需要的。而RocketMQ的能力和我们的业务场景是非常吻合的,所以在快递行业应用非常广泛。02订单中心使用RocketMQ案例,基于RocketMQ搭建订单中心的架构图如上图。用户在天猫或淘宝等电商平台下单后,会将订单发送至云端服务器。中通自主研发的数据同步平台,负责定位云服务器的变更日志,并通过变更日志中转给RocketMQ。第二层链接是拼多多、京东等厂商,通过网关进入RocketMQ。第三层是组件用户,也是通过网关进入RocketMQ。流量到达RocketMQ后,订单域消费者消费消费消息,写入数据库。很多后续的系统,比如运单域和结算,都需要这个数据。为了解耦,数据需要通过同步平台同步到另一个topic,供各个业务系统进行订阅和数据分发。上述架构的关键是如何保证MQ服务器的性能和数据的可靠性。比如MQ使用的刷机策略是同步的还是异步的?是否启用了transientStorePoolEnable以提高性能?复制策略是同步的还是异步的?我需要Dledger吗?如何优雅运维?RocketMQ在4.5.0版本之前支持主从同步。4.5.0版本后引入Dledger多副本机制,支持主从切换。一个复制组中有主节点和从节点,不同复制组之间有负载均衡。通常,主节点负责读写数据。当主节点忙时,可以将读取转发给从节点。同时,如果主节点出现故障,从节点仍然可以消费,保证消息发送和消息消费的高可用。一个复制组失败后,所有的请求都会被发送到另一个复制组,导致它因流量过大而失败。因此,在实际环境中,一般建议部署四个复制组来应对流量激增。RocketMQ4.5.0之后引入了Dledger多副本机制,支持主从切换,不同复制组之间的负载均衡。主节点负责读写,从节点只负责复制数据。当复制组中的master节点宕机时,会开始重新选举复制组中的master。选主完成后,可以继续提供写消息功能,流量不会转移到其他复制组,保证传输和消费。高可用性。但是,Dledger的多副本机制仍然存在不足。三台机器中,只有主节点可以读写,从节点只负责切换,主节点承受了很大的压力。如果从节点也可以承担读写请求,根据主从同步模型,主节点宕机后,将请求转移到从节点,实现真正的高可用。物流行业一般选择主从同步模式,因为主从切换意义不大,浪费机器。当用户发送消息时,消息会先到达brokermaster,然后存储到PageCache中,然后以同步或异步的方式写入磁盘。为了保证消息永不丢失,会使用同步磁盘,同时将数据复制到从节点。数据副本存储在多处,避免单点故障导致数据丢失。Dledger模式是基于Raft协议的数据拷贝机制。数据写入成功前需要副本组中超过半数的节点写入成功,可以保证强一致性。使用同步复制同步刷,消息延迟肯定会比不使用这种模式大。为了提高写入性能,RocketMQ在内核层提供了读写分离机制,引入了transientStorePoolEnable。默认情况下,消息会先进入PageCache,然后通过同步/异步刷新进入磁盘文件。当transientStorePoolEnable=true时,消息会先进入堆外内存,然后通过FileChannel块提交,批量提交到FileChannel,再通过异步刷写,批量进入磁盘文件。堆外内存技术可以保证数据一直留在内存中,不会因为内存不足而将数据交换到其他内存中。这种方法可以提高更高的可写性。同时写入过程不经过PageCache,仍然从PageCache中读取,实现了内核层的读写分离。它的优点是性能更高,投入资源更少。缺点是容易丢失消息,因为堆外内存中的消息可能无法批量提交到磁盘,导致消息丢失。性能和数据丢失之间的权衡是什么?我们认为一定要追求性能(资源投入少),同时也要保证数据库的正确性。首先考虑数据丢失的概率和数据检索成本是否可控。如果数据检索难度低,毫不犹豫地选择性能优先。比如数据保存在binlog中,一般保存15天。在这种情况下,集群可以使用异步复制和异步磁盘刷新。并且建议开启transientStorePoolEnable=true。即使由于集群服务节点异常、机器掉电导致消息丢失,通过消息回溯仍然可以找到掉电前的数据。而且这种情况发生的概率低,没必要为了小概率事件牺牲性能。此外,应避免人为因素(集群运维)造成的数据丢失,进一步减少人工干预的次数。比如开启transientStorePoolEnable=true,堆外内存重启时数据会丢失。当出现上述情况时,可以采用以下方案来保证内存不丢失:①关闭一组broker的写权限,只保留其读权限,即仍然可以消费原来的消息。②broker将TPS写入0后,停止broker。③运维操作完成后,启动broker。④开启broker写权限。当消息发送者或消费者查询信息时,他们将不会访问禁用写权限的代理。比如有四个broker,每个broker有四个queue。关闭其中一个的写权限后,只返回12个队列。这样,就可以顺利地停止流动了。流量停止后,堆外内存中的数据必然会被刷到磁盘中,从而保证数据不会丢失。RocketMQ支持分区级别的顺序消费。以银行账户余额变动短信通知为例:发送方根据key(银行账号)进行hash取模,检索后变为q0、q1、q2、q3。然后保证相同key的消息可以进入同一个队列,消费者使用顺序消费模式可以保证单个partition的消息按顺序进入。RocketMQ主要通过锁一致性来实现顺序消费。消费者在拉取消息之前,会先在broker服务器上锁定队列。如果加锁成功,则可以消费;否则不消费等待下一个消息队列。消息进入pullRequest队列后,consumer会先在本地锁住队列。比如消费者分配给了q0和q1,它会先申请q0的锁再消费,再申请q1的锁再消费。上述过程中,RocketMQ只支持分区级并发。例如,如果给消费者分配了30个线程,实际上只有两个线程可以同时工作。这种策略会导致如果消息队列有积压,调整消费者线程数没有效果。解决并发困境的关键词是:关联序列。关联顺序是指同一队列中不同账号的消息并发执行,同一队列中同一账号的消息串行执行。如上图,以账户余额短信通知业务为例,q0队列中有1、3、5、3、9,只需要保证1、3、5、9是并行执行,前后两个3依次执行。能。上图展示了顺序消费模型的优化方案。定义一个线程池,消费时通过hash取模,使得相同key的消息进入同一个线程,不同key的消息分散在所有线程池中。比如本来有10个线程,不够用可以增加到20个线程,解决了并发带来的困境。在该模型下,并发度与队列无关,可以根据需求任意设置并生效。并实现无锁设计,根据key选择线程。RocketMQ4.6.0版本提供了DefaultLitePullConsumerAPI,其功能与Kafka高度相似,实现了RocketMQ与Kafka的通用性。全链路压测有两个基本的设计需求:①隔离:如何对压测流量进行存储,使其与正式流量互不影响。②上下文信息:当链路一部分连通,另一部分未连通时,上下文信息如何存储?我们主要通过影子主题和影子消费群来实现全链路压测方案。如上图所示,消息发送到中通自研数据同步平台后,会判断是否为压测数据。如果是,发送到shadow_T_ORDER_TOPIC,否则发送到T_ORDER_TOPIC。order_consumer包含shadow_C_ORDER_CONSUMER和ORDER_CONSUMER分别消费压测消息和非压测消息。不接入全链路压测的消费者是指消费非压测数据。通过上述方式,将流量从消息发送和消息消费环节分离。同时,RocketMQ的消息属性中会存储ID等其他相关信息。未接入全链路压测的应用无法识别消息属性,无法区分消息是否具有压测属性。这样会导致所有的流量都被送到没有连接全链路压测的ORDER_CONSUMER,所以不适合使用消息属性。隔离。如果要使用消息属性进行隔离,那么所有的数据都必须是业务方要消费的消息。03中通基于RocketMQ的平台搭建实践中通基于RocketMQ的平台已经开源,开源地址如下:https://github.com/ZTO-Expres...在中通,所有的kafka集群和RocketMQ在目前生产环境Clusters、Zookeeper等可以直接通过页面操作快速搭建集群。实现原理如下:在页面访问zms-portal来启停服务进程,zms-agent启动supervisor进程管理系统来启停服务。安装时会向zms-agent提供参数,然后发送给supervisor启动脚本启动服务。今年,中通计划实现MQ集群的容灾策略、扩容、主题迁移,以及无需执行其他运维命令,只需在平台上一键操作即可完成消费者群体迁移的能力。nameserver地址动态感知机制是指:项目组使用ams-sdk发送消息,消费消息时无需感知nameserver地址,只需要面向topic和consumergroup编程;一个集群迁移到另一个集群。NameServer地址动态感知机制的实现原理如下:参照ZK存储元信息,zms-portal在增删改查时会操作Message集群,并将操作写入ZK,存储在ZK中哪个topic属于Cluster,nameserver地址等。zms-client在发送消息之前,会查询topic的meta信息,并根据meta信息构建底层sender来发送消息。如果要从一个集群迁移到另一个集群,可以先修改元信息,更新ZK。ZK更新后,zms-client会订阅主题内容的变化。如果有变化,会通知SDK重新构建sender,实现切换。ZTO实现了集群、主题、消费组等可视化的监控告警系统,该能力的实现主要是通过zmsCollector服务监控ZMS节点的变化并返回集群数据。MessageCluster收集集群指标,将客户端指标数据订阅到zmsCollector,然后存储到influxDB中进行最终展示。另外,RocketMQ的客户端耗时等指标,我们也在zms-sdk中埋点,发送到MessageCluster,由zmsCollector消费,再发送到influxDB最后展示。加入ApacheRocketMQ社区,十年磨一剑。ApacheRocketMQ的成长离不开全球近500名开发者的积极参与和贡献。相信你会在下一个版本中成为ApacheRocketMQ的贡献者。在社区中,你不仅可以结识社区领袖,提升技术,还可以提升个人影响力,促进自我成长。社区5.0版本开发如火如荼,近30个SIG(兴趣小组)等你加入。欢迎立志打造世界一流分布式系统的同学加入社区。添加社区开发者微信:rocketmq666进群。参与贡献构建下一代消息、事件、流的综合处理平台。微信扫码加小火箭进群。另外,你也可以加入钉钉群,与RocketMQ爱好者广泛讨论:钉钉扫码加群
