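Introduction: Airflow is an open-source scheduling tool written in Python by Airbnb. It is built on directed acyclic graphs (DAGs): you define a set of dependent tasks and Airflow runs them in dependency order. Tasks are defined in Python code, and a range of Operators is supported, which makes the tool flexible enough for many kinds of requirements. This article describes how to use Airflow's PythonOperator to schedule MaxCompute tasks.

1. Environment preparation

Python 2.7.5 (PyODPS supports Python 2.6 and later)
Airflow: apache-airflow 1.10.7

1) Install the packages that MaxCompute requires. The version specifiers are quoted so that the shell does not treat ">" as an output redirection.

    pip install "setuptools>=3.0"
    pip install "requests>=2.4.0"
    pip install "greenlet>=0.4.10"  # optional; speeds up Tunnel uploads once installed
    pip install "cython>=0.19.0"    # optional; not recommended for Windows users
    pip install pyodps

Note: if the requests package conflicts, uninstall it first and then install the matching version.

2) Run the following command to check that the installation succeeded.

    python -c "from odps import ODPS"

2. Development steps

1) In the Airflow home directory, write the Python scheduling script Airiflow_MC.py.

    # -*- coding: UTF-8 -*-
    import io
    import sys
    import os
    import time
    from datetime import datetime, timedelta
    from configparser import ConfigParser

    from odps import ODPS
    from odps import options
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    # Change the system default encoding (Python 2 only).
    reload(sys)
    sys.setdefaultencoding('utf8')

    # MaxCompute parameter settings
    options.sql.settings = {
        'options.tunnel.limit_instance_tunnel': False,
        'odps.sql.allow.fullscan': True,
    }

    # Read the connection settings from the [odps] section of odps.ini.
    cfg = ConfigParser()
    cfg.read("odps.ini")
    print(cfg.items())
    odps = ODPS(cfg.get("odps", "access_id"),
                cfg.get("odps", "secret_access_key"),
                cfg.get("odps", "project"),
                cfg.get("odps", "endpoint"))

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'retry_delay': timedelta(minutes=5),
        'start_date': datetime(2020, 1, 15)
        # 'email': ['airflow@example.com'],
        # 'email_on_failure': False,
        # 'email_on_retry': False,
        # 'retries': 1,
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }

    # The DAG is scheduled every 30 seconds.
    dag = DAG('Airiflow_MC', default_args=default_args,
              schedule_interval=timedelta(seconds=30))


    def read_sql(sqlfile):
        # Return the contents of a SQL file; the with block closes the file.
        with io.open(sqlfile, encoding='utf-8', mode='r') as f:
            sql = f.read()
        return sql


    def get_time():
        print('Current time is {}'.format(time.time()))
        return time.time()


    def mc_job():
        project = odps.get_project()  # get the default project
        instance = odps.run_sql("select * from long_chinese;")
        print(instance.get_logview_address())
        instance.wait_for_success()
        with instance.open_reader() as reader:
            count = reader.count
            print("Number of rows returned by the query: {}".format(count))
            for record in reader:
                print(record)
        return count


    t1 = PythonOperator(task_id='get_time', provide_context=False,
                        python_callable=get_time, dag=dag)
    t2 = PythonOperator(task_id='mc_job', provide_context=False,
                        python_callable=mc_job, dag=dag)

    # mc_job runs only after get_time has finished.
    t2.set_upstream(t1)

2) Submit the script.

    python Airiflow_MC.py

3) Test.

    # print the list of active DAGs
    airflow list_dags

    # print the list of tasks in the "Airiflow_MC" dag_id
    airflow list_tasks Airiflow_MC

    # print the hierarchy of tasks in the Airiflow_MC DAG
    airflow list_tasks Airiflow_MC --tree

    # test the individual tasks
    airflow test Airiflow_MC get_time 2010-01-16
    airflow test Airiflow_MC mc_job 2010-01-16

4) Run the scheduled task: log in to the web UI and click the run button.

5) View the task results.
(1) Click to view the log.
(2) View the results.

This article is original content from Alibaba Cloud and may not be reproduced without permission.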
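
The scheduling script reads its MaxCompute connection settings from an odps.ini file that the article does not show. The following is a minimal sketch of what such a file might contain and how it could be validated before use. It assumes only the section and key names that appear in the script's cfg.get calls (odps, access_id, secret_access_key, project, endpoint). Every value is a placeholder, the endpoint URL is made up for illustration, and load_odps_config is a hypothetical helper rather than part of PyODPS or Airflow.

```python
import os
import tempfile
from configparser import ConfigParser

# Placeholder contents for odps.ini; replace every value with your own.
# The endpoint below is a made-up example URL, not a real MaxCompute endpoint.
SAMPLE_INI = """\
[odps]
access_id = your-access-id
secret_access_key = your-secret-access-key
project = your-project-name
endpoint = https://service.example.com/api
"""

REQUIRED_KEYS = ("access_id", "secret_access_key", "project", "endpoint")


def load_odps_config(path):
    """Hypothetical helper: read the [odps] section and fail early on gaps."""
    cfg = ConfigParser()
    # ConfigParser.read returns the list of files it managed to parse,
    # so an empty list means the file was missing or unreadable.
    if not cfg.read(path):
        raise IOError("config file not found: {}".format(path))
    missing = [k for k in REQUIRED_KEYS if not cfg.has_option("odps", k)]
    if missing:
        raise KeyError("missing keys in [odps]: {}".format(", ".join(missing)))
    return {k: cfg.get("odps", k) for k in REQUIRED_KEYS}


# Write the sample to a temporary directory and load it back.
tmp_dir = tempfile.mkdtemp()
ini_path = os.path.join(tmp_dir, "odps.ini")
with open(ini_path, "w") as f:
    f.write(SAMPLE_INI)

config = load_odps_config(ini_path)
print(sorted(config.keys()))
```

Because cfg.read silently skips a file it cannot find, and a relative path such as "odps.ini" is resolved against the working directory of whichever process loads the DAG, passing an absolute path and checking the result as above may make configuration problems easier to spot.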
