当前位置: 首页 > 科技观察

架构师详解:从0到1搭建大数据平台

时间:2023-03-17 11:44:48 科技观察

如今,大数据在各行各业的应用越来越广泛:运营基于数据关注运营效果,产品基于数据分析关注转化率,以及基于数据效果的开发措施系统优化等。美图公司拥有美拍、美图秀秀、美颜相机等十余款APP。每个APP都会根据数据进行个性化的推荐、搜索、报表分析、反作弊、广告投放等。比较整体业务对数据的需求。多且应用广泛。因此,美图数据技术团队的业务背景主要体现在业务线多、应用面广。这也是我们做数据平台最重要的原因之一,是业务驱动。美图数据平台整体架构如图所示为我们数据平台的整体架构。在数据采集部分,我们搭建了一个采集服务器日志系统Arachnia,支持各个app集成的客户端SDK,负责采集app客户端数据;同时,还有基于DataX的数据集成(导入导出);Mor爬虫平台支持可配置任务开发,用于爬取公网数据。数据存储层主要根据业务特点选择不同的存储方案。目前主要使用HDFS、MongoDB、Hbase、ES等。在数据计算部分,目前离线计算主要是基于Hive&MR,实时流计算是Storm、Flink,以及另一个自研的位图系统Naix。在数据开发方面,我们搭建了一套数据车间、数据总线分发、任务调度等平台。数据可视化与应用部分,主要是根据用户需求搭建一系列数据应用平台,包括:A/B实验平台、渠道推广跟踪平台、数据可视化平台、用户画像等。右边展示的是一些基础的每个组件可能依赖的服务,包括地理位置、元数据管理、唯一设备标识等。下图是基本数据架构的流程图。在一个典型的lamda架构中,从左边的数据源采集开始,Arachnia和AppSDK分别将服务端和客户端数据上报给代理服务采集器,并将数据写入kafka,然后实时流会经过一层数据分发,最终业务消费kafka数据进行实时计算。离线时,ETL服务会负责将数据从Kafka转储到HDFS,然后异构数据源(如MySQL、Hbase等)主要基于DataX和Sqoop进行数据导入导出,最后将数据写入到各种存储层,最后通过统一的外部API连接到业务系统和我们自己的可视化平台。数据平台的阶段性发展企业级数据平台建设主要分为三个阶段:初期基本使用免费的第三方平台。这个阶段的特点是可以快速整合,看到app的一些统计指标,但是缺点也很明显,没有原始数据,无法实现那些第三方提供的基础指标以外的其他分析和推荐。所以有一个从0到1的过程,这样我们才有数据可以用;数据有了之后,由于业务线和需求的爆发,我们需要提高开发效率,让更多的人参与数据开发和使用数据。不局限于数据研发人员的使用,因此涉及到将数据和计算存储能力开放给各个业务线,而不是掌握在自己手中;数据打通后,业务方会问数据任务能不能跑得更快,能不能秒级输出,能不能更实时;另一方面,为了满足业务需求,集群的规模越来越大,因此会开始考虑如何在满足业务需求的同时实现更多的资源节约。美图目前正处于第二和第三阶段的过渡期。在不断提升数据开放性的同时,也在逐步提升查询和分析的效率,并开始考虑如何优化成本。接下来重点介绍我们平台在0到1和数据开放两个阶段的实践和优化思路。From0to1从0到1解决从数据采集到最终可用的数据。图4展示了数据采集的演变过程,从开始使用友盟、flurry等免费第三方平台,到快速使用rsync将日志同步到服务器存储计算,再到后来快速开发一个简单的python脚本支持业务服务器上报日志,最后我们开发了服务端日志采集系统Arachnia和客户端AppSDK。数据采集??是数据的源头,是整个数据链路中比较重要的一个环节,更需要关注:数据是否完整,数据是否支持实时上报,数据埋点是否规范准确以及维护和管理成本。因此,我们的日志采集系统需要满足以下要求:能够集管理和维护于一体,包括Agent的自动部署、安装、升级和卸载、配置热更新、延时监控;在可靠性方面,至少要保证至少一次;以IDC为例,需要能够支持多个IDC的数据采集并汇聚到数据中心;在资源消耗方面,应该是越小越好,尽量不影响业务。基于以上需求,我们没有使用flume、scribe、fluentd,最终选择了自己开发一个采集系统Arachnia。上图是Arachnia的简单架构图,通过系统大脑进行集中管理。puppet模块主要作为单个IDC内Agent指标的统一汇总,转发指标或者配置热更新命令。CollectorAgent主要负责运维平台安装,启动后从大脑中拉取配置,开始采集并上报数据给采集器。然后看Arachnia的实际优化,首先是可靠性保证至少一次。很多系统使用WAL记录上报失败的数据,重试再上报,避免上报失败丢失。我们的做法是去掉WAL,增加一个coordinator来统一分发和管理tx状态。在收集开始前,协调器会发送一个txid,source收到信号后开始收集,并将数据发送给sink,发送后会acktx,告诉协调器已经commit了。协调器会检查并确认,然后向源和汇发送提交信号以更新状态。最后tx之后,source会把采集进度更新到持久层(默认是本地文件)。如果该方法前3步出现问题,则数据发送不成功,不再重复;如果接下来的4个步骤失败,数据将被重复并且tx将被重播。基于上述至少一次的可靠性保证,部分业务方要求唯一性。我们支持为每个日志生成一个唯一的ID。数据采集??系统的另一个主要做法是唯一定位一个文件,为每条日志创建一个唯一的MsgID,以便业务方后期根据MsgID在出现重复日志时进行清理。我们一开始用的是filename,后来发现很多业务方会改filename,所以改成了inode,但是inodelinux会回收再利用,最后用inode和文件头内容作为hash作为fileID。而MsgID由agentID&fileID&offset唯一确认。数据上报后,收集器负责解析协议,将数据推送给Kafka,那么Kafka是如何落地HDFS的呢?先来看美图的诉求:支持分布式处理;涉及业务线多,数据格式多,需要支持多种数据格式的序列化,包括json、avro、特殊分隔符等;支持机器故障、服务问题等导致的数据落地失败,需要重跑,需要有比较快的重跑能力,因为一旦这个块出现故障,会影响后续业务线的数据使用;支持可配置的HDFS分区策略,可以支持相对灵活的各业务线不同的分区配置;支持一些特殊的业务逻辑处理,包括:数据校验、过期过滤、测试数据过滤、注入等;图7显示了数据服务的实现。基于Kafka和MR的特点,针对每一个kafkatopicpartition,组装mapper的inputsplit,然后设置一个mapper进程来处理和消费这批kafka数据,经过数据分析、业务逻辑处理、验证和过滤,最后根据分区规则Land写入目标HDFS文件。登陆成功后,本次处理的meta信息(包括topic、partition、startoffset、endoffset)将存储到MySQL中。下次处理时,会从上次处理的offset开始读取消息,新一批数据消费开始落地。实现基本功能后,难免会遇到一些问题。例如,不同业务主题的数据量级不同。这会导致一个任务要等待分区数据量最大、处理时间最长的mapper结束,才能完成整个任务。.那么我们如何解决这个问题呢?制度设计有一个不成文的原则:长线分必合,长线合必分。对于数据倾斜的问题,我们也采用了类似的思路。首先将数据量级较小的分区合并为一个inputsplit,这样一个mapper就可以处理多个业务的分区数据,最后写入多个文件。另外,对于数据量大的分区,可以进行分段,分成多个mapper处理同一个分区。这样可以实现更加均衡的mapper处理,可以更好的应对业务量的突增。除了数据倾斜问题,还有各种原因导致数据dump到HDFS失败,比如kafka磁盘问题、hadoop集群节点宕机、网络故障、外部访问权限等,导致数据异常ETL程序,最终可能导致HDFS文件关闭失败导致文件损坏等,需要重新运行数据。我们数据的时间分区基本上是按照天来的。使用原来的方法可能会导致天粒度的文件损坏,无法读取分析。我们采用两阶段的处理方式:mapper1先将数据写入一个临时目录,mapper2将hdfs临时目录中的数据追加到目标文件中。这样当mapper1出现故障时,可以直接重跑batch,而不用重跑一整天的数据;当mapper2失效时,可以直接从临时目录中用merge数据替换最终文件,减少re-ETLdaygranularity的过程。在数据的实时分发和订阅中,写入kafka1的数据基本是每个业务的全量数据,而需求端的业务大多只关注某个事件或一小类数据,而不是为任何业务消耗全部数据。处理,所以我们加了一个实时分发的Databus来解决这个问题。Databus支持业务方自定义分发规则向下游kafka集群写入数据,方便业务方订阅和处理自己想要的数据,支持更小粒度的数据复用。上图展示了Databus的实现。其主体实现了基于Storm的数据总线拓扑结构。Databus有两个spout,一个支持拉取全量和新增规则,然后更新到下游distributionbolt更新缓存规则,一个是从Kafka消费的spout。distributionbolt主要负责解析数据,匹配规则,向下游的kafka集群发送数据。数据开放有了原始数据,可以做离线实时的数据开发之后,数据开发的需求就会井喷,数据研发团队就会不堪重负。因此,我们通过数据平台开放数据计算和存储能力,赋予业务方开发数据的能力。元数据管理、任务调度、数据集成、DAG任务编排、可视化等实现不再赘述,主要介绍美图在数据开放后稳定性方面的实践经验。数据开放与系统稳定性相得益彰:一方面,开放后不再由数据类研发人员来做,经常会遇到数据任务非法提交、资源消耗高等问题。计算和存储集群带来了很多麻烦;另一方面,其实也是因为数据的开放性,我们不断推动提升系统稳定性的需求。对于很多高资源和非法的任务,我们首先考虑是否可以在HiveSQL层面做一些验证和限制。如图13所示,将HiveSQL的整个分析编译成可执行MR的过程:首先基于Antlr进行语法分析生成AST,然后进行语义分析,基于Antlr生成JAVA对象QueryBlock在AST上。基于QueryBlock生成逻辑计划后,做逻辑优化,最后生成物理计划,进行物理优化,最后转化为可执行的MR任务。我们主要是在语义分析阶段生成QueryBlock之后,拿到它之后做了很多语句校验,包括:非法操作,查询条件限制,高资源消耗校验判断等。稳定性方面的第二个实践主要是优化集群,包括:我们已经全面升级了Hive和Hadoop集群。主要是低版本修复了一些问题,合并了一些社区补丁,后续会在新版本修复;另一个原因是新版本的功能和性能优化。Hive从0.13版本升级到2.1版本,Hadoop从2.4升级到2.7;我们优化了Hive的HA部署。我们将HiveServer和MetaStoreServer拆分开来,分别部署多个节点,避免合并服务部署运行时相互影响;之前,执行引擎基本都是OnMapReduce,我们也在做HiveOnSpark的迁移,逐步把线上任务从HiveOnMR切换到HiveOnSpark;针对日常生活中遇到的一些问题拉一个内部分支做bugfix或者合并社区补丁特性;平台稳定性的最后一个实践是提升权限、安全性,防止集群、非法数据访问、攻击等。提升权限主要有两个部分:API访问和集群。APIServer:上面提到了我们有OneDataAPI,它为各个业务系统提供统一的API来访问数据。这方面主要实现了一个额外的统一认证CA服务。业务系统必须访问CA获取token,然后访问OneDataAPI。通过CA验证后,OneDataAPI是合法的,允许真正访问数据,从而防止业务系统任意访问所有数据指标。.集群:目前主要是基于ApacheRanger统一各种集群,包括Kafka、Hbase、Hadoop等,对集群进行授权管理和维护;以上是美图在搭建数据平台并向各业务线开放后的平台稳定性。完成一些练习和优化。总结首先,在搭建数据平台之前,首先要了解业务,看业务整体体量是不是比较大,业务线是不是比较宽,需求是不是大到严重影响我们生产率。如果答案是肯定的,那么可以考虑尽快搭建数据平台,更高效、更快速地提升数据开发和应用的效率。如果你自身的业务量和需求并不多,不一定非要搭建大数据或者搭建完善的数据平台才能快速满足支撑业务的优先级。在平台建设过程中,需要关注数据质量和平台稳定性,如数据源采集的完整性、时效性、设备唯一标识等,优化和实践平台稳定性,为业务方提供稳定、可靠的平台。在提高分析决策效率,逐步扩大规模后,需要对成本和资源进行优化和思考。