Celery实现分布式定时任务并开启监控(Celery-Beat、Celery-Once、flower)原理:celery-beat作为任务调度,当到达预定时间时,beat将任务id加载到rabbitmq队列中,worker从队列另一端取出任务id匹配当前注册的任务。如果没有注册,会报错。另外worker也会尝试通过celery-once从redis获取分布式锁,只有获取到锁的worker才会执行这个任务。worker执行成功或失败由flower监控1.环境准备pipinstallcelerypipinstallflowerpipinstallcelery_once2.代码结构代码结构如下:datetimeimportdatetimeimportsocketfromcelery_onceimportQueueOnceWEB_HOOK_SPIDER='Getweb_hookonDingTalkrobotpage'#根据任务名称和传递的参数值确认是否是同一个任务@celery_app.task(base=QueueOnce,once={'graceful':True})defsend_ding_test(arg1,arg2):dingding=DingtalkChatbot(WEB_HOOK_SPIDER)arg3=arg1+arg2dingding.send_text(msg="城市数据定时任务测试-{},{}执行时间:{}".format(arg3,socket.gethostname(),datetime.now().strftime("%Y-%m-%d%H:%M:%S")))配置文件config.py:fromcelery.schedulesimportcrontabclassceleryConfig(object):#accept_content=['json']#可以设置,列表,tuple,pickle,yaml#result_accept_content=['json']timezone='Asia/Shanghai'#中国只有两个时区,一个是上海,一个是乌鲁木齐broker_url="amqp://user:password@ip:port/vhost"后端=""include=['jobs.sendDingTest']#worker启动时要导入的task模块,这里需要添加,这样worker才能找到我们的taskbeat_schedule={'add-every-monday-morning':{'task':'jobs.sendDingTest。send_ding_test',#这里需要写全路径,否则worker找不到'schedule':crontab(minute="*/2"),'args':(16,16),},}#celery-onceconfigurationONCE={'backend':'celery_once.backends.Redis','settings':{'url':'redis://ip:port/database','default_timeout':60*60#分布式锁的默认超时时间}}启动函数start.py:fromceleryimportCeleryimportos#Windows平台需要设置,os.environ.setdefault('FORKED_BY_MULTIPROCESSING','1')celery_app=Celery()celery_app.config_from_object('config.celeryConfig')3.启动定时任务和监控#注意:beat、worker、flower可以不在同一台服务器上。如果是分布式的,需要在其他服务器上复制一份代码#openbeat(start是文件名)celery-Astart.celery_appbeat#startworkercelery-Astart.celery_appworker-c1-linfo#启动花(默认地址是localhost:5555)celery-A启动花下面是花的页面,大家可以查看查看worker数量,消息队列(rabbitmq需要开启rabbitmq_management)和任务执行结果等。4.文档参考celery-once文档花文档celery官方文档
