From 6e89f959a96a7a42e0e70a90ae9a6afe85eaeb0a Mon Sep 17 00:00:00 2001 From: xiwa Date: Wed, 8 Nov 2023 00:44:29 +0800 Subject: [PATCH] =?UTF-8?q?fix:mqtt=E5=AD=90=E8=AE=BE=E5=A4=87=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../plugins/mqtt/service/MqttVerticle.java | 81 ++++++++++++------- .../iotkit/plugins/tcp/conf/BeanConfig.java | 2 +- .../plugins/tcp/server/TcpServerVerticle.java | 19 ++--- 3 files changed, 63 insertions(+), 39 deletions(-) diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java index 44d5b56..1da23a6 100755 --- a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java +++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java @@ -35,7 +35,9 @@ import io.vertx.core.json.JsonObject; import io.vertx.core.net.PemKeyCertOptions; import io.vertx.mqtt.*; import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -203,6 +205,11 @@ public class MqttVerticle extends AbstractVerticle implements Handler { - //下线 - thingService.post( - pluginInfo.getPluginId(), - fillAction( - DeviceStateChange.builder() - .state(DeviceState.OFFLINE) - .build() - , productKey, deviceName - ) - ); - DEVICE_ONLINE.clear(); + for (String topic : unsubscribe.topics()) { + Device device = getDevice(topic); + //删除设备对应连接 + endpointMap.remove(device.getDeviceName()); + //下线 + thingService.post( + pluginInfo.getPluginId(), + fillAction( + DeviceStateChange.builder() + .state(DeviceState.OFFLINE) + .build() + , device.getProductKey(), device.getDeviceName() + ) + ); + DEVICE_ONLINE.remove(device.getDeviceName()); + } // ack the subscriptions request endpoint.unsubscribeAcknowledge(unsubscribe.messageId()); @@ -241,16 +253,13 @@ public class MqttVerticle extends AbstractVerticle implements Handler log.info("publish success,topic:{},payload:{}", topic, msg)); } + public Device getDevice(String topic) { + String[] topicParts = topic.split("/"); + if (topicParts.length < 5) { + return null; + } + return new Device(topicParts[2], topicParts[3]); + } + + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class Device { + + private String productKey; + + private String deviceName; + } } diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/conf/BeanConfig.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/conf/BeanConfig.java index 61353f4..f464642 100755 --- a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/conf/BeanConfig.java +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/conf/BeanConfig.java @@ -24,7 +24,7 @@ public class BeanConfig { @Bean @ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev") IPluginScript getPluginScript() { - return new LocalPluginScript("test.js"); + return new LocalPluginScript("script.js"); } @Bean diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpServerVerticle.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpServerVerticle.java index ade0819..878db8d 100644 --- a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpServerVerticle.java +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpServerVerticle.java @@ -1,6 +1,8 @@ package cc.iotkit.plugins.tcp.server; +import cc.iotkit.common.enums.ErrCode; +import cc.iotkit.common.exception.BizException; import cc.iotkit.plugin.core.IPluginScript; import cc.iotkit.plugin.core.thing.IThingService; import cc.iotkit.plugin.core.thing.actions.ActionResult; @@ -75,14 +77,10 @@ public class TcpServerVerticle extends AbstractVerticle { private IThingService thingService; @Override - public void start() { - try { - initConfig(); - initTcpServer(); - } catch (Exception e) { - log.info("init tcp server failed"); - e.printStackTrace(); - } + public void start() throws Exception { + initConfig(); + initTcpServer(); + log.info("init tcp server failed"); } @Override @@ -96,13 +94,16 @@ public class TcpServerVerticle extends AbstractVerticle { public void initConfig() { //获取脚本引擎 scriptEngine = pluginScript.getScriptEngine(pluginInfo.getPluginId()); + if (scriptEngine == null) { + throw new BizException("script engine is null"); + } } /** * 初始TCP服务 */ - private void initTcpServer() throws Exception { + private void initTcpServer() { netServer = vertx.createNetServer( new NetServerOptions().setHost(config.getHost()) .setPort(config.getPort()));