From 7180eedc290951912f0389e096e672746a84cb47 Mon Sep 17 00:00:00 2001 From: zc Date: Tue, 9 Dec 2025 14:05:36 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96=EF=BC=8C?= =?UTF-8?q?=E4=B8=8A=E7=BA=BF=E7=A6=BB=E7=BA=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pom.xml | 6 ++ .../mqtt/server/config/MybatisPlusConfig.java | 18 +++--- .../mqtt/server/constant/CacheConstants.java | 24 ++++++++ .../mica/mqtt/server/enums/FlagEnums.java | 27 +++++++++ .../server/listener/CarMessageListener.java | 19 ++++--- .../listener/MqttConnectStatusListener2.java | 22 ++++--- .../server/service/IEquipmentService.java | 6 ++ .../service/impl/EquipmentServiceImpl.java | 14 +++++ .../mqtt/server/task/HeartbeatOnLine.java | 57 +++++++++++++++++++ .../mica/mqtt/server/task/PublishAllTask.java | 10 ++-- .../src/main/resources/application.yml | 5 -- .../main/resources/mapper/CarInfoMapper.xml | 43 ++++++++++++++ 12 files changed, 218 insertions(+), 33 deletions(-) create mode 100644 example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/constant/CacheConstants.java create mode 100644 example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/enums/FlagEnums.java create mode 100644 example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/task/HeartbeatOnLine.java diff --git a/example/mica-mqtt-server-spring-boot-example/pom.xml b/example/mica-mqtt-server-spring-boot-example/pom.xml index 43a9b6b..b1ecd8f 100644 --- a/example/mica-mqtt-server-spring-boot-example/pom.xml +++ b/example/mica-mqtt-server-spring-boot-example/pom.xml @@ -64,6 +64,12 @@ com.baomidou mybatis-plus-spring-boot4-starter 3.5.14 + + + + + + diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/config/MybatisPlusConfig.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/config/MybatisPlusConfig.java index 39bea1a..891090b 100644 --- a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/config/MybatisPlusConfig.java +++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/config/MybatisPlusConfig.java @@ -1,11 +1,3 @@ -/** - * Copyright (c) 2018 人人开源 All rights reserved. - * - * https://www.renren.io - * - * 版权所有,侵权必究! - */ - package org.dromara.mica.mqtt.server.config; import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; @@ -33,4 +25,14 @@ public class MybatisPlusConfig { return mybatisPlusInterceptor; } +// @Bean +// public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception { +// MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean(); +// sqlSessionFactoryBean.setDataSource(dataSource); +// sqlSessionFactoryBean.setMapperLocations( +// new PathMatchingResourcePatternResolver().getResources("classpath:/mapper/**/*.xml") +// ); +// return sqlSessionFactoryBean.getObject(); +// } + } diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/constant/CacheConstants.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/constant/CacheConstants.java new file mode 100644 index 0000000..d25490c --- /dev/null +++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/constant/CacheConstants.java @@ -0,0 +1,24 @@ +package org.dromara.mica.mqtt.server.constant; + +/** + * 缓存常量信息 + */ +public class CacheConstants +{ + + /** + * 设备心跳缓存key + */ + public static final String EQUIPMENT_HEARTBEAT = "equipment:heartbeat:"; + + /** + * 设备心跳缓存过期时间 + */ + public static final long OFFLINE_THRESHOLD = 15 * 1000; + + /** + * 设备在线状态缓存key(数据库映射值) + */ + public static final String EQUIPMENT_FLAG = "equipment:flag:"; + +} diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/enums/FlagEnums.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/enums/FlagEnums.java new file mode 100644 index 0000000..428d667 --- /dev/null +++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/enums/FlagEnums.java @@ -0,0 +1,27 @@ +package org.dromara.mica.mqtt.server.enums; + +/** + * 设备是否在线 + * + */ +public enum FlagEnums { + ONLINE("0", "在线"), + OFFLINE("1", "离线") + ; + + FlagEnums(String code, String name) { + this.code = code; + this.name = name; + } + + private String code; + private String name; + + public String getCode() { + return code; + } + + public String getValue() { + return name; + } +} diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/listener/CarMessageListener.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/listener/CarMessageListener.java index 6e59a71..66e19d9 100644 --- a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/listener/CarMessageListener.java +++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/listener/CarMessageListener.java @@ -2,8 +2,12 @@ package org.dromara.mica.mqtt.server.listener; import lombok.extern.slf4j.Slf4j; import org.dromara.mica.mqtt.codec.message.MqttPublishMessage; +import org.dromara.mica.mqtt.server.constant.CacheConstants; +import org.dromara.mica.mqtt.server.enums.FlagEnums; import org.dromara.mica.mqtt.server.pojo.User; import org.dromara.mica.mqtt.core.annotation.MqttServerFunction; +import org.dromara.mica.mqtt.server.redis.RedisService; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestParam; @@ -11,6 +15,7 @@ import org.tio.core.ChannelContext; import org.tio.core.Node; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * 消息监听器 @@ -21,16 +26,16 @@ import java.util.Map; @Service public class CarMessageListener { + @Autowired + RedisService redisService; @MqttServerFunction("device/${sn}/message/up/keep_alive") public void onKeepAliveMessage(String topic, Map topicVars, byte[] message) { - log.info("onKeepAliveMessage topic:{}", topic); - log.info("onKeepAliveMessage topicVars:{}", topicVars); - log.info("接收到心跳消息 -> Topic: {}, Payload: {}", topic, new String(message)); - // 在这里添加您的业务逻辑,例如: - // 1. 解析 payload - // 2. 更新设备在线状态 - // 3. 回复心跳响应等 + String sn = topicVars.get("sn"); + log.info("接收到来自客户端 [{}] 的心跳消息 -> Topic: {}", sn, topic); + + // 更新客户端的最后心跳 + redisService.setCacheObject(CacheConstants.EQUIPMENT_HEARTBEAT + sn, FlagEnums.ONLINE.getCode(), CacheConstants.OFFLINE_THRESHOLD, TimeUnit.MILLISECONDS); } @MqttServerFunction("device/${sn}/message/down/white_list_operator/reply") diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/listener/MqttConnectStatusListener2.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/listener/MqttConnectStatusListener2.java index 2b62750..34dc7e6 100644 --- a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/listener/MqttConnectStatusListener2.java +++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/listener/MqttConnectStatusListener2.java @@ -19,13 +19,19 @@ package org.dromara.mica.mqtt.server.listener; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import lombok.extern.slf4j.Slf4j; import org.dromara.mica.mqtt.core.server.event.IMqttConnectStatusListener; +import org.dromara.mica.mqtt.server.constant.CacheConstants; import org.dromara.mica.mqtt.server.entity.Equipment; +import org.dromara.mica.mqtt.server.enums.FlagEnums; import org.dromara.mica.mqtt.server.mapper.EquipmentMapper; +import org.dromara.mica.mqtt.server.redis.RedisService; import org.dromara.mica.mqtt.server.service.IEquipmentService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.tio.core.ChannelContext; +import java.util.concurrent.TimeUnit; + /** * mqtt 连接状态 * @@ -35,25 +41,25 @@ import org.tio.core.ChannelContext; @Service public class MqttConnectStatusListener2 implements IMqttConnectStatusListener { - private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(MqttConnectStatusListener2.class); @Autowired - EquipmentMapper equipmentMapper; + IEquipmentService equipmentService; + + @Autowired + RedisService redisService; @Override public void online(ChannelContext context, String clientId, String username) { + //设备上线不做任何处理,只有心跳报文做处理 log.info("online-context: {}", context); - Equipment equipment = new Equipment(); - equipment.setFlag("0"); - equipmentMapper.update(equipment, new QueryWrapper().eq("sequence", clientId).eq("product_id", 4L)); log.info("设备:{}上线", clientId); } @Override + @Transactional(rollbackFor = Exception.class) public void offline(ChannelContext context, String clientId, String username, String reason) { log.info("offline-context: {}", context); - Equipment equipment = new Equipment(); - equipment.setFlag("1"); - equipmentMapper.update(equipment, new QueryWrapper().eq("sequence", clientId).eq("product_id", 4L)); + equipmentService.updateFlag(clientId, FlagEnums.OFFLINE.getCode()); + redisService.deleteObject(CacheConstants.EQUIPMENT_HEARTBEAT + clientId); log.info("设备:{}离线,offline reason:{}.", clientId, reason); } } diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/service/IEquipmentService.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/service/IEquipmentService.java index 47fc306..04ee0b4 100644 --- a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/service/IEquipmentService.java +++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/service/IEquipmentService.java @@ -3,7 +3,13 @@ package org.dromara.mica.mqtt.server.service; import com.baomidou.mybatisplus.extension.service.IService; import org.dromara.mica.mqtt.server.entity.Equipment; +import java.util.List; + public interface IEquipmentService extends IService { Equipment selectEquipmentBySn(String sn); + + List selectAllSnFlag(); + + void updateFlag(String sn, String flag); } diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/service/impl/EquipmentServiceImpl.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/service/impl/EquipmentServiceImpl.java index c7ded54..286d8c4 100644 --- a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/service/impl/EquipmentServiceImpl.java +++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/service/impl/EquipmentServiceImpl.java @@ -8,6 +8,8 @@ import org.dromara.mica.mqtt.server.service.IEquipmentService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.List; + @Service public class EquipmentServiceImpl extends ServiceImpl implements IEquipmentService { @@ -18,4 +20,16 @@ public class EquipmentServiceImpl extends ServiceImpl().eq("sequence", sn).eq("product_id", 4L).last("limit 1")); } + + @Override + public List selectAllSnFlag() { + return equipmentMapper.selectList(new QueryWrapper().eq("product_id", 4L).select("sequence", "flag")); + } + + @Override + public void updateFlag(String sn, String flag) { + Equipment equipment = new Equipment(); + equipment.setFlag(flag); + equipmentMapper.update(equipment, new QueryWrapper().eq("sequence", sn).eq("product_id", 4L)); + } } diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/task/HeartbeatOnLine.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/task/HeartbeatOnLine.java new file mode 100644 index 0000000..ea72595 --- /dev/null +++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/task/HeartbeatOnLine.java @@ -0,0 +1,57 @@ +package org.dromara.mica.mqtt.server.task; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import lombok.extern.slf4j.Slf4j; +import org.dromara.mica.mqtt.server.constant.CacheConstants; +import org.dromara.mica.mqtt.server.entity.Equipment; +import org.dromara.mica.mqtt.server.enums.FlagEnums; +import org.dromara.mica.mqtt.server.mapper.EquipmentMapper; +import org.dromara.mica.mqtt.server.redis.RedisService; +import org.dromara.mica.mqtt.server.service.IEquipmentService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.tio.utils.hutool.StrUtil; + +import java.util.List; + +/** + * 检查设备心跳,判断设备是否在线 + */ +@Slf4j +@Service +public class HeartbeatOnLine { + + @Autowired + RedisService redisService; + + @Autowired + IEquipmentService equipmentService; + + @Scheduled(fixedRate = 15 * 1000) + public void run() { + log.info("===========心跳检测============="); + //查询车辆摄像头的编码和在线状态 + List equipment = equipmentService.selectAllSnFlag(); + + for (Equipment equip : equipment) { + //缓存中有该设备心跳key + if (redisService.hasKey(CacheConstants.EQUIPMENT_HEARTBEAT + equip.getSequence())) { + String flag = redisService.getCacheObject(CacheConstants.EQUIPMENT_HEARTBEAT + equip.getSequence()); + //有心跳,但是和数据库设备在线状态不一致 + if (!StrUtil.equals(flag, equip.getFlag())) { + log.info("设备:{},flag:{}", equip.getSequence(), flag); + equipmentService.updateFlag(equip.getSequence(), flag); + } + } else { + //没有心跳上传,且设备在线,将设备置为离线 + if (StrUtil.equals(FlagEnums.ONLINE.getCode(), equip.getFlag())) { + log.info("设备:{},无心跳离线", equip.getSequence()); + equipmentService.updateFlag(equip.getSequence(), FlagEnums.OFFLINE.getCode()); + } + } + + } + + } +} diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/task/PublishAllTask.java b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/task/PublishAllTask.java index aba2f86..b53f783 100644 --- a/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/task/PublishAllTask.java +++ b/example/mica-mqtt-server-spring-boot-example/src/main/java/org/dromara/mica/mqtt/server/task/PublishAllTask.java @@ -16,10 +16,10 @@ public class PublishAllTask { @Autowired private MqttServer mqttServer; - @Scheduled(fixedDelay = 1000) - public void run() { - mqttServer.publishAll("/test/123", "mica最牛皮".getBytes(StandardCharsets.UTF_8)); - mqttServer.publishAll("/test/object", User.newUser()); - } +// @Scheduled(fixedDelay = 1000) +// public void run() { +// mqttServer.publishAll("/test/123", "mica最牛皮".getBytes(StandardCharsets.UTF_8)); +// mqttServer.publishAll("/test/object", User.newUser()); +// } } diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/resources/application.yml b/example/mica-mqtt-server-spring-boot-example/src/main/resources/application.yml index fa22bf7..fd459ed 100644 --- a/example/mica-mqtt-server-spring-boot-example/src/main/resources/application.yml +++ b/example/mica-mqtt-server-spring-boot-example/src/main/resources/application.yml @@ -24,11 +24,6 @@ spring: max-request-size: 100MB enabled: true - -#mybatis-plus: -# configuration: -# log-impl: org.apache.ibatis.logging.stdout.StdOutImpl - #mybatis #mybatis-plus: # mapper-locations: classpath*:/mapper/**/*.xml diff --git a/example/mica-mqtt-server-spring-boot-example/src/main/resources/mapper/CarInfoMapper.xml b/example/mica-mqtt-server-spring-boot-example/src/main/resources/mapper/CarInfoMapper.xml index b169760..0992191 100644 --- a/example/mica-mqtt-server-spring-boot-example/src/main/resources/mapper/CarInfoMapper.xml +++ b/example/mica-mqtt-server-spring-boot-example/src/main/resources/mapper/CarInfoMapper.xml @@ -4,4 +4,47 @@ "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> + + + + + + + + + + + + + + + + + + + select ci.customer_id, ci.enable_time, ci.overdue_time, ci.enable, ci.plate, ci.time_seg_enable, ci.seg_time, + ci.need_alarm, ci.vehicle_code, ci.vehicle_comment, ci.people_id, ci.del_flag, ci.sync, ci.remark, + ci.create_by, ci.create_time, ci.update_by, ci.update_time, ci.overclock_card + from car_info ci + left join car_park cp on ci.park_id=cp.id + left join sys_people p on ci.people_id=p.id + left join car_park_record cpr on cpr.customer_id=ci.customer_id + left join sys_equipment se on cpr.equipment_id=se.id + + + +