first commit

This commit is contained in:
zc
2025-12-08 10:40:43 +08:00
commit 871ae8be0a
410 changed files with 38212 additions and 0 deletions

View File

@@ -0,0 +1,49 @@
/*
* Copyright 2025-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.annotation;
import org.dromara.mica.mqtt.codec.MqttQoS;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 客户端发布注解
*
* @author ChangJin Wei (魏昌进)
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MqttClientPublish {
/**
* 订阅的 topic
*
* @return topic
*/
String value();
/**
* 发布的 qos
*
* @return MqttQoS
*/
MqttQoS qos() default MqttQoS.QOS0;
}

View File

@@ -0,0 +1,68 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.annotation;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.core.deserialize.MqttDeserializer;
import org.dromara.mica.mqtt.core.deserialize.MqttJsonDeserializer;
import java.lang.annotation.*;
/**
* 客户端订阅注解
*
* @author L.cm
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface MqttClientSubscribe {
/**
* 默认的客户端模板 bean 名称
*/
String DEFAULT_CLIENT_TEMPLATE_BEAN = "mqttClientTemplate";
/**
* 订阅的 topic filter
*
* @return topic filter
*/
String[] value();
/**
* 订阅的 qos
*
* @return MqttQoS
*/
MqttQoS qos() default MqttQoS.QOS0;
/**
* mqtt 消息反序列化
*
* @return 反序列化
*/
Class<? extends MqttDeserializer> deserialize() default MqttJsonDeserializer.class;
/**
* 客户端 bean 名称
*
* @return bean name
*/
String clientTemplateBean() default MqttClientSubscribe.DEFAULT_CLIENT_TEMPLATE_BEAN;
}

View File

@@ -0,0 +1,31 @@
/*
* Copyright 2025-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author ChangJin Wei (魏昌进)
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface MqttPayload {
}

View File

@@ -0,0 +1,32 @@
/*
* Copyright 2025-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 是否在服务器上保留消息
* @author ChangJin Wei (魏昌进)
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface MqttRetain {
}

View File

@@ -0,0 +1,48 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.annotation;
import org.dromara.mica.mqtt.core.deserialize.MqttDeserializer;
import org.dromara.mica.mqtt.core.deserialize.MqttJsonDeserializer;
import java.lang.annotation.*;
/**
* 服务端函数
*
* @author L.cm
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface MqttServerFunction {
/**
* 订阅的 topic filter
*
* @return topic filter
*/
String[] value();
/**
* mqtt 消息反序列化
*
* @return 反序列化
*/
Class<? extends MqttDeserializer> deserialize() default MqttJsonDeserializer.class;
}

View File

@@ -0,0 +1,93 @@
package org.dromara.mica.mqtt.core.common;
import org.dromara.mica.mqtt.codec.message.MqttMessage;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.utils.timer.TimerTaskService;
import java.util.Objects;
/**
* MqttPendingPublish参考于 netty-mqtt-client
*
* @author netty
*/
public final class MqttPendingPublish {
private static final Logger logger = LoggerFactory.getLogger(MqttPendingPublish.class);
private final MqttPublishMessage message;
private final MqttQoS qos;
private final RetryProcessor<MqttPublishMessage> pubRetryProcessor = new RetryProcessor<>();
private final RetryProcessor<MqttMessage> pubRelRetryProcessor = new RetryProcessor<>();
public MqttPendingPublish(MqttPublishMessage message, MqttQoS qos) {
this.message = message;
this.qos = qos;
this.pubRetryProcessor.setOriginalMessage(message);
}
public MqttPublishMessage getMessage() {
return message;
}
public MqttQoS getQos() {
return qos;
}
public void startPublishRetransmissionTimer(TimerTaskService taskService, ChannelContext context) {
this.pubRetryProcessor.setHandle(((fixedHeader, originalMessage) -> {
MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, originalMessage.variableHeader(), this.message.payload());
boolean result = Tio.send(context, publishMessage);
if (context.isServer()) {
logger.info("retry send Publish msg clientId:{} qos:{} result:{}", context.getBsId(), qos, result);
} else {
logger.info("retry send Publish msg qos:{} result:{}", qos, result);
}
}));
this.pubRetryProcessor.start(taskService);
}
public void onPubAckReceived() {
this.pubRetryProcessor.stop();
}
public void setPubRelMessage(MqttMessage pubRelMessage) {
this.pubRelRetryProcessor.setOriginalMessage(pubRelMessage);
}
public void startPubRelRetransmissionTimer(TimerTaskService taskService, ChannelContext context) {
this.pubRelRetryProcessor.setHandle((fixedHeader, originalMessage) -> {
boolean result = Tio.send(context, new MqttMessage(fixedHeader, originalMessage.variableHeader()));
if (context.isServer()) {
logger.info("retry send PubRel msg clientId:{} qos:{} result:{}", context.getBsId(), qos, result);
} else {
logger.info("retry send PubRel msg qos:{} result:{}", qos, result);
}
});
this.pubRelRetryProcessor.start(taskService);
}
public void onPubCompReceived() {
this.pubRelRetryProcessor.stop();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MqttPendingPublish that = (MqttPendingPublish) o;
return Objects.equals(message, that.message) && qos == that.qos;
}
@Override
public int hashCode() {
return Objects.hash(message, qos);
}
}

View File

@@ -0,0 +1,63 @@
package org.dromara.mica.mqtt.core.common;
import org.dromara.mica.mqtt.codec.message.MqttMessage;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.utils.timer.TimerTaskService;
import java.util.Objects;
/**
* MqttPendingPublish参考于 netty-mqtt-client
*/
public final class MqttPendingQos2Publish {
private static final Logger logger = LoggerFactory.getLogger(MqttPendingQos2Publish.class);
private final MqttPublishMessage incomingPublish;
private final RetryProcessor<MqttMessage> retryProcessor = new RetryProcessor<>();
public MqttPendingQos2Publish(MqttPublishMessage incomingPublish, MqttMessage originalMessage) {
this.incomingPublish = incomingPublish;
this.retryProcessor.setOriginalMessage(originalMessage);
}
public MqttPublishMessage getIncomingPublish() {
return incomingPublish;
}
public void startPubRecRetransmitTimer(TimerTaskService taskService, ChannelContext context) {
this.retryProcessor.setHandle((fixedHeader, originalMessage) -> {
boolean result = Tio.send(context, new MqttMessage(fixedHeader, originalMessage.variableHeader()));
if (context.isServer()) {
logger.info("retry send PubRec msg clientId:{} result:{}", context.getBsId(), result);
} else {
logger.info("retry send PubRec msg result:{}", result);
}
});
this.retryProcessor.start(taskService);
}
public void onPubRelReceived() {
this.retryProcessor.stop();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MqttPendingQos2Publish that = (MqttPendingQos2Publish) o;
return Objects.equals(incomingPublish, that.incomingPublish);
}
@Override
public int hashCode() {
return Objects.hash(incomingPublish);
}
}

