当前位置: 首页 > 后端技术 > Python

Python定时任务框架:APScheduler源码分析(三)

时间:2023-03-26 14:23:14 Python

前言距离上次APScheduler源码分析已经有一段时间了,现在有点闲,就写最后一篇吧。本文分析APScheduler执行器相关的代码。回顾一下,首先回忆一下APScheduler是如何工作的?例如查看代码。scheduler=BackgroundScheduler()scheduler.add_job(tick,'interval',seconds=3)#添加任务并在3秒后运行scheduler.start()。简单的说就是实例化BackgroundScheduler,然后调用add_job方法添加任务,最后调用start方法启动。add_job方法通过上一篇文章的分析已经知道,即方法存放在内存dict中,interval指定trigger为interval触发器,interval为3秒。现在看启动方法。start方法BackgroundScheduler的start方法调用BaseScheduler类的start方法,其代码如下。#apscheduler/schedulers/base.py/BaseSchedulerdefstart(self,paused=False):ifself.state!=STATE_STOPPED:raiseSchedulerAlreadyRunningError#检查:如果我们在uWSGI线程禁用状态下运行,返回相应的错误警报self._check_uwsgi()withself._executors_lock:#Createadefaultexecutorifnothingelseisconfigured#如果'default'不在self._executors:中创建默认执行程序alias,executorinself._executors.items():executor.start(self,alias)withself._jobstores_lock:#Createadefaultjobstoreifnothingelseisconfigured#Createadefaultstorageif'default'notinself._jobstores复制代码:self.add_jobstore(self._create_default_jobstore(),'default')#启动所有作业存储ias,storeinself._jobstores.items():store.start(self,alias)#为job,jobstore_alias,replace_existinginself._pending_jobs:self._real_add_job(job,jobstore_alias,replace_obsisting)delending_job.[:]self.state=STATE_PAUSEDifpausedelseSTATE_RUNNINGself._logger.info('Schedulerstarted')self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_START))ifnotpaused:self.wakeup()start方法的代码含义很直观,也就是创建一个默认的执行器和默认的内存,同时调用想要的启动方法。executor的start方法传入self(调度器本身)和alias。executor的start方法是做什么的?默认执行器的启动方法在BaseExecutor类中,其代码如下_create_lock()self._logger=logging.getLogger('apscheduler.executors.%s'%alias)可以发现start方法其实什么也没做。APScheduler的默认执行器是线程执行器#apscheduler/schedulers/base.py/BaseSchedulerdef_create_default_executor(self):"""创建一个默认的执行器存储,特定于特定的调度器类型。"""returnThreadPoolExecutor()本质上是使用ThreadPoolExecutor,但注意它继承自BasePoolExecutor,BasePoolExecutor又继承自BaseExecutor。#apscheduler/executores/pool.pyclassThreadPoolExecutor(BasePoolExecutor):def__init__(self,max_workers=10):pool=concurrent.futures.ThreadPoolExecutor(int(max_workers))super().__init__(pool)如何调用执行器?让我们谈谈_process_jobs方法。这个方法在《Python定时任务框架:APScheduler源码分析(二)》中有详细分析,这里是due_jobs中job的一些相关代码:#查找job的执行者#搜索当前任务对象的执行者try:executor=self._lookup_executor(job.executor)exceptBaseException:#...omit#获取运行时间try:#将这个任务提交给executorexecutor.submit_job(job,run_times)exceptMaxInstancesReachedError:#...省略的一般逻辑是从jobstore中获取job任务对象,然后将job任务对象提交给In执行器,submit_job方法的具体实现在BaseExecutor类中,其逻辑如下。#apscheduler/executors/base.py/BaseExecutordefsubmit_job(self,job,run_times):#self._lock是RLockassertself._lockisnotNone,'Thisexecutorhasnotbeenstartedyet'withself._lock:ifself._instances[job.id]>=job.max_instances:raiseMaxInstancesReachedError(job)self._do_submit_job(job,run_times)self._instances[job.id]+=1submit_job方法首先判断是否存在可重入锁,如果存在在存在锁定的情况下,使用_do_submit_job方法执行job任务对象。因为使用了默认的线程执行器,其_do_submit_job方法只是将job任务对象提交到线程池,对应代码如下#apscheduler/executors/pool.py/BasePoolExecutordef_do_submit_job(self,job,run_times):def回调(f):exc,tb=(f.exception_info()ifhasattr(f,'exception_info')else(f.exception(),getattr(f.exception(),'__traceback__',None)))ifexc:self._run_job_error(job.id,exc,tb)其他:self._run_job_success(job.id,f.result())f=self._pool.submit(run_job,job,job._jobstore_alias,run_times,self._logger.name)f.add_done_callback(callback)在_do_submit_job方法中,初始定义了一个回调函数,用于接收线程池执行任务的结果。如果成功,它会调用_run_job_success方法,如果失败,它会调用_run_job_error方法。所有方法都在BaseExecutor中。_run_job_success方法代码如下。#apscheduler/executors/base.py/BaseExecutordef_run_job_success(self,job_id,events):"""当:func:`run_job`被成功调用时,由执行者使用生成的事件列表调用。"""withself._lock:self._instances[job_id]-=1ifself._instances[job_id]==0:delself._instances[job_id]foreventinevents:self._scheduler._dispatch_event(event)这个方法会调用事件相关该机制通过APScheduler事件机制分发线程池执行作业任务对象的结果。APScheduler的事件机制下回再说。回过头来看,f=self._pool.submit(run_job,job,job._jobstore_alias,run_times,self._logger.name),job任务对象作为run_job方法的参数,所以job的执行其实,它是run_job方法。run_job方法run_job方法代码如下。#apscheduler/executors/base.pydefrun_job(job,jobstore_alias,run_times,logger_name):events=[]logger=logging.getLogger(logger_name)forrun_timeinrun_times:#misfire_grace_time:在指定的运行后几秒仍然运行作业timeJobrunningifjob.misfire_grace_timeisnotNone:difference=datetime.now(utc)-run_timegrace_time=timedelta(seconds=job.misfire_grace_time)#判断是否超时ifdifference>grace_time:#超时,则记录EVENT_JOB_MISSED事件到事件thisevents.append(JobExecutionEvent(EVENT_JOB_MISSED,job.id,jobstore_alias,run_time))logger.warning('%s错过了作业“%s”的运行时间',job,difference)continuelogger.info('Runningjob"%s"(scheduledat%s)',job,run_time)try:#executejobtaskobjectretval=job.func(*job.args,**job.kwargs)除了BaseException:exc,tb=sys.exc_info()[1:]formatted_tb=''.join(format_tb(tb))#job任务对象执行错误,将EVENT_JOB_ERROR添加到events.append(JobExecutionEvent(EVENT_JOB_ERROR,job.id,jobstore_alias,run_time,exception=exc,traceback=formatted_tb))logger.exception('Job"%s"raisedanexception',job)#为了防止循环引用,导致内存泄漏traceback.clear_frames(tb)deltbelse:events.append(JobExecutionEvent(EVENT_JOB_EXECUTED,job.id,jobstore_alias,run_time,retval=retval))logger.info('Job"%s"执行成功',job)returnevents在run_job方法中,先判断当前jobtask对象的运行时间是否超过misfire_grace_time(指定运行时间后job还在运行几秒),如果超时,会记录到events列表中,通过retval=job.func(*job.args,**job.kwargs)才是真正的执行任务对象。如果在执行过程中崩溃了,job任务对象的执行错误报告也会以事件的形式添加到events中。这是一个有趣的小技巧。job任务对象崩溃后,通过exc,tb=sys.exc_info()[1:]获取错误,而不是像往常一样打印Exception中的值。sys.exc_info方法会返回三个值:type(异常类别),value(异常描述,带参数),traceback(traceback对象,包含更丰富的信息),这里只取value和traceback信息,然后通过traceback的。format_tb方法将其格式化,记录到log中后,调用traceback.clear_frames(tb)方法回溯清除所有栈帧中的局部变量tb。APScheduler对这个方法的注释是“防止循环引用,导致内存泄漏”。有趣的。最后,本文主要分析APScheduler中线程执行器的源码。线程执行器的代码简单,是APScheduler默认的执行器。APScheduler也有许多不同的执行器。感兴趣的可以自行探索。如果你有风度,可以联系我一起浅谈。APScheduler源码与不同的executor、scheduler、trigger的设计理念类似,这里就不一一分析了,但是还有一个之前出现过但没有分析的,就是,APScheduler的“事件分发”机制。下一篇文章,我们来看看APScheduler的事件分发/监听是如何实现的。如果文章对你有帮助,点击“在看”支持二良,下篇文章见。