💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、豆包、星火、月之暗面及文生图、文生视频 广告
继承vendor\topthink\think-queue\src\queue\Connector.php; 然后实现了相应的抽象方法; ``` <?php namespace think\queue\connector; use Exception; use think\helper\Str; use think\queue\Connector; use think\queue\job\Redis as RedisJob; class Redis extends Connector { /** @var \Redis */ protected $redis; protected $options = [ 'expire' => 60, 'default' => 'default', //默认的队列名称 'host' => '127.0.0.1', 'port' => 6379, 'password' => '', 'select' => 0, 'timeout' => 0, 'persistent' => false ]; public function __construct($options) { if (!extension_loaded('redis')) { throw new Exception('redis扩展未安装'); } if (!empty($options)) {//用application\extra\queue.php里面的配置覆盖默认配置 $this->options = array_merge($this->options, $options); } //判断配置中是否使用redis链接复用,使用长链接可以提高速度 $func = $this->options['persistent'] ? 'pconnect' : 'connect'; $this->redis = new \Redis; $this->redis->$func($this->options['host'], $this->options['port'], $this->options['timeout']); if ('' != $this->options['password']) { $this->redis->auth($this->options['password']); } if (0 != $this->options['select']) { $this->redis->select($this->options['select']); } } //插入任务 ,立刻执行 public function push($job, $data = '', $queue = null) { return $this->pushRaw($this->createPayload($job, $data), $queue); } //插入任务,延迟$delay秒执行 public function later($delay, $job, $data = '', $queue = null) { $payload = $this->createPayload($job, $data); //Zadd 命令用于将一个或多个成员元素及其分数值加入到有序集当中 $this->redis->zAdd($this->getQueue($queue) . ':delayed', time() + $delay, $payload); } public function pop($queue = null) { $original = $queue ?: $this->options['default']; $queue = $this->getQueue($queue); $this->migrateExpiredJobs($queue . ':delayed', $queue, false); if (!is_null($this->options['expire'])) { $this->migrateExpiredJobs($queue . ':reserved', $queue); } $job = $this->redis->lPop($queue); if ($job !== false) { $this->redis->zAdd($queue . ':reserved', time() + $this->options['expire'], $job); return new RedisJob($this, $job, $original); } } /** * 重新发布任务 * * @param string $queue * @param string $payload * @param int $delay * @param int $attempts * @return void */ public function release($queue, $payload, $delay, $attempts) { $payload = $this->setMeta($payload, 'attempts', $attempts); $this->redis->zAdd($this->getQueue($queue) . ':delayed', time() + $delay, $payload); } //正式插入任务 public function pushRaw($payload, $queue = null) { //Rpush 命令用于将一个或多个值插入到列表的尾部(最右边) $this->redis->rPush($this->getQueue($queue), $payload); return json_decode($payload, true)['id']; } //创建任务容器 protected function createPayload($job, $data = '', $queue = null) { //调用父类的创建任务容器函数,然后附带上一个随机id $payload = $this->setMeta( parent::createPayload($job, $data), 'id', $this->getRandomId() ); return $this->setMeta($payload, 'attempts', 1); } /** * 删除任务 * * @param string $queue * @param string $job * @return void */ public function deleteReserved($queue, $job) { $this->redis->zRem($this->getQueue($queue) . ':reserved', $job); } /** * 移动延迟任务 * * @param string $from * @param string $to * @param bool $attempt */ public function migrateExpiredJobs($from, $to, $attempt = true) { //$this->redis->watch($from); $jobs = $this->getExpiredJobs( $from, $time = time() ); if (count($jobs) > 0) { $this->transaction(function () use ($from, $to, $time, $jobs, $attempt) { $this->removeExpiredJobs($from, $time); for ($i = 0; $i < count($jobs); $i += 100) { $values = array_slice($jobs, $i, 100); $this->pushExpiredJobsOntoNewQueue($to, $values, $attempt); } }); } //$this->redis->unwatch(); } /** * redis事务 * @param \Closure $closure */ protected function transaction(\Closure $closure) { $this->redis->multi(); try { call_user_func($closure); if (!$this->redis->exec()) { $this->redis->discard(); } } catch (Exception $e) { $this->redis->discard(); } } /** * 获取所有到期任务 * * @param string $from * @param int $time * @return array */ protected function getExpiredJobs($from, $time) { //Zrangebyscore 返回有序集合中指定分数区间的成员列表。有序集成员按分数值递增(从小到大)次序排列。 //-inf:负无限,这里的意思是 获取 from里面排序 小于 $time的全部数据,一把这个$time就是当前时间戳,也就是小于当前时间的,过期的 return $this->redis->zRangeByScore($from, '-inf', $time); } /** * 删除过期任务 * * @param string $from * @param int $time * @return void */ protected function removeExpiredJobs($from, $time) { //Zremrangebyscore 命令用于移除有序集中,指定分数(score)区间内的所有成员 //移除$time小于当前时间的,已经过期的数据 $this->redis->zRemRangeByScore($from, '-inf', $time); } /** * 重新发布到期任务 * * @param string $to * @param array $jobs * @param boolean $attempt */ protected function pushExpiredJobsOntoNewQueue($to, $jobs, $attempt = true) { if ($attempt) { foreach ($jobs as &$job) { $attempts = json_decode($job, true)['attempts']; $job = $this->setMeta($job, 'attempts', $attempts + 1); } } $this->redis->rPush($to, ...$jobs); // call_user_func_array([$this->redis, 'rPush'], array_merge([$to], $jobs)); } /** * 随机id * * @return string */ protected function getRandomId() { return Str::random(32); } /** * 获取队列名 * * @param string|null $queue * @return string */ protected function getQueue($queue) { //组装出完整的 目录 路径,例:queues:shopro , queues:default return 'queues:' . ($queue ?: $this->options['default']); } } ```