在日常工作中,我们经常会用到需要周期性执行的任务。一种方法是使用Linux系统自带的crond[1]结合命令行来实现。另一种方法是直接使用Python。下面是常见的Python定时任务的实现。1、使用whileTrue:+sleep()实现定时任务。time模块中的sleep(secs)函数可以让当前正在执行的线程暂停secs秒后继续执行。所谓暂停就是当前线程进入阻塞状态。当到达sleep()函数指定的时间后,会从阻塞状态转为就绪状态,等待CPU调度。基于这个特性,我们可以通过while无限循环+sleep()的方式实现简单的定时任务。代码示例:importdatetimeimporttimedeftime_printer():now=datetime.datetime.now()ts=now.strftime('%Y-%m-%d%H:%M:%S')print('dofunctime:',ts)defloop_monitor():whileTrue:time_printer()time.sleep(5)#暂停5秒if__name__=="__main__":loop_monitor()的主要缺点:只能设置间隔,并且不能指定具体的时间,比如每天早上8:00的sleep是一个阻塞函数,也就是说在sleep期间程序不能操作任何东西。2.使用Timeloop库运行定时任务Timeloop[2]是一个可以用来运行多周期任务的库。这是一个使用装饰器模式在线程中运行标记函数的简单库。示例代码:importtimefromtimeloopimportTimeloopfromdatetimeimporttimedeltatl=Timeloop()@tl.job(interval=timedelta(seconds=2))defsample_job_every_2s():print"2sjobcurrenttime:{}".format(time.ctime())@tl.job(interval=timedelta(seconds=5))defsample_job_every_5s():print"5sjobcurrenttime:{}".format(time.ctime())@tl.job(interval=timedelta(seconds=10))defsample_job_every_10s():print"10sjobcurrenttime:{}".format(time.ctime())3.使用threading.Timer实现定时任务。threading模块中的Timer是一个非阻塞函数。sleep好一点,对timer最基本的理解就是定时器。我们可以启动多个定时任务。这些定时器任务是异步执行的,所以不存在等待顺序执行的问题。Timer(interval,function,args=[],kwargs={})interval:指定时间function:要执行的方法args/kwargs:方法参数代码示例:importdatetimefromthreadingimportTimerdeftime_printer():现在=日期时间。datetime.now()ts=now.strftime('%Y-%m-%d%H:%M:%S')print('dofunctime:',ts)loop_monitor()defloop_monitor():t=Timer(5,time_printer)t.start()if__name__=="__main__":loop_monitor()备注:Timer只能执行一次,需要循环调用,否则只能执行一次4.使用内置模块sched实现定时任务sched模块实现了一个通用的事件调度器,它使用调度器类中的延迟函数来等待特定的时间并执行任务。同时支持多线程应用,每个任务执行完后会立即调用delay函数,保证其他线程也能执行。classsched.scheduler(timefunc,delayfunc)此类定义用于安排事件的通用接口。它需要从外部传入两个参数。timefunc是一个返回时间类型数字的无参函数(常用如time模块中的time),delayfunc应该是需要一个参数调用的函数,兼容timefunc的输出,作为多个的延迟时间单位(常用的,比如time模块的sleep)。代码示例:importdatetimeimporttimeimportscheddeftime_printer():now=datetime.datetime.now()ts=now.strftime('%Y-%m-%d%H:%M:%S')print('dofunctime:',ts)loop_monitor()defloop_monitor():s=sched.scheduler(time.time,time.sleep)#生成调度程序s.enter(5,1,time_printer,())s.run()if__name__=="__main__":loop_monitor()scheduler对象的main方法:enter(delay,priority,action,argument),安排一个事件延迟delay时间单位。cancel(event):从队列中移除事件。如果事件当前不在队列中,此方法将引发ValueError。run():运行所有预定的事件。该函数将等待(使用传递给构造函数的delayfunc()函数)然后执行事件,直到没有更多的计划事件。个人点评:比threading.Timer好,不需要循环调用。Python是一门非常多样化和发达的语言,所以肯定有很多我没有考虑到的功能。有知道的可以在评论区告诉我。看到这里,走之前请给我点个赞吧~感谢阅读。
