当前位置: 首页 > 后端技术 > Python

Python定时任务框架:APScheduler源码分析(一)

时间:2023-03-26 00:23:44 Python

前言APScheduler是Python中知名的定时任务框架,可以在很多方面满足定时执行或者周期性执行程序任务的需求,类似于crontabonlinux,不过crontab更强大。该框架不仅可以添加和删除定时任务,还可以提供多种持久化任务。APScheduler弱分布式框架,由于每个任务对象都存储在当前节点中,所以只能以人肉的形式进行分布式,比如使用Redis。第一次接触APScheduler,会发帖说它有很多概念。刚开始接触的时候,是因为概念太多。直接用crontab就舒服多了。但是现在公司的很多项目都是基于APScheduler实现的,下面简单介绍一下。源代码。先行概念用最简单的语言提及APScheduler中的关键概念。Job:任务对象,就是你要执行的任务。JobStores:任务存储方式,默认存储在内存中,也可以支持redis、mongodb等。Executors:执行器,就是执行任务的。触发对应的调用逻辑Scheduler:Scheduler,将以上部分串联起来。APScheduler提供了多个调度器。不同的Scheduler适用于不同的场景。目前我看到最多的就是BackgroundScheduler后台调度器。该调度器适用于后台运行程序的调度需求。还有其他各种调度程序:BlockingScheduler:适用于在一个进程中只运行单个任务,通常在调度程序是您唯一想运行的东西时使用。AsyncIOScheduler:适合使用asyncio框架GeventScheduler:适合使用gevent框架TornadoScheduler:适合使用Tornado框架的应用TwistedScheduler:适合使用Twisted框架的应用QtScheduler:适合QT的使用本文只分析BackgroundScheduler相关的逻辑,我们先简单看一下官方的例子,然后以此为切入点逐层分析。BackgroundScheduler官方示例代码分析如下。fromdatetimeimportdatetimeimporttimeimportosfromapscheduler.scheduler.backgroundimportBackgroundSchedulerdeftick():print('Tick!Thetimeis:%s'%datetime.now())if__name__=='__main__':scheduler=BackgroundScheduler()调度程序.add_job(tick,'interval',seconds=3)#添加任务,3秒后运行scheduler.start()print('PressCtrl+{0}toexit'.format('Break'ifos.name=='nt'else'C'))try:#这是为了模拟应用程序活动(保持主线程活动)。whileTrue:time.sleep(2)except(KeyboardInterrupt,SystemExit):#关闭调度器scheduler.shutdown()上面的代码很简单,先通过BackgroundScheduler方法实例化一个调度器,然后调用add_job方法设置将要执行的任务添加到JobStores中,默认是存储在内存中,更具体地说,是存储在一个dict中,最后通过start方法启动调度器,APScheduler会每隔3秒触发一个名为interval的trigger,让调度器执行器调度默认执行器执行tick方法中的逻辑。当所有程序都执行完毕后,调用shutdown方法关闭调度器。BackgroundScheduler其实是由线程组成的,线程有守护线程的概念。如果启用守护线程模式,则不必关闭调度程序。首先看BackgroundScheduler类的源码。#apscheduler/schedulers/background.pyclassBackgroundScheduler(BlockingScheduler):_thread=Nonedef_configure(self,config):self._daemon=asbool(config.pop('daemon',True))super()._configure(config)def复制代码start(self,*args,**kwargs):#创建事件通知#多个线程可以等待一个事件发生,事件发生后所有线程都会被激活。self._event=Event()BaseScheduler.start(self,*args,**kwargs)self._thread=Thread(target=self._main_loop,name='APScheduler')#设置为守护线程,在Python主线程之后运行,直接结束,忽略守护线程的情况,#如果是非守护线程,Python主线程会等待其他非守护线程运行完毕,才结束self._thread.daemon=self._daemon#daemon是否为Daemon线程self._thread.start()#启动线程defshutdown(self,*args,**kwargs):super().shutdown(*args,**kwargs)self._thread.join()delself._thread上面的代码中,给出了详细的注释,并做了简单的解释。_configure方法主要用于参数设置。这里定义了self._daemon参数,然后通过super方法调用父类的_configure方法。start方法就是它的启动方法,逻辑很简单。线程事件Event被创建。线程事件是一种线程同步机制。如果你看它的源码,你会发现线程事件是基于条件锁实现的。线程事件提供了set()、wait()、clear()三个主要方法。set()方法会将事件标志状态设置为true。clear()方法将事件标志状态设置为false。wait()方法阻塞线程,直到事件标志状态为真。线程事件创建后,调用其父类的start()方法。这个方法才是真正的启动方法。暂时放下吧。启动后,通过Thread方法创建一个线程。线程的目标函数是self._main_loop。它是调度器的主要训练。如果调度器没有关闭,它会一直执行主循环中的逻辑,从而实现APScheduler的各种功能。这是一个非常重要的方法。同样,暂时放下。创建好后,启动线程就ok了。创建线程后,定义线程的守护进程。如果daemon为True,则表示当前线程为守护线程,否则为非守护线程。简单提一下,如果线程是守护线程,那么Python主线程逻辑执行完后,会直接退出,与守护线程无关。如果是非守护线程,Python主线程执行完后,会等待其他所有非守护线程执行完毕后退出。shutdown方法先调用父类的shutdown方法,再调用join方法,最后直接删除线程对象。看完BackgroundScheduler类的代码,再回头看看开头的示例代码。通过BackgroundScheduler实例化scheduler之后,再调用add_job方法,在add_job方法中加入三个参数,就是你要定时执行的tick方法。trigger触发器的名字叫interval,这个trigger的参数是seconds=3。是否可以将触发器触发器的名称更改为任何字符?这不可能。APScheduler实际上在这里使用了Python中的入口点技术。如果你经历过制作Python包并上传到PYPI的过程,你应该对入口点有印象。其实entrypoint不仅可以永远打包,还可以用于模块化的插件架构。关于这方面的内容很多,我们以后再说。简单来说,add_job()方法需要传入对应的trigger名称,interval会对应apscheduler.triggers.interval.IntervalTrigger类,seconds参数就是这个类的参数。add_job方法分析add_job方法源码如下。#apscheduler/schedulers/base.py/BaseSchedulerdefadd_job(self,func,trigger=None,args=None,kwargs=None,id=None,name=None,misfire_grace_time=undefined,coalesce=undefined,max_instances=undefined,next_run_time=undefined,jobstore='default',executor='default',replace_existing=False,**trigger_args):job_kwargs={'trigger':self._create_trigger(trigger,trigger_args),'执行者':执行者,'func':func,'args':tuple(args)ifargsisnotNoneelse(),'kwargs':dict(kwargs)ifkwargsisnotNoneelse{},'id':id,'name':name,'misfire_grace_time':misfire_grace_time,'coalesce':coalesce,'max_instances':max_instances,'next_run_time':next_run_time}#过滤job_kwargs=dict((key,value)forkey,valueinsix.iteritems(job_kwargs)ifvalueisnotundefined)#Instantiateaspecifictaskobjectjob=Job(self,**job_kwargs)#在调度程序启动并运行之前,不要真正将作业添加到作业存储self._jobstores_lock:ifself.state==STATE_STOPPED:self._pending_jobs.append((job,jobstore,replace_existing))self._logger.info('暂定添加作业——它会在''调度程序启动'时被正确调度)else:self._real_add_job(job,jobstore,replace_existing)returnjobadd_job方法代码不多。一开始创建了一个job_kwargs字典,里面包含了trigger、executors等,通过self._create_trigger()方法创建了一个简单的trigger触发器,需要两个参数,代码中的trigger其实就是一个区间字符串,trigger_args是对应的参数。exectuor目前是默认的,后面会讲到。func回调方法是我们真正要执行的逻辑。trigger会触发scheduler,scheduler会调用executor执行具体的逻辑。misfire_grace_time:其注释解释为“指定运行时间后任务仍在运行几秒”,只有阅读相关文档才能理解。例如,一个任务本来是在12:00运行的,但是由于某些原因没有安排到12:00。现在是12:30。此时调度时会判断当前时间与预先调度时间的差值。如果misfire_grace_time设置为20,则调度失败的任务将不会被调度。如果misfire_grace_time设置为60,就会被调度。coalesce:如果某个任务因为某种原因没有真正运行,导致任务堆积,比如堆积10个相同的人,coalesce为True,只会执行最后一层,如果coalesce为False,则尝试连续执行10个second-速度。max_instances:通过task,最多同时运行几个instance。next_run_time:任务下次运行的时间再进行一次过滤,然后将参数传入Job类,完成任务对象的实例化。后面的逻辑就比较简单了。首先判断self._jobstores_lock锁是否可以获取。它实际上是一个可重入锁。在Python中,可重入锁的实现是基于普通的互斥锁,只是多了一个变量用于计数。每加一把锁,变量就加一,每次解锁,变量就减一。只有当变量为0时,互斥锁才真正被释放。获取到锁后,首先判断当前调度器的状态。如果是STATE_STOPPED(停止状态),则将任务添加到_pending_jobs待处理列表中。如果不是停止状态,调用_real_add_job方法,然后返回job对象。其实_real_add_job方法才是将任务对象job添加到指定存储后端的真正方法。当任务对象被添加到指定的存储后端(默认直接存储在内存中)时,调度器将检索它并执行。回到示例代码中,执行完调度器的add_job方法后,会立即执行调度器的start方法。考虑到最后的字数,本文就先打住,后面继续分析APScheduler。如果文章对你有帮助,点击“在看”支持二良,下篇文章见。