master
xiwa 2023-09-04 08:40:14 +08:00
parent 3e8360ad02
commit aa041ae0c4
20 changed files with 1357 additions and 1 deletions

12
.gitignore vendored
View File

@ -2,6 +2,7 @@
*.class *.class
# Log file # Log file
log
*.log *.log
# BlueJ files # BlueJ files
@ -11,7 +12,6 @@
.mtj.tmp/ .mtj.tmp/
# Package Files # # Package Files #
*.jar
*.war *.war
*.nar *.nar
*.ear *.ear
@ -21,3 +21,13 @@
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid* hs_err_pid*
.idea
target
*.iml
data/elasticsearch
.init
*.db
.flattened-pom.xml
.DS_Store
dependency-reduced-pom.xml

84
http-plugin/pom.xml Normal file
View File

@ -0,0 +1,84 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>iot-iita-plugins</artifactId>
<groupId>cc.iotkit.plugins</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>http-plugin</artifactId>
<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
<profiles>
<profile>
<id>dev</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<plugin.build.mode>dev</plugin.build.mode>
</properties>
</profile>
<profile>
<id>prod</id>
<properties>
<plugin.build.mode>prod</plugin.build.mode>
</properties>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>com.gitee.starblues</groupId>
<artifactId>spring-brick-maven-packager</artifactId>
<configuration>
<mode>${plugin.build.mode}</mode>
<pluginInfo>
<id>http-plugin</id>
<bootstrapClass>cc.iotkit.plugins.http.Application</bootstrapClass>
<version>1.0.0</version>
<provider>iita</provider>
<description>http示例插件</description>
<configFileName>application.yml</configFileName>
</pluginInfo>
<prodConfig>
<packageType>jar-outer</packageType>
</prodConfig>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,19 @@
package cc.iotkit.plugins.http;
import com.gitee.starblues.bootstrap.SpringPluginBootstrap;
import com.gitee.starblues.bootstrap.annotation.OneselfConfig;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
/**
* @author sjg
*/
@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.http"})
@OneselfConfig(mainConfigFileName = {"application.yml"})
@EnableConfigurationProperties
public class Application extends SpringPluginBootstrap {
public static void main(String[] args) {
new Application().run(Application.class, args);
}
}

View File

@ -0,0 +1,21 @@
package cc.iotkit.plugins.http.conf;
import cc.iotkit.plugin.core.thing.IThingService;
import cc.iotkit.plugins.http.service.FakeThingService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @author sjg
*/
@Component
public class BeanConfig {
@Bean
@ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev")
IThingService getThingService() {
return new FakeThingService();
}
}

View File

@ -0,0 +1,23 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.plugins.http.conf;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "http")
public class HttpConfig {
private int port;
}

View File

@ -0,0 +1,49 @@
package cc.iotkit.plugins.http.service;
import cc.iotkit.model.device.DeviceInfo;
import cc.iotkit.model.product.Product;
import cc.iotkit.plugin.core.thing.IThingService;
import cc.iotkit.plugin.core.thing.actions.ActionResult;
import cc.iotkit.plugin.core.thing.actions.IDeviceAction;
import io.vertx.core.json.JsonObject;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
/**
*
*
* @author sjg
*/
@Slf4j
public class FakeThingService implements IThingService {
@Override
public ActionResult post(String pluginId, IDeviceAction action) {
log.info("post action:{}", action);
return ActionResult.builder().code(0).build();
}
@Override
public Product getProduct(String pk) {
return Product.builder()
.productKey("cGCrkK7Ex4FESAwe")
.productSecret("aaaaaaaa")
.build();
}
@Override
public DeviceInfo getDevice(String dn) {
return DeviceInfo.builder()
.productKey("cGCrkK7Ex4FESAwe")
.deviceName(dn)
.secret("mBCr3TKstTj2KeM6")
.build();
}
@Override
public Map<String, ?> getProperty(String dn) {
return new JsonObject().put("powerstate", 1).getMap();
}
}

View File

