如果要在Linux服务器上周期性的执行Python脚本,最著名的选择应该是Crontab脚本,但是Crontab有以下缺点:1.执行二级任务不方便。2。当需要执行的定时任务有上百个时,Crontab的管理会特别不方便。另一个选择是Celery,但是Celery的配置比较繁琐。如果你只是需要一个轻量级的调度工具,Celery将不是一个好的选择。当你想使用一个轻量级的任务调度工具,并希望它尽可能简单易用,不需要外部依赖,最好能容纳Crontab的所有基本功能,那么Schedule模块是你最好的选择。用它来安排任务可能只需要几行代码,感受一下:#Python实战宝典importscheduleimporttimedefjob():print("I'mworking...")schedule.every(10).minutes.do(job)whileTrue:schedule.run_pending()time.sleep(1)上面的代码表示job函数每10分钟执行一次,非常简单方便。只需要导入schedule模块,通过调用scedule.every(timenumber).timetype.do(job)发布周期性任务即可。发布的周期任务需要使用run_pending函数检查是否执行,所以需要一个while循环不断轮询这个函数。下面介绍Schedule模块的安装和主要、高级使用方法。一、准备工作请选择以下任意一种方式输入命令安装依赖项:1、Windows环境打开Cmd(开始-运行-CMD)。2.在MacOS环境下,打开Terminal(command+空格进入Terminal)。3.如果你使用的是VSCode编辑器或者Pycharm,可以直接使用界面下方的Terminal.pipinstallschedule。2.基本使用最基本的使用在文章开头已经提到了。这里有更多调度任务的例子:#Python实战宝典importscheduleimporttimedefjob():print("I'mworking...")#每十分钟执行一次任务schedule.every(10).minutes.do(job)#执行任务每小时schedule.every().hour.do(job)#每天10:30执行任务schedule.every().day.at("10:30").do(job)#每隔月schedule.every().monday。do(job)#每周三13:15执行任务schedule.every().wednesday.at("13:15").do(job)#每分钟17秒执行任务schedule.every().minute.at(":17").do(job)whileTrue:schedule.run_pending()time.sleep(1)可以看到从月到秒的配置都被上面的例子覆盖了。但是如果只想让任务运行一次,可以这样配置:#Python实战集锦importscheduleimporttimedefjob_that_executes_once():#这里写的任务只会执行一次...returnschedule.CancelJobschedule.every().day.at('22:30').do(job_that_executes_once)whileTrue:schedule.run_pending()time.sleep(1)参数传递如果你有参数需要传递给作业执行,你只需要这样做:#Python实战宝典importscheduledefgreet(name):print('Hello',name)#do()给作业函数传递附加参数schedule.every(2).seconds.do(greet,name='Alice')schedule.every(4).seconds.do(greet,name='Bob')获取当前所有的job如果想获取所有当前的job:#Python实战宝典importscheduledefhello():print('Helloworld')schedule.every().second.do(hello)all_jobs=schedule.get_jobs()取消所有作业如果触发某种机制,需要立即清除当前程序的所有作业:#Python实用宝典importscheduledefgreet(name):print('Hello{}'.format(name))schedule.every().second.do(greet)schedule.clear()标签函数设置job时,方便后续的作业管理,可以给作业打上标签,方便通过标签过滤作业或者取消作业。#Python实战集锦importscheduledefgreet(name):print('Hello{}'.format(name))#.tag打标签schedule.every().day.do(greet,'Andrea').tag('daily-tasks','friend')schedule.every().hour.do(greet,'John').tag('hourly-tasks','friend')schedule.every().hour.do(greet,'Monica').tag('hourly-tasks','customer')schedule.every().day.do(greet,'Derek').tag('daily-tasks','guest')#get_jobs(tag):OK获取alltasksofthislabelfriends=schedule.get_jobs('friend')#取消所有daily-tasks任务schedule.clear('daily-tasks')setjobdeadlineifyouneedtoletajobtoacertainTimeisup,you可以用这个方法:#Python实战集importschedulefromdatetimeimportdatetime,timedelta,timedefjob():print('Boo')#每小时运行作业,18:30后停止schedule.every(1).hours.until("18:30").do(job)#每小时运行一次作业,2030-01-0118:33todayschedule.every(1).hours.until("2030-01-0118:33")。do(job)#每小时运行一次作业,8小时后停止schedule.every(1).hours.until(timedelta(hours=8)).do(job)#每小时运行一次作业,11:32:停止schedule.every(1).hours.until(time(11,33,42)).do(job)#每小时运行一次作业,停止schedule.every2020-5-1711:36:20(1).hours.until(datetime(2020,5,17,11,36,20)).do(job)过了deadline,job就不会跑了现在需要一次跑所有的job,可以调用schedule.run_all():#Python实战集importscheduledefjob_1():print('Foo')defjob_2():print('Bar')schedule.every().monday.at("12:40").do(job_1)schedule.every().tuesday.at("16:40").do(job_2)schedule.run_all()#一次运行所有作业,每次作业间隔10秒schedule.run_all(delay_seconds=10)3.高级使用装饰器安排作业如果觉得设置作业的形式过于冗长,也可以使用装饰器模式:#Python实战集fromscheduleimportevery,repeat,run_pendingimporttime#这个装饰器相当于schedule.every(10).minutes.do(job)@repeat(every(10).minutes)defjob():print("Iamascheduledjob")whileTrue:run_pending()time.sleep(1)并行执行默认情况下,Schedule按顺序执行所有作业。这背后的原因是很难找到一个让每个人都开心的并行执行模型。但是,您可以以多线程形式运行每个作业以解决此限制:#PythonPracticalCollectionimportthreadingimporttimeimportscheduledefjob1():print("I'mrunningonthread%s"%threading.current_thread())defjob2():print("我在线程%s上运行"%threading.current_thread())defjob3():print("我在线程%s上运行"%threading.current_thread())defrun_threaded(job_func):job_thread=threading.线程(目标=job_func)job_thread.start()schedule.every(10).seconds.do(run_threaded,job1)schedule.every(10).seconds.do(run_threaded,job2)schedule.every(10).seconds。do(run_threaded,job3)whileTrue:schedule.run_pending()time.sleep(1)LoggingSchedule模块也支持logging日志记录,所以使用:#Python实战宝典importscheduleimportlogginglogging.basicConfig()schedule_logger=logging.getLogger('schedule')#日志级别为DEBUGschedule_logger.setLevel(level=logging.DEBUG)defjob():print("Hello,Logs")schedule.every().second.do(job)schedule.run_all()schedule.clear()的效果如下:DEBUG:schedule:Running*all*1jobswith0sdelayinbetweenDEBUG:schedule:RunningjobJob(interval=1,unit=seconds,do=job,args=(),kwargs={})你好,LogsDEBUG:schedule:Deleting*all*jobs异常处理Schedule不会自动捕获异常,会直接Throw,这会导致一个严重的问题:所有后续作业都会被中断,所以我们需要捕获这些异常。您可以手动捕获它们,但是一些意外情况需要程序自动捕获它们。添加一个装饰器就可以了:#Python实战集锦importfunctoolsdefcatch_exceptions(cancel_on_failure=False):defcatch_exceptions_decorator(job_func):@functools.wraps(job_func)defwrapper(*args,**kwargs):try:returnjob_func(**kargs,)except:importtracebackprint(traceback.format_exc())ifcancel_on_failure:returnsschedule.CancelJobreturnwrapperreturncatch_exceptions_decorator@catch_exceptions(cancel_on_failure=True)defbad_task():return1/0schedule.every(5).baadtasks.do执行过程中遇到的任何错误都会被捕获通过catch_exceptions,这对于保证定时任务的正常运行非常关键。
