基于redis有序集的延时任务执行,比如在某个时间给某个用户发送短信,订单过期处理等,我是在tp5框架上写的。实现起来非常简单,对于一些不是很复杂的应用来说已经足够了,目前在公司项目中使用,后台进程没有实现多进程,话不多说,贴代码,不回排版,见谅1.命令行脚本执行方式:phpthinkdelay-queuequeuename(这是有序集的键)namespaceapp\command;useapp\common\lib\delayqueue\DelayQueue;usethink\console\Command;usethink\console\Input;usethink\console\Output;usethink\Db;classDelayQueueWorkerextendsCommand{constCOMMAND_ARGV_1='queue';protectedfunctionconfigure(){$this->setName('delay-queue')->setDescription('delayqueuetaskprocess');$this->addArgument(self::COMMAND_ARGV_1);}protectedfunctionexecute(Input$input,Output$output){$queue=$input->getArgument(self::COMMAND_ARGV_1);//参数1延迟队列表名,对应redis的有序集合键名while(true){DelayQueue::getInstance($queue)->perform();睡眠(300000);}}}库类目录结构config.php为redis连接参数配置RedisHandler.php只实现了有序集的操作,重连机制尚未实现namespaceapp\common\lib\delayqueue;R类edisHandler{公共$provider;私有静态$_instance=null;私有函数__construct(){$this->provider=new\Redis();//主机端口$config=require_once'config.php';$this->provider->connect($config['redis_host'],$config['redis_port']);}finalprivatefunction__clone(){}publicstaticfunctiongetInstance(){if(!self::$_instance){self::$_instance=newRedisHandler();}返回自我::$_实例;}/***@paramstring$key有顺序集合key*@paramnumber$score排序值*@paramstring$value格式化的数据*@returnint*/publicfunctionzAdd($key,$score,$value){return$this->provider->zAdd($key,$score,$value);}/***获取有顺序集合数据*@param$key*@param$start*@param$end*@paramnull$withscores*@returnarray*/publicfunctionzRange($key,$start,$end,$withscores=空){返回$this->provider->zRange($key,$start,$end,$withscores);}/***删除有序集合数据*@param$key*@param$member*@returnint*/publicfunctionzRem($key,$member){return$this->provider->zRem($key,$成员);}}延迟队列类命名空间app\common\lib\delayqueue;类DelayQueue{private$prefix='delay_queue:';私人$队列;私有静态$_instance=null;私有函数__construct($queue){$this->queue=$queue;}finalprivatefunction__clone(){}publicstaticfunctiongetInstance($queue=''){if(!self::$_instance){self::$_instance=newDelayQueue($queue);}返回自我::$_实例;}/***添加任务信息到队列中**demoDelayQueue::getInstance('test')->addTask(*'app\common\lib\delayqueue\job\Test',*strtotime('2018-05-0220:55:20'),*['abc'=>111]*);**@参数$jobClass*@paramint$runTime执行时间*@paramarray$args*/publicfunctionaddTask($jobClass,$runTime,$args=null){$key=$this->prefix.$this->queue;$params=['class'=>$jobClass,'args'=>$args,'runtime'=>$runTime,];RedisHandler::getInstance()->zAdd($key,$runTime,serialize($params));}/***执行作业*@returnbool*/publicfunctionperform(){$key=$this->prefix.$this->queue;//取出有顺序集合第一个元素$result=RedisHandler::getInstance()->zRange($key,0,0);如果(!$result){返回false;}$jobInfo=unserialize($result[0]);print_r('job:'.$jobInfo['class'].'将运行于:'.date('Y-m-dH:i:s',$jobInfo['runtime']).PHP_EOL);$jobClass=$jobInfo['class'];如果(!@class_exists($jobClass)){print_r($jobClass.'undefined'.PHP_EOL);RedisHandler::getInstance()->zRem($key,$result[0]);返回假;}//执行时间if(time()>=$jobInfo['runtime']){$job=new$jobClass;$job->setPayload($jobInfo['args']);$jobResult=$job->preform();if($jobResult){//执行任务RemoveRedisHandler::getInstance()->zRem($key,$result[0]);返回真;}}返回假;}}异步任务基类:namespaceapp\common\lib\delayqueue;classDelayJob{protected$payload;publicfunctionpreform(){//todoreturntrue;}publicfunctionsetPayload($args=null){$this->payload=$args;}}所有异步执行的任务都卸载到job目录下,必须继承DelayJob,你可以实现任何你想延迟执行的任务如:namespaceapp\common\lib\delayqueue\job;useapp\common\lib\delayqueue\DelayJob;classTestextendsDelayJob{公共函数preform(){//payload中应该有处理任务需要的参数,通过DelayQueue的addTask传入print_r('testjob'.PHP_EOL);返回真;}}使用方法:假设用户创建了一个订单,订单在10分钟后过期,则在订单创建后添加:DelayQueue::getInstance('close_order')->addTask('app\common\lib\delayqueue\job\CloseOrder',//自己实现的jobstrtotime('2018-05-0220:55:20'),//订单过期时间['order_id'=>123456]//传给的参数工作);close_order是有序集合的key启动进程phpthinkdelay-queueclose_order
