异步任务是web开发中经常遇到的问题。比如用户提交了一个请求,虽然这个请求对应的任务很耗时,但是你不能让用户在这里等,通常需要立即返回结果,告诉用户任务已经提交了。任务可以在后续慢慢完成,完成后会给用户发送完成通知。今天分享一段代码,使用Celery、RabbitMQ和MongoDB实现一个异步任务工作流,可以修改task.py来实现自己的异步任务。架构图如下:Celery用于执行异步任务,RabbitMQ作为消息队列,MongoDB用于存储任务执行结果,FastAPI提供web接口。以上所有模块都可以使用Docker一键部署。下面是demo的使用方法:1.确保机器上安装了Docker和Git2。下载源代码:gitclonehttps://github.com/aarunjith/async-demo.git3。部署并启动:cdasync-demodockercomposeup--build4。启动一个异步任务:$curl-XPOSThttp://localhost:8080/process任务会被发送到消息队列,并立即返回一个任务id:?curl-XPOSThttp://localhost:8080/进程{“状态”:“待处理”,“id”:“a129c666-7b5b-45f7-ba54-9d7b96a1fe58”,“错误”:“”}%5。查询任务状态:curl-XPOSThttp://localhost:8080/check_progress/任务完成后返回结果如下:?curl-XPOSThttp://localhost:8080/check_progress/a129c666-7b5b-45f7-ba54-9d7b96a1fe58{"status":"SUCEESS","data":"\"hello\""}%代码目录结构如下:app.py如下:fromfastapiimportFastAPIfromcelery。resultimportAsyncResultfromtasksimportstart_processingfromloguruimportloggerfrompymongoimportMongoClientimportuvicorn#让我们创建一个连接到我们的后端,芹菜存储结果client=MongoClient("mongodb://mongodb:27017")#默认数据库和集合名称Celerycreatedb=client['task_results']coll=db["celery_taskmeta"]app=FastAPI()@app.post('/process')asyncdefprocess_text_file():'''Processendpointtotriggerthestartofaprocess'''try:result=start_processing.delay()logger.info(f'Startedprocessingthetaskwithid{result.id}')return{"status":result.state,'id':result.id,'error':''}exceptExceptionase:logger.info(f'TaskExecutionfailed:{e}')return{"status":"FAILURE",'id':None,'error':e}@app.post('/check_progress/{task_id}')asyncdefcheck_async_progress(task_id:str):'''检查任务进度并在任务完成时获取结果的端点。'''try:result=AsyncResult(task_id)ifresult.ready():data=coll.find({'_id':task_id})[0]return{'status':'SUCEESS','data':data['结果']}否则:rreturn{"status":result.state,"error":''}除了Exceptionase:data=coll.find({'_id':task_id})[0]ifdata:return{'status':'SUCEESS','data':data['result']}return{'status':'TaskIDinvalid','error':e}if__name__=="__main__":uvicorn.run("app:app",host='0.0.0.0',port='8080')如果想实现自己的任务队列,修改task.py加入自己的异步任务,可以集成到自己的项目中