多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
[TOC] 注意如果一直执行一个出错的job,他会一直try,最终把内存爆掉 ~~~ $pheanstalk=new Pheanstalk('ip','11300'); $pheanstalk以下这个变量认为是beanstalkd的实例了 ~~~ # 维护 ## 查看状态stats ~~~ require __DIR__."/vendor/autoload.php"; use Pheanstalk\Pheanstalk; //ip和端口,由于我使用的是docker,写容器名就可以了 $pheanstalk=new Pheanstalk('beanstalkd','11300'); print_r($pheanstalk->stats()); ~~~ ~~~ Pheanstalk\Response\ArrayResponse Object ( [_name:Pheanstalk\Response\ArrayResponse:private] => OK [storage:ArrayObject:private] => Array ( [current-jobs-urgent] => 0 # 当前存在优先级的任务是0 [current-jobs-ready] => 0 # 当前处在准备读取的任务数是0 [current-jobs-reserved] => 0 # 当前正在执行的任务数是0 [current-jobs-delayed] => 0 # 当前处在延迟的任务数是0 [current-jobs-buried] => 0 # 当前处在预留任务数是0 #cmd开头表示运行过这个命令的累计数量 [cmd-put] => 0 [cmd-peek] => 0 [cmd-peek-ready] => 0 [cmd-peek-delayed] => 0 [cmd-peek-buried] => 0 [cmd-reserve] => 0 [cmd-reserve-with-timeout] => 0 [cmd-delete] => 0 [cmd-release] => 0 [cmd-use] => 0 [cmd-watch] => 0 [cmd-ignore] => 0 [cmd-bury] => 0 [cmd-kick] => 0 [cmd-touch] => 0 [cmd-stats] => 2 [cmd-stats-job] => 0 [cmd-stats-tube] => 0 [cmd-list-tubes] => 0 [cmd-list-tube-used] => 0 [cmd-list-tubes-watched] => 0 [cmd-pause-tube] => 0 [job-timeouts] => 0 # 任务超时的次数.就是任务超过时间,被重新放入管道的次数 [total-jobs] => 0 # 用户累计数量 [max-job-size] => 65535 # 任务字符串大小,不能超过这个,也就是2的16次方 [current-tubes] => 1 # 当前存在的管道数量 [current-connections] => 1 # 当前打开的连接数量 [current-producers] => 0 # 当前有多少个生产者,连接上发送至少一个就算 [current-workers] => 0 # 有多少消费者 [current-waiting] => 0 # 消费者发送了reserved但没有响应的数量,就一直阻塞在那,等着任务 [total-connections] => 2 #总共的链接数 [pid] => 1 #pid [version] => 1.10 #版本 [rusage-utime] => 0.010000 #进程执行用户代码的时间 [rusage-stime] => 0.000000 #进程执行内核代码的时间,这2个主要看负载的 [uptime] => 160 # 从启动到现在一共运行了多少秒 [binlog-oldest-index] => 0 [binlog-current-index] => 0 [binlog-records-migrated] => 0 [binlog-records-written] => 0 [binlog-max-size] => 10485760 [id] => 1bd5de699bcabfbc [hostname] => 2935a637423d ) ) ~~~ ## 查看管道数量listTubes ~~~ print_r($pheanstalk->listTubes()); ~~~ ~~~ Array ( [0] => default ) ~~~ 目前存在一个管道,这个管道的名字教default是默认管道,一般不使用默认管道,自己创建管道使用 ## 查看具体管道详情statsTube ~~~ 参数是管道的名字 print_r($pheanstalk->statsTube('default')); ~~~ ~~~ Pheanstalk\Response\ArrayResponse Object ( [_name:Pheanstalk\Response\ArrayResponse:private] => OK [storage:ArrayObject:private] => Array ( [name] => default #当前管道的名字 [current-jobs-urgent] => 0 #当前管道存在任务优先级的任务数 [current-jobs-ready] => 0 # 当前处在准备读取的任务数 [current-jobs-reserved] => 0 # 当前正在执行的任务数 [current-jobs-delayed] => 0 # 当前处在延迟的任务数 [current-jobs-buried] => 0 # 当前处在预留任务数 [total-jobs] => 0 # 当前有多少个任务数 [current-using] => 1 #当前有多少个用这个管道 [current-watching] => 1 #当前有多少个消费者在使用这个管道 [current-waiting] => 0 # 当前有多少个消费者在等待这个任务 [cmd-delete] => 0 [cmd-pause-tube] => 0 [pause] => 0 [pause-time-left] => 0 ) ) ~~~ ## 使用一个管道useTube ~~~ //useTube表示要使用那个管道,put是放入的意思 $pheanstalk->useTube('default')->put('test'); //查看管道状态 print_r($pheanstalk->statsTube('default')); ~~~ ~~~ Pheanstalk\Response\ArrayResponse Object ( [_name:Pheanstalk\Response\ArrayResponse:private] => OK [storage:ArrayObject:private] => Array ( [name] => default [current-jobs-urgent] => 0 [current-jobs-ready] => 1 #准备的任务数是1个 [current-jobs-reserved] => 0 [current-jobs-delayed] => 0 [current-jobs-buried] => 0 [total-jobs] => 1 [current-using] => 1 [current-watching] => 1 [current-waiting] => 0 [cmd-delete] => 0 [cmd-pause-tube] => 0 [pause] => 0 [pause-time-left] => 0 ) ) ~~~ ## 查看任务详情statsJob ~~~ //watch表示监听管道,reserve取出 $job=$pheanstalk->watch('default')->reserve(); //statsJob查看队列详情 $stats=$pheanstalk->statsJob($job); print_r($stats); ~~~ ~~~ Pheanstalk\Response\ArrayResponse Object ( [_name:Pheanstalk\Response\ArrayResponse:private] => OK [storage:ArrayObject:private] => Array ( [id] => 1 #当前任务的id,这个id是唯一的 [tube] => default #当前任务所在的管道 [state] => reserved #当前任务的状态是什么 [pri] => 1024 # 任务优先级,1024表示默认优先级 [age] => 206 # 表示从创建到现在活了多少秒 [delay] => 0 # 延迟的描述 [ttr] => 60 #当前任务的ttr时间,默认60秒,也可以设置 [time-left] => 59 # 这个任务在reserved维持的秒数,ttr是60,超过就超时了 [file] => 0 # 没有开启binlog日志这个就是0,开binlog这就表示0 [reserves] => 1 #当前状态reserves的次数,超时就重新放入在reserves,再超时 [timeouts] => 0 # ttr超时的次数 [releases] => 0 # 重设任务的状态 [buries] => 0 # 被预留的次数 [kicks] => 0 # 让一个任务从预留的状态提回到releases状态 ) ) ~~~ ## 根据任务的id来查看任务peek ~~~ //这个数字是任务的id $job=$pheanstalk->peek(1); $stats=$pheanstalk->statsJob($job); print_r($stats); ~~~ 调用put方法的时候也会返回一个任务的id # 生产类 ## putInTube把任务放到管道 ~~~ /** * 第一个参数是管道名字 * 第二个参数是要放入任务的字符串 * 第三参数是优先级,可省略不写,默认1024 * 返回任务的id */ $id=$pheanstalk->putInTube('default','666','1000'); print_r($id); ~~~ ## put任务放到管道 ~~~ $tube=$pheanstalk->useTube('default'); /** * 第二个参数是任务优先级,0最大,越小越大 [可省略] * 第三个参数是延迟的秒数,表示要延迟多少秒执行 [可省略] * 第四个参数表示超过多少秒就重发,超时重发 [可省略] */ $tube->put('test',4,20,30); ~~~ ## delete删除任务 读取到任务,然后把他删除 ~~~ $job=$pheanstalk->watch('default')->reserve(); print_r($job); $pheanstalk->delete($job); ~~~ ## touch即将延迟的任务续命,让任务重置ttr时间 如果我们一个任务执行10秒,任务的延迟也是10秒,那这个任务我们一直拿不到,怎么办呢? 我们让这个任务等等,这样就能拿到了 ~~~ $tube=$pheanstalk->useTube('default'); $tube->put('test',0,0,10); $job=$pheanstalk->watch('default')->reserve(); print_r($pheanstalk->statsJob($job)); sleep(9); //给任务续命 $pheanstalk->touch($job); $pheanstalk->delete($job); ~~~ # 消费类 ## watch监听管道,可以同时监听多个 ~~~ //watch参数是监听的管道名字 $job=$pheanstalk->watch('newUsers') ->watch('default'); //列出监听的管道 $tubes=$pheanstalk->listTubesWatched(); print_r($tubes); ~~~ ## ignore忽略的管道 ~~~ $job=$pheanstalk->watch('newUsers') ->watch('default') ->ignore('default'); //列出监听的管道 $tubes=$pheanstalk->listTubesWatched(); print_r($tubes); ~~~ ## reserve阻塞的方式获取任务 ~~~ $job=$pheanstalk->watch('default') ->reserve(); var_dump($job); $pheanstalk->delete($job); ~~~ ~~~ /** * reserve参数 * 设置reserve阻塞的秒数 [可省略] * 超过这个秒数立即返回 */ $job=$pheanstalk->watch('default') ->reserve(3); var_dump($job); $pheanstalk->delete($job); ~~~ ## release把任务重新放回管道 ~~~ $job=$pheanstalk->watch('default')->reserve(); /** * 因为一些元素,比如网络连接不上 */ $module=false; if (!$module) { //把取出来的任务重新放回管道,任务变为ready状态 $pheanstalk->release($job); }else{ $pheanstalk->delete($job); } ~~~ ## bury把任务放一边,预留 ~~~ $job=$pheanstalk->watch('default')->reserve(); /** * 比如我们要发邮件,但是邮件服务器连接不上,就把这个任务放一边设为预留 */ $pheanstalk->bury($job); ~~~ ## peekBuried,kickJob把任务从预留中取出来,变为ready ~~~ /** * 从管道中取出预留的任务 */ $job=$pheanstalk->peekBuried('default'); /** * 把这个任务变为ready状态,这样消费者就能去消费了 */ $pheanstalk->kickJob($job); var_dump($pheanstalk->statsJob($job)); ~~~ ## kick批量的把预留的设为ready ~~~ /** * 从这个管道中,把任务id小于999的预留任务全部变为ready */ $pheanstalk->useTube('default')->kick(999); ~~~ ## peekReady一次性取出所有ready的任务,不阻塞,无法获取任务会抛出异常 ~~~ /** * 一次性从这个管道中取出ready的任务 */ $job=$pheanstalk->peekReady('default'); var_dump($job); $pheanstalk->delete($job); #相似还有peekDelayed一次性取出Delayed的消息 #需要注意peek的一些方法,不阻塞,无法获取任务会抛出异常 ~~~ ## pauseTube设置管道延迟多少秒 ~~~ /** * 设置管道延迟10秒 */ $pheanstalk->pauseTube('default',10); $job=$pheanstalk->watch('default')->reserve(); var_dump($job); ~~~ ## resumeTube把管道从延迟中恢复 ~~~ $pheanstalk->resumeTube('default'); ~~~ # 具体使用 ~~~ /** * 生产者 */ $pheanstalk->useTube('default') ->put(222); ~~~ ~~~ /** * 消费者 */ while(true) { $queue = $pheanstalk->watch('default'); if(($job=$queue->reserve(5)) == false) { continue; } //没有任务的时候reserve会阻塞在这里 $job=$job->reserve(); //getData是从任务中取出数据 echo $job->getData(); //处理具体业务 //这边要删除,不删除的话,超时会重新放入管道 $pheanstalk->delete($job); } ~~~