@ -0,0 +1,83 @@
package cc.iotkit.plugins.http.service;
import cc.iotkit.common.enums.ErrCode;
import cc.iotkit.common.exception.BizException;
import cc.iotkit.plugin.core.thing.IDevice;
import cc.iotkit.plugin.core.thing.actions.ActionResult;
import cc.iotkit.plugin.core.thing.actions.down.DeviceConfig;
import cc.iotkit.plugin.core.thing.actions.down.PropertyGet;
import cc.iotkit.plugin.core.thing.actions.down.PropertySet;
import cc.iotkit.plugin.core.thing.actions.down.ServiceInvoke;
import io.vertx.core.json.JsonObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* http
*
* @author sjg
*/
@Service
public class HttpDevice implements IDevice {
@Autowired
private HttpVerticle httpVerticle;
@Override
public ActionResult config(DeviceConfig action) {
return ActionResult.builder().code(0).reason("").build();
}
@Override
public ActionResult propertyGet(PropertyGet action) {
String topic = String.format("/sys/%s/%s/c/service/property/get", action.getProductKey(), action.getDeviceName());
return send(
topic,
action.getDeviceName(),
new JsonObject()
.put("id", action.getId())
.put("method", "thing.service.property.get")
.put("params", action.getKeys())
.toString()
);
}
@Override
public ActionResult propertySet(PropertySet action) {
String topic = String.format("/sys/%s/%s/c/service/property/set", action.getProductKey(), action.getDeviceName());
return send(
topic,
action.getDeviceName(),
new JsonObject()
.put("id", action.getId())
.put("method", "thing.service.property.set")
.put("params", action.getParams())
.toString()
);
}
@Override
public ActionResult serviceInvoke(ServiceInvoke action) {
String topic = String.format("/sys/%s/%s/c/service/%s", action.getProductKey(), action.getDeviceName(), action.getName());
return send(
topic,
action.getDeviceName(),
new JsonObject()
.put("id", action.getId())
.put("method", "thing.service." + action.getName())
.put("params", action.getParams())
.toString()
);
}
private ActionResult send(String topic, String deviceName, String payload) {
try {
return ActionResult.builder().code(0).reason("").build();
} catch (BizException e) {
return ActionResult.builder().code(e.getCode()).reason(e.getMessage()).build();
} catch (Exception e) {
return ActionResult.builder().code(ErrCode.UNKNOWN_EXCEPTION.getKey()).reason(e.getMessage()).build();
}
}
}

View File

@ -0,0 +1,71 @@
package cc.iotkit.plugins.http.service;
import com.gitee.starblues.bootstrap.realize.PluginCloseListener;
import com.gitee.starblues.core.PluginCloseType;
import com.gitee.starblues.core.PluginInfo;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.CountDownLatch;
/**
* @author sjg
*/
@Slf4j
@Service
public class HttpPlugin implements PluginCloseListener {
@Autowired
private PluginInfo pluginInfo;
@Autowired
private HttpVerticle httpVerticle;
private Vertx vertx;
private CountDownLatch countDownLatch;
private String deployedId;
@PostConstruct
public void init() {
vertx = Vertx.vertx();
try {
countDownLatch = new CountDownLatch(1);
Future<String> future = vertx.deployVerticle(httpVerticle);
future.onSuccess((s -> {
deployedId = s;
countDownLatch.countDown();
}));
future.onFailure((e) -> {
countDownLatch.countDown();
log.error("start mqtt plugin failed", e);
});
countDownLatch.await();
future.succeeded();
} catch (Throwable e) {
log.error("start mqtt plugin error.", e);
}
}
@Override
public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) {
try {
httpVerticle.stop();
Future<Void> future = vertx.undeploy(deployedId);
future.onSuccess(unused -> log.info("stop mqtt plugin success"));
if (closeType == PluginCloseType.UNINSTALL) {
log.info("插件被卸载了:{}", pluginInfo.getPluginId());
} else if (closeType == PluginCloseType.STOP) {
log.info("插件被关闭了:{}", pluginInfo.getPluginId());
} else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) {
log.info("插件被升级卸载了:{}", pluginInfo.getPluginId());
}
} catch (Throwable e) {
log.error("stop mqtt plugin error.", e);
}
}
}

