diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/analysis/DataAnalysis.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/analysis/DataAnalysis.java new file mode 100644 index 0000000..3832fec --- /dev/null +++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/analysis/DataAnalysis.java @@ -0,0 +1,38 @@ +package cc.iotkit.plugins.websocket.analysis; + +import cc.iotkit.common.utils.JsonUtils; +import cc.iotkit.plugins.websocket.beans.Event; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; + +/** + * @Author:tfd + * @Date:2024/4/11 15:42 + */ +@Slf4j +public class DataAnalysis { + public static final String EVENT_STATE_CHANGED="state_changed"; + public static final ObjectMapper mapper = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE); + + public static Map stateChangedEvent(Event.StateData oldState, Event.StateData newState){ + Map ret= new HashMap<>(); + if(!oldState.getState().equals(newState.getState())){ + ret.put("state",newState.getState()); + } + HashMap oldAttributes=JsonUtils.parseObject(JsonUtils.toJsonString(oldState.getAttributes()),HashMap.class); + HashMap newAttributes=JsonUtils.parseObject(JsonUtils.toJsonString(newState.getAttributes()),HashMap.class); + newAttributes.forEach((key, value) -> { + if(ObjectUtil.isNotNull(value)&&!JsonUtils.toJsonString(oldAttributes.get(key)).equals(JsonUtils.toJsonString(newAttributes.get(key)))){ + ret.put(StrUtil.toCamelCase(key),value); + } + }); + log.info("analysis:{}",JsonUtils.toJsonString(ret)); + return ret; + } +} diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/beans/Event.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/beans/Event.java new file mode 100644 index 0000000..aa4da5d --- /dev/null +++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/beans/Event.java @@ -0,0 +1,34 @@ +package cc.iotkit.plugins.websocket.beans; + +import lombok.Data; + +/** + * @Author:tfd + * @Date:2024/4/11 16:57 + */ +@Data +public class Event { + private String eventType; + private EventData data; + private String origin; + private String timeFired; + private Object context; + + @Data + public class EventData{ + private String entityId; + private StateData oldState; + private StateData newState; + } + + @Data + public static class StateData{ + private String entityId; + private String state; + private Object attributes; + private String lastChanged; + private String lastUpdated; + private Object context; + } + +} diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketDevice.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketDevice.java index c7341a9..bdd420f 100644 --- a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketDevice.java +++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketDevice.java @@ -31,7 +31,7 @@ public class WebsocketDevice implements IDevice { @Override public ActionResult propertyGet(PropertyGet action) { return send( - action.getDeviceName(), + getDeviceKey(action.getDeviceName(),action.getProductKey()), new JsonObject() .put("id", action.getId()) .put("method", "thing.service.property.get") @@ -43,10 +43,10 @@ public class WebsocketDevice implements IDevice { @Override public ActionResult propertySet(PropertySet action) { return send( - action.getDeviceName(), + getDeviceKey(action.getDeviceName(),action.getProductKey()), new JsonObject() .put("id", action.getId()) - .put("method", "thing.service.property.set") + .put("type", "call_service") .put("params", action.getParams()) .toString() ); @@ -55,7 +55,7 @@ public class WebsocketDevice implements IDevice { @Override public ActionResult serviceInvoke(ServiceInvoke action) { return send( - action.getDeviceName(), + getDeviceKey(action.getDeviceName(),action.getProductKey()), new JsonObject() .put("id", action.getId()) .put("method", "thing.service." + action.getName()) @@ -64,10 +64,10 @@ public class WebsocketDevice implements IDevice { ); } - private ActionResult send(String deviceName, String payload) { + private ActionResult send(String deviceKey, String payload) { try { websocketVerticle.send( - deviceName, + deviceKey, payload ); return ActionResult.builder().code(0).reason("").build(); @@ -78,4 +78,8 @@ public class WebsocketDevice implements IDevice { } } + private String getDeviceKey(String deviceName, String productKey) { + return String.format("%s_%s", deviceName, productKey); + } + } diff --git a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketVerticle.java b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketVerticle.java index ccb5f9a..fa486f2 100644 --- a/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketVerticle.java +++ b/websocket-plugin/src/main/java/cc/iotkit/plugins/websocket/service/WebsocketVerticle.java @@ -16,7 +16,14 @@ import cc.iotkit.plugin.core.thing.actions.ActionResult; import cc.iotkit.plugin.core.thing.actions.DeviceState; import cc.iotkit.plugin.core.thing.actions.up.DeviceRegister; import cc.iotkit.plugin.core.thing.actions.up.DeviceStateChange; +import cc.iotkit.plugin.core.thing.actions.up.PropertyReport; +import cc.iotkit.plugin.core.thing.actions.up.SubDeviceRegister; +import cc.iotkit.plugins.websocket.analysis.DataAnalysis; +import cc.iotkit.plugins.websocket.beans.Event; import cc.iotkit.plugins.websocket.conf.WebsocketConfig; +import cn.hutool.core.util.ObjectUtil; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.gitee.starblues.bootstrap.annotation.AutowiredType; import com.gitee.starblues.core.PluginInfo; import io.vertx.core.AbstractVerticle; @@ -30,9 +37,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -89,7 +94,7 @@ public class WebsocketVerticle extends AbstractVerticle { } wsClient.writeTextMessage("connect succes! please auth!"); wsClient.textMessageHandler(message -> { - HashMap msg; + HashMap msg; try{ msg=JsonUtils.parseObject(message,HashMap.class); }catch (Exception e){ @@ -97,49 +102,124 @@ public class WebsocketVerticle extends AbstractVerticle { wsClient.writeTextMessage("data err"); return; } + log.info("webSocket receive message:{}",message); if(wsClients.containsKey(deviceKey)){ - if("ping".equals(msg.get("type"))){ + if("ping".equals(msg.get("type"))){//心跳 msg.put("type","pong"); wsClient.writeTextMessage(JsonUtils.toJsonString(msg)); return; - } - if("register".equals(msg.get("type"))){ - //设备注册 - ActionResult result = thingService.post( - pluginInfo.getPluginId(), - DeviceRegister.builder() - .productKey(strArr[1]) - .deviceName(deviceKey) - .model("") - .version("1.0") - .build() - ); - if(result.getCode()==0){ + }else if("register".equals(msg.get("type"))){//注册 + ActionResult result; + List subDevices = null; + if(ObjectUtil.isNotNull(msg.get("subDevices"))){ + subDevices=JsonUtils.parseObject(JsonUtils.toJsonString(msg.get("subDevices")),List.class); + List subsRe =new ArrayList<>(); + for(String sub:subDevices){ + String subName=sub.split("_")[0]; + String subKey=sub.split("_")[1]; + subsRe.add(DeviceRegister.builder() + .productKey(subKey) + .deviceName(subName) + .build()); + } + //带子设备注册 + result = thingService.post( + pluginInfo.getPluginId(), + SubDeviceRegister.builder() + .productKey(strArr[1]) + .deviceName(strArr[0]) + .subs(subsRe) + .build() + ); + }else{ + //设备注册 + result = thingService.post( + pluginInfo.getPluginId(), + DeviceRegister.builder() + .productKey(strArr[1]) + .deviceName(strArr[0]) + .build() + ); + } + if(ObjectUtil.isNotNull(result)&&result.getCode()==0){ + log.info("设备上线"); + //父设备上线 thingService.post( pluginInfo.getPluginId(), DeviceStateChange.builder() .productKey(strArr[1]) - .deviceName(deviceKey) + .deviceName(strArr[0]) .state(DeviceState.ONLINE) .build() ); + //子设备上线 + if(ObjectUtil.isNotNull(subDevices)){ + log.info("子设备上线"); + for(String sub:subDevices){ + String subName=sub.split("_")[0]; + String subKey=sub.split("_")[1]; + thingService.post( + pluginInfo.getPluginId(), + DeviceStateChange.builder() + .productKey(subKey) + .deviceName(subName) + .state(DeviceState.ONLINE) + .build() + ); + + } + } + //注册成功 + Map ret=new HashMap<>(); + ret.put("id",msg.get("id")); + ret.put("type",msg.get("type")); + ret.put("result","succes"); + wsClient.writeTextMessage(JsonUtils.toJsonString(ret)); + return; }else{ //注册失败 - Map ret=new HashMap<>(); + Map ret=new HashMap<>(); ret.put("id",msg.get("id")); ret.put("type",msg.get("type")); ret.put("result","fail"); wsClient.writeTextMessage(JsonUtils.toJsonString(ret)); return; } + }else{//数据处理 + if("event".equals(msg.get("type"))){ + Event event= null; + try { + event = DataAnalysis.mapper.readValue(JsonUtils.toJsonString(msg.get("event")), new TypeReference() {}); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + String[] keys=event.getData().getEntityId().split("_"); + if(DataAnalysis.EVENT_STATE_CHANGED.equals(event.getEventType())){ + thingService.post(pluginInfo.getPluginId(), + PropertyReport.builder().productKey(keys[1]) + .deviceName(keys[0]) + .params(DataAnalysis.stateChangedEvent(event.getData() + .getOldState(),event.getData().getNewState())).build()); + + } + //注册失败 + Map ret=new HashMap<>(); + ret.put("id",msg.get("id")); + ret.put("type",msg.get("type")); + ret.put("result","succes"); + wsClient.writeTextMessage(JsonUtils.toJsonString(ret)); + return; + } } }else if(msg!=null&&"auth".equals(msg.get("type"))){ Set tokenKey=tokens.keySet(); for(String key:tokenKey){ - if(StringUtils.isNotBlank(msg.get(key))&&tokens.get(key).equals(msg.get(key))){ + if(ObjectUtil.isNotNull(msg.get(key))&&tokens.get(key).equals(msg.get(key))){ //保存设备与连接关系 log.info("认证通过"); - wsClients.put(deviceKey, wsClient); + if(!wsClients.containsKey(deviceKey)){ + wsClients.put(deviceKey, wsClient); + } wsClient.writeTextMessage("auth succes"); return; } @@ -162,7 +242,7 @@ public class WebsocketVerticle extends AbstractVerticle { pluginInfo.getPluginId(), DeviceStateChange.builder() .productKey(strArr[1]) - .deviceName(deviceKey) + .deviceName(strArr[0]) .state(DeviceState.OFFLINE) .build() ); @@ -176,7 +256,7 @@ public class WebsocketVerticle extends AbstractVerticle { pluginInfo.getPluginId(), DeviceStateChange.builder() .productKey(strArr[1]) - .deviceName(deviceKey) + .deviceName(strArr[0]) .state(DeviceState.OFFLINE) .build() ); @@ -195,13 +275,13 @@ public class WebsocketVerticle extends AbstractVerticle { } @Override - public void stop() throws Exception { + public void stop() { for (String deviceKey : wsClients.keySet()) { thingService.post( pluginInfo.getPluginId(), DeviceStateChange.builder() .productKey(deviceKey.split("_")[1]) - .deviceName(deviceKey) + .deviceName(deviceKey.split("_")[0]) .state(DeviceState.OFFLINE) .build() ); @@ -210,12 +290,8 @@ public class WebsocketVerticle extends AbstractVerticle { httpServer.close(voidAsyncResult -> log.info("close webocket server...")); } - private String getDeviceKey(String productKey, String deviceName) { - return String.format("%s_%s", productKey, deviceName); - } - - public void send(String deviceName,String msg) { - ServerWebSocket wsClient = wsClients.get(deviceName); + public void send(String deviceKey,String msg) { + ServerWebSocket wsClient = wsClients.get(deviceKey); String msgStr = JsonUtils.toJsonString(msg); log.info("send msg payload:{}", msgStr); Future result = wsClient.writeTextMessage(msgStr);