View File

@@ -0,0 +1,51 @@
package org.dromara.mica.mqtt.core.common;
import org.dromara.mica.mqtt.codec.message.header.MqttFixedHeader;
import org.dromara.mica.mqtt.codec.message.MqttMessage;
import org.dromara.mica.mqtt.core.util.timer.AckTimerTask;
import org.tio.utils.timer.TimerTaskService;
import java.util.Objects;
import java.util.function.BiConsumer;
/**
* 重试处理器,参考于 netty-mqtt-client
*
* @param <T> MqttMessage
*/
public final class RetryProcessor<T extends MqttMessage> {
private AckTimerTask ackTimerTask;
private BiConsumer<MqttFixedHeader, T> handler;
private T originalMessage;
public void start(TimerTaskService taskService) {
Objects.requireNonNull(this.handler, "RetryProcessor handler is null.");
this.startTimer(Objects.requireNonNull(taskService, "RetryProcessor taskService is null."));
}
private void startTimer(TimerTaskService taskService) {
this.ackTimerTask = taskService.addTask((systemTimer) -> {
return new AckTimerTask(systemTimer, () -> {
MqttFixedHeader fixedHeader = new MqttFixedHeader(this.originalMessage.fixedHeader().messageType(), true, this.originalMessage.fixedHeader().qosLevel(), this.originalMessage.fixedHeader().isRetain(), this.originalMessage.fixedHeader().remainingLength());
handler.accept(fixedHeader, originalMessage);
}, 5, 10);
});
}
public void stop() {
if (this.ackTimerTask != null) {
this.ackTimerTask.cancel();
}
}
public void setHandle(BiConsumer<MqttFixedHeader, T> runnable) {
this.handler = runnable;
}
public void setOriginalMessage(T originalMessage) {
this.originalMessage = originalMessage;
}
}

View File

@@ -0,0 +1,65 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.common;
/**
* TopicFilter
*
* @author L.cm
*/
public class TopicFilter {
/**
* topicFilter
*/
private final String topic;
/**
* topicFilterType
*/
private final TopicFilterType type;
public TopicFilter(String topicFilter) {
this.topic = topicFilter;
this.type = TopicFilterType.getType(topicFilter);
}
public String getTopic() {
return topic;
}
public TopicFilterType getType() {
return type;
}
/**
* 判断 topicFilter 和 topicName 匹配情况
*
* @param topicName topicName
* @return 是否匹配
*/
public boolean match(String topicName) {
return type.match(this.topic, topicName);
}
@Override
public String toString() {
return "TopicFilter{" +
"topic='" + topic + '\'' +
", type=" + type +
'}';
}
}

View File

@@ -0,0 +1,153 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.common;
import org.dromara.mica.mqtt.core.util.TopicUtil;
import java.util.Map;
/**
* TopicFilter 类型
*
* @author L.cm
*/
public enum TopicFilterType {
/**
* 默认 TopicFilter
*/
NONE {
@Override
public boolean match(String topicFilter, String topicName) {
return TopicUtil.match(topicFilter, topicName);
}
@Override
public Map<String, String> getTopicVars(String topicTemplate, String topicName) {
return TopicUtil.getTopicVars(topicTemplate, topicName);
}
},
/**
* $queue/ 为前缀的共享订阅是不带群组的共享订阅
*/
QUEUE {
@Override
public boolean match(String topicFilter, String topicName) {
// $queue/ 共享订阅前缀去除
int prefixLen = TopicFilterType.SHARE_QUEUE_PREFIX.length();
return TopicUtil.match(topicFilter.substring(prefixLen), topicName);
}
@Override
public Map<String, String> getTopicVars(String topicTemplate, String topicName) {
// $queue/ 共享订阅前缀去除
int prefixLen = TopicFilterType.SHARE_QUEUE_PREFIX.length();
return TopicUtil.getTopicVars(topicTemplate.substring(prefixLen), topicName);
}
},
/**
* $share/{group-name}/ 为前缀的共享订阅是带群组的共享订阅
*/
SHARE {
@Override
public boolean match(String topicFilter, String topicName) {
// 去除前缀 $share/<group-name>/ ,匹配 topicName / 前缀
int prefixLen = TopicFilterType.findShareTopicIndex(topicFilter);
return TopicUtil.match(topicFilter.substring(prefixLen), topicName);
}
@Override
public Map<String, String> getTopicVars(String topicTemplate, String topicName) {
// 去除前缀 $share/<group-name>/ ,匹配 topicName / 前缀
int prefixLen = TopicFilterType.findShareTopicIndex(topicTemplate);
return TopicUtil.getTopicVars(topicTemplate.substring(prefixLen), topicName);
}
};
/**
* 共享订阅的 topic
*/
public static final String SHARE_QUEUE_PREFIX = "$queue/";
public static final String SHARE_GROUP_PREFIX = "$share/";
/**
* 判断 topicFilter 和 topicName 匹配情况
*
* @param topicFilter topicFilter
* @param topicName topicName
* @return 是否匹配
*/
public abstract boolean match(String topicFilter, String topicName);
/**
* 解析 topic 模板中的变量 例如 $SYS/brokers/${node}/clients/${clientid}/disconnected 中提取 node 和 clientid
*
* @param topicTemplate topicTemplate
* @param topic topic
* @return 提取的变量
*/
public abstract Map<String, String> getTopicVars(String topicTemplate, String topic);
/**
* 获取 topicFilter 类型
*
* @param topicFilter topicFilter
* @return TopicFilterType
*/
public static TopicFilterType getType(String topicFilter) {
if (topicFilter.startsWith(TopicFilterType.SHARE_QUEUE_PREFIX)) {
return TopicFilterType.QUEUE;
} else if (topicFilter.startsWith(TopicFilterType.SHARE_GROUP_PREFIX)) {
return TopicFilterType.SHARE;
} else {
return TopicFilterType.NONE;
}
}
/**
* 读取共享订阅的分组名
*
* @param topicFilter topicFilter
* @return 共享订阅分组名
*/
public static String getShareGroupName(String topicFilter) {
int prefixLength = TopicFilterType.SHARE_GROUP_PREFIX.length();
int topicFilterLength = topicFilter.length();
for (int i = prefixLength; i < topicFilterLength; i++) {
char ch = topicFilter.charAt(i);
if ('/' == ch) {
return topicFilter.substring(prefixLength, i);
}
}
throw new IllegalArgumentException("共享订阅 topicFilter: " + topicFilter + " 不符合规范 $share/<group-name>/xxx");
}
private static int findShareTopicIndex(String topicFilter) {
int prefixLength = TopicFilterType.SHARE_GROUP_PREFIX.length();
int topicFilterLength = topicFilter.length();
for (int i = prefixLength; i < topicFilterLength; i++) {
char ch = topicFilter.charAt(i);
if ('/' == ch) {
return i + 1;
}
}
throw new IllegalArgumentException("共享订阅 topicFilter: " + topicFilter + " 不符合规范 $share/<group-name>/xxx");
}
}

