相信很多小伙伴在开发中都使用过消息队列,尤其是在高并发的情况下。一般可以在缓存中操作数据,然后通过消息业务逻辑,操作数据库等进行异步处理。我公司用的是阿里云的消息队列和RabbitMQ。据说使用阿里云的消息队列有一部分原因是RabbitMQ实现延迟消息比较复杂,依赖死信。。。接下来说说我是怎么使用消息的,以及我们遇到的坑~总的来说,我们可以使用一个脚本来接收阿里云消息处理业务逻辑,但是如果业务量特别大,我们可能会遇到一个问题,就是脚本处理不了,累积的消息量可能远远超过可以处理的每秒处理。针对这种现象,我们可以同时启动多个处理脚本来处理消息,这样会明显加快消息的处理速度。但是,当多个脚本同时从一个消息队列中获取消息时,会不会同时获取同一条消息,进而造成重复的消息处理呢?我想肯定会的。消息是第三方服务,我们无法保证其100%的稳定性,所以我们在处理的时候还需要多下功夫。我们发送的消息的数据体是json。一般我们会给每条消息加上一个taskid,由时间戳(精确到毫秒级)+随机数组成。这个taskid足够长,我们可以保证它不会重复(重复的可能性极小,类似于mongodb的主键原理)。接下来看一段代码:receive();$getData=\Zend\Json\Json::decode($getData,true);//如果不是json数据我们可以捕获异常//先检测数据$errorMsg=[];if(!isset($getData['taskid'])||$getData['taskid']==''){$errorMsg[]='taskid不能为空';}if(!isset($getData['order_id'])||$getData['order_id']==''){$errorMsg[]='订单号不能为空';}if(!cache()->setex($this->cachePrefix.$getData['taskid'],1)){$errorMsg[]='任务已处理';}if(count($errorMsg)>=1){//记录日志方便排错$this->log('xxxx处理脚本错误whichfileerrorlevelerrorreason'.implode('|',$errorMsg));返回假;}//必须有过期时间否则redis会爆cache()->expire($this->cachePrefix.$getData['taskid'],7*86400);/***处理业务逻辑*///业务逻辑处理正常,删除redis锁,删除消息缓存()->del($this->cachePrefix.$getData['taskid']);$mq->ack();returntrue;}//捕获异常catch(\Exception$e){//一定要删除这条消息,不然会重复进来。日志的错误级别应该更高。手动处理问题$mq->ack();//记录日志$this->log();}小伙伴们知道这段代码有什么问题吗?
