当前位置: 首页 > 后端技术 > PHP

Hyperf-Crontab组件源码分析

时间:2023-03-30 05:34:04 PHP

预读:Hyperf/Crontab使用文档预读:Hyperf/Process自定义进程使用文档预读:Hyperf事件机制在开始之前编写本项目使用Hyperf/Crontab组件来实现进行秒级数据清洗,最近拆分了定时任务,所以打算翻个组件源码加深理解,顺便想想如何搭建Hyperf/Crontab的任务调度功能以这个为基础。crontab本质上是一个随Server启动的自定义进程,所以接下来我们将从启动和执行两个阶段来介绍。定时器的启动由于crontab是一个随服务器启动的进程,分析它的生命周期肯定会设计框架的启动。但本文并不主要介绍Hyperf框架的启动源码,所以我们将简单回顾一下涉及自定义进程启动的框架的启动代码。当我们使用命令phpbin/hyperf.phpstart启动hyperf时$application=$container->get(\Hyperf\Contract\ApplicationInterface::class);在入口文件hyperf.php框架中会实例化一个HyperfFrameworkApplicationFactory实例,同时扫描所有带@Command注解或定义命令配置的地方,并实例化一个SymfonyComponentConsoleApplication对象(这是一个SymfonyConsole命令类,用于定义任务触发通过命令)。在$application对象中注册所有定义为命令的对象类,最后在入口文件中执行$application->run();执行所有Command命令,包括HyperfServerCommandStartServer服务Server启动类。在该类中定义了如果接收到的命令参数中包含start,则执行其逻辑,即根据config/autoload/server.php的配置实例化HyperfServerServer类,并触发BeforeMainServerStart事件在此过程中被触发。到这里我们就要进入自定义流程启动的核心阶段了。$serverProcesses=$serverConfig['进程']??[];$processes=$this->config->get('processes',[]);$annotationProcesses=$this->getAnnotationProcesses();//检索已注册的进程。$processes=array_merge($serverProcesses,$processes,ProcessManager::all(),array_keys($annotationProcesses));foreach($processesas$process){...if($instanceinstanceofProcessInterface){$instance->isEnable()&&$instance->bind($server);}BootProcessListener监听BeforeMainServerStart事件的触发,会获取进程配置文件中定义的所有@Process和进程类,并执行它们的isEnable和bind方法。定时器的执行上面我们简单介绍了自定义进程是如何用框架启动的。接下来我们分析一下本文的主角Crontab自定义进程。Hyperf文档介绍,使用Crontab前需要在config/autoload/processes.php中注册HyperfCrontabProcessCrontabDispatcherProcess自定义进程。那我们直接看一下这个流程类都做了什么。如果熟悉Hyperf/Process组件的使用,就会知道进程类执行的主要逻辑都在handle()方法中。publicfunctionhandle():void{$this->event->dispatch(newCrontabDispatcherStarted());while(true){$this->sleep();$crontabs=$this->scheduler->schedule();while(!$crontabs->isEmpty()){$crontab=$crontabs->dequeue();$this->strategy->dispatch($crontab);}}}首先在句柄中触发一个CrontabDispatcherStarted事件,目前没有人在监听这个事件。接下来是协程阻塞期。第一个阻塞时间是从下一个整分钟算起的秒数,其余都是60s。至于为什么直接使用SwooleCoroutine::sleep而不是sleep(),是因为自定义进程默认是协程Server。接下来$crontabs=$this->scheduler->schedule();返回一个当前分钟应该执行的SplQueue队列,队列为Crontab对象object(Hyperf\Crontab\Crontab)#46164(10){["name":protected]=>string(4)"Foo4"["type":protected]=>string(8)"callback"["rule":protected]=>string(11)"******"["singleton":protected]=>bool(false)["mutexPool":protected]=>string(7)"default"["mutexExpires":protected]=>int(3600)["onOneServer":protected]=>bool(true)["callback":protected]=>array(2){[DEBUG]事件Hyperf\Framework\Event\OnPipeMessage由Hyperf\Crontab\Listener\OnPipeMessageListener侦听器处理。[0]=>string(16)"App\Task\FooTask"[1]=>string(7)"execute"}["memo":protected]=>NULL["executeTime":protected]=>object(Carbon\Carbon)#46106(3){["date"]=>string(26)"2020-06-0214:15:57.000000"["timezone_type"]=>int(3)["timezone"]=>string(13)"Asia/Shanghai"}}这个对象记录了我们现在的亲戚注意两个关键信息:Executioncallback执行时间关于这个crontab对象是如何生成的,后面会介绍,我们拿到crontab对象后,将这个对象发送到config/autoload/dependencies.php中定义一个好的StrategyInterface实现类用于dispatch,默认指定worker进程执行策略。$server=$this->serverFactory->getServer()->getServer();if($serverinstanceofServer&&$crontab->getExecuteTime()instanceofCarbon){$workerId=$this->getNextWorkerId($server);$server->sendMessage(newPipeMessage('callback',[Executor::class,'execute'],$crontab),$workerId);}dispatch方法除了Coroutine策略是一样的,都是进程间轮训通过sendMessage将PipeMessage对象发送给WorkID,sendMessage方法会触发OnPipeMessage事件。该事件由OnPipeMessageListener监听,会根据PipeMessage执行相应的回调函数,即Executor->execute'。该方法中根据corntab对象的属性定义了SwooleTimer::after,根据corontab的executeTime属性定义多少秒后执行回调。$callback&&Timer::after($diff>0?$diff*1000:1,$callback);这样就基本完成了我们二级定时任务的执行。现在让我们回到上面关于crontab对象何时生成的问题。我们说过,Cornrab本质上是一个自定义过程。根据Hyperf/Process的说明,所有自定义进程都继承HyperfProcessAbstractProcess。该类会在启动SwooleProcess时触发BeforeProcessHandle事件。在这个事件中,所有的crontab配置和注解,这些注解被解析生成crontab对象,存储在crontabs属性中。publicfunctionregister(Crontab$crontab):bool{if(!$this->isValidCrontab($crontab)){returnfalse;$this->crontabs[$crontab->getName()]=$crontab;返回真;}以上就是一个crontab定时任务的大致执行过程。当然还有很多执行细节和Contab个性化定义参数。由于篇幅有限,我们没来得及一一介绍。有兴趣的同学可以私读。我还将在下面附上Hyperf。/crontab的整体执行流程图,方便大家对比阅读源码。写在最后Hyperf是一个基于Swoole4.4+的高性能、高灵活性的PHP协程框架。它内置协程服务器和大量常用组件。在保持高性能的同时,还保持了极其灵活的可扩展性。标准组件基于PSR标准实现。基于强大的依赖注入设计,保证大多数组件或类是可替换和可重用的。框架组件库除了常见的协程版MySQL客户端和Redis客户端,还为大家准备了协程版EloquentORM、WebSocket服务端和客户端、JSONRPC服务端和客户端、GRPC服务端和客户端Client、Zipkin/Jaeger(OpenTracing)客户端、GuzzleHTTP客户端、Elasticsearch客户端、Consul客户端、ETCD客户端、AMQP组件、Apollo配置中心、阿里云ACM应用配置管理、ETCD配置中心、基于令牌桶的算法限流器、通用连接池、熔断器、Swagger文档generation、SwooleTracker、Blade和Smarty视图引擎、Snowflake全局ID生成器等组件省去你实现对应协程版本的麻烦。Hyperf还提供了基于PSR-11的依赖注入容器、注解、AOP面向方面编程、基于PSR-15的中间件、自定义流程、基于PSR-14的事件管理器、Redis/RabbitMQ消息队列和自动模型缓存、PSR-16位缓存、Crontab秒级定时任务、Translation国际化、Validation验证器等非常便捷的功能,满足丰富的技术和业务场景,开箱即用。