first commit

This commit is contained in:
zc
2026-02-05 18:01:33 +08:00
commit 086d658c62
349 changed files with 36214 additions and 0 deletions

View File

@@ -0,0 +1,419 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import org.dromara.mica.mqtt.codec.MqttMessageType;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.codec.codes.MqttConnectReasonCode;
import org.dromara.mica.mqtt.codec.message.*;
import org.dromara.mica.mqtt.codec.message.builder.MqttTopicSubscription;
import org.dromara.mica.mqtt.codec.message.header.MqttConnAckVariableHeader;
import org.dromara.mica.mqtt.codec.message.header.MqttFixedHeader;
import org.dromara.mica.mqtt.codec.message.header.MqttMessageIdVariableHeader;
import org.dromara.mica.mqtt.codec.message.header.MqttPublishVariableHeader;
import org.dromara.mica.mqtt.codec.message.payload.MqttSubAckPayload;
import org.dromara.mica.mqtt.core.common.MqttPendingPublish;
import org.dromara.mica.mqtt.core.common.MqttPendingQos2Publish;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Node;
import org.tio.core.Tio;
import org.tio.utils.hutool.CollUtil;
import org.tio.utils.timer.TimerTaskService;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
/**
* 默认的 mqtt 消息处理器
*
* @author L.cm
*/
public class DefaultMqttClientProcessor implements IMqttClientProcessor {
private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientProcessor.class);
private final MqttClientCreator mqttClientCreator;
private final IMqttClientSession clientSession;
private final IMqttClientConnectListener connectListener;
private final IMqttClientGlobalMessageListener globalMessageListener;
private final TimerTaskService taskService;
private final ExecutorService executor;
public DefaultMqttClientProcessor(MqttClientCreator mqttClientCreator) {
this.mqttClientCreator = mqttClientCreator;
this.clientSession = mqttClientCreator.getClientSession();
this.connectListener = mqttClientCreator.getConnectListener();
this.globalMessageListener = mqttClientCreator.getGlobalMessageListener();
this.taskService = mqttClientCreator.getTaskService();
this.executor = mqttClientCreator.getMqttExecutor();
}
@Override
public void processConAck(ChannelContext context, MqttConnAckMessage message) {
MqttConnAckVariableHeader connAckVariableHeader = message.variableHeader();
MqttConnectReasonCode returnCode = connAckVariableHeader.connectReturnCode();
switch (returnCode) {
case CONNECTION_ACCEPTED:
// 1. 连接成功的日志
context.setAccepted(true);
if (logger.isInfoEnabled()) {
Node node = context.getServerNode();
logger.info("MqttClient contextId:{} connection:{}:{} succeeded!", context.getId(), node.getIp(), node.getPort());
}
// 2. 发布连接通知
publishConnectEvent(context);
// 3. 发送订阅,不管服务端是否存在 session 都发送
reSendSubscription(context);
break;
case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
case CONNECTION_REFUSED_IDENTIFIER_REJECTED:
case CONNECTION_REFUSED_NOT_AUTHORIZED:
case CONNECTION_REFUSED_SERVER_UNAVAILABLE:
case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION:
default:
String remark = "MqttClient connect error error ReturnCode:" + returnCode;
Tio.close(context, remark);
break;
}
}
/**
* 发布连接成功事件
*
* @param context ChannelContext
*/
private void publishConnectEvent(ChannelContext context) {
// 先判断是否配置监听
if (connectListener == null) {
return;
}
// 触发客户端连接事件
executor.submit(() -> {
try {
connectListener.onConnected(context, context.isReconnect());
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
});
}
/**
* 批量重新订阅
*
* @param context ChannelContext
*/
private void reSendSubscription(ChannelContext context) {
// 0. 全局订阅
Set<MqttTopicSubscription> globalSubscribe = mqttClientCreator.getGlobalSubscribe();
if (globalSubscribe != null && !globalSubscribe.isEmpty()) {
globalReSendSubscription(context, globalSubscribe);
}
List<MqttClientSubscription> reSubscriptionList = clientSession.getSubscriptions();
// 1. 判断是否为空
if (reSubscriptionList.isEmpty()) {
return;
}
// 2. 订阅的数量
int subscribedSize = reSubscriptionList.size();
// 重新订阅批次大小
int reSubscribeBatchSize = mqttClientCreator.getReSubscribeBatchSize();
if (subscribedSize <= reSubscribeBatchSize) {
reSendSubscription(context, reSubscriptionList);
} else {
List<List<MqttClientSubscription>> partitionList = CollUtil.partition(reSubscriptionList, reSubscribeBatchSize);
for (List<MqttClientSubscription> partition : partitionList) {
reSendSubscription(context, partition);
}
}
}
/**
* 全局订阅,不需要存储 session
*
* @param context ChannelContext
* @param globalReSubscriptionList globalReSubscriptionList
*/
private void globalReSendSubscription(ChannelContext context, Set<MqttTopicSubscription> globalReSubscriptionList) {
int packetId = clientSession.getPacketId();
MqttSubscribeMessage message = MqttSubscribeMessage.builder()
.addSubscriptions(globalReSubscriptionList)
.messageId(packetId)
.build();
boolean result = Tio.send(context, message);
logger.info("MQTT globalReSubscriptionList:{} packetId:{} resubscribing result:{}", globalReSubscriptionList, packetId, result);
}
/**
* 批量重新订阅
*
* @param context ChannelContext
* @param reSubscriptionList reSubscriptionList
*/
private void reSendSubscription(ChannelContext context, List<MqttClientSubscription> reSubscriptionList) {
// 2. 批量重新订阅
List<MqttTopicSubscription> topicSubscriptionList = reSubscriptionList.stream()
.map(MqttClientSubscription::toTopicSubscription)
.collect(Collectors.toList());
int packetId = clientSession.getPacketId();
MqttSubscribeMessage message = MqttSubscribeMessage.builder()
.addSubscriptions(topicSubscriptionList)
.messageId(packetId)
.build();
MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(reSubscriptionList, message);
pendingSubscription.startRetransmitTimer(taskService, context);
clientSession.addPaddingSubscribe(packetId, pendingSubscription);
// gitee issues #IB72L6 先添加并启动重试,再发送订阅
boolean result = Tio.send(context, message);
logger.info("MQTT subscriptionList:{} packetId:{} resubscribing result:{}", reSubscriptionList, packetId, result);
}
@Override
public void processSubAck(ChannelContext context, MqttSubAckMessage message) {
int packetId = message.variableHeader().messageId();
logger.debug("MqttClient SubAck packetId:{}", packetId);
MqttPendingSubscription paddingSubscribe = clientSession.getPaddingSubscribe(packetId);
if (paddingSubscribe == null) {
return;
}
List<MqttClientSubscription> subscriptionList = paddingSubscribe.getSubscriptionList();
MqttSubAckPayload subAckPayload = message.payload();
List<Short> reasonCodeList = subAckPayload.reasonCodes();
// reasonCodes 为空
if (reasonCodeList.isEmpty()) {
logger.error("MqttClient subscriptionList:{} subscribe failed reasonCodes is empty packetId:{}", subscriptionList, packetId);
return;
}
int reasonCodeListSize = reasonCodeList.size();
// 找出订阅成功的数据
List<MqttClientSubscription> subscribedList = new ArrayList<>();
// MQTT 3.1.1 协议未明确规定批量订阅的返回格式,批量可能只返回一个 reasonCode
if (reasonCodeListSize == 1) {
Short reasonCode = reasonCodeList.get(0);
// reasonCodes 范围0 ~ 2
if (reasonCode != null && reasonCode >= 0 && reasonCode <= 2) {
subscribedList.addAll(subscriptionList);
}
} else {
// MQTT 5.0 要求 Broker 对批量订阅中的每个主题返回独立的 reason code原因码与订阅请求中的主题顺序一一对应
for (int i = 0; i < subscriptionList.size(); i++) {
MqttClientSubscription subscription = subscriptionList.get(i);
String topicFilter = subscription.getTopicFilter();
Short reasonCode = reasonCodeList.get(i);
// reasonCodes 范围
if (reasonCode == null || reasonCode < 0 || reasonCode > 2) {
logger.error("MqttClient topicFilter:{} subscribe failed reasonCodes:{} packetId:{}", topicFilter, reasonCode, packetId);
} else {
subscribedList.add(subscription);
}
}
}
// 判断订阅结果,对于没有订阅成功的,使其触发重试
if (subscribedList.isEmpty()) {
logger.error("MqttClient subscriptionList:{} subscribe failed packetId:{}", subscriptionList, packetId);
return;
} else {
logger.info("MQTT subscribed:{} successfully packetId:{}", subscribedList, packetId);
}
paddingSubscribe.onSubAckReceived();
clientSession.removePaddingSubscribe(packetId);
clientSession.addSubscriptionList(subscribedList);
// 触发已经监听的事件
subscribedList.forEach(clientSubscription -> {
String topicFilter = clientSubscription.getTopicFilter();
MqttQoS mqttQoS = clientSubscription.getMqttQoS();
IMqttClientMessageListener subscriptionListener = clientSubscription.getListener();
executor.execute(() -> {
try {
subscriptionListener.onSubscribed(context, topicFilter, mqttQoS, message);
} catch (Throwable e) {
logger.error("MQTT topicFilter:{} subscribed onSubscribed event error.", topicFilter, e);
}
});
});
}
@Override
public void processPublish(ChannelContext context, MqttPublishMessage message) {
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttPublishVariableHeader variableHeader = message.variableHeader();
String topicName = variableHeader.topicName();
MqttQoS mqttQoS = mqttFixedHeader.qosLevel();
int packetId = variableHeader.packetId();
logger.debug("MqttClient received publish topic:{} qoS:{} packetId:{}", topicName, mqttQoS, packetId);
switch (mqttFixedHeader.qosLevel()) {
case QOS0:
invokeListenerForPublish(context, topicName, message);
break;
case QOS1:
invokeListenerForPublish(context, topicName, message);
if (packetId != -1) {
MqttMessage messageAck = MqttPubAckMessage.builder()
.packetId(packetId)
.build();
boolean resultPubAck = Tio.send(context, messageAck);
logger.debug("Publish - PubAck send topicName:{} mqttQoS:{} packetId:{} result:{}", topicName, mqttQoS, packetId, resultPubAck);
}
break;
case QOS2:
if (packetId != -1) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.QOS0, false, 0);
MqttMessage pubRecMessage = new MqttMessage(fixedHeader, MqttMessageIdVariableHeader.from(packetId));
MqttPendingQos2Publish pendingQos2Publish = new MqttPendingQos2Publish(message, pubRecMessage);
clientSession.addPendingQos2Publish(packetId, pendingQos2Publish);
pendingQos2Publish.startPubRecRetransmitTimer(taskService, context);
// 先启动重试再发布消息
boolean resultPubRec = Tio.send(context, pubRecMessage);
logger.debug("Publish - PubRec send topicName:{} mqttQoS:{} packetId:{} result:{}", topicName, mqttQoS, packetId, resultPubRec);
}
break;
case FAILURE:
default:
}
}
@Override
public void processUnSubAck(MqttUnSubAckMessage message) {
int packetId = message.variableHeader().messageId();
logger.debug("MqttClient UnSubAck packetId:{}", packetId);
MqttPendingUnSubscription pendingUnSubscription = clientSession.getPaddingUnSubscribe(packetId);
if (pendingUnSubscription == null) {
return;
}
List<String> unSubscriptionTopics = pendingUnSubscription.getTopics();
logger.info("MQTT Topic:{} successfully unSubscribed packetId:{}", unSubscriptionTopics, packetId);
pendingUnSubscription.onUnSubAckReceived();
clientSession.removePaddingUnSubscribe(packetId);
clientSession.removeSubscriptions(unSubscriptionTopics);
}
@Override
public void processPubAck(MqttPubAckMessage message) {
int packetId = message.variableHeader().messageId();
logger.debug("MqttClient PubAck packetId:{}", packetId);
MqttPendingPublish pendingPublish = clientSession.getPendingPublish(packetId);
if (pendingPublish == null) {
return;
}
if (logger.isInfoEnabled()) {
String topicName = pendingPublish.getMessage().variableHeader().topicName();
logger.info("MQTT Topic:{} successfully PubAck packetId:{}", topicName, packetId);
}
pendingPublish.onPubAckReceived();
clientSession.removePendingPublish(packetId);
}
@Override
public void processPubRec(ChannelContext context, MqttMessage message) {
int packetId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
logger.debug("MqttClient PubRec packetId:{}", packetId);
MqttPendingPublish pendingPublish = clientSession.getPendingPublish(packetId);
if (pendingPublish == null) {
return;
}
pendingPublish.onPubAckReceived();
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.QOS1, false, 0);
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
MqttMessage pubRelMessage = new MqttMessage(fixedHeader, variableHeader);
pendingPublish.setPubRelMessage(pubRelMessage);
pendingPublish.startPubRelRetransmissionTimer(taskService, context);
// 发送消息
boolean result = Tio.send(context, pubRelMessage);
logger.debug("Publish - PubRec send packetId:{} result:{}", packetId, result);
}
@Override
public void processPubRel(ChannelContext context, MqttMessage message) {
int packetId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
logger.debug("MqttClient PubRel packetId:{}", packetId);
MqttPendingQos2Publish pendingQos2Publish = clientSession.getPendingQos2Publish(packetId);
if (pendingQos2Publish != null) {
MqttPublishMessage incomingPublish = pendingQos2Publish.getIncomingPublish();
String topicName = incomingPublish.variableHeader().topicName();
this.invokeListenerForPublish(context, topicName, incomingPublish);
pendingQos2Publish.onPubRelReceived();
clientSession.removePendingQos2Publish(packetId);
}
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.QOS0, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(packetId);
// 发送消息
boolean result = Tio.send(context, new MqttMessage(fixedHeader, variableHeader));
logger.debug("Publish - PubRel send packetId:{} result:{}", packetId, result);
}
@Override
public void processPubComp(MqttMessage message) {
int packetId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
MqttPendingPublish pendingPublish = clientSession.getPendingPublish(packetId);
if (pendingPublish == null) {
return;
}
if (logger.isInfoEnabled()) {
String topicName = pendingPublish.getMessage().variableHeader().topicName();
logger.info("MQTT Topic:{} successfully PubComp", topicName);
}
pendingPublish.onPubCompReceived();
clientSession.removePendingPublish(packetId);
}
/**
* 处理订阅的消息
*
* @param context ChannelContext
* @param topicName topicName
* @param message MqttPublishMessage
*/
private void invokeListenerForPublish(ChannelContext context, String topicName, MqttPublishMessage message) {
final byte[] payload = message.payload();
// 全局消息监听器
if (globalMessageListener != null) {
executor.submit(() -> {
try {
globalMessageListener.onMessage(context, topicName, message, payload);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
});
}
// topic 订阅监听
List<MqttClientSubscription> subscriptionList = clientSession.getMatchedSubscription(topicName);
if (subscriptionList.isEmpty()) {
if (globalMessageListener == null || mqttClientCreator.isDebug()) {
logger.warn("Mqtt message to accept topic:{} subscriptionList is empty.", topicName);
} else {
logger.debug("Mqtt message to accept topic:{} subscriptionList is empty.", topicName);
}
} else {
subscriptionList.forEach(subscription -> {
IMqttClientMessageListener listener = subscription.getListener();
executor.submit(() -> {
try {
listener.onMessage(context, topicName, message, payload);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
});
});
}
}
}

View File

@@ -0,0 +1,206 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.core.common.MqttPendingPublish;
import org.dromara.mica.mqtt.core.common.MqttPendingQos2Publish;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.utils.collection.IntObjectHashMap;
import org.tio.utils.collection.IntObjectMap;
import org.tio.utils.collection.MultiValueMap;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* 客户端 session 管理,包括 sub 和 pub
*
* @author L.cm
*/
public class DefaultMqttClientSession implements IMqttClientSession {
private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientSession.class);
/**
* packetId 递增生成器
*/
private final AtomicInteger packetIdGen = new AtomicInteger(1);
/**
* 订阅的数据承载
*/
private final MultiValueMap<String, MqttClientSubscription> subscriptions = new MultiValueMap<>(new ConcurrentHashMap<>());
private final IntObjectMap<MqttPendingSubscription> pendingSubscriptions = new IntObjectHashMap<>();
private final IntObjectMap<MqttPendingUnSubscription> pendingUnSubscriptions = new IntObjectHashMap<>();
private final IntObjectMap<MqttPendingPublish> pendingPublishData = new IntObjectHashMap<>();
private final IntObjectMap<MqttPendingQos2Publish> pendingQos2PublishData = new IntObjectHashMap<>();
@Override
public int getPacketId() {
return packetIdGen.getAndUpdate(current -> (current % 0xffff) == 0 ? 1 : current + 1);
}
@Override
public void addPaddingSubscribe(int messageId, MqttPendingSubscription pendingSubscription) {
pendingSubscriptions.put(messageId, pendingSubscription);
}
@Override
public MqttPendingSubscription getPaddingSubscribe(int messageId) {
return pendingSubscriptions.get(messageId);
}
@Override
public void removePaddingSubscribes(List<String> topicFilters) {
Set<Integer> needToRemove = new HashSet<>();
pendingSubscriptions.forEach((messageId, pendingSubscription) -> {
List<MqttClientSubscription> subscriptionList = pendingSubscription.getSubscriptionList();
if (subscriptionList != null) {
subscriptionList.removeIf(subscription -> topicFilters.contains(subscription.getTopicFilter()));
}
// 如果已经被删到为空
if (subscriptionList == null || subscriptionList.isEmpty()) {
// 停止线程
pendingSubscription.onSubAckReceived();
needToRemove.add(messageId);
}
});
// 清除 messageId 的过程订阅
needToRemove.forEach(pendingSubscriptions::remove);
}
@Override
public MqttPendingSubscription removePaddingSubscribe(int messageId) {
return pendingSubscriptions.remove(messageId);
}
@Override
public void addSubscription(MqttClientSubscription subscription) {
subscriptions.add(subscription.getTopicFilter(), subscription);
}
@Override
public boolean isSubscribed(MqttClientSubscription clientSubscription) {
// 1. 判断是否已经存在订阅关系
String topicFilter = clientSubscription.getTopicFilter();
Set<MqttClientSubscription> subscriptionSet = this.subscriptions.get(topicFilter);
if (subscriptionSet == null || subscriptionSet.isEmpty()) {
return false;
}
// 2. 存在时的逻辑
MqttQoS mqttQoS = clientSubscription.getMqttQoS();
IMqttClientMessageListener listener = clientSubscription.getListener();
for (MqttClientSubscription subscription : subscriptionSet) {
// 3. 已经存在订阅
if (clientSubscription.equals(subscription)) {
logger.error("MQTT Topic:{} mqttQoS:{} listener:{} duplicate subscription.", topicFilter, mqttQoS, listener);
return true;
}
MqttQoS subQos = subscription.getMqttQoS();
IMqttClientMessageListener subListener = subscription.getListener();
// 4. 如果已经存在更高或同级别 qos
if (subQos.value() >= mqttQoS.value()) {
// 5. 监听器不相同则直接添加
if (subListener != listener) {
subscriptions.add(topicFilter, clientSubscription);
logger.warn("MQTT Topic:{} mqttQoS:{} listener:{} has a higher level qos, added directly.", topicFilter, mqttQoS, listener);
} else {
logger.error("MQTT Topic:{} mqttQoS:{} listener:{} has a higher level qos, duplicate subscription.", topicFilter, mqttQoS, listener);
}
return true;
}
}
return false;
}
@Override
public List<MqttClientSubscription> getSubscriptions() {
List<MqttClientSubscription> subscriptionList = new ArrayList<>();
for (Set<MqttClientSubscription> mqttSubscriptions : subscriptions.values()) {
subscriptionList.addAll(mqttSubscriptions);
}
return Collections.unmodifiableList(subscriptionList);
}
@Override
public List<MqttClientSubscription> getMatchedSubscription(String topicName) {
return subscriptions.values().stream()
.flatMap(Collection::stream)
.filter(subscription -> subscription.matches(topicName))
.collect(Collectors.toList());
}
@Override
public void removeSubscriptions(List<String> topicFilters) {
topicFilters.forEach(subscriptions::remove);
}
@Override
public void addPaddingUnSubscribe(int messageId, MqttPendingUnSubscription pendingUnSubscription) {
pendingUnSubscriptions.put(messageId, pendingUnSubscription);
}
@Override
public MqttPendingUnSubscription getPaddingUnSubscribe(int messageId) {
return pendingUnSubscriptions.get(messageId);
}
@Override
public MqttPendingUnSubscription removePaddingUnSubscribe(int messageId) {
return pendingUnSubscriptions.remove(messageId);
}
@Override
public void addPendingPublish(int messageId, MqttPendingPublish pendingPublish) {
pendingPublishData.put(messageId, pendingPublish);
}
@Override
public MqttPendingPublish getPendingPublish(int messageId) {
return pendingPublishData.get(messageId);
}
@Override
public MqttPendingPublish removePendingPublish(int messageId) {
return pendingPublishData.remove(messageId);
}
@Override
public void addPendingQos2Publish(int messageId, MqttPendingQos2Publish pendingQos2Publish) {
pendingQos2PublishData.put(messageId, pendingQos2Publish);
}
@Override
public MqttPendingQos2Publish getPendingQos2Publish(int messageId) {
return pendingQos2PublishData.get(messageId);
}
@Override
public MqttPendingQos2Publish removePendingQos2Publish(int messageId) {
return pendingQos2PublishData.remove(messageId);
}
@Override
public void clean() {
subscriptions.clear();
pendingSubscriptions.clear();
pendingUnSubscriptions.clear();
pendingPublishData.clear();
pendingQos2PublishData.clear();
}
}

