From 5c4694b080c2b92c73b72acd46561d626b1628e4 Mon Sep 17 00:00:00 2001 From: xiwa Date: Wed, 23 Mar 2022 18:58:29 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E7=BD=91=E5=85=B3=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .DS_Store | Bin 6148 -> 8196 bytes .../main/java/cc/iotkit/common/Constants.java | 2 +- .../cc/iotkit/simulator/service/Gateway.java | 2 +- .../mqtt/controller/MqttAuthController.java | 2 +- .../mqtt/service/DeviceAuthService.java | 2 +- .../server/mqtt/service/MqttManager.java | 2 +- manager/pom.xml | 10 + .../controller/ProtocolController.java | 16 ++ pom.xml | 14 +- protocol-gateway/.DS_Store | Bin 0 -> 6148 bytes protocol-gateway/component-server/pom.xml | 74 +++++++ .../cc/iotkit/comps/ComponentManager.java | 47 +++++ .../java/cc/iotkit/comps/MessageHandler.java | 132 ++++++++++++ .../cc/iotkit/comps}/config/ServerConfig.java | 2 +- .../java/cc/iotkit/comps/model/AuthInfo.java | 15 ++ .../cc/iotkit/comps/model/DeviceState.java | 17 ++ .../cc/iotkit/comps/model/RegisterInfo.java | 49 +++++ .../comps/service/DeviceBehaviourService.java | 188 ++++++++++++++++++ .../comps/service/DeviceMessageConsumer.java | 75 +++++++ protocol-gateway/component/pom.xml | 23 +-- .../cc/iotkit/comp/AbstractComponent.java | 8 +- .../main/java/cc/iotkit/comp/Component.java | 21 -- .../java/cc/iotkit/comp/ComponentManager.java | 37 ---- .../main/java/cc/iotkit/comp/IComponent.java | 21 ++ .../java/cc/iotkit/comp/IMessageHandler.java | 14 ++ .../java/cc/iotkit/comp/MessageHandler.java | 68 ------- .../java/cc/iotkit/comp/model/AuthInfo.java | 12 ++ .../cc/iotkit/comp/model/DeviceMessage.java | 15 ++ protocol-gateway/converter/pom.xml | 15 ++ .../java/cc/iotkit/converter/Converter.java | 8 - .../java/cc/iotkit/converter}/Device.java | 4 +- .../cc/iotkit/converter/DeviceService.java | 20 ++ .../java/cc/iotkit/converter/IConverter.java | 13 ++ .../cc/iotkit/converter/ScriptConverter.java | 45 ++++- .../iotkit/converter/ThingModelMessage.java | 62 ------ .../cc/iotkit/comp/mqtt/MqttVerticle.java | 6 +- protocol-gateway/pom.xml | 8 +- .../server/config/ProtocolConfig.java | 17 ++ .../controller/DeviceBehaviourController.java | 9 +- ...iourService.java => BehaviourService.java} | 2 +- .../server/service/DeviceMessageConsumer.java | 8 +- .../server/service/GatewayService.java | 4 +- .../ruleengine/config/RuleConfiguration.java | 2 +- 43 files changed, 848 insertions(+), 243 deletions(-) create mode 100644 protocol-gateway/.DS_Store create mode 100755 protocol-gateway/component-server/pom.xml create mode 100755 protocol-gateway/component-server/src/main/java/cc/iotkit/comps/ComponentManager.java create mode 100755 protocol-gateway/component-server/src/main/java/cc/iotkit/comps/MessageHandler.java rename protocol-gateway/{protocol-server/src/main/java/cc/iotkit/protocol/server => component-server/src/main/java/cc/iotkit/comps}/config/ServerConfig.java (88%) create mode 100644 protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/AuthInfo.java create mode 100644 protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/DeviceState.java create mode 100755 protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/RegisterInfo.java create mode 100755 protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java create mode 100755 protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceMessageConsumer.java mode change 100755 => 100644 protocol-gateway/component/pom.xml delete mode 100755 protocol-gateway/component/src/main/java/cc/iotkit/comp/Component.java delete mode 100755 protocol-gateway/component/src/main/java/cc/iotkit/comp/ComponentManager.java create mode 100755 protocol-gateway/component/src/main/java/cc/iotkit/comp/IComponent.java create mode 100644 protocol-gateway/component/src/main/java/cc/iotkit/comp/IMessageHandler.java delete mode 100755 protocol-gateway/component/src/main/java/cc/iotkit/comp/MessageHandler.java create mode 100644 protocol-gateway/component/src/main/java/cc/iotkit/comp/model/AuthInfo.java create mode 100644 protocol-gateway/component/src/main/java/cc/iotkit/comp/model/DeviceMessage.java delete mode 100644 protocol-gateway/converter/src/main/java/cc/iotkit/converter/Converter.java rename protocol-gateway/{component/src/main/java/cc/iotkit/comp => converter/src/main/java/cc/iotkit/converter}/Device.java (62%) mode change 100755 => 100644 create mode 100644 protocol-gateway/converter/src/main/java/cc/iotkit/converter/DeviceService.java create mode 100644 protocol-gateway/converter/src/main/java/cc/iotkit/converter/IConverter.java delete mode 100755 protocol-gateway/converter/src/main/java/cc/iotkit/converter/ThingModelMessage.java create mode 100755 protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/config/ProtocolConfig.java rename protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/service/{DeviceBehaviourService.java => BehaviourService.java} (99%) diff --git a/.DS_Store b/.DS_Store index 7722cb09355f57d9136253a620408d833f19de59..f9d353f370a1db4f78c1f5da0233068cda235af4 100755 GIT binary patch literal 8196 zcmeHMO>fgc5S?w(#!*z11Bf0iaSbA^s-lXE70MsL2o41W8(YD`^+t(PtBNA|3V(=y z!tdejM^RZfa6m%T%vv+gp500Gp4Yn>hls>vl^hZs5RnH{I2^#;(p0~)M%%HGdq9DF zB1uJDE$4AAuAi?u<$ zbinBn0N6!XH=Lsm&^f-vz+!Dsr{XhB^&o7juqB2t>FD>>aA07uHfYjGm~;~Mn}w}V zg#LEq`Fc2sz#w~8fE8#eplbI$`hd#)0>{tw``37$XUlRt{w8)h-QB&O*YgJ6yYQo| z!ZIpX%Sp62S68p3%;W3xD1M!#)pU6Oxh%>kE7FBd$dVL8-oD9-L{<~IDw3tn4NZsF z_xjV}!}a>;xU(rerX~CW*YNG2{R@JQ6g^Dqhzi)O zJ@g$?4&`0_B&!*ax8rW#AmmUT(i{_8QVCOwSdPg)!@Vv+z5-sgLxA7XbYvMofH{t+ zjnDoTa*hPfrofj?1v(~lpz8k@+u#2;6FI^PumY`7Ky=T-vkB;2|Dtx8sao5Iy@#o$ z#?=OO3Y;#-fw~+A{`kWX=RSt2oWNpj5D~P${vzNmmNUHn>$yUtYCGQltr&j<-vOw) BMSTDO delta 269 zcmZp1XfcprU|?W$DortDU=RQ@Ie-{MGjUEV6q~50D9Qwq2aBaMq%ssGl@}Kz zwRJOrZU8w0=ngR8hte==*5rM{iW^Hlvn*!k;1Fbn2n%onX;+X>HWq$op3E=f2=Xe( UFDxJ$=r538!D5@^dFC(!051eIH~;_u diff --git a/common/src/main/java/cc/iotkit/common/Constants.java b/common/src/main/java/cc/iotkit/common/Constants.java index 860ab832..233ad8c9 100755 --- a/common/src/main/java/cc/iotkit/common/Constants.java +++ b/common/src/main/java/cc/iotkit/common/Constants.java @@ -2,7 +2,7 @@ package cc.iotkit.common; public interface Constants { - String MQTT_SECRET = "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU"; + String PRODUCT_SECRET = "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU"; String ACCOUNT_SECRET = "3n1z33kzvpgz1foijpkepyd3e8tw84us"; diff --git a/device-server/mqtt-client-simulator/src/main/java/cc/iotkit/simulator/service/Gateway.java b/device-server/mqtt-client-simulator/src/main/java/cc/iotkit/simulator/service/Gateway.java index bded1d2c..f447d76d 100755 --- a/device-server/mqtt-client-simulator/src/main/java/cc/iotkit/simulator/service/Gateway.java +++ b/device-server/mqtt-client-simulator/src/main/java/cc/iotkit/simulator/service/Gateway.java @@ -36,7 +36,7 @@ public class Gateway extends Device { // MQTT 连接选项 MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName(this.deviceName); - connOpts.setPassword(DigestUtils.md5Hex(Constants.MQTT_SECRET + clientId).toCharArray()); + connOpts.setPassword(DigestUtils.md5Hex(Constants.PRODUCT_SECRET + clientId).toCharArray()); // 保留会话 connOpts.setCleanSession(true); diff --git a/device-server/mqtt-server/src/main/java/cc/iotkit/server/mqtt/controller/MqttAuthController.java b/device-server/mqtt-server/src/main/java/cc/iotkit/server/mqtt/controller/MqttAuthController.java index 3b3c79e5..a9b82d6b 100755 --- a/device-server/mqtt-server/src/main/java/cc/iotkit/server/mqtt/controller/MqttAuthController.java +++ b/device-server/mqtt-server/src/main/java/cc/iotkit/server/mqtt/controller/MqttAuthController.java @@ -53,7 +53,7 @@ public class MqttAuthController { return false; } clientId = clientId.replaceFirst("su_", ""); - return CodecUtil.aesDecrypt(clientId, Constants.MQTT_SECRET).startsWith("admin_"); + return CodecUtil.aesDecrypt(clientId, Constants.PRODUCT_SECRET).startsWith("admin_"); } catch (Throwable e) { log.error("aesDecrypt error.", e); return false; diff --git a/device-server/mqtt-server/src/main/java/cc/iotkit/server/mqtt/service/DeviceAuthService.java b/device-server/mqtt-server/src/main/java/cc/iotkit/server/mqtt/service/DeviceAuthService.java index f4f30a7d..b1fc8142 100755 --- a/device-server/mqtt-server/src/main/java/cc/iotkit/server/mqtt/service/DeviceAuthService.java +++ b/device-server/mqtt-server/src/main/java/cc/iotkit/server/mqtt/service/DeviceAuthService.java @@ -24,7 +24,7 @@ public class DeviceAuthService { String clientId = auth.getClientid(); String[] pkDnAndModel = getPkDnAndModel(clientId); - String hmac = DigestUtils.md5Hex(Constants.MQTT_SECRET + clientId); + String hmac = DigestUtils.md5Hex(Constants.PRODUCT_SECRET + clientId); if (!hmac.equalsIgnoreCase(auth.getPassword())) { throw new RuntimeException("password is illegal."); } diff --git a/device-server/mqtt-server/src/main/java/cc/iotkit/server/mqtt/service/MqttManager.java b/device-server/mqtt-server/src/main/java/cc/iotkit/server/mqtt/service/MqttManager.java index f2bf9bee..5c9b6a3a 100755 --- a/device-server/mqtt-server/src/main/java/cc/iotkit/server/mqtt/service/MqttManager.java +++ b/device-server/mqtt-server/src/main/java/cc/iotkit/server/mqtt/service/MqttManager.java @@ -73,7 +73,7 @@ public class MqttManager implements MqttCallback, IMqttMessageListener { if (mqttClient == null) { MemoryPersistence persistence = new MemoryPersistence(); String clientId = "mqtt-server-consumer-" + env; - clientId = "su_" + CodecUtil.aesEncrypt("admin_" + clientId, Constants.MQTT_SECRET); + clientId = "su_" + CodecUtil.aesEncrypt("admin_" + clientId, Constants.PRODUCT_SECRET); mqttClient = new MqttClient(url, clientId, persistence); mqttClient.setCallback(this); } diff --git a/manager/pom.xml b/manager/pom.xml index 8268b1f2..182092a3 100755 --- a/manager/pom.xml +++ b/manager/pom.xml @@ -149,6 +149,16 @@ protocol-server + + cc.iotkit + component-server + + + + cc.iotkit + mqtt-component + + diff --git a/manager/src/main/java/cc/iotkit/manager/controller/ProtocolController.java b/manager/src/main/java/cc/iotkit/manager/controller/ProtocolController.java index ae666898..ea825b2e 100755 --- a/manager/src/main/java/cc/iotkit/manager/controller/ProtocolController.java +++ b/manager/src/main/java/cc/iotkit/manager/controller/ProtocolController.java @@ -2,6 +2,9 @@ package cc.iotkit.manager.controller; import cc.iotkit.common.exception.BizException; import cc.iotkit.common.utils.ReflectUtil; +import cc.iotkit.comp.mqtt.MqttComponent; +import cc.iotkit.comps.ComponentManager; +import cc.iotkit.converter.ScriptConverter; import cc.iotkit.dao.ProtocolGatewayRepository; import cc.iotkit.dao.UserInfoRepository; import cc.iotkit.manager.service.DataOwnerService; @@ -18,6 +21,7 @@ import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; import org.springframework.web.bind.annotation.*; +import javax.annotation.PostConstruct; import java.util.Optional; @Slf4j @@ -40,6 +44,9 @@ public class ProtocolController { @Autowired private UserInfoRepository userInfoRepository; + @Autowired + private ComponentManager componentManager; + @PostMapping("/addGateway") public void addGateway(ProtocolGateway gateway) { Optional optGateway = gatewayRepository.findById(gateway.getId()); @@ -123,4 +130,13 @@ public class ProtocolController { return new Paging<>(gateways.getTotalElements(), gateways.getContent()); } + @PostConstruct + public void init() { + MqttComponent component = new MqttComponent(); + ScriptConverter converter = new ScriptConverter(); + converter.setScript(""); + component.setConverter(converter); + componentManager.register("123", component); + componentManager.start("123", ""); + } } diff --git a/pom.xml b/pom.xml index 9ff7fea9..14ab8abc 100755 --- a/pom.xml +++ b/pom.xml @@ -12,8 +12,6 @@ dao tppa-server protocol-gateway - protocol-gateway/mqtt-component - protocol-gateway/component org.springframework.boot @@ -253,6 +251,18 @@ ${project.version} + + cc.iotkit + component-server + ${project.version} + + + + cc.iotkit + mqtt-component + ${project.version} + + diff --git a/protocol-gateway/.DS_Store b/protocol-gateway/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..5008ddfcf53c02e82d7eee2e57c38e5672ef89f6 GIT binary patch literal 6148 zcmeH~Jr2S!425mzP>H1@V-^m;4Wg<&0T*E43hX&L&p$$qDprKhvt+--jT7}7np#A3 zem<@ulZcFPQ@L2!n>{z**++&mCkOWA81W14cNZlEfg7;MkzE(HCqgga^y>{tEnwC%0;vJ&^%eQ zLs35+`xjp>T0 + + + iotkit-parent + cc.iotkit + 0.0.1-SNAPSHOT + ../../pom.xml + + 4.0.0 + + component-server + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.apache.pulsar + pulsar-client-all + + + + org.springframework.boot + spring-boot-starter-data-elasticsearch + + + + org.springframework + spring-context + + + + org.slf4j + slf4j-api + + + + org.projectlombok + lombok + + + + cc.iotkit + common + + + + cc.iotkit + converter + + + + cc.iotkit + model + + + + cc.iotkit + dao + + + + cc.iotkit + component + + + + + \ No newline at end of file diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/ComponentManager.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/ComponentManager.java new file mode 100755 index 00000000..3662ffd6 --- /dev/null +++ b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/ComponentManager.java @@ -0,0 +1,47 @@ +package cc.iotkit.comps; + + +import cc.iotkit.comp.IComponent; +import cc.iotkit.comps.service.DeviceBehaviourService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; + +@Component +public class ComponentManager { + + private final Map components = new HashMap<>(); + + @Autowired + private DeviceBehaviourService deviceBehaviourService; + + public void register(String id, IComponent component) { + components.put(id, component); + } + + public void deRegister(String id) { + IComponent component = components.remove(id); + component.destroy(); + } + + public void start(String id, String script) { + IComponent component = components.get(id); + if (component == null) { + return; + } + component.setHandler(new MessageHandler(script, component.getConverter(), + deviceBehaviourService)); + component.start(); + } + + public void stop(String id) { + IComponent component = components.get(id); + if (component == null) { + return; + } + component.stop(); + } + +} diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/MessageHandler.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/MessageHandler.java new file mode 100755 index 00000000..3fba5d70 --- /dev/null +++ b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/MessageHandler.java @@ -0,0 +1,132 @@ +package cc.iotkit.comps; + +import cc.iotkit.comp.IMessageHandler; +import cc.iotkit.comp.model.DeviceMessage; +import cc.iotkit.comps.model.AuthInfo; +import cc.iotkit.comps.model.DeviceState; +import cc.iotkit.comps.model.RegisterInfo; +import cc.iotkit.comps.service.DeviceBehaviourService; +import cc.iotkit.converter.IConverter; +import jdk.nashorn.api.scripting.NashornScriptEngine; +import jdk.nashorn.api.scripting.ScriptObjectMirror; +import lombok.Data; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.beanutils.BeanUtils; + +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; +import java.util.Map; + +@Slf4j +@Data +public class MessageHandler implements IMessageHandler { + private final NashornScriptEngine engine = (NashornScriptEngine) (new ScriptEngineManager()).getEngineByName("nashorn"); + + private final String script; + + private final IConverter converter; + + private final DeviceBehaviourService deviceBehaviourService; + + @SneakyThrows + public MessageHandler(String script, IConverter converter, + DeviceBehaviourService deviceBehaviourService) { + this.script = script; + this.converter = converter; + this.deviceBehaviourService = deviceBehaviourService; + engine.eval(script); + } + + public void register(Map head, String msg) { + } + + public void auth(Map head, String msg) { + } + + public void state(Map head, String msg) { + } + + public void onReceive(Map head, String type, String msg) { + try { + ScriptObjectMirror result = (ScriptObjectMirror) engine.invokeFunction("onReceive", head, type, msg); + Object rstType = result.get("type"); + if (rstType == null) { + return; + } + //取脚本执行后返回的数据 + Object data = result.get("data"); + if (!(data instanceof Map)) { + return; + } + Map dataMap = (Map) data; + + if ("register".equals(rstType)) { + //注册数据 + RegisterInfo regInfo = new RegisterInfo(); + BeanUtils.populate(regInfo, dataMap); + doRegister(regInfo); + } else if ("auth".equals(rstType)) { + //设备认证 + AuthInfo authInfo = new AuthInfo(); + BeanUtils.populate(authInfo, dataMap); + doAuth(authInfo); + } else if ("state".equals(rstType)) { + //设备状态变更 + DeviceState state = new DeviceState(); + BeanUtils.populate(state, dataMap); + doStateChange(state); + } else if ("report".equals(rstType)) { + //上报数据 + DeviceMessage message = new DeviceMessage(); + BeanUtils.populate(message, dataMap); + doReport(message); + } + + } catch (Throwable e) { + log.error("onReceive error", e); + } + } + + private void doRegister(RegisterInfo reg) throws ScriptException, NoSuchMethodException { + try { + deviceBehaviourService.register(reg); + engine.invokeFunction("onRegistered", reg, true); + } catch (Throwable e) { + log.error("register error", e); + engine.invokeFunction("onRegistered", reg, false); + } + } + + private void doAuth(AuthInfo auth) throws ScriptException, NoSuchMethodException { + try { + deviceBehaviourService.deviceAuth(auth.getProductKey(), + auth.getDeviceName(), + auth.getProductSecret(), + auth.getDeviceSecret()); + engine.invokeFunction("onAuthed", auth, true); + } catch (Throwable e) { + log.error("device auth error", e); + engine.invokeFunction("onAuthed", auth, false); + } + } + + private void doStateChange(DeviceState state) { + try { + deviceBehaviourService.deviceStateChange(state.getProductKey(), + state.getDeviceName(), + DeviceState.STATE_ONLINE.equals(state.getState())); + } catch (Throwable e) { + log.error("device state change error", e); + } + } + + private void doReport(DeviceMessage message) { + try { + deviceBehaviourService.reportMessage(message); + } catch (Throwable e) { + log.error("report device message error", e); + } + } + +} diff --git a/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/config/ServerConfig.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/config/ServerConfig.java similarity index 88% rename from protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/config/ServerConfig.java rename to protocol-gateway/component-server/src/main/java/cc/iotkit/comps/config/ServerConfig.java index 6a5a01d8..7238b060 100755 --- a/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/config/ServerConfig.java +++ b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/config/ServerConfig.java @@ -1,4 +1,4 @@ -package cc.iotkit.protocol.server.config; +package cc.iotkit.comps.config; import lombok.Data; import org.springframework.beans.factory.annotation.Value; diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/AuthInfo.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/AuthInfo.java new file mode 100644 index 00000000..cce5fc36 --- /dev/null +++ b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/AuthInfo.java @@ -0,0 +1,15 @@ +package cc.iotkit.comps.model; + +import lombok.Data; + +@Data +public class AuthInfo { + + private String productKey; + + private String deviceName; + + private String productSecret; + + private String deviceSecret; +} diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/DeviceState.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/DeviceState.java new file mode 100644 index 00000000..150ce9cb --- /dev/null +++ b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/DeviceState.java @@ -0,0 +1,17 @@ +package cc.iotkit.comps.model; + +import lombok.Data; + +@Data +public class DeviceState { + + public static final String STATE_ONLINE = "online"; + public static final String STATE_OFFLINE = "offline"; + + private String productKey; + + private String deviceName; + + private String state; + +} diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/RegisterInfo.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/RegisterInfo.java new file mode 100755 index 00000000..0cc51c19 --- /dev/null +++ b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/model/RegisterInfo.java @@ -0,0 +1,49 @@ +package cc.iotkit.comps.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; +import java.util.Map; + +/** + * 注册信息 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class RegisterInfo { + + private String productKey; + + private String deviceName; + + private String model; + + private Map tag; + + private List subDevices; + + public RegisterInfo(String productKey, String deviceName, String model) { + this.productKey = productKey; + this.deviceName = deviceName; + this.model = model; + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class SubDevice { + + private String productKey; + + private String deviceName; + + private String model; + + private Map tag; + } +} diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java new file mode 100755 index 00000000..4f8d03ff --- /dev/null +++ b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java @@ -0,0 +1,188 @@ +package cc.iotkit.comps.service; + +import cc.iotkit.common.Constants; +import cc.iotkit.common.exception.BizException; +import cc.iotkit.common.utils.JsonUtil; +import cc.iotkit.comp.model.DeviceMessage; +import cc.iotkit.comps.config.ServerConfig; +import cc.iotkit.comps.model.RegisterInfo; +import cc.iotkit.dao.DeviceRepository; +import cc.iotkit.dao.ProductRepository; +import cc.iotkit.model.device.DeviceInfo; +import cc.iotkit.model.product.Product; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.RandomUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +@Slf4j +@Service +public class DeviceBehaviourService { + + @Autowired + private ProductRepository productRepository; + @Autowired + private DeviceRepository deviceRepository; + @Autowired + private ServerConfig serverConfig; + + private Producer deviceMessageProducer; + + @PostConstruct + public void init() throws PulsarClientException { + //初始化pulsar客户端 + PulsarClient client = PulsarClient.builder() + .serviceUrl(serverConfig.getPulsarBrokerUrl()) + .build(); + deviceMessageProducer = client.newProducer(JSONSchema.of(DeviceMessage.class)) + .topic("persistent://public/default/device_raw") + .create(); + + } + + + public void register(RegisterInfo info) { + try { + DeviceInfo deviceInfo = register(null, info); + //子设备注册 + List subDevices = info.getSubDevices(); + if (subDevices != null && subDevices.size() != 0) { + for (RegisterInfo.SubDevice subDevice : subDevices) { + register(deviceInfo.getDeviceId(), + new RegisterInfo(subDevice.getProductKey(), + subDevice.getDeviceName(), + subDevice.getModel(), + subDevice.getTag(), null)); + } + } + //todo 产生设备注册事件 + } catch (BizException e) { + log.error("register device error", e); + throw e; + } catch (Throwable e) { + log.error("register device error", e); + throw new BizException("register device error", e); + } + } + + public DeviceInfo register(String parentId, RegisterInfo info) { + String pk = info.getProductKey(); + Optional optProduct = productRepository.findById(pk); + if (!optProduct.isPresent()) { + throw new BizException("Product does not exist"); + } + String uid = optProduct.get().getUid(); + DeviceInfo device = deviceRepository.findByProductKeyAndDeviceName(pk, info.getDeviceName()); + + if (device != null) { + //更新设备信息 + device.setParentId(parentId); + device.setUid(uid); + Map tag = info.getTag(); + Map oldTag = device.getTag(); + + if (oldTag == null) { + oldTag = new HashMap<>(); + } + + if (tag != null) { + oldTag.putAll(tag); + } + + device.setTag(oldTag); + } else { + //不存在,注册新设备 + device = new DeviceInfo(); + device.setParentId(parentId); + device.setUid(uid); + device.setDeviceId(newDeviceId(info.getDeviceName())); + device.setProductKey(info.getProductKey()); + device.setDeviceName(info.getDeviceName()); + device.setTag(info.getTag()); + device.setState(new DeviceInfo.State(false, null, null)); + device.setCreateAt(System.currentTimeMillis()); + } + + deviceRepository.save(device); + log.info("device registered:{}", JsonUtil.toJsonString(device)); + + return device; + } + + /** + * 1-13位 时间戳 + * 14-29位 deviceNae,去除非字母和数字,不足16位补0,超过16位的mac取后16位,共16位 + * 30-31位 mac长度,共2位 + * 32位 随机一个0-f字符 + */ + public static String newDeviceId(String deviceNae) { + int maxDnLen = 16; + String dn = deviceNae.replaceAll("[^0-9A-Za-z]", ""); + if (dn.length() > maxDnLen) { + dn = dn.substring(dn.length() - maxDnLen); + } else { + dn = (dn + "00000000000000000000").substring(0, maxDnLen); + } + String len = StringUtils.leftPad(deviceNae.length() + "", 2, '0'); + String rnd = Integer.toHexString(RandomUtils.nextInt(0, 16)); + return (System.currentTimeMillis() + "0" + dn + len + rnd).toLowerCase(); + } + + public void deviceAuth(String productKey, + String deviceName, + String productSecret, + String deviceSecret) { + DeviceInfo deviceInfo = deviceRepository.findByProductKeyAndDeviceName(productKey, deviceName); + if (deviceInfo == null) { + throw new BizException("device does not exist"); + } + if (!Constants.PRODUCT_SECRET.equals(productSecret)) { + throw new BizException("incorrect productSecret"); + } + + //todo 按产品ProductSecret认证,子设备需要父设备认证后可通过验证 +// Optional optProduct = productRepository.findById(productKey); +// if (!optProduct.isPresent()) { +// throw new BizException("product does not exist"); +// } +// Product product = optProduct.get(); +// if (product.getNodeType()) { +// +// } + + } + + public void deviceStateChange(String productKey, + String deviceName, + boolean online) { + DeviceInfo device = deviceRepository.findByProductKeyAndDeviceName(productKey, deviceName); + if (device == null) { + throw new BizException("device does not exist"); + } + + if (online) { + device.getState().setOnline(true); + device.getState().setOnlineTime(System.currentTimeMillis()); + } else { + device.getState().setOnline(false); + device.getState().setOfflineTime(System.currentTimeMillis()); + } + deviceRepository.save(device); + //todo 产生在离线事件 + } + + public void reportMessage(DeviceMessage message) throws PulsarClientException { + deviceMessageProducer.send(message); + } +} diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceMessageConsumer.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceMessageConsumer.java new file mode 100755 index 00000000..8a90b95b --- /dev/null +++ b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceMessageConsumer.java @@ -0,0 +1,75 @@ +package cc.iotkit.comps.service; + +import cc.iotkit.common.Constants; +import cc.iotkit.common.utils.JsonUtil; +import cc.iotkit.comps.config.ServerConfig; +import cc.iotkit.dao.ThingModelMessageRepository; +import cc.iotkit.dao.UserInfoRepository; +import cc.iotkit.model.UserInfo; +import cc.iotkit.model.device.message.ThingModelMessage; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.stream.Collectors; + +@Slf4j +@Service +public class DeviceMessageConsumer implements MessageListener { + + private final ServerConfig serverConfig; + + private final ThingModelMessageRepository messageRepository; + + private final UserInfoRepository userInfoRepository; + + @SneakyThrows + @Autowired + public DeviceMessageConsumer(ServerConfig serverConfig, + ThingModelMessageRepository messageRepository, + UserInfoRepository userInfoRepository) { + this.serverConfig = serverConfig; + this.messageRepository = messageRepository; + this.userInfoRepository = userInfoRepository; + + PulsarClient client = PulsarClient.builder() + .serviceUrl(this.serverConfig.getPulsarBrokerUrl()) + .build(); + + String topicFormat = "persistent://%s/default/" + Constants.THING_MODEL_MESSAGE_TOPIC; + List platformUsers = userInfoRepository.findByType(UserInfo.USER_TYPE_PLATFORM); + List topics = platformUsers.stream().map(u -> String.format(topicFormat, u.getUid())) + .collect(Collectors.toList()); + log.info("subscribe device_thing topic:{}", JsonUtil.toJsonString(topics)); + + client.newConsumer(Schema.JSON(ThingModelMessage.class)) + .topics(topics) + .subscriptionName("thing-model-message") + .consumerName("thing-model-message-consumer") + .messageListener(this).subscribe(); + } + + @SneakyThrows + @Override + public void received(Consumer consumer, Message msg) { + ThingModelMessage modelMessage = msg.getValue(); + log.info("receive message:{}", JsonUtil.toJsonString(modelMessage)); + //设备消息日志入库 + messageRepository.save(modelMessage); + + messageRepository.findAll().forEach(m -> { + log.info(JsonUtil.toJsonString(m)); + }); + + consumer.acknowledge(msg); + } + + @Override + public void reachedEndOfTopic(Consumer consumer) { + + } + +} diff --git a/protocol-gateway/component/pom.xml b/protocol-gateway/component/pom.xml old mode 100755 new mode 100644 index d57e08c0..4086ecf6 --- a/protocol-gateway/component/pom.xml +++ b/protocol-gateway/component/pom.xml @@ -3,32 +3,31 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - iotkit-parent + protocol-gateway cc.iotkit 0.0.1-SNAPSHOT - ../../pom.xml 4.0.0 component + + 8 + 8 + + - - org.projectlombok - lombok - - - - cc.iotkit - common - - cc.iotkit converter + + org.projectlombok + lombok + + \ No newline at end of file diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/AbstractComponent.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/AbstractComponent.java index e08be90b..da9714f4 100755 --- a/protocol-gateway/component/src/main/java/cc/iotkit/comp/AbstractComponent.java +++ b/protocol-gateway/component/src/main/java/cc/iotkit/comp/AbstractComponent.java @@ -1,13 +1,13 @@ package cc.iotkit.comp; -import cc.iotkit.converter.Converter; +import cc.iotkit.converter.IConverter; import lombok.Data; @Data -public abstract class AbstractComponent implements Component { +public abstract class AbstractComponent implements IComponent { - protected MessageHandler handler; + protected IMessageHandler handler; - protected Converter converter; + protected IConverter converter; } diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/Component.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/Component.java deleted file mode 100755 index 2de633f4..00000000 --- a/protocol-gateway/component/src/main/java/cc/iotkit/comp/Component.java +++ /dev/null @@ -1,21 +0,0 @@ -package cc.iotkit.comp; - -import cc.iotkit.converter.Converter; - -public interface Component { - - void create(String config); - - void start(); - - void stop(); - - void destroy(); - - void setHandler(MessageHandler handler); - - void setConverter(Converter converter); - - Converter getConverter(); - -} diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/ComponentManager.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/ComponentManager.java deleted file mode 100755 index f12db384..00000000 --- a/protocol-gateway/component/src/main/java/cc/iotkit/comp/ComponentManager.java +++ /dev/null @@ -1,37 +0,0 @@ -package cc.iotkit.comp; - - -import java.util.HashMap; -import java.util.Map; - -public class ComponentManager { - - private final Map components = new HashMap<>(); - - public void register(String id, Component component) { - components.put(id, component); - } - - public void deRegister(String id) { - Component component = components.remove(id); - component.destroy(); - } - - public void start(String id, String script) { - Component component = components.get(id); - if (component == null) { - return; - } - component.setHandler(new MessageHandler(script, component.getConverter())); - component.start(); - } - - public void stop(String id) { - Component component = components.get(id); - if (component == null) { - return; - } - component.stop(); - } - -} diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/IComponent.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/IComponent.java new file mode 100755 index 00000000..0d8b4f53 --- /dev/null +++ b/protocol-gateway/component/src/main/java/cc/iotkit/comp/IComponent.java @@ -0,0 +1,21 @@ +package cc.iotkit.comp; + +import cc.iotkit.converter.IConverter; + +public interface IComponent { + + void create(String config); + + void start(); + + void stop(); + + void destroy(); + + void setHandler(IMessageHandler handler); + + void setConverter(IConverter converter); + + IConverter getConverter(); + +} diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/IMessageHandler.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/IMessageHandler.java new file mode 100644 index 00000000..54a7d4ec --- /dev/null +++ b/protocol-gateway/component/src/main/java/cc/iotkit/comp/IMessageHandler.java @@ -0,0 +1,14 @@ +package cc.iotkit.comp; + +import java.util.Map; + +public interface IMessageHandler { + + void register(Map head, String msg); + + void auth(Map head, String msg); + + void state(Map head, String msg); + + void onReceive(Map head, String type, String msg); +} diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/MessageHandler.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/MessageHandler.java deleted file mode 100755 index f039f83b..00000000 --- a/protocol-gateway/component/src/main/java/cc/iotkit/comp/MessageHandler.java +++ /dev/null @@ -1,68 +0,0 @@ -package cc.iotkit.comp; - -import cc.iotkit.common.utils.JsonUtil; -import cc.iotkit.comp.model.RegisterInfo; -import cc.iotkit.converter.Converter; -import jdk.nashorn.api.scripting.NashornScriptEngine; -import jdk.nashorn.api.scripting.ScriptObjectMirror; -import lombok.Data; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; - -import javax.script.ScriptEngineManager; -import java.util.Map; - -@Slf4j -@Data -public class MessageHandler { - private final NashornScriptEngine engine = (NashornScriptEngine) (new ScriptEngineManager()).getEngineByName("nashorn"); - - private final String script; - - private final Converter converter; - - @SneakyThrows - public MessageHandler(String script, Converter converter) { - this.script = script; - this.converter = converter; - engine.eval(script); - } - - public void register(Map head, String msg) { - } - - public void auth(Map head, String msg) { - } - - public void state(Map head, String msg) { - } - - public void onReceive(Map head, String type, String msg) { - try { - ScriptObjectMirror obj = (ScriptObjectMirror) engine.invokeFunction("onReceive", head, type, msg); - Object rstType = obj.get("type"); - if (rstType == null) { - return; - } - //取脚本执行后返回的数据 - Object data = obj.get("data"); - - if ("register".equals(rstType)) { - //注册数据 - RegisterInfo regInfo = getData(data, RegisterInfo.class); - } else if ("report".equals(rstType)) { - //上报数据 - - } - - } catch (Throwable e) { - log.error("onReceive error", e); - } - - } - - private T getData(Object data, Class cls) { - return JsonUtil.parse(JsonUtil.toJsonString(data), cls); - } - -} diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/model/AuthInfo.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/model/AuthInfo.java new file mode 100644 index 00000000..4aae434a --- /dev/null +++ b/protocol-gateway/component/src/main/java/cc/iotkit/comp/model/AuthInfo.java @@ -0,0 +1,12 @@ +package cc.iotkit.comp.model; + +import lombok.Data; + +@Data +public class AuthInfo { + + private String productKey; + + private String deviceName; + +} diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/model/DeviceMessage.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/model/DeviceMessage.java new file mode 100644 index 00000000..3cc305c7 --- /dev/null +++ b/protocol-gateway/component/src/main/java/cc/iotkit/comp/model/DeviceMessage.java @@ -0,0 +1,15 @@ +package cc.iotkit.comp.model; + +import lombok.Data; + +@Data +public class DeviceMessage { + + private String productKey; + + private String deviceName; + + private String mid; + + private String content; +} diff --git a/protocol-gateway/converter/pom.xml b/protocol-gateway/converter/pom.xml index 0c843b96..0dc29b76 100644 --- a/protocol-gateway/converter/pom.xml +++ b/protocol-gateway/converter/pom.xml @@ -18,6 +18,21 @@ + + org.slf4j + slf4j-api + + + + commons-beanutils + commons-beanutils + + + + cc.iotkit + model + + org.projectlombok lombok diff --git a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/Converter.java b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/Converter.java deleted file mode 100644 index abafa4e8..00000000 --- a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/Converter.java +++ /dev/null @@ -1,8 +0,0 @@ -package cc.iotkit.converter; - -public interface Converter { - - ThingModelMessage decode(String msg); - - -} diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/Device.java b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/Device.java old mode 100755 new mode 100644 similarity index 62% rename from protocol-gateway/component/src/main/java/cc/iotkit/comp/Device.java rename to protocol-gateway/converter/src/main/java/cc/iotkit/converter/Device.java index 8c214b45..20379d03 --- a/protocol-gateway/component/src/main/java/cc/iotkit/comp/Device.java +++ b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/Device.java @@ -1,7 +1,9 @@ -package cc.iotkit.comp; +package cc.iotkit.converter; import lombok.Data; @Data public class Device { + + } diff --git a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/DeviceService.java b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/DeviceService.java new file mode 100644 index 00000000..efa87010 --- /dev/null +++ b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/DeviceService.java @@ -0,0 +1,20 @@ +package cc.iotkit.converter; + +import lombok.Data; + +@Data +public class DeviceService { + + private String mid; + + private String productKey; + + private String deviceName; + + private String type; + + private String identifier; + + private T params; + +} diff --git a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/IConverter.java b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/IConverter.java new file mode 100644 index 00000000..93230295 --- /dev/null +++ b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/IConverter.java @@ -0,0 +1,13 @@ +package cc.iotkit.converter; + +import cc.iotkit.model.device.message.ThingModelMessage; + +public interface IConverter { + + void setScript(String script); + + ThingModelMessage decode(String msg); + + String encode(DeviceService service, Device device); + +} diff --git a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/ScriptConverter.java b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/ScriptConverter.java index 1be49df2..75517bf5 100644 --- a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/ScriptConverter.java +++ b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/ScriptConverter.java @@ -1,8 +1,51 @@ package cc.iotkit.converter; -public class ScriptConverter implements Converter { +import cc.iotkit.model.device.message.ThingModelMessage; +import jdk.nashorn.api.scripting.NashornScriptEngine; +import jdk.nashorn.api.scripting.ScriptObjectMirror; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.beanutils.BeanUtils; + +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; + +@Slf4j +@Data +public class ScriptConverter implements IConverter { + private final NashornScriptEngine engine = (NashornScriptEngine) (new ScriptEngineManager()).getEngineByName("nashorn"); + + private String script; + + public void setScript(String script) { + this.script = script; + try { + engine.eval(script); + } catch (ScriptException e) { + log.error("eval converter script error", e); + } + } public ThingModelMessage decode(String msg) { + try { + ScriptObjectMirror result = (ScriptObjectMirror) engine.invokeFunction("decode", msg); + ThingModelMessage modelMessage = new ThingModelMessage(); + BeanUtils.populate(modelMessage, result); + return modelMessage; + } catch (Throwable e) { + log.error("execute decode script error", e); + } + return null; + } + + @Override + public String encode(DeviceService service, Device device) { + try { + ScriptObjectMirror result = (ScriptObjectMirror) engine.invokeFunction("encode", service, device); + return result.toString(); + } catch (Throwable e) { + log.error("execute encode script error", e); + } return null; } diff --git a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/ThingModelMessage.java b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/ThingModelMessage.java deleted file mode 100755 index 605c5ebb..00000000 --- a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/ThingModelMessage.java +++ /dev/null @@ -1,62 +0,0 @@ -package cc.iotkit.converter; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.util.HashMap; -import java.util.Map; - -/** - * 物模型消息 - */ -@Data -@NoArgsConstructor -@AllArgsConstructor -@Builder -public class ThingModelMessage { - - private String productKey; - - private String deviceName; - - private String mid; - - private String identifier; - - private Map data; - - /** - * 时间戳,设备上的事件或数据产生的本地时间 - */ - private Long occur; - - /** - * 消息上报时间 - */ - private Long time; - - public static ThingModelMessage from(Map map) { - ThingModelMessage message = new ThingModelMessage(); - message.setProductKey(getStr(map, "productKey")); - message.setDeviceName(getStr(map, "deviceName")); - message.setMid(getStr(map, "mid")); - message.setIdentifier(getStr(map, "identifier")); - Object data = map.get("data"); - if (data instanceof Map) { - message.setData((Map) data); - } else { - message.setData(new HashMap<>()); - } - return message; - } - - private static String getStr(Map map, String key) { - Object val = map.get(key); - if (val == null) { - return null; - } - return val.toString(); - } -} diff --git a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java index bada79d3..9263ca4c 100755 --- a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java +++ b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java @@ -1,6 +1,6 @@ package cc.iotkit.comp.mqtt; -import cc.iotkit.comp.MessageHandler; +import cc.iotkit.comp.IMessageHandler; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttQoS; @@ -26,9 +26,9 @@ public class MqttVerticle extends AbstractVerticle { private final MqttConfig config; - private final MessageHandler executor; + private final IMessageHandler executor; - public MqttVerticle(MqttConfig config, MessageHandler executor) { + public MqttVerticle(MqttConfig config, IMessageHandler executor) { this.config = config; this.executor = executor; } diff --git a/protocol-gateway/pom.xml b/protocol-gateway/pom.xml index 3fab41c3..f53cf12b 100755 --- a/protocol-gateway/pom.xml +++ b/protocol-gateway/pom.xml @@ -15,12 +15,10 @@ gateway-client protocol-server decode-function + component-server converter + mqtt-component + component - - 8 - 8 - - \ No newline at end of file diff --git a/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/config/ProtocolConfig.java b/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/config/ProtocolConfig.java new file mode 100755 index 00000000..275a3443 --- /dev/null +++ b/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/config/ProtocolConfig.java @@ -0,0 +1,17 @@ +package cc.iotkit.protocol.server.config; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +@Configuration +@Data +public class ProtocolConfig { + + @Value("${pulsar.broker}") + private String pulsarBrokerUrl; + + @Value("${pulsar.service}") + private String pulsarServiceUrl; + +} diff --git a/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/controller/DeviceBehaviourController.java b/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/controller/DeviceBehaviourController.java index 0724da7f..1b4d76d4 100755 --- a/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/controller/DeviceBehaviourController.java +++ b/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/controller/DeviceBehaviourController.java @@ -3,11 +3,10 @@ package cc.iotkit.protocol.server.controller; import cc.iotkit.common.utils.JsonUtil; import cc.iotkit.protocol.*; import cc.iotkit.protocol.client.DeviceBehaviourClient; -import cc.iotkit.protocol.server.config.ServerConfig; -import cc.iotkit.protocol.server.service.DeviceBehaviourService; +import cc.iotkit.protocol.server.config.ProtocolConfig; +import cc.iotkit.protocol.server.service.BehaviourService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.*; @Slf4j @@ -16,10 +15,10 @@ import org.springframework.web.bind.annotation.*; public class DeviceBehaviourController implements DeviceBehaviour { @Autowired - private ServerConfig serverConfig; + private ProtocolConfig serverConfig; @Autowired - private DeviceBehaviourService behaviourService; + private BehaviourService behaviourService; @Override diff --git a/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/service/DeviceBehaviourService.java b/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/service/BehaviourService.java similarity index 99% rename from protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/service/DeviceBehaviourService.java rename to protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/service/BehaviourService.java index 38d067c6..8a4ca0a8 100755 --- a/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/service/DeviceBehaviourService.java +++ b/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/service/BehaviourService.java @@ -21,7 +21,7 @@ import java.util.Optional; @Slf4j @Service -public class DeviceBehaviourService { +public class BehaviourService { @Autowired private ProductRepository productRepository; diff --git a/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/service/DeviceMessageConsumer.java b/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/service/DeviceMessageConsumer.java index a819a7a2..04364c8d 100755 --- a/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/service/DeviceMessageConsumer.java +++ b/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/service/DeviceMessageConsumer.java @@ -6,7 +6,7 @@ import cc.iotkit.dao.ThingModelMessageRepository; import cc.iotkit.dao.UserInfoRepository; import cc.iotkit.model.UserInfo; import cc.iotkit.model.device.message.ThingModelMessage; -import cc.iotkit.protocol.server.config.ServerConfig; +import cc.iotkit.protocol.server.config.ProtocolConfig; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.*; @@ -17,10 +17,10 @@ import java.util.List; import java.util.stream.Collectors; @Slf4j -@Service +//@Service public class DeviceMessageConsumer implements MessageListener { - private final ServerConfig serverConfig; + private final ProtocolConfig serverConfig; private final ThingModelMessageRepository messageRepository; @@ -28,7 +28,7 @@ public class DeviceMessageConsumer implements MessageListener @SneakyThrows @Autowired - public DeviceMessageConsumer(ServerConfig serverConfig, + public DeviceMessageConsumer(ProtocolConfig serverConfig, ThingModelMessageRepository messageRepository, UserInfoRepository userInfoRepository) { this.serverConfig = serverConfig; diff --git a/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/service/GatewayService.java b/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/service/GatewayService.java index 90d6cd7e..c00faf9d 100755 --- a/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/service/GatewayService.java +++ b/protocol-gateway/protocol-server/src/main/java/cc/iotkit/protocol/server/service/GatewayService.java @@ -3,7 +3,7 @@ package cc.iotkit.protocol.server.service; import cc.iotkit.common.Constants; import cc.iotkit.common.utils.JsonUtil; import cc.iotkit.protocol.function.DecodeFunction; -import cc.iotkit.protocol.server.config.ServerConfig; +import cc.iotkit.protocol.server.config.ProtocolConfig; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -22,7 +22,7 @@ public class GatewayService { private PulsarAdmin pulsarAdmin; @Autowired - private ServerConfig serverConfig; + private ProtocolConfig serverConfig; private PulsarAdmin getPulsarAdmin() throws PulsarClientException { if (pulsarAdmin == null) { diff --git a/rule-engine/src/main/java/cc/iotkit/ruleengine/config/RuleConfiguration.java b/rule-engine/src/main/java/cc/iotkit/ruleengine/config/RuleConfiguration.java index 8c2048a2..7720469f 100755 --- a/rule-engine/src/main/java/cc/iotkit/ruleengine/config/RuleConfiguration.java +++ b/rule-engine/src/main/java/cc/iotkit/ruleengine/config/RuleConfiguration.java @@ -79,7 +79,7 @@ public class RuleConfiguration { @Bean public MessageProducer inbound() { String clientId = "rule-consumer-" + env; - clientId = "su_" + CodecUtil.aesEncrypt("admin_" + clientId, Constants.MQTT_SECRET); + clientId = "su_" + CodecUtil.aesEncrypt("admin_" + clientId, Constants.PRODUCT_SECRET); adapter = new MqttPahoMessageDrivenChannelAdapter( clientId, mqttClientFactory()); adapter.setCompletionTimeout(5000);