Table Of Contents

骑驴找蚂蚁

全干工程师

mqtt-logo

PHP手撸MQTT客户端

MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网设备通信。下面以PHP原生socket实现MQTT 3.1.1协议的CONNECTPUBLISHSUBSCRIBE报文构造与交互.

MQTT协议数据包结构

MQTT是一种基于二进制的协议,控制元素是二进制字节而不是文本字符串.

mqtt-connect

命令和命令确认格式由MQTT使用,这意味着每个命令都有一个伴随的确认。连接命令有连接确认,订阅命令有订阅确认,发布命令有发布确认,如上图所示。这种机制类似于TCP协议的握手机制.

MQTT消息格式由3字段组成

  • 固定包头(Fixed Header)所有 MQTT 数据包都必须存在,固定占用2个字节
  • 可变包头(Variable Header)并不总是存在,有时候不存在这个部分
  • 包体(payload)正在发送的数据存储在有包体中,然而包体部分也并不总是存在**,**一些命令,例如断开消息,就不使用包体字段

mqtt-header

固定包头报文解析

固定包头包含2个字节,
第一个字节的前4位表示消息类型,后4位用于表示传输协议和Qos等级,
第二字节用于存储数据包长度,这个长度=可变包头长度+包体长度

mqtt-header


构造并发送CONNECT报文

$clientId = "phpClient";
$protocolName = "MQTT";
$protocolLevel = chr(0x04); // MQTT 3.1.1
$connectFlags = chr(0x02); // Clean session
$keepAlive = pack("n", 60); // 60秒

// 可变头部
$variableHeader =
    pack("n", strlen($protocolName)) . $protocolName .
    $protocolLevel .
    $connectFlags .
    $keepAlive;

// 载荷
$payload = pack("n", strlen($clientId)) . $clientId;

// 剩余长度
$remainingLength = strlen($variableHeader) + strlen($payload);


// 固定头第一个字节的高4位表示消息类型,CONNECT 类型为 1(即 0001),低4位为标志位(CONNECT 报文要求为 0000)。
// 合并后就是 0001 0000(二进制),即 0x10(十六进制)。
// 固定头部
$fixedHeader = chr(0x10) . chr($remainingLength);

// 完整报文
$packet = $fixedHeader . $variableHeader . $payload;

// 连接并发送
$socket = fsockopen('broker.hivemq.com', 1883, $errno, $errstr, 5);
fwrite($socket, $packet);
// 读取CONNACK响应
$response = fread($socket, 4);
// 检查连接是否成功
if (ord($response[0]) == 0x20 && ord($response[3]) == 0x00) {
    echo "连接成功\n";
} else {
    echo "连接失败\n";
}

连接成功后,后续需要构建发布和订阅两个报文.


构造并发送 PUBLISH 报文

$topic = "test/topic";
$message = "Hello MQTT!";
// 主题
$topicField = pack("n", strlen($topic)) . $topic;
// 固定头部: 0x30 = PUBLISH, QoS 0
$fixedHeader = chr(0x30);
$remainingLength = strlen($topicField) + strlen($message);
$fixedHeader .= chr($remainingLength);
$packet = $fixedHeader . $topicField . $message;
fwrite($socket, $packet);
echo "已发送PUBLISH\n";

构造并发送 SUBSCRIBE 报文

$packetId = 1;
$topic = "test/topic";
// 可变头部: 报文标识符
$variableHeader = pack("n", $packetId);
// 载荷: 主题+QoS
$payload = pack("n", strlen($topic)) . $topic . chr(0x00); // QoS 0
// 固定头部: 0x82 = SUBSCRIBE, 剩余长度
$fixedHeader = chr(0x82);
$remainingLength = strlen($variableHeader) + strlen($payload);
$fixedHeader .= chr($remainingLength);
$packet = $fixedHeader . $variableHeader . $payload;
fwrite($socket, $packet);
// 读取SUBACK响应
$suback = fread($socket, 5);
// 严谨一点需要解析读取的报文
echo "已发送SUBSCRIBE\n";

读取 PUBLISH 消息(订阅后)

// 读取服务器推送的PUBLISH消息
while (!feof($socket)) {
    $header = fread($socket, 2);
    if (!$header) break;
    $type = ord($header[0]) >> 4;
    $len = ord($header[1]);
    $body = fread($socket, $len);
    if ($type == 3) { // PUBLISH
        $topicLen = unpack('n', substr($body, 0, 2))[1];
        $topic = substr($body, 2, $topicLen);
        $msg = substr($body, 2 + $topicLen);
        echo "收到消息: 主题=$topic, 内容=$msg\n";
        break;
    }
}
// 关闭连接
fclose($socket);

完整示例

<?php


class MQTT {

    private string $clientId;

    private array $subscribeCallbacks = [];

