Files

158 lines
6.5 KiB
Markdown
Raw Permalink Normal View History

2026-02-05 18:01:33 +08:00
# 使用文档
## topic 通配符含义
- `/`:用来表示层次,比如 a/ba/b/c。
- `#`:表示匹配 `>=0` 个层次,比如 a/# 就匹配 a/a/ba/b/c。单独的一个 # 表示匹配所有。不允许 a# 和 a/#/c
- `+`:表示匹配一个层次,例如 a/+ 匹配 a/ba/c不匹配 a/b/c。单独的一个 + 是允许的a+ 不允许,也可以和多层通配符一起使用,+/tennis/# 、sport/+/player1 都有有效的。
## 使用说明
### MQTT 遗嘱消息场景
- 当客户端断开连接时,发送给相关的订阅者的遗嘱消息。在设备 A 进行连接时候,遗嘱消息设定为 `offline`手机App B 订阅这个遗嘱主题。
- 当 A 异常断开时手机App B 会收到这个 `offline` 的遗嘱消息,从而知道设备 A 离线了。
### MQTT 保留消息场景
- 例如,某设备定期发布自身 GPS 坐标,但对于订阅者而言,从它发起订阅到第一次收到数据可能需要几秒钟,也可能需要十几分钟甚至更多,这样并不友好。因此 MQTT 引入了保留消息。
- 而每当有订阅者建立订阅时,服务端就会查找是否存在匹配该订阅的保留消息,如果保留消息存在,就会立即转发给订阅者。
- 借助保留消息,新的订阅者能够立即获取最近的状态。
### 共享订阅
mica-mqtt 支持两种**共享订阅**方式:
1. 共享订阅:订阅前缀 `$queue/`,多个客户端订阅了 `$queue/topic`,发布者发布到 `topic`,则只有一个客户端会接收到消息。
2. 分组订阅:订阅前缀 `$share/<group>/`,组客户端订阅了 `$share/group1/topic``$share/group2/topic`..,发布者发布到 `topic`,则消息会发布到每个 **group** 中,但是每个 **group** 中只有一个客户端会接收到消息。
**注意:** 如果发布的 `topic``/` 开头,例如:`/topic/test`,需要订阅 `$share/group1//topic/test`,另外 mica-mqtt 默认随机消息路由,共享订阅的多个客户端会随机收到消息。
## 客户端使用
### 添加依赖
```xml
<dependency>
<groupId>org.dromara.mica-mqtt</groupId>
<artifactId>mica-mqtt-client</artifactId>
<version>${mica-mqtt.version}</version>
</dependency>
```
## 客户端使用
```java
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1") // mqtt 服务端 ip 地址
.port(1883) // 默认1883
.username("admin") // 账号
.password("123456") // 密码
.version(MqttVersion.MQTT_5) // 默认3_1_1
.clientId("xxxxxx") // 非常重要务必手动设置,一般设备 sn 号默认MICA-MQTT- 前缀和 36进制的纳秒数
.readBufferSize(512) // 消息一起解析的长度,默认:为 8092 mqtt 消息最大长度)
.maxBytesInMessage(1024 * 10) // 最大包体长度,如果包体过大需要设置此参数,默认为: 10M (10*1024*1024)
.keepAliveSecs(120) // 默认60s
.timeout(10) // 超时时间t-io 配置,可为 null为 null 时t-io 默认为 5
.reconnect(true) // 是否重连默认true
.reInterval(5000) // 重连重试时间reconnect 为 true 时有效t-io 默认为5000
.willMessage(builder -> {
builder.topic("/test/offline").messageText("down"); // 遗嘱消息
})
.connectListener(new IMqttClientConnectListener() {
@Override
public void onConnected(ChannelContext context, boolean isReconnect) {
logger.info("链接服务器成功...");
}
@Override
public void onDisconnect(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) {
logger.info("与链接服务器断开连接...");
}
})
.properties() // mqtt5 properties
.connectSync(); // 同步连接,也可以使用 connect(),可以避免 broker 没启动照成启动卡住。
// 消息订阅,同类方法 subxxx
client.subQos0("/test/#", (context, topic, message, payload) -> {
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
});
// 取消订阅
client.unSubscribe("/test/#");
// 发送消息
client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
// 断开连接
client.disconnect();
// 重连
client.reconnect();
// 停止
client.stop();
```
## 在 Android 中使用
### 排除 INDEX.LIST 文件
```groovy
android {
// ... 其他配置
packagingOptions {
// 排除 INDEX.LIST 文件
exclude 'META-INF/INDEX.LIST'
}
}
```
### 添加依赖
```groovy
implementation 'org.dromara.mica-mqtt:mica-mqtt-client:${micaMqttVersion}' // 使用 2.4.2 或以上版本
```
## 全局订阅2.2.9开始支持)
**说明**:由于 mica-mqtt-client 采用传统 mq 的思维进行的开发。其实是跟 mqtt 部分是有违背的。传统 mqtt client 不会按 topic 进行不通的订阅,采用的是这里的**全局订阅**方式。
**注意**:全局订阅也是可以监听到 `subQos0``subQos1``subQos2` 的消息。采用 `globalSubscribe`,保留 session 停机重启,依然可以接受到消息。
```java
// 初始化 mqtt 客户端
MqttClient.create()
.ip("127.0.0.1")
.port(1883)
.username("admin")
.password("123456")
// 采用 globalSubscribe保留 session 停机重启后可以接受到离线消息注意clientId 要不能变化。
.clientId("globalTest")
.cleanSession(false)
// 全局订阅的 topic
.globalSubscribe("/test", "/test/123", "/debug/#")
// 全局监听,也会监听到服务端 http api 订阅的数据
.globalMessageListener((context, topic, message, payload) -> {
System.out.println("topic:\t" + topic);
System.out.println("payload:\t" + ByteBufferUtil.toString(payload));
})
.connectSync();
```
## 接口代理
```java
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883)
.username("admin")
.password("123456")
.connectSync();
// 代理接口
DoorClient doorClient = client.getInterface(DoorClient.class);
client.schedule(() -> {
doorClient.sendMessage("open", false);
}, 1000);
public interface DoorClient {
@MqttClientPublish(value = "/a/door/open", qos = MqttQoS.QOS0)
void sendMessage(@MqttPayload String message, @MqttRetain boolean retain);
}
```