一山是西云自主研发的数据迁移平台。通过在一山的配置和运行,可以轻松实现第三方数据接入和实时数据同步,异构数据源之间的迁移。本文主要介绍易山数据迁移平台异构数据源间迁移的实现思路和图形化配置实现过程。您可以访问这里查看更多关于大数据平台建设的原创文章。|1、什么是异构数据源可以理解为:指的是具有不同数据结构、访问方式、形式的多个数据源:一个公司在信息化建设中,不同的时期,不同的背景,面对不同的应用和客户都会生成多个系统;每个系统都可能以不同的存储方式积累大量的数据,从简单的Excel文件数据、txt文本数据到复杂的关系型数据库MYSQL、Oracle数据等,构成了异构的数据源。|2、异构数据源的迁移离线数仓为支撑集团的经营发展和决策分析,西云在成立之初就构建了完整的离线数仓体系;从业务数据源提取的数据存储在数据湖(Hbase)中;数据仓库的数据存储是HDFS,离线计算框架是Hive和Spark。数据迁移场景在构建数据仓库的过程中,会进行大量的ETL(Extract、Transform、Load)工作,以及各种异构数据源(txt、Hbase、HDFS、Mysql、SqlServer等)的迁移场景。场景一:简单数据转换:将字段级简单映射数据中的字段转换为目标数据中的字段的过程。例如:业务数据库(Mysql)到数据湖(Hbase);从数据湖(Hbase)到数据仓库(HDFS);数据仓库(HDFS)到目标关系数据库(Mysql)。备注复杂的数据转换需要更多的数据分析,包括通用标识符问题、复杂条件过滤、时间类型转换、去除重复记录等,此时可以使用MagicBox(开发协作平台)完成Spark计算任务的提交和实施工作流调度;这种类型的数据分析和处理超出了本文的范围。如果您有兴趣,可以阅读这篇文章。场景二:关系型数据库之间的数据迁移。例如,当数据报表系统B需要使用业务系统A中某张表的数据进行数据报表呈现时,需要保证过多的数据查询和数据聚合处理不会影响系统A的正常业务。|三、一山数据迁移工具1、目的是解决数据湖(Hbase)到数据仓库各层、关系型数据库之间的数据迁移问题。2.数据迁移工具为了支持异构数据源的迁移,【一山】采用了阿里内部广泛使用的离线数据同步工具DataX。2.1DataX简介DataX是一款异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP、等同步功能。2.2DataX3.0框架设计DataX本身作为离线数据同步框架,采用Framework+plugin架构构建。将数据源的读写抽象成一个Reader/Writer插件,融入到整个同步框架中。Reader:数据采集模块,负责从数据源采集数据,并将数据发送给Framework。Writer:数据写入模块,负责不断从Framework中获取数据,并将数据写入目的地。Framework:Framework用于连接Reader和Writer,作为两者之间的数据传输通道,处理缓冲、流控、并发、数据转换等核心技术问题。2.3DataX3.0插件系统DataX目前拥有比较完善的插件系统,已经对接了主流的RDBMS数据库、NOSQL、大数据计算系统:备注详情请点击:DataX数据源参考指南:https://github。com/alibaba/DataX2.4一山数据迁移功能封装DataXDataX配置过程比较复杂,只支持命令行执行;为了降低使用难度,我们将配置过程图形化处理,使用Python的Flask框架进行封装,任务执行支持HTTP请求调用。|4、一山数据迁移的实现过程针对上述两种场景,我们开发了数据迁移功能。以下是【宜山】数据迁移的主要实现流程。技术栈前端Vue.js+ElementUI+vue-socket+codemirrorYishan服务器端SpringBootDataX服务器端python+Flask+gunicorn+supervisor+Celery1.创建Reader/Writer插件1.1准备插件配置模板以创建HbaseReader插件为例,示例配置如下:rootdir":"hdfs://test/test1","hbase.cluster.distributed":"true","hbase.zookeeper.quorum":"",},"table":"table1","encoding":"utf-8","mode":"normal","column":[{"name":"info:column1","type":"string"},{"name":"info:column2",“类型”:“字符串”},{“名称”:“信息:column3”,“类型”:“字符串”}],“范围”:{“startRowkey”:“”,“endRowkey”:“”,“isBinaryRowkey":true}}}备注更多配置示例请点击这里查看:https://github.com/alibaba/DataX1.2保存配置易山平台在展示插件模板时使用codemirror在线代码编辑器,可以友好的展示JSON格式的模板数据。效果如下:总结借助codemirror插件,高亮JSON模板,功能强大软件的语法检查功能也有助于解决编辑过程中产生的语法错误,从而降低配置DataX的Reader的难度/编写器插件,您可以通过移山来创建迁移任务。其实就是通过图形化配置生成一个完整的作业配置文件,为运行任务做准备。2.1配置任务的基本属性。这一步主要是配置任务并发数和脏数据限制信息。备注Maximumnumberofdirtyrecords:写入数据时允许的最大脏记录数。当超过阈值时,程序将无法执行;脏数据率:写入数据时允许的最大脏数据率,当超过阈值时,程序将无法执行。2.2使用阅读器插件备注,您可以修改自动匹配显示的模板内容以满足您的需要;2.3使用Writer插件在下拉列表中选择要使用的Writer插件,配置内容会自动匹配显示,也可以修改配置内容Edited,这里不再截图。2.4信息确认通过前面几步的配置,已经形成了完整的Job配置文件。作业保存前,配置文件会完整显示,方便用户查看前面步骤中的设置数据。格式如下:3.执行迁移任务3.1前端3.1.1建立Websocket服务点击【执行】按钮,前端会发送执行任务的请求,浏览器与DataX服务通过建立连接Websocket服务,以便前端实时获取DataX服务产生的任务执行结果:importiofrom'vue-socket.io'Vue.use(io,domain.socketUrl)3.1.2TransferJob配置数据到DataX服务:socket.emit('action',jobConfig);3.2DataXserver3.2.1接收数据并生成json配置文件从data中取出Job配置数据写入.json文件:job_config=data['job_config']#生成临时执行文件file_name='/tmp/'+str(job_id)+'.json'withopen(file_name,'w')asf:f.write(job_content)f.close()3.2.2拼接执行命令command=DATAX\_ROOT+'data.py'+file\_name3.2.3执行任务使用子进程的Popen用法执行命令:child_process=subprocess.Popen(command,stdin=subprocess.PIPE,stderr=subprocess.PIPE,stdout=subprocess.PIPE,shell=True)3.2.4通过stdout获取执行信息,而child_process.poll()为None:line=child_process.stdout.readline()line=line.strip()+'\n'3.2.5将执行结果发送给Websocket客户端通过socket.emit('action',line)方法,将日志发送给Websocket客户端(浏览器):ifline:emit('execute_info',line)ifchild_process.returncode==0:emit('execute_info','执行成功!')else:emit('execute_info','执行失败!')4.移山查看任务执行情况4.1接收执行信息前端使用socket.on方法接收任务执行日志:this.$socket.on('execute_info',(info)=>{//TODO获取执行信息并展示})4.2展示执行结果4.3监控迁移任务在监控管理界面,可以监控当天所有的迁移任务:5.总结通过易山的数据迁移功能,数据开发者可以使用DataX在一山无感知的ETL工具替代了传统的手工开发、命令行执行的方式;通过图形化配置,快速创建并执行数据迁移任务;通过监控管理功能,可以方便地监控任务的执行情况;大大提高了数据开发人员的开发效率更多文章欢迎阅读更多关于消息中间件的原创文章:RabbitMQ系列的消息确认机制RabbitMQ系列的RPC实现RabbitMQ系列如何保证消息不丢失RabbitMQ系列的基本概念和基本使用部署方式RabbitMQ系列单机模式安装关注微信公众号欢迎关注我的微信公众号阅读更多文章:
