背景:客户需要为业务做一些数据展示。客户每天都会通过s3给我们增量数据。我们每天通过DataFactory的job提取s3的数据,提取出来的原始数据存储在Blob容器中,然后通过job将数据提取到DataBricks表中,通过sparksql对数据进行处理,形成结果表,最后提供给BI同事做前端报表。上面已经介绍了基本或关键步骤。提取原始数据第一步是提取原始数据。项目的基础是如何稳定地从s3中提取数据并插入到DataBrikcs表中。一般过程如下。如果不使用blob容器,它可能会更简单。遍历s3容器,获取数据到Blob,首先要明确,并不是所有的s3文件都会被取进来,要根据当前实际需要去取数据。我们只需要一个配置表来限制获取的s3文件。比如FILE_LIST.csv文件放在BLOB中,文件有两列,一列是表名和数据文件名,另一列是判断是否有效的状态值。先用【简单示例】的方法将FILE_LIST作为配置表放到DataBricks表中。比如FILE_NAMEIS_ACTIVEDW_TEST1根据【04DataBricks遍历S3容器】,我们可以得到数据文件在s3中的FILE_LIST表中的路径,并将该路径存储在一个表中,比如flag_file_info表中,接下来通过lookupactivityinDataFactory,获取的flag_file_info记录。配置如下。--拼接数据文件的路径,放入查找活动时注意不要有回车select'EDW_SHARE/Request/DataFile'||replace(substr(flag_file,instr(flag_file,'FLAGFILE_')+8),'.csv','')||'/'||content||'.csv'asfile_name,flag_file,content||'.csv'ascontentfromcfg.flag_file_infowherestatus=1anddate_id=date_format(from_utc_timestamp(current_timestamp(),'UTC+8'),'yyyyMMdd')迭代。ForEach逐一获取查询结果,在变量ForEach中设置两个活动,一个获取s3数据到blob,一个更新记录的状态。同样是复制数据,只不过这里复制的数据源是s3,目标是Blob。源S3数据集配置如下。receiverBlob设置类似于将CSV文件批量转换为Parquet文件,将所有同步的s3文件放在一个Blob文件中,使用迭代将每个CSV文件转换为Parquet文件。pipelineactivity获取元数据的dataset配置如下:注意iterationsetting中的变量@activity('get_file_name').output.childItems放在[CopyData]source和sink配置中,如下:将数据同步到DataBricks表中,生成Parquet文件,将临时视图插入到DataBricks表中。因为这一步容易出错,而且每张表的出错情况不同,所以这一步没有使用变量批处理,每张表单独处理。参考【05??简单示例】在处理数据到dw的每日增量文件时,同步到stg库,但为了防止出错,stg只保留每日增量文件。Stg再根据每个表的不同更新策略将数据同步到DW层,DW层保留全量数据。JOB时直接调用sp所在的notebook即可。处理数据到dm底层有表,可以处理剩下的数据,处理完的数据放在另外一层。提供对外服务。JOB调用与dw相同。整体的JOB如下:trigger,monitoringandalarmtrigger每个pipeline都可以设置一个trigger来定时运行JOB。比较简单,不再介绍监控告警。在Monitor->MonitoringandAlerting中创建一个新的警报规则。主要是配置条件选择指标,添加操作组,并通知配置条件选择指标时,可以选择流水线级别或者活动级别。其他的可以根据自己的需要设置。添加操作组和通知器多个通知可以放在一个操作组中。创建操作组后,还可以引用其他监控告警。通知可以是电子邮件、短信或语音电话。填写相关信息即可(会产生相应费用)