View File

@ -0,0 +1,193 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.plugins.http.service;
import cc.iotkit.common.utils.StringUtils;
import cc.iotkit.model.device.DeviceInfo;
import cc.iotkit.plugin.core.thing.IThingService;
import cc.iotkit.plugin.core.thing.actions.DeviceState;
import cc.iotkit.plugin.core.thing.actions.EventLevel;
import cc.iotkit.plugin.core.thing.actions.up.DeviceStateChange;
import cc.iotkit.plugin.core.thing.actions.up.EventReport;
import cc.iotkit.plugin.core.thing.actions.up.PropertyReport;
import cc.iotkit.plugins.http.conf.HttpConfig;
import com.gitee.starblues.bootstrap.annotation.AutowiredType;
import com.gitee.starblues.core.PluginInfo;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
/**
* mqtt
* http://iotkit-open-source.gitee.io/document/pages/device_protocol/http/#%E4%BA%8B%E4%BB%B6%E4%B8%8A%E6%8A%A5
*
* @author sjg
*/
@Slf4j
@Component
public class HttpVerticle extends AbstractVerticle implements Handler<RoutingContext> {
@Autowired
private HttpConfig config;
@Autowired
@AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
private IThingService thingService;
@Autowired
private PluginInfo pluginInfo;
private static final Set<String> DEVICE_ONLINE = new HashSet<>();
@Override
public void start() {
HttpServer httpServer = vertx.createHttpServer();
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create()).handler(this);
httpServer.requestHandler(router).listen(config.getPort(), ar -> {
if (ar.succeeded()) {
log.info("http server is listening on port " + ar.result().actualPort());
} else {
log.error("Error on starting the server", ar.cause());
}
});
}
@Override
public void handle(RoutingContext ctx) {
HttpServerResponse response = ctx.response();
response.putHeader("content-type", "application/json");
response.setStatusCode(200);
try {
String secret = ctx.request().getHeader("secret");
if (StringUtils.isBlank(secret)) {
log.error("secret不能为空");
response.setStatusCode(401);
end(response);
return;
}
HttpServerRequest request = ctx.request();
// /sys/{productKey}/{deviceName}/properties
String path = request.path();
String[] parts = path.split("/");
if (parts.length < 5) {
log.error("不正确的路径");
response.setStatusCode(500);
}
String productKey = parts[2];
String deviceName = parts[3];
String type = parts[4];
DeviceInfo device = thingService.getDevice(deviceName);
if (device == null) {
log.error("认证失败,设备:{} 不存在", deviceName);
response.setStatusCode(401);
end(response);
return;
}
if (!secret.equalsIgnoreCase(device.getSecret())) {
log.error("认证失败secret不正确期望值:{}", device.getSecret());
response.setStatusCode(401);
end(response);
return;
}
//设备上线
if (!DEVICE_ONLINE.contains(deviceName)) {
thingService.post(pluginInfo.getPluginId(), DeviceStateChange.builder()
.id(UUID.randomUUID().toString())
.productKey(productKey)
.deviceName(deviceName)
.state(DeviceState.ONLINE)
.time(System.currentTimeMillis())
.build());
DEVICE_ONLINE.add(deviceName);
}
String method = request.method().name();
JsonObject payload = ctx.getBodyAsJson();
if ("event".equals(type)) {
//事件上报
if ("POST".equalsIgnoreCase(method)) {
response.setStatusCode(500);
log.error("请求类型不正确,期望值:POST实际值:{}", method);
end(response);
}
thingService.post(
pluginInfo.getPluginId(),
EventReport.builder()
.id(payload.getString("id"))
.productKey(productKey)
.deviceName(deviceName)
.level(EventLevel.INFO)
.name(parts[3])
.params(payload.getJsonObject("params").getMap())
.build()
);
end(response);
return;
}
if ("properties".equals(type)) {
if ("POST".equalsIgnoreCase(method)) {
//属性上报
thingService.post(
pluginInfo.getPluginId(),
PropertyReport.builder()
.id(UUID.randomUUID().toString())
.productKey(productKey)
.deviceName(deviceName)
.params(payload.getJsonObject("params").getMap())
.build()
);
end(response);
return;
}
if ("GET".equalsIgnoreCase(method)) {
//属性获取
Map<String, ?> property = thingService.getProperty(deviceName);
response.end(new JsonObject()
.put("code", 0)
.put("data", property)
.toString());
}
}
} catch (Exception e) {
log.error("消息处理失败", e);
response.setStatusCode(500);
end(response);
}
}
private void end(HttpServerResponse response) {
response.end(new JsonObject()
.put("code", response.getStatusCode() == 200 ? 0 : response.getStatusCode())
.toString());
}
}

