当前位置: 首页 > 科技观察

ApacheFlink在蔚来汽车的应用

时间:2023-03-19 23:20:43 科技观察

本文整理自蔚来汽车大数据专家、架构师吴江在FlinkForwardAsia2021行业实践环节的演讲。主要内容包括:NIO实时计算的发展历程实时计算平台实时看板CDP实时数据仓库其他应用场景1.NIO实时计算的发展历程2018年5月左右,我们开始接触实时计算的概念,最初使用SparkStreaming做一些简单的流计算数据处理;2019年9月,我们引入了Flink,通过命令行提交,包括对整个作业的生命周期进行管理;1月21日,我们推出实时计算平台1.0,2.0版本正在开发中。2、实时计算平台在实时计算平台1.0上,我们编译代码,将jar包上传到服务器,通过命令行提交。这个过程存在很多问题:第一,所有过程都是手工的,非常繁琐,容易出错;第二,缺少监控,Flink本身内置了很多监控,但是没有自动添加的方式,还是需要手动做配置;另外,任务的维护也很麻烦,一些不熟悉操作的开发人员容易出问题,出现问题后很难排查。实时计算平台1.0的生命周期如上图所示。任务编写完成后,打包成jar包上传提交,后续任务的启动、停止、恢复、监控等都可以自动完成。作业管理主要负责创建、运行、停止、恢复和更新作业。日志主要记录Flink任务提交时的一些日志。如果是运行时日志,通过Yarn集群中的日志查看有点麻烦。关于监控告警模块,首先metrics监控主要是利用Flink内置的指标上传到Prometheus,然后配置各种监控接口。Alarms也是使用Prometheus的一些指标来设置规则,然后设置告警。Yarn负责整个集群资源的管理。上图是实时计算平台1.0的界面,整体功能比较简单。上图是实时计算平台2.0。与1.0相比,最大的区别在于蓝色部分。实时计算平台的形式可能没有一个统一的标准,这与每个公司自身的情况密切相关,比如公司本身的规模和规模,公司在实时计算平台上的资源投入等等等,最终应该应用在实时计算平台上,公司本身的现状就是最好的标准。在2.0版本中,我们增加了对从开发到测试的两阶段功能的支持。简单介绍下它们的具体功能:FlinkSQL:是很多公司的实时计算平台都支持的功能。它的优点是可以降低使用成本,比较简单易用。空间管理:不同部门和不同群体可以在自己的空间创建和管理工作。有了空间的概念,我们就可以利用它来做一些权限控制,比如我们只能在自己有权限的空间里进行一些操作。UDF管理:在使用FlinkSQL的前提下,可以使用UDF基于SQL的语义进行功能扩展。另外,UDF也可以用于Java和Schema任务,可以将一些常用的功能封装到UDF中,降低开发成本。它还有一个很重要的调试功能,可以简化原来的调试过程,让用户感觉不到。实时计算平台2.0的实现最大的影响就是减轻了数据团队的负担。在我们原来的开发过程中,往往需要数据团队的介入,但实际上很大一部分工作是比较简单的,比如数据同步或者简单的数据处理,不一定需要数据团队的介入。我们只需要让实时计算平台更加完善、易用、简单,其他团队就可以使用FlinkSQL来完成以上简单的工作。理想情况下,他们甚至可以在不知道Flink相关概念的情况下做一些Flink。发展。比如后台人员在做业务端开发的时候,一些比较简单的场景不需要依赖数据团队,大大降低了沟通成本,加快了进度。最好在部门内部形成一个闭环。这样一来,每个角色实际上都会感到更快乐。产品经理的工作也会变得更加轻松。需求阶段不需要引入太多的团队,工作量也会减少。因此,这是以技术方式优化组织流程的一个很好的例子。3、实时看板实时看板是一个比较常见的功能。在具体实施中,我们主要发现了以下难点:一是数据上报滞后。比如业务数据库出现问题后,需要中断CDC访问,包括后续写入Kafka。如果kafka集群负载高或者kafka有问题,也会中断一段时间,造成数据延迟。上述延迟在理论上是可以避免的,但在实践中很难完全避免。此外,还有一些理论上无法完全避免的延迟,例如用户流量问题或信号导致操作日志无法实时上传等。二是流批一体化。主要看历史数据和实时数据能否统一。第三,实时选择维度。实时看板可能需要灵活选择多个维度值。比如你想先看北京的活跃用户数,然后是上海的活跃用户数,最后是北京+上海的活跃用户数。这个尺寸可以根据您的需要灵活选择。四是指标验证。离线情况下指标的验证相对简单。比如你可以做一些数据分布,看看每个分布的大概情况。也可以将ODS层数据的计算与中间表进行对比,做交叉。核实。但实时比较麻烦,因为实时处理一直在进行,有些情况很难重现,也很难验证指标的范围或分布。实时看板一般有两个要求:第一,延迟。不同的场景对延迟的要求不同。例如,在某些场景下,数据可以延迟1-2分钟,但在某些场景下,只允许延迟。几秒钟。不同场景下实践的技术方案的复杂度不同。其次,要兼顾实时和历史看板的功能。在某些场景下,除了要看实时数据变化,还需要结合历史数据进行对比分析。实时数据和历史数据应该统一存储,否则可能会出现很多问题。首先,表结构在实现时比较复杂。在查询的时候,可能需要判断哪些时间段是历史数据,哪些时间段是实时数据,然后拼接,这样会导致查询实现成本很高。其次,历史数据切换时容易出问题。例如,每天早上定期刷新历史数据。如果此时历史任务出现延迟或错误,很容易导致检测到的数据错误。我们内部对实时看板延时要求比较高,一般在秒级以内,因为我们希望大屏幕上的数字一直在跳动变化。传统方案一般采用pull的方式,比如每秒查一次数据库,实现起来比较困难,因为一个页面会包含很多指标,需要同时发送很多接口来查询数据。返回内部是不可能的。另外,如果很多用户同时进行查询,负载会很高,时效性更难保证。因此,我们采用了push的方式。上图是具体的实现架构图,主要分为三层。第一层是数据层,是Kafka的实时数仓。这些数据通过Flink处理后,实时推送到后台,后台实时推送到前端。通过websocket实现后台与前端的交互,实现所有数据的实时推送。在这种需求场景下,有些功能会比较复杂。举个简单的例子,比如统计实时去重的UV个数,其中一个维度是城市,一个用户可能对应多个城市。选择上海和北京的UV数,意味着上海和北京的人会被放在一起去重,计算去重后的实时UV数据,是一件比较麻烦的事情。从离线的角度来看,选择多个维度非常简单。选择好维度后,可以直接抓取数据进行聚合。但是在实时场景下,需要聚合的维度是事先指定好的。第一种解决方案是将所有在Flink状态下出现过的用户ID和维度存储起来,直接计算出所有可能的维度组合UV,然后将更新后的UV推送到前端。但是这种方式会增加大量的计算成本,并且会导致维数爆炸,导致存储成本急剧增加。第二种方案的架构图如上。我们把sink作为一个stream的核心,端到端作为一个整体作为一个stream应用,比如Flink中的数据访问,数据处理和计算,然后到后台,通过push到前端网络套接字作为一个整体。要考虑的应用程序。我们会将每个用户的所有维度值存储在Flink中,Flink在后台推送的用户详情也会存储在每个城市的用户ID列表中。Flink有一个非常关键的排除功能。如果用户已经出现,则在Flink阶段不会将变更推送到前端和后台;如果用户没有出现,或者用户已经出现但是城市还没有出现,则将用户和城市的组合推送到后台,保证后台能够拿到各个城市的用户id去重列表。前端选择维度后,可以在后台对不同维度的用户ID进行增量订阅。这里需要注意两点:首先,在刚打开前端选择纬度的时候,有一个初始化过程,会从后台读取选中维度的完整用户ID做一个集合,然后计算UV的数量。第二阶段新用户ID到达后,会通过Flink推送到后台,后台只会将增量ID推送到前端。然后,因为前端已经保存了之前的collection,对于增量ID,可以直接用O(1)的时间计算出一个新的collection,计算出它的UV数。可能有人会问,这个方案下用户多了怎么办?前端会不会占用太多资源?首先,从我们目前的实际使用场景来看,这个方案已经够用了。如果以后ID数量激增,使用bitmap也是一种选择,但是仅仅使用bitmap并不能解决问题。因为不同公司的用户ID生成规则不一样,有的是自增ID,有的是非自增ID甚至不是一个值,那么就需要做映射,如果是离散的还需要做一些额外的处理价值。第一种方案将ID从1重新编码,使其更小且连续。目前大部分场景下你可能会用到RoaringBitMap。它的特点是如果ID很稀疏,在实际存储中会使用列表而不是位图来存储,达不到减少内存占用的目的。.因此,尽量让ID空间变小,让ID值更连续。但这还不够。如果该ID以前从未出现过,则需要为其重新分配一个ID。但是,在处理这些数据的时候,Flink任务的并行度可能会大于1。这时候如果多个节点同时消费数据,可能会遇到相同的新ID,如何分配一个对应的新mappedsmallID到这个ID?比如一个节点查询后需要生成一个新的ID,同时又要保证其他节点不会再次生成相同的ID,可以通过对新的ID做一个唯一索引来保证。索引创建成功后,会生成一个新的ID。失败的节点可以重试获取当前ID映射的操作,因为刚才其他节点已经生成了这个ID。因此它将成功重试映射阶段。此外,还需要考虑一个场景。比如用户注册完成后,会立即产生一些行为,而用户注册和一些业务模块的行为表可能是不同业务部门开发的,也可能有不同的数据库,不同的表,即使是不同类型的数据库,上述情况的访问方式也会有所不同,这可能会导致注册数据流比行为数据流稍晚到达,尽管它是先注册的。这会导致什么问题?目前看来是不可能的。它只需要在行为数据流和新用户注册数据流之间共享一个ID映射。综上所述,一个好的架构,即使面对数据量的激增,也不需要在架构层面进行大的改动,只需要在细节上重新设计。第二个问题,前端会不会有很大的计算负荷?答案是不。人数去重虽然是前端做的,但是只有前端第一次加载的时候才需要拉取所有用户,后续增量的用户id会直接添加到当前集合中在O(1)中。因此前端的计算负担很低,整个过程完全流式处理。第三个问题,同时访问实时报表的用户很多怎么办?从目前的架构来看,对Flink和后台端基本没有影响。建立一个websocket连接。但是由于实时报表主要是内部使用,外部不使用,所以并发访问量不会太多。而且,我们把数据ID去重的一部分责任放在了前端。即使有多个用户同时访问,计算责任也会分散到不同的用户浏览器上,实际上不会有太大的负载。4、CDPCDP为运营平台,负责部分后台工作。我们的CDP需要存储一些数据,比如ES中的属性数据,Doris中的详细行为数据包括统计数据,TiDB中的任务执行状态等。还有一些针对实时场景的应用。首先是属性需要实时更新,否则可能会导致运行效果不佳。二是行为聚合数据有时需要实时更新。5、实时数据仓库实时数据仓库的主要考虑如下:元信息管理,包括目录管理。分层,如何进行合理的分层。建模,实时数仓应该如何建模,它和离线数仓的建模方式有什么区别?及时性,延迟越低越好,链路越短越好。上图是我们目前的实时数仓架构图。整体上很像离线数仓,也有原始层、DWD层、DWS层和应用层。不同的是它有一个维度层(DIMlayer),里面包含了很多不同的存储介质。维度信息可以存储在TiDB中,维度表可以通过AIO访问;它也可以使用TemporalJoin存储在Hive中。协会;有些数据总是在变化,或者需要做一些基于时间的关联,可以把数据放在Kafka中,然后使用Broadcast或者TemporalJoin来关联。左边是我们正在规划的能力。一是血缘关系,有助于问题的溯源和变更影响的评估;二是元信息管理,我们希望把所有的数据都表化,直接用SQL就可以搞定;三是权限管理,需要对不同的数据源和表进行权限管理;第四是数据质量,如何保证数据质量。以下是对这些未来计划的详细描述。第一,目录管理,这个功能还没有开发。我们想为所有的数据源创建一张表,不管里面的数据是维表还是其他表,不管存在于MySQL还是Kafka中,创建表之后,这些细节都可以被屏蔽掉,可以很方便的通过SQL使用。二是合理分层。分层会在很多方面影响实时数据仓库。首先,层数越多,延迟越大。实时数仓是否需要这么多层,值得深思。其次,实时数据的质量监控比离线数据更复杂,因为它是连续处理的。层数越多,就越难发现问题、定位问题、追溯或重现问题,包括数据集成的分布。监视器。最后,如何进行合理的分层。一定要尽量减少层数,对业务功能进行合理的垂直划分。如果不同业务之间的交集很少,尽量在各自的业务领域建立自己的独立层。三、建模。这是离线数仓中非常重要的一个部分,因为离线数仓的使用者很大一部分是分析师,他们的日常工作就是使用SQL查询和分析数据。这时候就必须考虑易用性。比如大家喜欢又大又宽的表,所有相关的字段都放在一张表里。因此,在对离线数仓进行建模和设计表结构时,需要尽可能多地添加维度。实时数仓面向更多的开发者,所以更加强调实用性。因为在实时数仓的需求下,宽表每增加一个字段都会增加延迟,尤其是维数的增加。因此,实时数仓的场景维表和建模更符合实际需求。第四,及时性。实时数仓本身还是需要一个raw层的,但是在时效性比较高的场景下,比如同步一些线上的数据,这些数据最后的同步快充也是用于线上业务的,所以需要尽量减少链路和减少延误。比如一些FlinkCDC的方法可以通过减少中间层,这样不仅可以减少整体的链路和延迟,也可以减少由于链路节点的减少而出现问题的概率。对于低延迟要求的内部分析场景,尽量使用实时数仓,减少数据冗余。6.其他应用场景其他使用场景包括CQRS应用。比如业务部门的功能更多的是增删改查或者传统的数据库操作,但是以后还是会有数据分析的场景。这时候使用业务库进行分析是一种不正确的方法,因为业务库Analysis的设计一开始就没有考虑,更适合使用分析型OLAP引擎来做这项工作。这样也就把业务部门负责的工作和数据部门负责的工作分开了,各司其职。此外,还有指标监控和异常检测。比如通过Flink进行各种指标的实时检测,它会加载一个机器学习模型,然后实时检测指标的变化是否符合预期,与预期的差距有多大。您还可以设置一个区域值来检测指标的异常情况。实时数据的场景越来越多,人们对实时数据的需求也越来越多,未来我们还会继续探索实时数据。我们在流批一体的实时和离线存储统一方面已经有了一些成果,我们也会在这方面投入更多的精力,包括FlinkCDC是否真的可以减少链接,提高响应效率,这也是一个问题我们会考虑问题。