fix:mqtt子设备处理

master
xiwa 2023-11-08 00:44:29 +08:00
parent 1ddaa6045d
commit 6e89f959a9
3 changed files with 63 additions and 39 deletions

View File

@ -35,7 +35,9 @@ import io.vertx.core.json.JsonObject;
import io.vertx.core.net.PemKeyCertOptions; import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.mqtt.*; import io.vertx.mqtt.*;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode; import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -203,6 +205,11 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
for (MqttTopicSubscription s : subscribe.topicSubscriptions()) { for (MqttTopicSubscription s : subscribe.topicSubscriptions()) {
log.info("Subscription for {},with QoS {}", s.topicName(), s.qualityOfService()); log.info("Subscription for {},with QoS {}", s.topicName(), s.qualityOfService());
try { try {
String topic = s.topicName();
Device device = getDevice(topic);
//添加设备对应连接
endpointMap.put(device.getDeviceName(), endpoint);
online(device.getProductKey(), device.getDeviceName());
reasonCodes.add(MqttSubAckReasonCode.qosGranted(s.qualityOfService())); reasonCodes.add(MqttSubAckReasonCode.qosGranted(s.qualityOfService()));
} catch (Throwable e) { } catch (Throwable e) {
log.error("subscribe failed,topic:" + s.topicName(), e); log.error("subscribe failed,topic:" + s.topicName(), e);
@ -212,17 +219,22 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
// ack the subscriptions request // ack the subscriptions request
endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES); endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);
}).unsubscribeHandler(unsubscribe -> { }).unsubscribeHandler(unsubscribe -> {
//下线 for (String topic : unsubscribe.topics()) {
thingService.post( Device device = getDevice(topic);
pluginInfo.getPluginId(), //删除设备对应连接
fillAction( endpointMap.remove(device.getDeviceName());
DeviceStateChange.builder() //下线
.state(DeviceState.OFFLINE) thingService.post(
.build() pluginInfo.getPluginId(),
, productKey, deviceName fillAction(
) DeviceStateChange.builder()
); .state(DeviceState.OFFLINE)
DEVICE_ONLINE.clear(); .build()
, device.getProductKey(), device.getDeviceName()
)
);
DEVICE_ONLINE.remove(device.getDeviceName());
}
// ack the subscriptions request // ack the subscriptions request
endpoint.unsubscribeAcknowledge(unsubscribe.messageId()); endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
@ -241,16 +253,13 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
return; return;
} }
String[] topicParts = topic.split("/"); Device device = getDevice(topic);
if (topicParts.length < 5) { if (device == null) {
return; return;
} }
//网关上线 //有消息上报-设备上线
online(productKey, deviceName); online(device.getProductKey(), device.getDeviceName());
String topicPk = topicParts[2];
String topicDn = topicParts[3];
if (!MQTT_CONNECT_POOL.get(clientId)) { if (!MQTT_CONNECT_POOL.get(clientId)) {
//保存设备与连接关系 //保存设备与连接关系
@ -271,16 +280,18 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
if ("thing.lifetime.register".equalsIgnoreCase(method)) { if ("thing.lifetime.register".equalsIgnoreCase(method)) {
//子设备注册 //子设备注册
String subPk = params.getString("productKey");
String subDn = params.getString("deviceName");
ActionResult regResult = thingService.post( ActionResult regResult = thingService.post(
pluginInfo.getPluginId(), pluginInfo.getPluginId(),
fillAction( fillAction(
DeviceRegister.builder() DeviceRegister.builder()
.productKey(params.getString("productKey")) .productKey(subPk)
.deviceName(params.getString("deviceName")) .deviceName(subDn)
.model(params.getString("model")) .model(params.getString("model"))
.version("1.0") .version("1.0")
.build() .build()
, productKey, deviceName , subPk, subDn
) )
); );
if (regResult.getCode() == 0) { if (regResult.getCode() == 0) {
@ -294,16 +305,12 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
} }
if ("thing.event.property.post".equalsIgnoreCase(method)) { if ("thing.event.property.post".equalsIgnoreCase(method)) {
//设备上线处理
online(topicPk, topicDn);
//属性上报 //属性上报
action = PropertyReport.builder() action = PropertyReport.builder()
.params(params.getMap()) .params(params.getMap())
.build(); .build();
reply(endpoint, topic, payload); reply(endpoint, topic, payload);
} else if (method.startsWith("thing.event.")) { } else if (method.startsWith("thing.event.")) {
//设备上线处理
online(topicPk, topicDn);
//事件上报 //事件上报
action = EventReport.builder() action = EventReport.builder()
.name(method.replace("thing.event.", "")) .name(method.replace("thing.event.", ""))
@ -312,8 +319,6 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
.build(); .build();
reply(endpoint, topic, payload); reply(endpoint, topic, payload);
} else if (method.startsWith("thing.service.") && method.endsWith("_reply")) { } else if (method.startsWith("thing.service.") && method.endsWith("_reply")) {
//设备上线处理
online(topicPk, topicDn);
//服务回复 //服务回复
action = ServiceReply.builder() action = ServiceReply.builder()
.name(method.replaceAll("thing\\.service\\.(.*)_reply", "$1")) .name(method.replaceAll("thing\\.service\\.(.*)_reply", "$1"))
@ -326,8 +331,8 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
return; return;
} }
action.setId(payload.getString("id")); action.setId(payload.getString("id"));
action.setProductKey(topicPk); action.setProductKey(device.getProductKey());
action.setDeviceName(topicDn); action.setDeviceName(device.getDeviceName());
action.setTime(System.currentTimeMillis()); action.setTime(System.currentTimeMillis());
thingService.post(pluginInfo.getPluginId(), action); thingService.post(pluginInfo.getPluginId(), action);
} catch (Throwable e) { } catch (Throwable e) {
@ -417,4 +422,22 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
result.onSuccess(integer -> log.info("publish success,topic:{},payload:{}", topic, msg)); result.onSuccess(integer -> log.info("publish success,topic:{},payload:{}", topic, msg));
} }
public Device getDevice(String topic) {
String[] topicParts = topic.split("/");
if (topicParts.length < 5) {
return null;
}
return new Device(topicParts[2], topicParts[3]);
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Device {
private String productKey;
private String deviceName;
}
} }

View File

@ -24,7 +24,7 @@ public class BeanConfig {
@Bean @Bean
@ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev") @ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev")
IPluginScript getPluginScript() { IPluginScript getPluginScript() {
return new LocalPluginScript("test.js"); return new LocalPluginScript("script.js");
} }
@Bean @Bean

View File

@ -1,6 +1,8 @@
package cc.iotkit.plugins.tcp.server; package cc.iotkit.plugins.tcp.server;
import cc.iotkit.common.enums.ErrCode;
import cc.iotkit.common.exception.BizException;
import cc.iotkit.plugin.core.IPluginScript; import cc.iotkit.plugin.core.IPluginScript;
import cc.iotkit.plugin.core.thing.IThingService; import cc.iotkit.plugin.core.thing.IThingService;
import cc.iotkit.plugin.core.thing.actions.ActionResult; import cc.iotkit.plugin.core.thing.actions.ActionResult;
@ -75,14 +77,10 @@ public class TcpServerVerticle extends AbstractVerticle {
private IThingService thingService; private IThingService thingService;
@Override @Override
public void start() { public void start() throws Exception {
try { initConfig();
initConfig(); initTcpServer();
initTcpServer(); log.info("init tcp server failed");
} catch (Exception e) {
log.info("init tcp server failed");
e.printStackTrace();
}
} }
@Override @Override
@ -96,13 +94,16 @@ public class TcpServerVerticle extends AbstractVerticle {
public void initConfig() { public void initConfig() {
//获取脚本引擎 //获取脚本引擎
scriptEngine = pluginScript.getScriptEngine(pluginInfo.getPluginId()); scriptEngine = pluginScript.getScriptEngine(pluginInfo.getPluginId());
if (scriptEngine == null) {
throw new BizException("script engine is null");
}
} }
/** /**
* TCP * TCP
*/ */
private void initTcpServer() throws Exception { private void initTcpServer() {
netServer = vertx.createNetServer( netServer = vertx.createNetServer(
new NetServerOptions().setHost(config.getHost()) new NetServerOptions().setHost(config.getHost())
.setPort(config.getPort())); .setPort(config.getPort()));