## Preface

Airflow is a DAG (directed acyclic graph) task management system — roughly speaking, an advanced crontab. Airflow solves the task-dependency problem that crontab cannot.

## Environment and components

- Ubuntu 18.04
- MySQL 5.7
- Python 3.6.9
- airflow 1.10.10
- celery 4.4.2
- RabbitMQ 3.6.10

## Steps

### Basic steps

Install pip for Python 3:

```shell
sudo apt install python3-pip
```

Install the MySQL development packages:

```shell
# avoids the error: OSError: mysql_config not found
sudo apt install libmysqlclient-dev python3-dev
```

Create an airflow account on each of the 3 servers:

```shell
sudo useradd airflow -m -s /bin/bash
sudo passwd airflow
```

Extend the PATH environment variable of the airflow account by appending the following to the end of `/home/airflow/.bashrc`:

```shell
export PATH=/home/airflow/.local/bin:$PATH
```

Upgrade pip:

```shell
pip3 install pip --upgrade
```

Set the Douban mirror:

```shell
pip3 config set global.index-url https://pypi.doubanio.com/simple/
```

Install airflow on all 3 machines:

```shell
# the full bundle (master)
pip3 install "apache-airflow[all]==1.10.*"
# OR install selectively
pip3 install "apache-airflow[mysql,celery,rabbitmq]==1.10.*"
```

Check the airflow version, which also creates airflow's HOME directory:

```shell
# default ~/airflow directory
airflow version
```

Set the Ubuntu 18.04 system time zone:

```shell
timedatectl set-timezone Asia/Shanghai
```

Change the backend time zone (`/home/airflow/airflow/airflow.cfg`):

```ini
[core]
# Default timezone in case supplied datetimes are naive
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
# default_timezone = utc
# change it to system or Asia/Shanghai
default_timezone = system
```

Enable the RBAC UI and change the UI time zone:

```ini
[webserver]
rbac = True
default_ui_timezone = system
```

Change the MySQL connection (`/home/airflow/airflow/airflow.cfg`):

```ini
[core]
# MySQL connection string
sql_alchemy_conn = mysql+pymysql://youruser:passwd@192.168.x.y/airflow
```

Install pymysql:

```shell
pip3 install pymysql
```

Create the airflow database manually, then initialize the tables:

```shell
airflow initdb
```

Check that the database initialized successfully, then create users:

```shell
# role table: ab_role
# user table: ab_user
# create an Admin-role user
airflow create_user --lastname user \
    --firstname admin \
    --username admin \
    --email walkerqt@foxmail.com \
    --role Admin \
    --password admin123
# create a Viewer-role user
airflow create_user --lastname user \
    --firstname view \
    --username view \
    --email walkerqt@163.com \
    --role Viewer \
    --password view123
```

Start the web server (default port 8080):

```shell
airflow webserver -p 8080
```

Start the scheduler:

```shell
airflow scheduler
```

Open 192.168.y.z:8080 in a browser to view the web UI, and log in.

Create an airflow account on RabbitMQ and assign it a virtual host.

Edit the master config file (`/home/airflow/airflow/airflow.cfg`):

```ini
[core]
executor = CeleryExecutor

[celery]
broker_url = amqp://mq_user:mq_pwd@192.168.y.z:5672/vhost_airflow
result_backend = db+mysql://youruser:passwd@192.168.x.y/airflow
```

Sync the config file from master to the nodes:

```shell
# test
rsync -v /home/airflow/airflow/airflow.cfg airflow@node1_ip:/home/airflow/airflow/
rsync -v /home/airflow/airflow/airflow.cfg airflow@node2_ip:/home/airflow/airflow/
```

Script without a password prompt:

```shell
# exposes the password in plain text; not recommended
sshpass -p airflow rsync /home/airflow/airflow/airflow.cfg airflow@node1_ip:/home/airflow/airflow/
sshpass -p airflow rsync /home/airflow/airflow/airflow.cfg airflow@node2_ip:/home/airflow/airflow/
```

Create a test script (`/home/airflow/airflow/dags/send_msg.py`) that sends the local IP to WeChat Work:

```python
# coding: utf-8
# author: qbit
# date: 2020-04-02
# summary: send/distribute tasks to worker nodes
import os
import time
import psutil
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',  # assumed; the original value was unreadable
    # depends_on_past: whether this run depends on the previous one.
    # If True, the previous DAG run must have succeeded before this one runs.
    'depends_on_past': False,
    'start_date': days_ago(1),
}
dag = DAG(
    dag_id='send_msg',
    default_args=default_args,
    # catchup: whether to backfill runs from start_date to now
    catchup=False,
    start_date=days_ago(1),
    schedule_interval=timedelta(seconds=60),
    tags=['example']
)

def GetLocalIPByPrefix(prefix):
    r"""
    Get the local IP matching a prefix on a multi-NIC host.
    Tested on: Windows, Linux; Python 3.6.x, psutil 5.4.x
    Works for both IPv4 and IPv6 addresses.
    Note: if several IPs share the prefix, only one of them is returned.
    """
    localIP = ''
    dic = psutil.net_if_addrs()
    for adapter in dic:
        snicList = dic[adapter]
        for snic in snicList:
            if not snic.family.name.startswith('AF_INET'):
                continue
            ip = snic.address
            if ip.startswith(prefix):
                localIP = ip
    return localIP

def send_msg(msg='default msg', **context):
    r""" Send a message to WeChat Work """
    print(context)
    run_id = context['run_id']
    nowTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    message = '%s\n%s\n%s_%d\n%s' % (
        run_id, nowTime, GetLocalIPByPrefix('192.168.'), os.getpid(), msg)
    print(message)
    '''sending code'''

first = PythonOperator(
    task_id='send_msg_1',
    python_callable=send_msg,
    op_kwargs={'msg': '111'},
    provide_context=True,
    dag=dag,
)
second = PythonOperator(
    task_id='send_msg_2',
    python_callable=send_msg,
    op_kwargs={'msg': '222'},
    provide_context=True,
    dag=dag,
)
third = PythonOperator(
    task_id='send_msg_3',
    python_callable=send_msg,
    op_kwargs={'msg': '333'},
    provide_context=True,
    dag=dag,
)

[third, first] >> second
```

Verify the script:

```shell
# print all active DAGs
airflow list_dags
# print all tasks in the 'send_msg' DAG
airflow list_tasks send_msg
# print the task hierarchy of the 'send_msg' DAG
airflow list_tasks send_msg --tree
```

Sync the master's dags directory to the nodes:

```shell
sshpass -p airflow rsync -a /home/airflow/airflow/dags/ airflow@node1_ip:/home/airflow/airflow/dags/
sshpass -p airflow rsync -a /home/airflow/airflow/dags/ airflow@node2_ip:/home/airflow/airflow/dags/
```

Startup steps:

```shell
# master
airflow webserver -p 8080
airflow scheduler
airflow flower  # default port 5555

# node1/node2
airflow worker
```

## Error troubleshooting

### start worker

If you see an error like the one below, it is a problem between celery and RabbitMQ; uninstalling librabbitmq fixes it:

```shell
pip3 uninstall librabbitmq
```

Error:

```
[2020-04-02 14:54:42,279: CRITICAL/MainProcess] Unrecoverable error: SystemError('
```
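The `sql_alchemy_conn`, `broker_url`, and `result_backend` values above are ordinary URLs, and a typo in any component is a common cause of `initdb` or worker startup failures. As a quick sanity check before starting services, the standard library can break a connection string apart (a sketch — the user/host values are the placeholders from this post, not real credentials):

```python
from urllib.parse import urlsplit

# placeholder values copied from the config examples in this post
uris = {
    "sql_alchemy_conn": "mysql+pymysql://youruser:passwd@192.168.x.y/airflow",
    "broker_url": "amqp://mq_user:mq_pwd@192.168.y.z:5672/vhost_airflow",
}

for name, uri in uris.items():
    p = urlsplit(uri)
    # scheme = dialect(+driver), netloc = user:password@host[:port],
    # path = database name (MySQL) or virtual host (RabbitMQ)
    print(name, "->", p.scheme, p.username, p.hostname, p.port, p.path.lstrip("/"))
```

If `hostname`, `port`, or the trailing database/vhost name come out empty or wrong, the config string is malformed.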
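The loop inside `GetLocalIPByPrefix` can be exercised without psutil or real NICs by injecting a dict shaped like `psutil.net_if_addrs()` output. Below is a sketch with made-up adapter names and addresses; the helper mirrors the same filter-by-family, match-by-prefix logic as the DAG script:

```python
from collections import namedtuple
from enum import Enum

# minimal stand-ins for psutil's snicaddr entries (sample data is made up)
Family = Enum("Family", ["AF_INET", "AF_INET6", "AF_PACKET"])
Snic = namedtuple("Snic", ["family", "address"])

fake_net_if_addrs = {
    "lo":   [Snic(Family.AF_INET, "127.0.0.1")],
    "eth0": [Snic(Family.AF_PACKET, "aa:bb:cc:dd:ee:ff"),   # MAC, skipped
             Snic(Family.AF_INET, "192.168.1.23")],
}

def get_ip_by_prefix(dic, prefix):
    """Same logic as GetLocalIPByPrefix, but over an injected dict."""
    localIP = ""
    for adapter in dic:
        for snic in dic[adapter]:
            if not snic.family.name.startswith("AF_INET"):
                continue  # skip non-IP entries such as AF_PACKET (MAC addresses)
            if snic.address.startswith(prefix):
                localIP = snic.address
    return localIP

print(get_ip_by_prefix(fake_net_if_addrs, "192.168."))  # 192.168.1.23
print(repr(get_ip_by_prefix(fake_net_if_addrs, "10.")))  # '' when nothing matches
```

Injecting the dict this way also makes the caveat in the docstring easy to see: with two addresses sharing the prefix, whichever the loop visits last wins.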
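On the `catchup` flag used in the DAG above: with `catchup=True`, the scheduler would backfill one run per `schedule_interval` from `start_date` to now, while `catchup=False` skips the backlog and only schedules the most recent interval. A stdlib-only sketch of that idea (simplified — real Airflow scheduling has more subtleties; the dates are illustrative):

```python
from datetime import datetime, timedelta

def backfill_dates(start_date, now, interval):
    """Execution dates a catchup=True scheduler would create:
    one per completed interval between start_date and now."""
    dates = []
    t = start_date
    while t + interval <= now:
        dates.append(t)
        t += interval
    return dates

start = datetime(2020, 4, 2, 12, 0, 0)   # illustrative start_date
now = datetime(2020, 4, 2, 12, 5, 0)
runs = backfill_dates(start, now, timedelta(seconds=60))
print(len(runs))   # 5 intervals would be backfilled with catchup=True
print(runs[-1])    # with catchup=False, only this latest interval runs
```

This is why the example DAG sets `catchup=False`: with a 60-second interval, restarting the scheduler after downtime would otherwise flood the workers with one run per missed minute.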