View File

@ -0,0 +1,6 @@
plugin:
runMode: dev
mainPackage: cc.iotkit.plugin
http:
port: 9081

89
mqtt-plugin/pom.xml Normal file
View File

@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>iot-iita-plugins</artifactId>
<groupId>cc.iotkit.plugins</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>mqtt-plugin</artifactId>
<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mqtt</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
<profiles>
<profile>
<id>dev</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<plugin.build.mode>dev</plugin.build.mode>
</properties>
</profile>
<profile>
<id>prod</id>
<properties>
<plugin.build.mode>prod</plugin.build.mode>
</properties>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>com.gitee.starblues</groupId>
<artifactId>spring-brick-maven-packager</artifactId>
<configuration>
<mode>${plugin.build.mode}</mode>
<pluginInfo>
<id>mqtt-plugin</id>
<bootstrapClass>cc.iotkit.plugins.mqtt.Application</bootstrapClass>
<version>1.0.0</version>
<provider>iita</provider>
<description>mqtt示例插件</description>
<configFileName>application.yml</configFileName>
</pluginInfo>
<prodConfig>
<packageType>jar-outer</packageType>
</prodConfig>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,19 @@
package cc.iotkit.plugins.mqtt;
import com.gitee.starblues.bootstrap.SpringPluginBootstrap;
import com.gitee.starblues.bootstrap.annotation.OneselfConfig;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
/**
* @author sjg
*/
@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.mqtt"})
@OneselfConfig(mainConfigFileName = {"application.yml"})
@EnableConfigurationProperties
public class Application extends SpringPluginBootstrap {
public static void main(String[] args) {
new Application().run(Application.class, args);
}
}

View File

@ -0,0 +1,21 @@
package cc.iotkit.plugins.mqtt.conf;
import cc.iotkit.plugin.core.thing.IThingService;
import cc.iotkit.plugins.mqtt.service.FakeThingService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @author sjg
*/
@Component
public class BeanConfig {
@Bean
@ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev")
IThingService getThingService() {
return new FakeThingService();
}
}

View File

@ -0,0 +1,31 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.plugins.mqtt.conf;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttConfig {
private int port;
private String sslKey;
private String sslCert;
private boolean ssl;
private boolean useWebSocket;
}

View File

@ -0,0 +1,47 @@
package cc.iotkit.plugins.mqtt.service;
import cc.iotkit.model.device.DeviceInfo;
import cc.iotkit.model.product.Product;
import cc.iotkit.plugin.core.thing.IThingService;
import cc.iotkit.plugin.core.thing.actions.ActionResult;
import cc.iotkit.plugin.core.thing.actions.IDeviceAction;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
/**
*
*
* @author sjg
*/
@Slf4j
public class FakeThingService implements IThingService {
@Override
public ActionResult post(String pluginId, IDeviceAction action) {
log.info("post action:{}", action);
return ActionResult.builder().code(0).build();
}
@Override
public Product getProduct(String pk) {
return Product.builder()
.productKey("cGCrkK7Ex4FESAwe")
.productSecret("aaaaaaaa")
.build();
}
@Override
public DeviceInfo getDevice(String dn) {
return DeviceInfo.builder()
.productKey("cGCrkK7Ex4FESAwe")
.deviceName(dn)
.build();
}
@Override
public Map<String, ?> getProperty(String dn) {
return new HashMap<>(0);
}
}

