AintQueue是一个基于Swoole的异步队列库,可弹性扩展worker进程池,worker进程协程支持。Github地址:https://github.com/Littlesqx/...特性默认Redis驱动二级延时任务自定义重试次数和时间自定义错误回调支持任务执行中间件自定义队列快照事件弹性多进程消费Worker进程协程支持内置漂亮的dashboard环境PHP7.2+Swoole4.4+Redis3.2+(redisdriver)安装$composerrequirelittlesqx/aint-queue-vvvuseconfiguration默认读取配置路径:config/aint-queue.php,里面写着/vendor/littlesqx/aint-queue/src/Config/config.php不存在时。[...config]'default'=>['driver'=>['class'=>RedisQueue::class,'connection'=>['host'=>'127.0.0.1','port'=>6379,'database'=>'0',//'password'=>'password',],'pool_size'=>8,'pool_wait_timeout'=>1,'handle_timeout'=>60*30,],'logger'=>['class'=>DefaultLogger::class,'options'=>['level'=>\Monolog\Logger::DEBUG,],],'pid_path'=>'/var/run/aint-queue','consumer'=>['sleep_seconds'=>1,'memory_limit'=>96,'dynamic_mode'=>真,'capacity'=>6,'flex_interval'=>5*60,'min_worker_number'=>5,'max_worker_number'=>30,'max_handle_number'=>0,],'job_snapshot'=>['interval'=>5*60,'处理程序'=>[],],],];所有参数:nametypecommentdefaultchannelstring通道队列的单位,每个通道中的消息对应各自的消费者和生产者。支持多通道。在命令行上使用--channel参数。defaultdriver.classstring队列驱动类,需要实现QueueInterface。Redisdriver.connectionmap驱动配置。pid_pathstring主进程的PID文件存放路径。请注意,运行用户需要读写权限。/var/run/aint-queueconsumer.sleep_secondsint任务空闲时每次弹出操作后休眠的秒数。1consumer.memory_limitint工作进程使用的最大内存,超过则重启。单位是MB。96consumer.dynamic_modebool是否启用自动伸缩工作进程。trueconsumer.capacityint表示每个worker进程在短时间内健康状态下处理的最大消息数,影响worker进程的自动伸缩策略。5consumer.flex_intervalint每隔flex_interval秒,监控进程会尝试调整工作进程的数量(假设启用了工作进程的自动缩放)。5consumer.min_worker_numberint最小工作进程数。5consumer.max_worker_numberint最大工作进程数。30consumer.max_handle_numberint当前工作进程处理的最大消息数,超过后重启。0表示无限制。0job_snapshotmap每隔job_snapshot.interval秒,job_snapshot.handles会依次执行。job_snapshot.handles需要实现JobSnapshotterInterface。消息推送可用于cli/fpm模式:push(function(){echo"Helloaint-queuedelayed\n";});//推送延迟作业$closureJob=function(){echo"Helloaint-queuedelayed\n";};$queue->push($closureJob,5);更推荐使用classtasks,这样功能会更完备,可以获得更好的编码体验和性能。创建的任务类需要实现JobInterface。具体可以参考/examplehandle():void:任务体,你要执行的内容,这里可以使用swoole相关的API(比如创建协程等);canRetry(int$attempt,$error):bool:决定是否重试;retryAfter(int$attempt):int:决定重试延迟时间;failed(int$id,array$payload):void:任务完全失败(即重试已经达到次数还没有成功);middleware():array:当前任务的中间件列表,原理参考laravelpipeline管道;注意任务在生产者和消费者中必须能够(反)序列化,也就是说需要在同一个项目中使用队列快照事件可以让你实时监控队列,使用任务中间件,你可以实现任务执行速率限制、任务执行日志等。对于队列管理,建议使用Supervisor等进程管理工具来守护worker进程。vendor/bin/aint-queueAintQueueConsoleToolUsage:command[options][arguments]Options:-h,--help显示此帮助消息-q,--quiet不输出任何消息-V,--version显示此应用程序版本--ansi强制ANSI输出--no-ansi禁用ANSI输出-n,--no-interaction不询问任何交互问题-v|vv|vvv,--verbose增加消息的详细程度:1表示正常输出,2用于更详细的输出和3用于debugAvailable命令:help显示命令列表的帮助列出命令queuequeue:clear清除队列。queue:dashboard为仪表板启动http服务器。queue:reload-failed将所有失败的作业重新加载到等待队列中。queue:status获取特定队列的执行状态。workerworker:listen监听队列。worker:reload为队列重新加载worker。工人:跑运行特定作业。worker:stop停止监听队列。测试作曲家测试
