feat:修改ws的解析

master
tangfudong 2024-04-17 17:51:01 +08:00
parent a2d8e727d9
commit f7b91b1584
4 changed files with 190 additions and 38 deletions

View File

@ -0,0 +1,38 @@
package cc.iotkit.plugins.websocket.analysis;
import cc.iotkit.common.utils.JsonUtils;
import cc.iotkit.plugins.websocket.beans.Event;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
/**
* @Authortfd
* @Date2024/4/11 15:42
*/
@Slf4j
public class DataAnalysis {
public static final String EVENT_STATE_CHANGED="state_changed";
public static final ObjectMapper mapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
public static Map<String, Object> stateChangedEvent(Event.StateData oldState, Event.StateData newState){
Map<String, Object> ret= new HashMap<>();
if(!oldState.getState().equals(newState.getState())){
ret.put("state",newState.getState());
}
HashMap<String,Object> oldAttributes=JsonUtils.parseObject(JsonUtils.toJsonString(oldState.getAttributes()),HashMap.class);
HashMap<String,Object> newAttributes=JsonUtils.parseObject(JsonUtils.toJsonString(newState.getAttributes()),HashMap.class);
newAttributes.forEach((key, value) -> {
if(ObjectUtil.isNotNull(value)&&!JsonUtils.toJsonString(oldAttributes.get(key)).equals(JsonUtils.toJsonString(newAttributes.get(key)))){
ret.put(StrUtil.toCamelCase(key),value);
}
});
log.info("analysis:{}",JsonUtils.toJsonString(ret));
return ret;
}
}

View File

@ -0,0 +1,34 @@
package cc.iotkit.plugins.websocket.beans;
import lombok.Data;
/**
* @Authortfd
* @Date2024/4/11 16:57
*/
@Data
public class Event {
private String eventType;
private EventData data;
private String origin;
private String timeFired;
private Object context;
@Data
public class EventData{
private String entityId;
private StateData oldState;
private StateData newState;
}
@Data
public static class StateData{
private String entityId;
private String state;
private Object attributes;
private String lastChanged;
private String lastUpdated;
private Object context;
}
}

View File

