
Hands-On Distributed Deployment of Airflow 1.x (qbit)


Preface

Airflow is a DAG (directed acyclic graph) task management system; think of it as an advanced crontab. Airflow solves the task-dependency problem that crontab cannot.

Environment and components

- Ubuntu 18.04
- MySQL 5.7
- Python 3.6.9
- airflow 1.10.10
- celery 4.4.2
- RabbitMQ 3.6.10

Practical steps

Basic steps

Install pip for Python 3:

```bash
sudo apt install python3-pip
```

Install the MySQL development packages:

```bash
# avoids the error: OSError: mysql_config not found
sudo apt install libmysqlclient-dev python3-dev
```

Create an airflow account on each of the 3 servers:

```bash
sudo useradd airflow -m -s /bin/bash
sudo passwd airflow
```

Adjust the PATH environment variable of the airflow account by appending the following to the end of /home/airflow/.bashrc:

```bash
export PATH=/home/airflow/.local/bin:$PATH
```

Upgrade pip:

```bash
pip3 install pip --upgrade
```

Set the Douban mirror:

```bash
pip3 config set global.index-url https://pypi.doubanio.com/simple/
```

Install airflow on the 3 machines:

```bash
# full bundle (master)
pip3 install "apache-airflow[all]==1.10.*"
# OR install selectively
pip3 install "apache-airflow[mysql,celery,rabbitmq]==1.10.*"
```

Check the airflow version, which also creates airflow's HOME directory:

```bash
# defaults to the ~/airflow directory
airflow version
```

Set the Ubuntu 18.04 system timezone:

```bash
timedatectl set-timezone Asia/Shanghai
```

Change the backend timezone in /home/airflow/airflow/airflow.cfg:

```ini
[core]
# Default timezone in case supplied datetimes are naive;
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
# default_timezone = utc
# change it to system or Asia/Shanghai
default_timezone = system
```

Enable the RBAC UI and change the UI timezone:

```ini
[webserver]
rbac = True
default_ui_timezone = system
```

Change the MySQL connection in /home/airflow/airflow/airflow.cfg:

```ini
[core]
# MySQL connection string
sql_alchemy_conn = mysql+pymysql://youruser:passwd@192.168.x.y/airflow
```

Install pymysql:

```bash
pip3 install pymysql
```

Create the airflow database by hand, then initialize the tables and check that initialization succeeded:

```bash
airflow initdb
```

Create users (roles live in the ab_role table, users in the ab_user table):

```bash
# create an Admin-role user
airflow create_user --lastname user \
    --firstname admin \
    --username admin \
    --email walkerqt@foxmail.com \
    --role Admin \
    --password admin123
# create a Viewer-role user
airflow create_user --lastname user \
    --firstname view \
    --username view \
    --email walkerqt@163.com \
    --role Viewer \
    --password view123
```

Start the web server (default port 8080):

```bash
airflow webserver -p 8080
```

Start the scheduler:

```bash
airflow scheduler
```

Open 192.168.y.z:8080 in a browser to check the web UI and log in.

Next, create an airflow account on RabbitMQ and assign it a virtual host, then edit the master config (/home/airflow/airflow/airflow.cfg):

```ini
[core]
executor = CeleryExecutor

[celery]
broker_url = amqp://mq_user:mq_pwd@192.168.y.z:5672/vhost_airflow
result_backend = db+mysql://youruser:passwd@192.168.x.y/airflow
```

Sync the config file from master to the nodes:

```bash
# test
rsync -v /home/airflow/airflow/airflow.cfg airflow@node1_ip:/home/airflow/airflow/
rsync -v /home/airflow/airflow/airflow.cfg airflow@node2_ip:/home/airflow/airflow/
```

A variant without the password prompt:

```bash
# exposes the password in plain text, not recommended
sshpass -p airflow rsync /home/airflow/airflow/airflow.cfg airflow@node1_ip:/home/airflow/airflow/
sshpass -p airflow rsync /home/airflow/airflow/airflow.cfg airflow@node2_ip:/home/airflow/airflow/
```

Create a test script (/home/airflow/airflow/dags/send_msg.py) that sends the local IP to WeChat Work (企业微信):

```python
# coding: utf-8
# author:  qbit
# date:    2020-04-02
# summary: send/distribute tasks to the worker nodes
import os
import time
import psutil
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'qbit',
    # depends_on_past: whether this run depends on the previous one.
    # If True, the previous DAG run must have succeeded for this run to execute.
    'depends_on_past': False,
    'start_date': days_ago(1),
}
dag = DAG(
    dag_id='send_msg',
    default_args=default_args,
    # catchup: whether to backfill runs from start_date up to now
    catchup=False,
    start_date=days_ago(1),
    schedule_interval=timedelta(seconds=60),
    tags=['example'])

def GetLocalIPByPrefix(prefix):
    r"""
    Get the local IP by prefix on a multi-NIC host.
    Tested on: Windows, Linux, Python 3.6.x, psutil 5.4.x
    Works for both IPv4 and IPv6 addresses.
    Note: if several IPs share the prefix, only one of them is returned.
    """
    localIP = ''
    dic = psutil.net_if_addrs()
    for adapter in dic:
        snicList = dic[adapter]
        for snic in snicList:
            if not snic.family.name.startswith('AF_INET'):
                continue
            ip = snic.address
            if ip.startswith(prefix):
                localIP = ip
    return localIP

def send_msg(msg='default msg', **context):
    r""" Send a message to WeChat Work (企业微信) """
    print(context)
    run_id = context['run_id']
    nowTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    message = '%s\n%s\n%s_%d\n%s' % (
        run_id, nowTime, GetLocalIPByPrefix('192.168.'), os.getpid(), msg)
    print(message)
    '''sending code'''

first = PythonOperator(
    task_id='send_msg_1',
    python_callable=send_msg,
    op_kwargs={'msg': '111'},
    provide_context=True,
    dag=dag,
)
second = PythonOperator(
    task_id='send_msg_2',
    python_callable=send_msg,
    op_kwargs={'msg': '222'},
    provide_context=True,
    dag=dag,
)
third = PythonOperator(
    task_id='send_msg_3',
    python_callable=send_msg,
    op_kwargs={'msg': '333'},
    provide_context=True,
    dag=dag,
)
[third, first] >> second
```
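The post leaves the actual delivery code behind the '''sending code''' placeholder out. For reference only, a minimal sketch using the WeChat Work group-robot webhook could look like the following; push_to_wechat_work and the robot key are hypothetical placeholders, not part of the original script:

```python
import requests

def push_to_wechat_work(content, robot_key='YOUR-ROBOT-KEY'):
    """Post a plain-text message to a WeChat Work group robot.
    robot_key is a hypothetical placeholder: create a group robot
    in WeChat Work and paste its key here."""
    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=%s' % robot_key
    payload = {'msgtype': 'text', 'text': {'content': content}}
    resp = requests.post(url, json=payload, timeout=10)
    resp.raise_for_status()  # fail loudly if the webhook rejects the call
    return resp.json()
```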
Verify the script:

```bash
# print all active DAGs
airflow list_dags
# print all tasks in the 'send_msg' DAG
airflow list_tasks send_msg
# print the task hierarchy of the 'send_msg' DAG
airflow list_tasks send_msg --tree
```

Sync the master's dags directory to the nodes:

```bash
sshpass -p airflow rsync -a /home/airflow/airflow/dags/ airflow@node1_ip:/home/airflow/airflow/dags/
sshpass -p airflow rsync -a /home/airflow/airflow/dags/ airflow@node2_ip:/home/airflow/airflow/dags/
```

Startup steps:

```bash
# master
airflow webserver -p 8080
airflow scheduler
airflow flower  # default port 5555

# node1/node2
airflow worker
```

Error troubleshooting

If starting a worker fails with an error like the one below, it is a problem between celery and RabbitMQ; uninstalling librabbitmq fixes it:

```bash
pip3 uninstall librabbitmq
```

The error:

```text
[2020-04-02 14:54:42,279: CRITICAL/MainProcess] Unrecoverable error: SystemError('returned a result with an error set',)
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/kombu/messaging.py", line 624, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 571, in on_task_received
    callbacks,
  File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/strategy.py", line 203, in task_message_handler
    handle(req)
  File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/worker.py", line 223, in _process_task_sem
    return self._quick_acquire(self._process_task, req)
  File "/home/airflow/.local/lib/python3.6/site-packages/kombu/asynchronous/semaphore.py", line 62, in acquire
    callback(*partial_args, **partial_kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/worker.py", line 228, in _process_task
    req.execute_using_pool(self.pool)
  File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/request.py", line 652, in execute_using_pool
    correlation_id=task_id,
  File "/home/airflow/.local/lib/python3.6/site-packages/celery/concurrency/base.py", line 158, in apply_async
    **options)
  File "/home/airflow/.local/lib/python3.6/site-packages/billiard/pool.py", line 1530, in apply_async
    self._quick_put((TASK, (result._job, None, func, args, kwds)))
  File "/home/airflow/.local/lib/python3.6/site-packages/celery/concurrency/asynpool.py", line 885, in send_job
    body = dumps(tup, protocol=protocol)
TypeError: can't pickle memoryview objects

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/home/airflow/.local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/home/airflow/.local/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/home/airflow/.local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 599, in start
    c.loop(*c.loop_args())
  File "/home/airflow/.local/lib/python3.6/site-packages/celery/worker/loops.py", line 83, in asynloop
    next(loop)
  File "/home/airflow/.local/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 364, in create_loop
    cb(*cbargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/kombu/transport/base.py", line 238, in _readable
    reader(loop)
  File "/home/airflow/.local/lib/python3.6/site-packages/kombu/transport/base.py", line 220, in _read
    drain_events(timeout=0)
  File "/home/airflow/.local/lib/python3.6/site-packages/librabbitmq/__init__.py", line 227, in drain_events
    self._basic_recv(timeout)
SystemError: returned a result with an error set
```
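One quick way to confirm the uninstall took effect is to ask kombu which AMQP driver Celery will now pick. This is a minimal sketch, assuming the broker_url from the master config above; it is not part of the original post:

```python
# Check that Celery no longer selects the librabbitmq C bindings.
from celery import Celery

app = Celery(broker='amqp://mq_user:mq_pwd@192.168.y.z:5672/vhost_airflow')
with app.connection_for_read() as conn:
    # prints 'librabbitmq' while the C bindings are installed,
    # 'py-amqp' after `pip3 uninstall librabbitmq`
    print(conn.transport.driver_name)
```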
Related links

- ray on GitHub: https://github.com/ray-projec...
- dask on GitHub: https://github.com/dask/dask
- Scaling out Airflow with Celery and RabbitMQ to Orchestrate ETL Jobs on the Cloud: https://corecompete.com/scali...
- Understanding Airflow's parallelism, dag_concurrency and worker_concurrency (detailed parameter explanations, in Chinese): https://www.jianshu.com/p/7a8...

This article is from qbit snap
