当前位置: 首页 > 后端技术 > PHP

Laravel5.2queuedriverexpire参数设置导致重复执行问题Databasedriver

时间:2023-03-30 00:54:36 PHP

'connections'=>[....'database'=>['driver'=>'database','table'=>'jobs','queue'=>'default','expire'=>60,],'redis'=>['driver'=>'redis','connection'=>'default','queue'=>'default','expire'=>180,],....],Laravel5.2队列驱动config/queue.php配置文件中,“database”和“redis”有一个expire参数,手册中解释为“队列任务过期时间(秒)”,默认为60秒。(注:5.2及以后的配置文件有变化,改成'retry_after'参数,详见手册)这个配置我在网上搜了一下,没有太多解释,但是在实际使用过程中,是发现执行时间超过expire设置时间的队列进程,分布式程序部署使用队列,这个参数和这个设计模式是个大坑。..我发现这个问题是因为我想使用分布式程序来部署处理队列。两台服务器部署Laravel框架artisan脚本,连接MYSQL数据库,使用作业队列表。部署后,分别启动两台服务器的脚本,找到执行过的脚本,在队列驱动中取数据,比如MYSQL的jobs表,遇到最先执行的脚本队列数据不会跳过,而是把这个数据放在视为Failed,在failed_jobs表中存储了一条新的数据(当Laravel队列发生故障时,队列数据会存储在failed_jobs表中),造成数据重复。之前在一台服务器上启动三个进程执行脚本,就不会出现这种错误。后面执行的脚本不会获取到前面进程的队列数据,不会判断为Failed。多业务处理时出现队列是什么原因?驱动器中的数据错误怎么办?根据队列执行流程,当程序执行时,队列从队列驱动中取任务,获取任务的进程队列驱动要做事务处理,这样第二个进程取任务会跳过正在执行的队列数据.在查阅了一些资料,了解了Laravel队列的原理之后,最后还是要看一下Queue的源码了。Laravel的Queue源代码在IlluminateQueue目录下。首先分析MYSQL驱动的jobs表:CREATETABLE`jobs`(`id`bigint(20)unsignedNOTNULLAUTO_INCREMENT,`queue`varchar(255)COLLATEutf8_unicode_ciNOTNULL,`payload`longtextCOLLATEutf8_unicode_ciNOTNULL,`attempts`tinyint(3)unsignedNOTNULL,`reserved`tinyint(3)unsignedNOTNULL,`reserved_at`int(10)unsignedDEFAULTNULL,`available_at`int(10)unsignedNOTNULL,`created_at`int(10)unsignedNOTNULL,PRIMARYKEY(`id`),KEY`jobs_queue_reserved_reserved_at_index`(`queue`,`reserved`,`reserved_at`))ENGINE=InnoDBDEFAULTCHARSET=utf8COLLATE=utf8_unicode_ci;手册主要介绍队列任务的保存,payload字段存放序列化的任务。Laravel队列可以序列化数据模型。执行时,队列系统会自动从数据库中获取整个模型实例。有关详细信息,请参阅手册。但其他几个状态和时间字段是保证队列事务处理的关键字段。“attempts”执行次数,“reserved”执行状态,“reserved_at”执行开始时间,“available_at”预约执行时间,“created_at”是队列创建时间。监听事件的脚本包括Listener.php和Worker.php。查看源码可以看出Listener可以处理指定的queue和connection参数,但实际上最后是通过work来处理queue的。Laravel5.4取消了queue:listen参数,使用queue:work来执行。但是我这里说的是Laravel5.2的问题。不知道是不是以下原因让Laravel优化去掉了listen。继续分析队列处理的Worker类源码,取队列数据时使用pop方法。该方法会根据传入的驱动类型,如数据库或redis,调用驱动的pop方法。$connection=$this->manager->connection($connectionName);$job=$this->getNextJob($connection,$queue);//如果我们能够从堆栈中取出一个作业,我们将处理它并//然后立即返回。如果队列中没有作业//我们将让工作人员“休眠”指定的秒数。if(!is_null($job)){return$this->process($this->manager->getName($connectionName),$job,$maxTries,$delay);}下面是DatabaseQueue的pop方法。PHP。/***从队列中弹出下一个作业。**@paramstring$queue*@return\Illuminate\Contracts\Queue\Job|null*/publicfunctionpop($queue=null){$queue=$this->getQueue($queue);$this->database->beginTransaction();如果($job=$this->getNextAvailableJob($queue)){$job=$this->markJobAsReserved($job);$this->database->commit();返回新的数据库作业($this->container,$this,$job,$queue);}$this->database->commit();}已经开启了取数据的过程。获取队列数据的核心还是$this->getNextAvailableJob($queue)。打开sql日志,查看队列数据是如何查询的。/***获取队列的下一个可用作业。**@paramstring|null$queue*@return\StdClass|null*/protectedfunctiongetNextAvailableJob($queue){$this->database->enableQueryLog();$job=$this->database->table($this->table)->lockForUpdate()->where('queue',$this->getQueue($queue))->where(函数($query){$this->isAvailable($query);$this->isReservedButExpired($query);})->orderBy('id','asc')->first();var_dump($this->database->getQueryLog());返回$工作?(object)$job:null;}array(1){[0]=>array(3){'query'=>string(165)"select*from`jobs`where`queue`=?and((`reserved`=?and`available_at`<=?)or(`reserved`=?and`reserved_at`<=?))orderby`id`asclimit1forupdate"'bindings'=>array(5){[0]=>string(7)“默认”[1]=>int(0)[2]=>int(1493634233)[3]=>int(1)[4]=>int(1493634173)}'time'=>double(1.55)}从sql语句可以看出,取队列数据有两个条件。当reserved为0时,available_at时间小于当前时间。这个条件就是要执行的队列;当reserved为1时,reserved_at被执行开始时间小于计算时间($this->isReservedButExpired),即当前时间减去超时秒数Carbon::now()->subSeconds($this->expire)->getTimestamp(),这个条件是判断队列任务是否过期?整个select过程都是“forupdate”,并且有排他锁。获得合格队列后/***将给定的作业ID标记为保留。**@param\stdClass$job*@return\stdClass*/protectedfunctionmarkJobAsReserved($job){$job->reserved=1;$工作->尝试=$工作->尝试+1;$job->reserved_at=$this->getTime();$this->database->table($this->table)->where('id',$job->id)->update(['reserved'=>$job->reserved,'reserved_at'=>$job->reserved_at,'attempts'=>$job->attempts,]);return$job;}程序会更新数据,更新完成后提交。在同一台服务器上,当第二个进程在取数据的时候遇到悲观锁,需要等待第一个进程取数据更新reserved和time后才执行。也就是说Laravel队列在使用数据库时,并发进程不会同时取多条数据,而是取同一条数据等待其中一个进程更新数据状态和执行时间.队列成功获取数据后,第一个操作就是更新,所以第二个进程不会获取到和第一个进程一样的数据,除非队列过期。在DatabaseQueue.php的pop方法中,拿到队列数据后,sleep(10)before"$this->database->commit();",会很明显第二个队列没有拿到其他队列数据,说明""forupdate"只是一个update级的独占锁,不会排除select。Laravel在使用数据库队列的时候有时会阻塞,不知道是不是这个原因造成的。如果执行时间过长,超过'expire'参数设置的时间,第二个队列会获取第一个队列的数据,判断超时。此时会根据设置的最大执行次数判断尝试插入新的队列数据并继续尝试执行。还是插入错误队列“failed_jobs”表,判断队列执行失败。以上就是Laravel使用mysql执行队列的逻辑。上面提到的两台服务器都部署了Laravel框架来执行artisan脚本。一个jobstablequeueFailed的问题就是服务器时间不一致的原因。后面的server执行的时候,判断前面队列的数据。超时后向“failed_jobs”插入一条新数据,已达到最大失败次数,否则将插入新数据继续尝试。所以queue:listen的执行时间参数--timeout=60一定要设置小于队列任务过期时间的expire参数!还有,Laravel5.2的queue:work没有参数--timeout=60。....最后是执行队列后的处理逻辑。如果队列成功执行,它将删除作业数据,这很好。如果失败,包括超时、异常等,会根据设置的最大失败次数判断是插入一条新数据还是插入一条Failed数据到“failed_jobs”表中。当出现错误时,handleJobException的异常处理会调用DatabaseQueue.php的release方法,$job->release($delay),最后实现pushToDatabase。插入新数据时,attempts为失败次数,reserved为0,available_at为当前时间戳加上延迟时间参数,这样整个队列处理就形成了一个完整的数据逻辑操作。Laravel5.4大大修改了队列功能。手册中的提示任务过期和超时任务执行时间在配置文件config/queue.php中,每个连接都定义了retry_after项。这个配置项的目的是定义任务执行后多少秒释放回队列。如果retry_after设置为90,任务运行90秒还没有完成,不会被删除,而是会被释放回队列。毫无疑问,您需要将retry_after的值设置为任务执行时间的最大可能值。Laravel5.4去掉了队列的listen命令,work也加入了超时参数。Laravel5.5出来的时候应该会升级。附:Laravel5.2测试脚本是早前在网上找的,作业还是写成命令。其实5.2之后的jobs使用很简单。Job任务定义在jobs下,handle可以添加一些测试方案,比如我的抛异常直接FailedclassMyJobextendsJobimplementsShouldQueue{useInteractsWithQueue,SerializesModels;私钥;私人价值;/***创建一个新的作业实例。**@returnvoid*/publicfunction__construct($key,$value){$this->key=$key;$this->value=$value;}/***执行作业。**@returnvoid*/publicfunctionhandle(){for($i=0;$i<20;$i++){echo"$i\n";}睡觉(1);}echo"sss\t".$this->key."\t".date("Y-m-dH:i:s")."\n";thrownew\Exception("Test\n");//Redis::hset('queue.test',$this->key,$this->value);}publicfunctionfailed(){dump('失败');}}控制器访问和设置任务队列。key和value是之前测试redis插入的。根据您自己的测试计划设置作业参数。对于($i=0;$i<5;$i++){echo"$i";$job=(newMyJob($i,$i))->delay(20);$this->dispatch($job);}我的示例设置了5个队列,打开多个shell以同时执行artisan测试。本来想看看redis队列代码,一起发出来的。最近事情太多,redis的代码也不是很看。redis驱动可以参考http://www.cnblogs.com/z12987...这篇文章详细介绍了Laravel队列redis驱动逻辑。redis驱动使用的list和zset结构存储队列,执行过程中会去掉对数据库的“forupdate”操作,所以应该不会有队列阻塞。但是队列任务过期时间设置和数据库驱动是一样的,所以同样queue:listen执行时间参数--timeout=60必须设置小于队列任务过期时间expire参数!终于完成了。..