View File

@@ -0,0 +1,48 @@
/*
* Copyright 2025-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import java.lang.reflect.Proxy;
/**
* @author ChangJin Wei (魏昌进)
*/
public interface IMqttClient {
/**
* 获取 mqtt 客户端
*
* @return MqttClient
*/
MqttClient getMqttClient();
/**
* 增加一个代理接口方法
*
* @param clientClass 被代理接口
* @param <T> 代理接口的类型
* @return 代理对象
*/
@SuppressWarnings("unchecked")
default <T> T getInterface(Class<T> clientClass) {
return (T) Proxy.newProxyInstance(
clientClass.getClassLoader(),
new Class<?>[]{clientClass},
new MqttInvocationHandler<>(this)
);
}
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import org.tio.core.ChannelContext;
/**
* mqtt 客户端连接监听
*
* @author L.cm
*/
public interface IMqttClientConnectListener {
/**
* 监听到消息
*
* @param context ChannelContext
* @param isReconnect 是否重连
*/
void onConnected(ChannelContext context, boolean isReconnect);
/**
* 连接关闭前触发本方法
*
* @param context the ChannelContext
* @param throwable the throwable 有可能为空
* @param remark the remark 有可能为空
* @param isRemove is removed
*/
void onDisconnect(ChannelContext context, Throwable throwable, String remark, boolean isRemove);
}

View File

@@ -0,0 +1,40 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.tio.core.ChannelContext;
/**
* mqtt 全局消息处理
*
* @author L.cm
*/
@FunctionalInterface
public interface IMqttClientGlobalMessageListener {
/**
* 监听到消息
*
* @param context ChannelContext
* @param topic topic
* @param message MqttPublishMessage
* @param payload payload
*/
void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload);
}

