翻译GitHubhttps://github.com/yuansir/diving-laravel-zh原文链接https://divinglaravel.com/queue-system/workers现在,我们知道Laravel是如何将作业被推送到不同的队列,让我们更深入地了解工作人员如何处理您的作业。首先,我将工作人员定义为一个在后台运行的简单PHP进程,目的是从存储中提取作业并根据多个配置选项运行它们。phpartisanqueue:work运行此命令将指示Laravel创建应用程序实例并开始执行作业。这个实例将永远存在。启动Laravel应用程序的动作只会在运行命令时发生一次。同一个实例将用于执行您的作业,这意味着:通过避免在每个作业上启动整个应用程序来节省服务器资源。在应用程序中进行任何代码更改后,必须手动重新启动Worker。您也可以这样运行:phpartisanqueue:work--once这将启动应用程序的一个实例,处理单个作业,然后终止脚本。phpartisanqueue:listenqueue:listen命令相当于在无限循环中运行queue:work--once命令,这会导致以下问题:每个循环启动应用程序的一个实例。指定的工人将选择一个工作并执行它。工作进程将被杀死。使用queue:listen确保为每个作业创建一个新的应用程序实例,这意味着您不必在代码更改后手动重启worker,这也意味着会消耗更多的服务器资源。queue:workcommand让我们看一下Queue\Console\WorkCommand类的handle()方法,这个方法就是你运行phpartisanqueue:work时会执行的方法:publicfunctionhandle(){if($this->downForMaintenance()&&$this->option('once')){return$this->worker->sleep($this->option('sleep'));}}$this->listenForEvents();$connection=$this->argument('connection')?:$this->laravel['config']['queue.default'];$queue=$this->getQueue($connection);$this->runWorker($connection,$queue);}首先我们检查应用程序是否处于维护模式并使用--once选项,在这种情况下我们希望脚本正常运行,所以我们不执行任何jobs,我们只是把脚本完全杀掉,然后再让worker休眠一段时间。Queue\Worker的sleep()方法如下所示:publicfunctionsleep($seconds){sleep($seconds);}为什么我们不能在handle()方法中返回null来终止脚本?如前所述,queue:listen命令在循环中运行WorkCommand:while(true){//这个过程简单地调用'phpartisanqueue:work--once'$this->runProcess($process,$options->memory);}如果应用程序处于维护模式,并且WorkCommand立即终止,这将导致循环结束并在很短的时间内开始下一个循环,最好在这种情况下造成一些延迟,而不是通过创建我们不会真的大量使用应用实例。在handle()方法中,我们调用了listenForEvents()方法:writeOutput($event->job,'开始');});$this->laravel['events']->listen(JobProcessed::class,function($event){$this->writeOutput($event->job,'success');});$this->laravel['events']->listen(JobFailed::class,function($event){$this->writeOutput($event->job,'failed');$this->logFailedJob($event);});}在这个方法中,我们会监听几个事件,这样我们就可以在每个作业处理、处理或失败时打印一些信息给用户。记录失败的作业一旦作业失败,就会调用logFailedJob()方法$this->laravel['queue.failer']->log($event->connectionName,$event->job->getQueue(),$event->job->getRawBody(),$event->exception);queue.failer容器别名在Queue\QueueServiceProvider::registerFailedJobServices()中注册:protectedfunctionregisterFailedJobServices(){$this->app->singleton('queue.failer',function(){$config=$this->app['config']['queue.failed'];returnisset($config['table'])?$this->databaseFailedJobProvider($config):newNullFailedJobProvider;});}/***创建一个新的数据库工作提供者失败。**@param数组$config*@return\Illuminate\Queue\Failed\DatabaseFailedJobProvider*/protectedfunctiondatabaseFailedJobProvider($config){returnnewDatabaseFailedJobProvider($this->app['db'],$config['database'],$config['table']);}如果配置了queue.failed,数据库队列将用于失败,有关失败作业的信息只是将数据存储在在库表中:$this->getTable()->insertGetId(compact('connection','queue','payload','exception','failed_at'));运行worker要运行worker,我们需要收集两个Message:worker的连接信息是从作业中提取的。工作人员找到了作业的队列。如果没有使用queue.default配置定义默认连接,您可以为queue:work命令提供--connection=default选项。队列也是如此,你可以提供一个--queue=emails选项,或者在连接配置中选择队列选项。一旦完成,WorkCommand::handle()方法运行runWorker():protectedfunctionrunWorker($connection,$queue){$this->worker->setCache($this->laravel['cache']->司机());返回$this->worker->{$this->option('once')?'runNextJob':'daemon'}($connection,$queue,$this->gatherWorkerOptions());}命令构造后设置worker类属性:publicfunction__construct(Worker$worker){parent::__construct();$this->worker=$worker;}容器解析Queue\Worker实例,我们在runWorker()中设置worker将使用的缓存驱动,我们也根据--once命令决定调用什么方法.使用--once选项,我们只需调用runNextJob来运行下一个可用作业,然后脚本终止。否则,我们调用守护进程方法来始终保持进程处理作业。启动作业时,我们使用gatherWorkerOptions()方法收集用户给出的命令选项,稍后我们将提供该方法,该工具是runNextJob或daemon方法。protectedfunctiongatherWorkerOptions(){returnnewWorkerOptions($this->option('delay'),$this->option('memory'),$this->option('超时'),$this->option('sleep'),$this->option('tries'),$this->option('force'));}daemon让我看看Worker::daemon()方法,这个方法的第一行调用了Worker::daemon()方法保护函数listenForSignals(){if($this->supportsAsyncSignals()){pcntl_async_signals(true);pcntl_signal(SIGTERM,function(){$this->shouldQuit=true;});pcntl_signal(SIGUSR2,function(){$this->paused=true;});pcntl_signal(SIGCONT,function(){$this->paused=false;});}}该方法使用PHP7.1信号处理,支持AsyncSignals()方法检查我们是否在PHP7.1上并加载pcntl扩展。之后调用pcntl_async_signals()以启用信号处理,我们为多个信号注册处理程序:当指示脚本关闭时引发SIGTERM。SIGUSR2是Laravel用来指示脚本应该暂停的用户定义信号。恢复暂停的脚本时会引发SIGCONT。这些信号从ProcessMonitor(如Supervisor)发送并与我们的脚本通信。Worker::daemon()方法中的第二行读取上次队列重启的时间戳,这个值在我们调用queue:restart时保存在缓存中,稍后我们会检查是否与上次重启时间相同。stamp不满足,之后多次通知worker重启。最后,该方法启动一个循环,在该循环中我们完成其余工作人员获取作业、运行它们并在工作进程上执行多个操作。while(true){if(!$this->daemonShouldRun($options,$connectionName,$queue)){$this->pauseWorker($options,$lastRestart);继续;$job=$this->getNextJob($this->manager->connection($connectionName),$queue);$this->registerTimeoutHandler($job,$options);如果($job){$this->runJob($job,$connectionName,$options);}else{$this->sleep($options->sleep);}$this->stopIfNecessary($options,$lastRestart);}确定worker是否应该处理作业防止循环继续如果应用程序处于维护模式,工作人员仍然可以使用--force选项处理作业:phpartisanqueue:work--force:$this->events->until(newEvents\Looping($connectionName,$queue))===false)这一行触发了Queue\Event\Looping事件,检查handle()方法中是否有监听器返回false,这种情况下你可以强制你的worker暂时停止处理工作。如果worker应该暂停,调用pauseWorker()方法:$this->stopIfNecessary($options,$lastRestart);}sleep方法并传递给控制台命令的--sleep选项,该方法调用publicfunctionsleep($seconds){sleep($seconds);}之后脚本休眠一段时间,我们检查worker是否应该在这种情况下退出并杀死脚本,稍后我们查看stopIfNecessary方法,如果脚本不能被杀死,我们就调用continue;开始一个新的循环:if(!$this->daemonShouldRun($options,$connectionName,$queue)){$this->pauseWorker($options,$lastRestart);continue;}检索要运行的作业$job=$this->getNextJob($this->manager->connection($connectionName),$queue);getNextJob()方法接受队列连接的实例,我们从队列保护函数getNextJob($connection,$queue){try{foreach(explode(',',$queue)as$queue){if(!is_null($job=$connection->pop($queue))){return$job;}}}赶上(异常$e){$t他的->异常->报告($e);$this->stopWorkerIfLostConnection($e);我们简单地遍历给定的队列,使用选定的队列从存储空间(数据库,redis,sqs,....)获取作业并返回该作业为了从存储中检索作业,我们查询最旧的作业满足以下条件:被推入队列,我们??试图从中找到一个没有被另一个工人保留的工作,以便能够在给定的时间内运行,一些工作我们还获取已被冻结的工作等待很长时间并重试,一旦找到满足此条件的工作,我们将此工作标记为已保留,以便其他工作人员可以获取它,同时我们也增加了工作监控计数。监控作业超时,取回下一个作业后,我们调用registerTimeoutHandler()方法:){$this->kill(1);});$timeout=$this->timeoutForJob($job,$options);pcntl_alarm($timeout>0?$timeout+$options->sleep:0);同样,如果加载了pcntl扩展,我们将注册一个信号处理程序以在作业超时时终止工作进程,我们使用pcntl_alarm()在配置的超时后发送SIGALRM信号。如果作业花费的时间超过超时值,处理程序将终止脚本,否则作业将通过并且下一个循环将设置一个新的警报来覆盖第一个警报,因为可能有一个警报正在进行中。job只在PHP7.1以上有效,在window上也是无效的ˉ_(ツ)_/ˉ处理job的runJob()方法调用process():publicfunctionprocess($connectionName,$job,WorkerOptions$options){try{$this->raiseBeforeJobEvent($connectionName,$job);$this->markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName,$job,(int)$options->maxTries);$工作->火();$this->raiseAfterJobEvent($connectionName,$job);}catch(Exception$e){$this->handleJobException($connectionName,$job,$options,$e);}}raiseBeforeJobEvent()触发Queue\Events\JobProcessing事件,raiseAfterJobEvent()触发Queue\Events\JobProcessed事件。markJobAsFailedIfAlreadyExceedsMaxAttempts()检查进程是否达到最大尝试次数并将作业标记为失败:is_null($job->maxTries())?$job->maxTries():$maxTries;如果($maxTries===0||$job->attempts()<=$maxTries){返回;}$this->failJob($connectionName,$job,$e=newMaxAttemptsExceededException('已尝试排队的作业太多次。该作业之前可能已超时。'));throw$e;}否则我们调用作业对象的fire()方法来运行作业。从哪里获取作业对象getNextJob()方法返回一个Contracts\Queue\Job实例,这取决于我们使用对应的作业实例的队列驱动程序,例如数据库队列驱动程序则为Queue\Jobs\DatabaseJob。循环结束在循环结束时,我们调用stopIfNecessary()来检查在下一个循环开始之前是否应该停止进程:这个->杀死();}if($this->memoryExceeded($options->memory)){$this->stop(12);}elseif($this->queueShouldRestart($lastRestart)){$this->stop();}}shouldQuit属性在两种情况下设置,首先在listenForSignals()内部作为SIGTERM信号处理程序,其次在stopWorkerIfLostConnection()保护函数stopWorkerIfLostConnection($e){if($this->causedByLostConnection($e)){$this->应该退出=真;}}在检索和处理作业的时候,这个方法会在几个try...catch语句中被调用,以确保worker应该处于被杀死的状态,以便我们的ProcessControl可以启动一个新的数据库连接。causedByLostConnection()方法可以在Database\DetectsLostConnections特性中找到。memoryExceeded()检查内存使用量是否超过当前设置的内存限制,您可以使用--memory选项进行设置。转载请注明:转载自瑞安是菜鸟|LNMP技术栈笔记
