NIUCLOUD是一款SaaS管理后台框架多应用插件+云编译。上千名开发者、服务商正在积极拥抱开发者生态。欢迎开发者们免费入驻。一起助力发展! 广告
[TOC] ## RabbitMq介绍 **消息系统允许软件、应用相互连接和扩展.这些应用可以相互链接起来组成一个更大的应用,或者将用户设备和数据进行连接.消息系统通过将消息的发送和接收分离来实现应用程序的异步和解偶.** 或许你正在考虑进行数据投递,非阻塞操作或推送通知。或许你想要实现发布/订阅,异步处理,或者工作队列。所有这些都可以通过消息系统实现。 RabbitMQ是一个消息代理 - 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。 中文文档 [http://rabbitmq.mr-ping.com/](http://rabbitmq.mr-ping.com/) 本文章为基于rapphp swoole环境下快速对接 amqp 的技术方案 ## 安装 compose 依赖 [ly_lib/amqp](http://package.magcloud.net/?#ly_lib/amqp) docker快速部署RabbitMq registry.cn-hangzhou.aliyuncs.com/alicode/rabbitmq:3.7.23-management-delayed 本镜像已安装对应的延迟插件 需要监控的端口 5672:服务端口 15672:web管理端口 环境变量 RABBITMQ_DEFAULT_USER : 默认用户 RABBITMQ_DEFAULT_PASS:默认密码 ## 注册mq mq 配置使用了rap ioc的构造器配置 ~~~ class MyBeanConstructorMapper extends BeanConstrustorMapper { function mapper() { return [[ "class" => MqConnection::class, "constructor" => [ "host" => 'rabbitmq3.7.7', "port" => 5672, "user_name" => 'admin', "password" => 'admin', "fetch_count" => 1, 'exchanges'=>["delayed_exchanges"=>['x-delayed-message','topic']] ], ] ]; } } ~~~ ### fetch_count fetch_count 指basic_qos 中prefetch_count ,即每次获取多少条消息进行处理 fetch_count 影响消息处理的速度,定义太大会自己程序会处理不过来,定义太小可能出消息堆积太多 ### exchanges exchanges 指交换机,我们用到的所有交换机都需要进行定义,不然对应的交换机在使用时不会检查是否已安装对应交换机 exchanges 格式 一般交换机 ~~~ ["交换机名称"=>"类型"] 类型我们正常都是用topic ~~~ 支持延时功能 ~~~ ["交换机名称"=>["x-delayed-message","类型"]] 或则 ["交换机名称"=>["delayed","类型"]] ~~~ ### 初始化 ~~~ class AppInit implements Init { public function appInit(AutoFindHandlerMapping $autoMapping, Router $router) { //注册类构造器定义 Ioc::register(MyBeanConstructorMapper::class); //注册消费者 Mq::consume(MyConsume::class); Mq::consume(MyConsume2::class); //启动mq Mq::start(); } } ~~~ ## 发布者 ~~~ class MyPublish extends Publisher { //路由的key protected $routing_key = 'aa.cc'; //其他自定义属性 public $name; public $age; } ~~~ ### 发布消息 发布者发布消息实例 ~~~ $publisher = MyPublish::newInstance(); $publisher->name = "n"; $publisher->age = $i; $publisher->send(); ~~~ ### 使用默认的发布类 如果你不喜欢将所有消息体进行定义,可以使用默认发布类 ~~~ Publisher::sendContent($routing_key, ['a' => 1, 'age' => $i]); ~~~ ## 订阅消息 订阅则可以订阅多个routing_key ### 生成一次性消息队列 一次性消费队列,队列和固定消费者绑定,在消费者定义是会生成队列,在mq断开时删除对应队列 ~~~ class MyConsume extends Consume { protected $routing_key = ['aa.bb']; public function onReceiveMsg(MqMessage $message) { Log::error($message->body); } } ~~~ ### 生成固定消息队列 如果你指定了 队列名称 ,mq在断开是不会删除对应的队列,并且不同的消费者可以共用一个队列 ~~~ class MyConsume extends Consume { protected $routing_key = ['aa.bb']; protected $queue = 'fixname'; public function onReceiveMsg(MqMessage $message) { Log::error($message->body); } } ~~~ ### 消息确认 默认onReceiveMsg正确处理完后都会进行消息确认(ack) 消息在处理过程中,抛出异常会reject(false),mq不会再次进行投递 如果消息你确实需要重新投递,或则需要同队列的其他消费者进行处理,你可以在你的代码里 reject(true); ~~~ public function onReceiveMsg(MqMessage $message) { $message->reject(true); } ~~~ ## 延时消息 rabbitMq 想使用延时消息,需要 1.rabbitmq_delayed_message_exchange-3.7.x.ez 拓展 2.定义交换机是 必须使用`x-delayed-message` 我们代码里只需要定义 delay 属性就可以了,时间单位默认为秒 ~~~ class MyPublish extends Publisher { .... protected $delay = 300; } ~~~ ## 使用指定交换机 如果你不想用默认交换机 `topic_default` 你可以在对应的 发布者或订阅者指定交换机名称 ~~~ class MyPublish extends Publisher { .... protected $exchange = 'exchange_name'; } ~~~