View File

@@ -0,0 +1,65 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.codec.message.MqttSubAckMessage;
import org.tio.core.ChannelContext;
/**
* mqtt 消息处理
*
* @author L.cm
*/
@FunctionalInterface
public interface IMqttClientMessageListener {
/**
* 订阅成功之后的事件
*
* @param context ChannelContext
* @param topicFilter topicFilter
* @param mqttQoS MqttQoS
* @param message MqttSubAckMessage
*/
default void onSubscribed(ChannelContext context, String topicFilter, MqttQoS mqttQoS, MqttSubAckMessage message) {
onSubscribed(context, topicFilter, mqttQoS);
}
/**
* 订阅成功之后的事件
*
* @param context ChannelContext
* @param topicFilter topicFilter
* @param mqttQoS MqttQoS
*/
default void onSubscribed(ChannelContext context, String topicFilter, MqttQoS mqttQoS) {
}
/**
* 监听到消息
*
* @param context ChannelContext
* @param topic topic
* @param message MqttPublishMessage
* @param payload payload
*/
void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload);
}

View File

@@ -0,0 +1,90 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import org.dromara.mica.mqtt.codec.message.*;
import org.tio.core.ChannelContext;
/**
* mqtt 客户端消息处理器
*
* @author L.cm
*/
public interface IMqttClientProcessor {
/**
* 处理服务端链接 ack
*
* @param context ChannelContext
* @param message MqttConnAckMessage
*/
void processConAck(ChannelContext context, MqttConnAckMessage message);
/**
* 处理服务端订阅的 ack
*
* @param message MqttSubAckMessage
* @param context ChannelContext
*/
void processSubAck(ChannelContext context, MqttSubAckMessage message);
/**
* 处理服务端 publish 的消息
*
* @param context ChannelContext
* @param message MqttPublishMessage
*/
void processPublish(ChannelContext context, MqttPublishMessage message);
/**
* 处理服务端解除订阅的 ack
*
* @param message MqttSubAckMessage
*/
void processUnSubAck(MqttUnSubAckMessage message);
/**
* 处理服务端 publish 的 ack
*
* @param message MqttPubAckMessage
*/
void processPubAck(MqttPubAckMessage message);
/**
* 处理服务端 publish rec
*
* @param context ChannelContext
* @param message MqttPubAckMessage
*/
void processPubRec(ChannelContext context, MqttMessage message);
/**
* 处理服务端 publish rel
*
* @param context ChannelContext
* @param message MqttPubAckMessage
*/
void processPubRel(ChannelContext context, MqttMessage message);
/**
* 处理服务端 publish comp
*
* @param message MqttPubAckMessage
*/
void processPubComp(MqttMessage message);
}

View File

@@ -0,0 +1,208 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.core.common.MqttPendingPublish;
import org.dromara.mica.mqtt.core.common.MqttPendingQos2Publish;
import java.util.List;
/**
* 客户端 session
*
* @author L.cm
*/
public interface IMqttClientSession {
/**
* 获取 packetId
*
* @return packetId
*/
int getPacketId();
/**
* 添加订阅
*
* @param messageId messageId
* @param pendingSubscription MqttPendingSubscription
*/
void addPaddingSubscribe(int messageId, MqttPendingSubscription pendingSubscription);
/**
* 获取过程订阅
*
* @param messageId messageId
* @return MqttPendingSubscription
*/
MqttPendingSubscription getPaddingSubscribe(int messageId);
/**
* 移除过程订阅
*
* @param topicFilters topicFilter 集合
*/
void removePaddingSubscribes(List<String> topicFilters);
/**
* 删除过程订阅
*
* @param messageId messageId
* @return MqttPendingSubscription
*/
MqttPendingSubscription removePaddingSubscribe(int messageId);
/**
* 添加订阅
*
* @param subscription MqttClientSubscription
*/
void addSubscription(MqttClientSubscription subscription);
/**
* 添加启动时的临时订阅
*
* @param topicFilters topicFilters
* @param qos MqttQoS
* @param messageListener IMqttClientMessageListener
*/
default void addSubscriptionList(String[] topicFilters, MqttQoS qos, IMqttClientMessageListener messageListener) {
for (String topicFilter : topicFilters) {
addSubscription(new MqttClientSubscription(qos, topicFilter, messageListener));
}
}
/**
* 添加订阅
*
* @param subscriptionList MqttClientSubscription 集合
*/
default void addSubscriptionList(List<MqttClientSubscription> subscriptionList) {
for (MqttClientSubscription subscription : subscriptionList) {
addSubscription(subscription);
}
}
/**
* 判断是否已经订阅过
*
* @param clientSubscription MqttClientSubscription
* @return 是否已经订阅过
*/
boolean isSubscribed(MqttClientSubscription clientSubscription);
/**
* 获取并清除订阅
*
* @return 订阅集合
*/
List<MqttClientSubscription> getSubscriptions();
/**
* 获取匹配的订阅
*
* @param topicName topicName
* @return 订阅信息集合
*/
List<MqttClientSubscription> getMatchedSubscription(String topicName);
/**
* 删除订阅过程消息
*
* @param topicFilters topicFilter 集合
*/
void removeSubscriptions(List<String> topicFilters);
/**
* 添加取消订阅过程消息
*
* @param messageId messageId
* @param pendingUnSubscription MqttPendingUnSubscription
*/
void addPaddingUnSubscribe(int messageId, MqttPendingUnSubscription pendingUnSubscription);
/**
* 获取取消订阅过程消息
*
* @param messageId messageId
* @return MqttPendingUnSubscription
*/
MqttPendingUnSubscription getPaddingUnSubscribe(int messageId);
/**
* 删除取消订阅过程消息
*
* @param messageId messageId
* @return MqttPendingUnSubscription
*/
MqttPendingUnSubscription removePaddingUnSubscribe(int messageId);
/**
* 添加过程消息
*
* @param messageId messageId
* @param pendingPublish MqttPendingPublish
*/
void addPendingPublish(int messageId, MqttPendingPublish pendingPublish);
/**
* 获取过程消息
*
* @param messageId messageId
* @return MqttPendingPublish
*/
MqttPendingPublish getPendingPublish(int messageId);
/**
* 删除过程消息
*
* @param messageId messageId
* @return MqttPendingPublish
*/
MqttPendingPublish removePendingPublish(int messageId);
/**
* 添加 qos2 过程消息
*
* @param messageId messageId
* @param pendingQos2Publish MqttPendingQos2Publish
*/
void addPendingQos2Publish(int messageId, MqttPendingQos2Publish pendingQos2Publish);
/**
* 获取 qos2 过程消息
*
* @param messageId messageId
* @return MqttPendingQos2Publish
*/
MqttPendingQos2Publish getPendingQos2Publish(int messageId);
/**
* 删除 qos2 过程消息
*
* @param messageId messageId
* @return MqttPendingQos2Publish
*/
MqttPendingQos2Publish removePendingQos2Publish(int messageId);
/**
* 资源清理
*/
void clean();
}

View File

