18 Commits

Author SHA1 Message Date
zc
52c092cde7 配置 2026-05-12 15:04:06 +08:00
zc
118729ccb2 优化redis,mysql超时异常处理,定时任务线程池扩容 2026-05-12 11:46:26 +08:00
zc
4c41f6f77e 主键自增 2026-04-30 15:40:37 +08:00
zc
90003d79c5 主键自增 2026-04-30 14:54:25 +08:00
zc
778ab69fbc 新疆 2026-04-29 14:59:03 +08:00
zc
c403b07aeb 代码优化 2026-03-16 16:54:57 +08:00
zc
0e7b428980 代码优化 2026-02-10 15:49:09 +08:00
zc
043023b81e 代码优化 2026-02-10 15:39:32 +08:00
55e8e9dbb2 下发白名单非必填数据添加校验 2026-01-06 13:48:54 +08:00
zc
46e3039004 代码优化 2026-01-05 10:24:15 +08:00
zc
88ba83a5b3 代码优化 2025-12-19 10:03:36 +08:00
zc
cb619195fd 代码优化-改名 2025-12-16 16:31:08 +08:00
zc
c9baa6a89e 代码优化 2025-12-16 09:34:37 +08:00
zc
510d9f3af0 代码优化 2025-12-12 15:31:42 +08:00
zc
a01da7c9dd 代码优化 2025-12-11 21:39:02 +08:00
zc
7180eedc29 代码优化,上线离线 2025-12-09 14:05:36 +08:00
zc
963dfc2313 代码优化,上线离线 2025-12-08 16:16:44 +08:00
zc
af93d773ca 去除无用示例 2025-12-08 10:48:38 +08:00
120 changed files with 2510 additions and 2676 deletions

View File

@@ -1,33 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.dromara.mica-mqtt</groupId>
<artifactId>example</artifactId>
<version>${revision}</version>
</parent>
<artifactId>mica-mqtt-client-solon-plugin-example</artifactId>
<dependencies>
<dependency>
<groupId>org.noear</groupId>
<artifactId>solon-web</artifactId>
</dependency>
<dependency>
<groupId>org.dromara.mica-mqtt</groupId>
<artifactId>mica-mqtt-client-solon-plugin</artifactId>
</dependency>
<dependency>
<groupId>org.noear</groupId>
<artifactId>solon-logging-simple</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@@ -1,19 +0,0 @@
package org.dromara.mica.mqtt.client.solon;
import org.noear.solon.Solon;
import org.noear.solon.annotation.Configuration;
/**
* @author wsq
*/
@Configuration
public class MqttClientApplication {
/**
* 先启动 mica-mqtt-server-spring-boot-example 再启动本项目,进行测试
*/
public static void main(String[] args) {
Solon.start(MqttClientApplication.class, args);
}
}

View File

@@ -1,26 +0,0 @@
package org.dromara.mica.mqtt.client.solon.controller;
import org.dromara.mica.mqtt.client.solon.service.ClientService;
import org.noear.solon.annotation.*;
@Mapping("/mqtt/client")
@Controller
public class ClientController {
@Inject
private ClientService service;
@Post
@Mapping("/publish")
public boolean publish(String body) {
return service.publish(body);
}
@Get
@Mapping("/sub")
public boolean sub() {
return service.sub();
}
}

View File

@@ -1,41 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.client.solon.listener;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.client.solon.event.MqttConnectedEvent;
import org.dromara.mica.mqtt.core.client.MqttClientCreator;
import org.noear.solon.annotation.Component;
import org.noear.solon.annotation.Inject;
import org.noear.solon.core.event.EventListener;
/**
* 客户端连接状态监听
*
* @author L.cm
*/
@Slf4j
@Component
public class MqttClientConnectedListener implements EventListener<MqttConnectedEvent> {
@Inject
private MqttClientCreator mqttClientCreator;
@Override
public void onEvent(MqttConnectedEvent mqttConnectedEvent) throws Throwable {
log.info("MqttConnectedEvent:{}", mqttConnectedEvent);
}
}

View File

@@ -1,45 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.client.solon.listener;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.client.solon.event.MqttDisconnectEvent;
import org.dromara.mica.mqtt.core.client.MqttClientCreator;
import org.noear.solon.annotation.Component;
import org.noear.solon.annotation.Inject;
import org.noear.solon.core.event.EventListener;
/**
* 客户端连接状态监听
*
* @author L.cm
*/
@Slf4j
@Component
public class MqttClientDisconnectListener implements EventListener<MqttDisconnectEvent> {
@Inject
private MqttClientCreator mqttClientCreator;
@Override
public void onEvent(MqttDisconnectEvent mqttDisconnectEvent) throws Throwable {
log.info("MqttDisconnectEvent:{}", mqttDisconnectEvent);
// 在断线时更新 clientId、username、password只能改这 3 个,不可调用其他方法。
// mqttClientCreator.clientId("newClient" + System.currentTimeMillis())
// .username("newUserName")
// .password("newPassword");
}
}

View File

@@ -1,25 +0,0 @@
package org.dromara.mica.mqtt.client.solon.listener;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.core.annotation.MqttClientSubscribe;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.core.client.IMqttClientMessageListener;
import org.tio.core.ChannelContext;
import java.nio.charset.StandardCharsets;
/**
* 客户端消息监听的另一种方式
*
* @author L.cm
*/
@Slf4j
@MqttClientSubscribe("${topic1}")
public class MqttClientMessageListener implements IMqttClientMessageListener {
@Override
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
log.info("MqttClientMessageListener,topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
}
}

View File

