当前位置: 首页 > 科技观察

基于Redis配置Celery

时间:2023-03-15 08:32:07 科技观察

作为分布式异步计算框架。虽然Celery常用于Web框架,但也可以单独使用。虽然常规搭配的消息队列是RabbitMQ,但是由于系统在某些情况下已经包含了Redis,所以可以复用。抛开web框架不谈,下面介绍如何基于Redis配置Celery任务。pipinstallcelery[redis]项目结构$treeyour_projectyour_project├──__init__.py├──main.py├──celery.py└──tasks.py0directories,4files其中main.py是触发Task的业务代码。当然,文件名可以随意更改。celery.py是Celery的app定义位置,tasks.py是Task定义位置。不建议修改文件名。配置Celery,在celery.py中写入如下代码:fromceleryimportCeleryfrom.settingsimportREDIS_URLAPP=Celery(main=__package__,broker=REDIS_URL,backend=REDIS_URL,include=[f'{__package__}.tasks'],)APP.conf.update(task_track_started=True)其中REDIS_URL是从同一个配置settings.py导入的,形式大概是redis://localhost:6379/0。这里Redis既用作代理又用作后端。即既是消息队列又是结果反馈的数据库(默认只保存1天)。在include=中需要填写一个下游worker的包名列表。这里选择了同一个包的tasks.py文件。额外设置task_track_started是为了命令Worker反馈STARTED状态。默认情况下,无法知道任务何时开始执行。在tasks.py文件中编写任务和调用,添加异步任务的实现。from.celeryimportAPP@APP.taskdefdo_sth():pass在需要发起任务的地方,使用.apply_async触发异步调用。即它实际上只是向消息队列发送消息,真正的执行操作是远程的。fromcelery.resultimportAsyncResultfrom.tasksimprtdo_sthresult=do_sth.apply_async()assertisinstance(result,AsyncResult)RunWorker:celery-Ayour_projectworker运行原理一个task从触发到完成,时序图如下:其中main代表业务的主要流程代码。它可以是Django、Flask之类的Web服务,也可以是其他类型的进程。Worker指的是Celery的Worker。main发送消息后,会得到一个AsyncResult,里面有task_id。仅通过task_id,也可以自己构造一个AsyncResult来查询相关信息。其中,代表运行进程的主要是状态。worker会继续关注Redis(或其他消息队列,如RabbitMQ)并查询新消息。如果你收到一条新消息,在消费它之后,开始运行do_sth。操作完成后,会将返回值对应的结果和一些操作信息写回Redis(或其他后端,如Django数据库等)。在系统的任何地方,都可以通过对应的AsyncResult(task_id)查询到结果。CeleryTask的状态如下:除了SUCCESS,还有失败(FAILURE)和取消(REVOKED)两种结束状态。RETRY是设置重试机制后进入的临时等待状态。另外,如果清除保存在Redis中的结果信息(默认只保存1天),那么任务状态会再次变为PENDING。这在设计上是一个很大的问题,使用的时候一定要容错。常见的控制操作result=AsyncResult(task_id)#阻塞等待返回result.wait()#取消任务result.revoke()#删除任务记录result.forget()有时候需要等待异步操作的结果主要业务流程。当你需要使用等待。如果要取消排队或执行的任务,可以使用revoke。即使任务已经执行完毕,也可以使用Revoke,但是什么都不会改变。如果需要提前删除任务记录,可以使用forget。