Compare commits

5 Commits

Author SHA1 Message Date
zc
800fc68cb9 配置 2026-05-13 09:32:41 +08:00
zc
d4f20f7a67 Merge branch 'refs/heads/dev_zc' into dev_io_zc
# Conflicts:
#	example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/entity/CarPassGather.java
#	example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/listener/CarMessageListener.java
#	example/mqtt-car/src/main/resources/application-dev.yml
#	example/mqtt-car/src/main/resources/application-prod.yml
2026-05-12 11:53:39 +08:00
zc
118729ccb2 优化redis,mysql超时异常处理,定时任务线程池扩容 2026-05-12 11:46:26 +08:00
zc
4c41f6f77e 主键自增 2026-04-30 15:40:37 +08:00
zc
90003d79c5 主键自增 2026-04-30 14:54:25 +08:00
7 changed files with 218 additions and 114 deletions

View File

@@ -0,0 +1,35 @@
package org.dromara.mica.mqtt.server.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* 定时任务线程池配置
* 解决默认定时任务线程池(大小为1)被阻塞导致所有定时任务无法执行的问题
*/
@Slf4j
@Configuration
@EnableScheduling
public class SchedulerConfig {
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
// 增加线程池大小,避免单线程阻塞影响所有定时任务
scheduler.setPoolSize(5);
scheduler.setThreadNamePrefix("mqtt-scheduler-");
// 等待所有任务完成后再关闭
scheduler.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间
scheduler.setAwaitTerminationSeconds(60);
// 设置异常处理器,确保异常被记录但不中断定时任务
scheduler.setErrorHandler(throwable -> {
log.error("定时任务执行异常", throwable);
});
return scheduler;
}
}

View File

