当前位置: 首页 > 科技观察

分布式任务队列Celery的实践

时间:2023-03-14 21:11:27 科技观察

笔者在最近的工作中接触到了Celery。这是一个开源的分布式任务队列(DistributedTaskQueue)。Github上有18kstar,主要可以用来实现应用。的异步任务和定时任务,虽然是用Python写的,但是协议可以用任何语言实现,比如gocelery,nodecelery和celery-php。作者写这篇文章是为了总结一下对Celery的理解以及在工作中的使用。本文大致内容如下:什么是任务队列;芹菜做什么;Celery在工作中的实践。后端同学应该都知道什么任务队列就是“MessageQueue”。常见的有RabbitMQ、RocketMQ、Kafka。至于“TaskQueue”,笔者在接触Celery之前从未听说过。什么是任务队列,任务队列和消息队列是什么关系。带着疑问,我们来看看Celery的架构:在Celery的架构中,可以看到多个服务器发起异步任务(AsyncTasks),将任务发送到Broker的队列中,其中CeleryBeat进程可以负责发起定时任务。任务。当Task到达Broker时,会分发给对应的CeleryWorker进行处理。处理任务时,结果存储在后端。在上述过程中,Celery没有实现Broker和Backend,而是使用了现有的开源实现,比如RabbitMQ作为Broker提供消息队列服务,Redis作为Backend提供结果存储服务。Celery就像是抽象了消息队列架构中Producer和Consumer的实现。它将消息队列中的基本单元“消息”抽象为任务队列中的“任务”,进行异步、定时的任务发起、结果存储等操作。封装使得开发者可以忽略AMQP、RabbitMQ等实现细节,为开发带来便利。综上所述,Celery作为任务队列是在消息队列的基础上进一步封装,其实现依赖于消息队列。接下来,通过一个简单的应用程序来了解Celery到底做了什么。Celery做什么在应用程序开发中,为了保证响应速度,通常会异步处理一些耗时且不影响进程的操作。例如,在用户注册过程中,通常会异步发送邮件通知用户。让我们看看Celery是如何实现这个异步操作的。在task.py中声明了发送邮件的方法send_mail,并在其中添加了Celery提供的@app.task装饰器。通过这个装饰器,send_mail函数可以变成一个celery.app.task:Task实例对象。Task实例可以提供两个核心功能:向队列发送消息;声明Worker收到消息后需要执行的具体功能。fromceleryimportCeleryapp=Celery('tasks',broker='amqp://guest@localhost//')@app.taskdefsend_mail(email):print("sendmailto",email)importtimetime.sleep(5)返回“成功”任务已经定义完成后,如果要发起异步任务,可以调用Task的delay方法,将消息发送到队列中。例如用户注册完成后,发起发送邮件的异步任务:#user.pyfromtasksimportsend_maildefregister():print("1.向数据库中插入记录")print("2.通过celery异步发送邮件")send_mail.delay("chaycao@gmail.com")print("3.告诉用户注册成功")if__name__=='__main__':register()运行上面程序后,消息已经发送到RabbitMQ队列。可以观察到消息格式如下:RabbitMQ中的Task可以看出Celery封装的消息中包含了任务标识和运行参数。然后,启动Worker来消费RabbitMQ中的消息:celery-Atasksworker--loglevel=infoWorker启动后可以看到如下打印信息:Worker,然后从RabbitMQ的任务列表中成功获取消息并执行相应的Task。通过上面的例子,可以进一步理解Celery作为任务队列框架所做的工作,而“分布式任务队列”中的“分布式”是指可以有多个Producers和Consumers,即多个进程向Broker发送任务,多个Worker从Broker获取并执行Tasks。上面只是一个简单的例子,接下来我们看一下自己在工作中接触到的一些使用Celery的实践经验。Celery在工作中的做法是根据业务场景划分队列。在我做的项目中,使用Celery来处理下单、解析曲目、上游推送等异步和定时任务。根据每个Task的业务场景,可以为其指定对应的队列,例如:DEFAULT_CELERY_ROUTES={'celery_task.pending_create':{'queue':'create'},'celery_task.multi_create':{'queue':'create'},'celery_task.pull_tracking':{'queue':'pull'},'celery_task.pull_branch':{'queue':'pull'},'celery_task.push_tracking':{'queue':'push'},'celery_task.push_weight':{'queue':'push'},}CELERY_ROUTES={DEFAULT_CELERY_ROUTES}根据业务场景,在DEFAULT_CELERY_ROUTES配置中指定Task对应的6个Queue。有3个队列create,pull,push,将这条路由规则加入CELERY_ROUTES即可生效。这样设计的目的是为了保证不同的场景不会互相影响。例如,解析任务的阻塞不应影响下订单任务。队列按照业务场景粗略划分后,对于某个场景,可能需要更细化的划分。比如在向上游推送的时候,为了避免一个上游的阻塞影响到其他上游的推送,就需要让不同的上游相互通信。互不影响。所以不同的upstream需要使用不同的queue,例如:CLIENT_CELERY_ROUTES={#{0}是client的占位符,格式在ClientRouter'celery_task.push_tracking_retry':{'queue':'push_tracking_retry_{0}'},'celery_task.push_weight_retry':{'queue':'push_weight_retry_{0}'},}classClientRouter(object):defroute_for_task(self,task,args=None,kwargs=None):iftasknotinCLIENT_CELERY_ROUTES:returnNoneclient_id=kwargsid('cli)#根据client_id获取队列名queue_name=CLIENT_CELERY_ROUTES[task]['queue'].format(client_id)return{'queue':queue_name}CELERY_ROUTES={'ClientRouter'DEFAULT_CELERY_ROUTES,}在CLIENT_CELERY_ROUTES中指定需要隔离队列根据ClientTask及其对应的Queue名称格式,队列名称包含一个占位符,以便根据不同的客户端获取不同的队列名称。然后实现了一个路由器ClientRouter,其中定义了router_for_task方法,其作用是为任务指定对应的队列名。可以看出逻辑是,如果task在CLIENT_CELERY_ROUTES中,会把队列名格式化成kwargs中的client_id,获取最终消息的队列名,从而确定具体使用的队列根据入参client_id,从而隔离不同Client使用不同队列的效果。除了在Client维度划分,如果需要在其他维度进一步划分队列达到隔离的效果,也可以参考该方法设计路由规则。动态队列我们来说说动态队列。它的本质是一个储备队列。其目的是减轻在线环境中一些队列消息堆积的压力,起到快速支撑的作用。通过配置定义动态队列需要支持哪些队列。比如当push队列压力大的时候,可以这样配置json,将push_tracking和push_weight这两个task路由到预留的动态队列中。celery_dynamic_router配置{"celery_task.push_tracking":{"dynamic_queue":[1,2],"dynamic_percentage":0.7,},"celery_task.push_weight":{"dynamic_queue":[3,4],"dynamic_percentage":0.7,}}以上配置的效果是将70%的celery_task.push_trackingTask路由到动态队列1和2,将70%的celery_task.push_weightTask路由到动态队列3和4。动态队列的路由器DynamicRouter大致实现如下:classDynamicRouter(object):defroute_for_task(self,task,args=None,kwargs=None):#获取配置)#taskif如果不在配置中,直接返回ifnottask_config:returnNone#获取任务对应的动态队列配置路由到动态队列ifrandom.random()<=dynamic_percentage:#确定使用哪个动态队列queue_name=router_load_balance(dynamic_queue,task_name)log.data('get_router|task_name:%s,queue:%s',task_name,queue_name)return{'queue':queue_name}else:returnNone动态配置定时任务上面说了Celery不仅可以实现异步任务,也可以通过CeleryBeat实现定时任务。首先我们来看一个例子:fromcelery.schedulesimportcrontabapp.conf.beat_schedule={#每30秒发送一次邮件'sendmail-every-30-seconds':{'task':'asks.send_mail','schedule':30.0,'args':['chaycao@gmail.com']},}done经过以上配置后,执行CeleryBeat命令:celerybeat根据配置每30秒执行一次send_email任务。上面的例子就是在代码中配置定时任务。笔者在工作中使用了djcelery提供的数据库调度模型,结合django提供的ORM功能动态设置更方便。下面介绍如何实现。首先在Celery配置中添加:CELERYBEAT_SCHEDULER='djcelery.schedulers.DatabaseScheduler'设置使用DatabaseScheduler,然后生成定时任务的配置表:pythonmanage.pymigrate可以看到数据库中添加了如下表:|celery_taskmeta||celery_tasksetmeta||djcelery_crontabschedule||djcelery_intervalschedule||djcelery_periodictask||djcelery_periodictasks||djcelery_taskstate||djcelery_workerstate|完成以上操作,最后只需要执行CeleryBeat命令,它会去数据库读取配置到initiateascheduledtask这样做的好处是可以通过修改数据库中的记录来动态配置定时任务,比如调整任务的周期或者参数。以上就是我在工作中从Celery中学到的东西。如果需要实现异步任务和定时任务,可以考虑使用Celery。我是GrassPinch,一只热爱科技热爱生活的草鱼,我们下期再见!参考MessageQueuevsTaskQueue区别(https://newbedev.com/message-queue-vs-task-queue-difference)高性能异步框架Celery入门指南(https://juejin.cn/post/6844903689103081480)分布式任务队列Celery——深度任务(https://www.cnblogs.com/jmilkfan-fanguiju/p/10589779.html)