

PHP手撸MQTT客户端
MQTT
(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,常用于物联网设备通信。下面以PHP
原生socket
实现MQTT 3.1.1
协议的CONNECT
、PUBLISH
、SUBSCRIBE
报文构造与交互.
MQTT协议数据包结构
MQTT
是一种基于二进制的协议,控制元素是二进制字节而不是文本字符串.
命令和命令确认格式由MQTT
使用,这意味着每个命令都有一个伴随的确认。连接命令有连接确认,订阅命令有订阅确认,发布命令有发布确认,如上图所示。这种机制类似于TCP
协议的握手机制.
MQTT
消息格式由3字段组成
- 固定包头(Fixed Header)所有 MQTT 数据包都必须存在,固定占用2个字节
- 可变包头(Variable Header)并不总是存在,有时候不存在这个部分
- 包体(payload)正在发送的数据存储在有包体中,然而包体部分也并不总是存在**,**一些命令,例如断开消息,就不使用包体字段
固定包头报文解析
固定包头包含2个字节,
第一个字节的前4位表示消息类型,后4位用于表示传输协议和Qos等级,
第二字节用于存储数据包长度,这个长度=可变包头长度+包体长度
构造并发送CONNECT报文
$clientId = "phpClient";
$protocolName = "MQTT";
$protocolLevel = chr(0x04); // MQTT 3.1.1
$connectFlags = chr(0x02); // Clean session
$keepAlive = pack("n", 60); // 60秒
// 可变头部
$variableHeader =
pack("n", strlen($protocolName)) . $protocolName .
$protocolLevel .
$connectFlags .
$keepAlive;
// 载荷
$payload = pack("n", strlen($clientId)) . $clientId;
// 剩余长度
$remainingLength = strlen($variableHeader) + strlen($payload);
// 固定头第一个字节的高4位表示消息类型,CONNECT 类型为 1(即 0001),低4位为标志位(CONNECT 报文要求为 0000)。
// 合并后就是 0001 0000(二进制),即 0x10(十六进制)。
// 固定头部
$fixedHeader = chr(0x10) . chr($remainingLength);
// 完整报文
$packet = $fixedHeader . $variableHeader . $payload;
// 连接并发送
$socket = fsockopen('broker.hivemq.com', 1883, $errno, $errstr, 5);
fwrite($socket, $packet);
// 读取CONNACK响应
$response = fread($socket, 4);
// 检查连接是否成功
if (ord($response[0]) == 0x20 && ord($response[3]) == 0x00) {
echo "连接成功\n";
} else {
echo "连接失败\n";
}
连接成功后,后续需要构建发布和订阅两个报文.
构造并发送 PUBLISH 报文
$topic = "test/topic";
$message = "Hello MQTT!";
// 主题
$topicField = pack("n", strlen($topic)) . $topic;
// 固定头部: 0x30 = PUBLISH, QoS 0
$fixedHeader = chr(0x30);
$remainingLength = strlen($topicField) + strlen($message);
$fixedHeader .= chr($remainingLength);
$packet = $fixedHeader . $topicField . $message;
fwrite($socket, $packet);
echo "已发送PUBLISH\n";
构造并发送 SUBSCRIBE 报文
$packetId = 1;
$topic = "test/topic";
// 可变头部: 报文标识符
$variableHeader = pack("n", $packetId);
// 载荷: 主题+QoS
$payload = pack("n", strlen($topic)) . $topic . chr(0x00); // QoS 0
// 固定头部: 0x82 = SUBSCRIBE, 剩余长度
$fixedHeader = chr(0x82);
$remainingLength = strlen($variableHeader) + strlen($payload);
$fixedHeader .= chr($remainingLength);
$packet = $fixedHeader . $variableHeader . $payload;
fwrite($socket, $packet);
// 读取SUBACK响应
$suback = fread($socket, 5);
// 严谨一点需要解析读取的报文
echo "已发送SUBSCRIBE\n";
读取 PUBLISH 消息(订阅后)
// 读取服务器推送的PUBLISH消息
while (!feof($socket)) {
$header = fread($socket, 2);
if (!$header) break;
$type = ord($header[0]) >> 4;
$len = ord($header[1]);
$body = fread($socket, $len);
if ($type == 3) { // PUBLISH
$topicLen = unpack('n', substr($body, 0, 2))[1];
$topic = substr($body, 2, $topicLen);
$msg = substr($body, 2 + $topicLen);
echo "收到消息: 主题=$topic, 内容=$msg\n";
break;
}
}
// 关闭连接
fclose($socket);
完整示例
<?php
class MQTT {
private string $clientId;
private array $subscribeCallbacks = [];
private $socket;
public function __construct(string $clientId) {
$this->clientId = $clientId;
}
public function connect() {
$protocolName = "MQTT";
$protocolLevel = chr(0x04); // MQTT 3.1.1
$connectFlags = chr(0x02); // Clean session
$keepAlive = pack("n", 60); // 60秒
// 可变头部
$variableHeader =
pack("n", strlen($protocolName)) . $protocolName .
$protocolLevel .
$connectFlags .
$keepAlive;
// 载荷
$payload = pack("n", strlen($this->clientId)) . $this->clientId;
// 剩余长度
$remainingLength = strlen($variableHeader) + strlen($payload);
// 固定头第一个字节的高4位表示消息类型,CONNECT 类型为 1(即 0001),低4位为标志位(CONNECT 报文要求为 0000)。
// 合并后就是 0001 0000(二进制),即 0x10(十六进制)。
// 固定头部
$fixedHeader = chr(0x10) . chr($remainingLength);
// 完整报文
$packet = $fixedHeader . $variableHeader . $payload;
// 解析packet
// $parseHeader = unpack("Clen/Cvlen/nnlen", $packet);
// print_r($parseHeader);
// $parseVariableHeader = unpack("A{$parseHeader['nlen']}name/Clevel/Cflag/nalive", $packet, 4);
// print_r($parseVariableHeader);
// $parsePayload = unpack("nplen/A*payload", $packet, 4 + $parseHeader['nlen'] + 4);
// print_r($parsePayload);
// 连接并发送
$this->socket = fsockopen('broker.hivemq.com', 1883, $errno, $errstr, 5);
fwrite($this->socket, $packet);
// 读取CONNACK响应
$response = fread($this->socket, 4);
// 检查连接是否成功
return ord($response[0]) == 0x20 && ord($response[3]) == 0x00;
}
public function publish(string $topic, string $message) {
// 主题
$topicField = pack("n", strlen($topic)) . $topic;
// 固定头部: 0x30 = PUBLISH, QoS 0
// QoS 0:无响应
// QoS 1:返回 PUBACK
// QoS 2:返回 PUBREC → PUBREL → PUBCOMP
$fixedHeader = chr(0x30);
$remainingLength = strlen($topicField) + strlen($message);
$fixedHeader .= chr($remainingLength);
$packet = $fixedHeader . $topicField . $message;
fwrite($this->socket, $packet);
}
public function subscribe(string $topic, callable $call) {
$packetId = 1;
// 可变头部: 报文标识符
$variableHeader = pack("n", $packetId);
// 载荷: 主题+QoS
$payload = pack("n", strlen($topic)) . $topic . chr(0x00); // QoS 0
// 固定头部: 0x82 = SUBSCRIBE, 剩余长度
$fixedHeader = chr(0x82);
$remainingLength = strlen($variableHeader) + strlen($payload);
$fixedHeader .= chr($remainingLength);
$packet = $fixedHeader . $variableHeader . $payload;
fwrite($this->socket, $packet);
// 读取SUBACK响应
$suback = fread($this->socket, 5);
echo strlen($suback);
$parseSuback = unpack("Ctype/c1len/c2id/cqos", $suback);
if ($parseSuback["qos"] === 0) {
$this->subscribeCallbacks[$topic] = $call;
}
}
public function wait() {
while (!feof($this->socket)) {
$header = fread($this->socket, 2);
if (!$header) break;
$type = ord($header[0]) >> 4;
$len = ord($header[1]);
$body = fread($this->socket, $len);
if ($type == 3) { // PUBLISH
$topicLen = unpack('n', substr($body, 0, 2))[1];
$topic = substr($body, 2, $topicLen);
$msg = substr($body, 2 + $topicLen);
if (isset($this->subscribeCallbacks[$topic])) {
call_user_func($this->subscribeCallbacks[$topic], $msg);
}
}
sleep(1);
}
}
public function close() {
fclose($this->socket);
}
}
$mqtt = new MQTT("clientId1");
if ($mqtt->connect()) {
$mqtt->subscribe("test/topic", function($message) {
echo "收到消息: 内容=$message\n";
});
$mqtt->publish("test/topic", "message payload");
$mqtt->wait();
$mqtt->close();
}
示例执行后的结果.
[~/projects/php/examples] php mqtt.php
收到消息: 内容={hello: world}
收到消息: 内容=message payload
此文章主要是笔者为了学习mqtt
相关的一些协议,请误在线上环境使用该代码,你应该使用第三方专业的库.