# Mqtt [TOC] SD提供了MQTT异步客户端。 * 异步MQTT客户端,可以结合EMQ开源服务器实现百万千万级通讯业务。 * 支持完整的MQTT协议规则 * 纯异步,断线重连 ## 创建MQTT连接 ```php public function initAsynPools($workerId) { parent::initAsynPools(); $mqtt = new MQTT('tcp://127.0.0.1:11883/','root1'); //设置持久会话 $mqtt->setConnectClean(false); //认证 $mqtt->setAuth('root1','root'); //存活时间 $mqtt->setKeepalive(3600); //回调 $mqtt->on('publish', function ($mqtt, PUBLISH $publish_object) { printf( "\e[32mI got a message\e[0m:(msgid=%d, QoS=%d, dup=%d, topic=%s) \e[32m%s\e[0m\n", $publish_object->getMsgID(), $publish_object->getQos(), $publish_object->getDup(), $publish_object->getTopic(), $publish_object->getMessage() ); }); $mqtt->on('connack', function (MQTT $mqtt, CONNACK $connack_object) { var_dump("MQTT连接成功"); $topics['$SYS/#'] = 1; $mqtt->subscribe($topics); }); $mqtt->connect(); } ``` >我们在initAsynPools中创建了Mqtt客户端,这样会导致每个worker进程都会创建一个MQTT客户端。 最好的做法是创建一个自定义进程,在自定义进程中创建MQTT客户端。 MQTT是纯异步的,需要通过回调获取通讯的结果。 ## 回调 * connack 连接成功的回调 * disconnect 断开连接的回调 * puback * pubrec * pubrel * pubcomp * suback 订阅成功的回调 * unsuback 移除订阅的回调 * publish 收到订阅消息的回调 ## Api * setRetryTimeout 设置Retry超时时间 * setVersion 设置MQTT版本 * version 获取MQTT版本 * setAuth 设置AUTH * setKeepalive 设置Keepalive时间 * setConnectClean 设置连接清除标志 * setWill 设置遗嘱消息 * on 设置回调 * ping ping * connect 连接 * disconnect 断开连接 * publish 发布 * subscribe 订阅 * unsubscribe 取消订阅