当前位置: 首页 > 科技观察

RocketMQ如何应对每天1500亿条的数据处理?

时间:2023-03-12 02:31:41 科技观察

同程艺龙的机票、火车票、汽车票、酒店相关业务已经接入RocketMQ,用于在流量高峰期进行削峰填谷,减轻后端压力。同时对常规系统进行解耦,将部分同步处理改为异步处理,每天处理1500亿条数据。在最近的ApacheRocketMQMeetup上,同程艺龙机票业务部架构师查江分享了同程艺龙的消息系统如何处理每天1500亿条的数据处理。通过本文,您将了解到:同程-艺龙消息系统的使用同程-艺龙消息系统的应用场景技术上踩过的坑基于RocketMQ的改进同程-艺龙消息系统的使用RocketMQ集群划分分为两部分:NameServer和Broker,NameServer采用双主模式,一个是为了性能,一个是为了安全。纯数据Broker分为很多组,每组又分为Master和Slave。目前我们的机票、火车票、汽车票、酒店相关的业务都接入了RocketMQ,用于流量高峰时的调峰,减轻后端的压力。同时对常规系统进行解耦,将部分同步处理改为异步处理,每天处理1500亿条数据。选择RocketMQ的原因是:接入简单,引入的Java包相对较少纯Java开发,设计逻辑相对清晰整体性能比较稳定,主题数量大时也能保持性能这是我们退订系统中的一个应用场景.当用户在前端点击退订按钮时,系统调用退订接口,然后调用供应商的退订接口,完成退订功能。如果供应商的系统接口不可靠,会导致用户退订失败。如果系统设置为同步运行,会导致用户再次点击。因此,我们引入了RocketMQ来实现变同步为异步。当当前终端用户发出退订请求时,退订系统收到请求后会记录在退订系统的数据库中,表示用户退订。同时,退订消息通过消息引擎发送给与供应商连接的系统,调用供应商的接口。如果调用成功,数据库会被标记,表示订阅已经成功取消订阅。同时增加补偿脚本,将那些未成功退订的消息从数据库中取出,重新退订,避免消息丢失导致退订失败。仓库系统的第二个应用场景就是我们的仓库系统。这是一个比较常规的消息使用场景。我们从供应商那里收集酒店的一些基本信息数据和详细数据,然后接入到消息系统中,由后台的分销系统、最低价系统和库存系统进行计算。同时,当供应商有价格变动时,价格变动事件也会通过消息系统传递到我们的后台业务系统,保证数据的实时性和准确性。供应库的订阅系统数据库的订阅系统也使用了消息的应用。一般做数据库同步都是通过binlog读取里面的数据,然后传输到数据库中。在处理过程中,我们最关心的是数据的顺序,所以在数据库行模式的基础上,新增了一个函数,保证每个Queue中的顺序是唯一的。虽然Queue中的顺序自然是唯一的,但是在我们的使用中有一个特点,就是我们把相同ID的消息放在同一个Queue中。比如图中右上角id1的消息,数据库的主字段是id1,统一放在Queue1中,是顺序的。在Queue2中,两个id3被两个有顺序的id2隔开,但是实际消费读出来的时候,是有顺序的。因此,可以利用多个队列的顺序来提高整体的并发度。上图中,一个MQ对应两个消费者。他们在同一个Group1中。一开始大家只有Topic1,这时候是正常消费。但是如果给第一个消费者添加了一个Topic2,此时会无法消费或者消费异常。这是RocketMQ本身机制导致的问题。需要给第二个消费者添加Topic2才能正常消费。支付交易系统的另一个场景是支付交易系统。在这个场景中,还有两个应用程序。他们都在同一个组和同一个主题下。一个是消费Tag1的数据,一个是消费Tag2的数据。正常情况下启动应该没问题,但是有一天我们发现一个应用无法启动,另一个应用只消费了Tag2的数据,但是因为RocketMQ的机制,Tag1的数据会被接过来.Tag1的数据将被丢弃。这会导致用户在支付过程中支付失败。对此,我们将Tag2放入Group2中,两个Group不会消费同一条消息。我个人建议RocketMQ可以实现一种机制,只接受自己的Tag消息,不接收无关的Tag。大量旧数据读取场景在火车票消费场景中,我们发现有200亿条旧数据没有被消费。当我们开始消费时,RocketMQ默认会从第0条数据开始读取。这个时候磁盘IO飙升到100%,影响了其他消费者数据的读取,但是这些旧数据加载后没有实际作用。.因此,读取大量旧数据的改进方法是:对于一个新的消费组,默认从LAST_OFFSET开始消费。当Broker中单个topic累计超过1000万时,禁止消费,需要联系管理员开通消费。监控一定要到位,当磁盘IO激增时,可以第一时间联系消费者进行处理。服务端场景①CentOS6.6FutexKernelbug,导致NameServer和Broker进程频繁挂起,无法正常工作解决:升级到6.7②服务端两个线程会创建相同的CommitLog,放入List,导致计算消息偏移量出错,解析消息失败无法消费,重启也没有解决问题。解决方案:线程安全问题,改成单线程③重置Pull模式下的消费进度,导致服务端向Map中填充大量数据,BrokerCPU占用率飙升100%。解决方案:场景中没有使用Map局部变量,删除④Master建议客户端从slave消费时,如果数据还没有同步到slave,会重置pullOffset,导致大量重复消费。解决方法:不重置offset⑥没有MagicCode的同步,安全组扫描同步端口时,Master解析错误,导致一些问题。解决方法:同步时加入magicCode校验。基于RocketMQ的改进,新增客户端。添加.net客户端,基于Java源码原生开发;添加HTTP客户端实现部分功能,通过NettyServer连接RocketMQ。新增消息限流功能如果客户端代码写错,出现死循环,会产生大量重复数据。这时候生产线程就会爆满,导致队列溢出,严重影响我们MQ集群的稳定性,进而影响其他业务。上图是限流的模型图。我们在Topic之前添加限流功能。可以通过限流功能设置速率限制和大小限制。其中,限速是通过令牌桶算法实现的,即每秒向桶中放入多少令牌,每秒消耗多少速度,或者写入主题多少数据。以上两种配置都支持动态修改。后台监控我们还搭建了监控后台,监控消息的全链路过程,包括:消息的全链路跟踪,覆盖消息生产、消费、过期的全生命周期。消息生产、消费曲线、消息生产异常告警、消息累积告警、通知哪个IP消费过慢其他功能:HTTP方式生产、消费消息Topic消费权限设置,Topic只能被指定群组消费,防止乱上网订阅支持新消费组从***位置消费(默认是从**文章开始消费)广播方式消费进度同步(服务器显示进度)以上是同程艺龙在消息系统建设中的实践。