    private $socket;

    public function __construct(string $clientId) {
        $this->clientId = $clientId;
    }

    public function connect() {
        $protocolName = "MQTT";
        $protocolLevel = chr(0x04); // MQTT 3.1.1
        $connectFlags = chr(0x02); // Clean session
        $keepAlive = pack("n", 60); // 60秒

        // 可变头部
        $variableHeader =
            pack("n", strlen($protocolName)) . $protocolName .
            $protocolLevel .
            $connectFlags .
            $keepAlive;

        // 载荷
        $payload = pack("n", strlen($this->clientId)) . $this->clientId;

        // 剩余长度
        $remainingLength = strlen($variableHeader) + strlen($payload);


        // 固定头第一个字节的高4位表示消息类型,CONNECT 类型为 1(即 0001),低4位为标志位(CONNECT 报文要求为 0000)。
        // 合并后就是 0001 0000(二进制),即 0x10(十六进制)。
        // 固定头部
        $fixedHeader = chr(0x10) . chr($remainingLength);

        // 完整报文
        $packet = $fixedHeader . $variableHeader . $payload;

        // 解析packet
        // $parseHeader = unpack("Clen/Cvlen/nnlen", $packet);
        // print_r($parseHeader);
        // $parseVariableHeader = unpack("A{$parseHeader['nlen']}name/Clevel/Cflag/nalive", $packet, 4);
        // print_r($parseVariableHeader);
        // $parsePayload = unpack("nplen/A*payload", $packet, 4 + $parseHeader['nlen'] + 4);
        // print_r($parsePayload);

        // 连接并发送
        $this->socket = fsockopen('broker.hivemq.com', 1883, $errno, $errstr, 5);
        fwrite($this->socket, $packet);
        // 读取CONNACK响应
        $response = fread($this->socket, 4);
        // 检查连接是否成功
        return ord($response[0]) == 0x20 && ord($response[3]) == 0x00;
    }

    public function publish(string $topic, string $message) {
        // 主题
        $topicField = pack("n", strlen($topic)) . $topic;
        // 固定头部: 0x30 = PUBLISH, QoS 0
        // QoS 0:无响应
        // QoS 1:返回 PUBACK
        // QoS 2:返回 PUBREC → PUBREL → PUBCOMP
        $fixedHeader = chr(0x30);
        $remainingLength = strlen($topicField) + strlen($message);
        $fixedHeader .= chr($remainingLength);
        $packet = $fixedHeader . $topicField . $message;
        fwrite($this->socket, $packet);
    }

    public function subscribe(string $topic, callable $call) {
        $packetId = 1;
        // 可变头部: 报文标识符
        $variableHeader = pack("n", $packetId);
        // 载荷: 主题+QoS
        $payload = pack("n", strlen($topic)) . $topic . chr(0x00); // QoS 0
        // 固定头部: 0x82 = SUBSCRIBE, 剩余长度
        $fixedHeader = chr(0x82);
        $remainingLength = strlen($variableHeader) + strlen($payload);
        $fixedHeader .= chr($remainingLength);
        $packet = $fixedHeader . $variableHeader . $payload;
        fwrite($this->socket, $packet);
        // 读取SUBACK响应
        $suback = fread($this->socket, 5);
        echo strlen($suback);

        $parseSuback = unpack("Ctype/c1len/c2id/cqos", $suback);

        if ($parseSuback["qos"] === 0) {
            $this->subscribeCallbacks[$topic] = $call;
        }
    }


    public function wait() {
        while (!feof($this->socket)) {
            $header = fread($this->socket, 2);
            if (!$header) break;
            $type = ord($header[0]) >> 4;
            $len = ord($header[1]);
            $body = fread($this->socket, $len);
            if ($type == 3) { // PUBLISH
                $topicLen = unpack('n', substr($body, 0, 2))[1];
                $topic = substr($body, 2, $topicLen);
                $msg = substr($body, 2 + $topicLen);
                if (isset($this->subscribeCallbacks[$topic])) {
                    call_user_func($this->subscribeCallbacks[$topic], $msg);
                }
            }
            sleep(1);
        }

    }

    public function close() {
        fclose($this->socket);
    }

}


$mqtt = new MQTT("clientId1");

if ($mqtt->connect()) {
    $mqtt->subscribe("test/topic", function($message) {
        echo "收到消息: 内容=$message\n";
    });
    $mqtt->publish("test/topic", "message payload");

    $mqtt->wait();

    $mqtt->close();
}


示例执行后的结果.


[~/projects/php/examples] php mqtt.php

收到消息: 内容={hello: world}
收到消息: 内容=message payload

此文章主要是笔者为了学习mqtt相关的一些协议,请误在线上环境使用该代码,你应该使用第三方专业的库.

推荐阅读

留言