From 2cb03b146afb17a0a5fe7eb34f7a369ab224f5e9 Mon Sep 17 00:00:00 2001 From: liuzhu Date: Wed, 11 Mar 2026 17:44:06 +0800 Subject: [PATCH] =?UTF-8?q?tcp=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wms-common/pom.xml | 7 ++ wms-module-system/pom.xml | 7 ++ .../wms/admin/tcp/config/NettyTcpServer.java | 97 +++++++++++++++++++ .../tcp/config/SimpleRequestMatcher.java | 69 +++++++++++++ .../admin/tcp/handler/TcpServerHandler.java | 67 +++++++++++++ .../wms/admin/tcp/manager/ChannelManager.java | 47 +++++++++ .../wms/admin/tcp/model/CommandRequest.java | 21 ++++ .../top/wms/admin/tcp/model/VMResult.java | 23 +++++ .../wms/admin/tcp/service/CommandService.java | 58 +++++++++++ .../wms/admin/tcp/util/SpringContextUtil.java | 21 ++++ .../controller/tcp/VmCommandController.java | 77 +++++++++++++++ 11 files changed, 494 insertions(+) create mode 100644 wms-module-system/src/main/java/top/wms/admin/tcp/config/NettyTcpServer.java create mode 100644 wms-module-system/src/main/java/top/wms/admin/tcp/config/SimpleRequestMatcher.java create mode 100644 wms-module-system/src/main/java/top/wms/admin/tcp/handler/TcpServerHandler.java create mode 100644 wms-module-system/src/main/java/top/wms/admin/tcp/manager/ChannelManager.java create mode 100644 wms-module-system/src/main/java/top/wms/admin/tcp/model/CommandRequest.java create mode 100644 wms-module-system/src/main/java/top/wms/admin/tcp/model/VMResult.java create mode 100644 wms-module-system/src/main/java/top/wms/admin/tcp/service/CommandService.java create mode 100644 wms-module-system/src/main/java/top/wms/admin/tcp/util/SpringContextUtil.java create mode 100644 wms-webapi/src/main/java/top/wms/admin/controller/tcp/VmCommandController.java diff --git a/wms-common/pom.xml b/wms-common/pom.xml index 6eca3fb..ac576d2 100644 --- a/wms-common/pom.xml +++ b/wms-common/pom.xml @@ -155,5 +155,12 @@ spring-test + + io.netty + netty-all + 4.1.100.Final + + + diff --git a/wms-module-system/pom.xml b/wms-module-system/pom.xml index 05be82f..0f5c9a6 100644 --- a/wms-module-system/pom.xml +++ b/wms-module-system/pom.xml @@ -18,5 +18,12 @@ top.wms wms-common + + + javax.annotation + javax.annotation-api + 1.3.2 + provided + diff --git a/wms-module-system/src/main/java/top/wms/admin/tcp/config/NettyTcpServer.java b/wms-module-system/src/main/java/top/wms/admin/tcp/config/NettyTcpServer.java new file mode 100644 index 0000000..f48e624 --- /dev/null +++ b/wms-module-system/src/main/java/top/wms/admin/tcp/config/NettyTcpServer.java @@ -0,0 +1,97 @@ +package top.wms.admin.tcp.config; + +import top.wms.admin.tcp.handler.TcpServerHandler; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.util.CharsetUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +@Slf4j +@Component +public class NettyTcpServer { + + @Value("${tcp.server.port:9005}") + private int port; + + @Value("${tcp.server.boss-threads:1}") + private int bossThreads; + + @Value("${tcp.server.worker-threads:4}") + private int workerThreads; + + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + private Channel serverChannel; + + + @PostConstruct + public void start() throws InterruptedException { + log.info("正在启动TCP服务端,端口: {}", port); + + bossGroup = new NioEventLoopGroup(bossThreads); + workerGroup = new NioEventLoopGroup(workerThreads); + + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 128) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + // 解决TCP粘包问题 + pipeline.addLast(new LineBasedFrameDecoder(1024)); + + // 字符串编解码 + pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); + pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); + + // 关键修复:直接new,不要从Spring容器获取 + pipeline.addLast(new TcpServerHandler()); + + log.debug("新连接接入,处理器已添加"); + } + }); + + ChannelFuture future = bootstrap.bind(port).sync(); + if (future.isSuccess()) { + serverChannel = future.channel(); + log.info("TCP服务端启动成功,端口: {}", port); + } else { + log.error("TCP服务端启动失败", future.cause()); + } + } + + @PreDestroy + public void stop() { + log.info("正在关闭TCP服务端..."); + try { + if (serverChannel != null) { + serverChannel.close().sync(); + } + } catch (InterruptedException e) { + log.error("关闭服务端通道时发生错误", e); + } finally { + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + } + log.info("TCP服务端已关闭"); + } +} diff --git a/wms-module-system/src/main/java/top/wms/admin/tcp/config/SimpleRequestMatcher.java b/wms-module-system/src/main/java/top/wms/admin/tcp/config/SimpleRequestMatcher.java new file mode 100644 index 0000000..917d210 --- /dev/null +++ b/wms-module-system/src/main/java/top/wms/admin/tcp/config/SimpleRequestMatcher.java @@ -0,0 +1,69 @@ +package top.wms.admin.tcp.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Component +public class SimpleRequestMatcher { + + // 用队列存储响应 + private final BlockingQueue responseQueue = new LinkedBlockingQueue<>(1); + + // 当前等待的请求标识 + private volatile boolean isWaiting = false; + + /** + * 等待响应 + */ + public String waitForResponse(int timeoutSeconds) { + isWaiting = true; + try { + log.info("等待响应, 超时={}秒", timeoutSeconds); + // 从队列取响应,最多等待timeoutSeconds秒 + String response = responseQueue.poll(timeoutSeconds, TimeUnit.SECONDS); + if (response != null) { + log.info("收到响应: {}", response); + return response; + } else { + log.error("等待响应超时"); + return "TIMEOUT"; + } + } catch (InterruptedException e) { + log.error("等待响应被中断: {}", e.getMessage()); + return "ERROR"; + } finally { + isWaiting = false; + } + } + + /** + * 接收响应 + */ + public void handleResponse(String response) { + try { + // 如果有请求在等待,把响应放入队列 + if (isWaiting) { + responseQueue.offer(response); + log.info("响应接收成功: {}", response); + } else { + log.warn("没有正在等待的请求, 响应被丢弃: {}", response); + } + } catch (Exception e) { + log.error("处理响应失败: {}", e.getMessage()); + } + } + + /** + * 清空队列 + */ + public void clear() { + responseQueue.clear(); + isWaiting = false; + log.info("请求匹配器已清空"); + } +} diff --git a/wms-module-system/src/main/java/top/wms/admin/tcp/handler/TcpServerHandler.java b/wms-module-system/src/main/java/top/wms/admin/tcp/handler/TcpServerHandler.java new file mode 100644 index 0000000..9ab6558 --- /dev/null +++ b/wms-module-system/src/main/java/top/wms/admin/tcp/handler/TcpServerHandler.java @@ -0,0 +1,67 @@ +package top.wms.admin.tcp.handler; + +import top.wms.admin.tcp.config.SimpleRequestMatcher; +import top.wms.admin.tcp.manager.ChannelManager; +import top.wms.admin.tcp.util.SpringContextUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TcpServerHandler extends SimpleChannelInboundHandler { + + private ChannelManager channelManager; + private SimpleRequestMatcher requestMatcher; + + private ChannelManager getChannelManager() { + if (channelManager == null) { + channelManager = SpringContextUtil.getBean(ChannelManager.class); + } + return channelManager; + } + + private SimpleRequestMatcher getRequestMatcher() { + if (requestMatcher == null) { + requestMatcher = SpringContextUtil.getBean(SimpleRequestMatcher.class); + } + return requestMatcher; + } + + private String getClientId(ChannelHandlerContext ctx) { + return ctx.channel().remoteAddress().toString(); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + String clientId = getClientId(ctx); + getChannelManager().addChannel(clientId, ctx.channel()); + log.info("✅ VM客户端连接成功: {}", clientId); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + String clientId = getClientId(ctx); + getChannelManager().removeChannel(clientId); + log.info("❌ VM客户端断开连接: {}", clientId); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, String msg) { + String clientId = getClientId(ctx); + log.info("📥 收到VM数据 [{}]: {}", clientId, msg); + + try { + // 直接把收到的消息交给匹配器(不解析,不匹配) + String cleanMsg = msg.trim(); + getRequestMatcher().handleResponse(cleanMsg); + } catch (Exception e) { + log.error("处理消息失败: {}", e.getMessage()); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + log.error("连接异常: {}", cause.getMessage()); + ctx.close(); + } +} diff --git a/wms-module-system/src/main/java/top/wms/admin/tcp/manager/ChannelManager.java b/wms-module-system/src/main/java/top/wms/admin/tcp/manager/ChannelManager.java new file mode 100644 index 0000000..90e494b --- /dev/null +++ b/wms-module-system/src/main/java/top/wms/admin/tcp/manager/ChannelManager.java @@ -0,0 +1,47 @@ +package top.wms.admin.tcp.manager; + +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +@Component +public class ChannelManager { + + // 存储所有VM客户端的连接 + private final ConcurrentHashMap channels = new ConcurrentHashMap<>(); + + // 添加连接 + public void addChannel(String clientId, Channel channel) { + channels.put(clientId, channel); + log.info("VM客户端 [{}] 已连接,当前在线客户端数: {}", clientId, channels.size()); + } + + // 移除连接 + public void removeChannel(String clientId) { + channels.remove(clientId); + log.info("VM客户端 [{}] 已断开,当前在线客户端数: {}", clientId, channels.size()); + } + + // 获取第一个连接的VM(如果有多个VM,可以根据需要修改) + public Channel getFirstChannel() { + return channels.values().stream().findFirst().orElse(null); + } + + // 根据ID获取连接 + public Channel getChannel(String clientId) { + return channels.get(clientId); + } + + // 检查是否有VM连接 + public boolean hasConnection() { + return !channels.isEmpty(); + } + + // 获取所有连接 + public ConcurrentHashMap getAllChannels() { + return channels; + } +} diff --git a/wms-module-system/src/main/java/top/wms/admin/tcp/model/CommandRequest.java b/wms-module-system/src/main/java/top/wms/admin/tcp/model/CommandRequest.java new file mode 100644 index 0000000..4a313ef --- /dev/null +++ b/wms-module-system/src/main/java/top/wms/admin/tcp/model/CommandRequest.java @@ -0,0 +1,21 @@ +package top.wms.admin.tcp.model; + +import lombok.Data; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; + +// 指令请求 +@Data +@NoArgsConstructor +@AllArgsConstructor +public class CommandRequest { + private String processId; // 要执行的流程ID,如 "PROCESS_1" + private String param; // 参数,如 "10.5" + private Integer priority; // 优先级(可选) + + // 构建完整的指令字符串 + public String buildCommand() { + // 格式:*PROCESS_1,10.5# + return String.format("*%s,%s#", processId, param); + } +} diff --git a/wms-module-system/src/main/java/top/wms/admin/tcp/model/VMResult.java b/wms-module-system/src/main/java/top/wms/admin/tcp/model/VMResult.java new file mode 100644 index 0000000..379fe16 --- /dev/null +++ b/wms-module-system/src/main/java/top/wms/admin/tcp/model/VMResult.java @@ -0,0 +1,23 @@ +package top.wms.admin.tcp.model; + +import lombok.Data; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; + +// VM返回的结果 +@Data +@NoArgsConstructor +@AllArgsConstructor +public class VMResult { + private String processId; // 流程ID + private String status; // OK 或 NG + private double value; // 测量值 + private long timestamp; // 时间戳 + + public VMResult(String processId, String status, double value) { + this.processId = processId; + this.status = status; + this.value = value; + this.timestamp = System.currentTimeMillis(); + } +} diff --git a/wms-module-system/src/main/java/top/wms/admin/tcp/service/CommandService.java b/wms-module-system/src/main/java/top/wms/admin/tcp/service/CommandService.java new file mode 100644 index 0000000..d52580b --- /dev/null +++ b/wms-module-system/src/main/java/top/wms/admin/tcp/service/CommandService.java @@ -0,0 +1,58 @@ +package top.wms.admin.tcp.service; + +import top.wms.admin.tcp.model.VMResult; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +@Slf4j +@Service +public class CommandService { + + // 统计 + private final AtomicInteger successCount = new AtomicInteger(0); + private final AtomicInteger failCount = new AtomicInteger(0); + + // 存储最近100条结果 + private final ConcurrentLinkedQueue resultQueue = new ConcurrentLinkedQueue<>(); + + // 处理VM返回的结果 + public void processResult(String processId, String status, double value) { + VMResult result = new VMResult(processId, status, value); + + // 更新统计 + if ("OK".equalsIgnoreCase(status)) { + successCount.incrementAndGet(); + } else { + failCount.incrementAndGet(); + } + + // 存储结果 + resultQueue.offer(result); + if (resultQueue.size() > 100) { + resultQueue.poll(); // 只保留最近100条 + } + + log.info("处理结果 - 流程: {}, 状态: {}, 数值: {}", processId, status, value); + + // 这里可以添加你的业务逻辑,比如: + // 1. 存入数据库 + // 2. 通过WebSocket推送给前端 + // 3. 触发其他业务操作 + } + + // 获取统计信息 + public String getStatistics() { + return String.format("成功: %d, 失败: %d, 总数: %d", + successCount.get(), + failCount.get(), + successCount.get() + failCount.get()); + } + + // 获取最新结果 + public VMResult getLatestResult() { + return resultQueue.peek(); + } +} diff --git a/wms-module-system/src/main/java/top/wms/admin/tcp/util/SpringContextUtil.java b/wms-module-system/src/main/java/top/wms/admin/tcp/util/SpringContextUtil.java new file mode 100644 index 0000000..049259c --- /dev/null +++ b/wms-module-system/src/main/java/top/wms/admin/tcp/util/SpringContextUtil.java @@ -0,0 +1,21 @@ +package top.wms.admin.tcp.util; + +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +@Component +public class SpringContextUtil implements ApplicationContextAware { + + private static ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + SpringContextUtil.applicationContext = applicationContext; + } + + public static T getBean(Class requiredType) { + return applicationContext.getBean(requiredType); + } +} diff --git a/wms-webapi/src/main/java/top/wms/admin/controller/tcp/VmCommandController.java b/wms-webapi/src/main/java/top/wms/admin/controller/tcp/VmCommandController.java new file mode 100644 index 0000000..9333fd4 --- /dev/null +++ b/wms-webapi/src/main/java/top/wms/admin/controller/tcp/VmCommandController.java @@ -0,0 +1,77 @@ +package top.wms.admin.controller.tcp; + +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; +import top.wms.admin.tcp.config.SimpleRequestMatcher; +import top.wms.admin.tcp.manager.ChannelManager; + +@Slf4j +@RestController +@RequestMapping("/vm") +public class VmCommandController { + + @Autowired + private ChannelManager channelManager; + + @Autowired + private SimpleRequestMatcher requestMatcher; + + @GetMapping("/send") + public String sendAndWait(@RequestParam String msg) { + // 1. 检查连接 + Channel channel = channelManager.getFirstChannel(); + if (channel == null) { + return "ERROR: VM未连接"; + } + + // 2. 直接发送消息(不加requestId) + String sendMsg = msg; + channel.writeAndFlush(sendMsg); + log.info("发送指令: {}", sendMsg); + + // 3. 等待响应(不传requestId) + String response = requestMatcher.waitForResponse(20); + + // 4. 返回结果 + if ("TIMEOUT".equals(response)) { + return "ERROR: 处理超时"; + } + return response; // 直接返回VM的响应 + } + + @PostMapping("/command") + public String sendCommand(@RequestBody CommandRequest request) { + Channel channel = channelManager.getFirstChannel(); + if (channel == null) { + return "ERROR: VM未连接"; + } + + // 直接发送请求的命令 + String sendMsg = request.getCommand(); + channel.writeAndFlush(sendMsg + "\n"); + log.info("发送指令: {}", sendMsg); + + // 等待响应 + String response = requestMatcher.waitForResponse(request.getTimeout()); + + if ("TIMEOUT".equals(response)) { + return "ERROR: 处理超时"; + } + return response; + } + + /** + * 请求体类 + */ + public static class CommandRequest { + private String command; + private int timeout = 5; // 默认5秒 + + public String getCommand() { return command; } + public void setCommand(String command) { this.command = command; } + public int getTimeout() { return timeout; } + public void setTimeout(int timeout) { this.timeout = timeout; } + } +}