随着企业业务的发展,在微服务化大趋势下,各个服务的拆分,服务之间的通信交互越来越多。与单体服务不同,微服务之间的数据往往需要额外的手段来保证一致性,比如事务消息、异步任务补偿等。除了最大程度保证机制,如何及时观察和发现数据不一致也很重要。本文介绍虾皮金融产品团队设计开发的实时查询系统(Real-timeCheckingSystem)。接入方便,只需根据校验需求配置相应的校验规则,实现规则热加载,无需侵入业务即可使用。在对系统数据进行实时监控和比对的前提下,及时发现数据的不一致。自上线以来,该系统已在Shopee多个产品线推广使用,帮助不同团队快速发现线上数据不一致,保护数据安全。1.背景1.1系统数据不一致在日常的开发迭代中,我们会发现系统数据有时并没有我们想象的那样发生变化。常见场景如:用户进行了还款(Repay),系统A收到还款请求后调用系统B,对被冻结的账户进行解冻,但由于某些原因(如系统故障、网络分区等),解冻如果请求没有到达B,或者解冻成功的响应没有返回给A,此时支付已确认但未解冻,或支付未确认但已解冻,导致用户投诉或经济损失。图。1。数据不一致造成此类问题的原因通常包括:代码逻辑错误、并发场景处理不当、基础组件(网络、数据库、中间件)失效、缺乏跨系统原生一致性保证等。随着业务的扩展,企业中的应用越来越多,很多单体应用(MonolithicApplication)被拆分并转化为微服务(Microservices)。在分布式场景下,失去了数据库事务的支持,需要解决数据一致性问题。性问题。有许多解决方案可以确保数据的一致性。在服务单一,不同组件之间(比如跨Database,不同存储中间件)缺乏事务支持的情况下,可以使用本地事务表+补偿任务的组合,将主表数据与校验任务结合起来通过事务写入,然后使用异步任务不断检查目标数据是否一致并进行补偿,可以达到最终一致性;在跨服务场景下,Saga模式提供了通过可靠消息和服务回滚事务的能力,实现分发形式化事务。但是对于重要的业务,无论采用何种一致性方案,都需要提供额外的检查、验证、自下而上的手段,从而导致大量的业务验证和对账需求。通过特定的手段保证服务之间的数据一致性,并设计了一个非侵入式的旁路系统来进行数据校验和验证,这是微服务架构下的典型搭配。图2。数据一致性保险1.2离线检查的缺陷常见的离线数据检查可以使用定时任务,根据一定的过滤条件,从不同的数据源中获取特定的数据,然后进行比较。该方案的伪代码如下:funcCheck(){//获取上游update_time落在[a,b)upstreamRows:=QueryUpstreamDB(a,b)foruniqueKey,sourceData:=rangeupstreamRows{//的数据行为每一个上游数据搜索对应的下游数据targetData:=QueryDownstreamDB(uniqueKey)//比较上下游数据Compare(sourceData,targetData)}}时效性低是这类查表方案的通病。检查操作通常在异步任务中定时执行,执行时间与数据变化时间之间存在一定的延迟,定时任务的查询条件也会影响检查目标。当数据出现异常时,无法及时发现问题,需要等下一次定时任务执行后才能发现问题。引入额外的表扫描开销也是一个不容忽视的问题。在数据量很大的情况下,尤其是有大量INSERT操作的场景,如果要查询,需要SELECT上下游目标数据。为了在不影响正常业务的情况下及时完成校验任务,开发者可以将查询转移到从库,甚至可以引入一个专用于校验任务的从库,但是这种查表校验方案在资源方面使用和实施的复杂性。两者都不理想。同时,由于查表得到的结果只是当前的数据版本,两次查表之间数据可能发生了多次变化,定时任务无法感知和观察到每一次状态变化。当数据频繁UPDATE时,在某些场景下也存在一定的验证和检测难度。因此,要实现更好的数据验证,我们需要考虑以下目标:实现秒级验证。尽量减少数据库查询。检查数据更改,而不是数据快照。接入方式简单灵活。2.实时数据检查为了更好地检测数据不一致,虾皮金融产品团队于2021年年中设计并实施了实时检查系统(RCS)。RCS具有以下核心优势:秒级数据校验。不侵入业务逻辑。可配置的访问。自上线以来,RCS帮助团队及时发现了多个数据问题,原因归纳为以下几个方面:代码逻辑BUG:包括幂等处理、并发问题、业务逻辑错误等系统运行环境:DB异常、网络抖动、MQ异常等。发现Bug的类型本节主要介绍RCS的实现,包括系统架构和验证过程、验证性能优化、消息通知机制等。2.1系统架构和检查过程在系统设计上,我们将RCS分为三层:数据抓取层、数据校验层、结果处理层。图4。SystemLayers2.1.1Changingdata要获得实时验证,顾名思义,我们需要关注两个关键点:“实时”和“验证”。数据获取层负责实现实时目标。通过研究不同的CDC(ChangeDataCapture,变更数据捕获)方案,我们采用Log-Based方案提供时效性保障。延伸阅读CDC模式用于感知数据变化,主要分为以下四类:Timestamps,基于update_time或类似字段查询获取变化的数据。表差分,获取完整的数据快照进行比较。Triggers,为DDL和DML设置Trigger,通过附加操作将改变的内容记录到数据库中。Log-Based,一个典型的例子就是MySQLbinlog和MongoDBoplog的使用。其中,Timestamps方案和TableDifferencing都是由定时任务驱动,时效性较弱。Timestamps方案无法感知删除的数据,使用时需要用软删除代替;TableDifferencing方案弥补了这个缺点,但是多次获取完整的数据会使整个方案非常繁琐。Triggers方案和Log-Based方案都是获取数据变化而不是数据快照,但是Triggers感知后,用特定的语句记录下来,本质上是写操作,仍然给数据库带来了额外的负担。当MySQL产生数据变化时,高可用的binlog同步组件会获取对应的binlog并投递给Kafka,从而获取变化数据的数据值进行校验。图5。在DataFetchingLayer实际使用中,需要查询的数据不一定都在MySQL中。比如我们还要查看MySQL和MongoDB的数据,MySQL和Redis的数据。为此,业务系统也可以通过自己发布特定格式的Kafka消息来接入,保证接入的灵活性。2.1.2数据校验数据校验层负责对接收到的数据流进行处理,包括获取特定的校验规则,对接收到的数据进行暂存或比对。RCS对binlog数据进行抽象,提取出一套通用的可配置验证规则。用户只需填写相应的规则,即可实现自助接入。规则定义示例如下:图6.ConfigExample不难想象,不同系统之间的数据变化是有顺序的,变化的消息按顺序被RCS接收。因此需要存储先到的数据作为后续比较的目标,后到的数据按照规则与已有数据进行比较。图7。CheckFlow为了描述方便,这里定义了几个名称:Dataupstream:最先到达RCS的数据为upstream。数据下行:晚到达RCS的数据为下行。检查项:数据检查要求,包括上游数据和下游数据。例如:系统A和系统B需要查询用户的资金情况。图8。Kafka数据校验流程以下面的校验为例。需要在10秒内判断数据是否一致。整体校验流程可以简单描述为:比较数据到达,校验,删除Rediskey;比较数据如果没有到达,则判断延迟队列中的数据。(图8)Checkpoint的上游数据到达,暂存Redis和延迟队列。(图8)RCS等待校验项下行数据:(图9)延迟队列到达时间后,再次校验Redis中是否有对应数据:如果存在,如果超过校验时间阈值,发送异常告警并删除Rediskey;如果不存在,则进行检查。图9。DelayQueueCheckFlow2.1.3消息通知机制RCS的目标是及时发现数据不一致。因此,虾皮企业IM(SeaTalk)的机器人连接到ResultHandlingLayer,发出告警。未来还将开放告警接口,方便其他消息类应用的扩展和接入。我们设计了四种消息通知机制:MismatchNoticeAggregatedNoticeRecoveryNoticeStatisticalNoticeMismatchNotice针对一般场景验证失败,及时通知相应业务人员,快速定位问题原因,修复数据。但当大量数据出现不一致时,AggregatedNotice会替换并聚合报警,避免影响值班人员的正常阅读。RCS也会持久化校验失败的数据,所以它有恢复意识的能力。当异常数据被恢复时,RecoveryNotice会发送一条消息,告知用户恢复了哪些不一致以及间隔多长时间。最后,StatisticalNotice会向用户上报一般统计数据,包括DB主从延迟、日常验证成功率等。2.2验证功能的演化自系统上线以来,越来越多的团队正在接入或自己部署RCS,对应的业务场景也不一样。早期的验证规则很难满足不同团队的验证需求。2021年底,虾皮金融产品研发团队对DataCheckingLayer进行了一系列扩展,旨在降低维护成本,更通用地支持不同团队的使用。2.2.1对等/映射校验在最早推出的版本中,RCS系统包含了对等和状态映射校验的功能,这是针对集团面临的实际场景设计的,满足日常使用的需要。检查系统主要处理上下游系统之间数量和状态的变化。通常,我们可以通过如下方式获取binlog核心字段示例和校验逻辑:等价性校验假设先收到系统A的binlog消息,暂存Redis,在指定时间内也收到系统B的binlog消息:loan_amount为200,需要找到一条系统A对应的binlog,并且order_amount必须匹配;loan_status为4,需要找到系统A对应的一个binlog,而order_status必须为2。根据系统B的binlog的特点,发现配置中有两条验证规则:用于验证single记录不同系统之间产生的变化,等价和映射检查可以覆盖大部分场景。但是,由于这两个检查的逻辑是固定的,如果业务方有不同的检查需求,则需要新的代码逻辑实现。为此,研发团队考虑将验证逻辑交给用户来描述,从而诞生了表情验证的功能。2.2.2表达式检查如果我们考虑下面的binlog例子,不同系统之间的数据模型设计是不一致的,字段不是一一对应的。图11。表达式检查系统A记录订单金额为100,系统B记录订单支付金额为30,贷款金额为70。需要检查的是系统Aorder_amount是否等于系统Bpaid_amount+loan_amount,原来的设计是支持不了的。为此,我们引入了表达式评估方案。当binlog到来时,用户会通过一个返回值为布尔类型的表达式来描述自己的验证逻辑,如:a.order_amount==b.loan_amounta.order_status==2&&b.loan_status==4判断2.2.2中的合计场景:a.order_amount==b.paid_amount+b.loan_amount兼容判断2.2.1中的场景:表达式校验方案下,几乎所有单数据校验场景之间可以覆盖两个系统,这种方案的好处是研发团队不需要费心去提供新的计算、映射和/或非逻辑实现支持,大大降低了维护成本。2.2.3动态配置数据校验在电商和金融场景中,有一些动态数据,比如费率、活动折扣等,会随着业务和运营计划实时变化。这类数据通常存储在配置表中,无法通过简单的表达式定义,而且不同业务系统中配置表的结构设计不同,难以在校验系统的代码中声明。为了满足这种场景,RCS在业务系统中引入了对SQL查询的支持。当获取到新的binlog时,会检查binlog满足的校验规则。用户在校验规则中配置要执行的SQL语句,库分表规则由校验系统执行并得到比对内容,然后进行表达式校验:从binlog中得到的当前顺序是0.5。根据配置信息执行SELECT语句查询实时汇率rate。执行表达式检查a.order_rate==rate。此外,RCS还可以支持JSON字符串检查。比如系统A需要查询order_rate,但是存储的order_rate信息是JSON字符串,rate_info={"decimal_base":"10000","order_rate":"0.5"}。您可以在RCS校验规则中自定义JSON解析表达式,提取出真正需要校验的字段。3.性能RCS系统的性能主要取决于数据获取层和数据校验层。DataFetchingLayer的性能代表实时获取变化数据的能力,受binlog解析(CPU密集型任务)和Kafka的消息持久化(I/O密集型任务)影响。业务团队可以根据需要选择相应的硬件搭建CDC模块。以我们的使用场景为例,每秒可以投递的消息数超过20K。数据校验层负责数据校验。为了测试RCS的性能极限,DataFetching使用Kafka发送源数据,检查系统部署在单机上。测试结果表明,RCS每秒可以完成10K+的检查。详细数据如下:ComponentMachineKafka3*48Core128GBRedis3*48Core128GBReal-timeCheckingSystem1*48Core128GBcheckentryTPSCPUCost1entry14.3K454%2entries12.0K687%3entries10.4K913%从分析压测结果,RCS的性能瓶颈主要取决于Redis集群的性能,单次检查耗时0.5ms左右。当然,RCS支持集群部署。作为Kafka的消费者,可以利用Kafka消费者组的Rebanancing机制实现动态扩缩容机制。4.总结虾皮金融产品团队在2021年推出的RCS目前正在多条产品线中推广使用。主要解决传统T+1离线数据校验延迟高,业务耦合紧,新业务上线时会带来额外成本。发展负担问题。RCS通过灵活配置验证规则、表达式场景覆盖、Log-BasedCDC解决方案,提供近实时的数据验证解决方案,最大限度降低数据不一致带来的资金和信息安全风险。我们也欢迎不同的用户和团队接入或部署。在后续的更新迭代中,RCS将进一步提升验证的性能,以支持业务量增长带来的验证需求。本文作者为后端研发工程师一中和松涛。来自Shopee金融产品团队。Jiekun,后端研发工程师,热衷于分布式系统和Kubernetes。来自Shopee平台外广告团队。