@@ -0,0 +1,640 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.codec.message.MqttMessage;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.codec.message.MqttSubscribeMessage;
import org.dromara.mica.mqtt.codec.message.MqttUnSubscribeMessage;
import org.dromara.mica.mqtt.codec.message.builder.MqttPublishBuilder;
import org.dromara.mica.mqtt.codec.message.builder.MqttTopicSubscription;
import org.dromara.mica.mqtt.codec.properties.MqttProperties;
import org.dromara.mica.mqtt.core.common.MqttPendingPublish;
import org.dromara.mica.mqtt.core.serializer.MqttSerializer;
import org.dromara.mica.mqtt.core.util.TopicUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.client.ClientChannelContext;
import org.tio.client.TioClient;
import org.tio.client.TioClientConfig;
import org.tio.core.ChannelContext;
import org.tio.core.Node;
import org.tio.core.Tio;
import org.tio.utils.timer.TimerTask;
import org.tio.utils.timer.TimerTaskService;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* mqtt 客户端
*
* @author L.cm
* @author ChangJin Wei (魏昌进)
*/
public final class MqttClient implements IMqttClient {
private static final Logger logger = LoggerFactory.getLogger(MqttClient.class);
private final TioClient tioClient;
private final MqttClientCreator config;
private final TioClientConfig clientTioConfig;
private final IMqttClientSession clientSession;
private final TimerTaskService taskService;
private final ExecutorService mqttExecutor;
private final MqttSerializer mqttSerializer;
private ClientChannelContext context;
public static MqttClientCreator create() {
return new MqttClientCreator();
}
MqttClient(TioClient tioClient, MqttClientCreator config) {
this.tioClient = tioClient;
this.config = config;
this.clientTioConfig = tioClient.getClientConfig();
this.taskService = config.getTaskService();
this.mqttExecutor = config.getMqttExecutor();
this.clientSession = config.getClientSession();
this.mqttSerializer = config.getMqttSerializer();
}
/**
* 订阅
*
* @param topicFilter topicFilter
* @param listener MqttMessageListener
* @return MqttClient
*/
public MqttClient subQos0(String topicFilter, IMqttClientMessageListener listener) {
return subscribe(topicFilter, MqttQoS.QOS0, listener);
}
/**
* 订阅
*
* @param topicFilter topicFilter
* @param listener MqttMessageListener
* @return MqttClient
*/
public MqttClient subQos1(String topicFilter, IMqttClientMessageListener listener) {
return subscribe(topicFilter, MqttQoS.QOS1, listener);
}
/**
* 订阅
*
* @param topicFilter topicFilter
* @param listener MqttMessageListener
* @return MqttClient
*/
public MqttClient subQos2(String topicFilter, IMqttClientMessageListener listener) {
return subscribe(topicFilter, MqttQoS.QOS2, listener);
}
/**
* 订阅
*
* @param mqttQoS MqttQoS
* @param topicFilter topicFilter
* @param listener MqttMessageListener
* @return MqttClient
*/
public MqttClient subscribe(MqttQoS mqttQoS, String topicFilter, IMqttClientMessageListener listener) {
return subscribe(topicFilter, mqttQoS, listener, null);
}
/**
* 订阅
*
* @param mqttQoS MqttQoS
* @param topicFilter topicFilter
* @param listener MqttMessageListener
* @return MqttClient
*/
public MqttClient subscribe(String topicFilter, MqttQoS mqttQoS, IMqttClientMessageListener listener) {
return subscribe(topicFilter, mqttQoS, listener, null);
}
/**
* 订阅
*
* @param mqttQoS MqttQoS
* @param topicFilter topicFilter
* @param listener MqttMessageListener
* @param properties MqttProperties
* @return MqttClient
*/
public MqttClient subscribe(String topicFilter, MqttQoS mqttQoS, IMqttClientMessageListener listener, MqttProperties properties) {
return subscribe(Collections.singletonList(new MqttClientSubscription(mqttQoS, topicFilter, listener)), properties);
}
/**
* 订阅
*
* @param topicFilters topicFilter 数组
* @param mqttQoS MqttQoS
* @param listener MqttMessageListener
* @return MqttClient
*/
public MqttClient subscribe(String[] topicFilters, MqttQoS mqttQoS, IMqttClientMessageListener listener) {
return subscribe(topicFilters, mqttQoS, listener, null);
}
/**
* 订阅
*
* @param topicFilters topicFilter 数组
* @param mqttQoS MqttQoS
* @param listener MqttMessageListener
* @param properties MqttProperties
* @return MqttClient
*/
public MqttClient subscribe(String[] topicFilters, MqttQoS mqttQoS, IMqttClientMessageListener listener, MqttProperties properties) {
Objects.requireNonNull(topicFilters, "MQTT subscribe topicFilters is null.");
List<MqttClientSubscription> subscriptionList = new ArrayList<>();
for (String topicFilter : topicFilters) {
subscriptionList.add(new MqttClientSubscription(mqttQoS, topicFilter, listener));
}
return subscribe(subscriptionList, properties);
}
/**
* 批量订阅
*
* @param subscriptionList 订阅集合
* @return MqttClient
*/
public MqttClient subscribe(List<MqttClientSubscription> subscriptionList) {
return subscribe(subscriptionList, null);
}
/**
* 批量订阅
*
* @param subscriptionList 订阅集合
* @param properties MqttProperties
* @return MqttClient
*/
public MqttClient subscribe(List<MqttClientSubscription> subscriptionList, MqttProperties properties) {
// 1. 先判断是否已经订阅过,重复订阅,直接跳出
List<MqttClientSubscription> needSubscriptionList = new ArrayList<>();
for (MqttClientSubscription subscription : subscriptionList) {
// 校验 topicFilter
TopicUtil.validateTopicFilter(subscription.getTopicFilter());
boolean subscribed = clientSession.isSubscribed(subscription);
if (!subscribed) {
needSubscriptionList.add(subscription);
}
}
// 2. 已经订阅的跳出
if (needSubscriptionList.isEmpty()) {
return this;
}
List<MqttTopicSubscription> topicSubscriptionList = needSubscriptionList.stream()
.map(MqttClientSubscription::toTopicSubscription)
.collect(Collectors.toList());
// 3. 没有订阅过
int messageId = clientSession.getPacketId();
MqttSubscribeMessage message = MqttSubscribeMessage.builder()
.addSubscriptions(topicSubscriptionList)
.messageId(messageId)
.properties(properties)
.build();
// 4. 已经连接成功,直接订阅逻辑,未连接成功的添加到订阅列表,连接成功时会重连。
ClientChannelContext clientContext = getContext();
if (clientContext != null && clientContext.isAccepted()) {
MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(needSubscriptionList, message);
pendingSubscription.startRetransmitTimer(taskService, clientContext);
clientSession.addPaddingSubscribe(messageId, pendingSubscription);
// gitee issues #IB72L6 先添加并启动重试,再发送订阅
boolean result = Tio.send(clientContext, message);
logger.info("MQTT subscriptionList:{} messageId:{} subscribing result:{}", needSubscriptionList, messageId, result);
} else {
clientSession.addSubscriptionList(needSubscriptionList);
}
return this;
}
/**
* 取消订阅
*
* @param topicFilters topicFilter 集合
* @return MqttClient
*/
public MqttClient unSubscribe(String... topicFilters) {
return unSubscribe(Arrays.asList(topicFilters));
}
/**
* 取消订阅
*
* @param topicFilters topicFilter 集合
* @return MqttClient
*/
public MqttClient unSubscribe(List<String> topicFilters) {
// 1. 校验 topicFilter
TopicUtil.validateTopicFilter(topicFilters);
// 2. 优先取消本地订阅
clientSession.removePaddingSubscribes(topicFilters);
clientSession.removeSubscriptions(topicFilters);
// 3. 发送取消订阅到服务端
int messageId = clientSession.getPacketId();
MqttUnSubscribeMessage message = MqttUnSubscribeMessage.builder()
.addTopicFilters(topicFilters)
.messageId(messageId)
.build();
MqttPendingUnSubscription pendingUnSubscription = new MqttPendingUnSubscription(topicFilters, message);
ClientChannelContext clientContext = getContext();
// 4. 启动取消订阅线程
clientSession.addPaddingUnSubscribe(messageId, pendingUnSubscription);
pendingUnSubscription.startRetransmissionTimer(taskService, clientContext);
// 5. 发送取消订阅的消息
boolean result = Tio.send(clientContext, message);
logger.info("MQTT Topic:{} messageId:{} unSubscribing result:{}", topicFilters, messageId, result);
return this;
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息内容
* @return 是否发送成功
*/
public boolean publish(String topic, Object payload) {
return publish(topic, payload, MqttQoS.QOS0);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息内容
* @param qos MqttQoS
* @return 是否发送成功
*/
public boolean publish(String topic, Object payload, MqttQoS qos) {
return publish(topic, payload, qos, false);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息内容
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public boolean publish(String topic, Object payload, boolean retain) {
return publish(topic, payload, MqttQoS.QOS0, retain);
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @param retain 是否在服务器上保留消息
* @return 是否发送成功
*/
public boolean publish(String topic, Object payload, MqttQoS qos, boolean retain) {
return publish(topic, payload, qos, (publishBuilder) -> publishBuilder.retained(retain));
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @param retain 是否在服务器上保留消息
* @param properties MqttProperties
* @return 是否发送成功
*/
public boolean publish(String topic, Object payload, MqttQoS qos, boolean retain, MqttProperties properties) {
return publish(topic, payload, qos, (publishBuilder) -> publishBuilder.retained(retain).properties(properties));
}
/**
* 发布消息
*
* @param topic topic
* @param payload 消息体
* @param qos MqttQoS
* @param builder PublishBuilder
* @return 是否发送成功
*/
public boolean publish(String topic, Object payload, MqttQoS qos, Consumer<MqttPublishBuilder> builder) {
MqttPublishBuilder publishBuilder = MqttPublishMessage.builder();
// 序列化
byte[] newPayload = payload instanceof byte[] ? (byte[]) payload : mqttSerializer.serialize(payload);
// 自定义配置
builder.accept(publishBuilder);
// 内置配置
publishBuilder.topicName(topic)
.payload(newPayload)
.qos(qos);
return publish(publishBuilder);
}
/**
* 发布消息
*
* @param builder PublishBuilder
* @return 是否发送成功
*/
public boolean publish(MqttPublishBuilder builder) {
String topic = Objects.requireNonNull(builder.getTopicName(), "topic is null");
// 校验 topic
TopicUtil.validateTopicName(topic);
MqttQoS qos = Objects.requireNonNull(builder.getQos(), "qos is null");
// qos 判断
boolean isHighLevelQoS = MqttQoS.QOS1 == qos || MqttQoS.QOS2 == qos;
int messageId = isHighLevelQoS ? clientSession.getPacketId() : -1;
// 内置配置
MqttPublishMessage message = builder
.messageId(messageId)
.build();
ClientChannelContext clientContext = getContext();
if (clientContext == null) {
logger.error("MQTT client publish fail, TCP not connected.");
return false;
}
// 如果已经连接成功,但是还没有 mqtt 认证,不进行休眠等待(避免大批量数据,卡死)
// https://gitee.com/dromara/mica-mqtt/issues/IC4DWT
if (!clientContext.isAccepted()) {
logger.error("TCP is connected but mqtt is not accepted.");
return false;
}
// 如果是高版本的 qos
if (isHighLevelQoS) {
MqttPendingPublish pendingPublish = new MqttPendingPublish(message, qos);
clientSession.addPendingPublish(messageId, pendingPublish);
pendingPublish.startPublishRetransmissionTimer(taskService, clientContext);
}
// 发送消息
boolean result = Tio.send(clientContext, message);
logger.debug("MQTT Topic:{} qos:{} retain:{} publish result:{}", topic, qos, builder.isRetained(), result);
return result;
}
/**
* 添加定时任务,注意:如果抛出异常,会终止后续任务,请自行处理异常
*
* @param command runnable
* @param delay delay
* @return TimerTask
*/
public TimerTask schedule(Runnable command, long delay) {
return this.tioClient.schedule(command, delay);
}
/**
* 添加定时任务,注意:如果抛出异常,会终止后续任务,请自行处理异常
*
* @param command runnable
* @param delay delay
* @param executor 用于自定义线程池,处理耗时业务
* @return TimerTask
*/
public TimerTask schedule(Runnable command, long delay, Executor executor) {
return this.tioClient.schedule(command, delay, executor);
}
/**
* 添加定时任务
*
* @param command runnable
* @param delay delay
* @return TimerTask
*/
public TimerTask scheduleOnce(Runnable command, long delay) {
return this.tioClient.scheduleOnce(command, delay);
}
/**
* 添加定时任务
*
* @param command runnable
* @param delay delay
* @param executor 用于自定义线程池,处理耗时业务
* @return TimerTask
*/
public TimerTask scheduleOnce(Runnable command, long delay, Executor executor) {
return this.tioClient.scheduleOnce(command, delay, executor);
}
/**
* 异步连接
*
* @return TioClient
*/
MqttClient start(boolean sync) {
// 启动 tio
Node node = new Node(config.getIp(), config.getPort());
try {
if (sync) {
this.tioClient.connect(node, config.getBindIp(), 0, config.getTimeout());
} else {
this.tioClient.asyncConnect(node, config.getBindIp(), 0, config.getTimeout());
}
return this;
} catch (Exception e) {
throw new IllegalStateException("Mica mqtt client async start fail.", e);
}
}
/**
* 重连
*/
public void reconnect() {
ClientChannelContext channelContext = getContext();
if (channelContext == null) {
return;
}
try {
// 判断是否 removed
if (channelContext.isRemoved()) {
channelContext.setRemoved(false);
}
tioClient.reconnect(channelContext, config.getTimeout());
} catch (Exception e) {
logger.error("mqtt client reconnect error", e);
}
}
/**
* 重连到新的服务端节点
*
* @param ip ip
* @param port port
* @return 是否成功
*/
public boolean reconnect(String ip, int port) {
return reconnect(new Node(ip, port));
}
/**
* 重连到新的服务端节点
*
* @param serverNode Node
* @return 是否成功
*/
public boolean reconnect(Node serverNode) {
// 更新 ip 和端口
this.config.ip(serverNode.getIp()).port(serverNode.getPort());
// 获取老的,老的有可能为 null因为已经关闭进入 closes 里https://gitee.com/dromara/mica-mqtt/issues/IBY5LQ
ClientChannelContext oldContext = getContext();
if (oldContext == null) {
// 如果是已经关闭的连接,设置 serverNode下一次重连触发就会使用新的 serverNode
Set<ChannelContext> closedSet = clientTioConfig.closeds;
if (closedSet != null && !closedSet.isEmpty()) {
ChannelContext closedContext = closedSet.iterator().next();
closedContext.setServerNode(serverNode);
}
} else {
// 切换 serverNode关闭连接触发重连任务去连接新的 serverNode
oldContext.setServerNode(serverNode);
Tio.close(oldContext, "切换服务地址:" + serverNode);
}
return false;
}
/**
* 断开 mqtt 连接
*
* @return 是否成功
*/
public boolean disconnect() {
ClientChannelContext channelContext = getContext();
if (channelContext == null) {
return false;
}
boolean result = Tio.bSend(channelContext, MqttMessage.DISCONNECT);
if (result) {
Tio.close(channelContext, null, "MqttClient disconnect.", true);
}
return result;
}
/**
* 停止客户端
*
* @return 是否停止成功
*/
public boolean stop() {
// 1. 断开连接
if (config.isDisconnectBeforeStop()) {
this.disconnect();
}
// 2. 停止 tio
boolean result = tioClient.stop();
// 3. 停止工作线程
try {
mqttExecutor.shutdown();
} catch (Exception e1) {
logger.error(e1.getMessage(), e1);
}
try {
// 等待线程池中的任务结束,客户端等待 6 秒基本上足够了
result &= mqttExecutor.awaitTermination(6, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error(e.getMessage(), e);
}
logger.info("MqttClient stop result:{}", result);
// 4. 清理 session
this.clientSession.clean();
return result;
}
/**
* 获取 TioClient
*
* @return TioClient
*/
public TioClient getTioClient() {
return tioClient;
}
/**
* 获取配置
*
* @return MqttClientCreator
*/
public MqttClientCreator getClientCreator() {
return config;
}
/**
* 获取 ClientTioConfig
*
* @return ClientTioConfig
*/
public TioClientConfig getClientTioConfig() {
return clientTioConfig;
}
/**
* 获取 ClientChannelContext
*
* @return ClientChannelContext
*/
public ClientChannelContext getContext() {
if (context != null) {
return context;
}
synchronized (this) {
if (context == null) {
Set<ChannelContext> contextSet = Tio.getConnecteds(clientTioConfig);
if (contextSet != null && !contextSet.isEmpty()) {
this.context = (ClientChannelContext) contextSet.iterator().next();
}
}
}
return this.context;
}
/**
* 判断客户端跟服务端是否连接
*
* @return 是否已经连接成功
*/
public boolean isConnected() {
ClientChannelContext channelContext = getContext();
return channelContext != null && channelContext.isAccepted();
}
/**
* 判断客户端跟服务端是否断开连接
*
* @return 是否断连
*/
public boolean isDisconnected() {
return !isConnected();
}
@Override
public MqttClient getMqttClient() {
return this;
}
}

View File

@@ -0,0 +1,98 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import org.dromara.mica.mqtt.codec.*;
import org.dromara.mica.mqtt.codec.message.*;
import org.dromara.mica.mqtt.codec.message.header.MqttFixedHeader;
import org.tio.client.intf.TioClientHandler;
import org.tio.core.ChannelContext;
import org.tio.core.TioConfig;
import org.tio.core.exception.TioDecodeException;
import org.tio.core.intf.Packet;
import java.nio.ByteBuffer;
/**
* mqtt 客户端处理
*
* @author L.cm
*/
public class MqttClientAioHandler implements TioClientHandler {
private final MqttDecoder mqttDecoder;
private final MqttEncoder mqttEncoder;
private final IMqttClientProcessor processor;
public MqttClientAioHandler(MqttClientCreator mqttClientCreator,
IMqttClientProcessor processor) {
this.mqttDecoder = new MqttDecoder(mqttClientCreator.getMaxBytesInMessage(), mqttClientCreator.getMaxClientIdLength());
this.mqttEncoder = MqttEncoder.INSTANCE;
this.processor = processor;
}
@Override
public Packet heartbeatPacket(ChannelContext channelContext) {
return MqttMessage.PINGREQ;
}
@Override
public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext context) throws TioDecodeException {
return mqttDecoder.doDecode(context, buffer, readableLength);
}
@Override
public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
return mqttEncoder.doEncode(channelContext, (MqttMessage) packet);
}
@Override
public void handler(Packet packet, ChannelContext context) {
MqttMessage message = (MqttMessage) packet;
MqttFixedHeader fixedHeader = message.fixedHeader();
// 根据消息类型处理消息
MqttMessageType messageType = fixedHeader.messageType();
switch (messageType) {
case CONNACK:
processor.processConAck(context, (MqttConnAckMessage) message);
break;
case SUBACK:
processor.processSubAck(context, (MqttSubAckMessage) message);
break;
case PUBLISH:
processor.processPublish(context, (MqttPublishMessage) message);
break;
case UNSUBACK:
processor.processUnSubAck((MqttUnSubAckMessage) message);
break;
case PUBACK:
processor.processPubAck((MqttPubAckMessage) message);
break;
case PUBREC:
processor.processPubRec(context, message);
break;
case PUBREL:
processor.processPubRel(context, message);
break;
case PUBCOMP:
processor.processPubComp(message);
break;
default:
break;
}
}
}

View File

@@ -0,0 +1,132 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import org.dromara.mica.mqtt.codec.MqttVersion;
import org.dromara.mica.mqtt.codec.message.MqttConnectMessage;
import org.dromara.mica.mqtt.codec.message.builder.MqttConnectBuilder;
import org.dromara.mica.mqtt.codec.properties.IntegerProperty;
import org.dromara.mica.mqtt.codec.properties.MqttProperties;
import org.dromara.mica.mqtt.codec.properties.MqttPropertyType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.client.DefaultTioClientListener;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.utils.hutool.StrUtil;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
/**
* mqtt 客户端监听器
*
* @author L.cm
*/
public class MqttClientAioListener extends DefaultTioClientListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientAioListener.class);
private final MqttClientCreator clientCreator;
private final IMqttClientConnectListener connectListener;
private final ExecutorService executor;
public MqttClientAioListener(MqttClientCreator clientCreator) {
this.clientCreator = clientCreator;
this.connectListener = clientCreator.getConnectListener();
this.executor = clientCreator.getMqttExecutor();
}
@Override
public void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) {
if (isConnected) {
// 重连时,发送 mqtt 连接消息
boolean result = Tio.bSend(context, getConnectMessage(this.clientCreator));
logger.info("MqttClient reconnect send connect result:{}", result);
if (!result) {
// 如果重连未成功,直接关闭连接,等待后续重连
Tio.close(context, "MqttClient reconnect send fail.");
}
}
}
@Override
public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) {
context.setAccepted(false);
// 先判断是否配置监听
if (connectListener == null) {
return;
}
// 2. 触发客户断开连接事件
executor.submit(() -> {
try {
connectListener.onDisconnect(context, throwable, remark, isRemove);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
});
}
/**
* 构造连接消息
*
* @param mqttClientCreator MqttClientCreator
* @return MqttConnectMessage
*/
private static MqttConnectMessage getConnectMessage(MqttClientCreator mqttClientCreator) {
MqttWillMessage willMessage = mqttClientCreator.getWillMessage();
MqttVersion version = mqttClientCreator.getVersion();
int keepAliveSecs = mqttClientCreator.getKeepAliveSecs();
// 1. 建立连接后发送 mqtt 连接的消息
MqttConnectBuilder builder = MqttConnectMessage.builder()
.clientId(mqttClientCreator.getClientId())
.username(mqttClientCreator.getUsername())
.cleanStart(mqttClientCreator.isCleanStart())
.protocolVersion(version)
// 心跳
.keepAlive(keepAliveSecs > 0 ? keepAliveSecs : MqttClientCreator.DEFAULT_KEEP_ALIVE_SECS)
.willFlag(willMessage != null);
// 2. 密码
String password = mqttClientCreator.getPassword();
if (StrUtil.isNotBlank(password)) {
builder.password(password.getBytes(StandardCharsets.UTF_8));
}
// 3. 遗嘱消息
if (willMessage != null) {
builder.willTopic(willMessage.getTopic())
.willMessage(willMessage.getMessage())
.willRetain(willMessage.isRetain())
.willQoS(willMessage.getQos())
.willProperties(willMessage.getWillProperties());
}
// 4. mqtt5 特性
if (MqttVersion.MQTT_5 == version) {
MqttProperties properties = mqttClientCreator.getProperties();
// Session Expiry Interval
Integer sessionExpiryInterval = mqttClientCreator.getSessionExpiryIntervalSecs();
if (sessionExpiryInterval != null && sessionExpiryInterval > 0) {
if (properties == null) {
properties = new MqttProperties();
}
properties.add(new IntegerProperty(MqttPropertyType.SESSION_EXPIRY_INTERVAL, sessionExpiryInterval));
}
if (properties != null) {
builder.properties(properties);
}
}
return builder.build();
}
}

View File

@@ -0,0 +1,81 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import org.dromara.mica.mqtt.codec.codes.MqttConnectReasonCode;
import org.dromara.mica.mqtt.codec.message.*;
import org.dromara.mica.mqtt.codec.message.header.MqttConnAckVariableHeader;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import java.util.concurrent.CompletableFuture;
/**
* 默认的 mqtt 消息处理器
*
* @author L.cm
*/
public class MqttClientConnectTestProcessor implements IMqttClientProcessor {
private final CompletableFuture<MqttConnectReasonCode> future;
public MqttClientConnectTestProcessor(CompletableFuture<MqttConnectReasonCode> future) {
this.future = future;
}
@Override
public void processConAck(ChannelContext context, MqttConnAckMessage message) {
MqttConnAckVariableHeader connAckVariableHeader = message.variableHeader();
Tio.remove(context, "mqtt connect tested.");
future.complete(connAckVariableHeader.connectReturnCode());
}
@Override
public void processSubAck(ChannelContext context, MqttSubAckMessage message) {
}
@Override
public void processPublish(ChannelContext context, MqttPublishMessage message) {
}
@Override
public void processUnSubAck(MqttUnSubAckMessage message) {
}
@Override
public void processPubAck(MqttPubAckMessage message) {
}
@Override
public void processPubRec(ChannelContext context, MqttMessage message) {
}
@Override
public void processPubRel(ChannelContext context, MqttMessage message) {
}
@Override
public void processPubComp(MqttMessage message) {
}
}

View File

@@ -0,0 +1,802 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import org.dromara.mica.mqtt.codec.*;
import org.dromara.mica.mqtt.codec.codes.MqttConnectReasonCode;
import org.dromara.mica.mqtt.codec.message.builder.MqttTopicSubscription;
import org.dromara.mica.mqtt.codec.properties.MqttProperties;
import org.dromara.mica.mqtt.core.serializer.MqttJsonSerializer;
import org.dromara.mica.mqtt.core.serializer.MqttSerializer;
import org.tio.client.ReconnConf;
import org.tio.client.TioClient;
import org.tio.client.TioClientConfig;
import org.tio.client.intf.TioClientHandler;
import org.tio.client.intf.TioClientListener;
import org.tio.client.task.HeartbeatTimeoutStrategy;
import org.tio.core.Node;
import org.tio.core.TioConfig;
import org.tio.core.ssl.SslConfig;
import org.tio.core.task.HeartbeatMode;
import org.tio.utils.hutool.NetUtil;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.thread.ThreadUtils;
import org.tio.utils.thread.pool.SynThreadPoolExecutor;
import org.tio.utils.timer.DefaultTimerTaskService;
import org.tio.utils.timer.TimerTaskService;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* mqtt 客户端构造器
*
* @author L.cm
* @author ChangJin Wei (魏昌进)
*/
public final class MqttClientCreator {
/**
* 默认的心跳超时
*/
public static final int DEFAULT_KEEP_ALIVE_SECS = 60;
/**
* 名称
*/
private String name = "Mica-Mqtt-Client";
/**
* ip可为空默认为 127.0.0.1
*/
private String ip = "127.0.0.1";
/**
* 端口默认1883
*/
private int port = 1883;
/**
* 超时时间t-io 配置,可为 null默认为5秒
*/
private Integer timeout;
/**
* 绑定 ip绑定网卡用于多网卡默认为 null
*/
private String bindIp;
/**
* 接收数据的 buffer size默认8k
*/
private int readBufferSize = MqttConstant.DEFAULT_MAX_READ_BUFFER_SIZE;
/**
* 消息解析最大 bytes 长度默认8092
*/
private int maxBytesInMessage = MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
/**
* mqtt 3.1 会校验此参数为 23为了减少问题设置成了 64
*/
private int maxClientIdLength = MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
/**
* Keep Alive (s)如果用户不希望框架层面做心跳相关工作请把此值设为0或负数
*/
private int keepAliveSecs = DEFAULT_KEEP_ALIVE_SECS;
/**
* 心跳检测模式,默认:最后请求时间
*/
private HeartbeatMode heartbeatMode = HeartbeatMode.LAST_REQ;
/**
* 心跳超时策略,默认:发送 ping
*/
private HeartbeatTimeoutStrategy heartbeatTimeoutStrategy = HeartbeatTimeoutStrategy.PING;
/**
* SSL配置
*/
private SslConfig sslConfig;
/**
* 自动重连
*/
private boolean reconnect = true;
/**
* 重连的间隔时间单位毫秒默认5000
*/
private long reInterval = 5000;
/**
* 连续重连次数当连续重连这么多次都失败时不再重连。0和负数则一直重连
*/
private int retryCount = 0;
/**
* 重连重新订阅一个批次大小默认20
*/
private int reSubscribeBatchSize = 20;
/**
* 客户端 id默认随机生成
*/
private String clientId;
/**
* mqtt 协议默认MQTT_5
*/
private MqttVersion version = MqttVersion.MQTT_5;
/**
* 用户名
*/
private String username = null;
/**
* 密码
*/
private String password = null;
/**
* 清除会话
* <p>
* false 表示如果订阅的客户机断线了,那么要保存其要推送的消息,如果其重新连接时,则将这些消息推送。
* true 表示消除,表示客户机是第一次连接,消息所以以前的连接信息。
* </p>
*/
private boolean cleanStart = true;
/**
* mqtt 5.0 session 有效期,单位秒
*/
private Integer sessionExpiryIntervalSecs;
/**
* 遗嘱消息
*/
private MqttWillMessage willMessage;
/**
* mqtt5 properties
*/
private MqttProperties properties;
/**
* 连接监听器
*/
private IMqttClientConnectListener connectListener;
/**
* 全局订阅
*/
private Set<MqttTopicSubscription> globalSubscribe;
/**
* 全局消息监听器
*/
private IMqttClientGlobalMessageListener globalMessageListener;
/**
* 客户端 session
*/
private IMqttClientSession clientSession;
/**
* 是否开启监控默认false 不开启,节省内存
*/
private boolean statEnable = false;
/**
* debug
*/
private boolean debug = false;
/**
* tioExecutor
*/
private SynThreadPoolExecutor tioExecutor;
/**
* groupExecutor
*/
private ExecutorService groupExecutor;
/**
* mqttExecutor
*/
private ExecutorService mqttExecutor;
/**
* taskService
*/
private TimerTaskService taskService;
/**
* TioConfig 自定义配置
*/
private Consumer<TioConfig> tioConfigCustomize;
/**
* 序列化
*/
private MqttSerializer mqttSerializer;
/**
* 停止前是否发送 disconnect 消息默认true 不会触发遗嘱消息
*/
private boolean disconnectBeforeStop = true;
public String getName() {
return name;
}
public String getIp() {
return ip;
}
public int getPort() {
return port;
}
public Integer getTimeout() {
return timeout;
}
public String getBindIp() {
return bindIp;
}
public int getReadBufferSize() {
return readBufferSize;
}
public int getMaxBytesInMessage() {
return maxBytesInMessage;
}
public int getMaxClientIdLength() {
return maxClientIdLength;
}
public int getKeepAliveSecs() {
return keepAliveSecs;
}
public HeartbeatMode getHeartbeatMode() {
return heartbeatMode;
}
public HeartbeatTimeoutStrategy getHeartbeatTimeoutStrategy() {
return heartbeatTimeoutStrategy;
}
public SslConfig getSslConfig() {
return sslConfig;
}
public boolean isReconnect() {
return reconnect;
}
public int getRetryCount() {
return retryCount;
}
public long getReInterval() {
return reInterval;
}
public int getReSubscribeBatchSize() {
return reSubscribeBatchSize;
}
public String getClientId() {
return clientId;
}
public MqttVersion getVersion() {
return version;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public boolean isCleanStart() {
return cleanStart;
}
public Integer getSessionExpiryIntervalSecs() {
return sessionExpiryIntervalSecs;
}
public MqttWillMessage getWillMessage() {
return willMessage;
}
public MqttProperties getProperties() {
return properties;
}
public IMqttClientConnectListener getConnectListener() {
return connectListener;
}
public Set<MqttTopicSubscription> getGlobalSubscribe() {
return globalSubscribe;
}
public IMqttClientGlobalMessageListener getGlobalMessageListener() {
return globalMessageListener;
}
public IMqttClientSession getClientSession() {
return clientSession;
}
public boolean isStatEnable() {
return statEnable;
}
public boolean isDebug() {
return debug;
}
public SynThreadPoolExecutor getTioExecutor() {
return tioExecutor;
}
public ExecutorService getGroupExecutor() {
return groupExecutor;
}
public ExecutorService getMqttExecutor() {
return mqttExecutor;
}
public TimerTaskService getTaskService() {
return taskService;
}
public MqttSerializer getMqttSerializer() {
return mqttSerializer;
}
public boolean isDisconnectBeforeStop() {
return disconnectBeforeStop;
}
public MqttClientCreator name(String name) {
this.name = name;
return this;
}
public MqttClientCreator ip(String ip) {
this.ip = ip;
return this;
}
public MqttClientCreator port(int port) {
this.port = port;
return this;
}
public MqttClientCreator timeout(int timeout) {
this.timeout = timeout;
return this;
}
public MqttClientCreator bindIp(String bindIp) {
this.bindIp = bindIp;
return this;
}
public MqttClientCreator bindNetworkInterface(String networkInterfaceName) {
if (StrUtil.isBlank(networkInterfaceName)) {
return this;
} else {
String ipV4 = NetUtil.getNetworkInterfaceIpV4(networkInterfaceName);
return bindIp(Objects.requireNonNull(ipV4, "获取网卡 ip 为 null"));
}
}
public MqttClientCreator readBufferSize(int readBufferSize) {
this.readBufferSize = readBufferSize;
return this;
}
public MqttClientCreator maxBytesInMessage(int maxBytesInMessage) {
this.maxBytesInMessage = maxBytesInMessage;
return this;
}
public MqttClientCreator maxClientIdLength(int maxClientIdLength) {
this.maxClientIdLength = maxClientIdLength;
return this;
}
public MqttClientCreator keepAliveSecs(int keepAliveSecs) {
this.keepAliveSecs = keepAliveSecs;
return this;
}
public MqttClientCreator heartbeatMode(HeartbeatMode heartbeatMode) {
this.heartbeatMode = heartbeatMode;
return this;
}
public MqttClientCreator heartbeatTimeoutStrategy(HeartbeatTimeoutStrategy heartbeatTimeoutStrategy) {
this.heartbeatTimeoutStrategy = heartbeatTimeoutStrategy;
return this;
}
public MqttClientCreator useSsl() {
return sslConfig(SslConfig.forClient());
}
public MqttClientCreator useSsl(String trustStoreFile, String trustPassword) {
return sslConfig(SslConfig.forClient(trustStoreFile, trustPassword));
}
public MqttClientCreator useSsl(String keyStoreFile, String keyPasswd, String trustStoreFile, String trustPassword) {
return sslConfig(SslConfig.forClient(keyStoreFile, keyPasswd, trustStoreFile, trustPassword));
}
public MqttClientCreator useSsl(InputStream trustStoreInputStream, String trustPassword) {
return sslConfig(SslConfig.forClient(trustStoreInputStream, trustPassword));
}
public MqttClientCreator useSsl(InputStream keyStoreInputStream, String keyPasswd, InputStream trustStoreInputStream, String trustPassword) {
return sslConfig(SslConfig.forClient(keyStoreInputStream, keyPasswd, trustStoreInputStream, trustPassword));
}
public MqttClientCreator sslConfig(SslConfig sslConfig) {
this.sslConfig = sslConfig;
return this;
}
public MqttClientCreator reconnect(boolean reconnect) {
this.reconnect = reconnect;
return this;
}
public MqttClientCreator retryCount(int retryCount) {
this.retryCount = retryCount;
return this;
}
public MqttClientCreator reInterval(long reInterval) {
this.reInterval = reInterval;
return this;
}
public MqttClientCreator reSubscribeBatchSize(int reSubscribeBatchSize) {
this.reSubscribeBatchSize = reSubscribeBatchSize;
return this;
}
public MqttClientCreator clientId(String clientId) {
this.clientId = clientId;
return this;
}
public MqttClientCreator version(MqttVersion version) {
this.version = version;
return this;
}
public MqttClientCreator username(String username) {
this.username = username;
return this;
}
public MqttClientCreator password(String password) {
this.password = password;
return this;
}
public MqttClientCreator cleanStart(boolean cleanStart) {
this.cleanStart = cleanStart;
return this;
}
public MqttClientCreator sessionExpiryIntervalSecs(Integer sessionExpiryIntervalSecs) {
this.sessionExpiryIntervalSecs = sessionExpiryIntervalSecs;
return this;
}
public MqttClientCreator willMessage(MqttWillMessage willMessage) {
this.willMessage = willMessage;
return this;
}
public MqttClientCreator willMessage(Consumer<MqttWillMessage.Builder> consumer) {
MqttWillMessage.Builder builder = MqttWillMessage.builder();
consumer.accept(builder);
return willMessage(builder.build());
}
public MqttClientCreator properties(MqttProperties properties) {
this.properties = properties;
return this;
}
public MqttClientCreator connectListener(IMqttClientConnectListener connectListener) {
this.connectListener = connectListener;
return this;
}
public MqttClientCreator globalSubscribe(String... topics) {
Objects.requireNonNull(topics, "globalSubscribe topics is null.");
List<MqttTopicSubscription> subscriptionList = Arrays.stream(topics)
.map(MqttTopicSubscription::new)
.collect(Collectors.toList());
return globalSubscribe(subscriptionList);
}
public MqttClientCreator globalSubscribe(MqttTopicSubscription... topics) {
Objects.requireNonNull(topics, "globalSubscribe topics is null.");
return globalSubscribe(Arrays.asList(topics));
}
public MqttClientCreator globalSubscribe(List<MqttTopicSubscription> topicList) {
Objects.requireNonNull(topicList, "globalSubscribe topicList is null.");
if (this.globalSubscribe == null) {
this.globalSubscribe = new HashSet<>(topicList);
} else {
this.globalSubscribe.addAll(topicList);
}
return this;
}
public MqttClientCreator globalMessageListener(IMqttClientGlobalMessageListener globalMessageListener) {
this.globalMessageListener = globalMessageListener;
return this;
}
public MqttClientCreator clientSession(IMqttClientSession clientSession) {
this.clientSession = clientSession;
return this;
}
public MqttClientCreator statEnable() {
return statEnable(true);
}
public MqttClientCreator statEnable(boolean enable) {
this.statEnable = enable;
return this;
}
public MqttClientCreator debug() {
return debug(true);
}
public MqttClientCreator debug(boolean debug) {
this.debug = debug;
return this;
}
public MqttClientCreator tioExecutor(SynThreadPoolExecutor tioExecutor) {
this.tioExecutor = tioExecutor;
return this;
}
public MqttClientCreator groupExecutor(ExecutorService groupExecutor) {
this.groupExecutor = groupExecutor;
return this;
}
public MqttClientCreator mqttExecutor(ExecutorService mqttExecutor) {
this.mqttExecutor = mqttExecutor;
return this;
}
public MqttClientCreator bizThreadPoolSize(int poolSize) {
if (poolSize <= 0) {
throw new IllegalArgumentException("poolSize must be greater than zero.");
}
return mqttExecutor(ThreadUtils.getBizExecutor(poolSize));
}
public MqttClientCreator taskService(TimerTaskService taskService) {
this.taskService = taskService;
return this;
}
public MqttClientCreator tioConfigCustomize(Consumer<TioConfig> tioConfigCustomize) {
this.tioConfigCustomize = tioConfigCustomize;
return this;
}
public MqttClientCreator mqttJsonSerializer(MqttSerializer mqttSerializer) {
this.mqttSerializer = mqttSerializer;
return this;
}
/**
* 停止前是否发送 disconnect 消息默认true 不会触发遗嘱消息
*/
public MqttClientCreator disconnectBeforeStop() {
return disconnectBeforeStop(true);
}
/**
* 停止前是否发送 disconnect 消息默认true 不会触发遗嘱消息
*/
public MqttClientCreator disconnectBeforeStop(boolean disconnectBeforeStop) {
this.disconnectBeforeStop = disconnectBeforeStop;
return this;
}
/**
* 构建一个新的 MqttClientCreator
*
* @return 新的 MqttClientCreator
*/
public MqttClientCreator newCreator() {
return new MqttClientCreator()
.name(this.name)
.ip(this.ip)
.port(this.port)
.timeout(this.timeout)
.bindIp(this.bindIp)
.readBufferSize(this.readBufferSize)
.maxBytesInMessage(this.maxBytesInMessage)
.maxClientIdLength(this.maxClientIdLength)
.keepAliveSecs(this.keepAliveSecs)
.sslConfig(this.sslConfig)
.reconnect(this.reconnect)
.reInterval(this.reInterval)
.retryCount(this.retryCount)
.reSubscribeBatchSize(this.reSubscribeBatchSize)
.version(this.version)
.cleanStart(this.cleanStart)
.sessionExpiryIntervalSecs(this.sessionExpiryIntervalSecs)
.willMessage(this.willMessage)
.connectListener(this.connectListener)
.statEnable(this.statEnable)
.debug(this.debug)
.mqttJsonSerializer(this.mqttSerializer)
.disconnectBeforeStop(this.disconnectBeforeStop);
}
private MqttClient build() {
// 1. clientId 为空,生成默认的 clientId
if (StrUtil.isBlank(this.clientId)) {
// 默认为MICA-MQTT- 前缀和 36进制的纳秒数
this.clientId("MICA-MQTT-" + Long.toString(System.nanoTime(), 36));
}
// 2. 客户端 session
if (this.clientSession == null) {
this.clientSession = new DefaultMqttClientSession();
}
// tioExecutor
if (this.tioExecutor == null) {
this.tioExecutor = ThreadUtils.getTioExecutor(3);
}
// groupExecutor
if (this.groupExecutor == null) {
this.groupExecutor = ThreadUtils.getGroupExecutor(2);
}
// mqttExecutor
if (this.mqttExecutor == null) {
this.mqttExecutor = ThreadUtils.getBizExecutor(Math.max(2, ThreadUtils.CORE_POOL_SIZE));
}
// taskService
if (this.taskService == null) {
this.taskService = new DefaultTimerTaskService();
}
// heartbeatMode
if (this.heartbeatMode == null) {
this.heartbeatMode = HeartbeatMode.LAST_REQ;
}
if (this.mqttSerializer == null) {
this.mqttSerializer = new MqttJsonSerializer();
}
IMqttClientProcessor processor = new DefaultMqttClientProcessor(this);
// 4. 初始化 mqtt 处理器
TioClientHandler clientAioHandler = new MqttClientAioHandler(this, processor);
TioClientListener clientAioListener = new MqttClientAioListener(this);
// 5. 重连配置
ReconnConf reconnConf = null;
if (this.reconnect) {
reconnConf = new ReconnConf(this.reInterval, this.retryCount);
}
// 6. tioConfig
TioClientConfig clientConfig = new TioClientConfig(clientAioHandler, clientAioListener, reconnConf, tioExecutor, groupExecutor);
clientConfig.setName(this.name);
// 7. 心跳超时时间
clientConfig.setHeartbeatTimeout(TimeUnit.SECONDS.toMillis(this.keepAliveSecs));
// 设置心跳检测模式为 LAST_REQkeepAliveSecs 周期内,最后发送的时间差
clientConfig.setHeartbeatMode(this.heartbeatMode);
clientConfig.setHeartbeatTimeoutStrategy(this.heartbeatTimeoutStrategy);
// 8. mqtt 消息最大长度,小于 1 则使用默认的,可通过 property tio.default.read.buffer.size 设置默认大小
if (this.readBufferSize > 0) {
clientConfig.setReadBufferSize(this.readBufferSize);
}
// 9. ssl 证书设置
if (this.sslConfig != null) {
clientConfig.setSslConfig(this.sslConfig);
// 内置 ssl 自定义配置,对 SNI 的支持
if (this.sslConfig.getSslEngineCustomizer() == null) {
this.sslConfig.setSslEngineCustomizer(new MqttSSLEngineCustomizer(ip));
}
}
// 10. 是否开启监控
clientConfig.statOn = this.statEnable;
if (this.debug) {
clientConfig.debug = true;
}
// 11. 绑定 clientId 到 context 上,可以 context.getId() 获取
clientConfig.setTioUuid(new MqttClientId(this));
// 12. 自定义处理
if (this.tioConfigCustomize != null) {
this.tioConfigCustomize.accept(clientConfig);
}
// 13. tioClient
try {
TioClient tioClient = new TioClient(clientConfig);
return new MqttClient(tioClient, this);
} catch (Exception e) {
throw new IllegalStateException("Mica mqtt client start fail.", e);
}
}
/**
* 默认异步连接
*
* @return TioClient
*/
public MqttClient connect() {
return this.build().start(false);
}
/**
* 同步连接
*
* @return TioClient
*/
public MqttClient connectSync() {
return this.build().start(true);
}
/**
* 连接测试
*
* @return MqttConnectReasonCode
*/
public MqttConnectReasonCode connectTest() {
return connectTest(3, TimeUnit.SECONDS);
}
/**
* 连接测试
*
* @param timeout timeout
* @param timeUnit TimeUnit
* @return MqttConnectReasonCode
*/
public MqttConnectReasonCode connectTest(long timeout, TimeUnit timeUnit) {
// 1. clientId 为空,生成默认的 clientId
if (StrUtil.isBlank(this.clientId)) {
// 默认为MICA-MQTT- 前缀和 36进制的纳秒数
this.clientId("MICA-MQTT-" + Long.toString(System.nanoTime(), 36));
}
CompletableFuture<MqttConnectReasonCode> future = new CompletableFuture<>();
IMqttClientProcessor processor = new MqttClientConnectTestProcessor(future);
// 2. 初始化 mqtt 处理器
TioClientHandler clientAioHandler = new MqttClientAioHandler(this, processor);
TioClientListener clientAioListener = new MqttClientAioListener(this);
// 3. tioConfig
TioClientConfig tioConfig = new TioClientConfig(clientAioHandler, clientAioListener);
tioConfig.setName(this.name);
// 4. 心跳超时时间,关闭心跳检测
tioConfig.setHeartbeatTimeout(0);
TioClient tioClient;
try {
tioClient = new TioClient(tioConfig);
tioClient.asyncConnect(new Node(this.getIp(), this.getPort()), this.bindIp, 0, this.timeout);
} catch (Exception e) {
throw new IllegalStateException("Mica mqtt client start fail.", e);
}
try {
return future.get(timeout, timeUnit);
} catch (Exception e) {
// 超时,一般为服务器不可用
return MqttConnectReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
} finally {
tioClient.stop();
}
}
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
/**
* MqttClient 配置自定义
*
* @author L.cm
*/
@FunctionalInterface
public interface MqttClientCustomizer {
/**
* MqttServerCreator 自定义扩展
*
* @param creator MqttClientCreator
*/
void customize(MqttClientCreator creator);
}

View File

@@ -0,0 +1,21 @@
package org.dromara.mica.mqtt.core.client;
import org.tio.core.intf.TioUuid;
/**
* 将 mqtt clientId 绑定到 context 中
*
* @author L.cm
*/
public class MqttClientId implements TioUuid {
private final MqttClientCreator creator;
public MqttClientId(MqttClientCreator creator) {
this.creator = creator;
}
@Override
public String uuid() {
return creator.getClientId();
}
}

View File

@@ -0,0 +1,93 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.codec.message.builder.MqttTopicSubscription;
import org.dromara.mica.mqtt.core.common.TopicFilterType;
import java.io.Serializable;
import java.util.Objects;
/**
* 发送订阅,未 ack 前的数据承载
*
* @author L.cm
*/
public final class MqttClientSubscription implements Serializable {
private final String topicFilter;
private final MqttQoS mqttQoS;
private final TopicFilterType type;
private final transient IMqttClientMessageListener listener;
public MqttClientSubscription(MqttQoS mqttQoS,
String topicFilter,
IMqttClientMessageListener listener) {
this.mqttQoS = Objects.requireNonNull(mqttQoS, "MQTT subscribe mqttQoS is null.");
this.topicFilter = Objects.requireNonNull(topicFilter, "MQTT subscribe topicFilter is null.");
this.type = TopicFilterType.getType(topicFilter);
this.listener = Objects.requireNonNull(listener, "MQTT subscribe listener is null.");
}
public MqttQoS getMqttQoS() {
return mqttQoS;
}
public String getTopicFilter() {
return topicFilter;
}
public IMqttClientMessageListener getListener() {
return listener;
}
public boolean matches(String topic) {
return this.type.match(this.topicFilter, topic);
}
public MqttTopicSubscription toTopicSubscription() {
return new MqttTopicSubscription(topicFilter, mqttQoS);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MqttClientSubscription that = (MqttClientSubscription) o;
return Objects.equals(topicFilter, that.topicFilter) &&
mqttQoS == that.mqttQoS &&
Objects.equals(listener, that.listener);
}
@Override
public int hashCode() {
return Objects.hash(topicFilter, mqttQoS, listener);
}
@Override
public String toString() {
return "MqttClientSubscription{" +
"topicFilter='" + topicFilter + '\'' +
", mqttQoS=" + mqttQoS +
'}';
}
}

View File

@@ -0,0 +1,160 @@
/*
* Copyright 2025-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.codec.message.builder.MqttPublishBuilder;
import org.dromara.mica.mqtt.codec.properties.MqttProperties;
import org.dromara.mica.mqtt.core.annotation.MqttClientPublish;
import org.dromara.mica.mqtt.core.annotation.MqttPayload;
import org.dromara.mica.mqtt.core.annotation.MqttRetain;
import org.dromara.mica.mqtt.core.util.TopicUtil;
import org.tio.utils.hutool.CollUtil;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
/**
* @author ChangJin Wei (魏昌进)
*/
public class MqttInvocationHandler<T extends IMqttClient> implements InvocationHandler {
private final T mqttClient;
private final ConcurrentMap<Method, MethodMetadata> methodCache;
public MqttInvocationHandler(T mqttClient) {
this.mqttClient = mqttClient;
this.methodCache = new ConcurrentHashMap<>();
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 处理默认的 hashCode、equals 和 toString
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(this, args);
}
// 其它代理方法
MethodMetadata metadata = resolveMethod(method);
Object payload = metadata.getPayloadIndex() >= 0 ? args[metadata.getPayloadIndex()] : null;
boolean retain = metadata.getRetainIndex() >= 0 && Boolean.TRUE.equals(args[metadata.getRetainIndex()]);
MqttProperties properties = metadata.getPropertiesIndex() >= 0
? (MqttProperties) args[metadata.getPropertiesIndex()]
: null;
Consumer<MqttPublishBuilder> builder = metadata.getBuilderIndex() >= 0
? (Consumer<MqttPublishBuilder>) args[metadata.getBuilderIndex()]
: null;
String topic = TopicUtil.resolveTopic(metadata.getMqttPublish().value(), payload);
MqttQoS qos = metadata.getMqttPublish().qos();
if (topic == null || topic.isEmpty()) {
throw new IllegalArgumentException("Resolved topic is null or empty");
}
MqttClient client = mqttClient.getMqttClient();
if (builder == null) {
return client.publish(topic, payload, qos, retain, properties);
} else {
return client.publish(topic, payload, qos, builder);
}
}
private MethodMetadata resolveMethod(Method method) {
return CollUtil.computeIfAbsent(methodCache, method, m -> {
MqttClientPublish mqttPublish = m.getAnnotation(MqttClientPublish.class);
if (mqttPublish == null) {
throw new UnsupportedOperationException("Method not annotated with @MqttClientPublish");
}
Annotation[][] paramAnnotations = m.getParameterAnnotations();
Class<?>[] paramTypes = m.getParameterTypes();
int payloadIndex = -1;
int retainIndex = -1;
int propertiesIndex = -1;
int builderIndex = -1;
for (int i = 0; i < paramAnnotations.length; i++) {
for (Annotation annotation : paramAnnotations[i]) {
if (annotation instanceof MqttPayload) {
payloadIndex = i;
} else if (annotation instanceof MqttRetain) {
retainIndex = i;
}
}
}
for (int i = 0; i < paramTypes.length; i++) {
if (propertiesIndex == -1 && MqttProperties.class.isAssignableFrom(paramTypes[i])) {
propertiesIndex = i;
} else if (builderIndex == -1 && Consumer.class.isAssignableFrom(paramTypes[i])) {
builderIndex = i;
}
}
return new MethodMetadata(mqttPublish, payloadIndex, retainIndex, propertiesIndex, builderIndex);
});
}
private static class MethodMetadata {
private final MqttClientPublish mqttPublish;
private final int payloadIndex;
private final int retainIndex;
private final int propertiesIndex;
private final int builderIndex;
MethodMetadata(MqttClientPublish mqttPublish,
int payloadIndex,
int retainIndex,
int propertiesIndex,
int builderIndex) {
this.mqttPublish = mqttPublish;
this.payloadIndex = payloadIndex;
this.retainIndex = retainIndex;
this.propertiesIndex = propertiesIndex;
this.builderIndex = builderIndex;
}
public MqttClientPublish getMqttPublish() {
return mqttPublish;
}
public int getPayloadIndex() {
return payloadIndex;
}
public int getRetainIndex() {
return retainIndex;
}
public int getPropertiesIndex() {
return propertiesIndex;
}
public int getBuilderIndex() {
return builderIndex;
}
}
}

View File

@@ -0,0 +1,62 @@
package org.dromara.mica.mqtt.core.client;
import org.dromara.mica.mqtt.codec.message.MqttMessage;
import org.dromara.mica.mqtt.codec.message.MqttSubscribeMessage;
import org.dromara.mica.mqtt.core.common.RetryProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.utils.timer.TimerTaskService;
import java.util.List;
import java.util.Objects;
/**
* MqttPendingSubscription参考于 netty-mqtt-client
*/
final class MqttPendingSubscription {
private static final Logger logger = LoggerFactory.getLogger(MqttPendingSubscription.class);
private final List<MqttClientSubscription> subscriptionList;
private final RetryProcessor<MqttSubscribeMessage> retryProcessor = new RetryProcessor<>();
MqttPendingSubscription(List<MqttClientSubscription> subscriptionList, MqttSubscribeMessage message) {
this.subscriptionList = subscriptionList;
this.retryProcessor.setOriginalMessage(message);
}
public List<MqttClientSubscription> getSubscriptionList() {
return subscriptionList;
}
void startRetransmitTimer(TimerTaskService taskService, ChannelContext context) {
this.retryProcessor.setHandle((fixedHeader, originalMessage) -> {
MqttMessage message = new MqttSubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload());
boolean result = Tio.send(context, message);
logger.info("retry send Subscribe topics:{} result:{}", subscriptionList, result);
});
this.retryProcessor.start(taskService);
}
void onSubAckReceived() {
this.retryProcessor.stop();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MqttPendingSubscription that = (MqttPendingSubscription) o;
return Objects.equals(subscriptionList, that.subscriptionList);
}
@Override
public int hashCode() {
return Objects.hash(subscriptionList);
}
}

View File

@@ -0,0 +1,61 @@
package org.dromara.mica.mqtt.core.client;
import org.dromara.mica.mqtt.codec.message.MqttMessage;
import org.dromara.mica.mqtt.codec.message.MqttUnSubscribeMessage;
import org.dromara.mica.mqtt.core.common.RetryProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.utils.timer.TimerTaskService;
import java.util.List;
import java.util.Objects;
/**
* MqttPendingSubscription参考于 netty-mqtt-client
*/
final class MqttPendingUnSubscription {
private static final Logger logger = LoggerFactory.getLogger(MqttPendingUnSubscription.class);
private final List<String> topics;
private final RetryProcessor<MqttUnSubscribeMessage> retryProcessor = new RetryProcessor<>();
MqttPendingUnSubscription(List<String> topics, MqttUnSubscribeMessage unSubscribeMessage) {
this.topics = topics;
this.retryProcessor.setOriginalMessage(unSubscribeMessage);
}
List<String> getTopics() {
return topics;
}
void startRetransmissionTimer(TimerTaskService taskService, ChannelContext context) {
this.retryProcessor.setHandle((fixedHeader, originalMessage) -> {
MqttMessage message = new MqttUnSubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload());
boolean result = Tio.send(context, message);
logger.info("retry send Unsubscribe topics:{} result:{}", topics, result);
});
this.retryProcessor.start(taskService);
}
void onUnSubAckReceived() {
this.retryProcessor.stop();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MqttPendingUnSubscription that = (MqttPendingUnSubscription) o;
return Objects.equals(topics, that.topics);
}
@Override
public int hashCode() {
return Objects.hash(topics);
}
}