@@ -1,68 +0,0 @@
package org.dromara.mica.mqtt.client.solon.listener;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.core.annotation.MqttClientSubscribe;
import org.dromara.mica.mqtt.client.solon.pojo.User;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.core.deserialize.MqttJsonDeserializer;
import org.noear.solon.annotation.Component;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
* 客户端消息监听
*
* @author L.cm
*/
@Slf4j
@Component
public class MqttClientSubscribeListener {
@MqttClientSubscribe("/test/#")
public void subQos0(String topic, byte[] payload) {
log.info("MqttClientSubscribeListener.subQos0,topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
}
@MqttClientSubscribe(value = "/qos1/#", qos = MqttQoS.QOS1)
public void subQos1(String topic, byte[] payload) {
log.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
}
@MqttClientSubscribe(
value = "/test/json",
deserialize = MqttJsonDeserializer.class // 2.4.5 开始支持 自定义序列化,默认 json 序列化
)
public void testJson(String topic, MqttPublishMessage message, Map<String, Object> data) {
// solon 插件为 2.4.6 开始支持,支持 2 到 3 个参数,字段类型映射规则(顺序)如下
// String 字符串会默认映射到 topic
// Map<String, String> topicVars 会默认映射到 topic 中的变量解析v2.5.4支持),注意:别跟消息序列化的冲突,消息反序列化不要用 Map<String, String>
// MqttPublishMessage 会默认映射到 原始的消息,可以拿到 mqtt5 的 props 参数
// byte[] 会映射到 mqtt 消息内容 payload
// ByteBuffer 会映射到 mqtt 消息内容 payload
// 其他类型会走序列化,确保消息能够序列化,默认为 json 序列化
log.info("topic:{} json data:{}", topic, data);
}
@MqttClientSubscribe(value = "/test/object")
public void testJson(String topic, MqttPublishMessage message, User<User> data) {
log.info("topic:{} json data:{}", topic, data);
}
/**
* 订阅,参数为可选,但是参数数量必须大于 2
*
* @param topic topic 参数,可选参数
* @param topicVars 订阅 topic 模板 ${productKey} 中的变量解析v2.5.4支持),可选参数,注意:类型必须为 Map<String, String>
* @param payload 消息内容
*/
@MqttClientSubscribe("/sys/${productKey}/${deviceName}/thing/sub/register")
public void thingSubRegister(String topic, Map<String, String> topicVars, byte[] payload) {
// 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 +
// 注意mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。
log.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
}
}

View File

@@ -1,9 +0,0 @@
package org.dromara.mica.mqtt.client.solon.pojo;
import lombok.Data;
@Data
public class User<T> {
private String name;
private T girlfriend;
}

View File

@@ -1,32 +0,0 @@
package org.dromara.mica.mqtt.client.solon.service;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.client.solon.MqttClientTemplate;
import org.noear.solon.annotation.Component;
import org.noear.solon.annotation.Inject;
import java.nio.charset.StandardCharsets;
/**
* @author wsq
*/
@Slf4j
@Component
public class ClientService {
@Inject
private MqttClientTemplate client;
public boolean publish(String body) {
client.publish("/test/client", body.getBytes(StandardCharsets.UTF_8));
return true;
}
public boolean sub() {
client.subQos0("/test/#", (context, topic, message, payload) -> {
log.info("{}\t{}", topic, new String(payload, StandardCharsets.UTF_8));
});
return true;
}
}

View File

@@ -1,36 +0,0 @@
server:
port: 30303
# solon 配置
solon:
logging:
appender:
console:
level: INFO
# mqtt-client 配置
mqtt:
client:
enabled: true # 是否开启客户端默认true
ip: 127.0.0.1 # 连接的服务端 ip 默认127.0.0.1
port: 1883 # 端口默认1883
name: Mica-Mqtt-Client # 名称默认Mica-Mqtt-Client
clientId: 000001 # 客户端Id非常重要一般为设备 sn不可重复
username: mica # 认证的用户名注意2.5.x 开始将 user-name 改成了 username
password: mica # 认证的密码
timeout: 5 # 超时时间单位默认5秒
reconnect: true # 是否重连默认true
re-interval: 5000 # 重连时间,默认 5000 毫秒
version: mqtt_3_1_1 # mqtt 协议版本,可选 MQTT_3_1、mqtt_3_1_1、mqtt_5默认mqtt_3_1_1
read-buffer-size: 8KB # 接收数据的 buffer size默认8k
max-bytes-in-message: 10MB # 消息解析最大 bytes 长度默认10M
keep-alive-secs: 60 # keep-alive 时间,单位:秒
heartbeat-mode: LAST_REQ # 心跳模式支持最后发送或接收心跳时间来计算心跳默认最后发送心跳的时间。2.4.3 开始支持)
heartbeat-timeout-strategy: PING # 心跳超时策略,支持发送 PING 和 CLOSE 断开连接,默认:最大努力发送 PING。2.4.3 开始支持)
clean-start: true # session 保留 2.5.x 使用 clean-start老版本用 clean-session默认true
ssl:
enabled: false # 是否开启 ssl 认证2.1.0 开始支持双向认证
keystore-path: # 可选参数ssl 双向认证 keystore 目录,支持 classpath:/ 路径。
keystore-pass: # 可选参数ssl 双向认证 keystore 密码
truststore-path: # 可选参数ssl 双向认证 truststore 目录,支持 classpath:/ 路径。
truststore-pass: # 可选参数ssl 双向认证 truststore 密码
topic1: /test/#

View File

@@ -1,183 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>mica-mqtt-example</artifactId>
<name>${project.artifactId}</name>
<parent>
<groupId>org.dromara.mica-mqtt</groupId>
<artifactId>example</artifactId>
<version>${revision}</version>
</parent>
<properties>
<graalvm.version>25.0.1</graalvm.version>
<mainClass.server>org.dromara.mica.mqtt.server.MqttServerTest</mainClass.server>
<mainClass.client>org.dromara.mica.mqtt.client.MqttClientTest</mainClass.client>
</properties>
<dependencies>
<dependency>
<groupId>org.dromara.mica-mqtt</groupId>
<artifactId>mica-mqtt-client</artifactId>
</dependency>
<dependency>
<groupId>org.dromara.mica-mqtt</groupId>
<artifactId>mica-mqtt-server</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
</build>
<profiles>
<profile>
<id>jar</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<!-- 非 GraalVM 环境用 tinylog -->
<dependency>
<groupId>org.tinylog</groupId>
<artifactId>slf4j-tinylog</artifactId>
</dependency>
<dependency>
<groupId>org.tinylog</groupId>
<artifactId>tinylog-impl</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<archive>
<manifest>
<mainClass>${mainClass.server}</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>mqtt-server-graal</id>
<dependencies>
<!-- GraalVM 环境使用 jdk log -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</dependency>
<!-- GraalVM -->
<dependency>
<groupId>org.graalvm.sdk</groupId>
<artifactId>graal-sdk</artifactId>
<version>${graalvm.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<finalName>mqtt-server-graal</finalName>
<plugins>
<plugin>
<groupId>org.graalvm.nativeimage</groupId>
<artifactId>native-image-maven-plugin</artifactId>
<version>21.2.0</version>
<executions>
<execution>
<goals>
<goal>native-image</goal>
</goals>
<phase>package</phase>
</execution>
</executions>
<configuration>
<skip>false</skip>
<imageName>mqtt-server-graalvm</imageName>
<mainClass>${mainClass.server}</mainClass>
<buildArgs>
-H:+RemoveSaturatedTypeFlows
--allow-incomplete-classpath
--no-fallback
</buildArgs>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>mqtt-client-graal</id>
<dependencies>
<!-- GraalVM 环境使用 jdk log -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</dependency>
<!-- GraalVM -->
<dependency>
<groupId>org.graalvm.sdk</groupId>
<artifactId>graal-sdk</artifactId>
<version>${graalvm.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<finalName>mqtt-client-graal</finalName>
<plugins>
<plugin>
<groupId>org.graalvm.nativeimage</groupId>
<artifactId>native-image-maven-plugin</artifactId>
<version>21.2.0</version>
<executions>
<execution>
<goals>
<goal>native-image</goal>
</goals>
<phase>package</phase>
</execution>
</executions>
<configuration>
<skip>false</skip>
<imageName>mqtt-client-graalvm</imageName>
<mainClass>${mainClass.client}</mainClass>
<buildArgs>
-H:+RemoveSaturatedTypeFlows
--allow-incomplete-classpath
--no-fallback
</buildArgs>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@@ -1,65 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.aliyun;
import org.dromara.mica.mqtt.core.client.MqttClient;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
/**
* 客户端测试
*
* @author L.cm
*/
public class MqttClientTest {
public static void main(String[] args) {
String productKey = "g27jB42P9hm";
String deviceName = "3dbc1cb4";
String deviceSecret = "";
// 计算MQTT连接参数。
MqttSign sign = new MqttSign(productKey, deviceName, deviceSecret);
String username = sign.getUsername();
String password = sign.getPassword();
String clientId = sign.getClientId();
System.out.println("username: " + username);
System.out.println("password: " + password);
System.out.println("clientid: " + clientId);
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip(productKey + ".iot-as-mqtt.cn-shanghai.aliyuncs.com")
.port(443)
.username(username)
.password(password)
.clientId(clientId)
.connectSync();
client.subQos0("/sys/" + productKey + '/' + deviceName + "/thing/event/property/post_reply", (context, topic, message, payload) -> {
System.out.println(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
});
client.schedule(() -> {
int LightSwitch = ThreadLocalRandom.current().nextBoolean() ? 0 : 1;
String content = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":" + LightSwitch + "}}";
client.publish("/sys/" + productKey + "/" + deviceName + "/thing/event/property/post", content.getBytes(StandardCharsets.UTF_8));
}, 3000);
}
}

View File

@@ -1,81 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.aliyun;
import org.tio.utils.mica.DigestUtils;
import java.util.Objects;
/**
* 阿里云 mqtt 签名方式
*
* @author L.cm
*/
public class MqttSign {
/**
* 用户名
*/
private final String username;
/**
* 密码
*/
private final String password;
/**
* 客户端id
*/
private final String clientId;
public MqttSign(String productKey, String deviceName, String deviceSecret) {
Objects.requireNonNull(productKey, "productKey is null");
Objects.requireNonNull(deviceName, "deviceName is null");
Objects.requireNonNull(deviceSecret, "deviceSecret is null");
this.username = deviceName + '&' + productKey;
String timestamp = Long.toString(System.currentTimeMillis());
this.password = getPassword(productKey, deviceName, deviceSecret, timestamp);
this.clientId = getClientId(productKey, deviceName, timestamp);
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public String getClientId() {
return clientId;
}
private static String getPassword(String productKey, String deviceName, String deviceSecret, String timestamp) {
String plainPwd = "clientId" + productKey + '.' + deviceName + "deviceName" +
deviceName + "productKey" + productKey + "timestamp" + timestamp;
return hmacSha256(plainPwd, deviceSecret);
}
private static String getClientId(String productKey, String deviceName, String timestamp) {
return productKey + '.' + deviceName + "|timestamp=" + timestamp + ",_v=paho-java-1.0.0,securemode=2,signmethod=hmacsha256|";
}
private static String hmacSha256(String plainText, String key) {
if (plainText == null || key == null) {
return null;
}
return DigestUtils.hmacSha256Hex(plainText, key);
}
}

View File

@@ -1,72 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.benchmark;
import org.dromara.mica.mqtt.core.client.MqttClient;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.thread.ThreadUtils;
import org.tio.utils.thread.pool.SynThreadPoolExecutor;
import org.tio.utils.timer.DefaultTimerTaskService;
import org.tio.utils.timer.TimerTaskService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
/**
* mqtt 压力测试
*
* @author L.cm
*/
public class MqttBenchmark {
public static void main(String[] args) {
// 注意: windows 上需要修改最大的 Tcp 连接数,不然超不过 2W。
// 《修改Windows服务器最大的Tcp连接数》https://www.jianshu.com/p/00136a97d2d8
int connCount = 5_0000;
String ip = "127.0.0.1";
// 优化使用ArrayList+预分配容量性能比CopyOnWriteArrayList好很多
// 对于压测场景,不需要线程安全的写操作(因为是单线程创建客户端)
final List<MqttClient> clientList = new ArrayList<>(connCount);
SynThreadPoolExecutor tioExecutor = ThreadUtils.getTioExecutor();
ExecutorService groupExecutor = ThreadUtils.getGroupExecutor();
// 自定义全局 taskService避免每个 client new创建过多线程
TimerTaskService taskService = new DefaultTimerTaskService(200L, 60);
for (int i = 0; i < connCount; i++) {
newClient(ip, i, clientList, tioExecutor, groupExecutor, taskService);
}
}
private static void newClient(String ip, int i, final List<MqttClient> clientList,
SynThreadPoolExecutor tioExecutor,
ExecutorService groupExecutor,
TimerTaskService taskService) {
MqttClient client = MqttClient.create()
.ip(ip)
.clientId(StrUtil.getNanoId() + i)
.readBufferSize(128)
// 取消自动重连
.reconnect(false)
.tioExecutor(tioExecutor)
.groupExecutor(groupExecutor)
.mqttExecutor(tioExecutor)
.taskService(taskService)
.connect();
clientList.add(client);
}
}

View File

@@ -1,59 +0,0 @@
package org.dromara.mica.mqtt.benchmark;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.core.client.MqttClient;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.thread.ThreadUtils;
import org.tio.utils.thread.pool.SynThreadPoolExecutor;
import org.tio.utils.timer.DefaultTimerTaskService;
import org.tio.utils.timer.TimerTaskService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* mqtt 发布端测试
*
* @author L.cm
*/
public class MqttPublishBench {
public static void main(String[] args) {
int clientCount = 10;
int publishCount = 10000;
MqttQoS qos = MqttQoS.QOS0;
List<MqttClient> clients = getClient(clientCount);
Executors.newScheduledThreadPool(ThreadUtils.AVAILABLE_PROCESSORS).scheduleWithFixedDelay(() -> {
for (MqttClient mqttClient : clients) {
for (int j = 0; j < publishCount; j++) {
byte[] payload = new byte[1024 + j];
Arrays.fill(payload, (byte) -1);
mqttClient.publish("/topic/" + j, payload, qos);
}
}
}, 1L, 1L, TimeUnit.SECONDS);
}
public static List<MqttClient> getClient(int clientCount) {
SynThreadPoolExecutor tioExecutor = ThreadUtils.getTioExecutor();
ExecutorService groupExecutor = ThreadUtils.getGroupExecutor();
TimerTaskService taskService = new DefaultTimerTaskService();
List<MqttClient> clients = new ArrayList<>();
for (int i = 0; i < clientCount; i++) {
MqttClient client = MqttClient.create()
.clientId(StrUtil.getNanoId())
.tioExecutor(tioExecutor)
.groupExecutor(groupExecutor)
.mqttExecutor(groupExecutor)
.taskService(taskService)
.connectSync();
clients.add(client);
}
return clients;
}
}

View File

@@ -1,38 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.benchmark;
import org.dromara.mica.mqtt.core.server.MqttServer;
/**
* mqtt 服务端测试
*
* @author L.cm
*/
public class MqttServerBench {
public static void main(String[] args) {
// 设定日志级别为 error
System.setProperty("tinylog.writer.level", "error");
// 启动 mqtt 服务
MqttServer.create()
.enableMqtt()
.statEnable(false)
.start();
}
}

View File

@@ -1,47 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.broker;
import org.dromara.mica.mqtt.core.client.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
/**
* 设备 A这里默认 APP 应用端
*
* @author L.cm
*/
public class DeviceA {
private static final Logger logger = LoggerFactory.getLogger(DeviceA.class);
public static void main(String[] args) {
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883)
.username("admin")
.password("123456")
.connectSync();
client.subQos0("/a/door/open", (context, topic, message, payload) -> {
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
});
}
}

View File

@@ -1,46 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.broker;
import org.dromara.mica.mqtt.core.client.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
/**
* 设备 B这里默认 web 端
*
* @author L.cm
*/
public class DeviceB {
private static final Logger logger = LoggerFactory.getLogger(DeviceB.class);
public static void main(String[] args) {
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883)
.username("admin")
.password("123456")
.connectSync();
client.subQos0("/a/door/open", (context, topic, message, payload) -> {
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
});
}
}

View File

@@ -1,43 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.broker;
import org.dromara.mica.mqtt.core.client.MqttClient;
/**
* 设备 C每 5 秒上报一个数据
*
* @author L.cm
*/
public class DeviceC {
public static void main(String[] args) {
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883)
.username("admin")
.password("123456")
.connectSync();
client.schedule(() -> {
client.publish("/a/door/open", null);
}, 5000);
}
}

View File

@@ -1,36 +0,0 @@
package org.dromara.mica.mqtt.broker;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.core.annotation.MqttRetain;
import org.dromara.mica.mqtt.core.client.MqttClient;
import org.dromara.mica.mqtt.core.annotation.MqttClientPublish;
import org.dromara.mica.mqtt.core.annotation.MqttPayload;
/**
* @author ChangJin Wei (魏昌进)
*/
public class DeviceD {
public static void main(String[] args) {
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883)
.username("admin")
.password("123456")
.connectSync();
DoorClient doorClient = client.getInterface(DoorClient.class);
client.schedule(() -> {
doorClient.sendMessage("open", false);
}, 1000);
}
public interface DoorClient {
@MqttClientPublish(value = "/a/door/open", qos = MqttQoS.QOS0)
void sendMessage(@MqttPayload String message, @MqttRetain boolean retain);
}
}

View File

@@ -1,42 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.broker;
import org.dromara.mica.mqtt.core.server.MqttServer;
/**
* 服务端,单纯的做消息转发
*
* @author L.cm
*/
public class Server {
/**
* 客户端 A 模拟 APP 端订阅 `/a/door/open`
* 客户端 B 模拟 web 网页端 mqtt.js 订阅 `/a/door/open`
* Mqtt 服务端实现 `IMqttMessageListener`,将消息转交给 `AbstractMqttMessageDispatcher`(自定义实现)处理。
* 客户端 C 定时上报转态给 `/a/door/open`
* 结果A 和 B 将收到 C 或 D 发布的消息,并完成相应的效果展示。
*/
public static void main(String[] args) {
// 启动服务mica-mqtt 1.3.x 已经默认为 broker 模式
MqttServer.create()
.enableMqtt()
.debug()
.start();
}
}

View File

@@ -1,79 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.client;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.codec.MqttVersion;
import org.dromara.mica.mqtt.core.client.IMqttClientMessageListener;
import org.dromara.mica.mqtt.core.client.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import java.nio.charset.StandardCharsets;
/**
* 客户端测试
*
* @author L.cm
*/
public class Mqtt5ClientTest {
private static final Logger logger = LoggerFactory.getLogger(Mqtt5ClientTest.class);
public static void main(String[] args) {
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883)
.username("mica")
.password("mica")
.version(MqttVersion.MQTT_5)
.cleanStart(false)
.sessionExpiryIntervalSecs(7200)
.connectListener(new MqttClientConnectListener())
.willMessage(builder -> {
builder.topic("/test/offline")
.messageText("down")
.retain(false)
.qos(MqttQoS.QOS0); // 遗嘱消息
})
// 同步连接,也可以使用 connect() 异步(可以避免 broker 没启动照成启动卡住),但是下面的订阅和发布可能还没连接成功。
.connectSync();
client.subQos0("/test/123", new IMqttClientMessageListener() {
@Override
public void onSubscribed(ChannelContext context, String topicFilter, MqttQoS mqttQoS) {
// 订阅成功之后触发,可在此处做一些业务逻辑
logger.info("topicFilter:{} MqttQoS:{} 订阅成功!!!", topicFilter, mqttQoS);
}
@Override
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
}
});
client.publish("/test/client", "mica最牛皮1".getBytes(StandardCharsets.UTF_8));
client.publish("/test/client", "mica最牛皮2".getBytes(StandardCharsets.UTF_8));
client.publish("/test/client", "mica最牛皮3".getBytes(StandardCharsets.UTF_8));
client.schedule(() -> {
client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
}, 1000);
}
}

