From f658e344dd47ba2303a90a9061798c96c0a5c4ff Mon Sep 17 00:00:00 2001 From: xiwa Date: Thu, 24 Mar 2022 06:53:42 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E9=80=9A=E8=AE=AF=E7=BB=84=E4=BB=B6?= =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- manager/pom.xml | 5 ++ .../config/KeycloakSecurityConfig.java | 2 +- .../controller/ProtocolController.java | 18 +++- protocol-gateway/.DS_Store | Bin .../java/cc/iotkit/comps/MessageHandler.java | 23 ++--- .../java/cc/iotkit/comps/model/AuthInfo.java | 0 .../cc/iotkit/comps/model/DeviceState.java | 0 .../comps/service/DeviceBehaviourService.java | 5 +- protocol-gateway/component/pom.xml | 0 .../java/cc/iotkit/comp/IMessageHandler.java | 0 .../java/cc/iotkit/comp/model/AuthInfo.java | 0 .../cc/iotkit/comp/model/DeviceMessage.java | 0 protocol-gateway/converter/pom.xml | 0 .../main/java/cc/iotkit/converter/Device.java | 0 .../cc/iotkit/converter/DeviceService.java | 0 .../java/cc/iotkit/converter/IConverter.java | 0 .../cc/iotkit/converter/ScriptConverter.java | 0 .../cc/iotkit/comp/mqtt/MqttComponent.java | 16 ++-- .../cc/iotkit/comp/mqtt/MqttVerticle.java | 8 +- .../src/main/resources/component.js | 84 ++++++++++++++++++ .../src/main/resources/converter.js | 51 +++++++++++ 21 files changed, 187 insertions(+), 25 deletions(-) mode change 100644 => 100755 protocol-gateway/.DS_Store mode change 100644 => 100755 protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/AuthInfo.java mode change 100644 => 100755 protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/DeviceState.java mode change 100644 => 100755 protocol-gateway/component/pom.xml mode change 100644 => 100755 protocol-gateway/component/src/main/java/cc/iotkit/comp/IMessageHandler.java mode change 100644 => 100755 protocol-gateway/component/src/main/java/cc/iotkit/comp/model/AuthInfo.java mode change 100644 => 100755 protocol-gateway/component/src/main/java/cc/iotkit/comp/model/DeviceMessage.java mode change 100644 => 100755 protocol-gateway/converter/pom.xml mode change 100644 => 100755 protocol-gateway/converter/src/main/java/cc/iotkit/converter/Device.java mode change 100644 => 100755 protocol-gateway/converter/src/main/java/cc/iotkit/converter/DeviceService.java mode change 100644 => 100755 protocol-gateway/converter/src/main/java/cc/iotkit/converter/IConverter.java mode change 100644 => 100755 protocol-gateway/converter/src/main/java/cc/iotkit/converter/ScriptConverter.java create mode 100644 protocol-gateway/mqtt-component/src/main/resources/component.js create mode 100644 protocol-gateway/mqtt-component/src/main/resources/converter.js diff --git a/manager/pom.xml b/manager/pom.xml index 182092a3..f6678ab9 100755 --- a/manager/pom.xml +++ b/manager/pom.xml @@ -159,6 +159,11 @@ mqtt-component + + cc.iotkit + converter + + diff --git a/manager/src/main/java/cc/iotkit/manager/config/KeycloakSecurityConfig.java b/manager/src/main/java/cc/iotkit/manager/config/KeycloakSecurityConfig.java index c6bb3d02..91451d0c 100755 --- a/manager/src/main/java/cc/iotkit/manager/config/KeycloakSecurityConfig.java +++ b/manager/src/main/java/cc/iotkit/manager/config/KeycloakSecurityConfig.java @@ -54,7 +54,7 @@ public class KeycloakSecurityConfig extends KeycloakWebSecurityConfigurerAdapter http .authorizeRequests() .antMatchers("/*.html", "/favicon.ico", "/v2/api-docs", "/webjars/**", "/swagger-resources/**", "/*.js").permitAll() - .antMatchers("/device_behaviour/**").permitAll() + .antMatchers("/protocol/**").permitAll()//todo for test .antMatchers("/**/save*").hasRole("iot_write") .antMatchers("/**/del*").hasRole("iot_write") .antMatchers("/**/add*").hasRole("iot_write") diff --git a/manager/src/main/java/cc/iotkit/manager/controller/ProtocolController.java b/manager/src/main/java/cc/iotkit/manager/controller/ProtocolController.java index ea825b2e..7e1edd7f 100755 --- a/manager/src/main/java/cc/iotkit/manager/controller/ProtocolController.java +++ b/manager/src/main/java/cc/iotkit/manager/controller/ProtocolController.java @@ -14,6 +14,7 @@ import cc.iotkit.model.UserInfo; import cc.iotkit.model.protocol.ProtocolGateway; import cc.iotkit.protocol.server.service.GatewayService; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.domain.Page; @@ -22,6 +23,8 @@ import org.springframework.data.domain.Sort; import org.springframework.web.bind.annotation.*; import javax.annotation.PostConstruct; +import java.io.File; +import java.io.IOException; import java.util.Optional; @Slf4j @@ -130,13 +133,20 @@ public class ProtocolController { return new Paging<>(gateways.getTotalElements(), gateways.getContent()); } - @PostConstruct - public void init() { + @GetMapping("/registerMqtt") + public void registerMqtt() throws IOException { MqttComponent component = new MqttComponent(); + component.create("{\"port\":2883,\"ssl\":false}"); ScriptConverter converter = new ScriptConverter(); - converter.setScript(""); + converter.setScript(FileUtils.readFileToString(new File("/Users/sjg/home/gitee/open-source/converter.js"), "UTF-8")); component.setConverter(converter); componentManager.register("123", component); - componentManager.start("123", ""); + componentManager.start("123", FileUtils.readFileToString(new File("/Users/sjg/home/gitee/open-source/component.js"), "UTF-8")); + } + + @GetMapping("/deregisterMqtt") + public void deregisterMqtt() { + componentManager.stop("123"); + componentManager.deRegister("123"); } } diff --git a/protocol-gateway/.DS_Store b/protocol-gateway/.DS_Store old mode 100644 new mode 100755 diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/MessageHandler.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/MessageHandler.java index 3fba5d70..b5393850 100755 --- a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/MessageHandler.java +++ b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/MessageHandler.java @@ -1,5 +1,6 @@ package cc.iotkit.comps; +import cc.iotkit.common.exception.BizException; import cc.iotkit.comp.IMessageHandler; import cc.iotkit.comp.model.DeviceMessage; import cc.iotkit.comps.model.AuthInfo; @@ -23,7 +24,7 @@ import java.util.Map; public class MessageHandler implements IMessageHandler { private final NashornScriptEngine engine = (NashornScriptEngine) (new ScriptEngineManager()).getEngineByName("nashorn"); - private final String script; + private final Object scriptObj; private final IConverter converter; @@ -32,10 +33,9 @@ public class MessageHandler implements IMessageHandler { @SneakyThrows public MessageHandler(String script, IConverter converter, DeviceBehaviourService deviceBehaviourService) { - this.script = script; this.converter = converter; this.deviceBehaviourService = deviceBehaviourService; - engine.eval(script); + scriptObj = engine.eval(script); } public void register(Map head, String msg) { @@ -49,7 +49,7 @@ public class MessageHandler implements IMessageHandler { public void onReceive(Map head, String type, String msg) { try { - ScriptObjectMirror result = (ScriptObjectMirror) engine.invokeFunction("onReceive", head, type, msg); + ScriptObjectMirror result = (ScriptObjectMirror) engine.invokeMethod(scriptObj, "onReceive", head, type, msg); Object rstType = result.get("type"); if (rstType == null) { return; @@ -57,7 +57,7 @@ public class MessageHandler implements IMessageHandler { //取脚本执行后返回的数据 Object data = result.get("data"); if (!(data instanceof Map)) { - return; + throw new BizException("script result data is incorrect"); } Map dataMap = (Map) data; @@ -83,18 +83,20 @@ public class MessageHandler implements IMessageHandler { doReport(message); } + } catch (BizException e) { + throw e; } catch (Throwable e) { - log.error("onReceive error", e); + throw new BizException("receive component message error", e); } } private void doRegister(RegisterInfo reg) throws ScriptException, NoSuchMethodException { try { deviceBehaviourService.register(reg); - engine.invokeFunction("onRegistered", reg, true); + engine.invokeMethod(scriptObj, "onRegistered", reg, "true"); } catch (Throwable e) { log.error("register error", e); - engine.invokeFunction("onRegistered", reg, false); + engine.invokeMethod(scriptObj, "onRegistered", reg, "false"); } } @@ -104,10 +106,10 @@ public class MessageHandler implements IMessageHandler { auth.getDeviceName(), auth.getProductSecret(), auth.getDeviceSecret()); - engine.invokeFunction("onAuthed", auth, true); + engine.invokeMethod(scriptObj, "onAuthed", auth, true); } catch (Throwable e) { log.error("device auth error", e); - engine.invokeFunction("onAuthed", auth, false); + engine.invokeMethod(scriptObj, "onAuthed", auth, false); } } @@ -128,5 +130,4 @@ public class MessageHandler implements IMessageHandler { log.error("report device message error", e); } } - } diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/AuthInfo.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/AuthInfo.java old mode 100644 new mode 100755 diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/DeviceState.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/DeviceState.java old mode 100644 new mode 100755 diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java index 4f8d03ff..ec5fd1c7 100755 --- a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java +++ b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java @@ -104,10 +104,11 @@ public class DeviceBehaviourService { } else { //不存在,注册新设备 device = new DeviceInfo(); + device.setId(newDeviceId(info.getDeviceName())); device.setParentId(parentId); device.setUid(uid); - device.setDeviceId(newDeviceId(info.getDeviceName())); - device.setProductKey(info.getProductKey()); + device.setDeviceId(device.getId()); + device.setProductKey(pk); device.setDeviceName(info.getDeviceName()); device.setTag(info.getTag()); device.setState(new DeviceInfo.State(false, null, null)); diff --git a/protocol-gateway/component/pom.xml b/protocol-gateway/component/pom.xml old mode 100644 new mode 100755 diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/IMessageHandler.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/IMessageHandler.java old mode 100644 new mode 100755 diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/model/AuthInfo.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/model/AuthInfo.java old mode 100644 new mode 100755 diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/model/DeviceMessage.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/model/DeviceMessage.java old mode 100644 new mode 100755 diff --git a/protocol-gateway/converter/pom.xml b/protocol-gateway/converter/pom.xml old mode 100644 new mode 100755 diff --git a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/Device.java b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/Device.java old mode 100644 new mode 100755 diff --git a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/DeviceService.java b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/DeviceService.java old mode 100644 new mode 100755 diff --git a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/IConverter.java b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/IConverter.java old mode 100644 new mode 100755 diff --git a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/ScriptConverter.java b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/ScriptConverter.java old mode 100644 new mode 100755 diff --git a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttComponent.java b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttComponent.java index dd6c5329..a64a4638 100755 --- a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttComponent.java +++ b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttComponent.java @@ -3,8 +3,10 @@ package cc.iotkit.comp.mqtt; import cc.iotkit.common.exception.BizException; import cc.iotkit.common.utils.JsonUtil; import cc.iotkit.comp.AbstractComponent; +import cc.iotkit.comp.IMessageHandler; import io.vertx.core.Future; import io.vertx.core.Vertx; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; @@ -13,18 +15,21 @@ import java.util.concurrent.CountDownLatch; public class MqttComponent extends AbstractComponent { private Vertx vertx; - private final CountDownLatch countDownLatch = new CountDownLatch(1); + private CountDownLatch countDownLatch; private String deployedId; - private MqttConfig mqttConfig; + private MqttVerticle mqttVerticle; public void create(String config) { vertx = Vertx.vertx(); - mqttConfig = JsonUtil.parse(config, MqttConfig.class); + MqttConfig mqttConfig = JsonUtil.parse(config, MqttConfig.class); + mqttVerticle = new MqttVerticle(mqttConfig); } public void start() { try { - Future future = vertx.deployVerticle(new MqttVerticle(mqttConfig, getHandler())); + mqttVerticle.setExecutor(getHandler()); + countDownLatch = new CountDownLatch(1); + Future future = vertx.deployVerticle(mqttVerticle); future.onSuccess((s -> { deployedId = s; countDownLatch.countDown(); @@ -40,13 +45,14 @@ public class MqttComponent extends AbstractComponent { } } + @SneakyThrows public void stop() { + mqttVerticle.stop(); Future future = vertx.undeploy(deployedId); future.onSuccess(unused -> log.info("stop mqtt component success")); } public void destroy() { - vertx.close(); } } diff --git a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java index 9263ca4c..ff73ae1e 100755 --- a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java +++ b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java @@ -26,10 +26,13 @@ public class MqttVerticle extends AbstractVerticle { private final MqttConfig config; - private final IMessageHandler executor; + private IMessageHandler executor; - public MqttVerticle(MqttConfig config, IMessageHandler executor) { + public MqttVerticle(MqttConfig config) { this.config = config; + } + + public void setExecutor(IMessageHandler executor) { this.executor = executor; } @@ -62,6 +65,7 @@ public class MqttVerticle extends AbstractVerticle { } catch (Throwable e) { log.error("auth failed", e); endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED); + return; } log.info("MQTT client keep alive timeout = {} ", endpoint.keepAliveTimeSeconds()); diff --git a/protocol-gateway/mqtt-component/src/main/resources/component.js b/protocol-gateway/mqtt-component/src/main/resources/component.js new file mode 100644 index 00000000..b9c8a840 --- /dev/null +++ b/protocol-gateway/mqtt-component/src/main/resources/component.js @@ -0,0 +1,84 @@ +new (function () { + !function(n){"use strict";function d(n,t){var r=(65535&n)+(65535&t);return(n>>16)+(t>>16)+(r>>16)<<16|65535&r}function f(n,t,r,e,o,u){return d((u=d(d(t,n),d(e,u)))<>>32-o,r)}function l(n,t,r,e,o,u,c){return f(t&r|~t&e,n,t,o,u,c)}function g(n,t,r,e,o,u,c){return f(t&e|r&~e,n,t,o,u,c)}function v(n,t,r,e,o,u,c){return f(t^r^e,n,t,o,u,c)}function m(n,t,r,e,o,u,c){return f(r^(t|~e),n,t,o,u,c)}function c(n,t){var r,e,o,u;n[t>>5]|=128<>>9<<4)]=t;for(var c=1732584193,f=-271733879,i=-1732584194,a=271733878,h=0;h>5]>>>e%32&255);return t}function a(n){var t=[];for(t[(n.length>>2)-1]=void 0,e=0;e>5]|=(255&n.charCodeAt(e/8))<>>4&15)+r.charAt(15&t);return e}function r(n){return unescape(encodeURIComponent(n))}function o(n){return i(c(a(n=r(n)),8*n.length))}function u(n,t){return function(n,t){var r,e=a(n),o=[],u=[];for(o[15]=u[15]=void 0,16 0) { + var identifier = topic.substring(topic.lastIndexOf("/") + 1); + //事件上报 + return { + mid: msg.mid, + productKey: msg.productKey, + deviceName: msg.deviceName, + type:"event", + identifier: identifier, + occur: new Date().getTime(), + time: new Date().getTime(), + data: payload.params, + }; + } else if (topic.endsWith("_reply")) { + var identifier = topic.substring(topic.lastIndexOf("/") + 1); + //服务回复 + return { + mid: msg.mid, + productKey: msg.productKey, + deviceName: msg.deviceName, + type:"service", + identifier: identifier.replace("_reply", "Reply"), + occur: new Date().getTime(), + time: new Date().getTime(), + code: payload.code, + data: payload.data, + }; + } + return null; + }; + })() + \ No newline at end of file