如果要周期性的执行某个Python脚本,最著名的选择应该是Crontab脚本,但是Crontab有以下缺点:1.不方便秒执行级任务。2、当需要执行的定时任务有上百个时,Crontab的管理会特别不方便。另一个选择是Celery,但是Celery的配置比较麻烦。如果你只是需要一个轻量级的调度工具,Celery将不是一个好的选择。当你想使用一个轻量级的任务调度工具,并希望它尽可能简单易用,不需要外部依赖,最好能容纳Crontab的所有基本功能,那么Schedule模块是你最好的选择。用它来安排任务可能只需要几行代码,感受一下:#Python实战宝典importscheduleimporttimedefjob():print("I'mworking...")schedule.every(10).minutes.do(job)whileTrue:schedule上面的代码.run_pending()time.sleep(1)表示job函数每10分钟执行一次,非常简单方便。只需要导入schedule模块,通过调用scedule.every(timenumber).timetype.do(job)发布周期任务。释放后的周期性任务需要使用run_pending函数来检测是否执行,所以需要一个While循环不断轮询这个函数。下面介绍Schedule模块的安装和主要、高级使用方法。1.在开始之前,您需要确保您的计算机上已经成功安装了Python和pip。如果没有,可以访问这篇文章:超详细的Python安装指南进行安装。(可选1)如果使用Python进行数据分析,可以直接安装Anaconda:Anaconda,Python数据分析和挖掘的好帮手,内置Python和pip。(可选2)另外,推荐大家使用VSCode编辑器,它有很多优点:Python编程的最佳拍档——VSCode详解指南。请选择以下方式之一输入命令安装依赖项:1.Windows环境打开Cmd(开始-运行-CMD)。2.在MacOS环境下,打开Terminal(command+空格进入Terminal)。3.如果你使用的是VSCode编辑器或者Pycharm,可以直接使用界面下方的Terminal.pipinstallschedule。2.基本使用最基本的使用在文章开头已经提到了。这里有更多调度任务的例子:#Python实战宝典importscheduleimporttimedefjob():print("I'mworking...")#每十分钟执行一次任务schedule.every(10).minutes.do(job)#每十分钟执行一次任务小时schedule.every().hour.do(job)#每天10:30执行任务schedule.every().day.at("10:30").do(job)#每月执行任务schedule.every().monday.do(job)#每周三13:15执行任务schedule.every().wednesday.at("13:15").do(job)#每分钟17秒执行任务schedule.every().minute.at(":17").do(job)whileTrue:schedule.run_pending()time.sleep(1)可以看到从月到秒的配置都被上面的例子覆盖了。但是如果只想让任务运行一次,可以这样配置:#Python实战集锦importscheduleimporttimedefjob_that_executes_once():#这里写的任务只会执行一次...returnschedule.CancelJobschedule.every().day.at('22:30').do(job_that_executes_once)whileTrue:schedule.run_pending()time.sleep(1)参数传递如果你有参数需要传递给作业执行,你只需要这样做:#Python实战宝典importscheduledefgreet(name):print('Hello',name)#do()给job函数传递附加参数schedule.every(2).seconds.do(greet,name='Alice')schedule.every(4).seconds.do(greet,name='Bob')获取所有当前工作如果想获取所有当前工作:#Python实战宝典importscheduledefhello():print('Helloworld')schedule.every().second。do(hello)all_jobs=schedule.get_jobs()取消所有作业如果触发某种机制,需要立即清除当前程序的所有作业:#PythonPracticalCollectionimportscheduledefgreet(name):print('Hello{}'.format(name))schedule.every().second.do(greet)schedule.clear()labelfunction设置job时,方便管理稍后,您可以在作业上打上标签,以便您可以通过标签过滤作业或取消作业。#Python实战集锦importscheduledefgreet(name):print('Hello{}'.format(name))#.tag打标签schedule.every().day.do(greet,'Andrea').tag('daily-tasks','friend')schedule.every().hour.do(greet,'John').tag('hourly-tasks','friend')schedule.every().hour.do(greet,'Monica').tag('hourly-tasks','customer')schedule.every().day.do(greet,'Derek').tag('daily-tasks','guest')#get_jobs(tag):yes获取alltasksofthislabelfriends=schedule.get_jobs('friend')#取消所有daily-tasks标签的任务schedule.clear('daily-tasks')setjobdeadline如果需要让job到某个时间就到了,可以使用这个方法:#Python实战集importschedulefromdatetimeimportdatetime,timedelta,timedefjob():print('Boo')#每小时运行作业,18:30后停止schedule.every(1).hours.until("18:30").do(job)#每小时运行一次作业,2030-01-0118:33todayschedule.every(1).hours.until("2030-01-0118:33").do(job)#运行一次作业小时,8小时后停止schedule.every(1).hours.until(timedelta(hours=8)).do(job)#每小时运行一次作业,在11:32:42后停止schedule.every(1).hours.until(time(11,33,42)).do(job)#每小时运行一次作业,停止schedule。每(1).小时后2020-5-1711:36:20.until(datetime(2020,5,17,11,36,20)).do(job)在截止日期后,作业不会立即运行runalljobsregardlessoftheirschedule如果触发机制,则需要立即运行Alljobs可以调用schedule.run_all():#Python实战集importscheduledefjob_1():print('Foo')defjob_2():print('Bar')schedule.every().monday.at("12:40").do(job_1)schedule.every().tuesday.at("16:40").do(job_2)schedule.run_all()#立即运行所有作业,每个作业间隔10秒schedule.run_all(delay_seconds=10)3.装饰器安排作业的高级使用如果觉得设置作业过于啰嗦,也可以使用装饰器模式:#Python实用集锦fromscheduleimportevery,repeat,run_pendingimporttime#这个装饰器的效果相当于schedule.every(10).minutes.do(job)@repeat(every(10).minutes)defjob():print("Iamascheduledjob")whileTrue:run_pending()time.sleep(1)并行执行默认情况下,Schedule是顺序执行所有赋值。这背后的原因是很难找到一个让每个人都开心的并行执行模型。但是,你可以以多线程的形式运行每个作业来解决这个限制:#Python实战集'mrunningonthread%s"%threading.current_thread())defjob3():print("I'mrunningonthread%s"%threading.current_thread())defrun_threaded(job_func):job_thread=threading.Thread(target=job_func)job_thread.start()schedule.every(10).seconds.do(run_threaded,job1)schedule.every(10).seconds.do(run_threaded,job2)schedule.every(10).seconds.do(run_threaded,job3)whileTrue:schedule.run_pending()time.sleep(1)LoggingSchedule模块也支持logging日志记录,所以这样使用:#Python实战宝典importscheduleimportlogginglogging.basicConfig()schedule_logger=logging.getLogger('schedule')#日志级别为DEBUGschedule_logger.setLevel(level=logging.DEBUG)defjob():print("Hello,Logs")schedule.every().second.do(job)schedule.run_all()schedule.clear()效果ct如下:DEBUG:schedule:Running*all*1jobswith0sdelayinbetweenDEBUG:schedule:RunningjobJob(interval=1,unit=seconds,do=job,args=(),kwargs={})Hello,LogsDEBUG:schedule:Deleting*all*jobs异常处理Schedule不会自动捕获异常,会直接Throwing,这个会导致一个严重的问题:所有后续作业都会被打断,所以我们需要捕获这些异常。可以手动捕捉,但是有些意外情况需要程序自动捕捉并加一个装饰器就可以做到:#Python实战宝典(*args,**kwargs):try:returnjob_func(*args,**kwarexcept:importtracebackprint(traceback.format_exc())ifcancel_on_failure:returnschedule.CancelJobreturnwrapperreturncatch_exceptions_decorator@catch_exceptions(cancel_on_failure=True)defbad_task():return1/0schedule.every(5).badd_task.do(任何错误都会被catch_exceptions捕获,这对保证调度任务的正常运行非常关键。我们的文章到此结束,如果喜欢今天的Python实战教程,请继续关注我们!
