1、添加websocket插件包

master
tangfudong 2024-03-06 09:25:57 +08:00
parent 6c99d57905
commit f631bbfd8a
11 changed files with 636 additions and 0 deletions

View File

@ -11,6 +11,7 @@
<module>DLT645-plugin</module>
<module>hydrovalve-plugin</module>
<module>emqx-plugin</module>
<module>websocket-plugin</module>
</modules>
<parent>

74
websocket-plugin/pom.xml Normal file
View File

@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>iot-iita-plugins</artifactId>
<groupId>cc.iotkit.plugins</groupId>
<version>1.0.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>websocket-plugin</artifactId>
<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>
</dependencies>
<profiles>
<profile>
<id>dev</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<plugin.build.mode>dev</plugin.build.mode>
</properties>
</profile>
<profile>
<id>prod</id>
<properties>
<plugin.build.mode>prod</plugin.build.mode>
</properties>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>com.gitee.starblues</groupId>
<artifactId>spring-brick-maven-packager</artifactId>
<version>${spring-brick.version}</version>
<configuration>
<mode>${plugin.build.mode}</mode>
<pluginInfo>
<id>websocket-plugin</id>
<bootstrapClass>cc.iotkit.plugins.websocket.Application</bootstrapClass>
<version>${project.version}</version>
<provider>iita</provider>
<description>websocket示例插件</description>
<configFileName>application.yml</configFileName>
</pluginInfo>
<prodConfig>
<packageType>jar</packageType>
</prodConfig>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,19 @@
package cc.iotkit.plugins.websocket;
import com.gitee.starblues.bootstrap.SpringPluginBootstrap;
import com.gitee.starblues.bootstrap.annotation.OneselfConfig;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
/**
* @author tfd
*/
@SpringBootApplication(scanBasePackages = "cc.iotkit.plugins.websocket")
@OneselfConfig(mainConfigFileName = {"application.yml"})
@EnableConfigurationProperties
public class Application extends SpringPluginBootstrap {
public static void main(String[] args) {
new Application().run(Application.class, args);
}
}

View File

@ -0,0 +1,28 @@
package cc.iotkit.plugins.websocket.conf;
import cc.iotkit.plugin.core.IPluginConfig;
import cc.iotkit.plugin.core.LocalPluginConfig;
import cc.iotkit.plugin.core.thing.IThingService;
import cc.iotkit.plugins.websocket.service.FakeThingService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @author tfd
*/
@Component
public class BeanConfig {
@Bean
@ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev")
IThingService getThingService() {
return new FakeThingService();
}
@Bean
@ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev")
IPluginConfig getPluginConfig(){
return new LocalPluginConfig();
}
}

View File

@ -0,0 +1,39 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.plugins.websocket.conf;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;
@Data
@Component
@ConfigurationProperties(prefix = "websocket")
public class WebsocketConfig {
private int port;
private String sslKey;
private String sslCert;
private boolean ssl;
private List<AccessToken> accessTokens;
@Data
public static class AccessToken{
private String tokenName;
private String tokenStr;
}
}

View File

@ -0,0 +1,53 @@
package cc.iotkit.plugins.websocket.service;
import cc.iotkit.plugin.core.thing.IThingService;
import cc.iotkit.plugin.core.thing.actions.ActionResult;
import cc.iotkit.plugin.core.thing.actions.IDeviceAction;
import cc.iotkit.plugin.core.thing.model.ThingDevice;
import cc.iotkit.plugin.core.thing.model.ThingProduct;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
/**
*
*
* @author tfd
*/
@Slf4j
public class FakeThingService implements IThingService {
/**
*
*/
private static final Map<String, String> DEVICES = new HashMap<>();
@Override
public ActionResult post(String pluginId, IDeviceAction action) {
log.info("post action:{}", action);
return ActionResult.builder().code(0).build();
}
@Override
public ThingProduct getProduct(String pk) {
return ThingProduct.builder()
.productKey(pk)
.productSecret("")
.build();
}
@Override
public ThingDevice getDevice(String dn) {
return ThingDevice.builder()
.productKey(DEVICES.get(dn))
.deviceName(dn)
.build();
}
@Override
public Map<String, ?> getProperty(String dn) {
return new HashMap<>(0);
}
}

