From 9cfe8ddb09e857c463e5ece328897fd289410341 Mon Sep 17 00:00:00 2001 From: xiwa Date: Wed, 14 Feb 2024 15:20:08 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E5=A2=9E=E5=8A=A0emqx=E6=8F=92=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cc/iotkit/plugins/dlt645/Application.java | 2 +- emqx-plugin/pom.xml | 86 ++++ .../cc/iotkit/plugins/emqx/Application.java | 19 + .../iotkit/plugins/emqx/conf/BeanConfig.java | 28 ++ .../iotkit/plugins/emqx/conf/MqttConfig.java | 30 ++ .../plugins/emqx/handler/IMsgHandler.java | 12 + .../plugins/emqx/service/AuthVerticle.java | 146 +++++++ .../plugins/emqx/service/EmqxPlugin.java | 380 ++++++++++++++++++ .../emqx/service/FakeThingService.java | 70 ++++ .../plugins/emqx/service/MqttDevice.java | 80 ++++ .../src/main/resources/application.yml | 9 + emqx-plugin/src/main/resources/config.json | 30 ++ .../cc/iotkit/plugins/http/Application.java | 2 +- .../plugins/hydrovalve/Application.java | 2 +- .../cc/iotkit/plugins/modbus/Application.java | 2 +- .../cc/iotkit/plugins/mqtt/Application.java | 2 +- .../plugins/mqtt/service/MqttVerticle.java | 28 +- pom.xml | 2 +- .../cc/iotkit/plugins/tcp/Application.java | 2 +- 19 files changed, 907 insertions(+), 25 deletions(-) create mode 100644 emqx-plugin/pom.xml create mode 100644 emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/Application.java create mode 100644 emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/BeanConfig.java create mode 100644 emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/MqttConfig.java create mode 100644 emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/handler/IMsgHandler.java create mode 100644 emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/AuthVerticle.java create mode 100644 emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/EmqxPlugin.java create mode 100644 emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/FakeThingService.java create mode 100644 emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/MqttDevice.java create mode 100644 emqx-plugin/src/main/resources/application.yml create mode 100644 emqx-plugin/src/main/resources/config.json diff --git a/DLT645-plugin/src/main/java/cc/iotkit/plugins/dlt645/Application.java b/DLT645-plugin/src/main/java/cc/iotkit/plugins/dlt645/Application.java index aa37710..1a07d65 100755 --- a/DLT645-plugin/src/main/java/cc/iotkit/plugins/dlt645/Application.java +++ b/DLT645-plugin/src/main/java/cc/iotkit/plugins/dlt645/Application.java @@ -10,7 +10,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; * @Author:tfd * @Date:2023/12/14 16:25 */ -@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.dlt645"}) +@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.dlt645") @OneselfConfig(mainConfigFileName = {"application.yml"}) @EnableConfigurationProperties @EnableScheduling diff --git a/emqx-plugin/pom.xml b/emqx-plugin/pom.xml new file mode 100644 index 0000000..8c64be0 --- /dev/null +++ b/emqx-plugin/pom.xml @@ -0,0 +1,86 @@ + + + + iot-iita-plugins + cc.iotkit.plugins + 1.0.1 + + 4.0.0 + + emqx-plugin + + + + + io.vertx + vertx-core + ${vertx.version} + + + + io.vertx + vertx-mqtt + ${vertx.version} + + + + io.vertx + vertx-web-proxy + ${vertx.version} + + + + + + + dev + + true + + + dev + + + + + prod + + prod + + + + + + + + com.gitee.starblues + spring-brick-maven-packager + ${spring-brick.version} + + ${plugin.build.mode} + + emqx-plugin + cc.iotkit.plugins.emqx.Application + ${project.version} + iita + emqx示例插件 + application.yml + + + jar + + + + + + repackage + + + + + + + + \ No newline at end of file diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/Application.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/Application.java new file mode 100644 index 0000000..19b1031 --- /dev/null +++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/Application.java @@ -0,0 +1,19 @@ +package cc.iotkit.plugins.emqx; + +import com.gitee.starblues.bootstrap.SpringPluginBootstrap; +import com.gitee.starblues.bootstrap.annotation.OneselfConfig; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + +/** + * @author sjg + */ +@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.emqx") +@OneselfConfig(mainConfigFileName = {"application.yml"}) +@EnableConfigurationProperties +public class Application extends SpringPluginBootstrap { + + public static void main(String[] args) { + new Application().run(Application.class, args); + } +} diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/BeanConfig.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/BeanConfig.java new file mode 100644 index 0000000..2b8d0a4 --- /dev/null +++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/BeanConfig.java @@ -0,0 +1,28 @@ +package cc.iotkit.plugins.emqx.conf; + +import cc.iotkit.plugin.core.IPluginConfig; +import cc.iotkit.plugin.core.LocalPluginConfig; +import cc.iotkit.plugin.core.thing.IThingService; +import cc.iotkit.plugins.emqx.service.FakeThingService; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +/** + * @author sjg + */ +@Component +public class BeanConfig { + + @Bean + @ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev") + IThingService getThingService() { + return new FakeThingService(); + } + + @Bean + @ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev") + IPluginConfig getPluginConfig(){ + return new LocalPluginConfig(); + } +} diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/MqttConfig.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/MqttConfig.java new file mode 100644 index 0000000..45439e0 --- /dev/null +++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/conf/MqttConfig.java @@ -0,0 +1,30 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.plugins.emqx.conf; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Data +@Component +@ConfigurationProperties(prefix = "emqx") +public class MqttConfig { + + private String host; + + private int port; + + private boolean ssl; + + private String topics; + + private int authPort; +} diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/handler/IMsgHandler.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/handler/IMsgHandler.java new file mode 100644 index 0000000..8b536e2 --- /dev/null +++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/handler/IMsgHandler.java @@ -0,0 +1,12 @@ +package cc.iotkit.plugins.emqx.handler; + +import io.vertx.core.json.JsonObject; + +/** + * @author sjg + */ +public interface IMsgHandler { + + void handle(String topic, JsonObject payload); + +} diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/AuthVerticle.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/AuthVerticle.java new file mode 100644 index 0000000..d24ce80 --- /dev/null +++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/AuthVerticle.java @@ -0,0 +1,146 @@ +package cc.iotkit.plugins.emqx.service; +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ + +import cc.iotkit.common.utils.CodecUtil; +import cc.iotkit.plugin.core.thing.IThingService; +import cc.iotkit.plugin.core.thing.model.ThingProduct; +import com.gitee.starblues.bootstrap.annotation.AutowiredType; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.handler.BodyHandler; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.*; + +@Slf4j +@Service +public class AuthVerticle extends AbstractVerticle { + + private HttpServer backendServer; + + @Setter + private int port; + + @Setter + private String serverPassword; + + @Autowired + @AutowiredType(AutowiredType.Type.MAIN_PLUGIN) + private IThingService thingService; + + @Override + public void start() { + backendServer = vertx.createHttpServer(); + + //第一步 声明Router&初始化Router + Router backendRouter = Router.router(vertx); + //获取body参数,得先添加这句 + backendRouter.route().handler(BodyHandler.create()); + + //第二步 配置Router解析url + backendRouter.route(HttpMethod.POST, "/mqtt/auth").handler(rc -> { + JsonObject json = rc.getBodyAsJson(); + log.info("mqtt auth:{}", json); + try { + String clientId = json.getString("clientid"); + String username = json.getString("username"); + String password = json.getString("password"); + + //服务端插件连接 + if (clientId.equals("server") && serverPassword.equals(password)) { + httpResult(rc.response(), 200); + return; + } + + //其它客户端连接 + String[] parts = clientId.split("_"); + if (parts.length < 3) { + log.error("clientid:{}不正确", clientId); + httpResult(rc.response(), 400); + return; + } + + log.info("MQTT client auth,clientId:{},username:{},password:{}", + clientId, username, password); + + String productKey = parts[0]; + String deviceName = parts[1]; + if (!username.equals(deviceName)) { + log.error("username:{}不正确", deviceName); + httpResult(rc.response(), 403); + return; + } + + ThingProduct product = thingService.getProduct(productKey); + if (product == null) { + log.error("获取产品信息失败,productKey:{}", productKey); + httpResult(rc.response(), 403); + return; + } + + String validPasswd = CodecUtil.md5Str(product.getProductSecret() + clientId); + if (!validPasswd.equalsIgnoreCase(password)) { + log.error("密码验证失败,期望值:{}", validPasswd); + httpResult(rc.response(), 403); + return; + } + + Set devices = new HashSet<>(); + devices.add(parts[0] + "," + parts[1]); + EmqxPlugin.CLIENT_DEVICE_MAP.putIfAbsent(parts[0] + parts[1], devices); + + httpResult(rc.response(), 200); + } catch (Throwable e) { + httpResult(rc.response(), 500); + log.error("mqtt auth failed", e); + } + }); + backendRouter.route(HttpMethod.POST, "/mqtt/acl").handler(rc -> { + String json = rc.getBodyAsString(); + log.info("mqtt acl:{}", json); + try { + httpResult(rc.response(), 200); + } catch (Throwable e) { + httpResult(rc.response(), 500); + log.error("mqtt acl failed", e); + } + }); + + backendServer.requestHandler(backendRouter) + .listen(port, "0.0.0.0") + .onSuccess(s -> { + log.info("auth server start success,port:{}", s.actualPort()); + }).onFailure(e -> { + e.printStackTrace(); + }) + ; + } + + private void httpResult(HttpServerResponse response, int code) { + response.putHeader("Content-Type", "application/json"); + response + .setStatusCode(code); + response + .end("{\"result\": \"" + (code == 200 ? "allow" : "deny") + "\"}"); + } + + @Override + public void stop() throws Exception { + backendServer.close(voidAsyncResult -> log.info("close emqx auth server...")); + } +} diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/EmqxPlugin.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/EmqxPlugin.java new file mode 100644 index 0000000..5270722 --- /dev/null +++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/EmqxPlugin.java @@ -0,0 +1,380 @@ +package cc.iotkit.plugins.emqx.service; + +import cc.iotkit.common.utils.StringUtils; +import cc.iotkit.common.utils.ThreadUtil; +import cc.iotkit.common.utils.UniqueIdUtil; +import cc.iotkit.plugin.core.IPlugin; +import cc.iotkit.plugin.core.IPluginConfig; +import cc.iotkit.plugin.core.thing.IThingService; +import cc.iotkit.plugin.core.thing.actions.ActionResult; +import cc.iotkit.plugin.core.thing.actions.DeviceState; +import cc.iotkit.plugin.core.thing.actions.EventLevel; +import cc.iotkit.plugin.core.thing.actions.IDeviceAction; +import cc.iotkit.plugin.core.thing.actions.up.*; +import cc.iotkit.plugin.core.thing.model.ThingDevice; +import cc.iotkit.plugins.emqx.conf.MqttConfig; +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.bean.copier.CopyOptions; +import cn.hutool.core.util.IdUtil; +import com.gitee.starblues.bootstrap.annotation.AutowiredType; +import com.gitee.starblues.bootstrap.realize.PluginCloseListener; +import com.gitee.starblues.core.PluginCloseType; +import com.gitee.starblues.core.PluginInfo; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.MqttClientOptions; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * @author sjg + */ +@Slf4j +@Service +public class EmqxPlugin implements PluginCloseListener, IPlugin, Runnable { + + @Autowired + private PluginInfo pluginInfo; + @Autowired + private MqttConfig mqttConfig; + + @Autowired + @AutowiredType(AutowiredType.Type.MAIN_PLUGIN) + private IPluginConfig pluginConfig; + + @Autowired + @AutowiredType(AutowiredType.Type.MAIN_PLUGIN) + private IThingService thingService; + + @Autowired + private AuthVerticle authVerticle; + + @Autowired + private MqttDevice mqttDevice; + + private final ScheduledThreadPoolExecutor emqxConnectTask = ThreadUtil.newScheduled(1, "emqx_connect"); + + private Vertx vertx; + private String deployedId; + + private MqttClient client; + + private boolean mqttConnected = false; + + private boolean authServerStarted = false; + + private static final Map DEVICE_ONLINE = new ConcurrentHashMap<>(); + + public static final Map> CLIENT_DEVICE_MAP = new HashMap<>(); + + @PostConstruct + public void init() { + vertx = Vertx.vertx(); + try { + //获取插件最新配置替换当前配置 + Map config = pluginConfig.getConfig(pluginInfo.getPluginId()); + BeanUtil.copyProperties(config, mqttConfig, CopyOptions.create().ignoreNullValue()); + + String serverPassword = IdUtil.fastSimpleUUID(); + MqttClientOptions options = new MqttClientOptions() + .setClientId("server") + .setUsername("server") + .setPassword(serverPassword) + .setCleanSession(true) + .setMaxInflightQueue(100) + .setKeepAliveInterval(60); + + if (mqttConfig.isSsl()) { + options.setSsl(true) + .setTrustAll(true); + } + client = MqttClient.create(vertx, options); + mqttDevice.setClient(client); + + authVerticle.setPort(mqttConfig.getAuthPort()); + authVerticle.setServerPassword(serverPassword); + + emqxConnectTask.scheduleWithFixedDelay(this, 3, 3, TimeUnit.SECONDS); + } catch (Throwable e) { + log.error("mqtt plugin startup error", e); + } + } + + @Override + public void run() { + if (!authServerStarted) { + try { + CountDownLatch countDownLatch = new CountDownLatch(1); + Future future = vertx.deployVerticle(authVerticle); + future.onSuccess((s -> { + deployedId = s; + countDownLatch.countDown(); + authServerStarted = true; + log.info("start emqx auth plugin success"); + })); + future.onFailure(e -> { + countDownLatch.countDown(); + authServerStarted = false; + log.error("start emqx auth plugin failed", e); + }); + countDownLatch.await(); + } catch (Exception e) { + authServerStarted = false; + log.error("start emqx auth server failed", e); + } + } + + if (mqttConnected) { + return; + } + + try { + String[] topics = mqttConfig.getTopics().split(","); + Map subscribes = new HashMap<>(topics.length); + for (String topic : topics) { + subscribes.put(topic, 1); + } + + client.connect(mqttConfig.getPort(), mqttConfig.getHost(), s -> { + if (s.succeeded()) { + log.info("client connect success."); + mqttConnected = true; + client.subscribe(subscribes, e -> { + if (e.succeeded()) { + log.info("===>subscribe success: {}", e.result()); + } else { + log.error("===>subscribe fail: ", e.cause()); + } + }); + + } else { + mqttConnected = false; + log.error("client connect fail: ", s.cause()); + } + }).publishHandler(msg -> { + String topic = msg.topicName(); + if (topic.contains("/c/")) { + return; + } + + JsonObject payload = msg.payload().toJsonObject(); + log.info("Client received message on [{}] payload [{}] with QoS [{}]", topic, payload, msg.qosLevel()); + + try { + //客户端连接断开 + if (topic.equals("/sys/client/disconnected")) { + offline(payload.getString("clientid")); + return; + } + + ThingDevice device = getDevice(topic); + if (device == null) { + return; + } + + //有消息上报-设备上线 + online(device.getProductKey(), device.getDeviceName()); + + JsonObject defParams = JsonObject.mapFrom(new HashMap<>(0)); + IDeviceAction action = null; + + String method = payload.getString("method", ""); + if (StringUtils.isBlank(method)) { + return; + } + JsonObject params = payload.getJsonObject("params", defParams); + + if ("thing.lifetime.register".equalsIgnoreCase(method)) { + //子设备注册 + String subPk = params.getString("productKey"); + String subDn = params.getString("deviceName"); + ActionResult regResult = thingService.post( + pluginInfo.getPluginId(), + fillAction( + DeviceRegister.builder() + .productKey(subPk) + .deviceName(subDn) + .model(params.getString("model")) + .version("1.0") + .build() + , subPk, subDn + ) + ); + if (regResult.getCode() == 0) { + //注册成功 + reply(topic, payload, 0); + Set devices = CLIENT_DEVICE_MAP.get(device.getProductKey() + device.getDeviceName()); + devices.add(subPk + "," + subDn); + } else { + //注册失败 + reply(topic, new JsonObject(), regResult.getCode()); + } + return; + } + + if ("thing.event.property.post".equalsIgnoreCase(method)) { + //属性上报 + action = PropertyReport.builder() + .params(params.getMap()) + .build(); + reply(topic, payload, 0); + } else if (method.startsWith("thing.event.")) { + //事件上报 + action = EventReport.builder() + .name(method.replace("thing.event.", "")) + .level(EventLevel.INFO) + .params(params.getMap()) + .build(); + reply(topic, payload, 0); + } else if (method.startsWith("thing.service.") && method.endsWith("_reply")) { + //服务回复 + action = ServiceReply.builder() + .name(method.replaceAll("thing\\.service\\.(.*)_reply", "$1")) + .code(payload.getInteger("code", 0)) + .params(params.getMap()) + .build(); + } + + if (action == null) { + return; + } + action.setId(payload.getString("id")); + action.setProductKey(device.getProductKey()); + action.setDeviceName(device.getDeviceName()); + action.setTime(System.currentTimeMillis()); + thingService.post(pluginInfo.getPluginId(), action); + + } catch (Exception e) { + log.error("message is illegal.", e); + } + }).closeHandler(e -> { + mqttConnected = false; + log.info("client closed"); + }).exceptionHandler(event -> log.error("client fail", event)); + } catch (Exception e) { + log.error("start emqx client failed", e); + } + + } + + public ThingDevice getDevice(String topic) { + String[] topicParts = topic.split("/"); + if (topicParts.length < 5) { + return null; + } + return ThingDevice.builder() + .productKey(topicParts[2]) + .deviceName(topicParts[3]) + .build(); + } + + public void online(String pk, String dn) { + if (Boolean.TRUE.equals(DEVICE_ONLINE.get(dn))) { + return; + } + + //上线 + thingService.post( + pluginInfo.getPluginId(), + fillAction(DeviceStateChange.builder() + .state(DeviceState.ONLINE) + .build() + , pk, dn + ) + ); + DEVICE_ONLINE.put(dn, true); + } + + public void offline(String clientId) { + String[] parts = clientId.split("_"); + Set devices = CLIENT_DEVICE_MAP.get(parts[0] + parts[1]); + for (String device : devices) { + String[] pkDn = device.split(","); + //下线 + thingService.post( + pluginInfo.getPluginId(), + fillAction(DeviceStateChange.builder() + .state(DeviceState.OFFLINE) + .build() + , pkDn[0], pkDn[1] + ) + ); + DEVICE_ONLINE.remove(pkDn[1]); + } + } + + private IDeviceAction fillAction(IDeviceAction action, String productKey, String deviceName) { + action.setId(UniqueIdUtil.newRequestId()); + action.setProductKey(productKey); + action.setDeviceName(deviceName); + action.setTime(System.currentTimeMillis()); + return action; + } + + /** + * 回复设备 + */ + private void reply(String topic, JsonObject payload, int code) { + Map payloadReply = new HashMap<>(); + payloadReply.put("id", payload.getString("id")); + payloadReply.put("method", payload.getString("method") + "_reply"); + payloadReply.put("code", code); + payloadReply.put("data", payload.getJsonObject("params")); + topic = topic.replace("/s/", "/c/") + "_reply"; + + String finalTopic = topic; + client.publish(topic, JsonObject.mapFrom(payloadReply).toBuffer(), MqttQoS.AT_LEAST_ONCE, false, false) + .onSuccess(h -> { + log.info("publish {} success", finalTopic); + }); + } + + @Override + public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) { + try { + log.info("plugin close,type:{},pluginId:{}", closeType, pluginInfo.getPluginId()); + if (deployedId != null) { + CountDownLatch wait = new CountDownLatch(1); + Future future = vertx.undeploy(deployedId); + future.onSuccess(unused -> { + log.info("emqx plugin stopped success"); + wait.countDown(); + }); + future.onFailure(h -> { + log.error("emqx plugin stopped failed", h); + wait.countDown(); + }); + wait.await(5, TimeUnit.SECONDS); + } + + client.disconnect() + .onSuccess(unused -> { + mqttConnected = false; + log.info("stop emqx connect success"); + }) + .onFailure(unused -> log.error("stop emqx connect failure")); + + emqxConnectTask.shutdown(); + + } catch (Throwable e) { + log.error("emqx plugin stop error", e); + } + } + + @Override + public Map getLinkInfo(String pk, String dn) { + return null; + } +} diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/FakeThingService.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/FakeThingService.java new file mode 100644 index 0000000..6fc1e60 --- /dev/null +++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/FakeThingService.java @@ -0,0 +1,70 @@ +package cc.iotkit.plugins.emqx.service; + +import cc.iotkit.plugin.core.thing.IThingService; +import cc.iotkit.plugin.core.thing.actions.ActionResult; +import cc.iotkit.plugin.core.thing.actions.IDeviceAction; +import cc.iotkit.plugin.core.thing.model.ThingDevice; +import cc.iotkit.plugin.core.thing.model.ThingProduct; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.util.HashMap; +import java.util.Map; + +/** + * 测试服务 + * + * @author sjg + */ +@Slf4j +public class FakeThingService implements IThingService { + + /** + * 添加测试产品 + */ + private static final Map PRODUCTS = Map.of( + "hbtgIA0SuVw9lxjB", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU", + "Rf4QSjbm65X45753", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU", + "cGCrkK7Ex4FESAwe", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU" + ); + + /** + * 添加测试设备 + */ + private static final Map DEVICES = new HashMap<>(); + + static { + for (int i = 0; i < 10; i++) { + DEVICES.put("TEST:GW:" + StringUtils.leftPad(i + "", 6, "0"), "hbtgIA0SuVw9lxjB"); + DEVICES.put("TEST_SW_" + StringUtils.leftPad(i + "", 6, "0"), "Rf4QSjbm65X45753"); + DEVICES.put("TEST_SC_" + StringUtils.leftPad(i + "", 6, "0"), "cGCrkK7Ex4FESAwe"); + } + } + + @Override + public ActionResult post(String pluginId, IDeviceAction action) { + log.info("post action:{}", action); + return ActionResult.builder().code(0).build(); + } + + @Override + public ThingProduct getProduct(String pk) { + return ThingProduct.builder() + .productKey(pk) + .productSecret(PRODUCTS.get(pk)) + .build(); + } + + @Override + public ThingDevice getDevice(String dn) { + return ThingDevice.builder() + .productKey(DEVICES.get(dn)) + .deviceName(dn) + .build(); + } + + @Override + public Map getProperty(String dn) { + return new HashMap<>(0); + } +} diff --git a/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/MqttDevice.java b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/MqttDevice.java new file mode 100644 index 0000000..1501c8a --- /dev/null +++ b/emqx-plugin/src/main/java/cc/iotkit/plugins/emqx/service/MqttDevice.java @@ -0,0 +1,80 @@ +package cc.iotkit.plugins.emqx.service; + +import cc.iotkit.common.enums.ErrCode; +import cc.iotkit.common.exception.BizException; +import cc.iotkit.plugin.core.thing.IDevice; +import cc.iotkit.plugin.core.thing.actions.ActionResult; +import cc.iotkit.plugin.core.thing.actions.down.DeviceConfig; +import cc.iotkit.plugin.core.thing.actions.down.PropertyGet; +import cc.iotkit.plugin.core.thing.actions.down.PropertySet; +import cc.iotkit.plugin.core.thing.actions.down.ServiceInvoke; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.json.JsonObject; +import io.vertx.mqtt.MqttClient; +import lombok.Setter; +import org.springframework.stereotype.Service; + +/** + * mqtt设备下行接口 + * + * @author sjg + */ +@Service +public class MqttDevice implements IDevice { + + @Setter + private MqttClient client; + + @Override + public ActionResult config(DeviceConfig action) { + return ActionResult.builder().code(0).reason("").build(); + } + + @Override + public ActionResult propertyGet(PropertyGet action) { + String topic = String.format("/sys/%s/%s/c/service/property/get", action.getProductKey(), action.getDeviceName()); + return send( + topic, + new JsonObject() + .put("id", action.getId()) + .put("method", "thing.service.property.get") + .put("params", action.getKeys()) + ); + } + + @Override + public ActionResult propertySet(PropertySet action) { + String topic = String.format("/sys/%s/%s/c/service/property/set", action.getProductKey(), action.getDeviceName()); + return send( + topic, + new JsonObject() + .put("id", action.getId()) + .put("method", "thing.service.property.set") + .put("params", action.getParams()) + ); + } + + @Override + public ActionResult serviceInvoke(ServiceInvoke action) { + String topic = String.format("/sys/%s/%s/c/service/%s", action.getProductKey(), action.getDeviceName(), action.getName()); + return send( + topic, + new JsonObject() + .put("id", action.getId()) + .put("method", "thing.service." + action.getName()) + .put("params", action.getParams()) + ); + } + + private ActionResult send(String topic, JsonObject payload) { + try { + client.publish(topic, payload.toBuffer(), MqttQoS.AT_LEAST_ONCE, false, false); + return ActionResult.builder().code(0).reason("").build(); + } catch (BizException e) { + return ActionResult.builder().code(e.getCode()).reason(e.getMessage()).build(); + } catch (Exception e) { + return ActionResult.builder().code(ErrCode.UNKNOWN_EXCEPTION.getKey()).reason(e.getMessage()).build(); + } + } + +} diff --git a/emqx-plugin/src/main/resources/application.yml b/emqx-plugin/src/main/resources/application.yml new file mode 100644 index 0000000..4a05488 --- /dev/null +++ b/emqx-plugin/src/main/resources/application.yml @@ -0,0 +1,9 @@ +plugin: + runMode: prod + mainPackage: cc.iotkit.plugin + +emqx: + host: 127.0.0.1 + port: 1883 + topics: /sys/# + authPort: 8104 diff --git a/emqx-plugin/src/main/resources/config.json b/emqx-plugin/src/main/resources/config.json new file mode 100644 index 0000000..564a4b6 --- /dev/null +++ b/emqx-plugin/src/main/resources/config.json @@ -0,0 +1,30 @@ +[ + { + "id": "host", + "name": "emqx ip", + "type": "text", + "value": "127.0.0.1", + "desc": "emqx ip,默认为127.0.0.1" + }, + { + "id": "port", + "name": "emqx端口", + "type": "number", + "value": 1883, + "desc": "emqx端口,默认为1883" + }, + { + "id": "auth_port", + "name": "认证端口", + "type": "number", + "value": 8104, + "desc": "emqx http认证端口,默认为8104" + }, + { + "id": "topics", + "name": "订阅主题", + "type": "text", + "value": "/sys/#", + "desc": "订阅主题多个用,隔开" + } +] \ No newline at end of file diff --git a/http-plugin/src/main/java/cc/iotkit/plugins/http/Application.java b/http-plugin/src/main/java/cc/iotkit/plugins/http/Application.java index 913fce2..2537404 100755 --- a/http-plugin/src/main/java/cc/iotkit/plugins/http/Application.java +++ b/http-plugin/src/main/java/cc/iotkit/plugins/http/Application.java @@ -8,7 +8,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties /** * @author sjg */ -@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.http"}) +@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.http") @OneselfConfig(mainConfigFileName = {"application.yml"}) @EnableConfigurationProperties public class Application extends SpringPluginBootstrap { diff --git a/hydrovalve-plugin/src/main/java/cc/iotkit/plugins/hydrovalve/Application.java b/hydrovalve-plugin/src/main/java/cc/iotkit/plugins/hydrovalve/Application.java index a9ccbd5..ed94e72 100644 --- a/hydrovalve-plugin/src/main/java/cc/iotkit/plugins/hydrovalve/Application.java +++ b/hydrovalve-plugin/src/main/java/cc/iotkit/plugins/hydrovalve/Application.java @@ -10,7 +10,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; * @Author:tfd * @Date:2024/1/8 14:57 */ -@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.hydrovalve"}) +@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.hydrovalve") @OneselfConfig(mainConfigFileName = {"application.yml"}) @EnableConfigurationProperties @EnableScheduling diff --git a/modbus-plugin/src/main/java/cc/iotkit/plugins/modbus/Application.java b/modbus-plugin/src/main/java/cc/iotkit/plugins/modbus/Application.java index 34e50a0..c6c0676 100755 --- a/modbus-plugin/src/main/java/cc/iotkit/plugins/modbus/Application.java +++ b/modbus-plugin/src/main/java/cc/iotkit/plugins/modbus/Application.java @@ -9,7 +9,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; /** * @author sjg */ -@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.modbus"}) +@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.modbus") @OneselfConfig(mainConfigFileName = {"application.yml"}) @EnableConfigurationProperties @EnableScheduling diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/Application.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/Application.java index b7f2c18..1b5cf53 100755 --- a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/Application.java +++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/Application.java @@ -8,7 +8,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties /** * @author sjg */ -@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.mqtt"}) +@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.mqtt") @OneselfConfig(mainConfigFileName = {"application.yml"}) @EnableConfigurationProperties public class Application extends SpringPluginBootstrap { diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java index bda111a..74899e2 100755 --- a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java +++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java @@ -20,6 +20,7 @@ import cc.iotkit.plugin.core.thing.actions.DeviceState; import cc.iotkit.plugin.core.thing.actions.EventLevel; import cc.iotkit.plugin.core.thing.actions.IDeviceAction; import cc.iotkit.plugin.core.thing.actions.up.*; +import cc.iotkit.plugin.core.thing.model.ThingDevice; import cc.iotkit.plugin.core.thing.model.ThingProduct; import cc.iotkit.plugins.mqtt.conf.MqttConfig; import com.gitee.starblues.bootstrap.annotation.AutowiredType; @@ -35,9 +36,7 @@ import io.vertx.core.json.JsonObject; import io.vertx.core.net.PemKeyCertOptions; import io.vertx.mqtt.*; import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode; -import lombok.AllArgsConstructor; import lombok.Data; -import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -83,7 +82,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler { for (String topic : unsubscribe.topics()) { - Device device = getDevice(topic); + ThingDevice device = getDevice(topic); //删除设备对应连接 endpointMap.remove(device.getDeviceName()); //下线 @@ -259,7 +258,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler log.info("publish success,topic:{},payload:{}", topic, msg)); } - public Device getDevice(String topic) { + public ThingDevice getDevice(String topic) { String[] topicParts = topic.split("/"); if (topicParts.length < 5) { return null; } - return new Device(topicParts[2], topicParts[3]); + return ThingDevice.builder() + .productKey(topicParts[2]) + .deviceName(topicParts[3]) + .build(); } - - @Data - @NoArgsConstructor - @AllArgsConstructor - public static class Device { - - private String productKey; - - private String deviceName; - } } diff --git a/pom.xml b/pom.xml index b1ef3b9..8c106b9 100755 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ tcp-plugin DLT645-plugin hydrovalve-plugin - + emqx-plugin diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/Application.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/Application.java index 5f88b4d..d28d3d2 100755 --- a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/Application.java +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/Application.java @@ -9,7 +9,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; /** * @author sjg */ -@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.tcp"}) +@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.tcp") @OneselfConfig(mainConfigFileName = {"application.yml"}) @EnableConfigurationProperties @EnableScheduling