diff --git a/.gitignore b/.gitignore
index 668b97f4..8da0c3c1 100755
--- a/.gitignore
+++ b/.gitignore
@@ -25,3 +25,4 @@ hs_err_pid*
target
*.iml
*.yml
+log
diff --git a/communication/component/pom.xml b/communication/component/pom.xml
deleted file mode 100644
index 02396859..00000000
--- a/communication/component/pom.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-
-
-
- iotkit-parent
- cc.iotkit
- 0.0.1-SNAPSHOT
- ../../pom.xml
-
- 4.0.0
-
- component
-
-
-
\ No newline at end of file
diff --git a/communication/pom.xml b/communication/pom.xml
deleted file mode 100644
index a25ff27e..00000000
--- a/communication/pom.xml
+++ /dev/null
@@ -1,15 +0,0 @@
-
-
-
- iotkit-parent
- cc.iotkit
- 0.0.1-SNAPSHOT
-
- 4.0.0
-
- communication
-
-
-
\ No newline at end of file
diff --git a/log/error.2022-03-14.0.gz b/log/error.2022-03-14.0.gz
deleted file mode 100755
index 395381df..00000000
Binary files a/log/error.2022-03-14.0.gz and /dev/null differ
diff --git a/log/info.2022-03-14.0.gz b/log/info.2022-03-14.0.gz
deleted file mode 100755
index e1a670e8..00000000
Binary files a/log/info.2022-03-14.0.gz and /dev/null differ
diff --git a/manager/pom.xml b/manager/pom.xml
index 7373bc2b..8268b1f2 100755
--- a/manager/pom.xml
+++ b/manager/pom.xml
@@ -12,6 +12,7 @@
manager
+
org.springframework.boot
spring-boot-starter-data-mongodb
@@ -145,7 +146,7 @@
cc.iotkit
- gateway-server
+ protocol-server
diff --git a/manager/src/main/java/cc/iotkit/manager/Application.java b/manager/src/main/java/cc/iotkit/manager/Application.java
index 81e84f32..f296ebd0 100755
--- a/manager/src/main/java/cc/iotkit/manager/Application.java
+++ b/manager/src/main/java/cc/iotkit/manager/Application.java
@@ -1,9 +1,11 @@
package cc.iotkit.manager;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
+@Slf4j
@EnableFeignClients(basePackages = {"cc.iotkit.deviceapi"})
@SpringBootApplication(scanBasePackages = {"cc.iotkit"})
public class Application {
@@ -11,4 +13,5 @@ public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
+
}
diff --git a/pom.xml b/pom.xml
index 930bf49d..36a9cd2d 100755
--- a/pom.xml
+++ b/pom.xml
@@ -12,9 +12,8 @@
dao
tppa-server
protocol-gateway
- communication
- communication/mqtt-component
- communication/component
+ protocol-gateway/mqtt-component
+ protocol-gateway/component
org.springframework.boot
@@ -176,6 +175,24 @@
2.6.0
+
+ io.vertx
+ vertx-core
+ 4.2.6
+
+
+
+ io.vertx
+ vertx-web
+ 4.2.6
+
+
+
+ io.vertx
+ vertx-mqtt
+ 4.2.6
+
+
cc.iotkit
model
@@ -220,7 +237,13 @@
cc.iotkit
- gateway-server
+ protocol-server
+ ${project.version}
+
+
+
+ cc.iotkit
+ component
${project.version}
diff --git a/communication/mqtt-component/pom.xml b/protocol-gateway/component/pom.xml
similarity index 63%
rename from communication/mqtt-component/pom.xml
rename to protocol-gateway/component/pom.xml
index 565bb85a..03ab87dc 100644
--- a/communication/mqtt-component/pom.xml
+++ b/protocol-gateway/component/pom.xml
@@ -10,11 +10,19 @@
4.0.0
- mqtt-component
-
+ component
+
+ org.projectlombok
+ lombok
+
+
+
+ cc.iotkit
+ common
+
diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/AbstractComponent.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/AbstractComponent.java
new file mode 100644
index 00000000..56cc5d96
--- /dev/null
+++ b/protocol-gateway/component/src/main/java/cc/iotkit/comp/AbstractComponent.java
@@ -0,0 +1,10 @@
+package cc.iotkit.comp;
+
+import lombok.Data;
+
+@Data
+public class AbstractComponent {
+
+ protected MessageHandler messageHandler;
+
+}
diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/Component.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/Component.java
new file mode 100644
index 00000000..f5f45693
--- /dev/null
+++ b/protocol-gateway/component/src/main/java/cc/iotkit/comp/Component.java
@@ -0,0 +1,15 @@
+package cc.iotkit.comp;
+
+public interface Component {
+
+ void create(String config);
+
+ void start();
+
+ void stop();
+
+ void destroy();
+
+ void setHandler(MessageHandler handler);
+
+}
diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/ComponentManager.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/ComponentManager.java
new file mode 100644
index 00000000..fed3f3b5
--- /dev/null
+++ b/protocol-gateway/component/src/main/java/cc/iotkit/comp/ComponentManager.java
@@ -0,0 +1,37 @@
+package cc.iotkit.comp;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ComponentManager {
+
+ private final Map components = new HashMap<>();
+
+ public void register(String id, Component component) {
+ components.put(id, component);
+ }
+
+ public void deRegister(String id) {
+ Component component = components.remove(id);
+ component.destroy();
+ }
+
+ public void start(String id, String script) {
+ Component component = components.get(id);
+ if (component == null) {
+ return;
+ }
+ component.setHandler(new MessageHandler(script));
+ component.start();
+ }
+
+ public void stop(String id) {
+ Component component = components.get(id);
+ if (component == null) {
+ return;
+ }
+ component.stop();
+ }
+
+}
diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/Device.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/Device.java
new file mode 100644
index 00000000..8c214b45
--- /dev/null
+++ b/protocol-gateway/component/src/main/java/cc/iotkit/comp/Device.java
@@ -0,0 +1,7 @@
+package cc.iotkit.comp;
+
+import lombok.Data;
+
+@Data
+public class Device {
+}
diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/MessageHandler.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/MessageHandler.java
new file mode 100644
index 00000000..43de863a
--- /dev/null
+++ b/protocol-gateway/component/src/main/java/cc/iotkit/comp/MessageHandler.java
@@ -0,0 +1,61 @@
+package cc.iotkit.comp;
+
+import cc.iotkit.common.utils.JsonUtil;
+import cc.iotkit.comp.model.RegisterInfo;
+import jdk.nashorn.api.scripting.NashornScriptEngine;
+import jdk.nashorn.api.scripting.ScriptObjectMirror;
+import lombok.Data;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.script.ScriptEngineManager;
+import java.util.Map;
+
+@Slf4j
+@Data
+public class MessageHandler {
+ private final NashornScriptEngine engine = (NashornScriptEngine) (new ScriptEngineManager()).getEngineByName("nashorn");
+
+ private final String script;
+
+ @SneakyThrows
+ public MessageHandler(String script) {
+ this.script = script;
+ engine.eval(script);
+ }
+
+ public void register(Map head, String msg) {
+ }
+
+ public void auth(Map head, String msg) {
+ }
+
+ public void state(Map head, String msg) {
+ }
+
+ public void onReceive(Map head, String type, String msg) {
+ try {
+ ScriptObjectMirror obj = (ScriptObjectMirror) engine.invokeFunction("onReceive", head, type, msg);
+ Object rstType = obj.get("type");
+ if (rstType == null) {
+ return;
+ }
+ Object data = obj.get("data");
+
+ if ("register".equals(rstType)) {
+ RegisterInfo regInfo = getData(data, RegisterInfo.class);
+ } else if ("report".equals(rstType)) {
+
+ }
+
+ } catch (Throwable e) {
+ log.error("onReceive error", e);
+ }
+
+ }
+
+ private T getData(Object data, Class cls) {
+ return JsonUtil.parse(JsonUtil.toJsonString(data), cls);
+ }
+
+}
diff --git a/protocol-gateway/component/src/main/java/cc/iotkit/comp/model/RegisterInfo.java b/protocol-gateway/component/src/main/java/cc/iotkit/comp/model/RegisterInfo.java
new file mode 100755
index 00000000..84357c91
--- /dev/null
+++ b/protocol-gateway/component/src/main/java/cc/iotkit/comp/model/RegisterInfo.java
@@ -0,0 +1,49 @@
+package cc.iotkit.comp.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 注册信息
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class RegisterInfo {
+
+ private String productKey;
+
+ private String deviceName;
+
+ private String model;
+
+ private Map tag;
+
+ private List subDevices;
+
+ public RegisterInfo(String productKey, String deviceName, String model) {
+ this.productKey = productKey;
+ this.deviceName = deviceName;
+ this.model = model;
+ }
+
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class SubDevice {
+
+ private String productKey;
+
+ private String deviceName;
+
+ private String model;
+
+ private Map tag;
+ }
+}
diff --git a/protocol-gateway/protocol-function/.DS_Store b/protocol-gateway/decode-function/.DS_Store
similarity index 100%
rename from protocol-gateway/protocol-function/.DS_Store
rename to protocol-gateway/decode-function/.DS_Store
diff --git a/protocol-gateway/protocol-function/pom.xml b/protocol-gateway/decode-function/pom.xml
similarity index 100%
rename from protocol-gateway/protocol-function/pom.xml
rename to protocol-gateway/decode-function/pom.xml
diff --git a/protocol-gateway/protocol-function/src/main/java/cc/iotkit/protocol/function/UplinkTranslateFunction.java b/protocol-gateway/decode-function/src/main/java/cc/iotkit/protocol/function/DecodeFunction.java
similarity index 94%
rename from protocol-gateway/protocol-function/src/main/java/cc/iotkit/protocol/function/UplinkTranslateFunction.java
rename to protocol-gateway/decode-function/src/main/java/cc/iotkit/protocol/function/DecodeFunction.java
index bcc087db..dce944b0 100755
--- a/protocol-gateway/protocol-function/src/main/java/cc/iotkit/protocol/function/UplinkTranslateFunction.java
+++ b/protocol-gateway/decode-function/src/main/java/cc/iotkit/protocol/function/DecodeFunction.java
@@ -14,7 +14,7 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* 上行消息转换函数
*/
-public class UplinkTranslateFunction implements Function {
+public class DecodeFunction implements Function {
private static final NashornScriptEngine engine = (NashornScriptEngine) (new ScriptEngineManager()).getEngineByName("nashorn");
private static final Map compiledScripts = new ConcurrentHashMap<>();
diff --git a/protocol-gateway/protocol-function/src/main/java/cc/iotkit/protocol/function/DeviceMessage.java b/protocol-gateway/decode-function/src/main/java/cc/iotkit/protocol/function/DeviceMessage.java
similarity index 100%
rename from protocol-gateway/protocol-function/src/main/java/cc/iotkit/protocol/function/DeviceMessage.java
rename to protocol-gateway/decode-function/src/main/java/cc/iotkit/protocol/function/DeviceMessage.java
diff --git a/protocol-gateway/protocol-function/src/main/java/cc/iotkit/protocol/function/ThingModelMessage.java b/protocol-gateway/decode-function/src/main/java/cc/iotkit/protocol/function/ThingModelMessage.java
similarity index 100%
rename from protocol-gateway/protocol-function/src/main/java/cc/iotkit/protocol/function/ThingModelMessage.java
rename to protocol-gateway/decode-function/src/main/java/cc/iotkit/protocol/function/ThingModelMessage.java
diff --git a/protocol-gateway/mqtt-component/pom.xml b/protocol-gateway/mqtt-component/pom.xml
new file mode 100644
index 00000000..21e910b6
--- /dev/null
+++ b/protocol-gateway/mqtt-component/pom.xml
@@ -0,0 +1,49 @@
+
+
+
+ iotkit-parent
+ cc.iotkit
+ 0.0.1-SNAPSHOT
+ ../../pom.xml
+
+ 4.0.0
+
+ mqtt-component
+
+
+
+
+ io.vertx
+ vertx-core
+
+
+
+ io.vertx
+ vertx-mqtt
+
+
+
+ org.projectlombok
+ lombok
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+ cc.iotkit
+ common
+
+
+
+ cc.iotkit
+ component
+
+
+
+
+
\ No newline at end of file
diff --git a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttComponent.java b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttComponent.java
new file mode 100644
index 00000000..550194dd
--- /dev/null
+++ b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttComponent.java
@@ -0,0 +1,52 @@
+package cc.iotkit.comp.mqtt;
+
+import cc.iotkit.common.exception.BizException;
+import cc.iotkit.common.utils.JsonUtil;
+import cc.iotkit.comp.AbstractComponent;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.CountDownLatch;
+
+@Slf4j
+public class MqttComponent extends AbstractComponent {
+
+ private Vertx vertx;
+ private final CountDownLatch countDownLatch = new CountDownLatch(1);
+ private String deployedId;
+ private MqttConfig mqttConfig;
+
+ public void create(String config) {
+ vertx = Vertx.vertx();
+ mqttConfig = JsonUtil.parse(config, MqttConfig.class);
+ }
+
+ public void start() {
+ try {
+ Future future = vertx.deployVerticle(new MqttVerticle(mqttConfig, getMessageHandler()));
+ future.onSuccess((s -> {
+ deployedId = s;
+ countDownLatch.countDown();
+ }));
+ future.onFailure((e) -> {
+ countDownLatch.countDown();
+ log.error("start mqtt component failed", e);
+ });
+ countDownLatch.await();
+ future.succeeded();
+ } catch (Throwable e) {
+ throw new BizException("start mqtt component error", e);
+ }
+ }
+
+ public void stop() {
+ Future future = vertx.undeploy(deployedId);
+ future.onSuccess(unused -> log.info("stop mqtt component success"));
+ }
+
+ public void destroy() {
+ vertx.close();
+ }
+
+}
diff --git a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttConfig.java b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttConfig.java
new file mode 100644
index 00000000..84edb733
--- /dev/null
+++ b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttConfig.java
@@ -0,0 +1,16 @@
+package cc.iotkit.comp.mqtt;
+
+import lombok.Data;
+
+@Data
+public class MqttConfig {
+
+ private int port;
+
+ private String sslKey;
+
+ private String sslCert;
+
+ private boolean ssl;
+
+}
diff --git a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java
new file mode 100644
index 00000000..bada79d3
--- /dev/null
+++ b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java
@@ -0,0 +1,130 @@
+package cc.iotkit.comp.mqtt;
+
+import cc.iotkit.comp.MessageHandler;
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.netty.handler.codec.mqtt.MqttProperties;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.net.PemKeyCertOptions;
+import io.vertx.mqtt.MqttAuth;
+import io.vertx.mqtt.MqttServer;
+import io.vertx.mqtt.MqttServerOptions;
+import io.vertx.mqtt.MqttTopicSubscription;
+import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class MqttVerticle extends AbstractVerticle {
+
+ private MqttServer mqttServer;
+
+ private final MqttConfig config;
+
+ private final MessageHandler executor;
+
+ public MqttVerticle(MqttConfig config, MessageHandler executor) {
+ this.config = config;
+ this.executor = executor;
+ }
+
+ @Override
+ public void start() throws Exception {
+ MqttServerOptions options = new MqttServerOptions()
+ .setPort(config.getPort());
+ if (config.isSsl()) {
+ options = options.setSsl(true)
+ .setKeyCertOptions(new PemKeyCertOptions()
+ .setKeyPath(config.getSslKey())
+ .setCertPath(config.getSslCert()));
+ }
+
+ mqttServer = MqttServer.create(vertx, options);
+ mqttServer.endpointHandler(endpoint -> {
+ log.info("MQTT client:{} request to connect, clean session = {}", endpoint.clientIdentifier(), endpoint.isCleanSession());
+
+ MqttAuth auth = endpoint.auth();
+ if (auth == null) {
+ return;
+ }
+
+ String authJson = auth.toJson()
+ .put("clientid", endpoint.clientIdentifier()).toString();
+
+ log.info("MQTT client auth,username:{},password:{}", auth.getUsername(), auth.getPassword());
+ try {
+ executor.onReceive(new HashMap<>(), "auth", authJson);
+ } catch (Throwable e) {
+ log.error("auth failed", e);
+ endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
+ }
+
+ log.info("MQTT client keep alive timeout = {} ", endpoint.keepAliveTimeSeconds());
+
+ endpoint.accept(false);
+ endpoint.disconnectMessageHandler(disconnectMessage -> {
+ log.info("Received disconnect from client, reason code = {}", disconnectMessage.code());
+ executor.onReceive(new HashMap<>(), "disconnect", authJson);
+ }).subscribeHandler(subscribe -> {
+ List reasonCodes = new ArrayList<>();
+ for (MqttTopicSubscription s : subscribe.topicSubscriptions()) {
+ log.info("Subscription for {},with QoS {}", s.topicName(), s.qualityOfService());
+ try {
+ executor.onReceive(new HashMap<>(), "subscribe", s.topicName());
+ reasonCodes.add(MqttSubAckReasonCode.qosGranted(s.qualityOfService()));
+ } catch (Throwable e) {
+ log.error("subscribe failed,topic:" + s.topicName(), e);
+ reasonCodes.add(MqttSubAckReasonCode.NOT_AUTHORIZED);
+ }
+ }
+ // ack the subscriptions request
+ endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);
+
+ }).unsubscribeHandler(unsubscribe -> {
+ for (String t : unsubscribe.topics()) {
+ log.info("Unsubscription for {}", t);
+ try {
+ executor.onReceive(new HashMap<>(), "unsubscribe", t);
+ } catch (Throwable e) {
+ log.error("unsubscribe failed,topic:" + t, e);
+ }
+ }
+ // ack the subscriptions request
+ endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
+ }).publishHandler(message -> {
+ String payload = message.payload().toString(Charset.defaultCharset());
+ log.info("Received message:{}, with QoS {}", payload,
+ message.qosLevel());
+ try {
+ Map head = new HashMap<>();
+ head.put("topic", message.topicName());
+ executor.onReceive(head, "", payload);
+ } catch (Throwable e) {
+ log.error("handler message failed,topic:" + message.topicName(), e);
+ }
+
+ if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
+ endpoint.publishAcknowledge(message.messageId());
+ } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
+ endpoint.publishReceived(message.messageId());
+ }
+ }).publishReleaseHandler(endpoint::publishComplete);
+ }).listen(ar -> {
+ if (ar.succeeded()) {
+ log.info("MQTT server is listening on port " + ar.result().actualPort());
+ } else {
+ log.error("Error on starting the server", ar.cause());
+ }
+ });
+ }
+
+ @Override
+ public void stop() throws Exception {
+ mqttServer.close(voidAsyncResult -> log.info("close mqtt server..."));
+ }
+}
diff --git a/protocol-gateway/pom.xml b/protocol-gateway/pom.xml
index 23277303..3e9bf227 100755
--- a/protocol-gateway/pom.xml
+++ b/protocol-gateway/pom.xml
@@ -13,9 +13,8 @@
pom
gateway-client
- gateway-server
- gateway-server/fun-test
- protocol-function
+ protocol-server
+ decode-function
diff --git a/protocol-gateway/protocol-function/src/main/java/cc/iotkit/protocol/function/MessageDistributionFunction.java b/protocol-gateway/protocol-function/src/main/java/cc/iotkit/protocol/function/MessageDistributionFunction.java
deleted file mode 100755
index eb6cd986..00000000
--- a/protocol-gateway/protocol-function/src/main/java/cc/iotkit/protocol/function/MessageDistributionFunction.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package cc.iotkit.protocol.function;
-
-import cc.iotkit.common.utils.JsonUtil;
-import jdk.nashorn.api.scripting.NashornScriptEngine;
-import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.Function;
-
-import javax.script.Bindings;
-import javax.script.CompiledScript;
-import javax.script.ScriptEngineManager;
-import javax.script.SimpleBindings;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * 消息分发函数
- */
-public class MessageDistributionFunction implements Function {
-
- private static final NashornScriptEngine engine = (NashornScriptEngine) (new ScriptEngineManager()).getEngineByName("nashorn");
- private static final Map compiledScripts = new ConcurrentHashMap<>();
-
- @Override
- public ThingModelMessage process(ThingModelMessage msg, Context context) throws Exception {
- Optional