From f631bbfd8aa7cf3e6bbb7d0fe0224ce5adb58ea4 Mon Sep 17 00:00:00 2001 From: tangfudong <280620913@qq.com> Date: Wed, 6 Mar 2024 09:25:57 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E6=B7=BB=E5=8A=A0websocket=E6=8F=92?= =?UTF-8?q?=E4=BB=B6=E5=8C=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 1 + websocket-plugin/pom.xml | 74 ++++++ .../iotkit/plugins/websocket/Application.java | 19 ++ .../plugins/websocket/conf/BeanConfig.java | 28 +++ .../websocket/conf/WebsocketConfig.java | 39 +++ .../websocket/service/FakeThingService.java | 53 ++++ .../websocket/service/WebsocketDevice.java | 81 +++++++ .../websocket/service/WebsocketPlugin.java | 89 +++++++ .../websocket/service/WebsocketVerticle.java | 229 ++++++++++++++++++ .../src/main/resources/application.yml | 7 + .../src/main/resources/config.json | 16 ++ 11 files changed, 636 insertions(+) create mode 100644 websocket-plugin/pom.xml create mode 100644 websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/Application.java create mode 100644 websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/conf/BeanConfig.java create mode 100644 websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/conf/WebsocketConfig.java create mode 100644 websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/FakeThingService.java create mode 100644 websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketDevice.java create mode 100644 websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketPlugin.java create mode 100644 websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketVerticle.java create mode 100644 websocket-plugin/src/main/resources/application.yml create mode 100644 websocket-plugin/src/main/resources/config.json diff --git a/pom.xml b/pom.xml index fd98992..455af40 100755 --- a/pom.xml +++ b/pom.xml @@ -11,6 +11,7 @@ DLT645-plugin hydrovalve-plugin emqx-plugin + websocket-plugin diff --git a/websocket-plugin/pom.xml b/websocket-plugin/pom.xml new file mode 100644 index 0000000..4d448b8 --- /dev/null +++ b/websocket-plugin/pom.xml @@ -0,0 +1,74 @@ + + + + iot-iita-plugins + cc.iotkit.plugins + 1.0.1 + + 4.0.0 + + websocket-plugin + + + + + io.vertx + vertx-core + ${vertx.version} + + + + + + + dev + + true + + + dev + + + + + prod + + prod + + + + + + + + com.gitee.starblues + spring-brick-maven-packager + ${spring-brick.version} + + ${plugin.build.mode} + + websocket-plugin + cc.iotkit.plugins.websocket.Application + ${project.version} + iita + websocket示例插件 + application.yml + + + jar + + + + + + repackage + + + + + + + + \ No newline at end of file diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/Application.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/Application.java new file mode 100644 index 0000000..ce5aee8 --- /dev/null +++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/Application.java @@ -0,0 +1,19 @@ +package cc.iotkit.plugins.websocket; + +import com.gitee.starblues.bootstrap.SpringPluginBootstrap; +import com.gitee.starblues.bootstrap.annotation.OneselfConfig; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + +/** + * @author tfd + */ +@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.websocket") +@OneselfConfig(mainConfigFileName = {"application.yml"}) +@EnableConfigurationProperties +public class Application extends SpringPluginBootstrap { + + public static void main(String[] args) { + new Application().run(Application.class, args); + } +} diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/conf/BeanConfig.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/conf/BeanConfig.java new file mode 100644 index 0000000..b10e798 --- /dev/null +++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/conf/BeanConfig.java @@ -0,0 +1,28 @@ +package cc.iotkit.plugins.websocket.conf; + +import cc.iotkit.plugin.core.IPluginConfig; +import cc.iotkit.plugin.core.LocalPluginConfig; +import cc.iotkit.plugin.core.thing.IThingService; +import cc.iotkit.plugins.websocket.service.FakeThingService; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +/** + * @author tfd + */ +@Component +public class BeanConfig { + + @Bean + @ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev") + IThingService getThingService() { + return new FakeThingService(); + } + + @Bean + @ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev") + IPluginConfig getPluginConfig(){ + return new LocalPluginConfig(); + } +} diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/conf/WebsocketConfig.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/conf/WebsocketConfig.java new file mode 100644 index 0000000..abd0c50 --- /dev/null +++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/conf/WebsocketConfig.java @@ -0,0 +1,39 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.plugins.websocket.conf; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Data +@Component +@ConfigurationProperties(prefix = "websocket") +public class WebsocketConfig { + + private int port; + + private String sslKey; + + private String sslCert; + + private boolean ssl; + + private List accessTokens; + + @Data + public static class AccessToken{ + private String tokenName; + private String tokenStr; + } + +} diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/FakeThingService.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/FakeThingService.java new file mode 100644 index 0000000..0b64fd0 --- /dev/null +++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/FakeThingService.java @@ -0,0 +1,53 @@ +package cc.iotkit.plugins.websocket.service; + +import cc.iotkit.plugin.core.thing.IThingService; +import cc.iotkit.plugin.core.thing.actions.ActionResult; +import cc.iotkit.plugin.core.thing.actions.IDeviceAction; +import cc.iotkit.plugin.core.thing.model.ThingDevice; +import cc.iotkit.plugin.core.thing.model.ThingProduct; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; + +/** + * 测试服务 + * + * @author tfd + */ +@Slf4j +public class FakeThingService implements IThingService { + + + /** + * 添加测试设备 + */ + private static final Map DEVICES = new HashMap<>(); + + @Override + public ActionResult post(String pluginId, IDeviceAction action) { + log.info("post action:{}", action); + return ActionResult.builder().code(0).build(); + } + + @Override + public ThingProduct getProduct(String pk) { + return ThingProduct.builder() + .productKey(pk) + .productSecret("") + .build(); + } + + @Override + public ThingDevice getDevice(String dn) { + return ThingDevice.builder() + .productKey(DEVICES.get(dn)) + .deviceName(dn) + .build(); + } + + @Override + public Map getProperty(String dn) { + return new HashMap<>(0); + } +} diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketDevice.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketDevice.java new file mode 100644 index 0000000..c7341a9 --- /dev/null +++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketDevice.java @@ -0,0 +1,81 @@ +package cc.iotkit.plugins.websocket.service; + +import cc.iotkit.common.enums.ErrCode; +import cc.iotkit.common.exception.BizException; +import cc.iotkit.plugin.core.thing.IDevice; +import cc.iotkit.plugin.core.thing.actions.ActionResult; +import cc.iotkit.plugin.core.thing.actions.down.DeviceConfig; +import cc.iotkit.plugin.core.thing.actions.down.PropertyGet; +import cc.iotkit.plugin.core.thing.actions.down.PropertySet; +import cc.iotkit.plugin.core.thing.actions.down.ServiceInvoke; +import io.vertx.core.json.JsonObject; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * websocket设备服务 + * + * @author tfd + */ +@Service +public class WebsocketDevice implements IDevice { + + @Autowired + private WebsocketVerticle websocketVerticle; + + @Override + public ActionResult config(DeviceConfig action) { + return ActionResult.builder().code(0).reason("").build(); + } + + @Override + public ActionResult propertyGet(PropertyGet action) { + return send( + action.getDeviceName(), + new JsonObject() + .put("id", action.getId()) + .put("method", "thing.service.property.get") + .put("params", action.getKeys()) + .toString() + ); + } + + @Override + public ActionResult propertySet(PropertySet action) { + return send( + action.getDeviceName(), + new JsonObject() + .put("id", action.getId()) + .put("method", "thing.service.property.set") + .put("params", action.getParams()) + .toString() + ); + } + + @Override + public ActionResult serviceInvoke(ServiceInvoke action) { + return send( + action.getDeviceName(), + new JsonObject() + .put("id", action.getId()) + .put("method", "thing.service." + action.getName()) + .put("params", action.getParams()) + .toString() + ); + } + + private ActionResult send(String deviceName, String payload) { + try { + websocketVerticle.send( + deviceName, + payload + ); + return ActionResult.builder().code(0).reason("").build(); + } catch (BizException e) { + return ActionResult.builder().code(e.getCode()).reason(e.getMessage()).build(); + } catch (Exception e) { + return ActionResult.builder().code(ErrCode.UNKNOWN_EXCEPTION.getKey()).reason(e.getMessage()).build(); + } + } + +} diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketPlugin.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketPlugin.java new file mode 100644 index 0000000..9db80e5 --- /dev/null +++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketPlugin.java @@ -0,0 +1,89 @@ +package cc.iotkit.plugins.websocket.service; + +import cc.iotkit.plugin.core.IPluginConfig; +import cc.iotkit.plugins.websocket.conf.WebsocketConfig; +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.bean.copier.CopyOptions; +import com.gitee.starblues.bootstrap.annotation.AutowiredType; +import com.gitee.starblues.bootstrap.realize.PluginCloseListener; +import com.gitee.starblues.core.PluginCloseType; +import com.gitee.starblues.core.PluginInfo; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * @author tfd + */ +@Slf4j +@Service +public class WebsocketPlugin implements PluginCloseListener { + + @Autowired + private PluginInfo pluginInfo; + @Autowired + private WebsocketVerticle websocketVerticle; + @Autowired + private WebsocketConfig websocketConfig; + + @Autowired + @AutowiredType(AutowiredType.Type.MAIN_PLUGIN) + private IPluginConfig pluginConfig; + + private Vertx vertx; + private String deployedId; + + @PostConstruct + public void init() { + vertx = Vertx.vertx(); + try { + //获取插件最新配置替换当前配置 + Map config = pluginConfig.getConfig(pluginInfo.getPluginId()); + BeanUtil.copyProperties(config, websocketConfig, CopyOptions.create().ignoreNullValue()); + websocketVerticle.setConfig(websocketConfig); + + Future future = vertx.deployVerticle(websocketVerticle); + future.onSuccess((s -> { + deployedId = s; + log.info("websocket plugin started success"); + })); + future.onFailure((e) -> { + log.error("websocket plugin startup failed", e); + }); + } catch (Throwable e) { + log.error("websocket plugin startup error", e); + } + } + + @Override + public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) { + try { + log.info("plugin close,type:{},pluginId:{}", closeType, pluginInfo.getPluginId()); + if (deployedId != null) { + CountDownLatch wait = new CountDownLatch(1); + Future future = vertx.undeploy(deployedId); + future.onSuccess(unused -> { + log.info("websocket plugin stopped success"); + wait.countDown(); + }); + future.onFailure(h -> { + log.info("websocket plugin stopped failed"); + h.printStackTrace(); + wait.countDown(); + }); + wait.await(5, TimeUnit.SECONDS); + } + } catch (Throwable e) { + log.error("websocket plugin stop error", e); + } + } + +} diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketVerticle.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketVerticle.java new file mode 100644 index 0000000..78713e5 --- /dev/null +++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketVerticle.java @@ -0,0 +1,229 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.plugins.websocket.service; + +import cc.iotkit.common.utils.JsonUtils; +import cc.iotkit.common.utils.StringUtils; +import cc.iotkit.plugin.core.thing.IThingService; +import cc.iotkit.plugin.core.thing.actions.ActionResult; +import cc.iotkit.plugin.core.thing.actions.DeviceState; +import cc.iotkit.plugin.core.thing.actions.up.DeviceRegister; +import cc.iotkit.plugin.core.thing.actions.up.DeviceStateChange; +import cc.iotkit.plugins.websocket.conf.WebsocketConfig; +import com.gitee.starblues.bootstrap.annotation.AutowiredType; +import com.gitee.starblues.core.PluginInfo; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Future; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.http.ServerWebSocket; +import io.vertx.core.net.PemKeyCertOptions; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * + * @author tfd + */ +@Slf4j +@Component +@Data +public class WebsocketVerticle extends AbstractVerticle { + + private HttpServer httpServer; + private final Map wsClients = new ConcurrentHashMap<>(); + + private static final Map CONNECT_POOL = new ConcurrentHashMap<>(); + private static final Map DEVICE_ONLINE = new ConcurrentHashMap<>(); + + private Map tokens=new HashMap<>(); + + private WebsocketConfig config; + + @Autowired + @AutowiredType(AutowiredType.Type.MAIN_PLUGIN) + private IThingService thingService; + + @Autowired + private PluginInfo pluginInfo; + + @Override + public void start() { + Executors.newSingleThreadScheduledExecutor().schedule(this::initMqttServer, 3, TimeUnit.SECONDS); + } + + private void initMqttServer() { + HttpServerOptions options = new HttpServerOptions() + .setPort(config.getPort()); + if (config.isSsl()) { + options = options.setSsl(true) + .setKeyCertOptions(new PemKeyCertOptions() + .setKeyPath(config.getSslKey()) + .setCertPath(config.getSslCert())); + } + + httpServer = vertx.createHttpServer(options).webSocketHandler(wsClient -> { + log.info("webSocket client connect sessionId:{},path={}", wsClient.textHandlerID(), wsClient.path()); + String deviceKey = wsClient.path().replace("/",""); + String[] strArr=deviceKey.split("_"); + if(StringUtils.isBlank(deviceKey)||strArr.length!=2){ + log.warn("陌生连接,拒绝"); + wsClient.reject(); + return; + } + wsClient.writeTextMessage("connect succes! please auth!"); + wsClient.textMessageHandler(message -> { + HashMap msg; + try{ + msg=JsonUtils.parseObject(message,HashMap.class); + }catch (Exception e){ + log.warn("数据格式异常"); + wsClient.writeTextMessage("data err"); + return; + } + if(wsClients.containsKey(deviceKey)){ + if("ping".equals(msg.get("type"))){ + msg.put("type","pong"); + wsClient.writeTextMessage(JsonUtils.toJsonString(msg)); + return; + } + if("register".equals(msg.get("type"))){ + //设备注册 + ActionResult result = thingService.post( + pluginInfo.getPluginId(), + DeviceRegister.builder() + .productKey(strArr[1]) + .deviceName(deviceKey) + .model("") + .version("1.0") + .build() + ); + if(result.getCode()==0){ + thingService.post( + pluginInfo.getPluginId(), + DeviceStateChange.builder() + .productKey(strArr[1]) + .deviceName(deviceKey) + .state(DeviceState.ONLINE) + .build() + ); + }else{ + //注册失败 + Map ret=new HashMap<>(); + ret.put("id",msg.get("id")); + ret.put("type",msg.get("type")); + ret.put("result","fail"); + wsClient.writeTextMessage(JsonUtils.toJsonString(ret)); + return; + } + } + }else if(msg!=null&&"auth".equals(msg.get("type"))){ + Set tokenKey=tokens.keySet(); + for(String key:tokenKey){ + if(StringUtils.isNotBlank(msg.get(key))&&tokens.get(key).equals(msg.get(key))){ + //保存设备与连接关系 + log.info("认证通过"); + wsClients.put(deviceKey, wsClient); + wsClient.writeTextMessage("auth succes"); + return; + } + } + log.warn("认证失败,拒绝"); + wsClient.writeTextMessage("auth fail"); + return; + }else{ + log.warn("认证失败,拒绝"); + wsClient.writeTextMessage("auth fail"); + return; + } + + }); + wsClient.closeHandler(c -> { + log.warn("client connection closed,deviceKey:{}", deviceKey); + if(wsClients.containsKey(deviceKey)){ + wsClients.remove(deviceKey); + thingService.post( + pluginInfo.getPluginId(), + DeviceStateChange.builder() + .productKey(strArr[1]) + .deviceName(deviceKey) + .state(DeviceState.OFFLINE) + .build() + ); + } + }); + wsClient.exceptionHandler(ex -> { + log.warn("webSocket client connection exception,deviceKey:{}", deviceKey); + if(wsClients.containsKey(deviceKey)){ + wsClients.remove(deviceKey); + thingService.post( + pluginInfo.getPluginId(), + DeviceStateChange.builder() + .productKey(strArr[1]) + .deviceName(deviceKey) + .state(DeviceState.OFFLINE) + .build() + ); + } + }); + }).listen(config.getPort(), server -> { + if (server.succeeded()) { + log.info("webSocket server is listening on port " + config.getPort()); + if(config.getAccessTokens()!=null){ + List tokenConfig= config.getAccessTokens(); + for (WebsocketConfig.AccessToken obj:tokenConfig) { + tokens.put(obj.getTokenName(),obj.getTokenStr()); + } + } + } else { + log.error("webSocket server on starting the server", server.cause()); + } + }); + } + + @Override + public void stop() throws Exception { + for (String deviceKey : wsClients.keySet()) { + thingService.post( + pluginInfo.getPluginId(), + DeviceStateChange.builder() + .productKey(deviceKey.split("_")[1]) + .deviceName(deviceKey) + .state(DeviceState.OFFLINE) + .build() + ); + } + tokens.clear(); + httpServer.close(voidAsyncResult -> log.info("close webocket server...")); + } + + private String getDeviceKey(String productKey, String deviceName) { + return String.format("%s_%s", productKey, deviceName); + } + + public void send(String deviceName,String msg) { + ServerWebSocket wsClient = wsClients.get(deviceName); + String msgStr = JsonUtils.toJsonString(msg); + log.info("send msg payload:{}", msgStr); + Future result = wsClient.writeTextMessage(msgStr); + result.onFailure(e -> log.error("webSocket server send msg failed", e)); + } + +} diff --git a/websocket-plugin/src/main/resources/application.yml b/websocket-plugin/src/main/resources/application.yml new file mode 100644 index 0000000..062764a --- /dev/null +++ b/websocket-plugin/src/main/resources/application.yml @@ -0,0 +1,7 @@ +plugin: + runMode: prod + mainPackage: cc.iotkit.plugin + +websocket: + port: 1662 + accessTokens: [{"tokenName":"test_token","tokenStr":"123456789"}] diff --git a/websocket-plugin/src/main/resources/config.json b/websocket-plugin/src/main/resources/config.json new file mode 100644 index 0000000..d6fb543 --- /dev/null +++ b/websocket-plugin/src/main/resources/config.json @@ -0,0 +1,16 @@ +[ + { + "id": "port", + "name": "端口", + "type": "number", + "value": 1662, + "desc": "websocket端口,默认为1662" + }, + { + "id": "accessTokens", + "name": "token表", + "type": "json", + "value": "[{'tokenName':'test_token','tokenStr':'123456789'}]", + "desc": "token表,可多个" + } +] \ No newline at end of file