当前位置: 首页 > 后端技术 > PHP

php+crontab+shell方案实现秒级定时发起异步请求回调方案

时间:2023-03-29 22:57:40 PHP

方案介绍该方案的场景:某天有一个业务需求,我们需要同步一些异步请求比如我们的信息ororderstatustothesecond第三方,这里会有定时和延时消息处理,已经考虑了很多消息队列的方案(比如:rabbitmq,云消息服务等)。但最终公司还是做出了决定,因为业务流量很小,不需要这么麻烦。所以我直接想出了这个方案。计划是50条消息/s,压力不要太大。如果金额很大,就会出现消息延迟的问题。如果不注意这个业务时间的准确性,这个plan承载的秒级处理是1000方案架构图应该没什么大问题mysqltaskqueuetableCREATETABLE`open_queue`(`id`int(10)unsignedNOTNULLAUTO_INCREMENT,`type`tinyint(4)unsignedNOTNULLDEFAULT'0'COMMENT'任务类型,后续读取时可用于区分不同任务的失败次数',`order_id`int(10)unsignedNOTNULLDEFAULT'0'COMMENT'业务绑定的订单id等,根据自己设计业务id',`event_identity`varchar(30)NOTNULLDEFAULT''COMMENT'事件表示分类,可选,sametype',`data`textNOTNULLCOMMENT'需要处理的数据,json格式',`try`tinyint(4)unsignedNOTNULLDEFAULT'1'COMMENT'特定任务需要处理的次数那个时候,不是发起回调请求的任务',`again`tinyint(1)unsignedNOTNULLDEFAULT'0'COMMENT'0不是重试记录1重试记录,失败后启动任务不为1,否则不为0',`status`tinyint(1)unsignedNOTNULLDEFAULT'0'COMMENT'是否执行1是否为0',`create_time`int(10)unsignedNOTNULLDEFAULT'0'COMMENT'任务创建的时间戳',`at_time`int(11)unsignedNOTNULLDEFAULT'0'COMMENT'任务时间的时间戳',`task_status`tinyint(1)NOTNULLDEFAULT'0'COMMENT'执行结果-1重复消息系统主动取消0未执行1执行成功2执行失败',`task_time`int(10)unsignedNOTNULLDEFAULT'0'COMMENT'执行时间',PRIMARYKEY(`id`),KEY`IDX_EVENT`(`event_identity`),KEY`IDX_AT_TIME`(`at_time`)USINGBTREE,KEY`IDX_ORDER`(`order_id`))ENGINE=InnoDBAUTO_INCREMENT=1DEFAULTCHARSET=utf8;该方案源代码下方的代码为伪代码。大家可以按照自己的想法设计自己的业务redis同步到mysql。这一步可以省略,因为我们当时Redis是部署在单机上的,不是持久化的,$redisTypeLock;//锁类型if(!IS_WIN){$file=fopen($path."/lock/{$redisTypeLock}.lock","w+");如果(!flock($file,LOCK_EX|LOCK_NB)){flock($file,LOCK_UN);fclose($文件);出口;}}//成功获取锁,开始业务执行$count=get_redis_lLen('msgevent:'.$redisType);if(!$count){//不用进入数据库订单队列,直接returnreturnfalse;}$successCount=0;for($i=0;$i<$loopLen;$i++){$data=get_redis_lPop('msgevent:'.$redisType);//格式化redis数据存入数据库持久化$result=$this->saveToMysql($data);if(!$result){//如果执行失败,重新推送redis队列get_redis_lPush('msgevent:'.$redisType,$data);}$成功次数++;}if(!IS_WIN){flock($file,LOCK_UN);fclose($file);}定时执行任务的入口本入口是定时秒级处理脚本和处理脚本调用小于当前时间的入口publicfunctiontask_run(){$timeType=$_GET['tt']?:'现在';//获取当前秒内所有需要处理的消息,一条一条路由后执行//如果你的redis不需要存入mysql,可以使用redis的链表结构实现,取出一次性收集所有数据,并删除列表//如果下一个业务发现发起请求失败,重新分配任务给redis创建恢复列表,列表名称为t+需要执行时的时间戳$lists=$this->getTaskLists($timeType);foreach($listsas$item){$this->task('event:'.$item['event_type']);//$this->task('event:demo1');//$this->task('event:demo2');//$this->task('event:demo3');}}实际业务处理(发起通知请求)#模拟一个发起的请求事件demoprivatefunctionpushDemo1Event($info){$sendInfoData=$this->getSendInfoFromInfo($info);//发起一个请求,这里是对对方的真正请求,可以根据$sendInfo拼接各种事件或者消息进行分类Other$isSuccess=$this->sendInfo($sendInfoData);$data=array('status'=>1,'task_status'=>$isSuccess?1:2,'task_time'=>NOW_TIME);$db->保存($data);//如果执行失败,重新插入一条记录,并生成at_time作为下次执行的时间戳,这样可以取出定时器根据at_time字段判断到此时执行到秒级if(!$isSuccess){$tryCount=$db->where(...)->count();如果($tryCount>=5){返回假;}$this->newTask($info,$tryCount,30);}}私有函数newTask($info,$tryCount,$timeout=30){$newTime=$this->nextTaskTime($info,$timeout);$data=array(...'at_time'=>newTime//执行时间(时间戳));$db->add($data);}计时消费者$timeType=$_GET['time_type']?:'现在';$lists=$this->getMsgAll($timeType);if(!$lists){returnfalse;}foreach($listsas$item){if($item['type']=='1'){//根据业务生成我们的真实订单call_user_func_array(array($this,"add_real_order"),array($item));}if(in_array($item['type'],array(2,3))){$this->push('push:toengineer',$item);//分配或取消工程师}if($item['type']=='5'){$this->push('push:edit',$项目);//编辑}if($item['type']=='6'){$this->push('push:cancel',$item);//取消}if($item['type']=='7'||$item['type']=='8'||$item['type']=='9'){$this->push('推送:状态',$item);//简单的订单状态}}使用crontab+shell外力实现秒级执行因为crontab的最小单位是分钟,所以需要使用shell脚本来实现秒级执行。读取redis任务队列到mysql存储#!/bin/bashcd/web/project/srcfor((i=1;i<60;i=i+1))do#执行php直接返回,不阻塞。不要忘记最后的&$(/usr/bin/phpindex.phpredis2mysql>/dev/null2>&1&)sleep1doneexit0processingsecondstimeshscript#!/bin/bashcd/web/project/srcfor((i=1;i<60;i=i+1))do$(/usr/bin/phpindex.phptask_run>/dev/null2>&1&)sleep1doneexit0processingsecondslessthancurrenttimesh脚本#!/bin/bashcd/web/project/srcfor((i=1;i<60;i=i+1))do$(/usr/bin/phpindex.phptask_run/tt/lt>/dev/null2>&1&)sleep1doneexit0linux下的crontab*****sh/web/project/src/shell/repair_build_order.sh*****sh/web/project/src/shell/repair_sync_second.sh*****sh/web/project/src/shell/repair_sync_lt_time.sh