diff --git a/pom.xml b/pom.xml
index fd98992..455af40 100755
--- a/pom.xml
+++ b/pom.xml
@@ -11,6 +11,7 @@
DLT645-plugin
hydrovalve-plugin
emqx-plugin
+ websocket-plugin
diff --git a/websocket-plugin/pom.xml b/websocket-plugin/pom.xml
new file mode 100644
index 0000000..4d448b8
--- /dev/null
+++ b/websocket-plugin/pom.xml
@@ -0,0 +1,74 @@
+
+
+
+ iot-iita-plugins
+ cc.iotkit.plugins
+ 1.0.1
+
+ 4.0.0
+
+ websocket-plugin
+
+
+
+
+ io.vertx
+ vertx-core
+ ${vertx.version}
+
+
+
+
+
+
+ dev
+
+ true
+
+
+ dev
+
+
+
+
+ prod
+
+ prod
+
+
+
+
+
+
+
+ com.gitee.starblues
+ spring-brick-maven-packager
+ ${spring-brick.version}
+
+ ${plugin.build.mode}
+
+ websocket-plugin
+ cc.iotkit.plugins.websocket.Application
+ ${project.version}
+ iita
+ websocket示例插件
+ application.yml
+
+
+ jar
+
+
+
+
+
+ repackage
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/Application.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/Application.java
new file mode 100644
index 0000000..ce5aee8
--- /dev/null
+++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/Application.java
@@ -0,0 +1,19 @@
+package cc.iotkit.plugins.websocket;
+
+import com.gitee.starblues.bootstrap.SpringPluginBootstrap;
+import com.gitee.starblues.bootstrap.annotation.OneselfConfig;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+
+/**
+ * @author tfd
+ */
+@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.websocket")
+@OneselfConfig(mainConfigFileName = {"application.yml"})
+@EnableConfigurationProperties
+public class Application extends SpringPluginBootstrap {
+
+ public static void main(String[] args) {
+ new Application().run(Application.class, args);
+ }
+}
diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/conf/BeanConfig.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/conf/BeanConfig.java
new file mode 100644
index 0000000..b10e798
--- /dev/null
+++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/conf/BeanConfig.java
@@ -0,0 +1,28 @@
+package cc.iotkit.plugins.websocket.conf;
+
+import cc.iotkit.plugin.core.IPluginConfig;
+import cc.iotkit.plugin.core.LocalPluginConfig;
+import cc.iotkit.plugin.core.thing.IThingService;
+import cc.iotkit.plugins.websocket.service.FakeThingService;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author tfd
+ */
+@Component
+public class BeanConfig {
+
+ @Bean
+ @ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev")
+ IThingService getThingService() {
+ return new FakeThingService();
+ }
+
+ @Bean
+ @ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev")
+ IPluginConfig getPluginConfig(){
+ return new LocalPluginConfig();
+ }
+}
diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/conf/WebsocketConfig.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/conf/WebsocketConfig.java
new file mode 100644
index 0000000..abd0c50
--- /dev/null
+++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/conf/WebsocketConfig.java
@@ -0,0 +1,39 @@
+/*
+ * +----------------------------------------------------------------------
+ * | Copyright (c) 奇特物联 2021-2022 All rights reserved.
+ * +----------------------------------------------------------------------
+ * | Licensed 未经许可不能去掉「奇特物联」相关版权
+ * +----------------------------------------------------------------------
+ * | Author: xw2sy@163.com
+ * +----------------------------------------------------------------------
+ */
+package cc.iotkit.plugins.websocket.conf;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+@Data
+@Component
+@ConfigurationProperties(prefix = "websocket")
+public class WebsocketConfig {
+
+ private int port;
+
+ private String sslKey;
+
+ private String sslCert;
+
+ private boolean ssl;
+
+ private List accessTokens;
+
+ @Data
+ public static class AccessToken{
+ private String tokenName;
+ private String tokenStr;
+ }
+
+}
diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/FakeThingService.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/FakeThingService.java
new file mode 100644
index 0000000..0b64fd0
--- /dev/null
+++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/FakeThingService.java
@@ -0,0 +1,53 @@
+package cc.iotkit.plugins.websocket.service;
+
+import cc.iotkit.plugin.core.thing.IThingService;
+import cc.iotkit.plugin.core.thing.actions.ActionResult;
+import cc.iotkit.plugin.core.thing.actions.IDeviceAction;
+import cc.iotkit.plugin.core.thing.model.ThingDevice;
+import cc.iotkit.plugin.core.thing.model.ThingProduct;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 测试服务
+ *
+ * @author tfd
+ */
+@Slf4j
+public class FakeThingService implements IThingService {
+
+
+ /**
+ * 添加测试设备
+ */
+ private static final Map DEVICES = new HashMap<>();
+
+ @Override
+ public ActionResult post(String pluginId, IDeviceAction action) {
+ log.info("post action:{}", action);
+ return ActionResult.builder().code(0).build();
+ }
+
+ @Override
+ public ThingProduct getProduct(String pk) {
+ return ThingProduct.builder()
+ .productKey(pk)
+ .productSecret("")
+ .build();
+ }
+
+ @Override
+ public ThingDevice getDevice(String dn) {
+ return ThingDevice.builder()
+ .productKey(DEVICES.get(dn))
+ .deviceName(dn)
+ .build();
+ }
+
+ @Override
+ public Map getProperty(String dn) {
+ return new HashMap<>(0);
+ }
+}
diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketDevice.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketDevice.java
new file mode 100644
index 0000000..c7341a9
--- /dev/null
+++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketDevice.java
@@ -0,0 +1,81 @@
+package cc.iotkit.plugins.websocket.service;
+
+import cc.iotkit.common.enums.ErrCode;
+import cc.iotkit.common.exception.BizException;
+import cc.iotkit.plugin.core.thing.IDevice;
+import cc.iotkit.plugin.core.thing.actions.ActionResult;
+import cc.iotkit.plugin.core.thing.actions.down.DeviceConfig;
+import cc.iotkit.plugin.core.thing.actions.down.PropertyGet;
+import cc.iotkit.plugin.core.thing.actions.down.PropertySet;
+import cc.iotkit.plugin.core.thing.actions.down.ServiceInvoke;
+import io.vertx.core.json.JsonObject;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * websocket设备服务
+ *
+ * @author tfd
+ */
+@Service
+public class WebsocketDevice implements IDevice {
+
+ @Autowired
+ private WebsocketVerticle websocketVerticle;
+
+ @Override
+ public ActionResult config(DeviceConfig action) {
+ return ActionResult.builder().code(0).reason("").build();
+ }
+
+ @Override
+ public ActionResult propertyGet(PropertyGet action) {
+ return send(
+ action.getDeviceName(),
+ new JsonObject()
+ .put("id", action.getId())
+ .put("method", "thing.service.property.get")
+ .put("params", action.getKeys())
+ .toString()
+ );
+ }
+
+ @Override
+ public ActionResult propertySet(PropertySet action) {
+ return send(
+ action.getDeviceName(),
+ new JsonObject()
+ .put("id", action.getId())
+ .put("method", "thing.service.property.set")
+ .put("params", action.getParams())
+ .toString()
+ );
+ }
+
+ @Override
+ public ActionResult serviceInvoke(ServiceInvoke action) {
+ return send(
+ action.getDeviceName(),
+ new JsonObject()
+ .put("id", action.getId())
+ .put("method", "thing.service." + action.getName())
+ .put("params", action.getParams())
+ .toString()
+ );
+ }
+
+ private ActionResult send(String deviceName, String payload) {
+ try {
+ websocketVerticle.send(
+ deviceName,
+ payload
+ );
+ return ActionResult.builder().code(0).reason("").build();
+ } catch (BizException e) {
+ return ActionResult.builder().code(e.getCode()).reason(e.getMessage()).build();
+ } catch (Exception e) {
+ return ActionResult.builder().code(ErrCode.UNKNOWN_EXCEPTION.getKey()).reason(e.getMessage()).build();
+ }
+ }
+
+}
diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketPlugin.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketPlugin.java
new file mode 100644
index 0000000..9db80e5
--- /dev/null
+++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketPlugin.java
@@ -0,0 +1,89 @@
+package cc.iotkit.plugins.websocket.service;
+
+import cc.iotkit.plugin.core.IPluginConfig;
+import cc.iotkit.plugins.websocket.conf.WebsocketConfig;
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.bean.copier.CopyOptions;
+import com.gitee.starblues.bootstrap.annotation.AutowiredType;
+import com.gitee.starblues.bootstrap.realize.PluginCloseListener;
+import com.gitee.starblues.core.PluginCloseType;
+import com.gitee.starblues.core.PluginInfo;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author tfd
+ */
+@Slf4j
+@Service
+public class WebsocketPlugin implements PluginCloseListener {
+
+ @Autowired
+ private PluginInfo pluginInfo;
+ @Autowired
+ private WebsocketVerticle websocketVerticle;
+ @Autowired
+ private WebsocketConfig websocketConfig;
+
+ @Autowired
+ @AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
+ private IPluginConfig pluginConfig;
+
+ private Vertx vertx;
+ private String deployedId;
+
+ @PostConstruct
+ public void init() {
+ vertx = Vertx.vertx();
+ try {
+ //获取插件最新配置替换当前配置
+ Map config = pluginConfig.getConfig(pluginInfo.getPluginId());
+ BeanUtil.copyProperties(config, websocketConfig, CopyOptions.create().ignoreNullValue());
+ websocketVerticle.setConfig(websocketConfig);
+
+ Future future = vertx.deployVerticle(websocketVerticle);
+ future.onSuccess((s -> {
+ deployedId = s;
+ log.info("websocket plugin started success");
+ }));
+ future.onFailure((e) -> {
+ log.error("websocket plugin startup failed", e);
+ });
+ } catch (Throwable e) {
+ log.error("websocket plugin startup error", e);
+ }
+ }
+
+ @Override
+ public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) {
+ try {
+ log.info("plugin close,type:{},pluginId:{}", closeType, pluginInfo.getPluginId());
+ if (deployedId != null) {
+ CountDownLatch wait = new CountDownLatch(1);
+ Future future = vertx.undeploy(deployedId);
+ future.onSuccess(unused -> {
+ log.info("websocket plugin stopped success");
+ wait.countDown();
+ });
+ future.onFailure(h -> {
+ log.info("websocket plugin stopped failed");
+ h.printStackTrace();
+ wait.countDown();
+ });
+ wait.await(5, TimeUnit.SECONDS);
+ }
+ } catch (Throwable e) {
+ log.error("websocket plugin stop error", e);
+ }
+ }
+
+}
diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketVerticle.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketVerticle.java
new file mode 100644
index 0000000..78713e5
--- /dev/null
+++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketVerticle.java
@@ -0,0 +1,229 @@
+/*
+ * +----------------------------------------------------------------------
+ * | Copyright (c) 奇特物联 2021-2022 All rights reserved.
+ * +----------------------------------------------------------------------
+ * | Licensed 未经许可不能去掉「奇特物联」相关版权
+ * +----------------------------------------------------------------------
+ * | Author: xw2sy@163.com
+ * +----------------------------------------------------------------------
+ */
+package cc.iotkit.plugins.websocket.service;
+
+import cc.iotkit.common.utils.JsonUtils;
+import cc.iotkit.common.utils.StringUtils;
+import cc.iotkit.plugin.core.thing.IThingService;
+import cc.iotkit.plugin.core.thing.actions.ActionResult;
+import cc.iotkit.plugin.core.thing.actions.DeviceState;
+import cc.iotkit.plugin.core.thing.actions.up.DeviceRegister;
+import cc.iotkit.plugin.core.thing.actions.up.DeviceStateChange;
+import cc.iotkit.plugins.websocket.conf.WebsocketConfig;
+import com.gitee.starblues.bootstrap.annotation.AutowiredType;
+import com.gitee.starblues.core.PluginInfo;
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.Future;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.core.http.ServerWebSocket;
+import io.vertx.core.net.PemKeyCertOptions;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ * @author tfd
+ */
+@Slf4j
+@Component
+@Data
+public class WebsocketVerticle extends AbstractVerticle {
+
+ private HttpServer httpServer;
+ private final Map wsClients = new ConcurrentHashMap<>();
+
+ private static final Map CONNECT_POOL = new ConcurrentHashMap<>();
+ private static final Map DEVICE_ONLINE = new ConcurrentHashMap<>();
+
+ private Map tokens=new HashMap<>();
+
+ private WebsocketConfig config;
+
+ @Autowired
+ @AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
+ private IThingService thingService;
+
+ @Autowired
+ private PluginInfo pluginInfo;
+
+ @Override
+ public void start() {
+ Executors.newSingleThreadScheduledExecutor().schedule(this::initMqttServer, 3, TimeUnit.SECONDS);
+ }
+
+ private void initMqttServer() {
+ HttpServerOptions options = new HttpServerOptions()
+ .setPort(config.getPort());
+ if (config.isSsl()) {
+ options = options.setSsl(true)
+ .setKeyCertOptions(new PemKeyCertOptions()
+ .setKeyPath(config.getSslKey())
+ .setCertPath(config.getSslCert()));
+ }
+
+ httpServer = vertx.createHttpServer(options).webSocketHandler(wsClient -> {
+ log.info("webSocket client connect sessionId:{},path={}", wsClient.textHandlerID(), wsClient.path());
+ String deviceKey = wsClient.path().replace("/","");
+ String[] strArr=deviceKey.split("_");
+ if(StringUtils.isBlank(deviceKey)||strArr.length!=2){
+ log.warn("陌生连接,拒绝");
+ wsClient.reject();
+ return;
+ }
+ wsClient.writeTextMessage("connect succes! please auth!");
+ wsClient.textMessageHandler(message -> {
+ HashMap msg;
+ try{
+ msg=JsonUtils.parseObject(message,HashMap.class);
+ }catch (Exception e){
+ log.warn("数据格式异常");
+ wsClient.writeTextMessage("data err");
+ return;
+ }
+ if(wsClients.containsKey(deviceKey)){
+ if("ping".equals(msg.get("type"))){
+ msg.put("type","pong");
+ wsClient.writeTextMessage(JsonUtils.toJsonString(msg));
+ return;
+ }
+ if("register".equals(msg.get("type"))){
+ //设备注册
+ ActionResult result = thingService.post(
+ pluginInfo.getPluginId(),
+ DeviceRegister.builder()
+ .productKey(strArr[1])
+ .deviceName(deviceKey)
+ .model("")
+ .version("1.0")
+ .build()
+ );
+ if(result.getCode()==0){
+ thingService.post(
+ pluginInfo.getPluginId(),
+ DeviceStateChange.builder()
+ .productKey(strArr[1])
+ .deviceName(deviceKey)
+ .state(DeviceState.ONLINE)
+ .build()
+ );
+ }else{
+ //注册失败
+ Map ret=new HashMap<>();
+ ret.put("id",msg.get("id"));
+ ret.put("type",msg.get("type"));
+ ret.put("result","fail");
+ wsClient.writeTextMessage(JsonUtils.toJsonString(ret));
+ return;
+ }
+ }
+ }else if(msg!=null&&"auth".equals(msg.get("type"))){
+ Set tokenKey=tokens.keySet();
+ for(String key:tokenKey){
+ if(StringUtils.isNotBlank(msg.get(key))&&tokens.get(key).equals(msg.get(key))){
+ //保存设备与连接关系
+ log.info("认证通过");
+ wsClients.put(deviceKey, wsClient);
+ wsClient.writeTextMessage("auth succes");
+ return;
+ }
+ }
+ log.warn("认证失败,拒绝");
+ wsClient.writeTextMessage("auth fail");
+ return;
+ }else{
+ log.warn("认证失败,拒绝");
+ wsClient.writeTextMessage("auth fail");
+ return;
+ }
+
+ });
+ wsClient.closeHandler(c -> {
+ log.warn("client connection closed,deviceKey:{}", deviceKey);
+ if(wsClients.containsKey(deviceKey)){
+ wsClients.remove(deviceKey);
+ thingService.post(
+ pluginInfo.getPluginId(),
+ DeviceStateChange.builder()
+ .productKey(strArr[1])
+ .deviceName(deviceKey)
+ .state(DeviceState.OFFLINE)
+ .build()
+ );
+ }
+ });
+ wsClient.exceptionHandler(ex -> {
+ log.warn("webSocket client connection exception,deviceKey:{}", deviceKey);
+ if(wsClients.containsKey(deviceKey)){
+ wsClients.remove(deviceKey);
+ thingService.post(
+ pluginInfo.getPluginId(),
+ DeviceStateChange.builder()
+ .productKey(strArr[1])
+ .deviceName(deviceKey)
+ .state(DeviceState.OFFLINE)
+ .build()
+ );
+ }
+ });
+ }).listen(config.getPort(), server -> {
+ if (server.succeeded()) {
+ log.info("webSocket server is listening on port " + config.getPort());
+ if(config.getAccessTokens()!=null){
+ List tokenConfig= config.getAccessTokens();
+ for (WebsocketConfig.AccessToken obj:tokenConfig) {
+ tokens.put(obj.getTokenName(),obj.getTokenStr());
+ }
+ }
+ } else {
+ log.error("webSocket server on starting the server", server.cause());
+ }
+ });
+ }
+
+ @Override
+ public void stop() throws Exception {
+ for (String deviceKey : wsClients.keySet()) {
+ thingService.post(
+ pluginInfo.getPluginId(),
+ DeviceStateChange.builder()
+ .productKey(deviceKey.split("_")[1])
+ .deviceName(deviceKey)
+ .state(DeviceState.OFFLINE)
+ .build()
+ );
+ }
+ tokens.clear();
+ httpServer.close(voidAsyncResult -> log.info("close webocket server..."));
+ }
+
+ private String getDeviceKey(String productKey, String deviceName) {
+ return String.format("%s_%s", productKey, deviceName);
+ }
+
+ public void send(String deviceName,String msg) {
+ ServerWebSocket wsClient = wsClients.get(deviceName);
+ String msgStr = JsonUtils.toJsonString(msg);
+ log.info("send msg payload:{}", msgStr);
+ Future result = wsClient.writeTextMessage(msgStr);
+ result.onFailure(e -> log.error("webSocket server send msg failed", e));
+ }
+
+}
diff --git a/websocket-plugin/src/main/resources/application.yml b/websocket-plugin/src/main/resources/application.yml
new file mode 100644
index 0000000..062764a
--- /dev/null
+++ b/websocket-plugin/src/main/resources/application.yml
@@ -0,0 +1,7 @@
+plugin:
+ runMode: prod
+ mainPackage: cc.iotkit.plugin
+
+websocket:
+ port: 1662
+ accessTokens: [{"tokenName":"test_token","tokenStr":"123456789"}]
diff --git a/websocket-plugin/src/main/resources/config.json b/websocket-plugin/src/main/resources/config.json
new file mode 100644
index 0000000..d6fb543
--- /dev/null
+++ b/websocket-plugin/src/main/resources/config.json
@@ -0,0 +1,16 @@
+[
+ {
+ "id": "port",
+ "name": "端口",
+ "type": "number",
+ "value": 1662,
+ "desc": "websocket端口,默认为1662"
+ },
+ {
+ "id": "accessTokens",
+ "name": "token表",
+ "type": "json",
+ "value": "[{'tokenName':'test_token','tokenStr':'123456789'}]",
+ "desc": "token表,可多个"
+ }
+]
\ No newline at end of file