View File

@@ -0,0 +1,66 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import org.tio.core.ssl.SSLEngineCustomizer;
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIServerName;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import java.util.ArrayList;
import java.util.List;
/**
* mqtt ssl 自定义配置
*
* @author L.cm
*/
public class MqttSSLEngineCustomizer implements SSLEngineCustomizer {
/**
* ip 或域名
*/
private final String host;
/**
* 端点识别算法,默认 null生产环境建议配置成 HTTPS支持HTTPS/LDAPS/null
*/
private final String identificationAlgorithm;
public MqttSSLEngineCustomizer(String host) {
this(host, null);
}
public MqttSSLEngineCustomizer(String host, String identificationAlgorithm) {
this.host = host;
this.identificationAlgorithm = identificationAlgorithm;
}
@Override
public void customize(SSLEngine engine) {
// SNI 支持
SSLParameters sslParameters = engine.getSSLParameters();
List<SNIServerName> sniHostNames = new ArrayList<>(1);
sniHostNames.add(new SNIHostName(host));
sslParameters.setServerNames(sniHostNames);
// 端点识别算法
if (identificationAlgorithm != null) {
sslParameters.setEndpointIdentificationAlgorithm(identificationAlgorithm);
}
engine.setSSLParameters(sslParameters);
}
}