View File

@ -0,0 +1,81 @@
package cc.iotkit.plugins.websocket.service;
import cc.iotkit.common.enums.ErrCode;
import cc.iotkit.common.exception.BizException;
import cc.iotkit.plugin.core.thing.IDevice;
import cc.iotkit.plugin.core.thing.actions.ActionResult;
import cc.iotkit.plugin.core.thing.actions.down.DeviceConfig;
import cc.iotkit.plugin.core.thing.actions.down.PropertyGet;
import cc.iotkit.plugin.core.thing.actions.down.PropertySet;
import cc.iotkit.plugin.core.thing.actions.down.ServiceInvoke;
import io.vertx.core.json.JsonObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* websocket
*
* @author tfd
*/
@Service
public class WebsocketDevice implements IDevice {
@Autowired
private WebsocketVerticle websocketVerticle;
@Override
public ActionResult config(DeviceConfig action) {
return ActionResult.builder().code(0).reason("").build();
}
@Override
public ActionResult propertyGet(PropertyGet action) {
return send(
action.getDeviceName(),
new JsonObject()
.put("id", action.getId())
.put("method", "thing.service.property.get")
.put("params", action.getKeys())
.toString()
);
}
@Override
public ActionResult propertySet(PropertySet action) {
return send(
action.getDeviceName(),
new JsonObject()
.put("id", action.getId())
.put("method", "thing.service.property.set")
.put("params", action.getParams())
.toString()
);
}
@Override
public ActionResult serviceInvoke(ServiceInvoke action) {
return send(
action.getDeviceName(),
new JsonObject()
.put("id", action.getId())
.put("method", "thing.service." + action.getName())
.put("params", action.getParams())
.toString()
);
}
private ActionResult send(String deviceName, String payload) {
try {
websocketVerticle.send(
deviceName,
payload
);
return ActionResult.builder().code(0).reason("").build();
} catch (BizException e) {
return ActionResult.builder().code(e.getCode()).reason(e.getMessage()).build();
} catch (Exception e) {
return ActionResult.builder().code(ErrCode.UNKNOWN_EXCEPTION.getKey()).reason(e.getMessage()).build();
}
}
}

View File

@ -0,0 +1,89 @@
package cc.iotkit.plugins.websocket.service;
import cc.iotkit.plugin.core.IPluginConfig;
import cc.iotkit.plugins.websocket.conf.WebsocketConfig;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import com.gitee.starblues.bootstrap.annotation.AutowiredType;
import com.gitee.starblues.bootstrap.realize.PluginCloseListener;
import com.gitee.starblues.core.PluginCloseType;
import com.gitee.starblues.core.PluginInfo;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @author tfd
*/
@Slf4j
@Service
public class WebsocketPlugin implements PluginCloseListener {
@Autowired
private PluginInfo pluginInfo;
@Autowired
private WebsocketVerticle websocketVerticle;
@Autowired
private WebsocketConfig websocketConfig;
@Autowired
@AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
private IPluginConfig pluginConfig;
private Vertx vertx;
private String deployedId;
@PostConstruct
public void init() {
vertx = Vertx.vertx();
try {
//获取插件最新配置替换当前配置
Map<String, Object> config = pluginConfig.getConfig(pluginInfo.getPluginId());
BeanUtil.copyProperties(config, websocketConfig, CopyOptions.create().ignoreNullValue());
websocketVerticle.setConfig(websocketConfig);
Future<String> future = vertx.deployVerticle(websocketVerticle);
future.onSuccess((s -> {
deployedId = s;
log.info("websocket plugin started success");
}));
future.onFailure((e) -> {
log.error("websocket plugin startup failed", e);
});
} catch (Throwable e) {
log.error("websocket plugin startup error", e);
}
}
@Override
public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) {
try {
log.info("plugin close,type:{},pluginId:{}", closeType, pluginInfo.getPluginId());
if (deployedId != null) {
CountDownLatch wait = new CountDownLatch(1);
Future<Void> future = vertx.undeploy(deployedId);
future.onSuccess(unused -> {
log.info("websocket plugin stopped success");
wait.countDown();
});
future.onFailure(h -> {
log.info("websocket plugin stopped failed");
h.printStackTrace();
wait.countDown();
});
wait.await(5, TimeUnit.SECONDS);
}
} catch (Throwable e) {
log.error("websocket plugin stop error", e);
}
}
}

