Celery是一个基于Python的分布式调度系统。文档在这里。最近需要在不重启celery服务的情况下动态添加任务。找了一圈也没找到什么好的解决办法(也可能是我没仔细看文档),只能自己实现,给celery动态添加任务。首先,我想到了传入一个函数,让一个特定的任务执行传入的函数,像这样@app.taskdefexecute(func,*args,**kwargs):returnfunc(*args,**kwargs)不幸的是,就会出现这样的错误kombu.exceptions.EncodeError:Objectoftype'function'isnotJSONserializable改变一个序列化方法@app.task(serializer='pickle')defexecute(func,*args,**kwargs):returnfunc(*args,**kwargs)导致一系列错误消息ERROR/MainProcess]Poolcallbackraisedexception:ContentDisallowed('Refusingtodeserializeuntrustedcontentoftypepickle(application/x-python-serialize)',)追溯(最近调用最后):文件“/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py”,第42行,在__get__returnobj.__dict__[self.__name__]KeyError:'chord'在处理上述异常的过程中,又发生了一个异常:Traceback(mostrecentcalllast):File"/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py",line42,in__get__returnobj.__dict__[self.__name__]KeyError:'_payload'另一种思路func=import_string(func)不知道这样可以不,结果test:不行,飞年不利,最后我一直测试啊测试,终于找到了直接上传代码的方法fromimportlibimportimport_module,reloadapp.conf.CELERY_IMPORTS=['task','task.all_task']defimport_string(import_name):import_name=str(import_name).replace(':','.')modules=import_name.split('.')mod=import_module(modules[0])forcompinmodules[1:]:如果不是hasattr(mod,comp):reload(mod)mod=getattr(mod,comp)returnmod@app.taskdefexecute(func,*args,**kwargs):func=import_string(func)returnfunc(*args,**kwargs)projectstructure这是├──celery_app.py├──config.py├──task│├──all_task.py│├──__init__.py注意:任务必须大于等于两级目录添加到all_task.py,调用时无需重启celery服务all_task.ee',2,444)好的,找到了cElery也支持任务定时调用,像这样execute.apply_async(args=['task.all_task.aa'],eta=datetime(2017,7,9,8,12,0))简单的实现了一个任务的重复调用function@app.taskdefinterval(func,seconds,args=(),task_id=None):next_run_time=current_time()+timedelta(seconds=seconds)kwargs=dict(args=(func,seconds,args),eta=next_run_time)iftask_idisnotNone:kwargs.update(task_id=task_id)interval.apply_async(**kwargs)func=import_string(func)returnfunc(*args)大概意思是先计算下一次运行时间,再添加task在celery队列中,task_id有问题,因为假设每3s添加一个task,它的task_id默认会使用uuid生成。如果要去掉这个任务,就不方便了。自定义task_id可能会更好,另外可能需要判断task_id是否存在AsyncResult(task_id).stateok,然后提供一个有用的函数frominspectimportgetmembers,isfunctiondefget_tasks(module='task'):return[{'name':'task:{}'.format(f[1].__name__),'doc':f[1].__doc__,}forfingetmembers(import_module(module),isfunction)]就是这样。