View File

@@ -0,0 +1,43 @@
package org.dromara.mica.mqtt.core.common;
import org.tio.utils.mica.StrTemplateParser;
import java.util.Map;
/**
* topic 模板带 ${var} 变量的模板
*
* @author L.cm
*/
public class TopicTemplate {
private final StrTemplateParser templateParser;
private final String topicFilter;
private final TopicFilterType type;
public TopicTemplate(String topicTemplate, String topicFilter) {
this.templateParser = new StrTemplateParser(topicTemplate);
this.topicFilter = topicFilter;
this.type = TopicFilterType.getType(topicFilter);
}
/**
* 判断 topicFilter 和 topicName 匹配情况
*
* @param topicName topicName
* @return 是否匹配
*/
public boolean match(String topicName) {
return type.match(this.topicFilter, topicName);
}
/**
* 解析 topic 中的变量
*
* @param topicName topicName
* @return 变量
*/
public Map<String, String> getVariables(String topicName) {
return templateParser.getVariables(topicName);
}
}

View File

@@ -0,0 +1,39 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.deserialize;
import java.lang.reflect.Type;
/**
* mqtt 消息反序列化
*
* @author L.cm
* @author ChangJin Wei(魏昌进)
*/
public interface MqttDeserializer {
/**
* 反序列化
*
* @param bytes bytes
* @param type type
* @param <T> 泛型
* @return T
*/
<T> T deserialize(byte[] bytes, Type type);
}

View File

