企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持知识库和私有化部署方案 广告
适配器类的抽象类,各种适配器继承此类 ``` namespace think\queue; use InvalidArgumentException; abstract class Connector { protected $options = []; //插入队列 abstract public function push($job, $data = '', $queue = null); //延迟执行任务 abstract public function later($delay, $job, $data = '', $queue = null); //提取最前的一个任务 abstract public function pop($queue = null); //这个方法是专门给Topthink适配器使用的,其他类型的适配器没有这个方法的实现 public function marshal() { throw new \RuntimeException('pop queues not support for this type'); } //创建任务容器 protected function createPayload($job, $data = '', $queue = null) { if (is_object($job)) { //如果job是一个对象,就要进行对象序列化处理 $payload = json_encode([ 'job' => 'think\queue\CallQueuedHandler@call', 'data' => [ 'commandName' => get_class($job), 'command' => serialize(clone $job), ], ]); } else { //如果job是任务类的路径字符串,就创建标准任务容器 $payload = json_encode($this->createPlainPayload($job, $data)); } if (JSON_ERROR_NONE !== json_last_error()) { throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg()); } return $payload; } //创建标准任务容器, payload可以理解为'任务容器',因为里面含有任务类job 和任务数据data 这两项基本数据, protected function createPlainPayload($job, $data) { return ['job' => $job, 'data' => $data]; } //设置任务容器的数据,可以通过这个方法,不断增加新的数据进 任务容器,这就是一个set方法,不要对payload数组进行直接操作 protected function setMeta($payload, $key, $value) { $payload = json_decode($payload, true); $payload[$key] = $value; $payload = json_encode($payload); if (JSON_ERROR_NONE !== json_last_error()) { throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg()); } return $payload; } } ```