diff --git a/example/mica-mqtt-server-spring-boot-example/pom.xml b/example/mica-mqtt-server-spring-boot-example/pom.xml
index 43a9b6b..b1ecd8f 100644
--- a/example/mica-mqtt-server-spring-boot-example/pom.xml
+++ b/example/mica-mqtt-server-spring-boot-example/pom.xml
@@ -64,6 +64,12 @@
com.baomidou
mybatis-plus-spring-boot4-starter
3.5.14
+
+
+
+
+
+
diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/config/MybatisPlusConfig.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/config/MybatisPlusConfig.java
index 39bea1a..891090b 100644
--- a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/config/MybatisPlusConfig.java
+++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/config/MybatisPlusConfig.java
@@ -1,11 +1,3 @@
-/**
- * Copyright (c) 2018 人人开源 All rights reserved.
- *
- * https://www.renren.io
- *
- * 版权所有,侵权必究!
- */
-
package org.dromara.mica.mqtt.server.config;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
@@ -33,4 +25,14 @@ public class MybatisPlusConfig {
return mybatisPlusInterceptor;
}
+// @Bean
+// public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
+// MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
+// sqlSessionFactoryBean.setDataSource(dataSource);
+// sqlSessionFactoryBean.setMapperLocations(
+// new PathMatchingResourcePatternResolver().getResources("classpath:/mapper/**/*.xml")
+// );
+// return sqlSessionFactoryBean.getObject();
+// }
+
}
diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/constant/CacheConstants.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/constant/CacheConstants.java
new file mode 100644
index 0000000..d25490c
--- /dev/null
+++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/constant/CacheConstants.java
@@ -0,0 +1,24 @@
+package org.dromara.mica.mqtt.server.constant;
+
+/**
+ * 缓存常量信息
+ */
+public class CacheConstants
+{
+
+ /**
+ * 设备心跳缓存key
+ */
+ public static final String EQUIPMENT_HEARTBEAT = "equipment:heartbeat:";
+
+ /**
+ * 设备心跳缓存过期时间
+ */
+ public static final long OFFLINE_THRESHOLD = 15 * 1000;
+
+ /**
+ * 设备在线状态缓存key(数据库映射值)
+ */
+ public static final String EQUIPMENT_FLAG = "equipment:flag:";
+
+}
diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/enums/FlagEnums.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/enums/FlagEnums.java
new file mode 100644
index 0000000..428d667
--- /dev/null
+++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/enums/FlagEnums.java
@@ -0,0 +1,27 @@
+package org.dromara.mica.mqtt.server.enums;
+
+/**
+ * 设备是否在线
+ *
+ */
+public enum FlagEnums {
+ ONLINE("0", "在线"),
+ OFFLINE("1", "离线")
+ ;
+
+ FlagEnums(String code, String name) {
+ this.code = code;
+ this.name = name;
+ }
+
+ private String code;
+ private String name;
+
+ public String getCode() {
+ return code;
+ }
+
+ public String getValue() {
+ return name;
+ }
+}
diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/listener/CarMessageListener.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/listener/CarMessageListener.java
index 6e59a71..66e19d9 100644
--- a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/listener/CarMessageListener.java
+++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/listener/CarMessageListener.java
@@ -2,8 +2,12 @@ package org.dromara.mica.mqtt.server.listener;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
+import org.dromara.mica.mqtt.server.constant.CacheConstants;
+import org.dromara.mica.mqtt.server.enums.FlagEnums;
import org.dromara.mica.mqtt.server.pojo.User;
import org.dromara.mica.mqtt.core.annotation.MqttServerFunction;
+import org.dromara.mica.mqtt.server.redis.RedisService;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
@@ -11,6 +15,7 @@ import org.tio.core.ChannelContext;
import org.tio.core.Node;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
/**
* 消息监听器
@@ -21,16 +26,16 @@ import java.util.Map;
@Service
public class CarMessageListener {
+ @Autowired
+ RedisService redisService;
@MqttServerFunction("device/${sn}/message/up/keep_alive")
public void onKeepAliveMessage(String topic, Map topicVars, byte[] message) {
- log.info("onKeepAliveMessage topic:{}", topic);
- log.info("onKeepAliveMessage topicVars:{}", topicVars);
- log.info("接收到心跳消息 -> Topic: {}, Payload: {}", topic, new String(message));
- // 在这里添加您的业务逻辑,例如:
- // 1. 解析 payload
- // 2. 更新设备在线状态
- // 3. 回复心跳响应等
+ String sn = topicVars.get("sn");
+ log.info("接收到来自客户端 [{}] 的心跳消息 -> Topic: {}", sn, topic);
+
+ // 更新客户端的最后心跳
+ redisService.setCacheObject(CacheConstants.EQUIPMENT_HEARTBEAT + sn, FlagEnums.ONLINE.getCode(), CacheConstants.OFFLINE_THRESHOLD, TimeUnit.MILLISECONDS);
}
@MqttServerFunction("device/${sn}/message/down/white_list_operator/reply")
diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/listener/MqttConnectStatusListener2.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/listener/MqttConnectStatusListener2.java
index 2b62750..34dc7e6 100644
--- a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/listener/MqttConnectStatusListener2.java
+++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/listener/MqttConnectStatusListener2.java
@@ -19,13 +19,19 @@ package org.dromara.mica.mqtt.server.listener;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.core.server.event.IMqttConnectStatusListener;
+import org.dromara.mica.mqtt.server.constant.CacheConstants;
import org.dromara.mica.mqtt.server.entity.Equipment;
+import org.dromara.mica.mqtt.server.enums.FlagEnums;
import org.dromara.mica.mqtt.server.mapper.EquipmentMapper;
+import org.dromara.mica.mqtt.server.redis.RedisService;
import org.dromara.mica.mqtt.server.service.IEquipmentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.tio.core.ChannelContext;
+import java.util.concurrent.TimeUnit;
+
/**
* mqtt 连接状态
*
@@ -35,25 +41,25 @@ import org.tio.core.ChannelContext;
@Service
public class MqttConnectStatusListener2 implements IMqttConnectStatusListener {
- private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(MqttConnectStatusListener2.class);
@Autowired
- EquipmentMapper equipmentMapper;
+ IEquipmentService equipmentService;
+
+ @Autowired
+ RedisService redisService;
@Override
public void online(ChannelContext context, String clientId, String username) {
+ //设备上线不做任何处理,只有心跳报文做处理
log.info("online-context: {}", context);
- Equipment equipment = new Equipment();
- equipment.setFlag("0");
- equipmentMapper.update(equipment, new QueryWrapper().eq("sequence", clientId).eq("product_id", 4L));
log.info("设备:{}上线", clientId);
}
@Override
+ @Transactional(rollbackFor = Exception.class)
public void offline(ChannelContext context, String clientId, String username, String reason) {
log.info("offline-context: {}", context);
- Equipment equipment = new Equipment();
- equipment.setFlag("1");
- equipmentMapper.update(equipment, new QueryWrapper().eq("sequence", clientId).eq("product_id", 4L));
+ equipmentService.updateFlag(clientId, FlagEnums.OFFLINE.getCode());
+ redisService.deleteObject(CacheConstants.EQUIPMENT_HEARTBEAT + clientId);
log.info("设备:{}离线,offline reason:{}.", clientId, reason);
}
}
diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/service/IEquipmentService.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/service/IEquipmentService.java
index 47fc306..04ee0b4 100644
--- a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/service/IEquipmentService.java
+++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/service/IEquipmentService.java
@@ -3,7 +3,13 @@ package org.dromara.mica.mqtt.server.service;
import com.baomidou.mybatisplus.extension.service.IService;
import org.dromara.mica.mqtt.server.entity.Equipment;
+import java.util.List;
+
public interface IEquipmentService extends IService {
Equipment selectEquipmentBySn(String sn);
+
+ List selectAllSnFlag();
+
+ void updateFlag(String sn, String flag);
}
diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/service/impl/EquipmentServiceImpl.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/service/impl/EquipmentServiceImpl.java
index c7ded54..286d8c4 100644
--- a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/service/impl/EquipmentServiceImpl.java
+++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/service/impl/EquipmentServiceImpl.java
@@ -8,6 +8,8 @@ import org.dromara.mica.mqtt.server.service.IEquipmentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.List;
+
@Service
public class EquipmentServiceImpl extends ServiceImpl implements IEquipmentService {
@@ -18,4 +20,16 @@ public class EquipmentServiceImpl extends ServiceImpl().eq("sequence", sn).eq("product_id", 4L).last("limit 1"));
}
+
+ @Override
+ public List selectAllSnFlag() {
+ return equipmentMapper.selectList(new QueryWrapper().eq("product_id", 4L).select("sequence", "flag"));
+ }
+
+ @Override
+ public void updateFlag(String sn, String flag) {
+ Equipment equipment = new Equipment();
+ equipment.setFlag(flag);
+ equipmentMapper.update(equipment, new QueryWrapper().eq("sequence", sn).eq("product_id", 4L));
+ }
}
diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/task/HeartbeatOnLine.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/task/HeartbeatOnLine.java
new file mode 100644
index 0000000..ea72595
--- /dev/null
+++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/task/HeartbeatOnLine.java
@@ -0,0 +1,57 @@
+package org.dromara.mica.mqtt.server.task;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.mica.mqtt.server.constant.CacheConstants;
+import org.dromara.mica.mqtt.server.entity.Equipment;
+import org.dromara.mica.mqtt.server.enums.FlagEnums;
+import org.dromara.mica.mqtt.server.mapper.EquipmentMapper;
+import org.dromara.mica.mqtt.server.redis.RedisService;
+import org.dromara.mica.mqtt.server.service.IEquipmentService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+import org.tio.utils.hutool.StrUtil;
+
+import java.util.List;
+
+/**
+ * 检查设备心跳,判断设备是否在线
+ */
+@Slf4j
+@Service
+public class HeartbeatOnLine {
+
+ @Autowired
+ RedisService redisService;
+
+ @Autowired
+ IEquipmentService equipmentService;
+
+ @Scheduled(fixedRate = 15 * 1000)
+ public void run() {
+ log.info("===========心跳检测=============");
+ //查询车辆摄像头的编码和在线状态
+ List equipment = equipmentService.selectAllSnFlag();
+
+ for (Equipment equip : equipment) {
+ //缓存中有该设备心跳key
+ if (redisService.hasKey(CacheConstants.EQUIPMENT_HEARTBEAT + equip.getSequence())) {
+ String flag = redisService.getCacheObject(CacheConstants.EQUIPMENT_HEARTBEAT + equip.getSequence());
+ //有心跳,但是和数据库设备在线状态不一致
+ if (!StrUtil.equals(flag, equip.getFlag())) {
+ log.info("设备:{},flag:{}", equip.getSequence(), flag);
+ equipmentService.updateFlag(equip.getSequence(), flag);
+ }
+ } else {
+ //没有心跳上传,且设备在线,将设备置为离线
+ if (StrUtil.equals(FlagEnums.ONLINE.getCode(), equip.getFlag())) {
+ log.info("设备:{},无心跳离线", equip.getSequence());
+ equipmentService.updateFlag(equip.getSequence(), FlagEnums.OFFLINE.getCode());
+ }
+ }
+
+ }
+
+ }
+}
diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/task/PublishAllTask.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/task/PublishAllTask.java
index aba2f86..b53f783 100644
--- a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/task/PublishAllTask.java
+++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/task/PublishAllTask.java
@@ -16,10 +16,10 @@ public class PublishAllTask {
@Autowired
private MqttServer mqttServer;
- @Scheduled(fixedDelay = 1000)
- public void run() {
- mqttServer.publishAll("/test/123", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
- mqttServer.publishAll("/test/object", User.newUser());
- }
+// @Scheduled(fixedDelay = 1000)
+// public void run() {
+// mqttServer.publishAll("/test/123", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
+// mqttServer.publishAll("/test/object", User.newUser());
+// }
}
diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/resources/application.yml b/example/mica-mqtt-server-spring-boot-example/src/main/resources/application.yml
index fa22bf7..fd459ed 100644
--- a/example/mica-mqtt-server-spring-boot-example/src/main/resources/application.yml
+++ b/example/mica-mqtt-server-spring-boot-example/src/main/resources/application.yml
@@ -24,11 +24,6 @@ spring:
max-request-size: 100MB
enabled: true
-
-#mybatis-plus:
-# configuration:
-# log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
-
#mybatis
#mybatis-plus:
# mapper-locations: classpath*:/mapper/**/*.xml
diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/resources/mapper/CarInfoMapper.xml b/example/mica-mqtt-server-spring-boot-example/src/main/resources/mapper/CarInfoMapper.xml
index b169760..0992191 100644
--- a/example/mica-mqtt-server-spring-boot-example/src/main/resources/mapper/CarInfoMapper.xml
+++ b/example/mica-mqtt-server-spring-boot-example/src/main/resources/mapper/CarInfoMapper.xml
@@ -4,4 +4,47 @@
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ select ci.customer_id, ci.enable_time, ci.overdue_time, ci.enable, ci.plate, ci.time_seg_enable, ci.seg_time,
+ ci.need_alarm, ci.vehicle_code, ci.vehicle_comment, ci.people_id, ci.del_flag, ci.sync, ci.remark,
+ ci.create_by, ci.create_time, ci.update_by, ci.update_time, ci.overclock_card
+ from car_info ci
+ left join car_park cp on ci.park_id=cp.id
+ left join sys_people p on ci.people_id=p.id
+ left join car_park_record cpr on cpr.customer_id=ci.customer_id
+ left join sys_equipment se on cpr.equipment_id=se.id
+
+
+
+