@@ -0,0 +1,36 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.deserialize;
import org.tio.utils.json.JsonUtil;
import java.lang.reflect.Type;
/**
* mqtt 消息反序列化
*
* @author L.cm
* @author ChangJin Wei(魏昌进)
*/
public class MqttJsonDeserializer implements MqttDeserializer {
@Override
public <T> T deserialize(byte[] bytes, Type type) {
return JsonUtil.readValue(bytes, type);
}
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.function;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.core.deserialize.MqttDeserializer;
import org.tio.core.ChannelContext;
import java.lang.reflect.Type;
/**
* 需要序列化的对象函数
*
* @author L.cm
*/
public class ObjectParamValueFunction implements ParamValueFunction {
private final MqttDeserializer deserializer;
private final Type parameterType;
public ObjectParamValueFunction(MqttDeserializer deserializer, Type parameterType) {
this.deserializer = deserializer;
this.parameterType = parameterType;
}
@Override
public Object getValue(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
return deserializer.deserialize(payload, parameterType);
}
}

View File

@@ -0,0 +1,41 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.function;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.tio.core.ChannelContext;
/**
* 参数值函数
*
* @author L.cm
*/
@FunctionalInterface
public interface ParamValueFunction {
/**
* 获取值
*
* @param context ChannelContext
* @param topic topic
* @param message message
* @param payload payload
* @return value
*/
Object getValue(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload);
}

View File

@@ -0,0 +1,66 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.function;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.tio.core.ChannelContext;
import java.nio.ByteBuffer;
/**
* 参数值函数
*
* @author L.cm
*/
public enum ParamValueFunctions implements ParamValueFunction {
Context() {
@Override
public Object getValue(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
return context;
}
},
Topic() {
@Override
public Object getValue(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
return topic;
}
},
Message() {
@Override
public Object getValue(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
return message;
}
},
Payload() {
@Override
public Object getValue(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
return payload;
}
},
ByteBuff() {
@Override
public Object getValue(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
return ByteBuffer.wrap(payload);
}
};
}

View File

@@ -0,0 +1,68 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.function;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.core.common.TopicTemplate;
import org.tio.core.ChannelContext;
import java.util.Collections;
/**
* topic 参数函数
*
* @author L.cm
*/
public class TopicVarsParamValueFunction implements ParamValueFunction {
private final TopicTemplate[] topicTemplates;
public TopicVarsParamValueFunction(String[] topicTemplates, String[] topicFilters) {
this.topicTemplates = getTopicTemplates(topicTemplates, topicFilters);
}
/**
* 获取 topic 模板列表
*
* @param topicTemplates topicTemplates
* @param topicFilters topicFilters
* @return TopicTemplate array
*/
private static TopicTemplate[] getTopicTemplates(String[] topicTemplates, String[] topicFilters) {
TopicTemplate[] templates = new TopicTemplate[topicTemplates.length];
for (int i = 0; i < templates.length; i++) {
templates[i] = new TopicTemplate(topicTemplates[i], topicFilters[i]);
}
return templates;
}
@Override
public Object getValue(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
// 大部分情况下只有一个 topic直接返回
int length = topicTemplates.length;
if (length == 1) {
return topicTemplates[0].getVariables(topic);
}
// 多个 topic 时,需要根据 topic 匹配
for (TopicTemplate topicTemplate : topicTemplates) {
if (topicTemplate.match(topic)) {
return topicTemplate.getVariables(topic);
}
}
return Collections.emptyMap();
}
}

View File

@@ -0,0 +1,33 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.serializer;
import org.tio.utils.json.JsonUtil;
/**
* mqtt 消息 json 序列化
*
* @author L.cm
*/
public class MqttJsonSerializer implements MqttSerializer {
@Override
public byte[] serialize(Object message) {
return JsonUtil.toJsonBytes(message);
}
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.serializer;
/**
* mqtt 消息序列化
*
* @author L.cm
*/
public interface MqttSerializer {
/**
* 序列化
*
* @param message message
* @return byte 数组
*/
byte[] serialize(Object message);
}

View File

@@ -0,0 +1,93 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.util;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.core.deserialize.MqttDeserializer;
import org.dromara.mica.mqtt.core.function.ObjectParamValueFunction;
import org.dromara.mica.mqtt.core.function.ParamValueFunction;
import org.dromara.mica.mqtt.core.function.ParamValueFunctions;
import org.dromara.mica.mqtt.core.function.TopicVarsParamValueFunction;
import org.tio.core.ChannelContext;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.Map;
/**
* 方法参数工具
*
* @author L.cm
*/
public class MethodParamUtil {
/**
* 获取参数值函数
*
* @param method 方法
* @param topicTemplates 主题模板
* @param topicFilters 主题过滤器
* @param deserializer 反序列化
* @return ParamValueFunc[]
*/
public static ParamValueFunction[] getParamValueFunctions(Method method, String[] topicTemplates, String[] topicFilters, MqttDeserializer deserializer) {
int parameterCount = method.getParameterCount();
ParamValueFunction[] functions = new ParamValueFunction[parameterCount];
Type[] parameterTypes = method.getGenericParameterTypes();
for (int i = 0; i < parameterCount; i++) {
Type parameterType = parameterTypes[i];
if (parameterType == ChannelContext.class) {
functions[i] = ParamValueFunctions.Context;
} else if (parameterType == String.class) {
functions[i] = ParamValueFunctions.Topic;
} else if (parameterType instanceof ParameterizedType && isStringStringMap(parameterType)) {
functions[i] = new TopicVarsParamValueFunction(topicTemplates, topicFilters);
} else if (parameterType == MqttPublishMessage.class) {
functions[i] = ParamValueFunctions.Message;
} else if (parameterType == byte[].class) {
functions[i] = ParamValueFunctions.Payload;
} else if (parameterType == ByteBuffer.class) {
functions[i] = ParamValueFunctions.ByteBuff;
} else {
functions[i] = new ObjectParamValueFunction(deserializer, parameterType);
}
}
return functions;
}
/**
* 判断是否 Map String String
*
* @param parameterType parameterType
* @return 是否 Map String String
*/
public static boolean isStringStringMap(Type parameterType) {
ParameterizedType parameterizedType = (ParameterizedType) parameterType;
Type rawType = parameterizedType.getRawType();
// 检查是否为 Map 类型
if (rawType != Map.class) {
return false;
}
// 获取泛型参数
Type[] typeArguments = parameterizedType.getActualTypeArguments();
// 检查键和值类型是否为 String
return typeArguments[0].equals(String.class) && typeArguments[1].equals(String.class);
}
}

View File

@@ -0,0 +1,370 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.util;
import org.dromara.mica.mqtt.codec.MqttCodecUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.mica.Pair;
import org.tio.utils.mica.StrTemplateParser;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
/**
* Mqtt Topic 工具
*
* @author L.cm
*/
public final class TopicUtil {
private static final Logger logger = LoggerFactory.getLogger(TopicUtil.class);
public static final String TOPIC_LAYER = "/";
public static final String TOPIC_WILDCARDS_ONE = "+";
public static final String TOPIC_WILDCARDS_MORE = "#";
/**
* 校验 topicFilter
*
* @param topicFilterList topicFilter 集合
*/
public static void validateTopicFilter(List<String> topicFilterList) {
for (String topicFilter : topicFilterList) {
validateTopicFilter(topicFilter);
}
}
/**
* 校验 topicFilter
*
* @param topicFilter topicFilter
*/
public static void validateTopicFilter(String topicFilter) throws IllegalArgumentException {
if (StrUtil.isEmpty(topicFilter)) {
throw new IllegalArgumentException("TopicFilter is empty:" + topicFilter);
}
char[] topicFilterChars = topicFilter.toCharArray();
int topicFilterLength = topicFilterChars.length;
int topicFilterIdxEnd = topicFilterLength - 1;
char ch;
for (int i = 0; i < topicFilterLength; i++) {
ch = topicFilterChars[i];
if (Character.isWhitespace(ch)) {
logger.warn("注意topic:[{}] 中包含空白字符串:[{}],请检查是否正确", topicFilter, ch);
} else if (ch == MqttCodecUtil.TOPIC_WILDCARDS_MORE) {
// 校验: # 通配符只能在最后一位
if (i < topicFilterIdxEnd) {
throw new IllegalArgumentException("Mqtt subscribe topicFilter illegal:" + topicFilter);
}
} else if (ch == MqttCodecUtil.TOPIC_WILDCARDS_ONE) {
// 校验: 单独 + 是允许的,判断 + 号前一位是否为 /,如果有后一位也必须为 /
if ((i > 0 && topicFilterChars[i - 1] != MqttCodecUtil.TOPIC_LAYER) || (i < topicFilterIdxEnd && topicFilterChars[i + 1] != MqttCodecUtil.TOPIC_LAYER)) {
throw new IllegalArgumentException("Mqtt subscribe topicFilter illegal:" + topicFilter);
}
}
}
}
/**
* 校验 topicName
*
* @param topicName topicName
*/
public static void validateTopicName(String topicName) throws IllegalArgumentException {
if (MqttCodecUtil.isTopicFilter(topicName)) {
throw new IllegalArgumentException("Topic has wildcards char [+] or [#], topicName:" + topicName);
}
}
/**
* 解析保留消息主题, topicName
*
* @param topicName topicName
*/
public static Pair<String, Integer> retainTopicName(String topicName) {
if (topicName.startsWith("$retain/")) {
return getRetainTopicPair(topicName);
} else {
return new Pair<>(topicName, 0);
}
}
/**
* 处理 $retain topic注意时间的三个含义
*
* <p>
* -1 表示topic有问题需要丢弃消息
* 0 表示使用原 topic
* gt 0表示保留消息存储时间
* </p>
*
* @param topicName topicName
* @return Pair
*/
private static Pair<String, Integer> getRetainTopicPair(String topicName) {
// $retain/ 的长度
int timeIndexBegin = 8;
int nextLayer = topicName.indexOf(MqttCodecUtil.TOPIC_LAYER, timeIndexBegin);
if (nextLayer == -1) {
return new Pair<>(topicName, -1);
}
int time;
try {
time = Integer.parseInt(topicName.substring(timeIndexBegin, nextLayer));
} catch (NumberFormatException e) {
time = -1;
}
String retainTopic = topicName.substring(nextLayer + 1);
if (retainTopic.isEmpty()) {
return new Pair<>(topicName, -1);
} else {
return new Pair<>(retainTopic, time);
}
}
/**
* 判断 topicFilter topicName 是否匹配
*
* @param topicFilter topicFilter
* @param topicName topicName
* @return 是否匹配
*/
public static boolean match(String topicFilter, String topicName) {
char[] topicFilterChars = topicFilter.toCharArray();
char[] topicNameChars = topicName.toCharArray();
int topicFilterLength = topicFilterChars.length;
int topicNameLength = topicNameChars.length;
int topicFilterIdxEnd = topicFilterLength - 1;
int topicNameIdxEnd = topicNameLength - 1;
char ch;
// 是否进入 + 号层级通配符
boolean inLayerWildcard = false;
int wildcardCharLen = 0;
topicFilterLoop:
for (int i = 0; i < topicFilterLength; i++) {
ch = topicFilterChars[i];
if (ch == MqttCodecUtil.TOPIC_WILDCARDS_MORE) {
// 校验: # 通配符只能在最后一位
if (i < topicFilterIdxEnd) {
throw new IllegalArgumentException("Mqtt subscribe topicFilter illegal:" + topicFilter);
}
return true;
} else if (ch == MqttCodecUtil.TOPIC_WILDCARDS_ONE) {
// 校验: 单独 + 是允许的,判断 + 号前一位是否为 /,如果有后一位也必须为 /
if ((i > 0 && topicFilterChars[i - 1] != MqttCodecUtil.TOPIC_LAYER) || (i < topicFilterIdxEnd && topicFilterChars[i + 1] != MqttCodecUtil.TOPIC_LAYER)) {
throw new IllegalArgumentException("Mqtt subscribe topicFilter illegal:" + topicFilter);
}
// 如果 + 是最后一位,判断 topicName 中是否还存在层级 /
// topicName index
int topicNameIdx = i + wildcardCharLen;
if (i == topicFilterIdxEnd && topicNameLength > topicNameIdx) {
for (int j = topicNameIdx; j < topicNameLength; j++) {
if (topicNameChars[j] == MqttCodecUtil.TOPIC_LAYER) {
return false;
}
}
return true;
}
inLayerWildcard = true;
} else if (ch == MqttCodecUtil.TOPIC_LAYER) {
if (inLayerWildcard) {
inLayerWildcard = false;
}
// 预读下一位,如果是 #,并且 topicName 位数已经不足
int next = i + 1;
if ((topicFilterLength > next) && topicFilterChars[next] == MqttCodecUtil.TOPIC_WILDCARDS_MORE && topicNameLength < next) {
return true;
}
}
// topicName 长度不够了
if (topicNameIdxEnd < i) {
return false;
}
// 进入通配符
if (inLayerWildcard) {
for (int j = i + wildcardCharLen; j < topicNameLength; j++) {
if (topicNameChars[j] == MqttCodecUtil.TOPIC_LAYER) {
wildcardCharLen--;
continue topicFilterLoop;
} else {
wildcardCharLen++;
}
}
}
// topicName index
int topicNameIdx = i + wildcardCharLen;
// topic 已经完成topicName 还有数据
if (topicNameIdx > topicNameIdxEnd) {
return false;
}
if (ch != topicNameChars[topicNameIdx]) {
return false;
}
}
// 判断 topicName 是否还有数据
return topicFilterLength + wildcardCharLen + 1 > topicNameLength;
}
/**
* 获取处理完成之后的 topic需要考虑 test/${abc}123 也要替换成 test/+ 而非 test/+123
*
* @param topicTemplate topic 模板
* @return 获取处理完成之后的 topic
*/
public static String getTopicFilter(String topicTemplate) {
// 替换 ${name} 为 +
StringTokenizer tokenizer = new StringTokenizer(topicTemplate, TOPIC_LAYER, true);
String token;
StringBuilder topicFilterBuilder = new StringBuilder(topicTemplate.length());
while (tokenizer.hasMoreTokens()) {
token = tokenizer.nextToken();
if (TOPIC_LAYER.equals(token)) {
topicFilterBuilder.append(token);
} else if (hasVariable(token)) {
topicFilterBuilder.append(MqttCodecUtil.TOPIC_WILDCARDS_ONE);
} else {
topicFilterBuilder.append(token);
}
}
return topicFilterBuilder.toString();
}
/**
* 判断是否含有 ${x} 这样的变量
*
* @param input input
* @return 是否含有变量
*/
public static boolean hasVariable(String input) {
if (StrUtil.isBlank(input)) {
return false;
}
int startIndex = input.indexOf("${");
// 检查是否存在 "${"
if (startIndex == -1) {
return false;
}
int endIndex = input.indexOf('}', startIndex);
// 检查是否同时存在 "${" 和 "}",并且 "}" 在 "${" 之后
return endIndex != -1 && endIndex > startIndex + 2;
}
/**
* 解析 topic 中的变量,变量的格式为 ${x}x 为 payload 中的字段名
*
* @param topicTemplate topic 模板
* @param payload payload
* @return 解析后的 topic
*/
public static String resolveTopic(String topicTemplate, Object payload) {
if (payload == null) {
return topicTemplate;
}
// 替换变量
StringBuilder sb = new StringBuilder((int) (topicTemplate.length() * 1.5));
int cursor = 0;
for (int start, end; (start = topicTemplate.indexOf("${", cursor)) != -1 && (end = topicTemplate.indexOf('}', start)) != -1; ) {
sb.append(topicTemplate, cursor, start);
String fieldName = topicTemplate.substring(start + 2, end);
Object value = getFieldValue(payload, fieldName);
sb.append(value == null ? "" : value);
cursor = end + 1;
}
if (cursor == 0) {
return topicTemplate;
} else {
sb.append(topicTemplate.substring(cursor));
return sb.toString();
}
}
/**
* 获取字段值
*
* @param obj obj
* @param fieldName fieldName
* @return fieldValue
*/
public static Object getFieldValue(Object obj, String fieldName) {
try {
Field field = obj.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return field.get(obj);
} catch (Exception e) {
throw new IllegalArgumentException("Failed to resolve field: " + fieldName + " from payload object", e);
}
}
/**
* 以 / 切分 topic如果以 / 开头和 / 结尾会多一级,比 split 性能要好
*
* @param topic topic
* @return part 数组
*/
public static String[] getTopicParts(String topic) {
// 大部分 topic 层级都在 10 以内
List<String> tokenList = new ArrayList<>(10);
char[] topicChars = topic.toCharArray();
int topicLength = topicChars.length;
int topicIdxEnd = topicLength - 1;
char ch;
// 前一个位置
int prev = 0;
for (int i = 0; i < topicLength; i++) {
ch = topicChars[i];
if (MqttCodecUtil.TOPIC_LAYER == ch) {
// 如果 / 为起始和最后的位置,添加 / 进 topic part
if (i == 0) {
tokenList.add(TOPIC_LAYER);
prev++;
} else {
tokenList.add(new String(topicChars, prev, i - prev));
prev = i;
prev++;
if (i == topicIdxEnd) {
tokenList.add(TOPIC_LAYER);
}
}
} else {
if (i == topicIdxEnd) {
tokenList.add(new String(topicChars, prev, topicLength - prev));
}
}
}
return tokenList.toArray(new String[0]);
}
/**
* 解析 topic 模板中的变量,不匹配时返回空 Map
*
* <p>
* 例如 $SYS/brokers/${node}/clients/${clientid}/disconnected 中提取 node 和 clientid
* </p>
*
* @param topicTemplate topicTemplate
* @param topic topic
* @return 获取变量值
*/
public static Map<String, String> getTopicVars(String topicTemplate, String topic) {
StrTemplateParser templateParser = new StrTemplateParser(topicTemplate);
return templateParser.getVariables(topic);
}
}

View File

@@ -0,0 +1,73 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.util.timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.utils.timer.Timer;
import org.tio.utils.timer.TimerTask;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* ack TimerTask
*
* @author L.cm
*/
public class AckTimerTask extends TimerTask {
private static final Logger log = LoggerFactory.getLogger(AckTimerTask.class);
/**
* task
*/
private final Timer timer;
/**
* 需要执行的函数
*/
private final Runnable command;
/**
* qos 1~2 重试次数
*/
private final int maxRetryCount;
/**
* 当前自行的次数,默认从第二次开始,因为进重试前已经执行过一次。
*/
private int count = 1;
public AckTimerTask(Timer timer, Runnable command, int maxRetryCount, int retryIntervalSecs) {
super(TimeUnit.SECONDS.toMillis(retryIntervalSecs));
this.timer = Objects.requireNonNull(timer, "Timer is null.");
this.command = Objects.requireNonNull(command, "Runnable command is null.");
this.maxRetryCount = maxRetryCount;
}
@Override
public void run() {
if (++count <= maxRetryCount + 1) {
// 收先添加任务,保证后续执行
timer.add(this);
log.debug("Mqtt ack task retry running count{}.", count);
try {
command.run();
} catch (Exception e) {
log.error("Mqtt ack task error ", e);
}
}
}
}

View File

@@ -0,0 +1,11 @@
open module org.dromara.mica.mqtt.common {
requires transitive net.dreamlu.mica.net.core;
requires transitive org.dromara.mica.mqtt.codec;
exports org.dromara.mica.mqtt.core.annotation;
exports org.dromara.mica.mqtt.core.common;
exports org.dromara.mica.mqtt.core.deserialize;
exports org.dromara.mica.mqtt.core.function;
exports org.dromara.mica.mqtt.core.serializer;
exports org.dromara.mica.mqtt.core.util;
exports org.dromara.mica.mqtt.core.util.timer;
}

View File

@@ -0,0 +1,42 @@
package org.dromara.mica.mqtt.core.common;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
/**
* TopicFilterType 测试
*
* @author L.cm
*/
class TopicFilterTypeTest {
@Test
void test1() {
String topic1 = "$queue/123";
TopicFilterType type1 = TopicFilterType.getType(topic1);
Assertions.assertEquals(TopicFilterType.QUEUE, type1);
Assertions.assertTrue(type1.match(topic1, "123"));
Assertions.assertFalse(type1.match(topic1, "/123"));
String topic2 = "$share/test/123";
TopicFilterType type2 = TopicFilterType.getType(topic2);
String groupName = TopicFilterType.getShareGroupName(topic2);
Assertions.assertEquals("test", groupName);
Assertions.assertEquals(TopicFilterType.SHARE, type2);
Assertions.assertTrue(type2.match(topic2, "123"));
Assertions.assertFalse(type2.match(topic2, "/123"));
String topic3 = "$queue//123";
TopicFilterType type3 = TopicFilterType.getType(topic3);
Assertions.assertEquals(TopicFilterType.QUEUE, type3);
Assertions.assertFalse(type3.match(topic3, "123"));
Assertions.assertTrue(type3.match(topic3, "/123"));
String topic4 = "$share/test//123";
TopicFilterType type4 = TopicFilterType.getType(topic4);
Assertions.assertEquals(TopicFilterType.SHARE, type4);
Assertions.assertFalse(type4.match(topic4, "123"));
Assertions.assertTrue(type4.match(topic4, "/123"));
}
}

View File

@@ -0,0 +1,25 @@
package org.dromara.mica.mqtt.core.timer;
import org.dromara.mica.mqtt.core.util.timer.AckTimerTask;
import org.tio.utils.timer.SystemTimer;
import org.tio.utils.timer.TimingWheelThread;
import java.util.concurrent.TimeUnit;
public class SystemTimerTest {
public static void main(String[] args) throws InterruptedException {
SystemTimer systemTimer = new SystemTimer("timer");
TimingWheelThread timingWheelThread = new TimingWheelThread(systemTimer);
timingWheelThread.start();
System.out.println(System.currentTimeMillis());
systemTimer.add(new AckTimerTask(systemTimer, () -> {
System.out.println("hello!");
}, 5, 5));
System.out.println(System.nanoTime());
TimeUnit.MINUTES.sleep(10L);
}
}

View File

@@ -0,0 +1,115 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.udp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.Node;
import org.tio.core.udp.UdpPacket;
import org.tio.core.udp.UdpServer;
import org.tio.core.udp.task.UdpHandlerRunnable;
import org.tio.core.udp.task.UdpSendRunnable;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.util.concurrent.LinkedBlockingQueue;
public class UdpCluster {
private static final Logger log = LoggerFactory.getLogger(UdpServer.class);
private final LinkedBlockingQueue<UdpPacket> handlerQueue = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<DatagramPacket> sendQueue = new LinkedBlockingQueue<>();
private volatile boolean isStopped = false;
private final MulticastSocket multicastSocket;
private final byte[] readBuf;
private final UdpHandlerRunnable udpHandlerRunnable;
private final UdpSendRunnable udpSendRunnable;
private final UdpClusterConfig clusterConfig;
private final InetAddress group;
private final int port;
public UdpCluster(UdpClusterConfig clusterConfig) throws IOException {
this.clusterConfig = clusterConfig;
this.port = this.clusterConfig.getServerNode().getPort();
this.multicastSocket = new MulticastSocket(port);
this.readBuf = new byte[this.clusterConfig.getReadBufferSize()];
this.udpHandlerRunnable = new UdpHandlerRunnable(this.clusterConfig.getUdpHandler(), handlerQueue, multicastSocket);
this.udpSendRunnable = new UdpSendRunnable(sendQueue, this.clusterConfig, multicastSocket);
this.multicastSocket.setSoTimeout(this.clusterConfig.getTimeout());
this.group = InetAddress.getByName(this.clusterConfig.getServerNode().getIp());
this.multicastSocket.joinGroup(this.group);
}
public void send(byte[] data) {
DatagramPacket datagramPacket = new DatagramPacket(data, data.length, this.group, this.port);
this.sendQueue.add(datagramPacket);
}
public void start() {
startListen();
startHandler();
startSend();
}
private void startHandler() {
Thread thread = new Thread(udpHandlerRunnable, "tio-udp-server-handler");
thread.setDaemon(false);
thread.start();
}
private void startListen() {
Runnable runnable = () -> {
String startLog = "started tio udp server: " + clusterConfig.getServerNode();
if (log.isInfoEnabled()) {
log.info(startLog);
}
while (!isStopped) {
try {
DatagramPacket datagramPacket = new DatagramPacket(readBuf, readBuf.length);
multicastSocket.receive(datagramPacket);
byte[] data = new byte[datagramPacket.getLength()];
System.arraycopy(readBuf, 0, data, 0, datagramPacket.getLength());
String remoteIp = datagramPacket.getAddress().getHostAddress();
int remotePort = datagramPacket.getPort();
Node remote = new Node(remoteIp, remotePort);
UdpPacket udpPacket = new UdpPacket(data, remote);
handlerQueue.put(udpPacket);
} catch (Throwable e) {
log.error(e.toString(), e);
}
}
};
Thread thread = new Thread(runnable, "tio-udp-server-listen");
thread.setDaemon(false);
thread.start();
}
private void startSend() {
Thread thread = new Thread(udpSendRunnable, "tio-udp-client-send");
thread.setDaemon(false);
thread.start();
}
public void stop() {
this.isStopped = true;
this.multicastSocket.close();
this.udpHandlerRunnable.stop();
}
}

View File

@@ -0,0 +1,32 @@
package org.dromara.mica.mqtt.core.udp;
import org.tio.core.Node;
import org.tio.core.udp.UdpConf;
import org.tio.core.udp.intf.UdpHandler;
public class UdpClusterConfig extends UdpConf {
private UdpHandler udpHandler;
private int readBufferSize = 1024 * 1024;
public UdpClusterConfig(String ip, int port, UdpHandler udpHandler, int timeout) {
super(timeout);
this.setUdpHandler(udpHandler);
this.setServerNode(new Node(ip, port));
}
public int getReadBufferSize() {
return readBufferSize;
}
public UdpHandler getUdpHandler() {
return udpHandler;
}
public void setReadBufferSize(int readBufferSize) {
this.readBufferSize = readBufferSize;
}
public void setUdpHandler(UdpHandler udpHandler) {
this.udpHandler = udpHandler;
}
}

View File

@@ -0,0 +1,29 @@
package org.dromara.mica.mqtt.core.udp;
import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
/**
* @author L.cm
*/
public class UdpClusterTest1 {
public static void main(String[] args) throws IOException {
UdpTestHandler udpTestHandler = new UdpTestHandler();
UdpClusterConfig udpServerConf = new UdpClusterConfig("224.0.0.1", 12345, udpTestHandler, 5000);
UdpCluster udpCluster = new UdpCluster(udpServerConf);
udpCluster.start();
byte[] buffer = "hello1".getBytes();
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
udpCluster.send(buffer);
}
};
Timer timer = new Timer();
timer.schedule(timerTask, 1000, 1000);
}
}