View File

@@ -1,40 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.client;
import org.dromara.mica.mqtt.codec.codes.MqttConnectReasonCode;
import org.dromara.mica.mqtt.core.client.MqttClient;
/**
* 客户端测试
*
* @author L.cm
*/
public class MqttClientConnTest {
public static void main(String[] args) {
// 初始化 mqtt 客户端
MqttConnectReasonCode reasonCode = MqttClient.create()
// .ip("127.0.0.1")
.ip("mqtt.dreamlu.net")
.port(1883)
.username("mica")
.password("mica1")
.connectTest();
System.out.println(reasonCode);
}
}

View File

@@ -1,48 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.client;
import org.dromara.mica.mqtt.core.client.IMqttClientConnectListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
/**
* 客户端连接状态监听
*
* @author L.cm
*/
public class MqttClientConnectListener implements IMqttClientConnectListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientConnectListener.class);
@Override
public void onConnected(ChannelContext context, boolean isReconnect) {
String clientId = context.getId();
if (isReconnect) {
logger.info("重连 mqtt 服务器重连成功... clientId:{}", clientId);
} else {
logger.info("连接 mqtt 服务器成功... clientId:{}", clientId);
}
}
@Override
public void onDisconnect(ChannelContext context, Throwable throwable, String remark, boolean isRemove) {
String clientId = context.getId();
logger.error("mqtt 链接断开 remark:{} isRemove:{} clientId:{}", remark, isRemove, clientId, throwable);
}
}

View File

@@ -1,50 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.client;
import org.dromara.mica.mqtt.core.client.MqttClient;
import org.tio.utils.buffer.ByteBufferUtil;
/**
* 客户端全局订阅测试
*
* @author L.cm
*/
public class MqttClientGlobalTest {
public static void main(String[] args) {
// 初始化 mqtt 客户端
MqttClient.create()
.ip("127.0.0.1")
.port(1883)
.username("admin")
.password("123456")
// 采用 globalSubscribe保留 session 停机重启后可以接受到离线消息注意clientId 要不能变化。
.clientId("globalTest")
.cleanStart(false)
// 全局订阅的 topic
.globalSubscribe("/test", "/test/123", "/debug/#")
// 全局监听,也会监听到服务端 http api 订阅的数据
.globalMessageListener((context, topic, message, payload) -> {
System.out.println("topic:\t" + topic);
System.out.println("payload:\t" + ByteBufferUtil.toString(payload));
})
// .debug()
.connectSync();
}
}

View File

@@ -1,55 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.client;
import org.dromara.mica.mqtt.core.client.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
/**
* 客户端测试
*
* @author L.cm
*/
public class MqttClientSyncTest {
private static final Logger logger = LoggerFactory.getLogger(MqttClientSyncTest.class);
public static void main(String[] args) {
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883)
.username("mica")
.password("mica")
.connectListener(new MqttClientConnectListener())
// 同步连接,注意:连接会阻塞
.connectSync();
client.subQos0("/test/#", (context, topic, message, payload) -> {
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
});
client.unSubscribe("/test/#", "/test/123");
// 连接上之后发送消息,注意:连接时出现异常等就不会发出
client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
// 2.3.0 开始支持,可停止
// client.stop();
}
}

View File

@@ -1,129 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.client;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.core.client.IMqttClientMessageListener;
import org.dromara.mica.mqtt.core.client.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import java.nio.charset.StandardCharsets;
/**
* 客户端测试
*
* @author L.cm
*/
public class MqttClientTest {
private static final Logger logger = LoggerFactory.getLogger(MqttClientTest.class);
public static void main(String[] args) {
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883)
.username("mica")
.password("mica")
// 绑定网卡 v2.5.1 添加,支持 bindIp网卡ip和 bindNetworkInterface网卡名取一即可
// .bindIp("127.0.0.1")
// .bindNetworkInterface("lo")
// 如果包体过大,建议将此参数设置和 maxBytesInMessage 一样大
// .readBufferSize(1024 * 10)
// 最大包体长度,如果包体过大需要设置此参数
// .maxBytesInMessage(1024 * 10)
// .version(MqttVersion.MQTT_5)
// 连接监听
.connectListener(new MqttClientConnectListener())
// 遗嘱消息
.willMessage(builder -> {
builder.topic("/test/offline")
.messageText("down")
.retain(false)
.qos(MqttQoS.QOS0)
// mqtt5 遗嘱消息属性
.willProperties(props -> {
props.setWillDelayInterval(1000);
props.setContentType("text/plain");
});
})
// 同步连接,也可以使用 connect() 异步(可以避免 broker 没启动照成启动卡住),但是下面的订阅和发布可能还没连接成功。
.connectSync();
client.subQos0("/test/#", new IMqttClientMessageListener() {
@Override
public void onSubscribed(ChannelContext context, String topicFilter, MqttQoS mqttQoS) {
// 订阅成功之后触发,可在此处做一些业务逻辑
logger.info("topicFilter:{} MqttQoS:{} 订阅成功!!!", topicFilter, mqttQoS);
}
@Override
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
}
});
client.subQos0("/test/1", new IMqttClientMessageListener() {
@Override
public void onSubscribed(ChannelContext context, String topicFilter, MqttQoS mqttQoS) {
// 订阅成功之后触发,可在此处做一些业务逻辑
logger.info("topicFilter:{} MqttQoS:{} 订阅成功!!!", topicFilter, mqttQoS);
}
@Override
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
}
});
client.subQos0("/test/2", new IMqttClientMessageListener() {
@Override
public void onSubscribed(ChannelContext context, String topicFilter, MqttQoS mqttQoS) {
// 订阅成功之后触发,可在此处做一些业务逻辑
logger.info("topicFilter:{} MqttQoS:{} 订阅成功!!!", topicFilter, mqttQoS);
}
@Override
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
}
});
client.subQos0("/test/3", new IMqttClientMessageListener() {
@Override
public void onSubscribed(ChannelContext context, String topicFilter, MqttQoS mqttQoS) {
// 订阅成功之后触发,可在此处做一些业务逻辑
logger.info("topicFilter:{} MqttQoS:{} 订阅成功!!!", topicFilter, mqttQoS);
}
@Override
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
}
});
client.publish("/test/client", "mica最牛皮1".getBytes(StandardCharsets.UTF_8));
client.publish("/test/client", "mica最牛皮2".getBytes(StandardCharsets.UTF_8));
client.publish("/test/client", "mica最牛皮3".getBytes(StandardCharsets.UTF_8));
client.schedule(() -> {
client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
}, 2000);
}
}

View File

