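Background

Airflow is an open-source scheduling tool written in Python by Airbnb. It is built on directed acyclic graphs (DAGs): you define a set of tasks and their dependencies, and Airflow executes them in dependency order. Tasks are defined in Python code, and a wide range of Operators is supported, which makes Airflow flexible enough for many different scheduling needs. This article describes how to use Airflow's PythonOperator to schedule MaxCompute jobs.

1. Environment preparation

Python 2.7.5 (PyODPS supports Python 2.6 and above)
Airflow: apache-airflow 1.10.7

1. Install the packages required by MaxCompute. The version specifiers are quoted so that the shell does not treat ">=" as an output redirect.

    pip install "setuptools>=3.0"
    pip install "requests>=2.4.0"
    pip install "greenlet>=0.4.10"  # optional; speeds up Tunnel uploads once installed
    pip install "cython>=0.19.0"    # optional; not recommended for Windows users
    pip install pyodps

Note: if the requests package conflicts, uninstall it first and then install the required version.

2. Run the following command to check that the installation succeeded:

    python -c "from odps import ODPS"

2. Development steps

1. Write the Python scheduling script Airiflow_MC.py in the Airflow home directory:

    # -*- coding: UTF-8 -*-
    import io
    import sys
    import os
    import time
    from datetime import datetime, timedelta
    from configparser import ConfigParser

    from odps import ODPS
    from odps import options
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    # Python 2 only: make UTF-8 the default string encoding.
    reload(sys)
    sys.setdefaultencoding('utf8')

    # MaxCompute settings.
    # limit_instance_tunnel is a PyODPS client option, not an SQL setting,
    # so it is set directly instead of being passed in options.sql.settings.
    options.tunnel.limit_instance_tunnel = False
    options.sql.settings = {'odps.sql.allow.fullscan': True}

    # Read the connection settings from odps.ini
    # (the path is relative to the working directory).
    cfg = ConfigParser()
    cfg.read("odps.ini")
    print(cfg.items())
    odps = ODPS(cfg.get("odps", "access_id"),
                cfg.get("odps", "secret_access_key"),
                cfg.get("odps", "project"),
                cfg.get("odps", "endpoint"))

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'retry_delay': timedelta(minutes=5),
        'start_date': datetime(2020, 1, 15)
        # 'email': ['airflow@example.com'],
        # 'email_on_failure': False,
        # 'email_on_retry': False,
        # 'retries': 1,
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }

    # Schedule the DAG every 30 seconds.
    dag = DAG('Airiflow_MC', default_args=default_args,
              schedule_interval=timedelta(seconds=30))


    def read_sql(sqlfile):
        # Return the contents of a UTF-8 encoded SQL file.
        # The with block closes the file automatically.
        with io.open(sqlfile, encoding='utf-8', mode='r') as f:
            sql = f.read()
        return sql


    def get_time():
        now = time.time()
        print('Current time is {}'.format(now))
        return now


    def mc_job():
        project = odps.get_project()  # get the default project
        instance = odps.run_sql("select * from long_chinese;")
        print(instance.get_logview_address())
        instance.wait_for_success()
        with instance.open_reader() as reader:
            count = reader.count
            print("Number of rows returned by the query: {}".format(count))
            for record in reader:
                print(record)
        return count


    t1 = PythonOperator(
        task_id='get_time',
        provide_context=False,
        python_callable=get_time,
        dag=dag)

    t2 = PythonOperator(
        task_id='mc_job',
        provide_context=False,
        python_callable=mc_job,
        dag=dag)

    # mc_job runs only after get_time has finished.
    t2.set_upstream(t1)

2. Submit the script:

    python Airiflow_MC.py

3. Test it:

    # print the list of active DAGs
    airflow list_dags

    # print the list of tasks in the "Airiflow_MC" DAG
    airflow list_tasks Airiflow_MC

    # print the hierarchy of tasks in the Airiflow_MC DAG
    airflow list_tasks Airiflow_MC --tree

    # test the individual tasks
    airflow test Airiflow_MC get_time 2010-01-16
    airflow test Airiflow_MC mc_job 2010-01-16

4. Run the scheduled job: log in to the web interface and click the run button.

5. View the job results:

    1. Click to view the log.
    2. View the result.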
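
The scheduling script in step 1 of the development steps reads its credentials with cfg.get("odps", ...), but the article does not show odps.ini itself. The following is a minimal sketch of a layout that would satisfy those calls, assuming a single [odps] section with the four keys the script requests. Every value is a placeholder, and SAMPLE_INI, REQUIRED_KEYS and load_odps_settings are hypothetical names introduced only for this illustration. The text is parsed from a string so the example runs without a real file; it is written against Python 3's standard configparser.

```python
from configparser import ConfigParser

# Hypothetical odps.ini contents: the section and key names match the
# cfg.get("odps", ...) calls in the script; every value is a placeholder.
SAMPLE_INI = """
[odps]
access_id = <your-access-id>
secret_access_key = <your-secret-access-key>
project = <your-project-name>
endpoint = <your-maxcompute-endpoint>
"""

# The four settings the scheduling script passes to ODPS(...).
REQUIRED_KEYS = ("access_id", "secret_access_key", "project", "endpoint")


def load_odps_settings(ini_text):
    """Parse INI text and return the four settings as a dict.

    Raises ValueError if the [odps] section or any required key is missing.
    """
    cfg = ConfigParser()
    cfg.read_string(ini_text)
    # has_option returns False when the section itself does not exist.
    missing = [k for k in REQUIRED_KEYS if not cfg.has_option("odps", k)]
    if missing:
        raise ValueError("odps.ini is missing: {}".format(", ".join(missing)))
    return {k: cfg.get("odps", k) for k in REQUIRED_KEYS}


settings = load_odps_settings(SAMPLE_INI)
print(sorted(settings))
```

Checking for missing keys up front gives one clear error message when the file is incomplete, rather than a failure the first time a task tries to connect. Because the script calls cfg.read("odps.ini") with a relative path, the file has to be in the directory the process is started from.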