View File

@@ -0,0 +1,31 @@
package org.dromara.mica.mqtt.core.udp;
import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
/**
* @author L.cm
*/
public class UdpClusterTest2 {
public static void main(String[] args) throws IOException {
UdpTestHandler udpTestHandler = new UdpTestHandler();
UdpClusterConfig udpServerConf = new UdpClusterConfig("224.0.0.1", 12345, udpTestHandler, 5000);
UdpCluster udpCluster = new UdpCluster(udpServerConf);
udpCluster.start();
byte[] buffer = "hello2".getBytes();
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
udpCluster.send(buffer);
}
};
Timer timer = new Timer();
timer.schedule(timerTask, 1000, 1000);
}
}

View File

@@ -0,0 +1,35 @@
package org.dromara.mica.mqtt.core.udp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.Node;
import org.tio.core.udp.UdpPacket;
import org.tio.core.udp.intf.UdpHandler;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
/**
* @author tanyaowu
*/
public class UdpTestHandler implements UdpHandler {
private static final Logger log = LoggerFactory.getLogger(UdpTestHandler.class);
public UdpTestHandler() {
}
@Override
public void handler(UdpPacket udpPacket, DatagramSocket datagramSocket) {
byte[] data = udpPacket.getData();
String msg = new String(data);
Node remote = udpPacket.getRemote();
System.out.printf("收到来自%s的消息:【%s】%n", remote, msg);
DatagramPacket datagramPacket = new DatagramPacket(data, data.length);
try {
datagramSocket.send(datagramPacket);
} catch (Throwable e) {
log.error(e.toString(), e);
}
}
}