View File

@ -0,0 +1,229 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.plugins.websocket.service;
import cc.iotkit.common.utils.JsonUtils;
import cc.iotkit.common.utils.StringUtils;
import cc.iotkit.plugin.core.thing.IThingService;
import cc.iotkit.plugin.core.thing.actions.ActionResult;
import cc.iotkit.plugin.core.thing.actions.DeviceState;
import cc.iotkit.plugin.core.thing.actions.up.DeviceRegister;
import cc.iotkit.plugin.core.thing.actions.up.DeviceStateChange;
import cc.iotkit.plugins.websocket.conf.WebsocketConfig;
import com.gitee.starblues.bootstrap.annotation.AutowiredType;
import com.gitee.starblues.core.PluginInfo;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.net.PemKeyCertOptions;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
*
* @author tfd
*/
@Slf4j
@Component
@Data
public class WebsocketVerticle extends AbstractVerticle {
private HttpServer httpServer;
private final Map<String, ServerWebSocket> wsClients = new ConcurrentHashMap<>();
private static final Map<String, Boolean> CONNECT_POOL = new ConcurrentHashMap<>();
private static final Map<String, Boolean> DEVICE_ONLINE = new ConcurrentHashMap<>();
private Map<String, String> tokens=new HashMap<>();
private WebsocketConfig config;
@Autowired
@AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
private IThingService thingService;
@Autowired
private PluginInfo pluginInfo;
@Override
public void start() {
Executors.newSingleThreadScheduledExecutor().schedule(this::initMqttServer, 3, TimeUnit.SECONDS);
}
private void initMqttServer() {
HttpServerOptions options = new HttpServerOptions()
.setPort(config.getPort());
if (config.isSsl()) {
options = options.setSsl(true)
.setKeyCertOptions(new PemKeyCertOptions()
.setKeyPath(config.getSslKey())
.setCertPath(config.getSslCert()));
}
httpServer = vertx.createHttpServer(options).webSocketHandler(wsClient -> {
log.info("webSocket client connect sessionId:{},path={}", wsClient.textHandlerID(), wsClient.path());
String deviceKey = wsClient.path().replace("/","");
String[] strArr=deviceKey.split("_");
if(StringUtils.isBlank(deviceKey)||strArr.length!=2){
log.warn("陌生连接,拒绝");
wsClient.reject();
return;
}
wsClient.writeTextMessage("connect succes! please auth!");
wsClient.textMessageHandler(message -> {
HashMap<String,String> msg;
try{
msg=JsonUtils.parseObject(message,HashMap.class);
}catch (Exception e){
log.warn("数据格式异常");
wsClient.writeTextMessage("data err");
return;
}
if(wsClients.containsKey(deviceKey)){
if("ping".equals(msg.get("type"))){
msg.put("type","pong");
wsClient.writeTextMessage(JsonUtils.toJsonString(msg));
return;
}
if("register".equals(msg.get("type"))){
//设备注册
ActionResult result = thingService.post(
pluginInfo.getPluginId(),
DeviceRegister.builder()
.productKey(strArr[1])
.deviceName(deviceKey)
.model("")
.version("1.0")
.build()
);
if(result.getCode()==0){
thingService.post(
pluginInfo.getPluginId(),
DeviceStateChange.builder()
.productKey(strArr[1])
.deviceName(deviceKey)
.state(DeviceState.ONLINE)
.build()
);
}else{
//注册失败
Map<String,String> ret=new HashMap<>();
ret.put("id",msg.get("id"));
ret.put("type",msg.get("type"));
ret.put("result","fail");
wsClient.writeTextMessage(JsonUtils.toJsonString(ret));
return;
}
}
}else if(msg!=null&&"auth".equals(msg.get("type"))){
Set<String> tokenKey=tokens.keySet();
for(String key:tokenKey){
if(StringUtils.isNotBlank(msg.get(key))&&tokens.get(key).equals(msg.get(key))){
//保存设备与连接关系
log.info("认证通过");
wsClients.put(deviceKey, wsClient);
wsClient.writeTextMessage("auth succes");
return;
}
}
log.warn("认证失败,拒绝");
wsClient.writeTextMessage("auth fail");
return;
}else{
log.warn("认证失败,拒绝");
wsClient.writeTextMessage("auth fail");
return;
}
});
wsClient.closeHandler(c -> {
log.warn("client connection closed,deviceKey:{}", deviceKey);
if(wsClients.containsKey(deviceKey)){
wsClients.remove(deviceKey);
thingService.post(
pluginInfo.getPluginId(),
DeviceStateChange.builder()
.productKey(strArr[1])
.deviceName(deviceKey)
.state(DeviceState.OFFLINE)
.build()
);
}
});
wsClient.exceptionHandler(ex -> {
log.warn("webSocket client connection exception,deviceKey:{}", deviceKey);
if(wsClients.containsKey(deviceKey)){
wsClients.remove(deviceKey);
thingService.post(
pluginInfo.getPluginId(),
DeviceStateChange.builder()
.productKey(strArr[1])
.deviceName(deviceKey)
.state(DeviceState.OFFLINE)
.build()
);
}
});
}).listen(config.getPort(), server -> {
if (server.succeeded()) {
log.info("webSocket server is listening on port " + config.getPort());
if(config.getAccessTokens()!=null){
List<WebsocketConfig.AccessToken> tokenConfig= config.getAccessTokens();
for (WebsocketConfig.AccessToken obj:tokenConfig) {
tokens.put(obj.getTokenName(),obj.getTokenStr());
}
}
} else {
log.error("webSocket server on starting the server", server.cause());
}
});
}
@Override
public void stop() throws Exception {
for (String deviceKey : wsClients.keySet()) {
thingService.post(
pluginInfo.getPluginId(),
DeviceStateChange.builder()
.productKey(deviceKey.split("_")[1])
.deviceName(deviceKey)
.state(DeviceState.OFFLINE)
.build()
);
}
tokens.clear();
httpServer.close(voidAsyncResult -> log.info("close webocket server..."));
}
private String getDeviceKey(String productKey, String deviceName) {
return String.format("%s_%s", productKey, deviceName);
}
public void send(String deviceName,String msg) {
ServerWebSocket wsClient = wsClients.get(deviceName);
String msgStr = JsonUtils.toJsonString(msg);
log.info("send msg payload:{}", msgStr);
Future<Void> result = wsClient.writeTextMessage(msgStr);
result.onFailure(e -> log.error("webSocket server send msg failed", e));
}
}

View File

@ -0,0 +1,7 @@
plugin:
runMode: prod
mainPackage: cc.iotkit.plugin
websocket:
port: 1662
accessTokens: [{"tokenName":"test_token","tokenStr":"123456789"}]

View File

@ -0,0 +1,16 @@
[
{
"id": "port",
"name": "端口",
"type": "number",
"value": 1662,
"desc": "websocket端口默认为1662"
},
{
"id": "accessTokens",
"name": "token表",
"type": "json",
"value": "[{'tokenName':'test_token','tokenStr':'123456789'}]",
"desc": "token表可多个"
}
]