feat:增加emqx插件
parent
2dfea92653
commit
9cfe8ddb09
|
@ -10,7 +10,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
|
|||
* @Author:tfd
|
||||
* @Date:2023/12/14 16:25
|
||||
*/
|
||||
@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.dlt645"})
|
||||
@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.dlt645")
|
||||
@OneselfConfig(mainConfigFileName = {"application.yml"})
|
||||
@EnableConfigurationProperties
|
||||
@EnableScheduling
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>iot-iita-plugins</artifactId>
|
||||
<groupId>cc.iotkit.plugins</groupId>
|
||||
<version>1.0.1</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>emqx-plugin</artifactId>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-core</artifactId>
|
||||
<version>${vertx.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-mqtt</artifactId>
|
||||
<version>${vertx.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-web-proxy</artifactId>
|
||||
<version>${vertx.version}</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>dev</id>
|
||||
<activation>
|
||||
<activeByDefault>true</activeByDefault>
|
||||
</activation>
|
||||
<properties>
|
||||
<plugin.build.mode>dev</plugin.build.mode>
|
||||
</properties>
|
||||
</profile>
|
||||
|
||||
<profile>
|
||||
<id>prod</id>
|
||||
<properties>
|
||||
<plugin.build.mode>prod</plugin.build.mode>
|
||||
</properties>
|
||||
</profile>
|
||||
</profiles>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>com.gitee.starblues</groupId>
|
||||
<artifactId>spring-brick-maven-packager</artifactId>
|
||||
<version>${spring-brick.version}</version>
|
||||
<configuration>
|
||||
<mode>${plugin.build.mode}</mode>
|
||||
<pluginInfo>
|
||||
<id>emqx-plugin</id>
|
||||
<bootstrapClass>cc.iotkit.plugins.emqx.Application</bootstrapClass>
|
||||
<version>${project.version}</version>
|
||||
<provider>iita</provider>
|
||||
<description>emqx示例插件</description>
|
||||
<configFileName>application.yml</configFileName>
|
||||
</pluginInfo>
|
||||
<prodConfig>
|
||||
<packageType>jar</packageType>
|
||||
</prodConfig>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>repackage</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,19 @@
|
|||
package cc.iotkit.plugins.emqx;
|
||||
|
||||
import com.gitee.starblues.bootstrap.SpringPluginBootstrap;
|
||||
import com.gitee.starblues.bootstrap.annotation.OneselfConfig;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
|
||||
/**
|
||||
* @author sjg
|
||||
*/
|
||||
@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.emqx")
|
||||
@OneselfConfig(mainConfigFileName = {"application.yml"})
|
||||
@EnableConfigurationProperties
|
||||
public class Application extends SpringPluginBootstrap {
|
||||
|
||||
public static void main(String[] args) {
|
||||
new Application().run(Application.class, args);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
package cc.iotkit.plugins.emqx.conf;
|
||||
|
||||
import cc.iotkit.plugin.core.IPluginConfig;
|
||||
import cc.iotkit.plugin.core.LocalPluginConfig;
|
||||
import cc.iotkit.plugin.core.thing.IThingService;
|
||||
import cc.iotkit.plugins.emqx.service.FakeThingService;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author sjg
|
||||
*/
|
||||
@Component
|
||||
public class BeanConfig {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev")
|
||||
IThingService getThingService() {
|
||||
return new FakeThingService();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev")
|
||||
IPluginConfig getPluginConfig(){
|
||||
return new LocalPluginConfig();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* +----------------------------------------------------------------------
|
||||
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||
* +----------------------------------------------------------------------
|
||||
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||
* +----------------------------------------------------------------------
|
||||
* | Author: xw2sy@163.com
|
||||
* +----------------------------------------------------------------------
|
||||
*/
|
||||
package cc.iotkit.plugins.emqx.conf;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Data
|
||||
@Component
|
||||
@ConfigurationProperties(prefix = "emqx")
|
||||
public class MqttConfig {
|
||||
|
||||
private String host;
|
||||
|
||||
private int port;
|
||||
|
||||
private boolean ssl;
|
||||
|
||||
private String topics;
|
||||
|
||||
private int authPort;
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
package cc.iotkit.plugins.emqx.handler;
|
||||
|
||||
import io.vertx.core.json.JsonObject;
|
||||
|
||||
/**
|
||||
* @author sjg
|
||||
*/
|
||||
public interface IMsgHandler {
|
||||
|
||||
void handle(String topic, JsonObject payload);
|
||||
|
||||
}
|
|
@ -0,0 +1,146 @@
|
|||
package cc.iotkit.plugins.emqx.service;
|
||||
/*
|
||||
* +----------------------------------------------------------------------
|
||||
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||
* +----------------------------------------------------------------------
|
||||
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||
* +----------------------------------------------------------------------
|
||||
* | Author: xw2sy@163.com
|
||||
* +----------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
import cc.iotkit.common.utils.CodecUtil;
|
||||
import cc.iotkit.plugin.core.thing.IThingService;
|
||||
import cc.iotkit.plugin.core.thing.model.ThingProduct;
|
||||
import com.gitee.starblues.bootstrap.annotation.AutowiredType;
|
||||
import io.vertx.core.AbstractVerticle;
|
||||
import io.vertx.core.http.HttpMethod;
|
||||
import io.vertx.core.http.HttpServer;
|
||||
import io.vertx.core.http.HttpServerResponse;
|
||||
import io.vertx.core.json.JsonObject;
|
||||
import io.vertx.ext.web.Router;
|
||||
import io.vertx.ext.web.handler.BodyHandler;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
public class AuthVerticle extends AbstractVerticle {
|
||||
|
||||
private HttpServer backendServer;
|
||||
|
||||
@Setter
|
||||
private int port;
|
||||
|
||||
@Setter
|
||||
private String serverPassword;
|
||||
|
||||
@Autowired
|
||||
@AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
|
||||
private IThingService thingService;
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
backendServer = vertx.createHttpServer();
|
||||
|
||||
//第一步 声明Router&初始化Router
|
||||
Router backendRouter = Router.router(vertx);
|
||||
//获取body参数,得先添加这句
|
||||
backendRouter.route().handler(BodyHandler.create());
|
||||
|
||||
//第二步 配置Router解析url
|
||||
backendRouter.route(HttpMethod.POST, "/mqtt/auth").handler(rc -> {
|
||||
JsonObject json = rc.getBodyAsJson();
|
||||
log.info("mqtt auth:{}", json);
|
||||
try {
|
||||
String clientId = json.getString("clientid");
|
||||
String username = json.getString("username");
|
||||
String password = json.getString("password");
|
||||
|
||||
//服务端插件连接
|
||||
if (clientId.equals("server") && serverPassword.equals(password)) {
|
||||
httpResult(rc.response(), 200);
|
||||
return;
|
||||
}
|
||||
|
||||
//其它客户端连接
|
||||
String[] parts = clientId.split("_");
|
||||
if (parts.length < 3) {
|
||||
log.error("clientid:{}不正确", clientId);
|
||||
httpResult(rc.response(), 400);
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("MQTT client auth,clientId:{},username:{},password:{}",
|
||||
clientId, username, password);
|
||||
|
||||
String productKey = parts[0];
|
||||
String deviceName = parts[1];
|
||||
if (!username.equals(deviceName)) {
|
||||
log.error("username:{}不正确", deviceName);
|
||||
httpResult(rc.response(), 403);
|
||||
return;
|
||||
}
|
||||
|
||||
ThingProduct product = thingService.getProduct(productKey);
|
||||
if (product == null) {
|
||||
log.error("获取产品信息失败,productKey:{}", productKey);
|
||||
httpResult(rc.response(), 403);
|
||||
return;
|
||||
}
|
||||
|
||||
String validPasswd = CodecUtil.md5Str(product.getProductSecret() + clientId);
|
||||
if (!validPasswd.equalsIgnoreCase(password)) {
|
||||
log.error("密码验证失败,期望值:{}", validPasswd);
|
||||
httpResult(rc.response(), 403);
|
||||
return;
|
||||
}
|
||||
|
||||
Set<String> devices = new HashSet<>();
|
||||
devices.add(parts[0] + "," + parts[1]);
|
||||
EmqxPlugin.CLIENT_DEVICE_MAP.putIfAbsent(parts[0] + parts[1], devices);
|
||||
|
||||
httpResult(rc.response(), 200);
|
||||
} catch (Throwable e) {
|
||||
httpResult(rc.response(), 500);
|
||||
log.error("mqtt auth failed", e);
|
||||
}
|
||||
});
|
||||
backendRouter.route(HttpMethod.POST, "/mqtt/acl").handler(rc -> {
|
||||
String json = rc.getBodyAsString();
|
||||
log.info("mqtt acl:{}", json);
|
||||
try {
|
||||
httpResult(rc.response(), 200);
|
||||
} catch (Throwable e) {
|
||||
httpResult(rc.response(), 500);
|
||||
log.error("mqtt acl failed", e);
|
||||
}
|
||||
});
|
||||
|
||||
backendServer.requestHandler(backendRouter)
|
||||
.listen(port, "0.0.0.0")
|
||||
.onSuccess(s -> {
|
||||
log.info("auth server start success,port:{}", s.actualPort());
|
||||
}).onFailure(e -> {
|
||||
e.printStackTrace();
|
||||
})
|
||||
;
|
||||
}
|
||||
|
||||
private void httpResult(HttpServerResponse response, int code) {
|
||||
response.putHeader("Content-Type", "application/json");
|
||||
response
|
||||
.setStatusCode(code);
|
||||
response
|
||||
.end("{\"result\": \"" + (code == 200 ? "allow" : "deny") + "\"}");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
backendServer.close(voidAsyncResult -> log.info("close emqx auth server..."));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,380 @@
|
|||
package cc.iotkit.plugins.emqx.service;
|
||||
|
||||
import cc.iotkit.common.utils.StringUtils;
|
||||
import cc.iotkit.common.utils.ThreadUtil;
|
||||
import cc.iotkit.common.utils.UniqueIdUtil;
|
||||
import cc.iotkit.plugin.core.IPlugin;
|
||||
import cc.iotkit.plugin.core.IPluginConfig;
|
||||
import cc.iotkit.plugin.core.thing.IThingService;
|
||||
import cc.iotkit.plugin.core.thing.actions.ActionResult;
|
||||
import cc.iotkit.plugin.core.thing.actions.DeviceState;
|
||||
import cc.iotkit.plugin.core.thing.actions.EventLevel;
|
||||
import cc.iotkit.plugin.core.thing.actions.IDeviceAction;
|
||||
import cc.iotkit.plugin.core.thing.actions.up.*;
|
||||
import cc.iotkit.plugin.core.thing.model.ThingDevice;
|
||||
import cc.iotkit.plugins.emqx.conf.MqttConfig;
|
||||
import cn.hutool.core.bean.BeanUtil;
|
||||
import cn.hutool.core.bean.copier.CopyOptions;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.gitee.starblues.bootstrap.annotation.AutowiredType;
|
||||
import com.gitee.starblues.bootstrap.realize.PluginCloseListener;
|
||||
import com.gitee.starblues.core.PluginCloseType;
|
||||
import com.gitee.starblues.core.PluginInfo;
|
||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
import io.vertx.core.Future;
|
||||
import io.vertx.core.Vertx;
|
||||
import io.vertx.core.json.JsonObject;
|
||||
import io.vertx.mqtt.MqttClient;
|
||||
import io.vertx.mqtt.MqttClientOptions;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @author sjg
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class EmqxPlugin implements PluginCloseListener, IPlugin, Runnable {
|
||||
|
||||
@Autowired
|
||||
private PluginInfo pluginInfo;
|
||||
@Autowired
|
||||
private MqttConfig mqttConfig;
|
||||
|
||||
@Autowired
|
||||
@AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
|
||||
private IPluginConfig pluginConfig;
|
||||
|
||||
@Autowired
|
||||
@AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
|
||||
private IThingService thingService;
|
||||
|
||||
@Autowired
|
||||
private AuthVerticle authVerticle;
|
||||
|
||||
@Autowired
|
||||
private MqttDevice mqttDevice;
|
||||
|
||||
private final ScheduledThreadPoolExecutor emqxConnectTask = ThreadUtil.newScheduled(1, "emqx_connect");
|
||||
|
||||
private Vertx vertx;
|
||||
private String deployedId;
|
||||
|
||||
private MqttClient client;
|
||||
|
||||
private boolean mqttConnected = false;
|
||||
|
||||
private boolean authServerStarted = false;
|
||||
|
||||
private static final Map<String, Boolean> DEVICE_ONLINE = new ConcurrentHashMap<>();
|
||||
|
||||
public static final Map<String, Set<String>> CLIENT_DEVICE_MAP = new HashMap<>();
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
vertx = Vertx.vertx();
|
||||
try {
|
||||
//获取插件最新配置替换当前配置
|
||||
Map<String, Object> config = pluginConfig.getConfig(pluginInfo.getPluginId());
|
||||
BeanUtil.copyProperties(config, mqttConfig, CopyOptions.create().ignoreNullValue());
|
||||
|
||||
String serverPassword = IdUtil.fastSimpleUUID();
|
||||
MqttClientOptions options = new MqttClientOptions()
|
||||
.setClientId("server")
|
||||
.setUsername("server")
|
||||
.setPassword(serverPassword)
|
||||
.setCleanSession(true)
|
||||
.setMaxInflightQueue(100)
|
||||
.setKeepAliveInterval(60);
|
||||
|
||||
if (mqttConfig.isSsl()) {
|
||||
options.setSsl(true)
|
||||
.setTrustAll(true);
|
||||
}
|
||||
client = MqttClient.create(vertx, options);
|
||||
mqttDevice.setClient(client);
|
||||
|
||||
authVerticle.setPort(mqttConfig.getAuthPort());
|
||||
authVerticle.setServerPassword(serverPassword);
|
||||
|
||||
emqxConnectTask.scheduleWithFixedDelay(this, 3, 3, TimeUnit.SECONDS);
|
||||
} catch (Throwable e) {
|
||||
log.error("mqtt plugin startup error", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (!authServerStarted) {
|
||||
try {
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
Future<String> future = vertx.deployVerticle(authVerticle);
|
||||
future.onSuccess((s -> {
|
||||
deployedId = s;
|
||||
countDownLatch.countDown();
|
||||
authServerStarted = true;
|
||||
log.info("start emqx auth plugin success");
|
||||
}));
|
||||
future.onFailure(e -> {
|
||||
countDownLatch.countDown();
|
||||
authServerStarted = false;
|
||||
log.error("start emqx auth plugin failed", e);
|
||||
});
|
||||
countDownLatch.await();
|
||||
} catch (Exception e) {
|
||||
authServerStarted = false;
|
||||
log.error("start emqx auth server failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (mqttConnected) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
String[] topics = mqttConfig.getTopics().split(",");
|
||||
Map<String, Integer> subscribes = new HashMap<>(topics.length);
|
||||
for (String topic : topics) {
|
||||
subscribes.put(topic, 1);
|
||||
}
|
||||
|
||||
client.connect(mqttConfig.getPort(), mqttConfig.getHost(), s -> {
|
||||
if (s.succeeded()) {
|
||||
log.info("client connect success.");
|
||||
mqttConnected = true;
|
||||
client.subscribe(subscribes, e -> {
|
||||
if (e.succeeded()) {
|
||||
log.info("===>subscribe success: {}", e.result());
|
||||
} else {
|
||||
log.error("===>subscribe fail: ", e.cause());
|
||||
}
|
||||
});
|
||||
|
||||
} else {
|
||||
mqttConnected = false;
|
||||
log.error("client connect fail: ", s.cause());
|
||||
}
|
||||
}).publishHandler(msg -> {
|
||||
String topic = msg.topicName();
|
||||
if (topic.contains("/c/")) {
|
||||
return;
|
||||
}
|
||||
|
||||
JsonObject payload = msg.payload().toJsonObject();
|
||||
log.info("Client received message on [{}] payload [{}] with QoS [{}]", topic, payload, msg.qosLevel());
|
||||
|
||||
try {
|
||||
//客户端连接断开
|
||||
if (topic.equals("/sys/client/disconnected")) {
|
||||
offline(payload.getString("clientid"));
|
||||
return;
|
||||
}
|
||||
|
||||
ThingDevice device = getDevice(topic);
|
||||
if (device == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
//有消息上报-设备上线
|
||||
online(device.getProductKey(), device.getDeviceName());
|
||||
|
||||
JsonObject defParams = JsonObject.mapFrom(new HashMap<>(0));
|
||||
IDeviceAction action = null;
|
||||
|
||||
String method = payload.getString("method", "");
|
||||
if (StringUtils.isBlank(method)) {
|
||||
return;
|
||||
}
|
||||
JsonObject params = payload.getJsonObject("params", defParams);
|
||||
|
||||
if ("thing.lifetime.register".equalsIgnoreCase(method)) {
|
||||
//子设备注册
|
||||
String subPk = params.getString("productKey");
|
||||
String subDn = params.getString("deviceName");
|
||||
ActionResult regResult = thingService.post(
|
||||
pluginInfo.getPluginId(),
|
||||
fillAction(
|
||||
DeviceRegister.builder()
|
||||
.productKey(subPk)
|
||||
.deviceName(subDn)
|
||||
.model(params.getString("model"))
|
||||
.version("1.0")
|
||||
.build()
|
||||
, subPk, subDn
|
||||
)
|
||||
);
|
||||
if (regResult.getCode() == 0) {
|
||||
//注册成功
|
||||
reply(topic, payload, 0);
|
||||
Set<String> devices = CLIENT_DEVICE_MAP.get(device.getProductKey() + device.getDeviceName());
|
||||
devices.add(subPk + "," + subDn);
|
||||
} else {
|
||||
//注册失败
|
||||
reply(topic, new JsonObject(), regResult.getCode());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if ("thing.event.property.post".equalsIgnoreCase(method)) {
|
||||
//属性上报
|
||||
action = PropertyReport.builder()
|
||||
.params(params.getMap())
|
||||
.build();
|
||||
reply(topic, payload, 0);
|
||||
} else if (method.startsWith("thing.event.")) {
|
||||
//事件上报
|
||||
action = EventReport.builder()
|
||||
.name(method.replace("thing.event.", ""))
|
||||
.level(EventLevel.INFO)
|
||||
.params(params.getMap())
|
||||
.build();
|
||||
reply(topic, payload, 0);
|
||||
} else if (method.startsWith("thing.service.") && method.endsWith("_reply")) {
|
||||
//服务回复
|
||||
action = ServiceReply.builder()
|
||||
.name(method.replaceAll("thing\\.service\\.(.*)_reply", "$1"))
|
||||
.code(payload.getInteger("code", 0))
|
||||
.params(params.getMap())
|
||||
.build();
|
||||
}
|
||||
|
||||
if (action == null) {
|
||||
return;
|
||||
}
|
||||
action.setId(payload.getString("id"));
|
||||
action.setProductKey(device.getProductKey());
|
||||
action.setDeviceName(device.getDeviceName());
|
||||
action.setTime(System.currentTimeMillis());
|
||||
thingService.post(pluginInfo.getPluginId(), action);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("message is illegal.", e);
|
||||
}
|
||||
}).closeHandler(e -> {
|
||||
mqttConnected = false;
|
||||
log.info("client closed");
|
||||
}).exceptionHandler(event -> log.error("client fail", event));
|
||||
} catch (Exception e) {
|
||||
log.error("start emqx client failed", e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public ThingDevice getDevice(String topic) {
|
||||
String[] topicParts = topic.split("/");
|
||||
if (topicParts.length < 5) {
|
||||
return null;
|
||||
}
|
||||
return ThingDevice.builder()
|
||||
.productKey(topicParts[2])
|
||||
.deviceName(topicParts[3])
|
||||
.build();
|
||||
}
|
||||
|
||||
public void online(String pk, String dn) {
|
||||
if (Boolean.TRUE.equals(DEVICE_ONLINE.get(dn))) {
|
||||
return;
|
||||
}
|
||||
|
||||
//上线
|
||||
thingService.post(
|
||||
pluginInfo.getPluginId(),
|
||||
fillAction(DeviceStateChange.builder()
|
||||
.state(DeviceState.ONLINE)
|
||||
.build()
|
||||
, pk, dn
|
||||
)
|
||||
);
|
||||
DEVICE_ONLINE.put(dn, true);
|
||||
}
|
||||
|
||||
public void offline(String clientId) {
|
||||
String[] parts = clientId.split("_");
|
||||
Set<String> devices = CLIENT_DEVICE_MAP.get(parts[0] + parts[1]);
|
||||
for (String device : devices) {
|
||||
String[] pkDn = device.split(",");
|
||||
//下线
|
||||
thingService.post(
|
||||
pluginInfo.getPluginId(),
|
||||
fillAction(DeviceStateChange.builder()
|
||||
.state(DeviceState.OFFLINE)
|
||||
.build()
|
||||
, pkDn[0], pkDn[1]
|
||||
)
|
||||
);
|
||||
DEVICE_ONLINE.remove(pkDn[1]);
|
||||
}
|
||||
}
|
||||
|
||||
private IDeviceAction fillAction(IDeviceAction action, String productKey, String deviceName) {
|
||||
action.setId(UniqueIdUtil.newRequestId());
|
||||
action.setProductKey(productKey);
|
||||
action.setDeviceName(deviceName);
|
||||
action.setTime(System.currentTimeMillis());
|
||||
return action;
|
||||
}
|
||||
|
||||
/**
|
||||
* 回复设备
|
||||
*/
|
||||
private void reply(String topic, JsonObject payload, int code) {
|
||||
Map<String, Object> payloadReply = new HashMap<>();
|
||||
payloadReply.put("id", payload.getString("id"));
|
||||
payloadReply.put("method", payload.getString("method") + "_reply");
|
||||
payloadReply.put("code", code);
|
||||
payloadReply.put("data", payload.getJsonObject("params"));
|
||||
topic = topic.replace("/s/", "/c/") + "_reply";
|
||||
|
||||
String finalTopic = topic;
|
||||
client.publish(topic, JsonObject.mapFrom(payloadReply).toBuffer(), MqttQoS.AT_LEAST_ONCE, false, false)
|
||||
.onSuccess(h -> {
|
||||
log.info("publish {} success", finalTopic);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) {
|
||||
try {
|
||||
log.info("plugin close,type:{},pluginId:{}", closeType, pluginInfo.getPluginId());
|
||||
if (deployedId != null) {
|
||||
CountDownLatch wait = new CountDownLatch(1);
|
||||
Future<Void> future = vertx.undeploy(deployedId);
|
||||
future.onSuccess(unused -> {
|
||||
log.info("emqx plugin stopped success");
|
||||
wait.countDown();
|
||||
});
|
||||
future.onFailure(h -> {
|
||||
log.error("emqx plugin stopped failed", h);
|
||||
wait.countDown();
|
||||
});
|
||||
wait.await(5, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
client.disconnect()
|
||||
.onSuccess(unused -> {
|
||||
mqttConnected = false;
|
||||
log.info("stop emqx connect success");
|
||||
})
|
||||
.onFailure(unused -> log.error("stop emqx connect failure"));
|
||||
|
||||
emqxConnectTask.shutdown();
|
||||
|
||||
} catch (Throwable e) {
|
||||
log.error("emqx plugin stop error", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getLinkInfo(String pk, String dn) {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
package cc.iotkit.plugins.emqx.service;
|
||||
|
||||
import cc.iotkit.plugin.core.thing.IThingService;
|
||||
import cc.iotkit.plugin.core.thing.actions.ActionResult;
|
||||
import cc.iotkit.plugin.core.thing.actions.IDeviceAction;
|
||||
import cc.iotkit.plugin.core.thing.model.ThingDevice;
|
||||
import cc.iotkit.plugin.core.thing.model.ThingProduct;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 测试服务
|
||||
*
|
||||
* @author sjg
|
||||
*/
|
||||
@Slf4j
|
||||
public class FakeThingService implements IThingService {
|
||||
|
||||
/**
|
||||
* 添加测试产品
|
||||
*/
|
||||
private static final Map<String, String> PRODUCTS = Map.of(
|
||||
"hbtgIA0SuVw9lxjB", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU",
|
||||
"Rf4QSjbm65X45753", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU",
|
||||
"cGCrkK7Ex4FESAwe", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU"
|
||||
);
|
||||
|
||||
/**
|
||||
* 添加测试设备
|
||||
*/
|
||||
private static final Map<String, String> DEVICES = new HashMap<>();
|
||||
|
||||
static {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
DEVICES.put("TEST:GW:" + StringUtils.leftPad(i + "", 6, "0"), "hbtgIA0SuVw9lxjB");
|
||||
DEVICES.put("TEST_SW_" + StringUtils.leftPad(i + "", 6, "0"), "Rf4QSjbm65X45753");
|
||||
DEVICES.put("TEST_SC_" + StringUtils.leftPad(i + "", 6, "0"), "cGCrkK7Ex4FESAwe");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionResult post(String pluginId, IDeviceAction action) {
|
||||
log.info("post action:{}", action);
|
||||
return ActionResult.builder().code(0).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ThingProduct getProduct(String pk) {
|
||||
return ThingProduct.builder()
|
||||
.productKey(pk)
|
||||
.productSecret(PRODUCTS.get(pk))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ThingDevice getDevice(String dn) {
|
||||
return ThingDevice.builder()
|
||||
.productKey(DEVICES.get(dn))
|
||||
.deviceName(dn)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, ?> getProperty(String dn) {
|
||||
return new HashMap<>(0);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
package cc.iotkit.plugins.emqx.service;
|
||||
|
||||
import cc.iotkit.common.enums.ErrCode;
|
||||
import cc.iotkit.common.exception.BizException;
|
||||
import cc.iotkit.plugin.core.thing.IDevice;
|
||||
import cc.iotkit.plugin.core.thing.actions.ActionResult;
|
||||
import cc.iotkit.plugin.core.thing.actions.down.DeviceConfig;
|
||||
import cc.iotkit.plugin.core.thing.actions.down.PropertyGet;
|
||||
import cc.iotkit.plugin.core.thing.actions.down.PropertySet;
|
||||
import cc.iotkit.plugin.core.thing.actions.down.ServiceInvoke;
|
||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
import io.vertx.core.json.JsonObject;
|
||||
import io.vertx.mqtt.MqttClient;
|
||||
import lombok.Setter;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* mqtt设备下行接口
|
||||
*
|
||||
* @author sjg
|
||||
*/
|
||||
@Service
|
||||
public class MqttDevice implements IDevice {
|
||||
|
||||
@Setter
|
||||
private MqttClient client;
|
||||
|
||||
@Override
|
||||
public ActionResult config(DeviceConfig action) {
|
||||
return ActionResult.builder().code(0).reason("").build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionResult propertyGet(PropertyGet action) {
|
||||
String topic = String.format("/sys/%s/%s/c/service/property/get", action.getProductKey(), action.getDeviceName());
|
||||
return send(
|
||||
topic,
|
||||
new JsonObject()
|
||||
.put("id", action.getId())
|
||||
.put("method", "thing.service.property.get")
|
||||
.put("params", action.getKeys())
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionResult propertySet(PropertySet action) {
|
||||
String topic = String.format("/sys/%s/%s/c/service/property/set", action.getProductKey(), action.getDeviceName());
|
||||
return send(
|
||||
topic,
|
||||
new JsonObject()
|
||||
.put("id", action.getId())
|
||||
.put("method", "thing.service.property.set")
|
||||
.put("params", action.getParams())
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionResult serviceInvoke(ServiceInvoke action) {
|
||||
String topic = String.format("/sys/%s/%s/c/service/%s", action.getProductKey(), action.getDeviceName(), action.getName());
|
||||
return send(
|
||||
topic,
|
||||
new JsonObject()
|
||||
.put("id", action.getId())
|
||||
.put("method", "thing.service." + action.getName())
|
||||
.put("params", action.getParams())
|
||||
);
|
||||
}
|
||||
|
||||
private ActionResult send(String topic, JsonObject payload) {
|
||||
try {
|
||||
client.publish(topic, payload.toBuffer(), MqttQoS.AT_LEAST_ONCE, false, false);
|
||||
return ActionResult.builder().code(0).reason("").build();
|
||||
} catch (BizException e) {
|
||||
return ActionResult.builder().code(e.getCode()).reason(e.getMessage()).build();
|
||||
} catch (Exception e) {
|
||||
return ActionResult.builder().code(ErrCode.UNKNOWN_EXCEPTION.getKey()).reason(e.getMessage()).build();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,9 @@
|
|||
plugin:
|
||||
runMode: prod
|
||||
mainPackage: cc.iotkit.plugin
|
||||
|
||||
emqx:
|
||||
host: 127.0.0.1
|
||||
port: 1883
|
||||
topics: /sys/#
|
||||
authPort: 8104
|
|
@ -0,0 +1,30 @@
|
|||
[
|
||||
{
|
||||
"id": "host",
|
||||
"name": "emqx ip",
|
||||
"type": "text",
|
||||
"value": "127.0.0.1",
|
||||
"desc": "emqx ip,默认为127.0.0.1"
|
||||
},
|
||||
{
|
||||
"id": "port",
|
||||
"name": "emqx端口",
|
||||
"type": "number",
|
||||
"value": 1883,
|
||||
"desc": "emqx端口,默认为1883"
|
||||
},
|
||||
{
|
||||
"id": "auth_port",
|
||||
"name": "认证端口",
|
||||
"type": "number",
|
||||
"value": 8104,
|
||||
"desc": "emqx http认证端口,默认为8104"
|
||||
},
|
||||
{
|
||||
"id": "topics",
|
||||
"name": "订阅主题",
|
||||
"type": "text",
|
||||
"value": "/sys/#",
|
||||
"desc": "订阅主题多个用,隔开"
|
||||
}
|
||||
]
|
|
@ -8,7 +8,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
|
|||
/**
|
||||
* @author sjg
|
||||
*/
|
||||
@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.http"})
|
||||
@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.http")
|
||||
@OneselfConfig(mainConfigFileName = {"application.yml"})
|
||||
@EnableConfigurationProperties
|
||||
public class Application extends SpringPluginBootstrap {
|
||||
|
|
|
@ -10,7 +10,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
|
|||
* @Author:tfd
|
||||
* @Date:2024/1/8 14:57
|
||||
*/
|
||||
@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.hydrovalve"})
|
||||
@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.hydrovalve")
|
||||
@OneselfConfig(mainConfigFileName = {"application.yml"})
|
||||
@EnableConfigurationProperties
|
||||
@EnableScheduling
|
||||
|
|
|
@ -9,7 +9,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
|
|||
/**
|
||||
* @author sjg
|
||||
*/
|
||||
@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.modbus"})
|
||||
@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.modbus")
|
||||
@OneselfConfig(mainConfigFileName = {"application.yml"})
|
||||
@EnableConfigurationProperties
|
||||
@EnableScheduling
|
||||
|
|
|
@ -8,7 +8,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
|
|||
/**
|
||||
* @author sjg
|
||||
*/
|
||||
@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.mqtt"})
|
||||
@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.mqtt")
|
||||
@OneselfConfig(mainConfigFileName = {"application.yml"})
|
||||
@EnableConfigurationProperties
|
||||
public class Application extends SpringPluginBootstrap {
|
||||
|
|
|
@ -20,6 +20,7 @@ import cc.iotkit.plugin.core.thing.actions.DeviceState;
|
|||
import cc.iotkit.plugin.core.thing.actions.EventLevel;
|
||||
import cc.iotkit.plugin.core.thing.actions.IDeviceAction;
|
||||
import cc.iotkit.plugin.core.thing.actions.up.*;
|
||||
import cc.iotkit.plugin.core.thing.model.ThingDevice;
|
||||
import cc.iotkit.plugin.core.thing.model.ThingProduct;
|
||||
import cc.iotkit.plugins.mqtt.conf.MqttConfig;
|
||||
import com.gitee.starblues.bootstrap.annotation.AutowiredType;
|
||||
|
@ -35,9 +36,7 @@ import io.vertx.core.json.JsonObject;
|
|||
import io.vertx.core.net.PemKeyCertOptions;
|
||||
import io.vertx.mqtt.*;
|
||||
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
@ -83,7 +82,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
|
|||
Executors.newSingleThreadScheduledExecutor().schedule(this::initMqttServer, 3, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private void initMqttServer(){
|
||||
private void initMqttServer() {
|
||||
MqttServerOptions options = new MqttServerOptions()
|
||||
.setPort(config.getPort());
|
||||
if (config.isSsl()) {
|
||||
|
@ -212,7 +211,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
|
|||
log.info("Subscription for {},with QoS {}", s.topicName(), s.qualityOfService());
|
||||
try {
|
||||
String topic = s.topicName();
|
||||
Device device = getDevice(topic);
|
||||
ThingDevice device = getDevice(topic);
|
||||
//添加设备对应连接
|
||||
endpointMap.put(device.getDeviceName(), endpoint);
|
||||
online(device.getProductKey(), device.getDeviceName());
|
||||
|
@ -226,7 +225,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
|
|||
endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);
|
||||
}).unsubscribeHandler(unsubscribe -> {
|
||||
for (String topic : unsubscribe.topics()) {
|
||||
Device device = getDevice(topic);
|
||||
ThingDevice device = getDevice(topic);
|
||||
//删除设备对应连接
|
||||
endpointMap.remove(device.getDeviceName());
|
||||
//下线
|
||||
|
@ -259,7 +258,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
|
|||
return;
|
||||
}
|
||||
|
||||
Device device = getDevice(topic);
|
||||
ThingDevice device = getDevice(topic);
|
||||
if (device == null) {
|
||||
return;
|
||||
}
|
||||
|
@ -428,22 +427,15 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
|
|||
result.onSuccess(integer -> log.info("publish success,topic:{},payload:{}", topic, msg));
|
||||
}
|
||||
|
||||
public Device getDevice(String topic) {
|
||||
public ThingDevice getDevice(String topic) {
|
||||
String[] topicParts = topic.split("/");
|
||||
if (topicParts.length < 5) {
|
||||
return null;
|
||||
}
|
||||
return new Device(topicParts[2], topicParts[3]);
|
||||
return ThingDevice.builder()
|
||||
.productKey(topicParts[2])
|
||||
.deviceName(topicParts[3])
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public static class Device {
|
||||
|
||||
private String productKey;
|
||||
|
||||
private String deviceName;
|
||||
}
|
||||
}
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -10,7 +10,7 @@
|
|||
<module>tcp-plugin</module>
|
||||
<module>DLT645-plugin</module>
|
||||
<module>hydrovalve-plugin</module>
|
||||
<!-- <module>emqx-plugin</module>-->
|
||||
<module>emqx-plugin</module>
|
||||
</modules>
|
||||
|
||||
<parent>
|
||||
|
|
|
@ -9,7 +9,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
|
|||
/**
|
||||
* @author sjg
|
||||
*/
|
||||
@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.tcp"})
|
||||
@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.tcp")
|
||||
@OneselfConfig(mainConfigFileName = {"application.yml"})
|
||||
@EnableConfigurationProperties
|
||||
@EnableScheduling
|
||||
|
|
Loading…
Reference in New Issue