代码优化,上线离线

This commit is contained in:
zc
2025-12-09 14:05:36 +08:00
parent 963dfc2313
commit 7180eedc29
12 changed files with 218 additions and 33 deletions

View File

@@ -64,6 +64,12 @@
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot4-starter</artifactId>
<version>3.5.14</version>
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>com.baomidou</groupId>-->
<!-- <artifactId>mybatis-plus-spring-boot-autoconfigure</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
</dependency>
<dependency>

View File

@@ -1,11 +1,3 @@
/**
* Copyright (c) 2018 人人开源 All rights reserved.
*
* https://www.renren.io
*
* 版权所有,侵权必究!
*/
package org.dromara.mica.mqtt.server.config;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
@@ -33,4 +25,14 @@ public class MybatisPlusConfig {
return mybatisPlusInterceptor;
}
// @Bean
// public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
// MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
// sqlSessionFactoryBean.setDataSource(dataSource);
// sqlSessionFactoryBean.setMapperLocations(
// new PathMatchingResourcePatternResolver().getResources("classpath:/mapper/**/*.xml")
// );
// return sqlSessionFactoryBean.getObject();
// }
}

View File

@@ -0,0 +1,24 @@
package org.dromara.mica.mqtt.server.constant;
/**
* 缓存常量信息
*/
public class CacheConstants
{
/**
* 设备心跳缓存key
*/
public static final String EQUIPMENT_HEARTBEAT = "equipment:heartbeat:";
/**
* 设备心跳缓存过期时间
*/
public static final long OFFLINE_THRESHOLD = 15 * 1000;
/**
* 设备在线状态缓存key数据库映射值
*/
public static final String EQUIPMENT_FLAG = "equipment:flag:";
}

View File

@@ -0,0 +1,27 @@
package org.dromara.mica.mqtt.server.enums;
/**
* 设备是否在线
*
*/
public enum FlagEnums {
ONLINE("0", "在线"),
OFFLINE("1", "离线")
;
FlagEnums(String code, String name) {
this.code = code;
this.name = name;
}
private String code;
private String name;
public String getCode() {
return code;
}
public String getValue() {
return name;
}
}

View File