View File

@ -0,0 +1,88 @@
package cc.iotkit.plugins.mqtt.service;
import cc.iotkit.common.enums.ErrCode;
import cc.iotkit.common.exception.BizException;
import cc.iotkit.plugin.core.thing.IDevice;
import cc.iotkit.plugin.core.thing.actions.ActionResult;
import cc.iotkit.plugin.core.thing.actions.down.DeviceConfig;
import cc.iotkit.plugin.core.thing.actions.down.PropertyGet;
import cc.iotkit.plugin.core.thing.actions.down.PropertySet;
import cc.iotkit.plugin.core.thing.actions.down.ServiceInvoke;
import io.vertx.core.json.JsonObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* mqtt
*
* @author sjg
*/
@Service
public class MqttDevice implements IDevice {
@Autowired
private MqttVerticle mqttVerticle;
@Override
public ActionResult config(DeviceConfig action) {
return ActionResult.builder().code(0).reason("").build();
}
@Override
public ActionResult propertyGet(PropertyGet action) {
String topic = String.format("/sys/%s/%s/c/service/property/get", action.getProductKey(), action.getDeviceName());
return send(
topic,
action.getDeviceName(),
new JsonObject()
.put("id", action.getId())
.put("method", "thing.service.property.get")
.put("params", action.getKeys())
.toString()
);
}
@Override
public ActionResult propertySet(PropertySet action) {
String topic = String.format("/sys/%s/%s/c/service/property/set", action.getProductKey(), action.getDeviceName());
return send(
topic,
action.getDeviceName(),
new JsonObject()
.put("id", action.getId())
.put("method", "thing.service.property.set")
.put("params", action.getParams())
.toString()
);
}
@Override
public ActionResult serviceInvoke(ServiceInvoke action) {
String topic = String.format("/sys/%s/%s/c/service/%s", action.getProductKey(), action.getDeviceName(), action.getName());
return send(
topic,
action.getDeviceName(),
new JsonObject()
.put("id", action.getId())
.put("method", "thing.service." + action.getName())
.put("params", action.getParams())
.toString()
);
}
private ActionResult send(String topic, String deviceName, String payload) {
try {
mqttVerticle.publish(
deviceName,
topic,
payload
);
return ActionResult.builder().code(0).reason("").build();
} catch (BizException e) {
return ActionResult.builder().code(e.getCode()).reason(e.getMessage()).build();
} catch (Exception e) {
return ActionResult.builder().code(ErrCode.UNKNOWN_EXCEPTION.getKey()).reason(e.getMessage()).build();
}
}
}

View File

@ -0,0 +1,71 @@
package cc.iotkit.plugins.mqtt.service;
import com.gitee.starblues.bootstrap.realize.PluginCloseListener;
import com.gitee.starblues.core.PluginCloseType;
import com.gitee.starblues.core.PluginInfo;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.CountDownLatch;
/**
* @author sjg
*/
@Slf4j
@Service
public class MqttPlugin implements PluginCloseListener {
@Autowired
private PluginInfo pluginInfo;
@Autowired
private MqttVerticle mqttVerticle;
private Vertx vertx;
private CountDownLatch countDownLatch;
private String deployedId;
@PostConstruct
public void init() {
vertx = Vertx.vertx();
try {
countDownLatch = new CountDownLatch(1);
Future<String> future = vertx.deployVerticle(mqttVerticle);
future.onSuccess((s -> {
deployedId = s;
countDownLatch.countDown();
}));
future.onFailure((e) -> {
countDownLatch.countDown();
log.error("start mqtt plugin failed", e);
});
countDownLatch.await();
future.succeeded();
} catch (Throwable e) {
log.error("start mqtt plugin error.", e);
}
}
@Override
public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) {
try {
mqttVerticle.stop();
Future<Void> future = vertx.undeploy(deployedId);
future.onSuccess(unused -> log.info("stop mqtt plugin success"));
if (closeType == PluginCloseType.UNINSTALL) {
log.info("插件被卸载了:{}", pluginInfo.getPluginId());
} else if (closeType == PluginCloseType.STOP) {
log.info("插件被关闭了:{}", pluginInfo.getPluginId());
} else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) {
log.info("插件被升级卸载了:{}", pluginInfo.getPluginId());
}
} catch (Throwable e) {
log.error("stop mqtt plugin error.", e);
}
}
}

