多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
参考:[https://www.kancloud.cn/uuling/tp5-console/366131](https://www.kancloud.cn/uuling/tp5-console/366131) ``` <?php namespace app\command\consumer\order; use think\console\Command; use think\console\Input; use think\console\Output; use app\common\tool\Rabbit; use app\platform\logic\Daraz\Order; use app\platform\model\PlatformOrder; class DarazList extends Command { protected $input = null; protected $output = null; protected $colorList = [ 'red'=> "255;0;0;", 'orange'=> "255;165;0;", 'yellow'=> "255;255;0;", 'green'=> "0;255;0;", 'cyan'=> "0;127;255;", 'blue'=> "0;0;255;", 'purple'=> "139;0;255;", ]; protected function info($msg, $color="green"){ # {\033[}为标签起始 {m}为标签结束,属性用数字加分号隔开 38是设置颜色 $colorRGB = isset($this->colorList[$color]) ? $this->colorList[$color] : $this->colorList['green']; $this->output->writeln("\033[38;2;".$colorRGB."1m".$msg."\033[0m"); } protected function configure() { // 指令配置 $this->setName('DarazList'); // 设置参数 } protected function execute(Input $input, Output $output) { $this->input = $input; $this->output = $output; // 指令输出 $this->info('消费者程序启动...'); $this->info('进程pid:' . getmypid()); $queueName = 'daraz_order_list'; dd($queueName); # 处理失败的队列名 $failedQueueName = 'failed_'.$queueName; $rabbit = Rabbit::instance('rabbit_mq_sync_task'); # 测试获取单条队列消息 // $data = $rabbit->consumeData($queueName, 5); // if(!empty($data)){ // // foreach($data as $d){ // // print_r(json_decode(gzuncompress($d['rawData']), true)); // // } // // $rabbit->ack($data['delivery_tag']); // dd(json_decode(($data[0]['rawData']), true)); // // dd($data); // } // $this->info('结束'); // die; # 消费计数 $i = 1; $point = 1000; $data = []; $delivery_tag = null; # 声明对队列消息的业务处理回调 $callback = function($msg) use (&$i, $point, &$data, &$delivery_tag){ if($i%$point == 0){ # 每处理指定数量的队列消息可以标记下逻辑 } $this->info('第'.$i.'个消息处理'); $delivery_tag = $msg->delivery_info['delivery_tag']; $rawData = $msg->body; // $this->info('rawData:'.$i.' '. $rawData .PHP_EOL); # 业务逻辑处理 $data = json_decode($rawData, true); unset($rawData); # 必要参数校验 $accountId = $data['accountId']; $platformCode = "VO"; $order = $data['order']; // dd($order); $orderLogic = new Order(); # 格式化数据 $formatData = $orderLogic->formatOrder($accountId, $platformCode, $order); // dd($formatData); // # 入库 $orderModel = new PlatformOrder(); $result = $orderModel->syncOrder($formatData); # 如果有返回且插入失败,要抛出插入异常 if(!$result){ throw new Exception("PlatformOrder->syncOrder failed", -1); } // $msg->delivery_info['channel']->basic_ack($delivery_tag); $i++; # 业务逻辑处理完,手动确认删除,否则会一直继续获取队列消息,并标记fetched unacked 只有消费脚本退出进程,才会释放,但不会删除 }; while(true){ try { $rabbit->consume($callback, $queueName, 0); }catch(Throwable $e){ # 如果是因为连接关闭,自动重连 if(strpos($e->getMessage(), 'Broken pipe or closed connection') !== false){ $this->info('连接断开,开始重连'); $rabbit->close(); $rabbit->connect(); } # 异常, 统一丢到失败队列 $this->info('异常记录:'.$e->getMessage()); dd("退出"); $rabbit->ack($delivery_tag); $failedMsg = [ 'raw_queue_name'=> $queueName, 'err_msg'=> $e->getMessage(), 'data'=> $data ]; $failedMsg = json_encode($failedMsg); $rabbit->easyProduce($failedMsg, $failedQueueName); $i++; continue; } } # 常驻进程退出 $this->info('结束'); } } ```