@@ -80,7 +80,11 @@ public class RedisService {
*/ */
public <T> void setCacheObject(final String key, final T value, final Long timeout, final TimeUnit timeUnit) public <T> void setCacheObject(final String key, final T value, final Long timeout, final TimeUnit timeUnit)
{ {
redisTemplate.opsForValue().set(key, value, timeout, timeUnit); try {
redisTemplate.opsForValue().set(key, value, timeout, timeUnit);
} catch (Exception e) {
log.error("Redis setCacheObject error: key={}", key, e);
}
} }
/** /**
@@ -127,7 +131,12 @@ public class RedisService {
*/ */
public Boolean hasKey(String key) public Boolean hasKey(String key)
{ {
return redisTemplate.hasKey(key); try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
log.error("Redis hasKey error: key={}", key, e);
return false;
}
} }
/** /**
@@ -138,8 +147,13 @@ public class RedisService {
*/ */
public <T> T getCacheObject(final String key) public <T> T getCacheObject(final String key)
{ {
ValueOperations<String, T> operation = redisTemplate.opsForValue(); try {
return operation.get(key); ValueOperations<String, T> operation = redisTemplate.opsForValue();
return operation.get(key);
} catch (Exception e) {
log.error("Redis getCacheObject error: key={}", key, e);
return null;
}
} }
/** /**

View File

@@ -30,28 +30,32 @@ public class HeartbeatOnLineTask {
@Scheduled(fixedRate = 15 * 1000) @Scheduled(fixedRate = 15 * 1000)
public void run() { public void run() {
log.info("===========心跳检测============="); // 添加异常捕获,防止单个任务异常导致定时任务线程阻塞
//查询车辆摄像头的编码和在线状态 try {
List<Equipment> equipment = equipmentService.selectAllSnFlag(); log.info("===========心跳检测=============");
//查询车辆摄像头的编码和在线状态
List<Equipment> equipment = equipmentService.selectAllSnFlag();
for (Equipment equip : equipment) { for (Equipment equip : equipment) {
//缓存中有该设备心跳key //缓存中有该设备心跳key
if (redisService.hasKey(CacheConstants.EQUIPMENT_HEARTBEAT + equip.getSequence())) { if (redisService.hasKey(CacheConstants.EQUIPMENT_HEARTBEAT + equip.getSequence())) {
String flag = redisService.getCacheObject(CacheConstants.EQUIPMENT_HEARTBEAT + equip.getSequence()); String flag = redisService.getCacheObject(CacheConstants.EQUIPMENT_HEARTBEAT + equip.getSequence());
//有心跳,但是和数据库设备在线状态不一致 //有心跳,但是和数据库设备在线状态不一致
if (!StrUtil.equals(flag, equip.getFlag())) { if (!StrUtil.equals(flag, equip.getFlag())) {
log.info("设备:{}flag:{}", equip.getSequence(), flag); log.info("设备:{}flag:{}", equip.getSequence(), flag);
equipmentService.updateFlag(equip.getSequence(), flag); equipmentService.updateFlag(equip.getSequence(), flag);
} }
} else { } else {
//没有心跳上传,且设备在线,将设备置为离线 //没有心跳上传,且设备在线,将设备置为离线
if (StrUtil.equals(FlagEnums.ONLINE.getCode(), equip.getFlag())) { if (StrUtil.equals(FlagEnums.ONLINE.getCode(), equip.getFlag())) {
log.info("设备:{},无心跳离线", equip.getSequence()); log.info("设备:{},无心跳离线", equip.getSequence());
equipmentService.updateFlag(equip.getSequence(), FlagEnums.OFFLINE.getCode()); equipmentService.updateFlag(equip.getSequence(), FlagEnums.OFFLINE.getCode());
}
} }
} }
} catch (Exception e) {
// 捕获所有异常并记录日志,确保定时任务能继续执行
log.error("HeartbeatOnLineTask 定时任务执行异常", e);
} }
} }
} }

View File

@@ -50,93 +50,98 @@ public class PlatePublishTask {
*/ */
@Scheduled(fixedDelay = 8 * 1000) @Scheduled(fixedDelay = 8 * 1000)
public void run() { public void run() {
String whiteUrl = "device/%s/message/down/white_list_operator"; // 添加异常捕获,防止单个任务异常导致定时任务线程阻塞
try {
String whiteUrl = "device/%s/message/down/white_list_operator";
//新增&编辑车牌 //新增&编辑车牌
CarInfo carInfo = new CarInfo(); CarInfo carInfo = new CarInfo();
carInfo.setDelFlag("0"); carInfo.setDelFlag("0");
carInfo.setSync("0"); carInfo.setSync("0");
List<CarInfo> carInfoList = carInfoService.selectCarInfoList(carInfo); List<CarInfo> carInfoList = carInfoService.selectCarInfoList(carInfo);
if (CollUtil.isNotEmpty(carInfoList)) { if (CollUtil.isNotEmpty(carInfoList)) {
//向设备下发车辆信息 //向设备下发车辆信息
for (CarInfo car : carInfoList) { for (CarInfo car : carInfoList) {
//如果该设备没有心跳,不下发车牌信息 //如果该设备没有心跳,不下发车牌信息
if (!redisService.hasKey(CacheConstants.EQUIPMENT_HEARTBEAT + car.getSn())) { if (!redisService.hasKey(CacheConstants.EQUIPMENT_HEARTBEAT + car.getSn())) {
continue; continue;
}
String topic = String.format(whiteUrl, car.getSn());
WhiteListOperatorPO po = new WhiteListOperatorPO();
String uuid = "add_" + UuidUtil.getUuid();
po.setId(uuid);
po.setSn(car.getSn());
po.setName("white_list_operator");
//构造数据
po.setPayload(buildPayload(car));
//更新car_park_record表的clientId用于回执消息更新下发状态
//必须在发送前更新,不然会导致发送后,还没更新完数据库,回执已经收到,无法更新数据,导致持续下发车牌
CarParkRecord carParkRecord = new CarParkRecord();
carParkRecord.setId(car.getCarParkRecordId());
carParkRecord.setClientId(uuid);
carParkRecordService.updateById(carParkRecord);
//发布车牌到设备(协议只能单条发布)
boolean publish = mqttServer.publish(car.getSn(), topic, JSON.toJSONString(po).getBytes(StandardCharsets.UTF_8));
log.info("定时任务下发车牌topic:{},报文数据:{},发送结果:{}", topic, JSON.toJSONString(po), publish);
}
}
//删除车牌
CarInfo carDel = new CarInfo();
carDel.setDelFlag("2");
carDel.setSync("1");
List<CarInfo> carInfos = carInfoService.selectCarInfoList(carDel);
if (CollUtil.isNotEmpty(carInfos)) {
//需要下发的设备
List<String> snList = carInfos.stream().map(CarInfo::getSn).distinct().toList();
for (String sn : snList) {
if (!redisService.hasKey(CacheConstants.EQUIPMENT_HEARTBEAT + sn)) {
log.error("删除车牌,设备:{},无心跳", sn);
continue;
}
//车辆授权列表
List<CarParkRecord> carParkRecords = new ArrayList<>();
String topic = String.format(whiteUrl, sn);
WhiteListOperatorPO po = new WhiteListOperatorPO();
String uuid = "del_" + UuidUtil.getUuid();
po.setId(uuid);
po.setSn(sn);
po.setName("white_list_operator");
//构造内层数据
List<String> plates = new ArrayList<>();
for (CarInfo car : carInfos) {
plates.add(car.getPlate());
if (StrUtil.equals(car.getSn(), sn)) {
CarParkRecord carParkRecord = new CarParkRecord();
carParkRecord.setClientId(uuid);
carParkRecord.setId(car.getCarParkRecordId());
carParkRecords.add(carParkRecord);
} }
}
po.setPayload(buildPayloadDel(plates));
//更新车辆授权信息 String topic = String.format(whiteUrl, car.getSn());
if (CollUtil.isNotEmpty(carParkRecords)) { WhiteListOperatorPO po = new WhiteListOperatorPO();
carParkRecordService.saveOrUpdateBatch(carParkRecords); String uuid = "add_" + UuidUtil.getUuid();
} po.setId(uuid);
//发布车牌到设备 po.setSn(car.getSn());
boolean publish = mqttServer.publish(sn, topic, JSON.toJSONString(po).getBytes(StandardCharsets.UTF_8)); po.setName("white_list_operator");
log.info("定时任务删除车牌topic:{},报文数据:{},发送结果:{}", topic, JSON.toJSONString(po), publish); //构造数据
po.setPayload(buildPayload(car));
//更新car_park_record表的clientId用于回执消息更新下发状态
//必须在发送前更新,不然会导致发送后,还没更新完数据库,回执已经收到,无法更新数据,导致持续下发车牌
CarParkRecord carParkRecord = new CarParkRecord();
carParkRecord.setId(car.getCarParkRecordId());
carParkRecord.setClientId(uuid);
carParkRecordService.updateById(carParkRecord);
//发布车牌到设备(协议只能单条发布)
boolean publish = mqttServer.publish(car.getSn(), topic, JSON.toJSONString(po).getBytes(StandardCharsets.UTF_8));
log.info("定时任务下发车牌topic:{},报文数据:{},发送结果:{}", topic, JSON.toJSONString(po), publish);
}
} }
}
//删除车牌
CarInfo carDel = new CarInfo();
carDel.setDelFlag("2");
carDel.setSync("1");
List<CarInfo> carInfos = carInfoService.selectCarInfoList(carDel);
if (CollUtil.isNotEmpty(carInfos)) {
//需要下发的设备
List<String> snList = carInfos.stream().map(CarInfo::getSn).distinct().toList();
for (String sn : snList) {
if (!redisService.hasKey(CacheConstants.EQUIPMENT_HEARTBEAT + sn)) {
log.error("删除车牌,设备:{},无心跳", sn);
continue;
}
//车辆授权列表
List<CarParkRecord> carParkRecords = new ArrayList<>();
String topic = String.format(whiteUrl, sn);
WhiteListOperatorPO po = new WhiteListOperatorPO();
String uuid = "del_" + UuidUtil.getUuid();
po.setId(uuid);
po.setSn(sn);
po.setName("white_list_operator");
//构造内层数据
List<String> plates = new ArrayList<>();
for (CarInfo car : carInfos) {
plates.add(car.getPlate());
if (StrUtil.equals(car.getSn(), sn)) {
CarParkRecord carParkRecord = new CarParkRecord();
carParkRecord.setClientId(uuid);
carParkRecord.setId(car.getCarParkRecordId());
carParkRecords.add(carParkRecord);
}
}
po.setPayload(buildPayloadDel(plates));
//更新车辆授权信息
if (CollUtil.isNotEmpty(carParkRecords)) {
carParkRecordService.saveOrUpdateBatch(carParkRecords);
}
//发布车牌到设备
boolean publish = mqttServer.publish(sn, topic, JSON.toJSONString(po).getBytes(StandardCharsets.UTF_8));
log.info("定时任务删除车牌topic:{},报文数据:{},发送结果:{}", topic, JSON.toJSONString(po), publish);
}
}
} catch (Exception e) {
// 捕获所有异常并记录日志,确保定时任务能继续执行
log.error("PlatePublishTask 定时任务执行异常", e);
}
} }

