diff --git a/DLT645-plugin/src/main/java/cc/iotkit/plugins/dlt645/Application.java b/DLT645-plugin/src/main/java/cc/iotkit/plugins/dlt645/Application.java
index aa37710..1a07d65 100755
--- a/DLT645-plugin/src/main/java/cc/iotkit/plugins/dlt645/Application.java
+++ b/DLT645-plugin/src/main/java/cc/iotkit/plugins/dlt645/Application.java
@@ -10,7 +10,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
* @Author:tfd
* @Date:2023/12/14 16:25
*/
-@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.dlt645"})
+@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.dlt645")
@OneselfConfig(mainConfigFileName = {"application.yml"})
@EnableConfigurationProperties
@EnableScheduling
diff --git a/emqx-plugin/pom.xml b/emqx-plugin/pom.xml
new file mode 100644
index 0000000..8c64be0
--- /dev/null
+++ b/emqx-plugin/pom.xml
@@ -0,0 +1,86 @@
+
+
+
+ iot-iita-plugins
+ cc.iotkit.plugins
+ 1.0.1
+
+ 4.0.0
+
+ emqx-plugin
+
+
+
+
+ io.vertx
+ vertx-core
+ ${vertx.version}
+
+
+
+ io.vertx
+ vertx-mqtt
+ ${vertx.version}
+
+
+
+ io.vertx
+ vertx-web-proxy
+ ${vertx.version}
+
+
+
+
+
+
+ dev
+
+ true
+
+
+ dev
+
+
+
+
+ prod
+
+ prod
+
+
+
+
+
+
+
+ com.gitee.starblues
+ spring-brick-maven-packager
+ ${spring-brick.version}
+
+ ${plugin.build.mode}
+
+ emqx-plugin
+ cc.iotkit.plugins.emqx.Application
+ ${project.version}
+ iita
+ emqx示例插件
+ application.yml
+
+
+ jar
+
+
+
+
+
+ repackage
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/Application.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/Application.java
new file mode 100644
index 0000000..19b1031
--- /dev/null
+++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/Application.java
@@ -0,0 +1,19 @@
+package cc.iotkit.plugins.emqx;
+
+import com.gitee.starblues.bootstrap.SpringPluginBootstrap;
+import com.gitee.starblues.bootstrap.annotation.OneselfConfig;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+
+/**
+ * @author sjg
+ */
+@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.emqx")
+@OneselfConfig(mainConfigFileName = {"application.yml"})
+@EnableConfigurationProperties
+public class Application extends SpringPluginBootstrap {
+
+ public static void main(String[] args) {
+ new Application().run(Application.class, args);
+ }
+}
diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/BeanConfig.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/BeanConfig.java
new file mode 100644
index 0000000..2b8d0a4
--- /dev/null
+++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/BeanConfig.java
@@ -0,0 +1,28 @@
+package cc.iotkit.plugins.emqx.conf;
+
+import cc.iotkit.plugin.core.IPluginConfig;
+import cc.iotkit.plugin.core.LocalPluginConfig;
+import cc.iotkit.plugin.core.thing.IThingService;
+import cc.iotkit.plugins.emqx.service.FakeThingService;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author sjg
+ */
+@Component
+public class BeanConfig {
+
+ @Bean
+ @ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev")
+ IThingService getThingService() {
+ return new FakeThingService();
+ }
+
+ @Bean
+ @ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev")
+ IPluginConfig getPluginConfig(){
+ return new LocalPluginConfig();
+ }
+}
diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/MqttConfig.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/MqttConfig.java
new file mode 100644
index 0000000..45439e0
--- /dev/null
+++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/MqttConfig.java
@@ -0,0 +1,30 @@
+/*
+ * +----------------------------------------------------------------------
+ * | Copyright (c) 奇特物联 2021-2022 All rights reserved.
+ * +----------------------------------------------------------------------
+ * | Licensed 未经许可不能去掉「奇特物联」相关版权
+ * +----------------------------------------------------------------------
+ * | Author: xw2sy@163.com
+ * +----------------------------------------------------------------------
+ */
+package cc.iotkit.plugins.emqx.conf;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+@Data
+@Component
+@ConfigurationProperties(prefix = "emqx")
+public class MqttConfig {
+
+ private String host;
+
+ private int port;
+
+ private boolean ssl;
+
+ private String topics;
+
+ private int authPort;
+}
diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/handler/IMsgHandler.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/handler/IMsgHandler.java
new file mode 100644
index 0000000..8b536e2
--- /dev/null
+++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/handler/IMsgHandler.java
@@ -0,0 +1,12 @@
+package cc.iotkit.plugins.emqx.handler;
+
+import io.vertx.core.json.JsonObject;
+
+/**
+ * @author sjg
+ */
+public interface IMsgHandler {
+
+ void handle(String topic, JsonObject payload);
+
+}
diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/AuthVerticle.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/AuthVerticle.java
new file mode 100644
index 0000000..d24ce80
--- /dev/null
+++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/AuthVerticle.java
@@ -0,0 +1,146 @@
+package cc.iotkit.plugins.emqx.service;
+/*
+ * +----------------------------------------------------------------------
+ * | Copyright (c) 奇特物联 2021-2022 All rights reserved.
+ * +----------------------------------------------------------------------
+ * | Licensed 未经许可不能去掉「奇特物联」相关版权
+ * +----------------------------------------------------------------------
+ * | Author: xw2sy@163.com
+ * +----------------------------------------------------------------------
+ */
+
+import cc.iotkit.common.utils.CodecUtil;
+import cc.iotkit.plugin.core.thing.IThingService;
+import cc.iotkit.plugin.core.thing.model.ThingProduct;
+import com.gitee.starblues.bootstrap.annotation.AutowiredType;
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerResponse;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.handler.BodyHandler;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+
+@Slf4j
+@Service
+public class AuthVerticle extends AbstractVerticle {
+
+ private HttpServer backendServer;
+
+ @Setter
+ private int port;
+
+ @Setter
+ private String serverPassword;
+
+ @Autowired
+ @AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
+ private IThingService thingService;
+
+ @Override
+ public void start() {
+ backendServer = vertx.createHttpServer();
+
+ //第一步 声明Router&初始化Router
+ Router backendRouter = Router.router(vertx);
+ //获取body参数,得先添加这句
+ backendRouter.route().handler(BodyHandler.create());
+
+ //第二步 配置Router解析url
+ backendRouter.route(HttpMethod.POST, "/mqtt/auth").handler(rc -> {
+ JsonObject json = rc.getBodyAsJson();
+ log.info("mqtt auth:{}", json);
+ try {
+ String clientId = json.getString("clientid");
+ String username = json.getString("username");
+ String password = json.getString("password");
+
+ //服务端插件连接
+ if (clientId.equals("server") && serverPassword.equals(password)) {
+ httpResult(rc.response(), 200);
+ return;
+ }
+
+ //其它客户端连接
+ String[] parts = clientId.split("_");
+ if (parts.length < 3) {
+ log.error("clientid:{}不正确", clientId);
+ httpResult(rc.response(), 400);
+ return;
+ }
+
+ log.info("MQTT client auth,clientId:{},username:{},password:{}",
+ clientId, username, password);
+
+ String productKey = parts[0];
+ String deviceName = parts[1];
+ if (!username.equals(deviceName)) {
+ log.error("username:{}不正确", deviceName);
+ httpResult(rc.response(), 403);
+ return;
+ }
+
+ ThingProduct product = thingService.getProduct(productKey);
+ if (product == null) {
+ log.error("获取产品信息失败,productKey:{}", productKey);
+ httpResult(rc.response(), 403);
+ return;
+ }
+
+ String validPasswd = CodecUtil.md5Str(product.getProductSecret() + clientId);
+ if (!validPasswd.equalsIgnoreCase(password)) {
+ log.error("密码验证失败,期望值:{}", validPasswd);
+ httpResult(rc.response(), 403);
+ return;
+ }
+
+ Set devices = new HashSet<>();
+ devices.add(parts[0] + "," + parts[1]);
+ EmqxPlugin.CLIENT_DEVICE_MAP.putIfAbsent(parts[0] + parts[1], devices);
+
+ httpResult(rc.response(), 200);
+ } catch (Throwable e) {
+ httpResult(rc.response(), 500);
+ log.error("mqtt auth failed", e);
+ }
+ });
+ backendRouter.route(HttpMethod.POST, "/mqtt/acl").handler(rc -> {
+ String json = rc.getBodyAsString();
+ log.info("mqtt acl:{}", json);
+ try {
+ httpResult(rc.response(), 200);
+ } catch (Throwable e) {
+ httpResult(rc.response(), 500);
+ log.error("mqtt acl failed", e);
+ }
+ });
+
+ backendServer.requestHandler(backendRouter)
+ .listen(port, "0.0.0.0")
+ .onSuccess(s -> {
+ log.info("auth server start success,port:{}", s.actualPort());
+ }).onFailure(e -> {
+ e.printStackTrace();
+ })
+ ;
+ }
+
+ private void httpResult(HttpServerResponse response, int code) {
+ response.putHeader("Content-Type", "application/json");
+ response
+ .setStatusCode(code);
+ response
+ .end("{\"result\": \"" + (code == 200 ? "allow" : "deny") + "\"}");
+ }
+
+ @Override
+ public void stop() throws Exception {
+ backendServer.close(voidAsyncResult -> log.info("close emqx auth server..."));
+ }
+}
diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/EmqxPlugin.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/EmqxPlugin.java
new file mode 100644
index 0000000..5270722
--- /dev/null
+++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/EmqxPlugin.java
@@ -0,0 +1,380 @@
+package cc.iotkit.plugins.emqx.service;
+
+import cc.iotkit.common.utils.StringUtils;
+import cc.iotkit.common.utils.ThreadUtil;
+import cc.iotkit.common.utils.UniqueIdUtil;
+import cc.iotkit.plugin.core.IPlugin;
+import cc.iotkit.plugin.core.IPluginConfig;
+import cc.iotkit.plugin.core.thing.IThingService;
+import cc.iotkit.plugin.core.thing.actions.ActionResult;
+import cc.iotkit.plugin.core.thing.actions.DeviceState;
+import cc.iotkit.plugin.core.thing.actions.EventLevel;
+import cc.iotkit.plugin.core.thing.actions.IDeviceAction;
+import cc.iotkit.plugin.core.thing.actions.up.*;
+import cc.iotkit.plugin.core.thing.model.ThingDevice;
+import cc.iotkit.plugins.emqx.conf.MqttConfig;
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.bean.copier.CopyOptions;
+import cn.hutool.core.util.IdUtil;
+import com.gitee.starblues.bootstrap.annotation.AutowiredType;
+import com.gitee.starblues.bootstrap.realize.PluginCloseListener;
+import com.gitee.starblues.core.PluginCloseType;
+import com.gitee.starblues.core.PluginInfo;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.json.JsonObject;
+import io.vertx.mqtt.MqttClient;
+import io.vertx.mqtt.MqttClientOptions;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author sjg
+ */
+@Slf4j
+@Service
+public class EmqxPlugin implements PluginCloseListener, IPlugin, Runnable {
+
+ @Autowired
+ private PluginInfo pluginInfo;
+ @Autowired
+ private MqttConfig mqttConfig;
+
+ @Autowired
+ @AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
+ private IPluginConfig pluginConfig;
+
+ @Autowired
+ @AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
+ private IThingService thingService;
+
+ @Autowired
+ private AuthVerticle authVerticle;
+
+ @Autowired
+ private MqttDevice mqttDevice;
+
+ private final ScheduledThreadPoolExecutor emqxConnectTask = ThreadUtil.newScheduled(1, "emqx_connect");
+
+ private Vertx vertx;
+ private String deployedId;
+
+ private MqttClient client;
+
+ private boolean mqttConnected = false;
+
+ private boolean authServerStarted = false;
+
+ private static final Map DEVICE_ONLINE = new ConcurrentHashMap<>();
+
+ public static final Map> CLIENT_DEVICE_MAP = new HashMap<>();
+
+ @PostConstruct
+ public void init() {
+ vertx = Vertx.vertx();
+ try {
+ //获取插件最新配置替换当前配置
+ Map config = pluginConfig.getConfig(pluginInfo.getPluginId());
+ BeanUtil.copyProperties(config, mqttConfig, CopyOptions.create().ignoreNullValue());
+
+ String serverPassword = IdUtil.fastSimpleUUID();
+ MqttClientOptions options = new MqttClientOptions()
+ .setClientId("server")
+ .setUsername("server")
+ .setPassword(serverPassword)
+ .setCleanSession(true)
+ .setMaxInflightQueue(100)
+ .setKeepAliveInterval(60);
+
+ if (mqttConfig.isSsl()) {
+ options.setSsl(true)
+ .setTrustAll(true);
+ }
+ client = MqttClient.create(vertx, options);
+ mqttDevice.setClient(client);
+
+ authVerticle.setPort(mqttConfig.getAuthPort());
+ authVerticle.setServerPassword(serverPassword);
+
+ emqxConnectTask.scheduleWithFixedDelay(this, 3, 3, TimeUnit.SECONDS);
+ } catch (Throwable e) {
+ log.error("mqtt plugin startup error", e);
+ }
+ }
+
+ @Override
+ public void run() {
+ if (!authServerStarted) {
+ try {
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ Future future = vertx.deployVerticle(authVerticle);
+ future.onSuccess((s -> {
+ deployedId = s;
+ countDownLatch.countDown();
+ authServerStarted = true;
+ log.info("start emqx auth plugin success");
+ }));
+ future.onFailure(e -> {
+ countDownLatch.countDown();
+ authServerStarted = false;
+ log.error("start emqx auth plugin failed", e);
+ });
+ countDownLatch.await();
+ } catch (Exception e) {
+ authServerStarted = false;
+ log.error("start emqx auth server failed", e);
+ }
+ }
+
+ if (mqttConnected) {
+ return;
+ }
+
+ try {
+ String[] topics = mqttConfig.getTopics().split(",");
+ Map subscribes = new HashMap<>(topics.length);
+ for (String topic : topics) {
+ subscribes.put(topic, 1);
+ }
+
+ client.connect(mqttConfig.getPort(), mqttConfig.getHost(), s -> {
+ if (s.succeeded()) {
+ log.info("client connect success.");
+ mqttConnected = true;
+ client.subscribe(subscribes, e -> {
+ if (e.succeeded()) {
+ log.info("===>subscribe success: {}", e.result());
+ } else {
+ log.error("===>subscribe fail: ", e.cause());
+ }
+ });
+
+ } else {
+ mqttConnected = false;
+ log.error("client connect fail: ", s.cause());
+ }
+ }).publishHandler(msg -> {
+ String topic = msg.topicName();
+ if (topic.contains("/c/")) {
+ return;
+ }
+
+ JsonObject payload = msg.payload().toJsonObject();
+ log.info("Client received message on [{}] payload [{}] with QoS [{}]", topic, payload, msg.qosLevel());
+
+ try {
+ //客户端连接断开
+ if (topic.equals("/sys/client/disconnected")) {
+ offline(payload.getString("clientid"));
+ return;
+ }
+
+ ThingDevice device = getDevice(topic);
+ if (device == null) {
+ return;
+ }
+
+ //有消息上报-设备上线
+ online(device.getProductKey(), device.getDeviceName());
+
+ JsonObject defParams = JsonObject.mapFrom(new HashMap<>(0));
+ IDeviceAction action = null;
+
+ String method = payload.getString("method", "");
+ if (StringUtils.isBlank(method)) {
+ return;
+ }
+ JsonObject params = payload.getJsonObject("params", defParams);
+
+ if ("thing.lifetime.register".equalsIgnoreCase(method)) {
+ //子设备注册
+ String subPk = params.getString("productKey");
+ String subDn = params.getString("deviceName");
+ ActionResult regResult = thingService.post(
+ pluginInfo.getPluginId(),
+ fillAction(
+ DeviceRegister.builder()
+ .productKey(subPk)
+ .deviceName(subDn)
+ .model(params.getString("model"))
+ .version("1.0")
+ .build()
+ , subPk, subDn
+ )
+ );
+ if (regResult.getCode() == 0) {
+ //注册成功
+ reply(topic, payload, 0);
+ Set devices = CLIENT_DEVICE_MAP.get(device.getProductKey() + device.getDeviceName());
+ devices.add(subPk + "," + subDn);
+ } else {
+ //注册失败
+ reply(topic, new JsonObject(), regResult.getCode());
+ }
+ return;
+ }
+
+ if ("thing.event.property.post".equalsIgnoreCase(method)) {
+ //属性上报
+ action = PropertyReport.builder()
+ .params(params.getMap())
+ .build();
+ reply(topic, payload, 0);
+ } else if (method.startsWith("thing.event.")) {
+ //事件上报
+ action = EventReport.builder()
+ .name(method.replace("thing.event.", ""))
+ .level(EventLevel.INFO)
+ .params(params.getMap())
+ .build();
+ reply(topic, payload, 0);
+ } else if (method.startsWith("thing.service.") && method.endsWith("_reply")) {
+ //服务回复
+ action = ServiceReply.builder()
+ .name(method.replaceAll("thing\\.service\\.(.*)_reply", "$1"))
+ .code(payload.getInteger("code", 0))
+ .params(params.getMap())
+ .build();
+ }
+
+ if (action == null) {
+ return;
+ }
+ action.setId(payload.getString("id"));
+ action.setProductKey(device.getProductKey());
+ action.setDeviceName(device.getDeviceName());
+ action.setTime(System.currentTimeMillis());
+ thingService.post(pluginInfo.getPluginId(), action);
+
+ } catch (Exception e) {
+ log.error("message is illegal.", e);
+ }
+ }).closeHandler(e -> {
+ mqttConnected = false;
+ log.info("client closed");
+ }).exceptionHandler(event -> log.error("client fail", event));
+ } catch (Exception e) {
+ log.error("start emqx client failed", e);
+ }
+
+ }
+
+ public ThingDevice getDevice(String topic) {
+ String[] topicParts = topic.split("/");
+ if (topicParts.length < 5) {
+ return null;
+ }
+ return ThingDevice.builder()
+ .productKey(topicParts[2])
+ .deviceName(topicParts[3])
+ .build();
+ }
+
+ public void online(String pk, String dn) {
+ if (Boolean.TRUE.equals(DEVICE_ONLINE.get(dn))) {
+ return;
+ }
+
+ //上线
+ thingService.post(
+ pluginInfo.getPluginId(),
+ fillAction(DeviceStateChange.builder()
+ .state(DeviceState.ONLINE)
+ .build()
+ , pk, dn
+ )
+ );
+ DEVICE_ONLINE.put(dn, true);
+ }
+
+ public void offline(String clientId) {
+ String[] parts = clientId.split("_");
+ Set devices = CLIENT_DEVICE_MAP.get(parts[0] + parts[1]);
+ for (String device : devices) {
+ String[] pkDn = device.split(",");
+ //下线
+ thingService.post(
+ pluginInfo.getPluginId(),
+ fillAction(DeviceStateChange.builder()
+ .state(DeviceState.OFFLINE)
+ .build()
+ , pkDn[0], pkDn[1]
+ )
+ );
+ DEVICE_ONLINE.remove(pkDn[1]);
+ }
+ }
+
+ private IDeviceAction fillAction(IDeviceAction action, String productKey, String deviceName) {
+ action.setId(UniqueIdUtil.newRequestId());
+ action.setProductKey(productKey);
+ action.setDeviceName(deviceName);
+ action.setTime(System.currentTimeMillis());
+ return action;
+ }
+
+ /**
+ * 回复设备
+ */
+ private void reply(String topic, JsonObject payload, int code) {
+ Map payloadReply = new HashMap<>();
+ payloadReply.put("id", payload.getString("id"));
+ payloadReply.put("method", payload.getString("method") + "_reply");
+ payloadReply.put("code", code);
+ payloadReply.put("data", payload.getJsonObject("params"));
+ topic = topic.replace("/s/", "/c/") + "_reply";
+
+ String finalTopic = topic;
+ client.publish(topic, JsonObject.mapFrom(payloadReply).toBuffer(), MqttQoS.AT_LEAST_ONCE, false, false)
+ .onSuccess(h -> {
+ log.info("publish {} success", finalTopic);
+ });
+ }
+
+ @Override
+ public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) {
+ try {
+ log.info("plugin close,type:{},pluginId:{}", closeType, pluginInfo.getPluginId());
+ if (deployedId != null) {
+ CountDownLatch wait = new CountDownLatch(1);
+ Future future = vertx.undeploy(deployedId);
+ future.onSuccess(unused -> {
+ log.info("emqx plugin stopped success");
+ wait.countDown();
+ });
+ future.onFailure(h -> {
+ log.error("emqx plugin stopped failed", h);
+ wait.countDown();
+ });
+ wait.await(5, TimeUnit.SECONDS);
+ }
+
+ client.disconnect()
+ .onSuccess(unused -> {
+ mqttConnected = false;
+ log.info("stop emqx connect success");
+ })
+ .onFailure(unused -> log.error("stop emqx connect failure"));
+
+ emqxConnectTask.shutdown();
+
+ } catch (Throwable e) {
+ log.error("emqx plugin stop error", e);
+ }
+ }
+
+ @Override
+ public Map getLinkInfo(String pk, String dn) {
+ return null;
+ }
+}
diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/FakeThingService.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/FakeThingService.java
new file mode 100644
index 0000000..6fc1e60
--- /dev/null
+++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/FakeThingService.java
@@ -0,0 +1,70 @@
+package cc.iotkit.plugins.emqx.service;
+
+import cc.iotkit.plugin.core.thing.IThingService;
+import cc.iotkit.plugin.core.thing.actions.ActionResult;
+import cc.iotkit.plugin.core.thing.actions.IDeviceAction;
+import cc.iotkit.plugin.core.thing.model.ThingDevice;
+import cc.iotkit.plugin.core.thing.model.ThingProduct;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 测试服务
+ *
+ * @author sjg
+ */
+@Slf4j
+public class FakeThingService implements IThingService {
+
+ /**
+ * 添加测试产品
+ */
+ private static final Map PRODUCTS = Map.of(
+ "hbtgIA0SuVw9lxjB", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU",
+ "Rf4QSjbm65X45753", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU",
+ "cGCrkK7Ex4FESAwe", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU"
+ );
+
+ /**
+ * 添加测试设备
+ */
+ private static final Map DEVICES = new HashMap<>();
+
+ static {
+ for (int i = 0; i < 10; i++) {
+ DEVICES.put("TEST:GW:" + StringUtils.leftPad(i + "", 6, "0"), "hbtgIA0SuVw9lxjB");
+ DEVICES.put("TEST_SW_" + StringUtils.leftPad(i + "", 6, "0"), "Rf4QSjbm65X45753");
+ DEVICES.put("TEST_SC_" + StringUtils.leftPad(i + "", 6, "0"), "cGCrkK7Ex4FESAwe");
+ }
+ }
+
+ @Override
+ public ActionResult post(String pluginId, IDeviceAction action) {
+ log.info("post action:{}", action);
+ return ActionResult.builder().code(0).build();
+ }
+
+ @Override
+ public ThingProduct getProduct(String pk) {
+ return ThingProduct.builder()
+ .productKey(pk)
+ .productSecret(PRODUCTS.get(pk))
+ .build();
+ }
+
+ @Override
+ public ThingDevice getDevice(String dn) {
+ return ThingDevice.builder()
+ .productKey(DEVICES.get(dn))
+ .deviceName(dn)
+ .build();
+ }
+
+ @Override
+ public Map getProperty(String dn) {
+ return new HashMap<>(0);
+ }
+}
diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/MqttDevice.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/MqttDevice.java
new file mode 100644
index 0000000..1501c8a
--- /dev/null
+++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/MqttDevice.java
@@ -0,0 +1,80 @@
+package cc.iotkit.plugins.emqx.service;
+
+import cc.iotkit.common.enums.ErrCode;
+import cc.iotkit.common.exception.BizException;
+import cc.iotkit.plugin.core.thing.IDevice;
+import cc.iotkit.plugin.core.thing.actions.ActionResult;
+import cc.iotkit.plugin.core.thing.actions.down.DeviceConfig;
+import cc.iotkit.plugin.core.thing.actions.down.PropertyGet;
+import cc.iotkit.plugin.core.thing.actions.down.PropertySet;
+import cc.iotkit.plugin.core.thing.actions.down.ServiceInvoke;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.vertx.core.json.JsonObject;
+import io.vertx.mqtt.MqttClient;
+import lombok.Setter;
+import org.springframework.stereotype.Service;
+
+/**
+ * mqtt设备下行接口
+ *
+ * @author sjg
+ */
+@Service
+public class MqttDevice implements IDevice {
+
+ @Setter
+ private MqttClient client;
+
+ @Override
+ public ActionResult config(DeviceConfig action) {
+ return ActionResult.builder().code(0).reason("").build();
+ }
+
+ @Override
+ public ActionResult propertyGet(PropertyGet action) {
+ String topic = String.format("/sys/%s/%s/c/service/property/get", action.getProductKey(), action.getDeviceName());
+ return send(
+ topic,
+ new JsonObject()
+ .put("id", action.getId())
+ .put("method", "thing.service.property.get")
+ .put("params", action.getKeys())
+ );
+ }
+
+ @Override
+ public ActionResult propertySet(PropertySet action) {
+ String topic = String.format("/sys/%s/%s/c/service/property/set", action.getProductKey(), action.getDeviceName());
+ return send(
+ topic,
+ new JsonObject()
+ .put("id", action.getId())
+ .put("method", "thing.service.property.set")
+ .put("params", action.getParams())
+ );
+ }
+
+ @Override
+ public ActionResult serviceInvoke(ServiceInvoke action) {
+ String topic = String.format("/sys/%s/%s/c/service/%s", action.getProductKey(), action.getDeviceName(), action.getName());
+ return send(
+ topic,
+ new JsonObject()
+ .put("id", action.getId())
+ .put("method", "thing.service." + action.getName())
+ .put("params", action.getParams())
+ );
+ }
+
+ private ActionResult send(String topic, JsonObject payload) {
+ try {
+ client.publish(topic, payload.toBuffer(), MqttQoS.AT_LEAST_ONCE, false, false);
+ return ActionResult.builder().code(0).reason("").build();
+ } catch (BizException e) {
+ return ActionResult.builder().code(e.getCode()).reason(e.getMessage()).build();
+ } catch (Exception e) {
+ return ActionResult.builder().code(ErrCode.UNKNOWN_EXCEPTION.getKey()).reason(e.getMessage()).build();
+ }
+ }
+
+}
diff --git a/emqx-plugin/src/main/resources/application.yml b/emqx-plugin/src/main/resources/application.yml
new file mode 100644
index 0000000..4a05488
--- /dev/null
+++ b/emqx-plugin/src/main/resources/application.yml
@@ -0,0 +1,9 @@
+plugin:
+ runMode: prod
+ mainPackage: cc.iotkit.plugin
+
+emqx:
+ host: 127.0.0.1
+ port: 1883
+ topics: /sys/#
+ authPort: 8104
diff --git a/emqx-plugin/src/main/resources/config.json b/emqx-plugin/src/main/resources/config.json
new file mode 100644
index 0000000..564a4b6
--- /dev/null
+++ b/emqx-plugin/src/main/resources/config.json
@@ -0,0 +1,30 @@
+[
+ {
+ "id": "host",
+ "name": "emqx ip",
+ "type": "text",
+ "value": "127.0.0.1",
+ "desc": "emqx ip,默认为127.0.0.1"
+ },
+ {
+ "id": "port",
+ "name": "emqx端口",
+ "type": "number",
+ "value": 1883,
+ "desc": "emqx端口,默认为1883"
+ },
+ {
+ "id": "auth_port",
+ "name": "认证端口",
+ "type": "number",
+ "value": 8104,
+ "desc": "emqx http认证端口,默认为8104"
+ },
+ {
+ "id": "topics",
+ "name": "订阅主题",
+ "type": "text",
+ "value": "/sys/#",
+ "desc": "订阅主题多个用,隔开"
+ }
+]
\ No newline at end of file
diff --git a/http-plugin/src/main/java/cc/iotkit/plugins/http/Application.java b/http-plugin/src/main/java/cc/iotkit/plugins/http/Application.java
index 913fce2..2537404 100755
--- a/http-plugin/src/main/java/cc/iotkit/plugins/http/Application.java
+++ b/http-plugin/src/main/java/cc/iotkit/plugins/http/Application.java
@@ -8,7 +8,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
/**
* @author sjg
*/
-@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.http"})
+@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.http")
@OneselfConfig(mainConfigFileName = {"application.yml"})
@EnableConfigurationProperties
public class Application extends SpringPluginBootstrap {
diff --git a/hydrovalve-plugin/src/main/java/cc/iotkit/plugins/hydrovalve/Application.java b/hydrovalve-plugin/src/main/java/cc/iotkit/plugins/hydrovalve/Application.java
index a9ccbd5..ed94e72 100644
--- a/hydrovalve-plugin/src/main/java/cc/iotkit/plugins/hydrovalve/Application.java
+++ b/hydrovalve-plugin/src/main/java/cc/iotkit/plugins/hydrovalve/Application.java
@@ -10,7 +10,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
* @Author:tfd
* @Date:2024/1/8 14:57
*/
-@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.hydrovalve"})
+@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.hydrovalve")
@OneselfConfig(mainConfigFileName = {"application.yml"})
@EnableConfigurationProperties
@EnableScheduling
diff --git a/modbus-plugin/src/main/java/cc/iotkit/plugins/modbus/Application.java b/modbus-plugin/src/main/java/cc/iotkit/plugins/modbus/Application.java
index 34e50a0..c6c0676 100755
--- a/modbus-plugin/src/main/java/cc/iotkit/plugins/modbus/Application.java
+++ b/modbus-plugin/src/main/java/cc/iotkit/plugins/modbus/Application.java
@@ -9,7 +9,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
/**
* @author sjg
*/
-@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.modbus"})
+@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.modbus")
@OneselfConfig(mainConfigFileName = {"application.yml"})
@EnableConfigurationProperties
@EnableScheduling
diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/Application.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/Application.java
index b7f2c18..1b5cf53 100755
--- a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/Application.java
+++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/Application.java
@@ -8,7 +8,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
/**
* @author sjg
*/
-@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.mqtt"})
+@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.mqtt")
@OneselfConfig(mainConfigFileName = {"application.yml"})
@EnableConfigurationProperties
public class Application extends SpringPluginBootstrap {
diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java
index bda111a..74899e2 100755
--- a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java
+++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java
@@ -20,6 +20,7 @@ import cc.iotkit.plugin.core.thing.actions.DeviceState;
import cc.iotkit.plugin.core.thing.actions.EventLevel;
import cc.iotkit.plugin.core.thing.actions.IDeviceAction;
import cc.iotkit.plugin.core.thing.actions.up.*;
+import cc.iotkit.plugin.core.thing.model.ThingDevice;
import cc.iotkit.plugin.core.thing.model.ThingProduct;
import cc.iotkit.plugins.mqtt.conf.MqttConfig;
import com.gitee.starblues.bootstrap.annotation.AutowiredType;
@@ -35,9 +36,7 @@ import io.vertx.core.json.JsonObject;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.mqtt.*;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
-import lombok.AllArgsConstructor;
import lombok.Data;
-import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -83,7 +82,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler {
for (String topic : unsubscribe.topics()) {
- Device device = getDevice(topic);
+ ThingDevice device = getDevice(topic);
//删除设备对应连接
endpointMap.remove(device.getDeviceName());
//下线
@@ -259,7 +258,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler log.info("publish success,topic:{},payload:{}", topic, msg));
}
- public Device getDevice(String topic) {
+ public ThingDevice getDevice(String topic) {
String[] topicParts = topic.split("/");
if (topicParts.length < 5) {
return null;
}
- return new Device(topicParts[2], topicParts[3]);
+ return ThingDevice.builder()
+ .productKey(topicParts[2])
+ .deviceName(topicParts[3])
+ .build();
}
-
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- public static class Device {
-
- private String productKey;
-
- private String deviceName;
- }
}
diff --git a/pom.xml b/pom.xml
index b1ef3b9..8c106b9 100755
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
tcp-plugin
DLT645-plugin
hydrovalve-plugin
-
+ emqx-plugin
diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/Application.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/Application.java
index 5f88b4d..d28d3d2 100755
--- a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/Application.java
+++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/Application.java
@@ -9,7 +9,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
/**
* @author sjg
*/
-@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.tcp"})
+@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.tcp")
@OneselfConfig(mainConfigFileName = {"application.yml"})
@EnableConfigurationProperties
@EnableScheduling