1前言大家好,我是吴老板。用Celery官方的话来说,Celery是一个非常优秀的分布式队列,可以应用于分布式共享中间队列和定时任务等。2版本差异Celery有很多版本,版本之间的差异也不小。比如最新的Celery6.0版本远不如Celery4.0稳定,所以在使用不同版本的时候,系统给我们的反馈可能不是我们想要的。3服务挂在Windows下的Celery上。服务有时会变得不稳定(unix中暂时没有发现这种情况)。无法按照我们指定的时间点执行任务。这些任务只是加入待运行队列(在Redis中累积),累积的任务需要手动重启Celery服务后才能释放运行。这样,一是定时任务没有在指定的时间点正常运行,二是这些任务在其他时间运行,可能会导致更新数据不及时、时间节点混乱等问题,不仅不能满足业务需求,也会受其害。4设置心跳为了解决windows下Celery的弊端,可以给Celery任务队列设置一个心跳时间,比如每分钟或者每五分钟向Redis数据库发送一次数据,保证队列一直处于活动状态,这样只要你电脑不关机,保持网络畅通(如果是远程Redis),Celery任务队列服务就不会出现假死状态。5举个栗子,我一向喜欢用例子来说话。前段时间在某平台商户后台采集数据时,为了在使用时自动获取网站的cookie,用Pyppeteer写了一个自动登录脚本,还在Celery队列中快速启动服务as通常。脚本是这样的(很接近实际的伪代码,没办法,救命要紧)#-*-coding:utf-8-*-fromdb.redisCurdimportRedisQueueimportasyncioimportrandomimporttkinterfrompyppeteer.launcherimportlaunchfromplatLogin.configimportUSERNAME,PASSWORD,LOGIN_URLclassLogin():def__init__(self,shopId):self.shopId=shopIdself.RedisQueue=RedisQueue("cookie")defscreen_size(self):tk=tkinter.Tk()width=tk.winfo_screenwidth()height=tk.winfo_screenheight()tk.quit()return{'width':width,'height':height}asyncdeflogin(self,username,password,url):browser=awaitlaunch({'headless':False,'dumpio':True},args=['--no-sandbox','--disable-infobars','--user-data-dir=./userData'],)page=awaitbrowser.newPage()#启动一个新的浏览器页面try:awaitpage.setViewport(viewport=self.screen_size())awaitpage.setJavaScriptEnabled(enabled=True)#Enablejsawaitpage.setUserAgent('Mozilla/5.0(WindowsNT10.0;Win64;x64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/58.0.3029.110Safari/537.36边缘/16.16299')awaitself.page_e评估(页面)awaitpage.goto(url)awaitasyncio.sleep(2)#输入用户名,密码awaitpage.evaluate(f'document.querySelector("#userName").value=""')awaitpage.type('#userName',username,{'delay':self.input_time_random()-50})#delay是限制输入的时间awaitpage.evaluate('document.querySelector("#passWord").value=""')awaitpage.type('#passWord',password,{'delay':self.input_time_random()})awaitpage.waitFor(6000)loginImgVcode=awaitpage.waitForSelector('#checkCode')awaitloginImgVcode.screenshot({'path':'./loginImg.png'})awaitpage.waitFor(6000)res=use_cjy("./loginImg.png")pic_str=res.get("pic_str")ifres.get("err_str")=="OK"else"1234"awaitpage.waitFor(6000)awaitpage.type('#checkWord',pic_str,{'delay':self.input_time_random()-50})awaitpage.waitFor(6000)awaitpage.click('#subMit')awaitpage.waitFor(6000)awaitasyncio.sleep(2)awaitself.get_cookie(page)awaitpage.waitFor(3000)awaitself.page_close(browser)return{'code':200,'msg':'登陆成功'}except:return{'code':-1,'msg':'出错'}finally:awaitpage.waitFor(3000)awaitself.page_close(browser)#获取登录后cookieasyncdefget_cookie(self,page):cookies_list=awaitpage.cookies()cookies=''forcookieincookies_list:str_cookie='{0}={1};'str_cookie=str_cookie.format(cookie.get('name'),cookie.get('value'))cookies+=str_cookie#将cookie放入cookie池self.RedisQueue.put_hash(self.shopId,cookies)returncookiesasyncdefpage_evaluate(self,page):awaitpage.evaluate('''()=>{Object.defineProperties(navigator,{webdriver:{get:()=>undefined}})}''')awaitpage.evaluate('''()=>{window.navigator.chrome={runtime:{},};}''')awaitpage.evaluate('''()=>{Object.defineProperty(navigator,'languages',{get:()=>['en-US','en']});}''')awaitpage.evaluate('''()=>{Object.defineProperty(navigator,'插件',{get:()=>[1,2,3,4,5,6],});}''')awaitpage.waitFor(3000)asyncdefpage_close(self,browser):for_pageinawaitbrowser.pages():await_page.close()awaitbrowser.close()definput_time_random(self):returnrandom.randint(100,151)defrun(self,username=USERNAME,password=PASSWORD,url=LOGIN_URL):loop=asyncio.get_event_loop()i_future=asyncio.ensure_future(self.login(username,password,url))loop.run_until_complete(i_future)returni_future.result()if__name__=='__main__':Z=Login(shopId="001")Z.run()Celery任务文件是这样的#-*-coding:utf-8-*-from__future__importabsolute_importimportosimportsysimporttimefromdb.redisCurdimportRedisQueuefromsend_msg.weinxinimportSend_msgbase_dir=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))sys.path.append(base_dir)fromlogger.loggerimportlog_vfromceleryimportTaskfromplatLogin.loginimportLogin#登陆类fromceleryimportCeleryrandomQueue=RedisQueue("_app=cookie")celeryCelery('task')celery_app.config_from_object('celeryConfig')S=Send_msg()dl_dict={'demo':{'cookie':'','loginClass':'Login',}}#todo这是三种运行的状态classtask_status(Task):defon_success(self,retval,task_id,args,kwargs):log_v.info('任务信息->id:{},arg:{},successful.....Done'。格式(task_id,args))defon_failure(self,exc,task_id,args,kwargs,einfo):log_v.error('taskid:{},arg:{},失败!错误:{}'.format(task_id,args,exc))defon_retry(self,exc,task_id,args,kwargs,einfo):log_v.warning('taskid:{},arg:{},retry!info:{}'.format(task_id,args,exc))#todo随机找一个hashkey作为轮询对象。Celery在win10系统中可能不稳定,有时会断开连接@celery_app.task(base=task_status)defget_cookie_status(platName="demo"):try:#log_v.debug(f'[+]polling{platName}timerstart.....Done')randomQueue.get_hash(platName).decode()log_v.debug(f'[+]polling{platName}Success.....Done')return"Erppollingsucceeded"除了:return"Erppollingfailed"@celery_app.task(base=task_status)defset_plat_cookie(platName="demo",shopId=None):log_v.debug(f"[+]{platName}正在登录")core=eval(dl_dict[platName]['loginClass'])(shopId=shopId)result=core.run()returnresultCelery配置文件是这样的KER_URL=f'redis://root:{parse.quote("你的不规则密码")}@host:6379/15'#导入任务,如tasks.pyCELERY_IMPORTS=('monitor.tasks',)#column中的任务加载的默认序列化方式CELERY_TASK_SERIALIZER='json'#结果序列化方式CELERY_RESULT_SERIALIZER='json'CELERY_ACCEPT_CONTENT=['json']CELERY_TIMEZONE='Asia/Shanghai'#指定时区,不指定则默认为'UTC'#CELERY_TIMEZONE='UTC'CELERYBEAT_SCHEDULE={'add-every-60-seconds':{'task':'tasks.get_cookie_status','schedule':datetime.timedelta(minutes=1),#每1分钟执行一次'args':()#taskfunctionparameter},}Startservicecelery-Atasksbeat-lINFOcelery-Atasksworker-lINFO-c2以2个线程启动消费者队列服务,开启定时任务。当发现当前平台的cookie不可用时,我会向Celery发送信号(也就是调用之前的set_plat_cookie方法),消费者会执行自动化脚本获取cookie并存储到Redis中。消费者得到这个任务。当数据空闲时,Celery中的get_cookie_status方法会每分钟向Redis请求一次数据,也就是我们设置的1分钟心跳。这样不管我们的Celery是否在后台启动,都不会出现假死或者卡死的情况,一切都会好起来的!!6小结为了解决windows下Celery的弊端,给Celery任务队列设置一个心跳时间,比如每分钟或者每五分钟向Redis数据库发送一次数据,保证队列一直处于活动状态,这样只要由于你的电脑没有关机,网络是开着的(如果是远程Redis的话),Celery任务队列服务不会出现假死,Stuck状态。