View File

@@ -0,0 +1,40 @@
package org.dromara.mica.mqtt.core.util;
public class TestBean {
private String name;
private String node;
private String clientId;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getNode() {
return node;
}
public void setNode(String node) {
this.node = node;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
@Override
public String toString() {
return "TestBean{" +
"name='" + name + '\'' +
", node='" + node + '\'' +
", clientId='" + clientId + '\'' +
'}';
}
}

View File

@@ -0,0 +1,159 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.util;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.tio.utils.mica.Pair;
import java.util.Map;
/**
* TopicUtil 测试
*
* @author L.cm
*/
class TopicUtilTest {
@Test
void test() {
// gitee issues #I56BTC /iot/test/# 无法匹配到 /iot/test 和 /iot/test/
Assertions.assertFalse(TopicUtil.match("+", "/iot/test"));
Assertions.assertFalse(TopicUtil.match("+", "iot/test"));
Assertions.assertFalse(TopicUtil.match("+", "/iot/test"));
Assertions.assertFalse(TopicUtil.match("+", "/iot"));
Assertions.assertFalse(TopicUtil.match("+/test", "/iot/test"));
Assertions.assertFalse(TopicUtil.match("/iot/test/+/", "/iot/test/123"));
Assertions.assertTrue(TopicUtil.match("/iot/test/+", "/iot/test/123"));
Assertions.assertFalse(TopicUtil.match("/iot/test/+", "/iot/test/123/"));
Assertions.assertTrue(TopicUtil.match("/iot/+/test", "/iot/abc/test"));
Assertions.assertFalse(TopicUtil.match("/iot/+/test", "/iot/abc/test/"));
Assertions.assertFalse(TopicUtil.match("/iot/+/test", "/iot/abc/test1"));
Assertions.assertTrue(TopicUtil.match("/iot/+/+/test", "/iot/abc/123/test"));
Assertions.assertFalse(TopicUtil.match("/iot/+/+/test", "/iot/abc/123/test1"));
Assertions.assertFalse(TopicUtil.match("/iot/+/+/test", "/iot/abc/123/test/"));
Assertions.assertTrue(TopicUtil.match("/iot/+/+/+", "/iot/abc/123/test"));
Assertions.assertFalse(TopicUtil.match("/iot/+/+/+", "/iot/abc/123/test/"));
Assertions.assertTrue(TopicUtil.match("/iot/+/test", "/iot/a/test"));
Assertions.assertTrue(TopicUtil.match("/iot/+/test", "/iot/a/test"));
Assertions.assertFalse(TopicUtil.match("/iot/+/+/+", "/iot/a//test/"));
Assertions.assertFalse(TopicUtil.match("/iot/+/+/+", "/iot/a/b/c/"));
Assertions.assertFalse(TopicUtil.match("/iot/+/+/+", "/iot/a"));
Assertions.assertTrue(TopicUtil.match("#", "/iot/test"));
Assertions.assertTrue(TopicUtil.match("/iot/test/#", "/iot/test"));
Assertions.assertTrue(TopicUtil.match("/iot/test/#", "/iot/test/"));
Assertions.assertTrue(TopicUtil.match("/iot/test/#", "/iot/test/1"));
Assertions.assertTrue(TopicUtil.match("/iot/test/#", "/iot/test/123123/12312"));
Assertions.assertTrue(TopicUtil.match("/iot/test/123", "/iot/test/123"));
}
@Test
void test2() {
String s1 = "$SYS/brokers/${node}/clients/${clientId}/disconnected";
String s2 = "$SYS/brokers/+/clients/+/disconnected";
String s3 = TopicUtil.getTopicFilter(s1);
Assertions.assertEquals(s2, s3);
s1 = "$SYS/brokers/${node}/clients/${clientId}abc/disconnected";
s3 = TopicUtil.getTopicFilter(s1);
Assertions.assertEquals(s2, s3);
s1 = "$SYS/brokers/${node}/clients/${clientId}abc${x}/disconnected";
s3 = TopicUtil.getTopicFilter(s1);
Assertions.assertEquals(s2, s3);
s1 = "$SYS/brokers/${node}/clients/abc${clientId}abc${x}123/disconnected";
s3 = TopicUtil.getTopicFilter(s1);
Assertions.assertEquals(s2, s3);
}
@Test
void test3() {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
TopicUtil.validateTopicFilter("/iot/test/+a");
});
Assertions.assertThrows(IllegalArgumentException.class, () -> {
TopicUtil.validateTopicFilter("/iot/test/a+");
});
Assertions.assertThrows(IllegalArgumentException.class, () -> {
TopicUtil.validateTopicFilter("/iot/test/+a/");
});
Assertions.assertThrows(IllegalArgumentException.class, () -> {
TopicUtil.validateTopicFilter("/iot/test/a+/");
});
Assertions.assertDoesNotThrow(() -> TopicUtil.validateTopicFilter("+"));
Assertions.assertDoesNotThrow(() -> TopicUtil.validateTopicFilter("/iot/test/+"));
Assertions.assertDoesNotThrow(() -> TopicUtil.validateTopicFilter("/iot/test/+/"));
}
@Test
void test4() {
String test1 = "Hello, ${name}!";
String test2 = "No variable here";
String test3 = "Invalid ${variable";
String test4 = "${name}!";
Assertions.assertTrue(TopicUtil.hasVariable(test1));
Assertions.assertFalse(TopicUtil.hasVariable(test2));
Assertions.assertFalse(TopicUtil.hasVariable(test3));
Assertions.assertTrue(TopicUtil.hasVariable(test4));
}
@Test
void testResolveTopic() {
String message = "Hello, ${name}!";
TestBean testBean = new TestBean();
testBean.setName("张三");
String m1 = TopicUtil.resolveTopic(message, testBean);
Assertions.assertEquals("Hello, 张三!", m1);
String s1 = "$SYS/brokers/${node}/clients/${clientId}/disconnected";
testBean.setNode("node1");
testBean.setClientId("abc123");
String m2 = TopicUtil.resolveTopic(s1, testBean);
Assertions.assertEquals("$SYS/brokers/node1/clients/abc123/disconnected", m2);
String m3 = TopicUtil.resolveTopic("/iot/test/123", testBean);
Assertions.assertEquals("/iot/test/123", m3);
}
@Test
void testRetainTopicName() {
Pair<String, Integer> pair1 = TopicUtil.retainTopicName("$retain/15/x/y");
Assertions.assertEquals("x/y", pair1.getLeft());
Pair<String, Integer> pair2 = TopicUtil.retainTopicName("$retain/15//x/y");
Assertions.assertEquals("/x/y", pair2.getLeft());
Pair<String, Integer> pair3 = TopicUtil.retainTopicName("$retain/15/");
Assertions.assertEquals(-1, pair3.getRight());
Pair<String, Integer> pair4 = TopicUtil.retainTopicName("$retain/");
Assertions.assertEquals(-1, pair4.getRight());
}
@Test
void testGetTopicVars() {
// 测试匹配
String s1 = "$SYS/brokers/${node}/clients/${clientId}/disconnected";
String s2 = "$SYS/brokers/node1/clients/test1/disconnected";
Map<String, String> vars = TopicUtil.getTopicVars(s1, s2);
Assertions.assertEquals("node1", vars.get("node"));
Assertions.assertEquals("test1", vars.get("clientId"));
// 测试不匹配
String s3 = "$SYS/brokers/${node}/clients/${clientId}/disconnected";
String s4 = "abc/brokers/node1/clients/test1/disconnected";
Map<String, String> vars1 = TopicUtil.getTopicVars(s3, s4);
// 不匹配会返回空
Assertions.assertTrue(vars1.isEmpty());
}
}