diff --git a/.gitignore b/.gitignore index a1c2a23..4e2ac05 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ *.class # Log file +log *.log # BlueJ files @@ -11,7 +12,6 @@ .mtj.tmp/ # Package Files # -*.jar *.war *.nar *.ear @@ -21,3 +21,13 @@ # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* +.idea +target +*.iml +data/elasticsearch +.init +*.db +.flattened-pom.xml + +.DS_Store +dependency-reduced-pom.xml diff --git a/http-plugin/pom.xml b/http-plugin/pom.xml new file mode 100644 index 0000000..56cd2a8 --- /dev/null +++ b/http-plugin/pom.xml @@ -0,0 +1,84 @@ + + + + iot-iita-plugins + cc.iotkit.plugins + 1.0.0 + + 4.0.0 + + http-plugin + + + + + io.vertx + vertx-core + ${vertx.version} + + + + io.vertx + vertx-web + ${vertx.version} + + + + org.slf4j + slf4j-api + + + + + + + dev + + true + + + dev + + + + + prod + + prod + + + + + + + + com.gitee.starblues + spring-brick-maven-packager + + ${plugin.build.mode} + + http-plugin + cc.iotkit.plugins.http.Application + 1.0.0 + iita + http示例插件 + application.yml + + + jar-outer + + + + + + repackage + + + + + + + + \ No newline at end of file diff --git a/http-plugin/src/main/java/cc/iotkit/plugins/http/Application.java b/http-plugin/src/main/java/cc/iotkit/plugins/http/Application.java new file mode 100755 index 0000000..913fce2 --- /dev/null +++ b/http-plugin/src/main/java/cc/iotkit/plugins/http/Application.java @@ -0,0 +1,19 @@ +package cc.iotkit.plugins.http; + +import com.gitee.starblues.bootstrap.SpringPluginBootstrap; +import com.gitee.starblues.bootstrap.annotation.OneselfConfig; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + +/** + * @author sjg + */ +@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.http"}) +@OneselfConfig(mainConfigFileName = {"application.yml"}) +@EnableConfigurationProperties +public class Application extends SpringPluginBootstrap { + + public static void main(String[] args) { + new Application().run(Application.class, args); + } +} diff --git a/http-plugin/src/main/java/cc/iotkit/plugins/http/conf/BeanConfig.java b/http-plugin/src/main/java/cc/iotkit/plugins/http/conf/BeanConfig.java new file mode 100644 index 0000000..aa18494 --- /dev/null +++ b/http-plugin/src/main/java/cc/iotkit/plugins/http/conf/BeanConfig.java @@ -0,0 +1,21 @@ +package cc.iotkit.plugins.http.conf; + +import cc.iotkit.plugin.core.thing.IThingService; +import cc.iotkit.plugins.http.service.FakeThingService; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +/** + * @author sjg + */ +@Component +public class BeanConfig { + + @Bean + @ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev") + IThingService getThingService() { + return new FakeThingService(); + } + +} diff --git a/http-plugin/src/main/java/cc/iotkit/plugins/http/conf/HttpConfig.java b/http-plugin/src/main/java/cc/iotkit/plugins/http/conf/HttpConfig.java new file mode 100755 index 0000000..e4cb217 --- /dev/null +++ b/http-plugin/src/main/java/cc/iotkit/plugins/http/conf/HttpConfig.java @@ -0,0 +1,23 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.plugins.http.conf; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Data +@Component +@ConfigurationProperties(prefix = "http") +public class HttpConfig { + + private int port; + +} diff --git a/http-plugin/src/main/java/cc/iotkit/plugins/http/service/FakeThingService.java b/http-plugin/src/main/java/cc/iotkit/plugins/http/service/FakeThingService.java new file mode 100644 index 0000000..514985c --- /dev/null +++ b/http-plugin/src/main/java/cc/iotkit/plugins/http/service/FakeThingService.java @@ -0,0 +1,49 @@ +package cc.iotkit.plugins.http.service; + +import cc.iotkit.model.device.DeviceInfo; +import cc.iotkit.model.product.Product; +import cc.iotkit.plugin.core.thing.IThingService; +import cc.iotkit.plugin.core.thing.actions.ActionResult; +import cc.iotkit.plugin.core.thing.actions.IDeviceAction; +import io.vertx.core.json.JsonObject; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; + +/** + * 测试服务 + * + * @author sjg + */ +@Slf4j +public class FakeThingService implements IThingService { + + @Override + public ActionResult post(String pluginId, IDeviceAction action) { + log.info("post action:{}", action); + return ActionResult.builder().code(0).build(); + } + + @Override + public Product getProduct(String pk) { + return Product.builder() + .productKey("cGCrkK7Ex4FESAwe") + .productSecret("aaaaaaaa") + .build(); + } + + @Override + public DeviceInfo getDevice(String dn) { + return DeviceInfo.builder() + .productKey("cGCrkK7Ex4FESAwe") + .deviceName(dn) + .secret("mBCr3TKstTj2KeM6") + .build(); + } + + @Override + public Map getProperty(String dn) { + return new JsonObject().put("powerstate", 1).getMap(); + } +} diff --git a/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpDevice.java b/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpDevice.java new file mode 100644 index 0000000..561d57a --- /dev/null +++ b/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpDevice.java @@ -0,0 +1,83 @@ +package cc.iotkit.plugins.http.service; + +import cc.iotkit.common.enums.ErrCode; +import cc.iotkit.common.exception.BizException; +import cc.iotkit.plugin.core.thing.IDevice; +import cc.iotkit.plugin.core.thing.actions.ActionResult; +import cc.iotkit.plugin.core.thing.actions.down.DeviceConfig; +import cc.iotkit.plugin.core.thing.actions.down.PropertyGet; +import cc.iotkit.plugin.core.thing.actions.down.PropertySet; +import cc.iotkit.plugin.core.thing.actions.down.ServiceInvoke; +import io.vertx.core.json.JsonObject; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * http设备下行接口 + * + * @author sjg + */ +@Service +public class HttpDevice implements IDevice { + + @Autowired + private HttpVerticle httpVerticle; + + @Override + public ActionResult config(DeviceConfig action) { + return ActionResult.builder().code(0).reason("").build(); + } + + @Override + public ActionResult propertyGet(PropertyGet action) { + String topic = String.format("/sys/%s/%s/c/service/property/get", action.getProductKey(), action.getDeviceName()); + return send( + topic, + action.getDeviceName(), + new JsonObject() + .put("id", action.getId()) + .put("method", "thing.service.property.get") + .put("params", action.getKeys()) + .toString() + ); + } + + @Override + public ActionResult propertySet(PropertySet action) { + String topic = String.format("/sys/%s/%s/c/service/property/set", action.getProductKey(), action.getDeviceName()); + return send( + topic, + action.getDeviceName(), + new JsonObject() + .put("id", action.getId()) + .put("method", "thing.service.property.set") + .put("params", action.getParams()) + .toString() + ); + } + + @Override + public ActionResult serviceInvoke(ServiceInvoke action) { + String topic = String.format("/sys/%s/%s/c/service/%s", action.getProductKey(), action.getDeviceName(), action.getName()); + return send( + topic, + action.getDeviceName(), + new JsonObject() + .put("id", action.getId()) + .put("method", "thing.service." + action.getName()) + .put("params", action.getParams()) + .toString() + ); + } + + private ActionResult send(String topic, String deviceName, String payload) { + try { + return ActionResult.builder().code(0).reason("").build(); + } catch (BizException e) { + return ActionResult.builder().code(e.getCode()).reason(e.getMessage()).build(); + } catch (Exception e) { + return ActionResult.builder().code(ErrCode.UNKNOWN_EXCEPTION.getKey()).reason(e.getMessage()).build(); + } + } + +} diff --git a/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpPlugin.java b/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpPlugin.java new file mode 100644 index 0000000..45696a7 --- /dev/null +++ b/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpPlugin.java @@ -0,0 +1,71 @@ +package cc.iotkit.plugins.http.service; + +import com.gitee.starblues.bootstrap.realize.PluginCloseListener; +import com.gitee.starblues.core.PluginCloseType; +import com.gitee.starblues.core.PluginInfo; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.concurrent.CountDownLatch; + +/** + * @author sjg + */ +@Slf4j +@Service +public class HttpPlugin implements PluginCloseListener { + + @Autowired + private PluginInfo pluginInfo; + @Autowired + private HttpVerticle httpVerticle; + + private Vertx vertx; + private CountDownLatch countDownLatch; + private String deployedId; + + @PostConstruct + public void init() { + vertx = Vertx.vertx(); + try { + countDownLatch = new CountDownLatch(1); + Future future = vertx.deployVerticle(httpVerticle); + future.onSuccess((s -> { + deployedId = s; + countDownLatch.countDown(); + })); + future.onFailure((e) -> { + countDownLatch.countDown(); + log.error("start mqtt plugin failed", e); + }); + countDownLatch.await(); + future.succeeded(); + } catch (Throwable e) { + log.error("start mqtt plugin error.", e); + } + } + + @Override + public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) { + try { + httpVerticle.stop(); + Future future = vertx.undeploy(deployedId); + future.onSuccess(unused -> log.info("stop mqtt plugin success")); + if (closeType == PluginCloseType.UNINSTALL) { + log.info("插件被卸载了:{}", pluginInfo.getPluginId()); + } else if (closeType == PluginCloseType.STOP) { + log.info("插件被关闭了:{}", pluginInfo.getPluginId()); + } else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) { + log.info("插件被升级卸载了:{}", pluginInfo.getPluginId()); + } + } catch (Throwable e) { + log.error("stop mqtt plugin error.", e); + } + } + +} diff --git a/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpVerticle.java b/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpVerticle.java new file mode 100755 index 0000000..63e8398 --- /dev/null +++ b/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpVerticle.java @@ -0,0 +1,193 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.plugins.http.service; + +import cc.iotkit.common.utils.StringUtils; +import cc.iotkit.model.device.DeviceInfo; +import cc.iotkit.plugin.core.thing.IThingService; +import cc.iotkit.plugin.core.thing.actions.DeviceState; +import cc.iotkit.plugin.core.thing.actions.EventLevel; +import cc.iotkit.plugin.core.thing.actions.up.DeviceStateChange; +import cc.iotkit.plugin.core.thing.actions.up.EventReport; +import cc.iotkit.plugin.core.thing.actions.up.PropertyReport; +import cc.iotkit.plugins.http.conf.HttpConfig; +import com.gitee.starblues.bootstrap.annotation.AutowiredType; +import com.gitee.starblues.core.PluginInfo; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Handler; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.handler.BodyHandler; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +/** + * mqtt官方协议文档: + * http://iotkit-open-source.gitee.io/document/pages/device_protocol/http/#%E4%BA%8B%E4%BB%B6%E4%B8%8A%E6%8A%A5 + * + * @author sjg + */ +@Slf4j +@Component +public class HttpVerticle extends AbstractVerticle implements Handler { + + @Autowired + private HttpConfig config; + + @Autowired + @AutowiredType(AutowiredType.Type.MAIN_PLUGIN) + private IThingService thingService; + + @Autowired + private PluginInfo pluginInfo; + + private static final Set DEVICE_ONLINE = new HashSet<>(); + + @Override + public void start() { + HttpServer httpServer = vertx.createHttpServer(); + Router router = Router.router(vertx); + router.route().handler(BodyHandler.create()).handler(this); + httpServer.requestHandler(router).listen(config.getPort(), ar -> { + if (ar.succeeded()) { + log.info("http server is listening on port " + ar.result().actualPort()); + } else { + log.error("Error on starting the server", ar.cause()); + } + }); + } + + @Override + public void handle(RoutingContext ctx) { + HttpServerResponse response = ctx.response(); + response.putHeader("content-type", "application/json"); + response.setStatusCode(200); + + try { + String secret = ctx.request().getHeader("secret"); + if (StringUtils.isBlank(secret)) { + log.error("secret不能为空"); + response.setStatusCode(401); + end(response); + return; + } + + HttpServerRequest request = ctx.request(); + // /sys/{productKey}/{deviceName}/properties + String path = request.path(); + String[] parts = path.split("/"); + if (parts.length < 5) { + log.error("不正确的路径"); + response.setStatusCode(500); + } + + String productKey = parts[2]; + String deviceName = parts[3]; + String type = parts[4]; + DeviceInfo device = thingService.getDevice(deviceName); + if (device == null) { + log.error("认证失败,设备:{} 不存在", deviceName); + response.setStatusCode(401); + end(response); + return; + } + if (!secret.equalsIgnoreCase(device.getSecret())) { + log.error("认证失败,secret不正确,期望值:{}", device.getSecret()); + response.setStatusCode(401); + end(response); + return; + } + + //设备上线 + if (!DEVICE_ONLINE.contains(deviceName)) { + thingService.post(pluginInfo.getPluginId(), DeviceStateChange.builder() + .id(UUID.randomUUID().toString()) + .productKey(productKey) + .deviceName(deviceName) + .state(DeviceState.ONLINE) + .time(System.currentTimeMillis()) + .build()); + DEVICE_ONLINE.add(deviceName); + } + + String method = request.method().name(); + JsonObject payload = ctx.getBodyAsJson(); + + if ("event".equals(type)) { + //事件上报 + if ("POST".equalsIgnoreCase(method)) { + response.setStatusCode(500); + log.error("请求类型不正确,期望值:POST,实际值:{}", method); + end(response); + } + thingService.post( + pluginInfo.getPluginId(), + EventReport.builder() + .id(payload.getString("id")) + .productKey(productKey) + .deviceName(deviceName) + .level(EventLevel.INFO) + .name(parts[3]) + .params(payload.getJsonObject("params").getMap()) + .build() + ); + end(response); + return; + } + + if ("properties".equals(type)) { + if ("POST".equalsIgnoreCase(method)) { + //属性上报 + thingService.post( + pluginInfo.getPluginId(), + PropertyReport.builder() + .id(UUID.randomUUID().toString()) + .productKey(productKey) + .deviceName(deviceName) + .params(payload.getJsonObject("params").getMap()) + .build() + ); + end(response); + return; + } + + if ("GET".equalsIgnoreCase(method)) { + //属性获取 + Map property = thingService.getProperty(deviceName); + response.end(new JsonObject() + .put("code", 0) + .put("data", property) + .toString()); + } + } + } catch (Exception e) { + log.error("消息处理失败", e); + response.setStatusCode(500); + end(response); + } + } + + private void end(HttpServerResponse response) { + response.end(new JsonObject() + .put("code", response.getStatusCode() == 200 ? 0 : response.getStatusCode()) + .toString()); + } + +} diff --git a/http-plugin/src/main/resources/application.yml b/http-plugin/src/main/resources/application.yml new file mode 100644 index 0000000..4bf4e13 --- /dev/null +++ b/http-plugin/src/main/resources/application.yml @@ -0,0 +1,6 @@ +plugin: + runMode: dev + mainPackage: cc.iotkit.plugin + +http: + port: 9081 diff --git a/mqtt-plugin/pom.xml b/mqtt-plugin/pom.xml new file mode 100644 index 0000000..b10d75c --- /dev/null +++ b/mqtt-plugin/pom.xml @@ -0,0 +1,89 @@ + + + + iot-iita-plugins + cc.iotkit.plugins + 1.0.0 + + 4.0.0 + + mqtt-plugin + + + + + io.vertx + vertx-core + ${vertx.version} + + + + io.vertx + vertx-mqtt + ${vertx.version} + + + + io.netty + netty-codec-mqtt + + + + org.slf4j + slf4j-api + + + + + + + dev + + true + + + dev + + + + + prod + + prod + + + + + + + + com.gitee.starblues + spring-brick-maven-packager + + ${plugin.build.mode} + + mqtt-plugin + cc.iotkit.plugins.mqtt.Application + 1.0.0 + iita + mqtt示例插件 + application.yml + + + jar-outer + + + + + + repackage + + + + + + + + \ No newline at end of file diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/Application.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/Application.java new file mode 100755 index 0000000..b7f2c18 --- /dev/null +++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/Application.java @@ -0,0 +1,19 @@ +package cc.iotkit.plugins.mqtt; + +import com.gitee.starblues.bootstrap.SpringPluginBootstrap; +import com.gitee.starblues.bootstrap.annotation.OneselfConfig; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + +/** + * @author sjg + */ +@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.mqtt"}) +@OneselfConfig(mainConfigFileName = {"application.yml"}) +@EnableConfigurationProperties +public class Application extends SpringPluginBootstrap { + + public static void main(String[] args) { + new Application().run(Application.class, args); + } +} diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/conf/BeanConfig.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/conf/BeanConfig.java new file mode 100644 index 0000000..f0b684c --- /dev/null +++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/conf/BeanConfig.java @@ -0,0 +1,21 @@ +package cc.iotkit.plugins.mqtt.conf; + +import cc.iotkit.plugin.core.thing.IThingService; +import cc.iotkit.plugins.mqtt.service.FakeThingService; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +/** + * @author sjg + */ +@Component +public class BeanConfig { + + @Bean + @ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev") + IThingService getThingService() { + return new FakeThingService(); + } + +} diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/conf/MqttConfig.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/conf/MqttConfig.java new file mode 100755 index 0000000..660dcc9 --- /dev/null +++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/conf/MqttConfig.java @@ -0,0 +1,31 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.plugins.mqtt.conf; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Data +@Component +@ConfigurationProperties(prefix = "mqtt") +public class MqttConfig { + + private int port; + + private String sslKey; + + private String sslCert; + + private boolean ssl; + + private boolean useWebSocket; + +} diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/FakeThingService.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/FakeThingService.java new file mode 100644 index 0000000..0923414 --- /dev/null +++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/FakeThingService.java @@ -0,0 +1,47 @@ +package cc.iotkit.plugins.mqtt.service; + +import cc.iotkit.model.device.DeviceInfo; +import cc.iotkit.model.product.Product; +import cc.iotkit.plugin.core.thing.IThingService; +import cc.iotkit.plugin.core.thing.actions.ActionResult; +import cc.iotkit.plugin.core.thing.actions.IDeviceAction; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; + +/** + * 测试服务 + * + * @author sjg + */ +@Slf4j +public class FakeThingService implements IThingService { + + @Override + public ActionResult post(String pluginId, IDeviceAction action) { + log.info("post action:{}", action); + return ActionResult.builder().code(0).build(); + } + + @Override + public Product getProduct(String pk) { + return Product.builder() + .productKey("cGCrkK7Ex4FESAwe") + .productSecret("aaaaaaaa") + .build(); + } + + @Override + public DeviceInfo getDevice(String dn) { + return DeviceInfo.builder() + .productKey("cGCrkK7Ex4FESAwe") + .deviceName(dn) + .build(); + } + + @Override + public Map getProperty(String dn) { + return new HashMap<>(0); + } +} diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttDevice.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttDevice.java new file mode 100644 index 0000000..ea718bf --- /dev/null +++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttDevice.java @@ -0,0 +1,88 @@ +package cc.iotkit.plugins.mqtt.service; + +import cc.iotkit.common.enums.ErrCode; +import cc.iotkit.common.exception.BizException; +import cc.iotkit.plugin.core.thing.IDevice; +import cc.iotkit.plugin.core.thing.actions.ActionResult; +import cc.iotkit.plugin.core.thing.actions.down.DeviceConfig; +import cc.iotkit.plugin.core.thing.actions.down.PropertyGet; +import cc.iotkit.plugin.core.thing.actions.down.PropertySet; +import cc.iotkit.plugin.core.thing.actions.down.ServiceInvoke; +import io.vertx.core.json.JsonObject; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * mqtt设备下行接口 + * + * @author sjg + */ +@Service +public class MqttDevice implements IDevice { + + @Autowired + private MqttVerticle mqttVerticle; + + @Override + public ActionResult config(DeviceConfig action) { + return ActionResult.builder().code(0).reason("").build(); + } + + @Override + public ActionResult propertyGet(PropertyGet action) { + String topic = String.format("/sys/%s/%s/c/service/property/get", action.getProductKey(), action.getDeviceName()); + return send( + topic, + action.getDeviceName(), + new JsonObject() + .put("id", action.getId()) + .put("method", "thing.service.property.get") + .put("params", action.getKeys()) + .toString() + ); + } + + @Override + public ActionResult propertySet(PropertySet action) { + String topic = String.format("/sys/%s/%s/c/service/property/set", action.getProductKey(), action.getDeviceName()); + return send( + topic, + action.getDeviceName(), + new JsonObject() + .put("id", action.getId()) + .put("method", "thing.service.property.set") + .put("params", action.getParams()) + .toString() + ); + } + + @Override + public ActionResult serviceInvoke(ServiceInvoke action) { + String topic = String.format("/sys/%s/%s/c/service/%s", action.getProductKey(), action.getDeviceName(), action.getName()); + return send( + topic, + action.getDeviceName(), + new JsonObject() + .put("id", action.getId()) + .put("method", "thing.service." + action.getName()) + .put("params", action.getParams()) + .toString() + ); + } + + private ActionResult send(String topic, String deviceName, String payload) { + try { + mqttVerticle.publish( + deviceName, + topic, + payload + ); + return ActionResult.builder().code(0).reason("").build(); + } catch (BizException e) { + return ActionResult.builder().code(e.getCode()).reason(e.getMessage()).build(); + } catch (Exception e) { + return ActionResult.builder().code(ErrCode.UNKNOWN_EXCEPTION.getKey()).reason(e.getMessage()).build(); + } + } + +} diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttPlugin.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttPlugin.java new file mode 100644 index 0000000..073d988 --- /dev/null +++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttPlugin.java @@ -0,0 +1,71 @@ +package cc.iotkit.plugins.mqtt.service; + +import com.gitee.starblues.bootstrap.realize.PluginCloseListener; +import com.gitee.starblues.core.PluginCloseType; +import com.gitee.starblues.core.PluginInfo; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.concurrent.CountDownLatch; + +/** + * @author sjg + */ +@Slf4j +@Service +public class MqttPlugin implements PluginCloseListener { + + @Autowired + private PluginInfo pluginInfo; + @Autowired + private MqttVerticle mqttVerticle; + + private Vertx vertx; + private CountDownLatch countDownLatch; + private String deployedId; + + @PostConstruct + public void init() { + vertx = Vertx.vertx(); + try { + countDownLatch = new CountDownLatch(1); + Future future = vertx.deployVerticle(mqttVerticle); + future.onSuccess((s -> { + deployedId = s; + countDownLatch.countDown(); + })); + future.onFailure((e) -> { + countDownLatch.countDown(); + log.error("start mqtt plugin failed", e); + }); + countDownLatch.await(); + future.succeeded(); + } catch (Throwable e) { + log.error("start mqtt plugin error.", e); + } + } + + @Override + public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) { + try { + mqttVerticle.stop(); + Future future = vertx.undeploy(deployedId); + future.onSuccess(unused -> log.info("stop mqtt plugin success")); + if (closeType == PluginCloseType.UNINSTALL) { + log.info("插件被卸载了:{}", pluginInfo.getPluginId()); + } else if (closeType == PluginCloseType.STOP) { + log.info("插件被关闭了:{}", pluginInfo.getPluginId()); + } else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) { + log.info("插件被升级卸载了:{}", pluginInfo.getPluginId()); + } + } catch (Throwable e) { + log.error("stop mqtt plugin error.", e); + } + } + +} diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java new file mode 100755 index 0000000..b0a6e34 --- /dev/null +++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java @@ -0,0 +1,349 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.plugins.mqtt.service; + +import cc.iotkit.common.enums.ErrCode; +import cc.iotkit.common.exception.BizException; +import cc.iotkit.common.utils.CodecUtil; +import cc.iotkit.common.utils.UniqueIdUtil; +import cc.iotkit.model.product.Product; +import cc.iotkit.plugin.core.thing.IThingService; +import cc.iotkit.plugin.core.thing.actions.ActionResult; +import cc.iotkit.plugin.core.thing.actions.DeviceState; +import cc.iotkit.plugin.core.thing.actions.EventLevel; +import cc.iotkit.plugin.core.thing.actions.IDeviceAction; +import cc.iotkit.plugin.core.thing.actions.up.*; +import cc.iotkit.plugins.mqtt.conf.MqttConfig; +import com.gitee.starblues.bootstrap.annotation.AutowiredType; +import com.gitee.starblues.core.PluginInfo; +import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.JsonObject; +import io.vertx.core.net.PemKeyCertOptions; +import io.vertx.mqtt.*; +import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * mqtt官方协议文档: + * http://iotkit-open-source.gitee.io/document/pages/device_protocol/mqtt/#%E7%BD%91%E5%85%B3%E8%BF%9E%E6%8E%A5%E5%92%8C%E6%B3%A8%E5%86%8C + * + * @author sjg + */ +@Slf4j +@Component +public class MqttVerticle extends AbstractVerticle implements Handler { + + private MqttServer mqttServer; + private final Map endpointMap = new HashMap<>(); + /** + * 增加一个客户端连接clientid-连接状态池,避免mqtt关闭的时候走异常断开和mqtt断开的handler,导致多次离线消息 + */ + private static final Map MQTT_CONNECT_POOL = new ConcurrentHashMap<>(); + + @Autowired + private MqttConfig config; + + @Autowired + @AutowiredType(AutowiredType.Type.MAIN_PLUGIN) + private IThingService thingService; + + @Autowired + private PluginInfo pluginInfo; + + @Override + public void start() { + MqttServerOptions options = new MqttServerOptions() + .setPort(config.getPort()); + if (config.isSsl()) { + options = options.setSsl(true) + .setKeyCertOptions(new PemKeyCertOptions() + .setKeyPath(config.getSslKey()) + .setCertPath(config.getSslCert())); + } + options.setUseWebSocket(config.isUseWebSocket()); + + mqttServer = MqttServer.create(vertx, options); + mqttServer.endpointHandler(this::handle).listen(ar -> { + if (ar.succeeded()) { + log.info("MQTT server is listening on port " + ar.result().actualPort()); + } else { + log.error("Error on starting the server", ar.cause()); + } + }); + } + + @Override + public void handle(MqttEndpoint endpoint) { + log.info("MQTT client:{} request to connect, clean session = {}", endpoint.clientIdentifier(), endpoint.isCleanSession()); + + MqttAuth auth = endpoint.auth(); + if (auth == null) { + endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED); + return; + } + //mqtt连接认证信息: + /* + * mqttClientId: productKey_deviceName_model + * mqttUserName: deviceName + * mqttPassword: md5(产品密钥,mqttClientId) + */ + String clientId = endpoint.clientIdentifier(); + String[] parts = clientId.split("_"); + if (parts.length < 3) { + log.error("clientId:{}不正确", clientId); + endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID); + return; + } + + log.info("MQTT client auth,clientId:{},username:{},password:{}", + clientId, auth.getUsername(), auth.getPassword()); + + String productKey = parts[0]; + String deviceName = parts[1]; + if (!auth.getUsername().equals(deviceName)) { + log.error("username:{}不正确", deviceName); + endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD); + return; + } + + Product product = thingService.getProduct(productKey); + if (product == null) { + log.error("获取产品信息失败,productKey:{}", productKey); + endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD); + return; + } + + String validPasswd = CodecUtil.md5Str(product.getProductSecret() + clientId); + if (!validPasswd.equalsIgnoreCase(auth.getPassword())) { + log.error("密码验证失败,期望值:{}", validPasswd); + endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD); + return; + } + + //设备注册 + ActionResult result = thingService.post( + pluginInfo.getPluginId(), + fillAction( + DeviceRegister.builder() + .model(parts[2]) + .version("1.0") + .build() + , productKey, deviceName + ) + ); + if (result.getCode() != 0) { + log.error("设备注册失败:{}", result); + endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED); + return; + } + + //保存设备与连接关系 + endpointMap.put(deviceName, endpoint); + MQTT_CONNECT_POOL.put(clientId, true); + + log.info("MQTT client keep alive timeout = {} ", endpoint.keepAliveTimeSeconds()); + + endpoint.accept(false); + + endpoint.closeHandler((v) -> { + log.warn("client connection closed,clientId:{}", clientId); + if (Boolean.FALSE.equals(MQTT_CONNECT_POOL.get(clientId))) { + MQTT_CONNECT_POOL.remove(clientId); + return; + } + //下线 + thingService.post( + pluginInfo.getPluginId(), + fillAction( + DeviceStateChange.builder() + .state(DeviceState.OFFLINE) + .build() + , productKey, deviceName + ) + ); + //删除设备与连接关系 + endpointMap.remove(deviceName); + }).disconnectMessageHandler(disconnectMessage -> { + log.info("Received disconnect from client, reason code = {}", disconnectMessage.code()); + //删除设备与连接关系 + endpointMap.remove(deviceName); + MQTT_CONNECT_POOL.put(clientId, false); + }).subscribeHandler(subscribe -> { + //上线 + thingService.post( + pluginInfo.getPluginId(), + fillAction(DeviceStateChange.builder() + .state(DeviceState.ONLINE) + .build() + , productKey, deviceName + ) + ); + + List reasonCodes = new ArrayList<>(); + for (MqttTopicSubscription s : subscribe.topicSubscriptions()) { + log.info("Subscription for {},with QoS {}", s.topicName(), s.qualityOfService()); + try { + String topic = s.topicName(); + //topic订阅验证 /sys/{productKey}/{deviceName}/# + String regex = String.format("^/sys/%s/%s/.*", productKey, deviceName); + if (!topic.matches(regex)) { + log.error("subscript topic:{} incorrect,regex:{}", topic, regex); + continue; + } + reasonCodes.add(MqttSubAckReasonCode.qosGranted(s.qualityOfService())); + } catch (Throwable e) { + log.error("subscribe failed,topic:" + s.topicName(), e); + reasonCodes.add(MqttSubAckReasonCode.NOT_AUTHORIZED); + } + } + // ack the subscriptions request + endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES); + + }).unsubscribeHandler(unsubscribe -> { + //下线 + thingService.post( + pluginInfo.getPluginId(), + fillAction( + DeviceStateChange.builder() + .state(DeviceState.OFFLINE) + .build() + , productKey, deviceName + ) + ); + + // ack the subscriptions request + endpoint.unsubscribeAcknowledge(unsubscribe.messageId()); + }).publishHandler(message -> { + JsonObject payload = message.payload().toJsonObject(); + log.info("Received message:{}, with QoS {}", payload, + message.qosLevel()); + if (payload.isEmpty()) { + return; + } + String topic = message.topicName(); + + try { + JsonObject defParams = JsonObject.mapFrom(new HashMap<>(0)); + IDeviceAction action = null; + + String method = payload.getString("method", ""); + if ("thing.event.property.post".equalsIgnoreCase(method)) { + //属性上报 + action = PropertyReport.builder() + .params(payload.getJsonObject("params", defParams).getMap()) + .build(); + reply(endpoint, topic, payload); + } else if (method.startsWith("thing.event.")) { + //事件上报 + action = EventReport.builder() + .name(method.replace("thing.event.", "")) + .level(EventLevel.INFO) + .params(payload.getJsonObject("params", defParams).getMap()) + .build(); + reply(endpoint, topic, payload); + } else if (method.startsWith("thing.service.") && method.endsWith("_reply")) { + //服务回复 + action = ServiceReply.builder() + .name(method.replaceAll("thing\\.service\\.(.*)_reply", "$1")) + .code(payload.getInteger("code", 0)) + .params(payload.getJsonObject("data", defParams).getMap()) + .build(); + } + if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) { + endpoint.publishAcknowledge(message.messageId()); + } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) { + endpoint.publishReceived(message.messageId()); + } + + if (action == null) { + return; + } + action.setId(payload.getString("id")); + action.setProductKey(productKey); + action.setDeviceName(deviceName); + action.setTime(System.currentTimeMillis()); + thingService.post(pluginInfo.getPluginId(), action); + } catch (Throwable e) { + log.error("handler message failed,topic:" + message.topicName(), e); + } + }).publishReleaseHandler(endpoint::publishComplete); + + } + + /** + * 回复设备 + */ + private void reply(MqttEndpoint endpoint, String topic, JsonObject payload) { + Map payloadReply = new HashMap<>(); + payloadReply.put("id", payload.getString("id")); + payloadReply.put("method", payload.getString("method") + "_reply"); + payloadReply.put("code", 0); + + endpoint.publish(topic + "_reply", JsonObject.mapFrom(payloadReply).toBuffer(), MqttQoS.AT_LEAST_ONCE, false, false); + } + + private IDeviceAction fillAction(IDeviceAction action, String productKey, String deviceName) { + action.setId(UniqueIdUtil.newRequestId()); + action.setProductKey(productKey); + action.setDeviceName(deviceName); + action.setTime(System.currentTimeMillis()); + return action; + } + + @Override + public void stop() { + for (MqttEndpoint endpoint : endpointMap.values()) { + String clientId = endpoint.clientIdentifier(); + String[] parts = clientId.split("_"); + if (parts.length < 3) { + continue; + } + + //下线 + thingService.post( + pluginInfo.getPluginId(), + fillAction( + DeviceStateChange.builder() + .state(DeviceState.OFFLINE) + .build(), + parts[0], + parts[1] + ) + ); + } + mqttServer.close(voidAsyncResult -> log.info("close mqtt server...")); + } + + public void publish(String deviceName, String topic, String msg) { + MqttEndpoint endpoint = endpointMap.get(deviceName); + if (endpoint == null) { + throw new BizException(ErrCode.SEND_DESTINATION_NOT_FOUND); + } + Future result = endpoint.publish(topic, Buffer.buffer(msg), + MqttQoS.AT_LEAST_ONCE, false, false); + result.onFailure(e -> log.error("public topic failed", e)); + result.onSuccess(integer -> log.info("publish success,topic:{},payload:{}", topic, msg)); + } + +} diff --git a/mqtt-plugin/src/main/resources/application.yml b/mqtt-plugin/src/main/resources/application.yml new file mode 100644 index 0000000..778daac --- /dev/null +++ b/mqtt-plugin/src/main/resources/application.yml @@ -0,0 +1,6 @@ +plugin: + runMode: prod + mainPackage: cc.iotkit.plugin + +mqtt: + port: 1883 diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..599aae3 --- /dev/null +++ b/pom.xml @@ -0,0 +1,76 @@ + + + 4.0.0 + + mqtt-plugin + http-plugin + + + + org.springframework.boot + spring-boot-starter-parent + 2.7.11 + + + 1.0.0 + + cc.iotkit.plugins + iot-iita-plugins + pom + + + 11 + 2.7.11 + 3.1.2 + 4.2.2 + + + + + + cc.iotkit + iot-plugin-core + 0.5.0-SNAPSHOT + + + + com.gitee.starblues + spring-brick + ${spring-brick.version} + + + + com.gitee.starblues + spring-brick-bootstrap + ${spring-brick.version} + + + + + org.springframework.boot + spring-boot-dependencies + pom + import + ${spring-boot.version} + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework + spring-web + + + + org.projectlombok + lombok + + + + + \ No newline at end of file