@ -31,7 +31,7 @@ public class WebsocketDevice implements IDevice {
@Override
public ActionResult propertyGet(PropertyGet action) {
return send(
action.getDeviceName(),
getDeviceKey(action.getDeviceName(),action.getProductKey()),
new JsonObject()
.put("id", action.getId())
.put("method", "thing.service.property.get")
@ -43,10 +43,10 @@ public class WebsocketDevice implements IDevice {
@Override
public ActionResult propertySet(PropertySet action) {
return send(
action.getDeviceName(),
getDeviceKey(action.getDeviceName(),action.getProductKey()),
new JsonObject()
.put("id", action.getId())
.put("method", "thing.service.property.set")
.put("type", "call_service")
.put("params", action.getParams())
.toString()
);
@ -55,7 +55,7 @@ public class WebsocketDevice implements IDevice {
@Override
public ActionResult serviceInvoke(ServiceInvoke action) {
return send(
action.getDeviceName(),
getDeviceKey(action.getDeviceName(),action.getProductKey()),
new JsonObject()
.put("id", action.getId())
.put("method", "thing.service." + action.getName())
@ -64,10 +64,10 @@ public class WebsocketDevice implements IDevice {
);
}
private ActionResult send(String deviceName, String payload) {
private ActionResult send(String deviceKey, String payload) {
try {
websocketVerticle.send(
deviceName,
deviceKey,
payload
);
return ActionResult.builder().code(0).reason("").build();
@ -78,4 +78,8 @@ public class WebsocketDevice implements IDevice {
}
}
private String getDeviceKey(String deviceName, String productKey) {
return String.format("%s_%s", deviceName, productKey);
}
}

View File

@ -16,7 +16,14 @@ import cc.iotkit.plugin.core.thing.actions.ActionResult;
import cc.iotkit.plugin.core.thing.actions.DeviceState;
import cc.iotkit.plugin.core.thing.actions.up.DeviceRegister;
import cc.iotkit.plugin.core.thing.actions.up.DeviceStateChange;
import cc.iotkit.plugin.core.thing.actions.up.PropertyReport;
import cc.iotkit.plugin.core.thing.actions.up.SubDeviceRegister;
import cc.iotkit.plugins.websocket.analysis.DataAnalysis;
import cc.iotkit.plugins.websocket.beans.Event;
import cc.iotkit.plugins.websocket.conf.WebsocketConfig;
import cn.hutool.core.util.ObjectUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.gitee.starblues.bootstrap.annotation.AutowiredType;
import com.gitee.starblues.core.PluginInfo;
import io.vertx.core.AbstractVerticle;
@ -30,9 +37,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -89,7 +94,7 @@ public class WebsocketVerticle extends AbstractVerticle {
}
wsClient.writeTextMessage("connect succes! please auth!");
wsClient.textMessageHandler(message -> {
HashMap<String,String> msg;
HashMap<String,Object> msg;
try{
msg=JsonUtils.parseObject(message,HashMap.class);
}catch (Exception e){
@ -97,49 +102,124 @@ public class WebsocketVerticle extends AbstractVerticle {
wsClient.writeTextMessage("data err");
return;
}
log.info("webSocket receive message:{}",message);
if(wsClients.containsKey(deviceKey)){
if("ping".equals(msg.get("type"))){
if("ping".equals(msg.get("type"))){//心跳
msg.put("type","pong");
wsClient.writeTextMessage(JsonUtils.toJsonString(msg));
return;
}else if("register".equals(msg.get("type"))){//注册
ActionResult result;
List<String> subDevices = null;
if(ObjectUtil.isNotNull(msg.get("subDevices"))){
subDevices=JsonUtils.parseObject(JsonUtils.toJsonString(msg.get("subDevices")),List.class);
List<DeviceRegister> subsRe =new ArrayList<>();
for(String sub:subDevices){
String subName=sub.split("_")[0];
String subKey=sub.split("_")[1];
subsRe.add(DeviceRegister.builder()
.productKey(subKey)
.deviceName(subName)
.build());
}
if("register".equals(msg.get("type"))){
//带子设备注册
result = thingService.post(
pluginInfo.getPluginId(),
SubDeviceRegister.builder()
.productKey(strArr[1])
.deviceName(strArr[0])
.subs(subsRe)
.build()
);
}else{
//设备注册
ActionResult result = thingService.post(
result = thingService.post(
pluginInfo.getPluginId(),
DeviceRegister.builder()
.productKey(strArr[1])
.deviceName(deviceKey)
.model("")
.version("1.0")
.deviceName(strArr[0])
.build()
);
if(result.getCode()==0){
}
if(ObjectUtil.isNotNull(result)&&result.getCode()==0){
log.info("设备上线");
//父设备上线
thingService.post(
pluginInfo.getPluginId(),
DeviceStateChange.builder()
.productKey(strArr[1])
.deviceName(deviceKey)
.deviceName(strArr[0])
.state(DeviceState.ONLINE)
.build()
);
//子设备上线
if(ObjectUtil.isNotNull(subDevices)){
log.info("子设备上线");
for(String sub:subDevices){
String subName=sub.split("_")[0];
String subKey=sub.split("_")[1];
thingService.post(
pluginInfo.getPluginId(),
DeviceStateChange.builder()
.productKey(subKey)
.deviceName(subName)
.state(DeviceState.ONLINE)
.build()
);
}
}
//注册成功
Map<String,Object> ret=new HashMap<>();
ret.put("id",msg.get("id"));
ret.put("type",msg.get("type"));
ret.put("result","succes");
wsClient.writeTextMessage(JsonUtils.toJsonString(ret));
return;
}else{
//注册失败
Map<String,String> ret=new HashMap<>();
Map<String,Object> ret=new HashMap<>();
ret.put("id",msg.get("id"));
ret.put("type",msg.get("type"));
ret.put("result","fail");
wsClient.writeTextMessage(JsonUtils.toJsonString(ret));
return;
}
}else{//数据处理
if("event".equals(msg.get("type"))){
Event event= null;
try {
event = DataAnalysis.mapper.readValue(JsonUtils.toJsonString(msg.get("event")), new TypeReference<Event>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
String[] keys=event.getData().getEntityId().split("_");
if(DataAnalysis.EVENT_STATE_CHANGED.equals(event.getEventType())){
thingService.post(pluginInfo.getPluginId(),
PropertyReport.builder().productKey(keys[1])
.deviceName(keys[0])
.params(DataAnalysis.stateChangedEvent(event.getData()
.getOldState(),event.getData().getNewState())).build());
}
//注册失败
Map<String,Object> ret=new HashMap<>();
ret.put("id",msg.get("id"));
ret.put("type",msg.get("type"));
ret.put("result","succes");
wsClient.writeTextMessage(JsonUtils.toJsonString(ret));
return;
}
}
}else if(msg!=null&&"auth".equals(msg.get("type"))){
Set<String> tokenKey=tokens.keySet();
for(String key:tokenKey){
if(StringUtils.isNotBlank(msg.get(key))&&tokens.get(key).equals(msg.get(key))){
if(ObjectUtil.isNotNull(msg.get(key))&&tokens.get(key).equals(msg.get(key))){
//保存设备与连接关系
log.info("认证通过");
if(!wsClients.containsKey(deviceKey)){
wsClients.put(deviceKey, wsClient);
}
wsClient.writeTextMessage("auth succes");
return;
}
@ -162,7 +242,7 @@ public class WebsocketVerticle extends AbstractVerticle {
pluginInfo.getPluginId(),
DeviceStateChange.builder()
.productKey(strArr[1])
.deviceName(deviceKey)
.deviceName(strArr[0])
.state(DeviceState.OFFLINE)
.build()
);
@ -176,7 +256,7 @@ public class WebsocketVerticle extends AbstractVerticle {
pluginInfo.getPluginId(),
DeviceStateChange.builder()
.productKey(strArr[1])
.deviceName(deviceKey)
.deviceName(strArr[0])
.state(DeviceState.OFFLINE)
.build()
);
@ -195,13 +275,13 @@ public class WebsocketVerticle extends AbstractVerticle {
}
@Override
public void stop() throws Exception {
public void stop() {
for (String deviceKey : wsClients.keySet()) {
thingService.post(
pluginInfo.getPluginId(),
DeviceStateChange.builder()
.productKey(deviceKey.split("_")[1])
.deviceName(deviceKey)
.deviceName(deviceKey.split("_")[0])
.state(DeviceState.OFFLINE)
.build()
);
@ -210,12 +290,8 @@ public class WebsocketVerticle extends AbstractVerticle {
httpServer.close(voidAsyncResult -> log.info("close webocket server..."));
}
private String getDeviceKey(String productKey, String deviceName) {
return String.format("%s_%s", productKey, deviceName);
}
public void send(String deviceName,String msg) {
ServerWebSocket wsClient = wsClients.get(deviceName);
public void send(String deviceKey,String msg) {
ServerWebSocket wsClient = wsClients.get(deviceKey);
String msgStr = JsonUtils.toJsonString(msg);
log.info("send msg payload:{}", msgStr);
Future<Void> result = wsClient.writeTextMessage(msgStr);