@@ -2,8 +2,12 @@ package org.dromara.mica.mqtt.server.listener;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.server.constant.CacheConstants;
import org.dromara.mica.mqtt.server.enums.FlagEnums;
import org.dromara.mica.mqtt.server.pojo.User;
import org.dromara.mica.mqtt.core.annotation.MqttServerFunction;
import org.dromara.mica.mqtt.server.redis.RedisService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
@@ -11,6 +15,7 @@ import org.tio.core.ChannelContext;
import org.tio.core.Node;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 消息监听器
@@ -21,16 +26,16 @@ import java.util.Map;
@Service
public class CarMessageListener {
@Autowired
RedisService redisService;
@MqttServerFunction("device/${sn}/message/up/keep_alive")
public void onKeepAliveMessage(String topic, Map<String, String> topicVars, byte[] message) {
log.info("onKeepAliveMessage topic:{}", topic);
log.info("onKeepAliveMessage topicVars:{}", topicVars);
log.info("接收到心跳消息 -> Topic: {}, Payload: {}", topic, new String(message));
// 在这里添加您的业务逻辑,例如:
// 1. 解析 payload
// 2. 更新设备在线状态
// 3. 回复心跳响应等
String sn = topicVars.get("sn");
log.info("接收到来自客户端 [{}] 的心跳消息 -> Topic: {}", sn, topic);
// 更新客户端的最后心跳
redisService.setCacheObject(CacheConstants.EQUIPMENT_HEARTBEAT + sn, FlagEnums.ONLINE.getCode(), CacheConstants.OFFLINE_THRESHOLD, TimeUnit.MILLISECONDS);
}
@MqttServerFunction("device/${sn}/message/down/white_list_operator/reply")

View File

@@ -19,13 +19,19 @@ package org.dromara.mica.mqtt.server.listener;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.core.server.event.IMqttConnectStatusListener;
import org.dromara.mica.mqtt.server.constant.CacheConstants;
import org.dromara.mica.mqtt.server.entity.Equipment;
import org.dromara.mica.mqtt.server.enums.FlagEnums;
import org.dromara.mica.mqtt.server.mapper.EquipmentMapper;
import org.dromara.mica.mqtt.server.redis.RedisService;
import org.dromara.mica.mqtt.server.service.IEquipmentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.tio.core.ChannelContext;
import java.util.concurrent.TimeUnit;
/**
* mqtt 连接状态
*
@@ -35,25 +41,25 @@ import org.tio.core.ChannelContext;
@Service
public class MqttConnectStatusListener2 implements IMqttConnectStatusListener {
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(MqttConnectStatusListener2.class);
@Autowired
EquipmentMapper equipmentMapper;
IEquipmentService equipmentService;
@Autowired
RedisService redisService;
@Override
public void online(ChannelContext context, String clientId, String username) {
//设备上线不做任何处理,只有心跳报文做处理
log.info("online-context: {}", context);
Equipment equipment = new Equipment();
equipment.setFlag("0");
equipmentMapper.update(equipment, new QueryWrapper<Equipment>().eq("sequence", clientId).eq("product_id", 4L));
log.info("设备:{}上线", clientId);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void offline(ChannelContext context, String clientId, String username, String reason) {
log.info("offline-context: {}", context);
Equipment equipment = new Equipment();
equipment.setFlag("1");
equipmentMapper.update(equipment, new QueryWrapper<Equipment>().eq("sequence", clientId).eq("product_id", 4L));
equipmentService.updateFlag(clientId, FlagEnums.OFFLINE.getCode());
redisService.deleteObject(CacheConstants.EQUIPMENT_HEARTBEAT + clientId);
log.info("设备:{}离线offline reason:{}.", clientId, reason);
}
}

View File

@@ -3,7 +3,13 @@ package org.dromara.mica.mqtt.server.service;
import com.baomidou.mybatisplus.extension.service.IService;
import org.dromara.mica.mqtt.server.entity.Equipment;
import java.util.List;
public interface IEquipmentService extends IService<Equipment> {
Equipment selectEquipmentBySn(String sn);
List<Equipment> selectAllSnFlag();
void updateFlag(String sn, String flag);
}

View File

@@ -8,6 +8,8 @@ import org.dromara.mica.mqtt.server.service.IEquipmentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class EquipmentServiceImpl extends ServiceImpl<EquipmentMapper, Equipment> implements IEquipmentService {
@@ -18,4 +20,16 @@ public class EquipmentServiceImpl extends ServiceImpl<EquipmentMapper, Equipment
public Equipment selectEquipmentBySn(String sn) {
return equipmentMapper.selectOne(new QueryWrapper<Equipment>().eq("sequence", sn).eq("product_id", 4L).last("limit 1"));
}
@Override
public List<Equipment> selectAllSnFlag() {
return equipmentMapper.selectList(new QueryWrapper<Equipment>().eq("product_id", 4L).select("sequence", "flag"));
}
@Override
public void updateFlag(String sn, String flag) {
Equipment equipment = new Equipment();
equipment.setFlag(flag);
equipmentMapper.update(equipment, new QueryWrapper<Equipment>().eq("sequence", sn).eq("product_id", 4L));
}
}

View File

@@ -0,0 +1,57 @@
package org.dromara.mica.mqtt.server.task;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.dromara.mica.mqtt.server.constant.CacheConstants;
import org.dromara.mica.mqtt.server.entity.Equipment;
import org.dromara.mica.mqtt.server.enums.FlagEnums;
import org.dromara.mica.mqtt.server.mapper.EquipmentMapper;
import org.dromara.mica.mqtt.server.redis.RedisService;
import org.dromara.mica.mqtt.server.service.IEquipmentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.tio.utils.hutool.StrUtil;
import java.util.List;
/**
* 检查设备心跳,判断设备是否在线
*/
@Slf4j
@Service
public class HeartbeatOnLine {
@Autowired
RedisService redisService;
@Autowired
IEquipmentService equipmentService;
@Scheduled(fixedRate = 15 * 1000)
public void run() {
log.info("===========心跳检测=============");
//查询车辆摄像头的编码和在线状态
List<Equipment> equipment = equipmentService.selectAllSnFlag();
for (Equipment equip : equipment) {
//缓存中有该设备心跳key
if (redisService.hasKey(CacheConstants.EQUIPMENT_HEARTBEAT + equip.getSequence())) {
String flag = redisService.getCacheObject(CacheConstants.EQUIPMENT_HEARTBEAT + equip.getSequence());
//有心跳,但是和数据库设备在线状态不一致
if (!StrUtil.equals(flag, equip.getFlag())) {
log.info("设备:{}flag:{}", equip.getSequence(), flag);
equipmentService.updateFlag(equip.getSequence(), flag);
}
} else {
//没有心跳上传,且设备在线,将设备置为离线
if (StrUtil.equals(FlagEnums.ONLINE.getCode(), equip.getFlag())) {
log.info("设备:{},无心跳离线", equip.getSequence());
equipmentService.updateFlag(equip.getSequence(), FlagEnums.OFFLINE.getCode());
}
}
}
}
}

View File

@@ -16,10 +16,10 @@ public class PublishAllTask {
@Autowired
private MqttServer mqttServer;
@Scheduled(fixedDelay = 1000)
public void run() {
mqttServer.publishAll("/test/123", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
mqttServer.publishAll("/test/object", User.newUser());
}
// @Scheduled(fixedDelay = 1000)
// public void run() {
// mqttServer.publishAll("/test/123", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
// mqttServer.publishAll("/test/object", User.newUser());
// }
}

View File

@@ -24,11 +24,6 @@ spring:
max-request-size: 100MB
enabled: true
#mybatis-plus:
# configuration:
# log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
#mybatis
#mybatis-plus:
# mapper-locations: classpath*:/mapper/**/*.xml

View File

@@ -4,4 +4,47 @@
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.dromara.mica.mqtt.server.mapper.CarInfoMapper">
<resultMap type="org.dromara.mica.mqtt.server.entity.CarInfo" id="CarInfoResult">
<result property="customerId" column="customer_id" />
<result property="enableTime" column="enable_time" />
<result property="overdueTime" column="overdue_time" />
<result property="enable" column="enable" />
<result property="plate" column="plate" />
<result property="timeSegEnable" column="time_seg_enable" />
<result property="segTime" column="seg_time" />
<result property="needAlarm" column="need_alarm" />
<result property="vehicleCode" column="vehicle_code" />
<result property="vehicleComment" column="vehicle_comment" />
<result property="peopleId" column="people_id" />
<result property="delFlag" column="del_flag" />
<result property="sync" column="sync" />
<result property="overclockCard" column="overclock_card" />
</resultMap>
<sql id="selectCarInfoVo">
select ci.customer_id, ci.enable_time, ci.overdue_time, ci.enable, ci.plate, ci.time_seg_enable, ci.seg_time,
ci.need_alarm, ci.vehicle_code, ci.vehicle_comment, ci.people_id, ci.del_flag, ci.sync, ci.remark,
ci.create_by, ci.create_time, ci.update_by, ci.update_time, ci.overclock_card
from car_info ci
left join car_park cp on ci.park_id=cp.id
left join sys_people p on ci.people_id=p.id
left join car_park_record cpr on cpr.customer_id=ci.customer_id
left join sys_equipment se on cpr.equipment_id=se.id
</sql>
<select id="selectCarInfoList" parameterType="org.dromara.mica.mqtt.server.entity.CarInfo" resultMap="CarInfoResult">
<include refid="selectCarInfoVo"/>
<where>
<if test="enableTime != null "> and ci.enable_time = #{enableTime}</if>
<if test="overdueTime != null "> and ci.overdue_time = #{overdueTime}</if>
<if test="enable != null and enable != ''"> and ci.enable = #{enable}</if>
<if test="plate != null and plate != ''"> and ci.plate = #{plate}</if>
<if test="peopleId != null "> and ci.people_id = #{peopleId}</if>
<if test="sn != null "> and se.sequence = #{sn}</if>
<if test="delFlag != null "> and ci.del_flag = #{delFlag}</if>
<if test="sync != null "> and cpr.sync = #{sync}</if>
limit 10
</where>
</select>
</mapper>