View File

@@ -1,12 +1,33 @@
spring: spring:
datasource: datasource:
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/td_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 url: jdbc:mysql://127.0.0.1:3306/td_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&connectTimeout=5000&socketTimeout=30000
username: root username: root
password: root password: root
# Druid 连接池配置
druid:
max-active: 20
max-wait: 60000
min-idle: 5
initial-size: 5
validation-query: SELECT 1
test-while-idle: true
test-on-borrow: false
test-on-return: false
time-between-eviction-runs-millis: 60000
min-evictable-idle-time-millis: 300000
data: data:
redis: redis:
# host: 127.0.0.1
host: 192.168.2.30 host: 192.168.2.30
password: redis2025 password: redis2025
database: 5 database: 5
port: 6379 port: 6379
# Redis 超时配置,防止连接阻塞
timeout: 5000ms
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 2
max-wait: 5000ms

View File

@@ -2,25 +2,37 @@ spring:
datasource: datasource:
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
#xa #xa
# url: jdbc:mysql://127.0.0.1:3306/xa_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 # url: jdbc:mysql://127.0.0.1:3306/xa_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&connectTimeout=5000&socketTimeout=30000
# username: root # username: root
# password: Xahg2024. # password: Xahg2024.
#jl #jl
# url: jdbc:mysql://127.0.0.1:3306/jl_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 # url: jdbc:mysql://127.0.0.1:3306/jl_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&connectTimeout=5000&socketTimeout=30000
# username: root # username: root
# password: JL202509jj # password: JL202509jj
#td #td
url: jdbc:mysql://127.0.0.1:3306/td_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 # url: jdbc:mysql://127.0.0.1:3306/td_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&connectTimeout=5000&socketTimeout=30000
username: root # username: root
password: td@JJ2024 # password: td@JJ2024
#zr #zr
# url: jdbc:mysql://192.168.155.42:3306/zr_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 # url: jdbc:mysql://192.168.155.42:3306/zr_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&connectTimeout=5000&socketTimeout=30000
# username: root # username: root
# password: zr202407.J # password: zr202407.J
#xj #xj
# url: jdbc:mysql://127.0.0.1:3306/xj_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 url: jdbc:mysql://127.0.0.1:3306/xj_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&connectTimeout=5000&socketTimeout=30000
# username: root username: root
# password: XjJN2024! password: XjJN2024!
# Druid 连接池配置
druid:
max-active: 20
max-wait: 60000
min-idle: 5
initial-size: 5
validation-query: SELECT 1
test-while-idle: true
test-on-borrow: false
test-on-return: false
time-between-eviction-runs-millis: 60000
min-evictable-idle-time-millis: 300000
data: data:
redis: redis:
#zr #zr
@@ -30,3 +42,11 @@ spring:
port: 6379 port: 6379
password: password:
database: 1 database: 1
# Redis 超时配置,防止连接阻塞
timeout: 5000ms
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 2
max-wait: 5000ms

View File

@@ -127,3 +127,8 @@ logging:
server: info # t-io 服务端默认日志 server: info # t-io 服务端默认日志
org.tio: info # t-io 服务端默认日志 org.tio: info # t-io 服务端默认日志
org.dromara.mica.mqtt: info # mica-mqtt 日志 org.dromara.mica.mqtt: info # mica-mqtt 日志
org.dromara.mica.mqtt.server.task: debug # 定时任务日志级别
# 定时任务线程池配置
scheduler:
pool-size: 5