View File

@@ -0,0 +1,169 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.mica.mqtt.core.client;
import org.dromara.mica.mqtt.codec.message.properties.MqttWillPublishProperties;
import org.dromara.mica.mqtt.codec.properties.MqttProperties;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.core.util.TopicUtil;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Consumer;
/**
* 遗嘱消息
*
* @author L.cm
*/
public final class MqttWillMessage {
private final String topic;
private final byte[] message;
/**
* 遗嘱消息保留标志
*/
private final boolean retain;
/**
* 如果遗嘱标志被设置为 false遗嘱 QoS 也必须设置为 0。 如果遗嘱标志被设置为 true遗嘱 QoS 的值可以等于 012。
*/
private final MqttQoS qos;
/**
* mqtt5 willProperties
*/
private final MqttProperties willProperties;
private MqttWillMessage(String topic, byte[] message, boolean retain, MqttQoS qos, MqttProperties willProperties) {
this.topic = topic;
this.message = message;
this.retain = retain;
this.qos = qos;
this.willProperties = willProperties;
}
public String getTopic() {
return topic;
}
public byte[] getMessage() {
return message;
}
public boolean isRetain() {
return retain;
}
public MqttQoS getQos() {
return qos;
}
public MqttProperties getWillProperties() {
return willProperties;
}
public static MqttWillMessage.Builder builder() {
return new MqttWillMessage.Builder();
}
public static final class Builder {
private String topic;
private byte[] message;
/**
* 默认为不保存
*/
private boolean retain = false;
/**
* 默认为 qos 0
*/
private MqttQoS qos = MqttQoS.QOS0;
private MqttProperties willProperties;
public Builder topic(String topic) {
TopicUtil.validateTopicName(topic);
this.topic = topic;
return this;
}
public Builder message(byte[] message) {
this.message = Objects.requireNonNull(message);
return this;
}
public Builder messageText(String message) {
this.message = Objects.requireNonNull(message).getBytes(StandardCharsets.UTF_8);
return this;
}
public Builder retain(boolean retain) {
this.retain = retain;
return this;
}
public Builder qos(MqttQoS qos) {
this.qos = Objects.requireNonNull(qos);
return this;
}
public Builder willProperties(MqttProperties willProperties) {
this.willProperties = Objects.requireNonNull(willProperties);
return this;
}
public Builder willProperties(Consumer<MqttWillPublishProperties> consumer) {
MqttWillPublishProperties willPublishProperties = new MqttWillPublishProperties();
consumer.accept(willPublishProperties);
return willProperties(willPublishProperties.getProperties());
}
public MqttWillMessage build() {
// 有效载荷中必须包含 Will Topic 和 Will Message字段
Objects.requireNonNull(this.topic, "WillMessage topic is null.");
Objects.requireNonNull(this.message, "WillMessage message is null.");
return new MqttWillMessage(this.topic, this.message, this.retain, this.qos, this.willProperties);
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MqttWillMessage that = (MqttWillMessage) o;
return retain == that.retain &&
Objects.equals(topic, that.topic) &&
Arrays.equals(message, that.message) &&
qos == that.qos;
}
@Override
public int hashCode() {
return Objects.hash(topic, Arrays.hashCode(message), retain, qos);
}
@Override
public String toString() {
return "MqttWillMessage{" +
"topic='" + topic + '\'' +
", message='" + Arrays.toString(message) + '\'' +
", retain=" + retain +
", qos=" + qos +
'}';
}
}

View File

@@ -0,0 +1,4 @@
open module org.dromara.mica.mqtt.client {
requires transitive org.dromara.mica.mqtt.common;
exports org.dromara.mica.mqtt.core.client;
}