View File

@ -0,0 +1,349 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.plugins.mqtt.service;
import cc.iotkit.common.enums.ErrCode;
import cc.iotkit.common.exception.BizException;
import cc.iotkit.common.utils.CodecUtil;
import cc.iotkit.common.utils.UniqueIdUtil;
import cc.iotkit.model.product.Product;
import cc.iotkit.plugin.core.thing.IThingService;
import cc.iotkit.plugin.core.thing.actions.ActionResult;
import cc.iotkit.plugin.core.thing.actions.DeviceState;
import cc.iotkit.plugin.core.thing.actions.EventLevel;
import cc.iotkit.plugin.core.thing.actions.IDeviceAction;
import cc.iotkit.plugin.core.thing.actions.up.*;
import cc.iotkit.plugins.mqtt.conf.MqttConfig;
import com.gitee.starblues.bootstrap.annotation.AutowiredType;
import com.gitee.starblues.core.PluginInfo;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.mqtt.*;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* mqtt
* http://iotkit-open-source.gitee.io/document/pages/device_protocol/mqtt/#%E7%BD%91%E5%85%B3%E8%BF%9E%E6%8E%A5%E5%92%8C%E6%B3%A8%E5%86%8C
*
* @author sjg
*/
@Slf4j
@Component
public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoint> {
private MqttServer mqttServer;
private final Map<String, MqttEndpoint> endpointMap = new HashMap<>();
/**
* clientid-mqttmqtthandler线
*/
private static final Map<String, Boolean> MQTT_CONNECT_POOL = new ConcurrentHashMap<>();
@Autowired
private MqttConfig config;
@Autowired
@AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
private IThingService thingService;
@Autowired
private PluginInfo pluginInfo;
@Override
public void start() {
MqttServerOptions options = new MqttServerOptions()
.setPort(config.getPort());
if (config.isSsl()) {
options = options.setSsl(true)
.setKeyCertOptions(new PemKeyCertOptions()
.setKeyPath(config.getSslKey())
.setCertPath(config.getSslCert()));
}
options.setUseWebSocket(config.isUseWebSocket());
mqttServer = MqttServer.create(vertx, options);
mqttServer.endpointHandler(this::handle).listen(ar -> {
if (ar.succeeded()) {
log.info("MQTT server is listening on port " + ar.result().actualPort());
} else {
log.error("Error on starting the server", ar.cause());
}
});
}
@Override
public void handle(MqttEndpoint endpoint) {
log.info("MQTT client:{} request to connect, clean session = {}", endpoint.clientIdentifier(), endpoint.isCleanSession());
MqttAuth auth = endpoint.auth();
if (auth == null) {
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
return;
}
//mqtt连接认证信息
/*
* mqttClientId: productKey_deviceName_model
* mqttUserName: deviceName
* mqttPassword: md5(,mqttClientId)
*/
String clientId = endpoint.clientIdentifier();
String[] parts = clientId.split("_");
if (parts.length < 3) {
log.error("clientId:{}不正确", clientId);
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID);
return;
}
log.info("MQTT client auth,clientId:{},username:{},password:{}",
clientId, auth.getUsername(), auth.getPassword());
String productKey = parts[0];
String deviceName = parts[1];
if (!auth.getUsername().equals(deviceName)) {
log.error("username:{}不正确", deviceName);
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
return;
}
Product product = thingService.getProduct(productKey);
if (product == null) {
log.error("获取产品信息失败,productKey:{}", productKey);
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
return;
}
String validPasswd = CodecUtil.md5Str(product.getProductSecret() + clientId);
if (!validPasswd.equalsIgnoreCase(auth.getPassword())) {
log.error("密码验证失败,期望值:{}", validPasswd);
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
return;
}
//设备注册
ActionResult result = thingService.post(
pluginInfo.getPluginId(),
fillAction(
DeviceRegister.builder()
.model(parts[2])
.version("1.0")
.build()
, productKey, deviceName
)
);
if (result.getCode() != 0) {
log.error("设备注册失败:{}", result);
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
return;
}
//保存设备与连接关系
endpointMap.put(deviceName, endpoint);
MQTT_CONNECT_POOL.put(clientId, true);
log.info("MQTT client keep alive timeout = {} ", endpoint.keepAliveTimeSeconds());
endpoint.accept(false);
endpoint.closeHandler((v) -> {
log.warn("client connection closed,clientId:{}", clientId);
if (Boolean.FALSE.equals(MQTT_CONNECT_POOL.get(clientId))) {
MQTT_CONNECT_POOL.remove(clientId);
return;
}
//下线
thingService.post(
pluginInfo.getPluginId(),
fillAction(
DeviceStateChange.builder()
.state(DeviceState.OFFLINE)
.build()
, productKey, deviceName
)
);
//删除设备与连接关系
endpointMap.remove(deviceName);
}).disconnectMessageHandler(disconnectMessage -> {
log.info("Received disconnect from client, reason code = {}", disconnectMessage.code());
//删除设备与连接关系
endpointMap.remove(deviceName);
MQTT_CONNECT_POOL.put(clientId, false);
}).subscribeHandler(subscribe -> {
//上线
thingService.post(
pluginInfo.getPluginId(),
fillAction(DeviceStateChange.builder()
.state(DeviceState.ONLINE)
.build()
, productKey, deviceName
)
);
List<MqttSubAckReasonCode> reasonCodes = new ArrayList<>();
for (MqttTopicSubscription s : subscribe.topicSubscriptions()) {
log.info("Subscription for {},with QoS {}", s.topicName(), s.qualityOfService());
try {
String topic = s.topicName();
//topic订阅验证 /sys/{productKey}/{deviceName}/#
String regex = String.format("^/sys/%s/%s/.*", productKey, deviceName);
if (!topic.matches(regex)) {
log.error("subscript topic:{} incorrect,regex:{}", topic, regex);
continue;
}
reasonCodes.add(MqttSubAckReasonCode.qosGranted(s.qualityOfService()));
} catch (Throwable e) {
log.error("subscribe failed,topic:" + s.topicName(), e);
reasonCodes.add(MqttSubAckReasonCode.NOT_AUTHORIZED);
}
}
// ack the subscriptions request
endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);
}).unsubscribeHandler(unsubscribe -> {
//下线
thingService.post(
pluginInfo.getPluginId(),
fillAction(
DeviceStateChange.builder()
.state(DeviceState.OFFLINE)
.build()
, productKey, deviceName
)
);
// ack the subscriptions request
endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
}).publishHandler(message -> {
JsonObject payload = message.payload().toJsonObject();
log.info("Received message:{}, with QoS {}", payload,
message.qosLevel());
if (payload.isEmpty()) {
return;
}
String topic = message.topicName();
try {
JsonObject defParams = JsonObject.mapFrom(new HashMap<>(0));
IDeviceAction action = null;
String method = payload.getString("method", "");
if ("thing.event.property.post".equalsIgnoreCase(method)) {
//属性上报
action = PropertyReport.builder()
.params(payload.getJsonObject("params", defParams).getMap())
.build();
reply(endpoint, topic, payload);
} else if (method.startsWith("thing.event.")) {
//事件上报
action = EventReport.builder()
.name(method.replace("thing.event.", ""))
.level(EventLevel.INFO)
.params(payload.getJsonObject("params", defParams).getMap())
.build();
reply(endpoint, topic, payload);
} else if (method.startsWith("thing.service.") && method.endsWith("_reply")) {
//服务回复
action = ServiceReply.builder()
.name(method.replaceAll("thing\\.service\\.(.*)_reply", "$1"))
.code(payload.getInteger("code", 0))
.params(payload.getJsonObject("data", defParams).getMap())
.build();
}
if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
endpoint.publishAcknowledge(message.messageId());
} else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
endpoint.publishReceived(message.messageId());
}
if (action == null) {
return;
}
action.setId(payload.getString("id"));
action.setProductKey(productKey);
action.setDeviceName(deviceName);
action.setTime(System.currentTimeMillis());
thingService.post(pluginInfo.getPluginId(), action);
} catch (Throwable e) {
log.error("handler message failed,topic:" + message.topicName(), e);
}
}).publishReleaseHandler(endpoint::publishComplete);
}
/**
*
*/
private void reply(MqttEndpoint endpoint, String topic, JsonObject payload) {
Map<String, Object> payloadReply = new HashMap<>();
payloadReply.put("id", payload.getString("id"));
payloadReply.put("method", payload.getString("method") + "_reply");
payloadReply.put("code", 0);
endpoint.publish(topic + "_reply", JsonObject.mapFrom(payloadReply).toBuffer(), MqttQoS.AT_LEAST_ONCE, false, false);
}
private IDeviceAction fillAction(IDeviceAction action, String productKey, String deviceName) {
action.setId(UniqueIdUtil.newRequestId());
action.setProductKey(productKey);
action.setDeviceName(deviceName);
action.setTime(System.currentTimeMillis());
return action;
}
@Override
public void stop() {
for (MqttEndpoint endpoint : endpointMap.values()) {
String clientId = endpoint.clientIdentifier();
String[] parts = clientId.split("_");
if (parts.length < 3) {
continue;
}
//下线
thingService.post(
pluginInfo.getPluginId(),
fillAction(
DeviceStateChange.builder()
.state(DeviceState.OFFLINE)
.build(),
parts[0],
parts[1]
)
);
}
mqttServer.close(voidAsyncResult -> log.info("close mqtt server..."));
}
public void publish(String deviceName, String topic, String msg) {
MqttEndpoint endpoint = endpointMap.get(deviceName);
if (endpoint == null) {
throw new BizException(ErrCode.SEND_DESTINATION_NOT_FOUND);
}
Future<Integer> result = endpoint.publish(topic, Buffer.buffer(msg),
MqttQoS.AT_LEAST_ONCE, false, false);
result.onFailure(e -> log.error("public topic failed", e));
result.onSuccess(integer -> log.info("publish success,topic:{},payload:{}", topic, msg));
}
}

View File

@ -0,0 +1,6 @@
plugin:
runMode: prod
mainPackage: cc.iotkit.plugin
mqtt:
port: 1883

76
pom.xml Normal file
View File

@ -0,0 +1,76 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<modules>
<module>mqtt-plugin</module>
<module>http-plugin</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.11</version>
<relativePath/>
</parent>
<version>1.0.0</version>
<groupId>cc.iotkit.plugins</groupId>
<artifactId>iot-iita-plugins</artifactId>
<packaging>pom</packaging>
<properties>
<java.version>11</java.version>
<spring-boot.version>2.7.11</spring-boot.version>
<spring-brick.version>3.1.2</spring-brick.version>
<vertx.version>4.2.2</vertx.version>
</properties>
<dependencies>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>iot-plugin-core</artifactId>
<version>0.5.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.gitee.starblues</groupId>
<artifactId>spring-brick</artifactId>
<version>${spring-brick.version}</version>
</dependency>
<dependency>
<groupId>com.gitee.starblues</groupId>
<artifactId>spring-brick-bootstrap</artifactId>
<version>${spring-brick.version}</version>
</dependency>
<!-- SpringBoot的依赖配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<type>pom</type>
<scope>import</scope>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>