@@ -1,70 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.huawei;
import org.dromara.mica.mqtt.core.client.MqttClient;
import java.nio.charset.StandardCharsets;
/**
* 客户端测试
*
* @author L.cm
*/
public class MqttClientTest {
public static void main(String[] args) {
// 设备id和密钥请从华为云iot获取
String deviceId = "630eb6f8664c6f7938db6ef0_test";
String deviceSecret = "";
// 计算MQTT连接参数。
MqttSign sign = new MqttSign(deviceId, deviceSecret);
String username = sign.getUsername();
String password = sign.getPassword();
String clientId = sign.getClientId();
System.out.println("username: " + username);
System.out.println("password: " + password);
System.out.println("clientid: " + clientId);
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("iot-mqtts.cn-north-4.myhuaweicloud.com")
.port(8883)
.username(username)
.password(password)
.clientId(clientId)
.useSsl()
.connectSync();
// 订阅命令下发topic
String cmdRequestTopic = "$oc/devices/" + deviceId + "/sys/commands/#";
client.subQos0(cmdRequestTopic, (context, topic, message, payload) -> {
System.out.println(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
});
// 属性上报消息
String reportTopic = "$oc/devices/" + deviceId + "/sys/properties/report";
String jsonMsg = "{\"services\":[{\"service_id\":\"Temperature\", \"properties\":{\"value\":57}},{\"service_id\":\"Battery\",\"properties\":{\"level\":88}}]}";
client.schedule(() -> {
client.publish(reportTopic, jsonMsg.getBytes(StandardCharsets.UTF_8));
}, 3000);
}
}

View File

@@ -1,89 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.huawei;
import org.tio.utils.mica.DigestUtils;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
/**
* 华为云 mqtt 签名方式
*
* @author L.cm
*/
public class MqttSign {
/**
* 用户名
*/
private final String username;
/**
* 密码
*/
private final String password;
/**
* 客户端id
*/
private final String clientId;
public MqttSign(String deviceId, String deviceSecret) {
Objects.requireNonNull(deviceId, "deviceId is null");
Objects.requireNonNull(deviceSecret, "deviceSecret is null");
this.username = deviceId;
String timestamp = getTimeStamp();
this.password = getPassword(deviceSecret, timestamp);
this.clientId = getClientId(deviceId, timestamp);
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public String getClientId() {
return clientId;
}
private static String getPassword(String deviceSecret, String timestamp) {
return hmacSha256(deviceSecret, timestamp);
}
private static String getClientId(String deviceId, String timestamp) {
return deviceId + "_0_0_" + timestamp;
}
/***
* 要求10位数字
*/
private static String getTimeStamp() {
return ZonedDateTime.ofInstant(Instant.now(), ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
}
/***
* 调用sha256算法进行哈希
*/
private static String hmacSha256(String message, String tStamp) {
return DigestUtils.hmacSha256Hex(message, tStamp);
}
}

View File

@@ -1,40 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.nginx;
import org.dromara.mica.mqtt.core.server.MqttServer;
/**
* mqtt 服务端测试
*
* @author L.cm
*/
public class MqttServerProxyProtocol {
public static void main(String[] args) {
MqttServer.create()
.enableMqtt()
.enableMqttWs()
// 开启代理协议
.proxyProtocolEnable()
.statEnable(false)
.debug()
.start();
}
}

View File

@@ -1,58 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.proxy;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.core.client.MqttClient;
/**
* 2 个 mqtt 服务间,使用 2 个 client 做数据传输
*
* @author L.cm
*/
public class MqttClientProxy {
public static void main(String[] args) {
MqttClient client1 = MqttClient.create()
.ip("ip1")
.port(1883)
.clientId("clientI")
.username("mica")
.password("mica")
.debug()
.connectSync();
MqttClient client2 = MqttClient.create()
.ip("ip2")
.port(1883)
.clientId("client2")
.username("mica")
.password("mica")
.debug()
.connectSync();
String[] topics = new String[]{
"$share/test/link/product1/+/event/+/post",
"$share/test/link/product2/+/event/+/post",
"$share/test/link/product3/+/event/+/post"
};
client1.subscribe(topics, MqttQoS.QOS0, (context, topic, message, payload) -> {
client2.publish(topic, payload);
});
}
}

View File

@@ -1,67 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.proxy;
import org.dromara.mica.mqtt.core.client.MqttClient;
import org.dromara.mica.mqtt.core.server.MqttServer;
import org.dromara.mica.mqtt.server.MqttConnectStatusListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.utils.buffer.ByteBufferUtil;
/**
* mqtt 服务端代理到另外一个服务端
*
* @author L.cm
*/
public class MqttServerProxy {
private static final Logger logger = LoggerFactory.getLogger(MqttServerProxy.class);
public static void main(String[] args) {
// 需要将数据发往的服务端
MqttClient client = MqttClient.create()
.ip("ip")
.port(1883)
.clientId("proxy")
.username("mica")
.password("mcia")
.debug()
.connectSync();
// 接受数据的服务端
MqttServer.create()
.messageListener((context, clientId, topic, qoS, message) -> {
byte[] payload = message.payload();
logger.info("clientId:{} topic:{} payload:\n{}", clientId, topic, ByteBufferUtil.toString(payload));
// 转发数据
client.publish(topic, payload);
})
// 开启 mqtt tcp 协议
.enableMqtt()
// 开启 mqtt websocket
.enableMqttWs()
// 开启 http api 接口
.enableMqttHttpApi()
// 客户端连接状态监听
.connectStatusListener(new MqttConnectStatusListener())
// 开始 stat 监控
.statEnable()
// 开启 debug 信息日志
.debug()
.start();
}
}

View File

@@ -1,42 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.server;
import org.dromara.mica.mqtt.core.server.event.IMqttConnectStatusListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
/**
* mqtt 连接状态
*
* @author L.cm
*/
public class MqttConnectStatusListener implements IMqttConnectStatusListener {
private static final Logger logger = LoggerFactory.getLogger(MqttConnectStatusListener.class);
@Override
public void online(ChannelContext context, String clientId, String username) {
logger.info("Mqtt clientId:{} username:{} online.", clientId, username);
}
@Override
public void offline(ChannelContext context, String clientId, String username, String reason) {
logger.info("Mqtt clientId:{} username:{} offline reason:{}.", clientId, username, reason);
}
}

View File

@@ -1,58 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.server;
import org.dromara.mica.mqtt.codec.message.MqttMessage;
import org.dromara.mica.mqtt.core.server.interceptor.IMqttMessageInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Node;
/**
* mqtt 消息拦截器
*
* @author L.cm
*/
public class MqttMessageInterceptor implements IMqttMessageInterceptor {
private static final Logger logger = LoggerFactory.getLogger(MqttMessageInterceptor.class);
@Override
public void onAfterReceivedBytes(ChannelContext context, int receivedBytes) throws Exception {
// 注意:此时 clientId 可能为空
String clientId = context.getBsId();
Node clientNode = context.getClientNode();
// ChannelStat channelStat = context.stat;
// 自定义规则,超限是可用 Tio.remove(context, "xxx超限"); 断开连接。
logger.info("===接收 client:{} clientId:{} data:{}b", clientNode, clientId, receivedBytes);
}
@Override
public void onAfterDecoded(ChannelContext context, MqttMessage message, int packetSize) {
// 注意:此时 clientId 可能为空
String clientId = context.getBsId();
Node clientNode = context.getClientNode();
logger.info("===解码 client:{} clientId:{} message:{}", clientNode, clientId, message);
}
@Override
public void onAfterHandled(ChannelContext context, MqttMessage message, long cost) throws Exception {
String clientId = context.getBsId();
Node clientNode = context.getClientNode();
logger.info("===处理完成 ip:{} clientId:{} message:{} 耗时:{}", clientNode, clientId, message, cost);
}
}

View File

@@ -1,74 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.server;
import org.dromara.mica.mqtt.core.server.MqttServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
/**
* mqtt 服务端测试
*
* @author L.cm
*/
public class MqttServerTest {
private static final Logger logger = LoggerFactory.getLogger(MqttServerTest.class);
public static void main(String[] args) {
// 注意:为了能接受更多链接(降低内存),请添加 jvm 参数 -Xss129k
MqttServer mqttServer = MqttServer.create()
// 服务端 ip 默认为空0.0.0.0,建议不要设置,端口 默认1883
.enableMqtt(1883)
// 默认为: 8192mqtt 默认最大消息大小),为了降低内存可以减小小此参数,如果消息过大 t-io 会尝试解析多次(建议根据实际业务情况而定)
.readBufferSize(8192)
// 最大包体长度
// .maxBytesInMessage(1024 * 100)
// mqtt 3.1 协议会校验 clientId 长度。
// .maxClientIdLength(64)
.messageListener((context, clientId, topic, qos, message) -> {
logger.info("clientId:{} payload:{}", clientId, new String(message.payload(), StandardCharsets.UTF_8));
})
// 客户端连接状态监听
.connectStatusListener(new MqttConnectStatusListener())
// 自定义消息拦截器
.addInterceptor(new MqttMessageInterceptor())
// 开启 websocket
.enableMqttWs()
// 开启 mqtt http 接口
.enableMqttHttpApi(builder ->
builder
.basicAuth("mica", "mica") // http basic 认证
.mcpServer() // 开启 mcp 服务
.build()
)
// 开始 stat 监控
.statEnable()
// 开启 debug 信息日志
.debug()
.start();
mqttServer.schedule(() -> {
String message = "mica最牛皮 " + System.currentTimeMillis();
mqttServer.publishAll("/test/123", message.getBytes(StandardCharsets.UTF_8));
}, 2000);
// 2.3.2 开始支持 stop 关闭
// mqttServer.stop();
}
}

View File

@@ -1,54 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.ssl;
import org.dromara.mica.mqtt.core.client.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.utils.buffer.ByteBufferUtil;
import java.nio.charset.StandardCharsets;
/**
* 客户端测试
*
* @author L.cm
*/
public class SslMqttClientTest {
private static final Logger logger = LoggerFactory.getLogger(SslMqttClientTest.class);
public static void main(String[] args) {
// 初始化 mqtt 客户端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(8883)
.username("mica")
.password("mica")
.useSsl("classpath:ssl/dreamlu.net.jks", "123456")
.connectSync();
client.subQos0("/test/#", (context, topic, message, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
// 定时发送数据
client.schedule(() -> {
String message = "mica最牛皮 " + System.currentTimeMillis();
client.publish("/test/123", message.getBytes(StandardCharsets.UTF_8));
}, 5000);
}
}

View File

@@ -1,54 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.ssl;
import org.dromara.mica.mqtt.core.server.MqttServer;
import org.dromara.mica.mqtt.server.MqttConnectStatusListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.utils.buffer.ByteBufferUtil;
import java.nio.charset.StandardCharsets;
/**
* mqtt 服务端测试
*
* @author L.cm
*/
public class SslMqttServerTest {
private static final Logger logger = LoggerFactory.getLogger(SslMqttServerTest.class);
public static void main(String[] args) {
MqttServer mqttServer = MqttServer.create()
.enableMqttSsl(builder ->
builder.useSsl("classpath:ssl/dreamlu.net.jks", "123456")
.build()
)
.messageListener((context, clientId, topic, qoS, message) -> {
logger.info("clientId:{} message:{} payload:{}", clientId, message, ByteBufferUtil.toString(message.payload()));
})
.connectStatusListener(new MqttConnectStatusListener())
.debug()
.start();
// 定时发送数据
mqttServer.schedule(() -> {
String message = "mica最牛皮 " + System.currentTimeMillis();
mqttServer.publishAll("/test/123", message.getBytes(StandardCharsets.UTF_8));
}, 5000);
}
}

View File

@@ -1 +0,0 @@
ssl 自签双向证书详见https://gitee.com/596392912/mica-mqtt/issues/I45GO7

View File

@@ -1,16 +0,0 @@
-----BEGIN CERTIFICATE-----
MIICejCCAeMCFGkzqgl8ilICj+PRCwIiUy3dPNuWMA0GCSqGSIb3DQEBCwUAMHwx
CzAJBgNVBAYTAkNOMQswCQYDVQQIDAJITjELMAkGA1UEBwwCQ1MxDTALBgNVBAoM
BFJNSlMxDTALBgNVBAsMBFJNSlMxFDASBgNVBAMMC2RyZWFtbHUubmV0MR8wHQYJ
KoZIhvcNAQkBFhA1OTYzOTI5MTJAcXEuY29tMB4XDTIyMDkwMzA4MjExNVoXDTMy
MDgzMTA4MjExNVowfDELMAkGA1UEBhMCQ04xCzAJBgNVBAgMAkhOMQswCQYDVQQH
DAJDUzENMAsGA1UECgwEUk1KUzENMAsGA1UECwwEUk1KUzEUMBIGA1UEAwwLZHJl
YW1sdS5uZXQxHzAdBgkqhkiG9w0BCQEWEDU5NjM5MjkxMkBxcS5jb20wgZ8wDQYJ
KoZIhvcNAQEBBQADgY0AMIGJAoGBAJxanDadRHd+D9jH5/mq7Pn3Fl915CiOaaJr
QS0ZxCqRtTaS9+JkFQJ4TnAezlKx/xDZu7G9pk7CJ6w+JQwfI+AwsAkOlrynyFbe
Hc9s6DZyZHXxkpgeQUqpnrkXkWG+jbh2aulWB4smQE/vPnpkjVGEe86+/JsYW3Sm
rFhL2xddAgMBAAEwDQYJKoZIhvcNAQELBQADgYEAP39PM0XpNzg3Bne63oXHRWDJ
bafiCwloO/SaxH7JtCBj1I05W3owwHSqWYadK+lg//tKf6TL+94GtW8s2VtpLGu7
Y5R2aakhywzbWVfrEK+kyvTG/4nP9tvcKwh8Iqr2XlllDLfsQeyLacb4+pcfDY3G
8nrbe0lBKepae8D0SvQ=
-----END CERTIFICATE-----

View File

@@ -1,16 +0,0 @@
-----BEGIN CERTIFICATE-----
MIICejCCAeMCFBebamYN4nfF0CrZWuN68SXxTH/GMA0GCSqGSIb3DQEBCwUAMHwx
CzAJBgNVBAYTAkNOMQswCQYDVQQIDAJITjELMAkGA1UEBwwCQ1MxDTALBgNVBAoM
BFJNSlMxDTALBgNVBAsMBFJNSlMxFDASBgNVBAMMC2RyZWFtbHUubmV0MR8wHQYJ
KoZIhvcNAQkBFhA1OTYzOTI5MTJAcXEuY29tMB4XDTIyMDkwMzA4MjM1N1oXDTMy
MDgzMTA4MjM1N1owfDELMAkGA1UEBhMCQ04xCzAJBgNVBAgMAkhOMQswCQYDVQQH
DAJDUzENMAsGA1UECgwEUk1KUzENMAsGA1UECwwEUk1KUzEUMBIGA1UEAwwLZHJl
YW1sdS5uZXQxHzAdBgkqhkiG9w0BCQEWEDU5NjM5MjkxMkBxcS5jb20wgZ8wDQYJ
KoZIhvcNAQEBBQADgY0AMIGJAoGBAL3hH0vRYCsqs79ghUPskgr+bQyVkO/6U1H1
4yHJWyWeXT2tl+VTvXhou0Zk3yYtd6O3l241PgmAjwBDPyfmM2UU/zluXQzELEuf
AnC4Q2wl0vpoj7mGQeMuRpjZN3FblS0bRifvctx3ubWVvSzCFIaKLZgdrnyWm5Lm
kJQjRaEDAgMBAAEwDQYJKoZIhvcNAQELBQADgYEAkm/Mv9nEGE4xy3L4NmwgyZMo
j5CuarbCEv/MgXST8B/dJvtC7PbCzrb9BbUkiIUjCOnRC+2U0DbSAkHy7CCwHTLx
g9Syn1SMWn9SF74l74qVtuaRjMKCGwSlqBLH5/t1FdPoevY2a+zi5dAqwIhkjY84
uYOgxODe+kFBUbT6W3A=
-----END CERTIFICATE-----

View File

@@ -1,15 +0,0 @@
-----BEGIN RSA PRIVATE KEY-----
MIICXQIBAAKBgQC94R9L0WArKrO/YIVD7JIK/m0MlZDv+lNR9eMhyVslnl09rZfl
U714aLtGZN8mLXejt5duNT4JgI8AQz8n5jNlFP85bl0MxCxLnwJwuENsJdL6aI+5
hkHjLkaY2TdxW5UtG0Yn73Lcd7m1lb0swhSGii2YHa58lpuS5pCUI0WhAwIDAQAB
AoGAR/T8azsZesJgA/KMDkWkws3QfahgmNEAqlrIjJFGHWd6ZllW6u1lLDBkaDTp
7AnnAQAePwGmVOuHRc42LOSsLMX/D2wYsTGjLTT1w+fEDkQCVnDKV8ZYWA5fN7Zh
m5cLB5IB23L/Xvs8UMYQ8qWufv6BxVPr+cawtXOK3O91AoECQQD5PwGSrMpsNkVG
Ox1t1A9wVgelbq+9qANPl8TvdaZKvNsbWwI0PRqK7D1SJXJiMX4m/hc8YkIXJWsO
Wd1yLg6TAkEAwwZLncJYlYOp0v8PXk75rDR7KV9fdBPPOdnZ1XU/yFW3anwOt0OX
LvGL3X0A2f/tqOX9v+3LENCICF4gAv850QJBALo7Yal+giEoy8oWEX8mnAKLxVrO
wXEsQI0QEY36ki31vqFJ9vOhVFvI+GiQok7MPD5WTHZJ1KgGxV8LtnLCBxECQCPD
WcZ6RyhT1qaco0LWFK7hiNxTYvu0TkH7kxizwZiJL3NVgJVWzbiMDuv06l0Ps5NP
abLydlSFCQ0PxasHBqECQQC0lhmqkNbaN3lJyGDoEWm6Kb9z3eh3+9Fk4j328aYW
gQcBeRhKU8kdDTg2flOWS3sxrEysJYv8i9DPbX9RsRFd
-----END RSA PRIVATE KEY-----

View File

@@ -1,8 +0,0 @@
writer = console
writer.format = {date: HH:mm:ss.SSS} [{thread}] {level} {class-name}.{method} : {message}
writer.level = info
# level
level@org.tio = warn
level@org.tio.client.TioClient = off
level@org.tio.server = info
level@org.dromara.mica.mqtt = info

View File

@@ -1 +0,0 @@
package net.dreamlu.iot;

View File

@@ -1,49 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.dromara.mica-mqtt</groupId>
<artifactId>example</artifactId>
<version>${revision}</version>
</parent>
<artifactId>mica-mqtt-server-solon-plugin-example</artifactId>
<dependencies>
<dependency>
<groupId>org.noear</groupId>
<artifactId>solon-web</artifactId>
</dependency>
<dependency>
<groupId>org.dromara.mica-mqtt</groupId>
<artifactId>mica-mqtt-server-solon-plugin</artifactId>
</dependency>
<!-- 简单的本地定时任务调度 -->
<dependency>
<groupId>org.noear</groupId>
<artifactId>solon-scheduling-simple</artifactId>
</dependency>
<dependency>
<groupId>org.noear</groupId>
<artifactId>solon-logging-simple</artifactId>
</dependency>
<!-- metrics 指标 开始 -->
<dependency>
<groupId>org.noear</groupId>
<artifactId>solon-cloud-metrics</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<!-- metrics 指标 结束 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@@ -1,19 +0,0 @@
package org.dromara.mica.mqtt.server.solon;
import org.noear.solon.Solon;
import org.noear.solon.annotation.Configuration;
import org.noear.solon.scheduling.annotation.EnableScheduling;
/**
* @author wsq
*/
@Configuration
@EnableScheduling
public class MqttServerApplication {
public static void main(String[] args) {
Solon.start(MqttServerApplication.class, args);
}
}

View File

@@ -1,22 +0,0 @@
package org.dromara.mica.mqtt.server.solon.controller;
import org.dromara.mica.mqtt.server.solon.service.ServerService;
import org.noear.solon.annotation.Controller;
import org.noear.solon.annotation.Inject;
import org.noear.solon.annotation.Mapping;
import org.noear.solon.annotation.Post;
@Mapping("/mqtt/server")
@Controller
public class ServerController {
@Inject
private ServerService service;
@Mapping("publish")
@Post
public boolean publish(String body) {
return service.publish(body);
}
}

View File

@@ -1,37 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.server.solon.listener;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.server.solon.event.MqttClientOfflineEvent;
import org.noear.solon.annotation.Component;
import org.noear.solon.core.event.EventListener;
/**
* mqtt 连接状态,使用 solon event 方式,性能有损耗
*
* @author L.cm
*/
@Slf4j
@Component
public class MqttConnectOfflineListener implements EventListener<MqttClientOfflineEvent> {
@Override
public void onEvent(MqttClientOfflineEvent mqttClientOfflineEvent) throws Throwable {
log.info("MqttClientOnlineEvent:{}", mqttClientOfflineEvent);
}
}

View File

@@ -1,37 +0,0 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.server.solon.listener;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.server.solon.event.MqttClientOnlineEvent;
import org.noear.solon.annotation.Component;
import org.noear.solon.core.event.EventListener;
/**
* mqtt 连接状态,使用 solon event 方式,性能有损耗
*
* @author L.cm
*/
@Slf4j
@Component
public class MqttConnectOnlineListener implements EventListener<MqttClientOnlineEvent> {
@Override
public void onEvent(MqttClientOnlineEvent mqttClientOnlineEvent) throws Throwable {
log.info("MqttClientOnlineEvent:{}", mqttClientOnlineEvent);
}
}

View File

@@ -1,25 +0,0 @@
package org.dromara.mica.mqtt.server.solon.listener;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.core.server.event.IMqttMessageListener;
import org.tio.core.ChannelContext;
import java.nio.charset.StandardCharsets;
/**
* 消息监听器示例1直接实现 IMqttMessageListener注意如果实现了 IMqttMessageListenerMqttServerFunction 注解就不生效了。
*
* @author wsq
*/
@Slf4j
//@Component
public class MqttServerMessageListener1 implements IMqttMessageListener {
@Override
public void onMessage(ChannelContext context, String clientId, String topic, MqttQoS qos, MqttPublishMessage message) {
log.info("context:{} clientId:{} message:{} payload:{}", context, clientId, message, new String(message.payload(), StandardCharsets.UTF_8));
}
}

View File

@@ -1,48 +0,0 @@
package org.dromara.mica.mqtt.server.solon.listener;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.core.annotation.MqttServerFunction;
import org.dromara.mica.mqtt.server.solon.pojo.User;
import org.noear.solon.annotation.Component;
import org.tio.core.ChannelContext;
import org.tio.core.Node;
import java.util.Map;
/**
* 消息监听器示例2MqttServerFunction 注解订阅,注意:如果自行实现了 IMqttMessageListenerMqttServerFunction 注解就不生效了。
*
* @author L.cm
*/
@Slf4j
@Component
public class MqttServerMessageListener2 {
@MqttServerFunction("/test/object")
public void func1(String topic, User<?> user) {
log.info("topic:{} user:{}", topic, user);
}
@MqttServerFunction("/test/client")
public void func1(String topic, byte[] message) {
log.info("topic:{} message:{}", topic, new String(message));
}
/**
* MQTT消息处理函数匹配 mqtt Topic /test/+,如何需要匹配所以消息,请使用通配符 #
*
* @param context ChannelContext可选参数
* @param topic 实际接收到消息的主题名称,可选参数
* @param topicVars topic 中的 ${xxxx} 变量解析v2.5.4支持),可选参数,注意:类型必须为 Map<String, String>
* @param publishMessage 完整的MQTT发布消息对象包含消息头和负载可选参数
* @param message 消息负载内容,以字节数组形式提供,可选参数,也可支持对象形式,默认 json 序列化
*/
@MqttServerFunction("/test/${xxxx}")
public void func3(ChannelContext context, String topic, Map<String, String> topicVars, MqttPublishMessage publishMessage, byte[] message) {
// 获取客户端节点信息
Node clientNode = context.getClientNode();
// 记录接收到的MQTT消息信息
log.info("clientNode:{} topic:{} topicVars:{} publishMessage:{} message:{}", clientNode, topic, topicVars, publishMessage, new String(message));
}
}

View File

@@ -1,19 +0,0 @@
package org.dromara.mica.mqtt.server.solon.pojo;
import lombok.Data;
@Data
public class User<T> {
private String name;
private T girlfriend;
public static User newUser(){
User<User> user1 = new User();
user1.setName("name1");
User<User> user2 = new User();
user2.setName("name2");
user2.setGirlfriend(user1);
return user2;
}
}

View File

@@ -1,24 +0,0 @@
package org.dromara.mica.mqtt.server.solon.service;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.server.solon.MqttServerTemplate;
import org.noear.solon.annotation.Component;
import org.noear.solon.annotation.Inject;
import java.nio.charset.StandardCharsets;
/**
* @author wsq
*/
@Slf4j
@Component
public class ServerService {
@Inject
private MqttServerTemplate server;
public boolean publish(String body) {
boolean result = server.publishAll("/test/123", body.getBytes(StandardCharsets.UTF_8));
log.info("Mqtt publishAll result:{}", result);
return result;
}
}

View File

@@ -1,25 +0,0 @@
package org.dromara.mica.mqtt.server.solon.task;
import org.dromara.mica.mqtt.server.solon.MqttServerTemplate;
import org.dromara.mica.mqtt.server.solon.pojo.User;
import org.noear.solon.annotation.Component;
import org.noear.solon.annotation.Inject;
import org.noear.solon.scheduling.annotation.Scheduled;
import java.nio.charset.StandardCharsets;
/**
* @author wsq
*/
@Component
public class PublishTask {
@Inject
private MqttServerTemplate mqttServerTemplate;
@Scheduled(fixedDelay = 1000)
public void publish() {
mqttServerTemplate.publishAll("/test/123", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
mqttServerTemplate.publishAll("/test/object", User.newUser());
}
}

View File

@@ -1,54 +0,0 @@
# solon 配置
solon:
logging:
appender:
console:
level: INFO
# mqtt 服务端配置
mqtt:
server:
enabled: true # 是否开启服务端默认true
name: Mica-Mqtt-Server # 名称默认Mica-Mqtt-Server
heartbeat-timeout: 120000 # 心跳超时,单位毫秒,默认: 1000 * 120
read-buffer-size: 8KB # 接收数据的 buffer size默认8k
max-bytes-in-message: 10MB # 消息解析最大 bytes 长度默认10M
auth:
enable: false # 是否开启 mqtt 认证
username: mica # mqtt 认证用户名
password: mica # mqtt 认证密码
debug: true # 如果开启 prometheus 指标收集建议关闭
stat-enable: true # 开启指标收集debug 和 prometheus 开启时需要打开,默认开启,关闭节省内存
mqtt-listener: # mqtt 监听器
enable: true # 是否开启默认false
# ip: "0.0.0.0" # 服务端 ip 默认为空0.0.0.0,建议不要设置
port: 1883 # 端口默认1883
mqtt-ssl-listener: # mqtt ssl 监听器
enable: false # 是否开启默认false
port: 8883 # 端口默认8883
ssl: # ssl 配置,必须
keystore-path: # 必须参数ssl keystore 目录,支持 classpath:/ 路径。
keystore-pass: # 必选参数ssl keystore 密码
truststore-path: # 可选参数ssl 双向认证 truststore 目录,支持 classpath:/ 路径。
truststore-pass: # 可选参数ssl 双向认证 truststore 密码
client-auth: none # 是否需要客户端认证双向认证默认NONE不需要
ws-listener: # websocket mqtt 监听器
enable: true # 是否开启默认false
port: 8083 # websocket 端口默认8083
wss-listener: # websocket ssl mqtt 监听器
enable: false # 是否开启默认false
port: 8084 # 端口默认8084
ssl: # ssl 配置,必须
keystore-path: # 必须参数ssl keystore 目录,支持 classpath:/ 路径。
keystore-pass: # 必选参数ssl keystore 密码
truststore-path: # 可选参数ssl 双向认证 truststore 目录,支持 classpath:/ 路径。
truststore-pass: # 可选参数ssl 双向认证 truststore 密码
client-auth: none # 是否需要客户端认证双向认证默认NONE不需要
http-listener:
enable: true
port: 18083
basic-auth: # 基础认证
enable: true
username: mica
password: mica
mcp-server: # 大模型 mcp
enable: true

View File

@@ -1,25 +0,0 @@
package org.dromara.mica.mqtt.server.controller;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.dromara.mica.mqtt.server.service.ServerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Tag(name = "Mqtt::服务端")
@RequestMapping("/mqtt/server")
@RestController
public class ServerController {
@Autowired
private ServerService service;
@Operation(summary = "publish")
@PostMapping("publish")
public boolean publish(@RequestBody String body) {
return service.publish(body);
}
}

View File

@@ -1,12 +0,0 @@
package org.dromara.mica.mqtt.server.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
@Data
@TableName("car_info")
public class CarInfo {
private Long customerId;
private String plate;
private String enable;
}

View File

@@ -1,55 +0,0 @@
package org.dromara.mica.mqtt.server.listener;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.server.pojo.User;
import org.dromara.mica.mqtt.core.annotation.MqttServerFunction;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
import org.tio.core.Node;
import java.util.Map;
/**
* 消息监听器示例2MqttServerFunction 注解订阅,注意:如果自行实现了 IMqttMessageListenerMqttServerFunction 注解就不生效了。
*
* @author wsq
*/
@Slf4j
@Service
public class MqttServerMessageListener2 {
/**
* MQTT消息处理函数
*
* @param topic mqtt Topic
* @param user 订阅消息的负载内容,默认 json 序列化
*/
@MqttServerFunction("/test/object")
public void func1(String topic, User<?> user) {
log.info("topic:{} user:{}", topic, user);
}
@MqttServerFunction("/test/client")
public void func2(String topic, byte[] message) {
log.info("topic:{} message:{}", topic, new String(message));
}
/**
* MQTT消息处理函数匹配 mqtt Topic /test/+,如何需要匹配所以消息,请使用通配符 #
*
* @param context ChannelContext可选参数
* @param topic 实际接收到消息的主题名称,可选参数
* @param topicVars topic 中的 ${xxxx} 变量解析v2.5.4支持),可选参数,注意:类型必须为 Map<String, String>
* @param publishMessage 完整的MQTT发布消息对象包含消息头和负载可选参数
* @param message 消息负载内容,以字节数组形式提供,可选参数,也可支持对象形式,默认 json 序列化
*/
@MqttServerFunction("/test/${xxxx}")
public void func3(ChannelContext context, String topic, Map<String, String> topicVars, MqttPublishMessage publishMessage, byte[] message) {
// 获取客户端节点信息
Node clientNode = context.getClientNode();
// 记录接收到的MQTT消息信息
log.info("clientNode:{} topic:{} topicVars:{} publishMessage:{} message:{}", clientNode, topic, topicVars, publishMessage, new String(message));
}
}

View File

@@ -1,19 +0,0 @@
package org.dromara.mica.mqtt.server.pojo;
import lombok.Data;
@Data
public class User<T> {
private String name;
private T girlfriend;
public static User newUser(){
User<User> user1 = new User();
user1.setName("name1");
User<User> user2 = new User();
user2.setName("name2");
user2.setGirlfriend(user1);
return user2;
}
}

View File

@@ -1,24 +0,0 @@
package org.dromara.mica.mqtt.server.service;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.spring.server.MqttServerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
/**
* @author wsq
*/
@Slf4j
@Service
public class ServerService {
@Autowired
private MqttServerTemplate server;
public boolean publish(String body) {
boolean result = server.publishAll("/test/123", body.getBytes(StandardCharsets.UTF_8));
log.info("Mqtt publishAll result:{}", result);
return result;
}
}

View File

@@ -1,11 +0,0 @@
package org.dromara.mica.mqtt.server.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.dromara.mica.mqtt.server.entity.CarInfo;
import org.dromara.mica.mqtt.server.mapper.CarInfoMapper;
import org.dromara.mica.mqtt.server.service.ICarInfoService;
import org.springframework.stereotype.Service;
@Service
public class CarInfoServiceImpl extends ServiceImpl<CarInfoMapper, CarInfo> implements ICarInfoService {
}

View File

@@ -1,25 +0,0 @@
package org.dromara.mica.mqtt.server.task;
import org.dromara.mica.mqtt.core.server.MqttServer;
import org.dromara.mica.mqtt.server.pojo.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
/**
* @author wsq
*/
@Service
public class PublishAllTask {
@Autowired
private MqttServer mqttServer;
@Scheduled(fixedDelay = 1000)
public void run() {
mqttServer.publishAll("/test/123", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
mqttServer.publishAll("/test/object", User.newUser());
}
}

View File

@@ -3,7 +3,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>mica-mqtt-server-spring-boot-example</artifactId>
<artifactId>mqtt-car</artifactId>
<parent>
<groupId>org.dromara.mica-mqtt</groupId>
@@ -11,6 +11,18 @@
<version>${revision}</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>4.0.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.dromara.mica-mqtt</groupId>
@@ -47,28 +59,54 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot4-starter</artifactId>
<version>3.5.14</version>
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>com.baomidou</groupId>-->
<!-- <artifactId>mybatis-plus-spring-boot-autoconfigure</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
</dependency>
<!-- &lt;!&ndash; MySQL Connector &ndash;&gt;-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
<!-- &lt;!&ndash; Druid Connection Pool &ndash;&gt;-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.20</version>
</dependency>
<!-- SpringBoot Boot Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- JSON 解析器和生成器 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.23</version>
</dependency>
<!-- Hutool工具包 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.40</version>
</dependency>
</dependencies>
<build>

View File

@@ -1,5 +1,6 @@
package org.dromara.mica.mqtt.server;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@@ -9,6 +10,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
*/
@SpringBootApplication
@EnableScheduling
@MapperScan("org.dromara.mica.mqtt.server.mapper")
public class MqttServerApplication {
/**

View File

@@ -1,6 +1,8 @@
package org.dromara.mica.mqtt.server.auth;
import org.dromara.mica.mqtt.core.server.auth.IMqttServerAuthHandler;
import org.dromara.mica.mqtt.spring.server.config.MqttServerProperties;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.tio.core.ChannelContext;
@@ -12,10 +14,27 @@ import org.tio.core.ChannelContext;
@Configuration(proxyBeanMethods = false)
public class MqttAuthHandler implements IMqttServerAuthHandler {
@Value("${mqtt.server.auth.enable}")
private boolean enable;
@Value("${mqtt.server.auth.username}")
private String username;
@Value("${mqtt.server.auth.password}")
private String password;
@Override
public boolean authenticate(ChannelContext context, String uniqueId, String clientId, String username, String password) {
// 客户端认证逻辑实现
return true;
if (enable) {
if (username.equals(this.username) && password.equals(this.password)) {
return true;
} else {
return false;
}
} else {
return false;
}
}
}

View File

@@ -0,0 +1,38 @@
package org.dromara.mica.mqtt.server.config;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.OptimisticLockerInnerInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* mybatis-plus配置
*
* @author Mark sunlightcs@gmail.com
*/
@Configuration
public class MybatisPlusConfig {
/**
* 配置分页等
*/
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
MybatisPlusInterceptor mybatisPlusInterceptor = new MybatisPlusInterceptor();
// 乐观锁
mybatisPlusInterceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
return mybatisPlusInterceptor;
}
// @Bean
// public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
// MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
// sqlSessionFactoryBean.setDataSource(dataSource);
// sqlSessionFactoryBean.setMapperLocations(
// new PathMatchingResourcePatternResolver().getResources("classpath:/mapper/**/*.xml")
// );
// return sqlSessionFactoryBean.getObject();
// }
}

View File

@@ -0,0 +1,42 @@
package org.dromara.mica.mqtt.server.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(factory);
//使用jackson进行序列化
Jackson2JsonRedisSerializer jsonRedisSerializer =
new Jackson2JsonRedisSerializer(Object.class);
//规定序列化规则
ObjectMapper objectMapper = new ObjectMapper();
/**
* 第一个参数指的是序列化的域ALL指的是字段、get和set方法、构造方法
* 第二个参数指的是序列化哪些访问修饰符默认是publicANY指任何访问修饰符
*/
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
//指定序列化输入的类型类必须是非final修饰的类
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jsonRedisSerializer.setObjectMapper(objectMapper);
//序列化key value
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(jsonRedisSerializer);
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(jsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}

View File

@@ -0,0 +1,35 @@
package org.dromara.mica.mqtt.server.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* 定时任务线程池配置
* 解决默认定时任务线程池(大小为1)被阻塞导致所有定时任务无法执行的问题
*/
@Slf4j
@Configuration
@EnableScheduling
public class SchedulerConfig {
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
// 增加线程池大小,避免单线程阻塞影响所有定时任务
scheduler.setPoolSize(5);
scheduler.setThreadNamePrefix("mqtt-scheduler-");
// 等待所有任务完成后再关闭
scheduler.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间
scheduler.setAwaitTerminationSeconds(60);
// 设置异常处理器,确保异常被记录但不中断定时任务
scheduler.setErrorHandler(throwable -> {
log.error("定时任务执行异常", throwable);
});
return scheduler;
}
}

View File

@@ -0,0 +1,24 @@
package org.dromara.mica.mqtt.server.constant;
/**
* 缓存常量信息
*/
public class CacheConstants
{
/**
* 设备心跳缓存key
*/
public static final String EQUIPMENT_HEARTBEAT = "equipment:heartbeat:";
/**
* 设备心跳缓存过期时间
*/
public static final long OFFLINE_THRESHOLD = 15 * 1000;
/**
* 设备在线状态缓存key数据库映射值
*/
public static final String EQUIPMENT_FLAG = "equipment:flag:";
}

View File

@@ -1,22 +1,31 @@
package org.dromara.mica.mqtt.server.controller;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.AllArgsConstructor;
import org.dromara.mica.mqtt.server.entity.CarInfo;
import org.dromara.mica.mqtt.server.redis.RedisService;
import org.dromara.mica.mqtt.server.service.ICarInfoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.concurrent.TimeUnit;
@RestController
@RequestMapping("/car")
@AllArgsConstructor
public class CarInfoController {
private final ICarInfoService carInfoService;
@Autowired
ICarInfoService carInfoService;
@Autowired
RedisService redisService;
@GetMapping("/list")
public List<CarInfo> list() {
return carInfoService.list();
}
@PostMapping

View File

@@ -0,0 +1,55 @@
package org.dromara.mica.mqtt.server.controller;
import com.alibaba.fastjson2.JSONObject;
import org.dromara.mica.mqtt.server.service.impl.ServerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RequestMapping("/mqtt/server")
@RestController
public class ServerController {
@Autowired
private ServerService service;
@PostMapping("publish")
public JSONObject publish(@RequestBody JSONObject js) {
boolean publish = service.publish(js);
JSONObject jsonObject = new JSONObject();
if (publish) {
jsonObject.put("code", 200);
} else {
jsonObject.put("code", 500);
}
return jsonObject;
}
@PostMapping("open")
public boolean open(@RequestBody String body) {
return service.open(body);
}
@PostMapping("open2")
public boolean open2(@RequestBody String body) {
return service.open2(body);
}
@PostMapping("check_offline_record")
public boolean check_offline_record(@RequestBody String body) {
return service.check_offline_record(body);
}
@PostMapping("offline_record")
public boolean offline_record(@RequestBody String body) {
return service.offline_record(body);
}
@PostMapping("set_time")
public boolean set_time(@RequestBody String body) {
return service.set_time(body);
}
}

View File

@@ -0,0 +1,78 @@
package org.dromara.mica.mqtt.server.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
@Data
@TableName("car_info")
public class CarInfo implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/** customer_id */
@TableId(type = IdType.AUTO)
private Long customerId;
/** 白名单生效时间 */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date enableTime;
/** 白名单失效时间 */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date overdueTime;
/** 是否启动 */
private String enable;
/** 车牌号 */
private String plate;
/** 车牌号 */
private List<String> plateList;
/** 是否启用时间段 */
private String timeSegEnable;
/** 时间段 */
private String segTime;
/** 时间段 */
private String segTimeStart = "00:00:00";
/** 时间段 */
private String segTimeEnd = "00:00:00";
/** 是否需要报警 */
private String needAlarm;
/** 用户自定义代码 */
private String vehicleCode;
/** 用户自定义的注释 */
private String vehicleComment;
/** 所属人员 */
private Long peopleId;
/** 是否删除 0 是未删除 2是删除 */
private String delFlag;
/** 同步标识 sync 0是未同步 1是已同步 */
private String sync;
private String sn;//设备序列号
/** 超频设备识别标签 */
private String overclockCard;
/** car_park_record表的主键id */
private Long carParkRecordId;
}

View File

@@ -0,0 +1,50 @@
package org.dromara.mica.mqtt.server.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.util.Date;
/**
* 车场设备实体
*
*/
@Data
@TableName("car_park_item")
public class CarParkItem implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* id
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 设备id
*/
private String equipmentId;
/**
* 停车场id
*/
private String parkId;
/**
* 过车行驶方向 0 入场过车 1出场过车
*/
private String way;
/**
* 区域
*/
private String area;
}

View File

@@ -0,0 +1,49 @@
package org.dromara.mica.mqtt.server.entity;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
/**
* 车辆授权对象 car_park_record
*
*/
@Data
public class CarParkRecord implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* id
*/
private Long id;
/**
* 设备id
*/
private Long equipmentId;
/**
* 车场id
*/
private Long parkId;
/**
* 车辆id
*/
private Long customerId;
/**
* 是否下发0未下发1已下发
*/
private String sync;
/**
* 摄像头mqtt协议的报文id
*/
private String clientId;
}

View File

@@ -0,0 +1,56 @@
package org.dromara.mica.mqtt.server.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.util.Date;
/**
* 在场记录对象 car_pass_gather
*
*/
@Data
@TableName("car_pass_gather")
public class CarPassGather implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* id
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 停车场id
*/
private String parkId;
/**
* 区域
*/
private String area;
/**
* 车牌号码
*/
private String license;
/**
* 入场时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date joinTime;
/**
* 设备序列号
*/
private String sn;
}

View File

@@ -0,0 +1,90 @@
package org.dromara.mica.mqtt.server.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import java.io.Serial;
import java.io.Serializable;
import java.util.Date;
/**
* 通行记录对象 car_pass_record
*
*/
@Data
@TableName("car_pass_record")
public class CarPassRecord implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* id
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 停车场id
*/
private String parkId;
/**
* 过车信息唯一标识
*/
private String uniqueNo;
/**
* 过车行驶方向 0 入场过车 1出场过车
*/
private String direction;
/**
* 车牌号码
*/
private String license;
/**
* 通行时间
*/
@JsonFormat(pattern = "yyyy-MM-dd")
private Date passTime;
/**
* 触发类型
*/
private String triggerType;
/**
* 车牌颜色
*/
private String colorType;
/**
* 车辆颜色
*/
private String carColor;
/**
* 车牌图片数据
*/
private String url;
/**
* 数据类型 0 实时数据 1历史数据
*/
private String dataType;
/**
* 设备序列号
*/
private String sn;
private String type;
}

View File

@@ -0,0 +1,51 @@
package org.dromara.mica.mqtt.server.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
@Data
@TableName("sys_equipment")
public class Equipment implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/** 设备Id */
@TableId(type = IdType.AUTO)
private Long id;
/** 所属产品Id */
private Long productId;
/** 设备名称 */
private String name;
/** 设备序列号 */
private String sequence;
/** 设备Ip */
private String ip;
/** 设备密码 */
private String password;
/** 设备区域 */
private Long spaceId;
/** 设备位置 */
private Long pointId;
/** 对接状态(0未对接 1对接成功) */
private Long state;
/** 设备状态(0在线 1离线) */
private String flag;
}

View File

@@ -0,0 +1,27 @@
package org.dromara.mica.mqtt.server.enums;
/**
* 设备是否在线
*
*/
public enum FlagEnums {
ONLINE("0", "在线"),
OFFLINE("1", "离线")
;
FlagEnums(String code, String name) {
this.code = code;
this.name = name;
}
private String code;
private String name;
public String getCode() {
return code;
}
public String getValue() {
return name;
}
}

View File

@@ -0,0 +1,292 @@
package org.dromara.mica.mqtt.server.listener;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.dromara.mica.mqtt.core.annotation.MqttServerFunction;
import org.dromara.mica.mqtt.server.constant.CacheConstants;
import org.dromara.mica.mqtt.server.entity.CarParkItem;
import org.dromara.mica.mqtt.server.entity.CarParkRecord;
import org.dromara.mica.mqtt.server.entity.CarPassGather;
import org.dromara.mica.mqtt.server.entity.CarPassRecord;
import org.dromara.mica.mqtt.server.enums.FlagEnums;
import org.dromara.mica.mqtt.server.pojo.WhiteListOperatorPO;
import org.dromara.mica.mqtt.server.redis.RedisService;
import org.dromara.mica.mqtt.server.service.ICarParkItemService;
import org.dromara.mica.mqtt.server.service.ICarParkRecordService;
import org.dromara.mica.mqtt.server.service.ICarPassGatherService;
import org.dromara.mica.mqtt.server.service.ICarPassRecordService;
import org.dromara.mica.mqtt.server.utils.AESUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.tio.utils.hutool.StrUtil;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 消息监听器
*
* @author wsq
*/
@Slf4j
@Service
public class CarMessageListener {
@Autowired
RedisService redisService;
@Autowired
ICarParkRecordService carParkRecordService;
@Autowired
ICarPassRecordService carPassRecordService;
@Autowired
ICarParkItemService carParkItemService;
@Autowired
ICarPassGatherService carPassGatherService;
private static String key = "1234567898765432";
//xa、jl、td、xj
private String jinjiangUrl = "http://127.0.0.1:6609/";
//zr
// private String jinjiangUrl = "http://192.168.155.42:6609/";
/**
* 心跳
* @param topic
* @param topicVars
* @param message
*/
@MqttServerFunction("device/${sn}/message/up/keep_alive")
public void onKeepAliveMessage(String topic, Map<String, String> topicVars, byte[] message) {
String sn = topicVars.get("sn");
log.info("接收到来自客户端 [{}] 的心跳消息 -> Topic: {}", sn, topic);
// 更新客户端的最后心跳
redisService.setCacheObject(CacheConstants.EQUIPMENT_HEARTBEAT + sn, FlagEnums.ONLINE.getCode(), CacheConstants.OFFLINE_THRESHOLD, TimeUnit.MILLISECONDS);
}
/**
* 车牌下发结果监听
*
* @param topic
* @param message
*/
@MqttServerFunction("device/${sn}/message/down/white_list_operator/reply")
@Transactional(rollbackFor = Exception.class)
public void white_list_operator_reply(String topic, byte[] message) {
log.info("接收到车牌下发消息 -> Topic: {}, message: {}", topic, new String(message));
String data = new String(message, StandardCharsets.UTF_8);
WhiteListOperatorPO whiteListOperatorPO = JSONObject.parseObject(data, WhiteListOperatorPO.class);
if (null == whiteListOperatorPO || 200 != whiteListOperatorPO.getCode()) {
log.error("white_list_operator发布失败{}", data);
return;
}
CarParkRecord carParkRecord = new CarParkRecord();
carParkRecord.setClientId(whiteListOperatorPO.getId());
//新增修改和删除车牌得回执信息一致通过id区分
if (whiteListOperatorPO.getId().contains("del_")) {
carParkRecord.setSync("2");
} else {
carParkRecord.setSync("1");
}
carParkRecordService.updateByClientId(carParkRecord);
}
/**
* 车牌入场出场识别监听
*
* @param topic
* @param message
*/
@MqttServerFunction("device/${sn}/message/up/ivs_result")
public void ivs_result(String topic, Map<String, String> topicVars, byte[] message) throws Exception {
String sn = topicVars.get("sn");
log.info("接收到车辆识别消息 -> Topic: {}", topic);
CarParkItem carParkItem = carParkItemService.selectBySn(sn);
String data = new String(message, StandardCharsets.UTF_8);
JSONObject jsonObject = JSONObject.parseObject(data);
JSONObject payload = jsonObject.getJSONObject("payload");
String id = jsonObject.getString("id");
JSONObject alarmInfoPlate = payload.getJSONObject("AlarmInfoPlate");
JSONObject result = alarmInfoPlate.getJSONObject("result");
JSONObject plateResult = result.getJSONObject("PlateResult");
JSONObject carBrand = plateResult.getJSONObject("car_brand");
String license = plateResult.getString("license");
String colorType = plateResult.getString("colorType");
String str = "";
if ("5".equals(colorType)) {
str = AESUtil.decrptyAES_ECB(license, key).substring(0, 20);
} else {
str = AESUtil.decrptyAES_ECB(license, key).substring(0, 18);
}
license = AESUtil.UTF8decode(str);
log.info("解密前车牌:{},解谜后的车牌:{}", plateResult.getString("license"), license);
//保存通行记录
CarPassRecord record = new CarPassRecord();
record.setCarColor(plateResult.getString("carColor"));
record.setColorType(colorType);
record.setDirection(plateResult.getString("direction"));
record.setLicense(license);
record.setDataType("0");
Map<String, Object> paramMap = new HashMap<>();
paramMap.put("url", plateResult.getString("full_image_content"));
String repose = HttpUtil.createPost(jinjiangUrl + "file/uploadMinioCarBase64")
.body(JSON.toJSONString(paramMap))
.execute()
.body();
log.info("imgRsp:{}", repose);
JSONObject imgRsp = JSONObject.parseObject(repose);
if (null != imgRsp && imgRsp.getInteger("code") == 200) {
record.setUrl(imgRsp.getJSONObject("data").getString("url"));
}
if (plateResult.containsKey("start_time")) {
log.info("拿到了时间:{}", plateResult.getLong("start_time"));
record.setPassTime(DateUtil.date(plateResult.getLong("start_time")));
} else {
log.info("没有拿到时间,默认当前时间");
record.setPassTime(DateUtil.date(new Date()));
}
record.setSn(sn);
record.setUniqueNo(id);
record.setParkId(carParkItem.getParkId());
record.setTriggerType(plateResult.getString("triggerType"));
record.setType(carBrand.getString("type"));
carPassRecordService.save(record);
//保存/删除在场数据
boolean isExist = carPassGatherService.exists(new QueryWrapper<CarPassGather>().eq("license", license));
//入场新增数据
if (StrUtil.equals(carParkItem.getWay(), "0") && !isExist) {
CarPassGather carPassGather = new CarPassGather();
carPassGather.setLicense(license);
carPassGather.setParkId(carParkItem.getParkId());
carPassGather.setArea(carParkItem.getArea());
carPassGather.setJoinTime(record.getPassTime());
carPassGather.setSn(sn);
carPassGatherService.save(carPassGather);
} else if (StrUtil.equals(carParkItem.getWay(), "1") && isExist) {//出场删除数据
carPassGatherService.deleteByLicense(license);
}
}
/**
* 车辆离线识别监听
*
* @param topic
* @param message
*/
@MqttServerFunction("device/${sn}/message/up/ivs_result_offline")
public void ivs_result_offline(String topic, Map<String, String> topicVars, byte[] message) throws Exception {
String sn = topicVars.get("sn");
log.info("接收到车辆离线识别消息 -> Topic: {}", topic);
String data = new String(message, StandardCharsets.UTF_8);
log.info("车牌离线识别监听:{}", data);
JSONObject jsonObject = JSONObject.parseObject(data);
JSONObject payload = jsonObject.getJSONObject("payload");
String id = jsonObject.getString("id");
JSONObject alarmInfoPlate = payload.getJSONObject("AlarmInfoPlate");
JSONObject result = alarmInfoPlate.getJSONObject("result");
JSONObject plateResult = result.getJSONObject("PlateResult");
JSONObject carBrand = plateResult.getJSONObject("car_brand");
String license = plateResult.getString("license");
String colorType = plateResult.getString("colorType");
String str = "";
if ("5".equals(colorType)) {
str = AESUtil.decrptyAES_ECB(license, key).substring(0, 20);
} else {
str = AESUtil.decrptyAES_ECB(license, key).substring(0, 18);
}
license = AESUtil.UTF8decode(str);
log.info("离线-解密前车牌:{},解谜后的车牌:{}", plateResult.getString("license"), license);
//保存通行记录
CarPassRecord record = new CarPassRecord();
record.setCarColor(plateResult.getString("carColor"));
record.setColorType(colorType);
record.setDirection(plateResult.getString("direction"));
record.setLicense(license);
record.setDataType("0");
Map<String, Object> paramMap = new HashMap<>();
paramMap.put("url", plateResult.getString("full_image_content"));
String repose = HttpUtil.createPost(jinjiangUrl + "file/uploadMinioCarBase64")
.body(JSON.toJSONString(paramMap))
.execute()
.body();
log.info("离线-imgRsp:{}", repose);
JSONObject imgRsp = JSONObject.parseObject(repose);
if (null != imgRsp && imgRsp.getInteger("code") == 200) {
record.setUrl(imgRsp.getJSONObject("data").getString("url"));
}
if (plateResult.containsKey("start_time")) {
log.info("拿到了离线时间:{}", plateResult.getLong("start_time"));
record.setPassTime(DateUtil.date(plateResult.getLong("start_time")));
} else {
log.info("没有拿到离线时间,默认当前时间");
record.setPassTime(DateUtil.date(new Date()));
}
record.setSn(sn);
record.setUniqueNo(id);
record.setTriggerType(plateResult.getString("triggerType"));
record.setType(carBrand.getString("type"));
carPassRecordService.save(record);
}
/**
* IO输出事件监听
*
* @param topic
* @param message
*/
@MqttServerFunction("device/${sn}/message/down/gpio_out/reply")
public void gpio_out(String topic, byte[] message) {
log.info("IO输出事件监听消息 -> Topic: {}, message: {}", topic, new String(message));
String data = new String(message, StandardCharsets.UTF_8);
log.info("IO输出事件监听{}", data);
}
/**
* IO锁定事件监听
*
* @param topic
* @param message
*/
@MqttServerFunction("device/${sn}/message/down/set_io_lock_status/reply")
public void set_io_lock_status(String topic, byte[] message) {
log.info("IO锁定事件监听消息 -> Topic: {}, message: {}", topic, new String(message));
String data = new String(message, StandardCharsets.UTF_8);
log.info("IO锁定事件监听{}", data);
}
/**
* 订阅离线数据数量
*
* @param topic
* @param message
*/
@MqttServerFunction("device/${sn}/message/up/offline_record")
public void offline_record(String topic, byte[] message) {
log.info("订阅离线数据数量 -> Topic: {}, message: {}", topic, new String(message));
String data = new String(message, StandardCharsets.UTF_8);
}
}

View File

@@ -16,11 +16,22 @@
package org.dromara.mica.mqtt.server.listener;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.core.server.event.IMqttConnectStatusListener;
import org.dromara.mica.mqtt.server.constant.CacheConstants;
import org.dromara.mica.mqtt.server.entity.Equipment;
import org.dromara.mica.mqtt.server.enums.FlagEnums;
import org.dromara.mica.mqtt.server.mapper.EquipmentMapper;
import org.dromara.mica.mqtt.server.redis.RedisService;
import org.dromara.mica.mqtt.server.service.IEquipmentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.tio.core.ChannelContext;
import java.util.concurrent.TimeUnit;
/**
* mqtt 连接状态
*
@@ -30,13 +41,25 @@ import org.tio.core.ChannelContext;
@Service
public class MqttConnectStatusListener2 implements IMqttConnectStatusListener {
@Autowired
IEquipmentService equipmentService;
@Autowired
RedisService redisService;
@Override
public void online(ChannelContext context, String clientId, String username) {
log.info("Mqtt clientId:{} username:{} online.", clientId, username);
//设备上线不做任何处理只有心跳报文做处理
log.info("online-context: {}", context);
log.info("设备:{}上线", clientId);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void offline(ChannelContext context, String clientId, String username, String reason) {
log.info("Mqtt clientId:{} username:{} offline reason:{}.", clientId, username, reason);
log.info("offline-context: {}", context);
equipmentService.updateFlag(clientId, FlagEnums.OFFLINE.getCode());
redisService.deleteObject(CacheConstants.EQUIPMENT_HEARTBEAT + clientId);
log.info("设备:{}离线offline reason:{}.", clientId, reason);
}
}

View File

@@ -4,6 +4,11 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.dromara.mica.mqtt.server.entity.CarInfo;
import java.util.List;
@Mapper
public interface CarInfoMapper extends BaseMapper<CarInfo> {
List<CarInfo> selectCarInfoList(CarInfo car);
}

View File

@@ -0,0 +1,17 @@
package org.dromara.mica.mqtt.server.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.dromara.mica.mqtt.server.entity.CarInfo;
import org.dromara.mica.mqtt.server.entity.CarParkItem;
import java.util.List;
@Mapper
public interface CarParkItemMapper extends BaseMapper<CarParkItem> {
@Select("SELECT p.id,p.park_id parkId,p.way,p.equipment_id equipmentId,p.area FROM car_park_item p left join sys_equipment e on p.equipment_id = e.id where e.sequence = #{sn} and e.product_id = 4 limit 1")
CarParkItem selectBySn(@Param("sn") String sn);
}

View File

@@ -0,0 +1,10 @@
package org.dromara.mica.mqtt.server.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.dromara.mica.mqtt.server.entity.CarParkRecord;
@Mapper
public interface CarParkRecordMapper extends BaseMapper<CarParkRecord> {
}

View File

@@ -0,0 +1,10 @@
package org.dromara.mica.mqtt.server.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.dromara.mica.mqtt.server.entity.CarPassGather;
@Mapper
public interface CarPassGatherMapper extends BaseMapper<CarPassGather> {
}

View File

@@ -0,0 +1,10 @@
package org.dromara.mica.mqtt.server.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.dromara.mica.mqtt.server.entity.CarPassRecord;
@Mapper
public interface CarPassRecordMapper extends BaseMapper<CarPassRecord> {
}

View File

@@ -0,0 +1,11 @@
package org.dromara.mica.mqtt.server.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.dromara.mica.mqtt.server.entity.CarInfo;
import org.dromara.mica.mqtt.server.entity.Equipment;
@Mapper
public interface EquipmentMapper extends BaseMapper<Equipment> {
}

View File

@@ -0,0 +1,47 @@
package org.dromara.mica.mqtt.server.pojo;
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
/**
* 设备报文数据实体
*/
@Data
public class WhiteListOperatorPO {
/**
* id
*/
private String id;
/**
* 回执的code
*/
private Integer code;
/**
* 设备编码
*/
private String sn;
/**
* 报文名称
*/
private String name;
/**
* 版本
*/
private String version = "1.0";
/**
* 内容
*/
private JSONObject payload;
/**
* 时间戳(精确到秒)
*/
private Long timestamp = System.currentTimeMillis() / 1000;
}

View File

@@ -0,0 +1,316 @@
package org.dromara.mica.mqtt.server.redis;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundSetOperations;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class RedisService {
@Autowired
public RedisTemplate redisTemplate;
private static final long DEFAULT_TIMEOUT = 60 * 60 * 24 * 7;
/**
* 缓存基本的对象Integer、String、实体类等
*
* @param key 缓存的键值
* @param value 缓存的值
*/
public <T> void setCacheObject(final String key, final T value)
{
redisTemplate.opsForValue().set(key, value);
}
/**
* 缓存是否存在,存在返回false不存在返回true并存储缓存值
*
* @param key
* @param value
*/
public Boolean setIfAbsent(final String key, final String value, long timeout, TimeUnit timeUnit)
{
try {
return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, timeUnit);
} catch (Exception e) {
log.error("redis error:{}", e.getMessage());
return false;
}
}
/**
* 分布式加锁
*
* @param key
* @param value
*/
public Boolean lock(final String key, final String value, long timeout, TimeUnit timeUnit)
{
try {
if (timeout <= 0) {
timeout = DEFAULT_TIMEOUT;
timeUnit = TimeUnit.SECONDS;
}
return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, timeUnit);
} catch (Exception e) {
log.error("redis error:{}", e.getMessage());
return false;
}
}
/**
* 缓存基本的对象Integer、String、实体类等
*
* @param key 缓存的键值
* @param value 缓存的值
* @param timeout 时间
* @param timeUnit 时间颗粒度
*/
public <T> void setCacheObject(final String key, final T value, final Long timeout, final TimeUnit timeUnit)
{
try {
redisTemplate.opsForValue().set(key, value, timeout, timeUnit);
} catch (Exception e) {
log.error("Redis setCacheObject error: key={}", key, e);
}
}
/**
* 设置有效时间
*
* @param key Redis键
* @param timeout 超时时间
* @return true=设置成功false=设置失败
*/
public boolean expire(final String key, final long timeout)
{
return expire(key, timeout, TimeUnit.SECONDS);
}
/**
* 设置有效时间
*
* @param key Redis键
* @param timeout 超时时间
* @param unit 时间单位
* @return true=设置成功false=设置失败
*/
public boolean expire(final String key, final long timeout, final TimeUnit unit)
{
return redisTemplate.expire(key, timeout, unit);
}
/**
* 获取有效时间
*
* @param key Redis键
* @return 有效时间
*/
public long getExpire(final String key)
{
return redisTemplate.getExpire(key);
}
/**
* 判断 key是否存在
*
* @param key 键
* @return true 存在 false不存在
*/
public Boolean hasKey(String key)
{
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
log.error("Redis hasKey error: key={}", key, e);
return false;
}
}
/**
* 获得缓存的基本对象。
*
* @param key 缓存键值
* @return 缓存键值对应的数据
*/
public <T> T getCacheObject(final String key)
{
try {
ValueOperations<String, T> operation = redisTemplate.opsForValue();
return operation.get(key);
} catch (Exception e) {
log.error("Redis getCacheObject error: key={}", key, e);
return null;
}
}
/**
* 删除单个对象
*
* @param key
*/
public boolean deleteObject(final String key)
{
return redisTemplate.delete(key);
}
/**
* 删除集合对象
*
* @param collection 多个对象
* @return
*/
public boolean deleteObject(final Collection collection)
{
return redisTemplate.delete(collection) > 0;
}
/**
* 缓存List数据
*
* @param key 缓存的键值
* @param dataList 待缓存的List数据
* @return 缓存的对象
*/
public <T> long setCacheList(final String key, final List<T> dataList)
{
Long count = redisTemplate.opsForList().rightPushAll(key, dataList);
return count == null ? 0 : count;
}
/**
* 获得缓存的list对象
*
* @param key 缓存的键值
* @return 缓存键值对应的数据
*/
public <T> List<T> getCacheList(final String key)
{
return redisTemplate.opsForList().range(key, 0, -1);
}
/**
* 缓存Set
*
* @param key 缓存键值
* @param dataSet 缓存的数据
* @return 缓存数据的对象
*/
public <T> BoundSetOperations<String, T> setCacheSet(final String key, final Set<T> dataSet)
{
BoundSetOperations<String, T> setOperation = redisTemplate.boundSetOps(key);
Iterator<T> it = dataSet.iterator();
while (it.hasNext())
{
setOperation.add(it.next());
}
return setOperation;
}
/**
* 获得缓存的set
*
* @param key
* @return
*/
public <T> Set<T> getCacheSet(final String key)
{
return redisTemplate.opsForSet().members(key);
}
/**
* 缓存Map
*
* @param key
* @param dataMap
*/
public <T> void setCacheMap(final String key, final Map<String, T> dataMap)
{
if (dataMap != null) {
redisTemplate.opsForHash().putAll(key, dataMap);
}
}
/**
* 获得缓存的Map
*
* @param key
* @return
*/
public <T> Map<String, T> getCacheMap(final String key)
{
return redisTemplate.opsForHash().entries(key);
}
/**
* 往Hash中存入数据
*
* @param key Redis键
* @param hKey Hash键
* @param value 值
*/
public <T> void setCacheMapValue(final String key, final String hKey, final T value)
{
redisTemplate.opsForHash().put(key, hKey, value);
}
/**
* 获取Hash中的数据
*
* @param key Redis键
* @param hKey Hash键
* @return Hash中的对象
*/
public <T> T getCacheMapValue(final String key, final String hKey)
{
HashOperations<String, String, T> opsForHash = redisTemplate.opsForHash();
return opsForHash.get(key, hKey);
}
/**
* 获取多个Hash中的数据
*
* @param key Redis键
* @param hKeys Hash键集合
* @return Hash对象集合
*/
public <T> List<T> getMultiCacheMapValue(final String key, final Collection<Object> hKeys)
{
return redisTemplate.opsForHash().multiGet(key, hKeys);
}
/**
* 删除Hash中的某条数据
*
* @param key Redis键
* @param hKey Hash键
* @return 是否成功
*/
public boolean deleteCacheMapValue(final String key, final String hKey)
{
return redisTemplate.opsForHash().delete(key, hKey) > 0;
}
/**
* 获得缓存的基本对象列表
*
* @param pattern 字符串前缀
* @return 对象列表
*/
public Collection<String> keys(final String pattern)
{
return redisTemplate.keys(pattern);
}
}

