多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
# 环境 首先,确定好你的环境已经拥有这些内容: - php - redis - mysql - phpredis 扩展 而接下来,我们通过运行 Mysql 和 Redis 各自的生产者、消费者,来看消息队列是否已经在工作。 # 实例化 Queue 打开生产者和消费者,我们可以看到有这么一段代码: ``` Queue::init('Mysql', [ 'dsn' => 'mysql:host=mysql;dbname=test', 'username' => 'root', 'password' => 'root', 'table' => 'queues', 'ttr' => 60, ]); // 队列初始化 ``` 或: ``` Queue::init('Redis', [ 'ip' => 'redis', 'port' => 6379, 'tubes' => 'tubes' ]); // 队列初始化 ``` 如果前面实现环节,我们有达成共识,那么,相信这里你一定已经知道,这段代码是用来传入连接参数的。 你需要根据你运行的环境,去单独设置它。 # 测试步骤 - 打开 4 个 client ( cmd , terminal )窗口 - 在其中两个命令行中,分别运行如下命令: ``` php Consumer.php // 运行 Mysql 消费者 php RedisConsumer.php // 运行 Redis 消费者 ``` 运行成功后,这两个窗口将处于运行状态,两个消费者都会始终处于 reserve 循环中,直到接收到数据,才会输出。 - 再另外两个窗口,运行如下命令: ``` php Producer.php php RedisProducer.php ``` # 小结 到此为止,我们的模拟消息队列就成功了。 我们可以看到,当我们运行生产者的时候,各自对应的消费者窗口中,就会弹出相关的信息。 这就是一个简单的消息队列案例。 **但是,这里需要注意,这只是一个模拟案例,它帮助我们理解消息队列,但并不能应用于实际项目中。** 我们可以推理一下,消息队列,本该可以使用在高并发的场景,但我们现在实现的消息队列,它还有许多漏洞,根本无法应用在实战场景中。 比如: - mysql 驱动的 reserve 方法,它先接收,接收成功后再修改 job 的状态,这在高并发的情况下,将导致我们的 job 被重复接收,因为在「接收」和「修改状态」两个环节中间,可能会有另一个「请求」进来,此时的 job 还处于 ready 状态 - redis 驱动的 reserve 方法,使用了 redis 的 rPop 操作,这个方法,会在取出数据的同时,将数据从 list 中删除,那么,如果我们消费者处理失败,这个数据就有可能丢失 - redis 驱动的 put 方法,使用了 lPush 操作,此操作执行成功后,将会返回 list 的长度,我们的 redis 驱动会使用这个长度来作为该 job 的下标( id )。然而, list 的长度会不断变化,并发场景下,你 put 成功后,可能还在处理其他流程,此进程并未结束,此时若有消费者接收走一个消息,则 list 的长度就发生了变化,你在 put 时的 id 就无法与 list 中的数据对应起来