异步任务是web后端开发中最常见的需求,非常适合多任务、高并发场景。本文分享如何使用docker-compose、FastAPI、rq快速创建一个包含异步任务队列集群的RESTAPI,后端执行任务的节点可以随意扩展。系统架构图:上图中的每个方框都可以理解为一个服务器。用户请求api,api将任务放入redis队列,worker自动去redis队列取出任务执行。worker节点可以随意横向扩展。下面我们来实现一下这个架构的demo,就可以看到docker的强大和方便了。1.先创建一个虚拟环境。安装依赖于fastapi、redis和rq库。安装后生成一个requirements.txt文件mkdirmyprojectpython3-mvenvenvsourceenv/bin/activatepipinstallrqpipinstallfastapipipinstallredispipfreeze>requirements.txt2.编码实现RESTAPI,WorkerREST是一种风格,这里不是重点,我们使用FastAPI快速创建接口,新建一个api.py文件,内容如下:fromfastapiimportFastAPIfromredisimportRedisfromrqimportQueuefromworkerimportsend_captchaapp=FastAPI()#需要注意的是这里的host是主机名,是docker中的服务名,后面的docker-compose.ymal中的服务名应该也就是这个redis_conn=Redis(host='myproj_redis',port=6379,db=0)#定义一个名为my_queueq=Queue('my_queue',connection=redis_conn)@app的队列。get('/hello')defhello():"""Testendpoint"""return{'hello':'world'}#RestAPIexample@app.post('/send_captcha/{phone_number}',status_code=201)defaddTask(phone_number:str):"""Addstaskstoworkerqueue.ExpectsbodyasdictionarymatchingtheGroupclass."""job=q.enqueue(send_captcha,phone_number)return{'job':"tasksadddone."}这里的send_captcha函数是一个异步任务。从worker.py导入,worker.py内容如下:importtimedefsend_captcha(phone_number):"""模拟一个耗时的异步任务"""print(f'{time.strftime("%T")}是准备发送手机验证码')#inplaceofactualloggingprint(f'{time.strftime("%T")}生成随机验证码存入redis,并设置过期时间5分钟')time.sleep(5)#simulatelongrunningtaskprint(f'{time.strftime("%T")}{phone_number}sendcomplete')return{phone_number:'taskcomplete'}return{phone_number:'taskcomplete'}3.构建Docker镜像现在目标是实现一个有两个执行节点的集群我们需要启动4个容器来完成一个集群部署:容器1:运行FastAPIapp容器2:运行Redis服务容器3:运行worker1服务容器4:运行worker2服务Containers1、3和4都是Python应用程序。可以共享Python图像。为了方便调试,我们可以让1、3、4容器共享我们本地的路径,这样改完代码就不用重新构建代码了,比较方便。创建一个包含依赖项的Python镜像现在让我们创建一个包含之前requirements.txt依赖项的Python镜像,并编写一个包含以下内容的Dockerfile:FROMpython:3.8-alpineRUNadduser-DmyprojWORKDIR/home/myprojCOPYrequirements.txtrequirements.txtRUNpipinstall-rrequirements.txtRUNchown-rmyproj:myproj./USERmyprojCMDuvicornapi:app--host0.0.0.0--port5057内容说明:FROMpython:3.8-alpine指定使用python:3.8-alpine,本容器已预装Python3.8,你可以在命令行python执行dockersearch,查看有哪些Python镜像可用。RUNadduser-Dmyproj添加一个用户myproj,这一步的主要目的是生成目录/home/myprojWORKDIR/home/myproj设置程序的执行路径为/home/myprojCOPYrequirements.txtrequirements.txt复制需求在当前路径下。txt到容器的/home/myproj。这里没有复制.py文件,因为我们后面启动容器的时候会共享本地路径,不需要复制。生产部署的时候最好复制到window里面,这样容器就不会依赖了。这台机器。RUNpipinstall-rrequirements.txt在容器中安装依赖RUNchown-Rmyproj:myproj./将/home/myproj路径下文件的属主和属组改为myproj,这一步是使用myproj用户startfastapi服务和生产环境一般都是root用户启动的,所以不需要这个命令。USERmyproj切换到myproj用户CMDuvicornapi:app--host0.0.0.0--port5057容器启动后执行的命令,服务端口为5057更多Dockerfile语法请参考官方文档,这里只是简单介绍描述。现在在Dockerfile所在目录执行如下命令构建镜像:dockerbuild-tmyproject:latest。创建完成后可以使用dockerimages查看:?dockerimages|grepmyprojmyproject4.启动集群这里使用DockerCompose启动4个容器,为什么要使用DockerCompose呢?因为方便,如果不用的话,每次需要手动启动一个容器。DockerCompose会读取一个yaml格式的配置文件,根据配置文件启动容器,每个容器共享同一个网络。记住api.py中使用的Redis主机名,这里你需要将redis服务名称设置为该主机名。写一个docker-compose.yml,内容如下:version:'3'services:myproj_redis:image:redis:4.0-alpineports:-"6379:6379"volumes:-./redis:/datamyproj_api:image:myproject:latestcommand:uvicornapi:app--host0.0.0.0--port5057ports:-"5057:5057"volumes:-./:/home/myprojmyproj_worker1:image:myproject:latestcommand:rqworker--urlredis://myproj_redis:6379my_queuevolumes:-./:/home/myprojmyproj_worker2:image:myproject:latestcommand:rqworker--urlredis://myproj_redis:6379my_queuevolumes:-./:/home/myproj第一个容器是myproj_redis,里面运行的是redis服务,redis数据是通过volumes保存在本地,所以需要在本地创建一个redis目录来映射容器内部的/data目录。第二个容器是fastapi服务,端口5057,使用本地路径映射到/home/myproj第三个容器和第四个容器是worker节点,虽然也映射了本地路径,但是它只使用了worker.py文件.当任务过多时,可以扩展worker节点来解决负载压力。最终目录如下:执行dockercompose命令启动4个容器:dockercompose-fdocker-compose.ymlup可以看到4个服务全部启动,日志输出正常。4.测试现在我们来测试一下。在左侧窗口,我使用Python快速发送了3个post请求:importsubprocessforiinrange(3):subprocess.run("curl-v-XPOST'http://localhost:5057/send_captcha/18012345678'",shell=True)From右侧窗口的日志输出,可以看到worker1和worker2都执行了任务,其中worker1执行了2个task,worker2执行了1个task。查看完整代码,请点击“阅读原文”。最后,本文分享如何使用Dockerfile构建镜像,使用DockerCompose管理容器集群,并在此基础上实现一个带有异步任务队列集群的RESTAPI。docker-compose的详细使用方法请参考Docker官方文档