View File

@@ -3,5 +3,11 @@ package org.dromara.mica.mqtt.server.service;
import com.baomidou.mybatisplus.extension.service.IService;
import org.dromara.mica.mqtt.server.entity.CarInfo;
import java.util.List;
public interface ICarInfoService extends IService<CarInfo> {
CarInfo selectCarInfoBySn(String sn);
List<CarInfo> selectCarInfoList(CarInfo car);
}

View File

@@ -0,0 +1,9 @@
package org.dromara.mica.mqtt.server.service;
import com.baomidou.mybatisplus.extension.service.IService;
import org.dromara.mica.mqtt.server.entity.CarParkItem;
public interface ICarParkItemService extends IService<CarParkItem> {
CarParkItem selectBySn(String sn);
}

View File

@@ -0,0 +1,11 @@
package org.dromara.mica.mqtt.server.service;
import com.baomidou.mybatisplus.extension.service.IService;
import org.dromara.mica.mqtt.server.entity.CarParkRecord;
public interface ICarParkRecordService extends IService<CarParkRecord> {
int updateByClientId(CarParkRecord carParkRecord);
void delByClientId(CarParkRecord carParkRecord);
}

View File

@@ -0,0 +1,9 @@
package org.dromara.mica.mqtt.server.service;
import com.baomidou.mybatisplus.extension.service.IService;
import org.dromara.mica.mqtt.server.entity.CarPassGather;
public interface ICarPassGatherService extends IService<CarPassGather> {
void deleteByLicense(String license);
}

View File

@@ -0,0 +1,8 @@
package org.dromara.mica.mqtt.server.service;
import com.baomidou.mybatisplus.extension.service.IService;
import org.dromara.mica.mqtt.server.entity.CarPassRecord;
public interface ICarPassRecordService extends IService<CarPassRecord> {
}

Some files were not shown because too many files have changed in this diff Show More