diff --git a/example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/config/SchedulerConfig.java b/example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/config/SchedulerConfig.java new file mode 100644 index 0000000..be5199f --- /dev/null +++ b/example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/config/SchedulerConfig.java @@ -0,0 +1,35 @@ +package org.dromara.mica.mqtt.server.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +/** + * 定时任务线程池配置 + * 解决默认定时任务线程池(大小为1)被阻塞导致所有定时任务无法执行的问题 + */ +@Slf4j +@Configuration +@EnableScheduling +public class SchedulerConfig { + + @Bean + public TaskScheduler taskScheduler() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + // 增加线程池大小,避免单线程阻塞影响所有定时任务 + scheduler.setPoolSize(5); + scheduler.setThreadNamePrefix("mqtt-scheduler-"); + // 等待所有任务完成后再关闭 + scheduler.setWaitForTasksToCompleteOnShutdown(true); + // 等待时间 + scheduler.setAwaitTerminationSeconds(60); + // 设置异常处理器,确保异常被记录但不中断定时任务 + scheduler.setErrorHandler(throwable -> { + log.error("定时任务执行异常", throwable); + }); + return scheduler; + } +} diff --git a/example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/redis/RedisService.java b/example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/redis/RedisService.java index 09b746d..c155ce2 100644 --- a/example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/redis/RedisService.java +++ b/example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/redis/RedisService.java @@ -80,7 +80,11 @@ public class RedisService { */ public void setCacheObject(final String key, final T value, final Long timeout, final TimeUnit timeUnit) { - redisTemplate.opsForValue().set(key, value, timeout, timeUnit); + try { + redisTemplate.opsForValue().set(key, value, timeout, timeUnit); + } catch (Exception e) { + log.error("Redis setCacheObject error: key={}", key, e); + } } /** @@ -127,7 +131,12 @@ public class RedisService { */ public Boolean hasKey(String key) { - return redisTemplate.hasKey(key); + try { + return redisTemplate.hasKey(key); + } catch (Exception e) { + log.error("Redis hasKey error: key={}", key, e); + return false; + } } /** @@ -138,8 +147,13 @@ public class RedisService { */ public T getCacheObject(final String key) { - ValueOperations operation = redisTemplate.opsForValue(); - return operation.get(key); + try { + ValueOperations operation = redisTemplate.opsForValue(); + return operation.get(key); + } catch (Exception e) { + log.error("Redis getCacheObject error: key={}", key, e); + return null; + } } /** diff --git a/example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/task/HeartbeatOnLineTask.java b/example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/task/HeartbeatOnLineTask.java index bf71f82..55a4046 100644 --- a/example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/task/HeartbeatOnLineTask.java +++ b/example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/task/HeartbeatOnLineTask.java @@ -30,28 +30,32 @@ public class HeartbeatOnLineTask { @Scheduled(fixedRate = 15 * 1000) public void run() { - log.info("===========心跳检测============="); - //查询车辆摄像头的编码和在线状态 - List equipment = equipmentService.selectAllSnFlag(); + // 添加异常捕获,防止单个任务异常导致定时任务线程阻塞 + try { + log.info("===========心跳检测============="); + //查询车辆摄像头的编码和在线状态 + List equipment = equipmentService.selectAllSnFlag(); - for (Equipment equip : equipment) { - //缓存中有该设备心跳key - if (redisService.hasKey(CacheConstants.EQUIPMENT_HEARTBEAT + equip.getSequence())) { - String flag = redisService.getCacheObject(CacheConstants.EQUIPMENT_HEARTBEAT + equip.getSequence()); - //有心跳,但是和数据库设备在线状态不一致 - if (!StrUtil.equals(flag, equip.getFlag())) { - log.info("设备:{},flag:{}", equip.getSequence(), flag); - equipmentService.updateFlag(equip.getSequence(), flag); - } - } else { - //没有心跳上传,且设备在线,将设备置为离线 - if (StrUtil.equals(FlagEnums.ONLINE.getCode(), equip.getFlag())) { - log.info("设备:{},无心跳离线", equip.getSequence()); - equipmentService.updateFlag(equip.getSequence(), FlagEnums.OFFLINE.getCode()); + for (Equipment equip : equipment) { + //缓存中有该设备心跳key + if (redisService.hasKey(CacheConstants.EQUIPMENT_HEARTBEAT + equip.getSequence())) { + String flag = redisService.getCacheObject(CacheConstants.EQUIPMENT_HEARTBEAT + equip.getSequence()); + //有心跳,但是和数据库设备在线状态不一致 + if (!StrUtil.equals(flag, equip.getFlag())) { + log.info("设备:{},flag:{}", equip.getSequence(), flag); + equipmentService.updateFlag(equip.getSequence(), flag); + } + } else { + //没有心跳上传,且设备在线,将设备置为离线 + if (StrUtil.equals(FlagEnums.ONLINE.getCode(), equip.getFlag())) { + log.info("设备:{},无心跳离线", equip.getSequence()); + equipmentService.updateFlag(equip.getSequence(), FlagEnums.OFFLINE.getCode()); + } } } - + } catch (Exception e) { + // 捕获所有异常并记录日志,确保定时任务能继续执行 + log.error("HeartbeatOnLineTask 定时任务执行异常", e); } - } } diff --git a/example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/task/PlatePublishTask.java b/example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/task/PlatePublishTask.java index 1fc4235..c7f207e 100644 --- a/example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/task/PlatePublishTask.java +++ b/example/mqtt-car/src/main/java/org/dromara/mica/mqtt/server/task/PlatePublishTask.java @@ -50,93 +50,98 @@ public class PlatePublishTask { */ @Scheduled(fixedDelay = 8 * 1000) public void run() { - String whiteUrl = "device/%s/message/down/white_list_operator"; + // 添加异常捕获,防止单个任务异常导致定时任务线程阻塞 + try { + String whiteUrl = "device/%s/message/down/white_list_operator"; - //新增&编辑车牌 - CarInfo carInfo = new CarInfo(); - carInfo.setDelFlag("0"); - carInfo.setSync("0"); - List carInfoList = carInfoService.selectCarInfoList(carInfo); - if (CollUtil.isNotEmpty(carInfoList)) { - //向设备下发车辆信息 - for (CarInfo car : carInfoList) { - //如果该设备没有心跳,不下发车牌信息 - if (!redisService.hasKey(CacheConstants.EQUIPMENT_HEARTBEAT + car.getSn())) { - continue; - } - - String topic = String.format(whiteUrl, car.getSn()); - WhiteListOperatorPO po = new WhiteListOperatorPO(); - String uuid = "add_" + UuidUtil.getUuid(); - po.setId(uuid); - po.setSn(car.getSn()); - po.setName("white_list_operator"); - //构造数据 - po.setPayload(buildPayload(car)); - - //更新car_park_record表的clientId,用于回执消息更新下发状态 - //必须在发送前更新,不然会导致发送后,还没更新完数据库,回执已经收到,无法更新数据,导致持续下发车牌 - CarParkRecord carParkRecord = new CarParkRecord(); - carParkRecord.setId(car.getCarParkRecordId()); - carParkRecord.setClientId(uuid); - carParkRecordService.updateById(carParkRecord); - - //发布车牌到设备(协议只能单条发布) - boolean publish = mqttServer.publish(car.getSn(), topic, JSON.toJSONString(po).getBytes(StandardCharsets.UTF_8)); - log.info("定时任务下发车牌topic:{},报文数据:{},发送结果:{}", topic, JSON.toJSONString(po), publish); - } - } - - //删除车牌 - CarInfo carDel = new CarInfo(); - carDel.setDelFlag("2"); - carDel.setSync("1"); - List carInfos = carInfoService.selectCarInfoList(carDel); - if (CollUtil.isNotEmpty(carInfos)) { - //需要下发的设备 - List snList = carInfos.stream().map(CarInfo::getSn).distinct().toList(); - for (String sn : snList) { - if (!redisService.hasKey(CacheConstants.EQUIPMENT_HEARTBEAT + sn)) { - log.error("删除车牌,设备:{},无心跳", sn); - continue; - } - - //车辆授权列表 - List carParkRecords = new ArrayList<>(); - - String topic = String.format(whiteUrl, sn); - WhiteListOperatorPO po = new WhiteListOperatorPO(); - String uuid = "del_" + UuidUtil.getUuid(); - po.setId(uuid); - po.setSn(sn); - po.setName("white_list_operator"); - - - //构造内层数据 - List plates = new ArrayList<>(); - for (CarInfo car : carInfos) { - plates.add(car.getPlate()); - - if (StrUtil.equals(car.getSn(), sn)) { - CarParkRecord carParkRecord = new CarParkRecord(); - carParkRecord.setClientId(uuid); - carParkRecord.setId(car.getCarParkRecordId()); - carParkRecords.add(carParkRecord); + //新增&编辑车牌 + CarInfo carInfo = new CarInfo(); + carInfo.setDelFlag("0"); + carInfo.setSync("0"); + List carInfoList = carInfoService.selectCarInfoList(carInfo); + if (CollUtil.isNotEmpty(carInfoList)) { + //向设备下发车辆信息 + for (CarInfo car : carInfoList) { + //如果该设备没有心跳,不下发车牌信息 + if (!redisService.hasKey(CacheConstants.EQUIPMENT_HEARTBEAT + car.getSn())) { + continue; } - } - po.setPayload(buildPayloadDel(plates)); - //更新车辆授权信息 - if (CollUtil.isNotEmpty(carParkRecords)) { - carParkRecordService.saveOrUpdateBatch(carParkRecords); - } - //发布车牌到设备 - boolean publish = mqttServer.publish(sn, topic, JSON.toJSONString(po).getBytes(StandardCharsets.UTF_8)); - log.info("定时任务删除车牌topic:{},报文数据:{},发送结果:{}", topic, JSON.toJSONString(po), publish); + String topic = String.format(whiteUrl, car.getSn()); + WhiteListOperatorPO po = new WhiteListOperatorPO(); + String uuid = "add_" + UuidUtil.getUuid(); + po.setId(uuid); + po.setSn(car.getSn()); + po.setName("white_list_operator"); + //构造数据 + po.setPayload(buildPayload(car)); + //更新car_park_record表的clientId,用于回执消息更新下发状态 + //必须在发送前更新,不然会导致发送后,还没更新完数据库,回执已经收到,无法更新数据,导致持续下发车牌 + CarParkRecord carParkRecord = new CarParkRecord(); + carParkRecord.setId(car.getCarParkRecordId()); + carParkRecord.setClientId(uuid); + carParkRecordService.updateById(carParkRecord); + + //发布车牌到设备(协议只能单条发布) + boolean publish = mqttServer.publish(car.getSn(), topic, JSON.toJSONString(po).getBytes(StandardCharsets.UTF_8)); + log.info("定时任务下发车牌topic:{},报文数据:{},发送结果:{}", topic, JSON.toJSONString(po), publish); + } } - } + //删除车牌 + CarInfo carDel = new CarInfo(); + carDel.setDelFlag("2"); + carDel.setSync("1"); + List carInfos = carInfoService.selectCarInfoList(carDel); + if (CollUtil.isNotEmpty(carInfos)) { + //需要下发的设备 + List snList = carInfos.stream().map(CarInfo::getSn).distinct().toList(); + for (String sn : snList) { + if (!redisService.hasKey(CacheConstants.EQUIPMENT_HEARTBEAT + sn)) { + log.error("删除车牌,设备:{},无心跳", sn); + continue; + } + + //车辆授权列表 + List carParkRecords = new ArrayList<>(); + + String topic = String.format(whiteUrl, sn); + WhiteListOperatorPO po = new WhiteListOperatorPO(); + String uuid = "del_" + UuidUtil.getUuid(); + po.setId(uuid); + po.setSn(sn); + po.setName("white_list_operator"); + + + //构造内层数据 + List plates = new ArrayList<>(); + for (CarInfo car : carInfos) { + plates.add(car.getPlate()); + + if (StrUtil.equals(car.getSn(), sn)) { + CarParkRecord carParkRecord = new CarParkRecord(); + carParkRecord.setClientId(uuid); + carParkRecord.setId(car.getCarParkRecordId()); + carParkRecords.add(carParkRecord); + } + } + po.setPayload(buildPayloadDel(plates)); + + //更新车辆授权信息 + if (CollUtil.isNotEmpty(carParkRecords)) { + carParkRecordService.saveOrUpdateBatch(carParkRecords); + } + //发布车牌到设备 + boolean publish = mqttServer.publish(sn, topic, JSON.toJSONString(po).getBytes(StandardCharsets.UTF_8)); + log.info("定时任务删除车牌topic:{},报文数据:{},发送结果:{}", topic, JSON.toJSONString(po), publish); + + } + } + } catch (Exception e) { + // 捕获所有异常并记录日志,确保定时任务能继续执行 + log.error("PlatePublishTask 定时任务执行异常", e); + } } diff --git a/example/mqtt-car/src/main/resources/application-dev.yml b/example/mqtt-car/src/main/resources/application-dev.yml index 9c971ba..98b4eed 100644 --- a/example/mqtt-car/src/main/resources/application-dev.yml +++ b/example/mqtt-car/src/main/resources/application-dev.yml @@ -1,12 +1,33 @@ spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://127.0.0.1:3306/xa_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 + url: jdbc:mysql://127.0.0.1:3306/xa_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&connectTimeout=5000&socketTimeout=30000 username: root password: root + # Druid 连接池配置 + druid: + max-active: 20 + max-wait: 60000 + min-idle: 5 + initial-size: 5 + validation-query: SELECT 1 + test-while-idle: true + test-on-borrow: false + test-on-return: false + time-between-eviction-runs-millis: 60000 + min-evictable-idle-time-millis: 300000 data: redis: +# host: 127.0.0.1 host: 192.168.2.30 password: redis2025 database: 5 port: 6379 + # Redis 超时配置,防止连接阻塞 + timeout: 5000ms + lettuce: + pool: + max-active: 8 + max-idle: 8 + min-idle: 2 + max-wait: 5000ms diff --git a/example/mqtt-car/src/main/resources/application-prod.yml b/example/mqtt-car/src/main/resources/application-prod.yml index 57c4cf9..9265dd4 100644 --- a/example/mqtt-car/src/main/resources/application-prod.yml +++ b/example/mqtt-car/src/main/resources/application-prod.yml @@ -2,25 +2,37 @@ spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver #xa -# url: jdbc:mysql://127.0.0.1:3306/xa_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 +# url: jdbc:mysql://127.0.0.1:3306/xa_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&connectTimeout=5000&socketTimeout=30000 # username: root # password: Xahg2024. #jl -# url: jdbc:mysql://127.0.0.1:3306/jl_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 +# url: jdbc:mysql://127.0.0.1:3306/jl_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&connectTimeout=5000&socketTimeout=30000 # username: root # password: JL202509jj #td -# url: jdbc:mysql://127.0.0.1:3306/td_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 +# url: jdbc:mysql://127.0.0.1:3306/td_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&connectTimeout=5000&socketTimeout=30000 # username: root # password: td@JJ2024 #zr -# url: jdbc:mysql://192.168.155.42:3306/zr_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 +# url: jdbc:mysql://192.168.155.42:3306/zr_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&connectTimeout=5000&socketTimeout=30000 # username: root # password: zr202407.J #xj - url: jdbc:mysql://127.0.0.1:3306/xj_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 + url: jdbc:mysql://127.0.0.1:3306/xj_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&connectTimeout=5000&socketTimeout=30000 username: root password: XjJN2024! + # Druid 连接池配置 + druid: + max-active: 20 + max-wait: 60000 + min-idle: 5 + initial-size: 5 + validation-query: SELECT 1 + test-while-idle: true + test-on-borrow: false + test-on-return: false + time-between-eviction-runs-millis: 60000 + min-evictable-idle-time-millis: 300000 data: redis: #zr @@ -30,3 +42,11 @@ spring: port: 6379 password: database: 1 + # Redis 超时配置,防止连接阻塞 + timeout: 5000ms + lettuce: + pool: + max-active: 8 + max-idle: 8 + min-idle: 2 + max-wait: 5000ms diff --git a/example/mqtt-car/src/main/resources/application.yml b/example/mqtt-car/src/main/resources/application.yml index caf558c..649eddd 100644 --- a/example/mqtt-car/src/main/resources/application.yml +++ b/example/mqtt-car/src/main/resources/application.yml @@ -6,8 +6,8 @@ spring: name: mica-mqtt-server # 环境 dev|test|prod profiles: -# active: dev - active: prod + active: dev +# active: prod messages: encoding: UTF-8 basename: i18n/messages @@ -127,3 +127,8 @@ logging: server: info # t-io 服务端默认日志 org.tio: info # t-io 服务端默认日志 org.dromara.mica.mqtt: info # mica-mqtt 日志 + org.dromara.mica.mqtt.server.task: debug # 定时任务日志级别 + +# 定时任务线程池配置 +scheduler: + pool-size: 5