作者:王东宜信科技研发中心架构师目前就职于宜信科技研发中心架构师,负责流计算和大数据业务产品解决方案。曾就职于Naver中国(韩国最大搜索引擎公司)中国研发中心高级工程师,多年从事CUBRID分布式数据库集群开发和CUBRID数据库引擎开发http://www.cubrid.org/blog/news/cubrid-cluster-introduction/专题介绍:DWS背景介绍dbus+wormhole整体架构及技术实现方案DWS实战应用案例前言大家好,我是易信科技研发中心的王东,这是我的***来自社区此次分享,如有不足之处,还请指正,见谅。本次分享的主题是《基于日志的DWS平台实现和应用》,主要分享一下我们目前在宜信做的一些事情。这个主题包含了两个团队很多兄弟姐妹努力的结果(我们团队和汕尾团队的结果)。这次由我代写,尽量介绍给大家。其实整个实现原理上还是比较简单的,当然涉及的技术也很多。我尽量用最通俗的方式表达出来,让大家明白这件事的原理和意义。过程中大家可以随时提问,我会尽力解答。DWS是缩写,由3个子项目组成,后面会解释。一、背景事情的起因是前段时间公司的需要。大家都知道宜信是一家互联网金融公司。我们的许多数据与标准的互联网公司不同。一般来说:玩数据的人都知道,数据是非常有价值的,那么这些数据就存储在各个系统的数据库中。需要数据的用户如何获得一致的、实时的数据?过去有几种常见的做法:DBA打开各个系统的备份数据库。在高峰期(比如晚上),用户分别提取需要的数据。由于抽取时间不同,导致各个数据使用者的数据不一致,数据冲突,重复抽取。相信很多DBA都对这件事很苦恼。公司统一的大数据平台利用Sqoop在业务淡季统一提取各个系统的数据,保存在Hive表中,然后为其他数据用户提供数据服务。这种方式解决了一致性问题,但是时效性差,基本是T+1的时效性。基于trigger方式获取增量变化的主要问题是业务方侵入性大,trigger也带来了性能损失。这些程序都不被视为***。在了解和考虑不同的实现方式后,我们最终借鉴了linkedin的思想,认为如果要同时解决数据的一致性和实时性,更合理的方式应该来自于日志。(此图来自:https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/)以增量Log作为所有系统的基础。后续数据用户通过订阅Kafka来消费日志。例如:大数据的用户可以将数据保存到Hive表或Parquet文件中,用于Hive或Spark查询;提供搜索服务的用户可以将数据保存在Elasticsearch或HBase中;提供缓存服务的用户可以将日志缓存到Redis或者alluxio中;数据同步的用户可以将数据保存在自己的数据库中;由于kafka日志可以重复消费并缓存一段时间,每个用户都可以消费kafka日志,实现既保持与数据库相同数据的一致性,又可以保证实时性;为什么要使用log和kafka作为基础,而不是Sqoop来进行抽取呢?因为:为什么不使用dualwrite(双重写入)?,请参考https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/我不会在这里解释太多。二、整体架构于是我们提出了搭建一个基于日志的公司级平台的想法。让我们解释一下DWS平台。DWS平台由3个子项目组成:Dbus(数据总线):负责从源头实时抽取数据,并用自己的schema转换成约定的json格式数据(UMS数据),并放入在卡夫卡;Wormhole(数据交换平台):负责从kafka读取数据,并将数据写入target;Swifts(实时计算平台):负责从kafka读取数据,实时计算,并将数据写回kafka。图中:日志抽取器和dbus共同完成数据抽取和数据转换,抽取包括全量抽取和增量抽取。虫洞可以将所有日志数据保存到HDFS;它还可以将数据记录到所有支持jdbc的数据库,如HBash、Elasticsearch、Cassandra等;Swifts以配置和SQL的形式支持流计算,包括支持流式Join、lookup、filter、windowaggregation等功能;dbusweb是dbus的配置管理终端,rider不仅仅进行配置管理,还包括Wormhole和Swifts运行时管理,数据质量校验等。由于时间关系,今天主要介绍DWS中的Dbus和Wormhole,介绍一下在需要的时候迅速行动。3、dbus解决方案的日志分析上面提到了dbus的主要解决方案是实时从源头提取日志。这里我们以MySQL为例简单说明一下如何实现。我们知道MySQLInnoDB虽然有自己的日志,但是MySQL的主备同步是通过binlog实现的。如下图所示:图片来自:https://github.com/alibaba/canalbinlog有三种模式:行模式:日志会记录每一行数据被修改,然后修改相同的数据奴隶那边。语句模式:每条修改数据的SQL语句都会记录在master的bin-log中。当slave在复制的时候,SQL过程会被解析成原来master执行过的相同SQL再执行一次。混合模式:MySQL会根据每条执行的具体SQL语句来区分记录的日志形式,即Statement和Row二选一。它们各自的优缺点如下:这里来自:http://www.jquerycn.cn/a_13625由于statement模式的不足,在和我们DBA的交流中了解到,在replication中使用的是row模式实际生产过程。这使得阅读完整日志成为可能。通常我们的MySQL布局采用2个master主库(vip)+1个slave从库+1个备份容灾库的方案。由于灾备库通常用于异地灾备,实时性不高,不易部署。为了尽量减少对源的影响,显然我们应该从从库读取binlog日志。读取binlog的解决方案有很多,github上也有很多,参考https://github.com/search?utf8=%E2%9C%93&q=binlog。最终我们选择了阿里的运河作为日志提取器。Canal最早用于同步中国和美国的数据中心。Canal的原理比较简单:Canal模拟MySQLSlave的交互协议,伪装成MySQLSlave,向MySQLSlave发送dump协议。MySQLmaster收到转储请求,开始向Slave推送二进制日志。(即canal)Canal解析二进制日志对象(本来就是字节流)。图片来自:https://github.com/alibaba/canal解决方案MySQL版Dbus的主要解决方案如下:对于增量日志,通过订阅CanalServer的方式,获取到的增量日志MySQL:根据Canal的输出,日志为protobuf格式,开发增量Storm程序,实时将数据转换成我们定义的UMS格式(json格式,后面会介绍),并保存在卡夫卡;增量Storm程序还负责捕获模式更改以控制版本号;增量Storm配置信息保存在Zookeeper中,以满足高可用性需求。Kafka既作为输出结果,又作为处理过程中的缓冲区和消息解构区。在考虑使用Storm作为解决方案时,我们主要认为Storm有以下优势:技术相对成熟稳定,与Kafka也是标准组合;实时性比较高,可以满足实时性要求;满足高可用性要求;通过配置Storm并发,可以激活扩展性能的能力;对于流表,增量部分足够全量抽取,但很多表需要知道原始(已有)信息。这时候我们就需要初始加载(firstload)。对于initialload(第一次加载),也开发了全抽取Storm程序,通过jdbc连接从源库的备库拉取。初始加载是拉取所有数据,所以建议在非高峰期进行。幸运的是,它只进行一次,而不是每天进行一次。全量抽取,我们借鉴了Sqoop的思想。Storm全量抽取分为两部分:数据分片实际抽取数据分片需要考虑分片列,根据列的配置和自动选择,将数据按照范围分片,分片信息为保存在卡夫卡。下面是具体的分片策略:全量抽取的Storm程序读取Kafka的分片信息,采用多并发并行连接数据库备库进行拉取。因为提取时间可能会很长。抽取过程中将实时状态写入Zookeeper,方便心跳程序监控。统一的消息格式无论是增量还是全量,最终输出到Kafka的消息都是我们约定好的统一消息格式,称为UMS(unifiedmessageschema)格式。如下图:消息的schema部分定义了命名空间,由类型+数据源名+schema名+表名+版本号+分库号+分表号组成,用于描述所有的表整个公司,通过名称空间。独特的定位。_ums_op_表示数据类型为I(insert),U(update),D(delete);_ums_ts_增删改事件的时间戳,显然是更新了新数据的时间戳;_ums_id_消息的唯一id,保证消息是唯一的,但是这里我们保证消息的顺序(后面解释);payload是指具体的数据,一个json包可以包含一个或多个数据,以增加数据的payload。UMS支持的数据类型参考并简化了Hive类型,基本包括了所有的数据类型。全量和增量一致性在整个数据传输过程中,为了尽可能保证日志消息的顺序,我们在Kafka中使用了一个partition。总的来说,基本上是顺序的,唯一的。但是我们知道写Kafka会失败,可能会被重写,Storm也使用了redo机制。因此,我们不严格保证exactlyonceandcompleteorder,但我们保证至少一次。所以_ums_id_变得尤为重要。对于完全提取,_ums_id_是唯一的。zk中每个并发度取不同的id切片,保证唯一性和性能。填写负数不会和增量数据冲突,保证早于增量数据。消息。对于增量抽取,我们使用MySQL的日志文件号+日志偏移量作为唯一id。id是一个64位的长整数,高7位用于日志文件号,低12位用作日志偏移量。例如:000103000012345678。103是日志文件号,12345678是日志偏移量。这样从日志层面保证了物理唯一性(重做id号也不会变),同时也保证了顺序(也可以定位日志)。通过对比_ums_id_消费日志,对比_ums_id_就可以知道更新了哪条消息。其实_ums_ts_和_ums_id_的用意是相似的,只是有时候_ums_ts_可能会重复,即1毫秒内发生多次操作,所以需要比较_ums_id_。心跳监控预警整个系统涉及到数据库主备同步、CanalServer、并发多个Storm进程等环节。因此,过程的监测和预警就显得尤为重要。通过heartbeat模块,比如每分钟(可配置)为每一个抽取的表插入一个心态数据,保存发送时间,这个心跳表也被抽取,跟随整个过程,同步表其实走的是同一个逻辑(因为并发的多个Storm可能会有不同的分支),当收到一个心跳包时,即使没有数据的增删改查,也可以证明整个链路是连通的。Storm程序和heartbeat程序将数据发送到一个公共统计topic,然后统计程序将其保存在influxdb中,并使用grafana进行展示,可以看到如下效果:图为实时监控信息一定的业务系统。上面是实时的交通情况,下面是实时的延误情况。可见实时性还是很不错的。基本上1到2秒数据就已经发送到终端kafka了。Granfana提供的是实时监控能力。如果有延迟,会通过dbus的心跳模块发送邮件或者短信告警。实时脱敏考虑到数据安全,对于需要脱敏的场景,Dbus的全量风暴和增量风暴方案也完成了实时脱敏功能。脱敏方式有3种:总结一下:简单来说,Dbus就是将各种来源的数据实时导出,并以UMS的形式提供订阅,支持实时脱敏,实际监控告警。4.虫洞解决方案说完了Dbus,我们来说说Wormhole。为什么两个项目不是一个,而是要通过kafka连接?一个重要的原因是脱钩。Kafka具有天然的解耦能力,程序可以直接通过Kafka做异步消息传递。Dbus和Wornhole也在内部使用kafka进行消息传递和解耦。另一个原因是UMS是自描述的。通过订阅Kafka,任何有能力的用户都可以直接消费UMS来使用。虽然可以直接订阅UMS结果,但需要进行开发工作。Wormhole解决的是:提供一键配置,将Kafka中的数据实现到各个系统中,让没有开发能力的数据使用者通过虫洞实现对数据的使用。如图所示,Wormhole可以将Kafka中的UMS实现到各种系统中,HDFS、JDBC数据库和HBase是目前使用最多的。在技??术栈上,wormhole选择使用sparkstreaming。在Wormhole中,流指的是从源到目标的命名空间。火花流服务于多个流。选择Spark有充分的理由:Spark天然支持各种异构存储系统;虽然SparkStream的延迟比Storm稍差,但Spark的吞吐量和计算性能更好;Spark支持并行计算,具有更大的灵活性;Spark提供统一的功能,在一个技术栈内解决SparkingJob、SparkStreaming、SparkSQL,方便后期开发;这里补充一下Swifts的作用:Swifts的本质是读取KafkaData中的UMS,进行实时计算,并将结果写入Kafka的另一个topic。实时计算可以通过多种方式进行:如过滤、投影(projection)、查找、streamingjoinwindowaggregation,可以完成各种具有商业价值的流式实时计算。Wormhole和Swifts的对比如下:HDFS通过WormholeWparkStreaming程序消费Kafka的UMS。首先,UMS日志可以保存到HDFS。Kafka一般只保存几天的信息,并不会保存所有的信息,而HDFS可以保存所有的历史增删改查。这使得很多事情成为可能:通过在HDFS中重放日志,我们可以随时恢复历史快照。可以做一个拉链表,还原每条记录的历史信息,方便分析;当程序出错时,可以通过backfill重新消费消息,形成新的快照。可以说HDFS中的日志是很多东西的基础。由于Spark对parquet的原生支持非常好,因此SparkSQL可以为Parquet提供良好的查询。当UMS登陆HDFS时,它存储在Parquet文件中。Parquet的内容是保存所有的日志增删改查信息以及_ums_id_、_ums_ts_。Wormholesparkstreaming按照namespace将数据分布在不同的目录下,即不同的表和版本放在不同的目录下。由于每次写入的Parquet文件都是小文件,所以大家都知道HDFS对小文件性能不好,所以还有一个工作就是每天定时将这些Parquet文件合并成大文件。每个Parquet文件目录携带文件数据的开始时间和结束时间。这样,在重新填充数据时,可以根据选择的时间范围决定需要读取哪些Parquet文件,而不用读取所有数据。插入或更新数据的幂等性,往往需要我们对数据进行处理,落地到数据库或HBase中。那么这里涉及到的一个问题就是,什么样的数据可以更新到数据中?这里最重要的原则是数据的幂等性。无论我们遇到任何数据的增删改查,我们面临的问题是:应该更新哪一行;什么是更新策略。对于第一个问题,其实是需要定位数据,找到一个唯一的key。常见的有:使用业务库的主键;业务方指定若干列作为联合唯一索引;对于第二个问题,涉及到_ums_id_,因为我们保证了_ums_id_的大值的更新,所以找到对应的数据行后,按照这个原则进行替换更新。之所以软删除加上_is_active_列是针对这样一种情况:如果已经插入的_ums_id_比较大,就是删除数据(说明这条数据已经删除),如果不是软删除,就插入这时就会真正插入一个_ums_id_较小的数据(旧数据)。这会导致插入旧数据。不再幂等了。因此保留删除的数据(软删除)是很有价值的,可以用来保证数据的幂等性。HBase向Hbase中保存和插入数据,相当简单。不同的是HBase可以保留多个版本的数据(当然也只能保留一个版本)。默认保留3个版本;因此,在向HBase中插入数据时,需要解决的问题是:选择合适的rowkey:Rowkey的设计是可以选择的,用户可以选择源表的主键,也可以选择几列作为联合首要的关键。选择合适的版本:使用_ums_id_+更大的偏移量(比如100亿)作为行版本。Version的选择很有意思。利用_ums_id_的唯一性和自增性,与版本本身的比较关系一致:即版本越大,对应的_ums_id_越大,对应的版本越新。从提高性能的角度出发,我们可以不进行比较,直接将整个SparkStreamingDataset集合插入到HBase中。让HBase根据版本自动帮我们判断哪些数据可以保留,哪些数据不需要保留。在Jdbc中插入数据:虽然向数据库中插入数据保证幂等性的原理很简单,但是在实现上要提高性能就变得复杂很多。不可能一一比较,然后插入或更新。我们知道Spark的RDD/dataset是通过集合的方式进行操作来提升性能的,我们需要在集合操作的方式上实现幂等性。具体思路是:首先根据集合中的主键查询目标数据库,得到一个已有的数据集合;与数据集中的集合进行比较,分为两类:A:不存在的数据,即可以插入这部分数据;B:现有数据,比较_ums_id_,最后只更新哪个_ums_id_行大的到目标数据库,小的直接丢弃。使用Spark的同学都知道,RDD/dataset是可以分区的,可以使用多个worker进行操作,提高效率。在并发的情况下,插入和更新都可能失败,所以也有失败后的策略。例如:因为另一个worker已经插入了,那么因为唯一性约束插入失败了,那么需要改为更新,必须比较_ums_id_看是否可以更新。对于其他无法插入的情况(比如目标系统有问题),Wormhole也有重试机制。有很多细节。这里就不多介绍了。有些仍在开发中。关于插入其他存储我就不多介绍了。总的原则是:根据各个存储的特点,设计一个基于集合的、并发的数据插入实现。这些都是Wormhole为了性能所做的努力,使用Wormhole的用户大可不必关心。五、应用案例说了这么多实时营销,那么DWS的实际应用是什么呢?下面介绍一下某系统使用DWS实现的实时营销。如上图所示:系统A的数据都保存在自己的数据库中。我们知道宜信提供很多金融服务,包括贷款,而信用审核在借款过程中非常重要。借款人需要提供证明其信用价值的信息,比如央行的信用报告,就是信用数据最新的数据。银行流水、网购流水也是信用属性很强的数据。借款人通过Web或手机APP在系统A填写信用信息时,可能会因为某些原因无法继续。尽管借款人可能是一个优质的潜在客户,但这些信息是不可用的或很早以前就知道的,所以实际上客户流失了。应用DWS后,借款人填写的信息已记录在数据库中,并通过DWS实时提取、计算并登陆目标数据库。根据客户打分,评价优质客户。然后立即将客户的信息输出到客户服务系统。在极短的时间内(几分钟内),客服人员通过电话联系借款人(潜在客户),进行客户关怀,将潜在客户转化为真实客户。我们知道借钱是有时间限制的,如果时间太长,那就一钱不值了。如果没有实时绘制/计算/放置的能力,这一切都不可能实现。实时报表系统的另一个实时报表应用如下:我们数据用户的数据来自多个系统。以往通过T+1获取报表信息,然后指导次日操作,时效性很强。通过DWS,从多个系统实时提取、计算、落地数据,并提供报表,及时部署调整运营,快速响应。6.总结说了这么多,简单总结一下:DWS技术基于主流的实时流式大数据技术框架,具有高可用、大吞吐、强水平扩展、低延迟、高容错、最终一致性等特点。在能力方面,DWS支持异构、多源、多目标系统,支持多种数据格式(结构化、半结构化、非结构化数据)和实时技术能力。DWS将三个子项目合为一个平台,让我们具备实时能力,驱动各种实时场景应用。适用场景包括:实时同步/实时计算/实时监控/实时报表/实时分析/实时洞察/实时管理/实时运营/实时决策感谢收听,本次分享到此结束。Q&AQ1:Oracle日志阅读器有开源方案吗?A1:Oracle行业的商业解决方案很多,比如:OracleGoldenGate(原goldengate)、OracleXstream、IBMInfoSphereChangeDataCapture(原DataMirror)、DellSharePlex(原Quest)、国产DSGsuperSync等,很少开源解决方案易于使用。Q2:这个项目投入了多少人力物力?感觉有点复杂。Q2:DWS由三个子项目组成,平均每个项目5-7人。说的有点复杂,其实就是想用大数据技术来解决我们公司目前遇到的困难。因为是搞大数据相关技术的,所以团队里的兄弟姐妹们还是比较开心的:)其实Dbus和Wormhole都是比较固定和模型化的,复用方便。Swifts的实时计算与各个业务相关性比较大,定制性比较强,比较麻烦。Q3:宜信的这个DWS系统会开源吗?A3:我们也考虑过为社区做贡献。与宜信的其他开源项目一样,目前的项目刚刚成型,还需要进一步完善。我相信在未来的某个时候,我们会把它开源。Q4:架构师懂什么?他们是系统工程师吗?A4:不是系统工程师。我们宜信有很多架构师,他们应该算是以技术驱动业务的技术经理。包括产品设计、技术管理等。Q5:复制方案是OGG吗?A5:OGG和上面提到的其他商业解决方案是可选的。
