diff --git a/data/converters/6260396d67aced2696184053/converter.js b/data/converters/6260396d67aced2696184053/converter.js index f543b94d..adf051cc 100755 --- a/data/converters/6260396d67aced2696184053/converter.js +++ b/data/converters/6260396d67aced2696184053/converter.js @@ -109,7 +109,7 @@ this.encode = function (service,device) { var method="thing.service."; var topic="/sys/"+service.productKey+"/"+service.deviceName+"/c/service/"; var params={}; - + //透传下发 if(device.transparent){ var rst=component.transparentEncode(service,device); @@ -152,12 +152,29 @@ this.encode = function (service,device) { method+=identifier; topic="/sys/"+service.productKey+"/"+service.deviceName+"/c/deregister"; } - - for(var p in service.params){ + if(type=="property" && identifier=="get" ){ + var listParams = [] + for(var p in service.params){ + listParams.push(service.params[p]); + } + return { + productKey:service.productKey, + deviceName:service.deviceName, + mid:deviceMid, + content:{ + topic:topic, + payload:JSON.stringify({ + id:deviceMid, + method:method, + params: listParams + }) + } + } + }else{ + for(var p in service.params){ params[p]=service.params[p]; } - - return { + return { productKey:service.productKey, deviceName:service.deviceName, mid:deviceMid, @@ -170,4 +187,8 @@ this.encode = function (service,device) { }) } } + + } + + }; \ No newline at end of file diff --git a/data/init/protocolComponent.json b/data/init/protocolComponent.json index 10372d83..1f6b246d 100644 --- a/data/init/protocolComponent.json +++ b/data/init/protocolComponent.json @@ -17,7 +17,7 @@ "name": "EMQX标准协议组件", "type": "device", "protocol": "mqtt", - "jarFile": "iot-emqx-component-0.4.2-SNAPSHOT.jar", + "jarFile": "iot-emqx-component-0.4.3-SNAPSHOT.jar", "config": "{\"port\":\"1884\",\"ssl\":false,\"type\":\"client\",\"subscribeTopics\":[\"/sys/+/+/s/#\",\"/sys/client/connected\",\"/sys/client/disconnected\",\"/sys/session/subscribed\",\"/sys/session/unsubscribed\"],\"authPort\":\"8088\",\"broker\":\"127.0.0.1\",\"clientId\":\"test\",\"username\":\"test\",\"password\":\"123\"}", "converter": "6260396d67aced2696184053", "state": "stopped", @@ -29,7 +29,7 @@ "name": "小度音箱接入组件", "type": "biz", "protocol": "http", - "jarFile": "iot-http-biz-component-0.4.2-SNAPSHOT.jar", + "jarFile": "iot-http-biz-component-0.4.3-SNAPSHOT.jar", "config": "{\"port\":\"8084\"}", "converter": "", "state": "stopped", diff --git a/data/init/spaceDevice.json b/data/init/spaceDevice.json index f6b242ab..aedae8b9 100755 --- a/data/init/spaceDevice.json +++ b/data/init/spaceDevice.json @@ -6,7 +6,8 @@ "name": "卧室的ZGW01", "homeId": "629e18e96b16ad6a3e158645", "spaceId": "629e18fee0dc6d4171e1a021", - "addAt": 1654609953349 + "addAt": 1654609953349, + "collect":true }, { "id": "629f581ee0dc6d4171e1a028", @@ -16,7 +17,8 @@ "homeId": "629e18e96b16ad6a3e158645", "spaceId": "629e18fee0dc6d4171e1a021", "addAt": 1654609950464, - "useAt": 1655653227177 + "useAt": 1655653227177, + "collect":true }, { "id": "629f581ce0dc6d4171e1a027", @@ -26,7 +28,8 @@ "homeId": "629e18e96b16ad6a3e158645", "spaceId": "629e18fee0dc6d4171e1a021", "addAt": 1654609948550, - "useAt": 1655653294372 + "useAt": 1655653294372, + "collect":false }, { "id": "629f581ae0dc6d4171e1a026", @@ -36,7 +39,8 @@ "homeId": "629e18e96b16ad6a3e158645", "spaceId": "629e18fee0dc6d4171e1a021", "addAt": 1654609946750, - "useAt": 1655653552172 + "useAt": 1655653552172, + "collect":false }, { "id": "629f5818e0dc6d4171e1a025", @@ -46,7 +50,8 @@ "homeId": "629e18e96b16ad6a3e158645", "spaceId": "629e18fee0dc6d4171e1a021", "addAt": 1654609944061, - "useAt": 1655653548172 + "useAt": 1655653548172, + "collect":false }, { "id": "629f5734e0dc6d4171e1a024", @@ -56,7 +61,8 @@ "homeId": "629e18e96b16ad6a3e158645", "spaceId": "629e18fee0dc6d4171e1a021", "addAt": 1654609716856, - "useAt": 1655653590474 + "useAt": 1655653590474, + "collect":false }, { "id": "629e1a1de0dc6d4171e1a022", @@ -66,7 +72,8 @@ "homeId": "629e18e96b16ad6a3e158645", "spaceId": "629e18fee0dc6d4171e1a021", "addAt": 1654528541149, - "useAt": 1655653255089 + "useAt": 1655653255089, + "collect":false }, { "id": "629db2e1e5a005209d182877", @@ -76,7 +83,8 @@ "homeId": "6238a49fecf37861bed7ad11", "spaceId": "62794a820b0776663635e636", "addAt": 1654502113734, - "useAt": 1655653724374 + "useAt": 1655653724374, + "collect":false }, { "id": "6280d7cd7e234141ee9d1fd2", @@ -86,7 +94,8 @@ "homeId": "6238a49fecf37861bed7ad11", "spaceId": "62794a5c0b0776663635e633", "addAt": 1652611021619, - "useAt": 1655181137254 + "useAt": 1655181137254, + "collect":false }, { "id": "6280b17d7e234141ee9d1fcf", @@ -96,6 +105,7 @@ "homeId": "6238a49fecf37861bed7ad11", "spaceId": "62794a5c0b0776663635e633", "addAt": 1652601213676, - "useAt": 1655189186348 + "useAt": 1655189186348, + "collect":false } ] \ No newline at end of file diff --git a/iot-common/src/main/java/cc/iotkit/common/Constants.java b/iot-common/src/main/java/cc/iotkit/common/Constants.java index 3e990142..fdcd4f9e 100755 --- a/iot-common/src/main/java/cc/iotkit/common/Constants.java +++ b/iot-common/src/main/java/cc/iotkit/common/Constants.java @@ -155,6 +155,10 @@ public interface Constants { * 设备-服务调用 */ String INVOKE_SERVICE = "/{deviceId}/service/{service}/invoke"; + /** + * 设备-属性获取 + */ + String INVOKE_SERVICE_PROPERTY_GET = "/{deviceId}/service/property/get"; } @@ -170,6 +174,11 @@ public interface Constants { */ String RECENT_DEVICES = "/myRecentDevices"; + /** + * 获取用户当前收藏设备 + */ + String GET_COLLECT_DEVICES = "/getCollectDevices"; + /** * 我的空间设备列表 */ @@ -195,6 +204,11 @@ public interface Constants { */ String SAVE_DEVICE = "/saveDevice"; + /** + * 收藏/取消收藏设备 + */ + String COLLECT_DEVICE = "/collectDevice"; + /** * 获取空间设备信息 */ diff --git a/iot-components/iot-component-server/src/main/java/cc/iotkit/comps/DeviceMessageHandler.java b/iot-components/iot-component-server/src/main/java/cc/iotkit/comps/DeviceMessageHandler.java index 8ec7e335..acc48771 100755 --- a/iot-components/iot-component-server/src/main/java/cc/iotkit/comps/DeviceMessageHandler.java +++ b/iot-components/iot-component-server/src/main/java/cc/iotkit/comps/DeviceMessageHandler.java @@ -78,10 +78,9 @@ public class DeviceMessageHandler implements IMessageHandler { public void onReceive(Map head, String type, String msg, Consumer onResult) { executorService.submit(() -> { try { - Map rst = scriptEngine.invokeMethod(new TypeReference<>() { + Map rst = scriptEngine.invokeMethod(new TypeReference<>() { }, "onReceive", head, type, msg); Object objType = rst.get("type"); - log.info("onReceive script result:{}", objType); if (objType == null) { onResult.accept(null); return; @@ -99,19 +98,15 @@ public class DeviceMessageHandler implements IMessageHandler { switch (objType.toString()) { case "register": - if (action != null && Action.TYPE_ACK.equals(action.getType())) { - doAction(action); - } else { - //注册数据 - RegisterInfo regInfo = MessageParser.parseRegisterInfo(data); - if (regInfo == null) { - onResult.accept(null); - return; - } - doRegister(regInfo); - doAction(action); - onResult.accept(new ReceiveResult(regInfo.getProductKey(), regInfo.getDeviceName(), regInfo)); + //注册数据 + RegisterInfo regInfo = MessageParser.parseRegisterInfo(data); + if (regInfo == null) { + onResult.accept(null); + return; } + doRegister(regInfo); + doAction(action); + onResult.accept(new ReceiveResult(regInfo.getProductKey(), regInfo.getDeviceName(), regInfo)); return; case "auth": //设备认证 diff --git a/iot-components/iot-component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java b/iot-components/iot-component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java index 0eb99d66..e6948f82 100755 --- a/iot-components/iot-component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java +++ b/iot-components/iot-component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java @@ -54,14 +54,15 @@ public class DeviceBehaviourService { DeviceInfo deviceInfo = register(null, info); //子设备注册 List subDevices = info.getSubDevices(); - if (subDevices != null && subDevices.size() != 0) { - for (RegisterInfo.SubDevice subDevice : subDevices) { - register(deviceInfo.getDeviceId(), - new RegisterInfo(subDevice.getProductKey(), - subDevice.getDeviceName(), - subDevice.getModel(), - subDevice.getTag(), null)); - } + if (subDevices == null) { + return; + } + for (RegisterInfo.SubDevice subDevice : subDevices) { + register(deviceInfo.getDeviceId(), + new RegisterInfo(subDevice.getProductKey(), + subDevice.getDeviceName(), + subDevice.getModel(), + subDevice.getTag(), null)); } } catch (BizException e) { log.error("register device error", e); @@ -175,6 +176,13 @@ public class DeviceBehaviourService { } + public boolean isOnline(String productKey, + String deviceName) { + DeviceInfo device = deviceInfoData.findByProductKeyAndDeviceName(productKey, deviceName); + DeviceInfo deviceInfo = deviceInfoData.findByDeviceId(device.getDeviceId()); + return deviceInfo.getState().isOnline(); + } + public void deviceStateChange(String productKey, String deviceName, boolean online) { diff --git a/iot-components/iot-emqx-component/src/main/java/cc/iotkit/comp/emqx/JsScripter.java b/iot-components/iot-emqx-component/src/main/java/cc/iotkit/comp/emqx/JsScripter.java index 23dcee1a..47238686 100755 --- a/iot-components/iot-emqx-component/src/main/java/cc/iotkit/comp/emqx/JsScripter.java +++ b/iot-components/iot-emqx-component/src/main/java/cc/iotkit/comp/emqx/JsScripter.java @@ -14,6 +14,7 @@ import cc.iotkit.model.device.message.ThingModelMessage; import cc.iotkit.model.product.ProductModel; import cc.iotkit.script.IScriptEngine; import cc.iotkit.script.ScriptEngineFactory; +import com.fasterxml.jackson.core.type.TypeReference; import lombok.Data; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -40,11 +41,13 @@ public class JsScripter implements IScripter { @SneakyThrows public ThingModelMessage decode(TransparentMsg msg) { - return scriptEngine.invokeMethod(ThingModelMessage.class, "decode", msg).get(0); + return scriptEngine.invokeMethod(new TypeReference<>() { + }, "decode", msg); } @SneakyThrows public TransparentMsg encode(ThingService service) { - return scriptEngine.invokeMethod(TransparentMsg.class, "encode", service).get(0); + return scriptEngine.invokeMethod(new TypeReference<>() { + }, "encode", service); } } diff --git a/iot-components/iot-http-biz-component/src/main/java/cc/iotkit/comp/biz/HttpBizComponent.java b/iot-components/iot-http-biz-component/src/main/java/cc/iotkit/comp/biz/HttpBizComponent.java index 56575776..8d659ed3 100755 --- a/iot-components/iot-http-biz-component/src/main/java/cc/iotkit/comp/biz/HttpBizComponent.java +++ b/iot-components/iot-http-biz-component/src/main/java/cc/iotkit/comp/biz/HttpBizComponent.java @@ -14,6 +14,7 @@ import cc.iotkit.comp.CompConfig; import cc.iotkit.comp.IComponent; import cc.iotkit.script.IScriptEngine; import cc.iotkit.script.ScriptEngineFactory; +import com.fasterxml.jackson.core.type.TypeReference; import io.vertx.core.MultiMap; import io.vertx.core.Vertx; import io.vertx.core.http.HttpServer; @@ -76,13 +77,15 @@ public class HttpBizComponent implements IComponent { String response; try { HttpContent content = - scriptEngine.invokeMethod(HttpContent.class, + scriptEngine.invokeMethod( + new TypeReference<>() { + }, "onReceive", httpRequest.method().name(), httpRequest.path(), httpHeader, httpParams, - body).get(0); + body); responseHeader = content.getHeader(); response = content.getContent(); response = response == null ? "" : response; diff --git a/iot-components/iot-mqtt-component/src/main/java/cc/iotkit/comp/mqtt/JsScripter.java b/iot-components/iot-mqtt-component/src/main/java/cc/iotkit/comp/mqtt/JsScripter.java index 1c7d5a61..ba1d7c50 100755 --- a/iot-components/iot-mqtt-component/src/main/java/cc/iotkit/comp/mqtt/JsScripter.java +++ b/iot-components/iot-mqtt-component/src/main/java/cc/iotkit/comp/mqtt/JsScripter.java @@ -14,6 +14,7 @@ import cc.iotkit.model.device.message.ThingModelMessage; import cc.iotkit.model.product.ProductModel; import cc.iotkit.script.IScriptEngine; import cc.iotkit.script.ScriptEngineFactory; +import com.fasterxml.jackson.core.type.TypeReference; import lombok.Data; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -40,12 +41,14 @@ public class JsScripter implements IScripter { @SneakyThrows public ThingModelMessage decode(TransparentMsg msg) { - return scriptEngine.invokeMethod(ThingModelMessage.class, "decode", msg).get(0); + return scriptEngine.invokeMethod(new TypeReference<>() { + }, "decode", msg); } @SneakyThrows public TransparentMsg encode(ThingService service) { - return scriptEngine.invokeMethod(TransparentMsg.class, "encode", service).get(0); + return scriptEngine.invokeMethod(new TypeReference<>() { + }, "encode", service); } } diff --git a/iot-components/iot-mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttConfig.java b/iot-components/iot-mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttConfig.java index 68dea236..1c9069ab 100755 --- a/iot-components/iot-mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttConfig.java +++ b/iot-components/iot-mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttConfig.java @@ -22,4 +22,6 @@ public class MqttConfig { private boolean ssl; + private boolean useWebSocket; + } diff --git a/iot-components/iot-mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java b/iot-components/iot-mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java index b12a4ce2..aec0ee70 100755 --- a/iot-components/iot-mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java +++ b/iot-components/iot-mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; @Slf4j public class MqttVerticle extends AbstractVerticle { @@ -41,6 +42,9 @@ public class MqttVerticle extends AbstractVerticle { private final Map endpointMap = new HashMap<>(); + // 增加一个客户端连接clientid-连接状态池,避免mqtt关闭的时候走异常断开和mqtt断开的handler,导致多次离线消息 + private static final Map mqttConnectPool = new ConcurrentHashMap<>(); + public MqttVerticle(MqttConfig config) { this.config = config; } @@ -59,6 +63,7 @@ public class MqttVerticle extends AbstractVerticle { .setKeyPath(config.getSslKey()) .setCertPath(config.getSslCert())); } + options.setUseWebSocket(config.isUseWebSocket()); mqttServer = MqttServer.create(vertx, options); mqttServer.endpointHandler(endpoint -> { @@ -84,6 +89,7 @@ public class MqttVerticle extends AbstractVerticle { } //保存设备与连接关系 endpointMap.put(getEndpointKey(r), endpoint); + mqttConnectPool.put(clientId, true); }); } catch (Throwable e) { log.error("auth failed", e); @@ -96,6 +102,7 @@ public class MqttVerticle extends AbstractVerticle { endpoint.accept(false); endpoint.closeHandler((v) -> { log.warn("client connection closed,clientId:{}", clientId); + if (mqttConnectPool.get(clientId) == false) return; executor.onReceive(new HashMap<>(), "disconnect", clientId, (r) -> { //删除设备与连接关系 endpointMap.remove(getEndpointKey(r)); @@ -105,6 +112,7 @@ public class MqttVerticle extends AbstractVerticle { executor.onReceive(new HashMap<>(), "disconnect", clientId, (r) -> { //删除设备与连接关系 endpointMap.remove(getEndpointKey(r)); + mqttConnectPool.put(clientId, false); }); }).subscribeHandler(subscribe -> { List reasonCodes = new ArrayList<>(); diff --git a/iot-components/iot-mqtt-component/src/main/resources/convert.js b/iot-components/iot-mqtt-component/src/main/resources/convert.js index 975c5f1c..cec5b1ff 100644 --- a/iot-components/iot-mqtt-component/src/main/resources/convert.js +++ b/iot-components/iot-mqtt-component/src/main/resources/convert.js @@ -152,22 +152,43 @@ this.encode = function (service,device) { method+=identifier; topic="/sys/"+service.productKey+"/"+service.deviceName+"/c/deregister"; } - - for(var p in service.params){ - params[p]=service.params[p]; - } - - return { - productKey:service.productKey, - deviceName:service.deviceName, - mid:deviceMid, - content:{ - topic:topic, - payload:JSON.stringify({ - id:deviceMid, - method:method, - params:params - }) + if(type=="property" && identifier=="get" ){ + var listParams = [] + for(var p in service.params){ + listParams.push(service.params[p]); } + return { + productKey:service.productKey, + deviceName:service.deviceName, + mid:deviceMid, + content:{ + topic:topic, + payload:JSON.stringify({ + id:deviceMid, + method:method, + params: listParams + }) + } + } + }else{ + for(var p in service.params){ + params[p]=service.params[p]; + } + return { + productKey:service.productKey, + deviceName:service.deviceName, + mid:deviceMid, + content:{ + topic:topic, + payload:JSON.stringify({ + id:deviceMid, + method:method, + params:params + }) + } + } + } -}; \ No newline at end of file + + +}; diff --git a/iot-components/iot-websocket-component/pom.xml b/iot-components/iot-websocket-component/pom.xml new file mode 100644 index 00000000..7b1c17c7 --- /dev/null +++ b/iot-components/iot-websocket-component/pom.xml @@ -0,0 +1,89 @@ + + + + iot-components + cc.iotkit + 0.4.3-SNAPSHOT + + 4.0.0 + + iot-websocket-component + + + + + io.vertx + vertx-core + + + + org.projectlombok + lombok + + + + org.slf4j + slf4j-api + + + + org.luaj + luaj-jse + + + + cc.iotkit + iot-common + + + + cc.iotkit + iot-component-base + + + + cc.iotkit + iot-data-service + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + + + io.vertx:vertx-core + org.luaj:luaj-jse + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 11 + 11 + true + false + + + + + + \ No newline at end of file diff --git a/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/AbstractDeviceVerticle.java b/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/AbstractDeviceVerticle.java new file mode 100644 index 00000000..e2d3c67e --- /dev/null +++ b/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/AbstractDeviceVerticle.java @@ -0,0 +1,18 @@ +package cc.iotkit.comp.websocket; + +import cc.iotkit.comp.IMessageHandler; +import cc.iotkit.converter.DeviceMessage; +import io.vertx.core.AbstractVerticle; +import lombok.Data; + +@Data +public abstract class AbstractDeviceVerticle extends AbstractVerticle { + + public static final String TYPE_SERVER = "server"; + public static final String TYPE_CLIENT = "client"; + + protected IMessageHandler executor; + + public abstract DeviceMessage send(DeviceMessage message); + +} diff --git a/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/WebSocketDeviceComponent.java b/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/WebSocketDeviceComponent.java new file mode 100644 index 00000000..cc01fc36 --- /dev/null +++ b/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/WebSocketDeviceComponent.java @@ -0,0 +1,111 @@ +package cc.iotkit.comp.websocket; + +import cc.iotkit.common.exception.BizException; +import cc.iotkit.common.utils.JsonUtil; +import cc.iotkit.comp.AbstractDeviceComponent; +import cc.iotkit.comp.CompConfig; +import cc.iotkit.comp.model.DeviceState; +import cc.iotkit.comp.websocket.client.WebSocketClientVerticle; +import cc.iotkit.comp.websocket.server.WebSocketServerVerticle; +import cc.iotkit.converter.DeviceMessage; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import lombok.*; +import lombok.extern.slf4j.Slf4j; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +@Slf4j +public class WebSocketDeviceComponent extends AbstractDeviceComponent { + + private Vertx vertx; + private CountDownLatch countDownLatch; + private String deployedId; + private AbstractDeviceVerticle webSocketVerticle; + private String type; + private final Map deviceChildToParent = new HashMap<>(); + + public void create(CompConfig config) { + super.create(config); + vertx = Vertx.vertx(); + type= JsonUtil.parse(config.getOther(), Map.class).get("type").toString(); + if(AbstractDeviceVerticle.TYPE_CLIENT.equals(type)){ + webSocketVerticle = new WebSocketClientVerticle(config.getOther()); + }else{ + webSocketVerticle = new WebSocketServerVerticle(config.getOther()); + } + } + + public void start() { + try { + webSocketVerticle.setExecutor(getHandler()); + countDownLatch = new CountDownLatch(1); + Future future = vertx.deployVerticle(webSocketVerticle); + future.onSuccess((s -> { + deployedId = s; + countDownLatch.countDown(); + })); + future.onFailure((e) -> { + countDownLatch.countDown(); + log.error("start websocket component failed", e); + }); + countDownLatch.await(); + future.succeeded(); + } catch (Throwable e) { + throw new BizException("start websocket component error", e); + } + } + + @SneakyThrows + public void stop() { + webSocketVerticle.stop(); + Future future = vertx.undeploy(deployedId); + future.onSuccess(unused -> log.info("stop websocket component success")); + } + + public void destroy() { + } + + @Override + public void onDeviceStateChange(DeviceState state) { + DeviceState.Parent parent = state.getParent(); + if (parent == null) { + return; + } + Device device = new Device(state.getProductKey(), state.getDeviceName()); + + if (DeviceState.STATE_ONLINE.equals(state.getState())) { + //保存子设备所属父设备 + deviceChildToParent.put(device.toString(), + new Device(parent.getProductKey(), parent.getDeviceName()) + ); + } else { + //删除关系 + deviceChildToParent.remove(device.toString()); + } + + } + + @Override + public DeviceMessage send(DeviceMessage message) { + webSocketVerticle.send(message); + return message; + } + + @Override + public CompConfig getConfig() { + return config; + } + + + @Data + @NoArgsConstructor + @AllArgsConstructor + @ToString + public static class Device { + private String productKey; + private String deviceName; + } + +} diff --git a/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/client/WebSocketClientConfig.java b/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/client/WebSocketClientConfig.java new file mode 100644 index 00000000..2867ca71 --- /dev/null +++ b/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/client/WebSocketClientConfig.java @@ -0,0 +1,20 @@ +package cc.iotkit.comp.websocket.client; + +import lombok.Data; + +@Data +public class WebSocketClientConfig { + + private int port; + + private String ip; + + private String url; + + private long heartBeatTime; + + private String heartBeatData; + + private boolean ssl; + +} diff --git a/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/client/WebSocketClientVerticle.java b/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/client/WebSocketClientVerticle.java new file mode 100644 index 00000000..40a46fa1 --- /dev/null +++ b/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/client/WebSocketClientVerticle.java @@ -0,0 +1,124 @@ +package cc.iotkit.comp.websocket.client; + +import cc.iotkit.common.exception.BizException; +import cc.iotkit.common.utils.JsonUtil; +import cc.iotkit.comp.model.ReceiveResult; +import cc.iotkit.comp.model.RegisterInfo; +import cc.iotkit.comp.websocket.AbstractDeviceVerticle; +import cc.iotkit.converter.DeviceMessage; +import io.vertx.core.Future; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientOptions; +import io.vertx.core.http.WebSocket; +import io.vertx.core.http.WebSocketConnectOptions; +import lombok.*; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +public class WebSocketClientVerticle extends AbstractDeviceVerticle { + + private HttpClient httpClient; + + private WebSocket webSocketClient; + + private WebSocketClientConfig webSocketConfig; + + private long timerID; + + private final Map devices = new ConcurrentHashMap<>(); + + public void setWebSocketClient(WebSocket webSocketClient) { + this.webSocketClient = webSocketClient; + } + + public WebSocketClientVerticle(String config) { + this.webSocketConfig = JsonUtil.parse(config, WebSocketClientConfig.class); + } + + public void start() { + WebSocketConnectOptions options = new WebSocketConnectOptions().setPort(webSocketConfig.getPort()) + .setHost(webSocketConfig.getIp()).setURI(webSocketConfig.getUrl()).setSsl(webSocketConfig.isSsl()); + httpClient = vertx.createHttpClient(); + httpClient.webSocket(options).onSuccess(ws -> { + setWebSocketClient(ws); + log.info("webSocket client connect success!"); + ws.textMessageHandler(data -> { + log.info("webSocket client receive msg:" + data); + executor.onReceive(new HashMap<>(), null, data, (ret) -> { + if (ret != null && ret.getData() instanceof RegisterInfo) { + executor.onReceive(null, "connected", data, (r) -> { + if (!devices.containsKey(getDeviceKey(r))) { + devices.put(getDeviceKey(r), new Device(r.getDeviceName(), r.getProductKey())); + } + }); + } + }); + }); + ws.closeHandler(e -> { + for (String deviceKey : devices.keySet()) { + executor.onReceive(null, "disconnect", deviceKey); + } + log.warn("client connection closed!"); + }); + ws.exceptionHandler(e -> { + for (String deviceKey : devices.keySet()) { + executor.onReceive(null, "disconnect", deviceKey); + } + log.error("webSocket client connect exception!"); + }); + if (webSocketConfig.getHeartBeatTime() > 0 && StringUtils.isNotBlank(webSocketConfig.getHeartBeatData())) { + timerID = vertx.setPeriodic(webSocketConfig.getHeartBeatTime(), t -> { + if (webSocketClient.isClosed()) { + vertx.cancelTimer(timerID); + } + executor.onReceive(new HashMap<>(), "ping", JsonUtil.toJsonString(webSocketConfig)); + }); + } + }).onFailure(e -> { + log.info("webSocket client connect failed!"); + }); + } + + @SneakyThrows + public void stop() { + vertx.cancelTimer(timerID); + for (String deviceKey : devices.keySet()) { + executor.onReceive(null, "disconnect", deviceKey); + } + httpClient.close(); + } + + @Override + public DeviceMessage send(DeviceMessage message) { + Object obj = message.getContent(); + if (!(obj instanceof Map)) { + throw new BizException("message content is not Map"); + } + String msgStr = JsonUtil.toJsonString(obj); + log.info("send msg payload:{}", msgStr); + Future result = webSocketClient.writeTextMessage(msgStr); + result.onFailure(e -> log.error("webSocket client send msg failed", e)); + return message; + } + + private String getDeviceKey(ReceiveResult result) { + return getDeviceKey(result.getProductKey(), result.getDeviceName()); + } + + private String getDeviceKey(String productKey, String deviceName) { + return String.format("%s_%s", productKey, deviceName); + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + @ToString + public static class Device { + private String productKey; + private String deviceName; + } +} diff --git a/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/server/WebSocketServerConfig.java b/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/server/WebSocketServerConfig.java new file mode 100644 index 00000000..e0fa0aa8 --- /dev/null +++ b/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/server/WebSocketServerConfig.java @@ -0,0 +1,25 @@ +package cc.iotkit.comp.websocket.server; + +import lombok.Data; + +import java.util.List; + +@Data +public class WebSocketServerConfig { + + private int port; + + private String sslKey; + + private String sslCert; + + private boolean ssl; + + private List accessTokens; + + @Data + public static class AccessToken{ + private String tokenName; + private String tokenStr; + } +} diff --git a/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/server/WebSocketServerVerticle.java b/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/server/WebSocketServerVerticle.java new file mode 100644 index 00000000..828b174b --- /dev/null +++ b/iot-components/iot-websocket-component/src/main/java/cc/iotkit/comp/websocket/server/WebSocketServerVerticle.java @@ -0,0 +1,145 @@ +package cc.iotkit.comp.websocket.server; + + +import cc.iotkit.common.exception.BizException; +import cc.iotkit.common.utils.JsonUtil; +import cc.iotkit.comp.model.ReceiveResult; +import cc.iotkit.comp.websocket.AbstractDeviceVerticle; +import cc.iotkit.converter.DeviceMessage; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.util.JSONPObject; +import io.vertx.core.Future; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.http.ServerWebSocket; +import io.vertx.core.net.PemKeyCertOptions; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + + +@Slf4j +public class WebSocketServerVerticle extends AbstractDeviceVerticle { + + + private HttpServer httpServer; + + private WebSocketServerConfig webSocketConfig; + + private final Map wsClients = new ConcurrentHashMap<>(); + + public WebSocketServerVerticle(String config) { + this.webSocketConfig = JsonUtil.parse(config, WebSocketServerConfig.class); + } + + private Map tokens=new HashMap<>(); + + @Override + public void start() throws Exception { + HttpServerOptions options = new HttpServerOptions() + .setPort(webSocketConfig.getPort()); + if (webSocketConfig.isSsl()) { + options = options.setSsl(true) + .setKeyCertOptions(new PemKeyCertOptions() + .setKeyPath(webSocketConfig.getSslKey()) + .setCertPath(webSocketConfig.getSslCert())); + } + httpServer = vertx.createHttpServer(options).webSocketHandler(wsClient -> { + log.info("webSocket client connect sessionId:{},path={}", wsClient.textHandlerID(), wsClient.path()); + String deviceKey = wsClient.path().replace("/",""); + if(StringUtils.isBlank(deviceKey)||deviceKey.split("_").length!=2){ + log.warn("陌生连接,拒绝"); + wsClient.reject(); + return; + } + wsClient.writeTextMessage("connect succes! please auth!"); + Map deviceKeyObj=new HashMap<>(); + deviceKeyObj.put("deviceKey",deviceKey); + wsClient.textMessageHandler(message -> { + HashMap msg= JsonUtil.parse(message,HashMap.class); + if(wsClients.containsKey(deviceKey)){ + executor.onReceive(new HashMap<>(), "", message); + }else if(msg!=null&&"auth".equals(msg.get("type"))){ + Set tokenKey=tokens.keySet(); + for(String key:tokenKey){ + if(StringUtils.isNotBlank(msg.get(key))&&tokens.get(key).equals(msg.get(key))){ + //保存设备与连接关系 + log.info("认证通过"); + wsClients.put(deviceKey, wsClient); + wsClient.writeTextMessage("auth succes"); + return; + } + } + log.warn("认证失败,拒绝"); + wsClient.writeTextMessage("auth fail"); + return; + }else{ + log.warn("认证失败,拒绝"); + wsClient.writeTextMessage("auth fail"); + return; + } + + }); + wsClient.closeHandler(c -> { + log.warn("client connection closed,deviceKey:{}", deviceKey); + executor.onReceive(new HashMap<>(), "disconnect", JsonUtil.toJsonString(deviceKeyObj), (r) -> { + //删除设备与连接关系 + if(r!=null){ + wsClients.remove(getDeviceKey(r)); + } + }); + }); + wsClient.exceptionHandler(ex -> { + log.warn("webSocket client connection exception,deviceKey:{}", deviceKey); + }); + }).listen(webSocketConfig.getPort(), server -> { + if (server.succeeded()) { + log.info("webSocket server is listening on port " + webSocketConfig.getPort()); + List tokenConfig= webSocketConfig.getAccessTokens(); + for (WebSocketServerConfig.AccessToken obj:tokenConfig) { + tokens.put(obj.getTokenName(),obj.getTokenStr()); + } + } else { + log.error("webSocket server on starting the server", server.cause()); + } + }); + } + + @Override + public void stop() throws Exception { + for (String deviceKey : wsClients.keySet()) { + Map deviceKeyObj=new HashMap<>(); + deviceKeyObj.put("deviceKey",deviceKey); + executor.onReceive(null, "disconnect", JsonUtil.toJsonString(deviceKeyObj)); + } + tokens.clear(); + httpServer.close(voidAsyncResult -> log.info("close webocket server...")); + } + + private String getDeviceKey(ReceiveResult result) { + return getDeviceKey(result.getProductKey(), result.getDeviceName()); + } + + private String getDeviceKey(String productKey, String deviceName) { + return String.format("%s_%s", productKey, deviceName); + } + + @Override + public DeviceMessage send(DeviceMessage message) { + ServerWebSocket wsClient = wsClients.get(getDeviceKey(message.getProductKey(), message.getDeviceName())); + Object obj = message.getContent(); + if (!(obj instanceof Map)) { + throw new BizException("message content is not Map"); + } + String msgStr = JsonUtil.toJsonString(obj); + log.info("send msg payload:{}", msgStr); + Future result = wsClient.writeTextMessage(msgStr); + result.onFailure(e -> log.error("webSocket server send msg failed", e)); + return message; + } +} diff --git a/iot-components/iot-websocket-component/src/main/resources/component.js b/iot-components/iot-websocket-component/src/main/resources/component.js new file mode 100644 index 00000000..b63e09ff --- /dev/null +++ b/iot-components/iot-websocket-component/src/main/resources/component.js @@ -0,0 +1,72 @@ +var mid=1; + +var access_token=""; + +function getMid(){ + mid++; + if(mid>10000){ + mid=1; + } + return mid; +}; +function getPingData(data){ + var ping={ + productKey:"", + deviceName:"", + content:{ + id:getMid(), + type:data + } + }; + return { + type:"action", + data:{ + productKey:"", + deviceName:"", + state:"" + }, + action:{ + type:"ack", + content:JSON.stringify(ping) + } + } +}; +//必须提供onReceive方法 +this.onReceive=function(head,type,payload){ + var data=JSON.parse(payload) + if(data.type=="auth_required"){ + var auth={ + productKey:"", + deviceName:"", + content:{ + type:"auth", + access_token:access_token + } + }; + return { + type:"action", + data:{ + productKey:"", + deviceName:"", + state:"" + }, + action:{ + type:"ack", + content:JSON.stringify(auth) + } + } + }else if(data.type=="auth_ok"){ + return getPingData(data.heartBeatData); + }else if(data.type=="pong"){ + apiTool.log("receive pong!"); + }else if("ping"==type){ + return getPingData(data.heartBeatData); + } + return { + productKey:"", + deviceName:"", + mid:0, + content:{ + } + } +}; \ No newline at end of file diff --git a/iot-components/iot-websocket-component/src/main/resources/component.spi b/iot-components/iot-websocket-component/src/main/resources/component.spi new file mode 100644 index 00000000..aa6c2a0f --- /dev/null +++ b/iot-components/iot-websocket-component/src/main/resources/component.spi @@ -0,0 +1 @@ +cc.iotkit.comp.websocket.WebSocketDeviceComponent \ No newline at end of file diff --git a/iot-components/iot-websocket-component/src/main/resources/converter.js b/iot-components/iot-websocket-component/src/main/resources/converter.js new file mode 100644 index 00000000..0dafe522 --- /dev/null +++ b/iot-components/iot-websocket-component/src/main/resources/converter.js @@ -0,0 +1,55 @@ + +var mid=1; + +function getMid(){ + mid++; + if(mid>10000){ + mid=1; + } + return mid+""; +} + +this.decode = function (msg) { + //对msg进行解析,并返回物模型数据 + var content=msg.content; + var type = content.type; + + if (type=="report") { + //属性上报 + return { + mid: msg.mid, + productKey: msg.productKey, + deviceName: msg.deviceName, + type:"property", + identifier: "report", //属性上报 + occur: new Date().getTime(), //时间戳,设备上的事件或数据产生的本地时间 + time: new Date().getTime(), //时间戳,消息上报时间 + data: content.params, + }; + } + return null; +}; + +this.encode = function (service,device) { + var type=service.type; + var identifier=service.identifier; + var entityId=service.deviceName; + var deviceMid=getMid(); + var params={}; + var target={}; + if("property"==type&&"set"==identifier){ + var domain=entityId.split(".")[0]; + var powerstate=service.params.powerstate==1?"turn_on":"turn_off"; + params.type="call_service"; + params.domain=domain; + params.service=powerstate; + target.entity_id=entityId; + params.target=target; + } + return { + productKey:service.productKey, + deviceName:service.deviceName, + mid:deviceMid, + content:params + } +}; \ No newline at end of file diff --git a/iot-data/iot-data-service/src/main/java/cc/iotkit/data/ISpaceDeviceData.java b/iot-data/iot-data-service/src/main/java/cc/iotkit/data/ISpaceDeviceData.java index 2c5331f7..b376f4bc 100755 --- a/iot-data/iot-data-service/src/main/java/cc/iotkit/data/ISpaceDeviceData.java +++ b/iot-data/iot-data-service/src/main/java/cc/iotkit/data/ISpaceDeviceData.java @@ -17,6 +17,8 @@ public interface ISpaceDeviceData extends IOwnedData { List findByUidOrderByUseAtDesc(String uid); + List findByHomeIdAndCollect(String homeId,boolean collect); + List findByUidOrderByAddAtDesc(String uid); List findBySpaceIdOrderByAddAtDesc(String spaceId); diff --git a/iot-data/iot-model/src/main/java/cc/iotkit/model/space/SpaceDevice.java b/iot-data/iot-model/src/main/java/cc/iotkit/model/space/SpaceDevice.java index 1dd055a6..77801246 100755 --- a/iot-data/iot-model/src/main/java/cc/iotkit/model/space/SpaceDevice.java +++ b/iot-data/iot-model/src/main/java/cc/iotkit/model/space/SpaceDevice.java @@ -63,4 +63,9 @@ public class SpaceDevice implements Owned { */ private Long useAt; + /** + * 是否收藏 + */ + private Boolean collect; + } diff --git a/iot-data/iot-rdb-data-service/src/main/java/cc/iotkit/data/dao/SpaceDeviceRepository.java b/iot-data/iot-rdb-data-service/src/main/java/cc/iotkit/data/dao/SpaceDeviceRepository.java index e79870bc..09eed8d6 100755 --- a/iot-data/iot-rdb-data-service/src/main/java/cc/iotkit/data/dao/SpaceDeviceRepository.java +++ b/iot-data/iot-rdb-data-service/src/main/java/cc/iotkit/data/dao/SpaceDeviceRepository.java @@ -24,6 +24,8 @@ public interface SpaceDeviceRepository extends JpaRepository findByUidOrderByUseAtDesc(String uid); + List findByHomeIdAndCollect(String homeId,boolean collect); + List findByUidOrderByAddAtDesc(String uid); List findBySpaceIdOrderByAddAtDesc(String spaceId); diff --git a/iot-data/iot-rdb-data-service/src/main/java/cc/iotkit/data/model/TbSpaceDevice.java b/iot-data/iot-rdb-data-service/src/main/java/cc/iotkit/data/model/TbSpaceDevice.java index 9645a06d..58f81b59 100755 --- a/iot-data/iot-rdb-data-service/src/main/java/cc/iotkit/data/model/TbSpaceDevice.java +++ b/iot-data/iot-rdb-data-service/src/main/java/cc/iotkit/data/model/TbSpaceDevice.java @@ -63,4 +63,9 @@ public class TbSpaceDevice { */ private Long useAt; + /** + * 是否收藏 + */ + private Boolean collect; + } diff --git a/iot-data/iot-rdb-data-service/src/main/java/cc/iotkit/data/service/SpaceDeviceDataImpl.java b/iot-data/iot-rdb-data-service/src/main/java/cc/iotkit/data/service/SpaceDeviceDataImpl.java index ee7df8b7..d1321307 100755 --- a/iot-data/iot-rdb-data-service/src/main/java/cc/iotkit/data/service/SpaceDeviceDataImpl.java +++ b/iot-data/iot-rdb-data-service/src/main/java/cc/iotkit/data/service/SpaceDeviceDataImpl.java @@ -34,6 +34,11 @@ public class SpaceDeviceDataImpl implements ISpaceDeviceData { return SpaceDeviceMapper.toDto(spaceDeviceRepository.findByUidOrderByUseAtDesc(uid)); } + @Override + public List findByHomeIdAndCollect(String homeId,boolean collect) { + return SpaceDeviceMapper.toDto(spaceDeviceRepository.findByHomeIdAndCollect(homeId,collect)); + } + @Override public List findByUidOrderByAddAtDesc(String uid) { return SpaceDeviceMapper.toDto(spaceDeviceRepository.findByUidOrderByAddAtDesc(uid)); diff --git a/iot-data/iot-ts-temporal-service/pom.xml b/iot-data/iot-ts-temporal-service/pom.xml new file mode 100644 index 00000000..213c8638 --- /dev/null +++ b/iot-data/iot-ts-temporal-service/pom.xml @@ -0,0 +1,97 @@ + + + 4.0.0 + + cc.iotkit + iot-data + 0.4.3-SNAPSHOT + + + iot-ts-temporal-service + + + 11 + 11 + UTF-8 + + + + + org.springframework + spring-context + + + + org.springframework.boot + spring-boot-starter-jdbc + + + + + org.projectlombok + lombok + + + + com.fasterxml.jackson.core + jackson-annotations + + + + org.mapstruct + mapstruct + + + + cn.hutool + hutool-http + + + + + + cc.iotkit + iot-data-cache + + + + + org.jooq + jooq-meta + 3.14.15 + + + + org.jooq + jooq + 3.14.15 + + + + org.postgresql + postgresql + 42.5.4 + + + org.springframework + spring-jdbc + + + + cc.iotkit + iot-model + + + + cc.iotkit + iot-model + + + cc.iotkit + iot-temporal-service + + + + \ No newline at end of file diff --git a/iot-data/iot-ts-temporal-service/readme.md b/iot-data/iot-ts-temporal-service/readme.md new file mode 100644 index 00000000..e84fc038 --- /dev/null +++ b/iot-data/iot-ts-temporal-service/readme.md @@ -0,0 +1,5 @@ +### 时序数据库服务接口的TimescaleDB实现 + +postgrep 14 + + diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/config/Constants.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/config/Constants.java new file mode 100644 index 00000000..5a6bb3f6 --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/config/Constants.java @@ -0,0 +1,27 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.config; + +public interface Constants { + + /** + * 根据产品key获取产品属性超级表名 + */ + static String getProductPropertySTableName(String productKey) { + return String.format("product_property_%s", productKey.toLowerCase()); + } + + /** + * 根据deviceId获取设备属性表名 + */ + static String getDevicePropertyTableName(String deviceId) { + return String.format("device_property_%s", deviceId.toLowerCase()); + } +} diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/config/TsDatasourceConfig.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/config/TsDatasourceConfig.java new file mode 100644 index 00000000..2b8c5979 --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/config/TsDatasourceConfig.java @@ -0,0 +1,43 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.config; + +import cc.iotkit.temporal.ts.dao.TsTemplate; +import com.zaxxer.hikari.HikariDataSource; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class TsDatasourceConfig { + + @Value("${spring.ts-datasource.url}") + private String url; + + @Value("${spring.ts-datasource.driverClassName}") + private String driverClassName; + + @Value("${spring.ts-datasource.username}") + private String username; + + @Value("${spring.ts-datasource.password}") + private String password; + + @Bean("tsJdbcTemplate") + public TsTemplate tdJdbcTemplate() { + HikariDataSource dataSource = new HikariDataSource(); + dataSource.setJdbcUrl(url); + dataSource.setUsername(username); + dataSource.setPassword(password); + dataSource.setDriverClassName(driverClassName); + return new TsTemplate(dataSource); + } + +} diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/dao/TsTemplate.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/dao/TsTemplate.java new file mode 100644 index 00000000..bb9c3163 --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/dao/TsTemplate.java @@ -0,0 +1,19 @@ +package cc.iotkit.temporal.ts.dao; + +import org.springframework.jdbc.core.JdbcTemplate; + +import javax.sql.DataSource; + +public class TsTemplate extends JdbcTemplate { + + public TsTemplate() { + } + + public TsTemplate(DataSource dataSource) { + super(dataSource); + } + + public TsTemplate(DataSource dataSource, boolean lazyInit) { + super(dataSource, lazyInit); + } +} diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/dm/DbField.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/dm/DbField.java new file mode 100644 index 00000000..c9d7443e --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/dm/DbField.java @@ -0,0 +1,15 @@ +package cc.iotkit.temporal.ts.dm; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class DbField { + private String name; + private String type; + + private int length; +} \ No newline at end of file diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/dm/FieldParser.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/dm/FieldParser.java new file mode 100644 index 00000000..08288c08 --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/dm/FieldParser.java @@ -0,0 +1,118 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.dm; + + +import cc.iotkit.model.product.ThingModel; +import org.jooq.DataType; +import org.jooq.Field; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class FieldParser { + + /** + * 物模型到td数据类型映射 + */ + private static final Map TYPE_MAPPING = Map.of( + "int32", SQLDataType.INTEGER, + "float", SQLDataType.FLOAT, + "bool", SQLDataType.INTEGER, + "enum", SQLDataType.INTEGER, + "text", SQLDataType.NVARCHAR, + "date", SQLDataType.DATE + ); + + /** + * td数据类型到物模型映射 + */ + + private static final Map DB2TYPE_MAPPING = Map.of( + "int",SQLDataType.INTEGER, + "float", SQLDataType.FLOAT, + "bool", SQLDataType.INTEGER, + "char",SQLDataType.NVARCHAR, + "date", SQLDataType.DATE, + "timestamptz", SQLDataType.TIMESTAMPWITHTIMEZONE + ); + + + private static DataType getFieldType(final String type) { + Set keys = DB2TYPE_MAPPING.keySet(); + String lowerCase = type.toLowerCase(); + for(String key:keys){ + if(lowerCase.contains(key)){ + return DB2TYPE_MAPPING.get(key); + } + } + return null; + } + + /** + * 将物模型字段转换为td字段 + */ + public static TsField parse(ThingModel.Property property) { + String filedName = property.getIdentifier().toLowerCase(); + ThingModel.DataType dataType = property.getDataType(); + String type = dataType.getType(); + + //将物模型字段类型映射为td字段类型 + DataType fType = TYPE_MAPPING.get(type); + Object specs = dataType.getSpecs(); + int len = -1; + if (specs instanceof Map) { + Object objLen = ((Map) specs).get("length"); + if (objLen != null) { + len = Integer.parseInt(objLen.toString()); + } + } + + return new TsField(filedName, fType, len); + } + + /** + * 获取物模型中的字段列表 + */ + public static List parse(ThingModel thingModel) { + return thingModel.getModel().getProperties().stream().map(FieldParser::parse).collect(Collectors.toList()); + } + + /** + * 将从库中查出来的字段信息转换为td字段对象 + */ + public static List parse(List rows) { + return (List) rows.stream().map((r) -> { + + return new TsField( + r.getName(), + getFieldType(r.getType()).length(r.getLength()),r.getLength()); + }).collect(Collectors.toList()); + } + + /** + * 获取字段字义 + */ + public static Field getFieldDefine(TsField field) { + int length = field.getLength(); + DataType type = field.getType(); + + if(length>0){ + type.length(length); + } + return DSL.field(field.getName(),type); + + } + +} diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/dm/TableManager.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/dm/TableManager.java new file mode 100644 index 00000000..0cb13a3d --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/dm/TableManager.java @@ -0,0 +1,161 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.dm; + +import org.jooq.*; +import org.jooq.conf.ParamType; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.jooq.tools.StringUtils; + +import java.util.List; + +import static org.jooq.impl.DSL.*; + +public class TableManager { + + private static final DSLContext sqlBuilder = DSL.using(SQLDialect.POSTGRES); + + public static DSLContext getSqlBuilder() { + return sqlBuilder; + } + + /** + * 获取创建表sql + */ + public static String getCreateSTableSql(String tbName, List fields) { + if (fields.size() == 0) { + return null; + } + + CreateTableColumnStep tableColumnStep = sqlBuilder.createTable(tbName) + .column("time", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false)) + .column(field("device_id", SQLDataType.NCHAR.length(50).nullable(false))); + + //生成字段片段 + + for (TsField field : fields) { + tableColumnStep.column(FieldParser.getFieldDefine(field)); + + } + + + return tableColumnStep.getSQL(ParamType.INLINED); + + } + + public static String getCreateSTableIndexSql(String tbName, String partitionCol) { + //根据时间和设备纬度分区 + String sql = null; + if(StringUtils.isBlank(partitionCol)){ + // 只根据时间分区 + sql= String.format(" SELECT create_hypertable('%s', 'time') ;", tbName); + }else{ + sql= String.format(" SELECT * FROM create_hypertable('%s', 'time'," + + " partitioning_column => '%s'," + + " number_partitions => 4" + + ") ;", tbName, partitionCol ); + + } + + return sql; + + } + + public static String getCreateTableIndexSql(String tbName) { + + CreateIndexIncludeStep step = sqlBuilder.createIndexIfNotExists(tbName + "_index").on( + table(name(tbName)), + field(name("device_id")), + field(name("time")).desc()); + + return step.getSQL(ParamType.INLINED); + + } + + /** + * 取正确的表名 + * + * @param name 表象 + */ + public static String rightTbName(String name) { + return name.toLowerCase().replace("-", "_"); + } + + /** + * 获取表详情的sql + */ + public static String getDescTableSql(String tbName) { + + + String sql =String.format( " select a.attname as name," + + " t.typname as type, " + + "a.attlen as length," + + " case when a.attnotnull='t' then '1' else '0' end as nullable," + + " case when b.pk='t' then '1' else '0' end as isPk " + + "from pg_class e, pg_attribute a left join pg_type t on a.atttypid = t.oid " + + "left join (select pg_constraint.conname,pg_constraint.contype,pg_attribute.attname as pk " + + "from pg_constraint " + + " inner join pg_class on pg_constraint.conrelid = pg_class.oid" + + " inner join pg_attribute on pg_attribute.attrelid = pg_class.oid " + + " and pg_attribute.attnum = any(pg_constraint.conkey) where contype='p')" + + " b on a.attname=b.pk where e.relname = '%s'" + + " and a.attnum > 0 and a.attrelid = e.oid and t.typname is not null ;",tbName); + + return sql; + + + + } + + /** + * 获取添加字段sql + */ + public static String getAddSTableColumnSql(String tbName, List fields) { + + AlterTableStep alterTableStep = sqlBuilder.alterTable(tbName); + + AlterTableFinalStep addStep = null; + for (TsField o : fields) { + addStep = alterTableStep.add(FieldParser.getFieldDefine(o)); + } + return addStep.getSQL(); + } + + /** + * 获取修改字段sql + */ + public static String getModifySTableColumnSql(String tbName, List fields) { + AlterTableStep alterTableStep = sqlBuilder.alterTable(tbName); + AlterTableFinalStep step = null; + for (TsField o : fields) { + Field fieldDefine = FieldParser.getFieldDefine(o); + step = alterTableStep.alterColumn(o.getName()).set(fieldDefine.getDataType()); + } + return step.getSQL(); + } + + /** + * 获取删除字段sql + */ + public static String getDropSTableColumnSql(String tbName, List fields) { + + AlterTableStep alterTableStep = sqlBuilder.alterTable(tbName); + + + AlterTableFinalStep step = null; + for (TsField o : fields) { + step = alterTableStep.dropColumnIfExists(FieldParser.getFieldDefine(o)); + } + + return step.getSQL(); + } + +} diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/dm/TsField.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/dm/TsField.java new file mode 100644 index 00000000..72734d80 --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/dm/TsField.java @@ -0,0 +1,24 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.dm; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.jooq.DataType; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TsField { + private String name; + private DataType type; + private int length; +} \ No newline at end of file diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsDeviceProperty.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsDeviceProperty.java new file mode 100644 index 00000000..ebc9f32f --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsDeviceProperty.java @@ -0,0 +1,31 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TsDeviceProperty { + + private Date time; + + private String deviceId; + + private String name; + + private Object value; + +} diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsRuleLog.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsRuleLog.java new file mode 100644 index 00000000..59d5f2dd --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsRuleLog.java @@ -0,0 +1,33 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TsRuleLog { + + private Date time; + + private String ruleId; + + private String state1; + + private String content; + + private Boolean success; + +} diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsTaskLog.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsTaskLog.java new file mode 100644 index 00000000..a2546658 --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsTaskLog.java @@ -0,0 +1,29 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TsTaskLog { + + private Long time; + + private String taskId; + + private String content; + + private Boolean success; + +} diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsThingModelMessage.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsThingModelMessage.java new file mode 100644 index 00000000..fe6b6c73 --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsThingModelMessage.java @@ -0,0 +1,45 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TsThingModelMessage { + + private Date time; + + private String mid; + + private String deviceId; + + private String productKey; + + private String deviceName; + + private String uid; + + private String type; + + private String identifier; + + private int code; + + private String data; + + private Long reportTime; + +} diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsTimeData.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsTimeData.java new file mode 100644 index 00000000..c98b4097 --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsTimeData.java @@ -0,0 +1,36 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +/** + * 统计的时间数据 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TsTimeData { + + /** + * 时间 + */ + private Date time; + + /** + * 数据值 + */ + private Object data; + +} diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsVirtualDeviceLog.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsVirtualDeviceLog.java new file mode 100644 index 00000000..f1488b65 --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/model/TsVirtualDeviceLog.java @@ -0,0 +1,33 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TsVirtualDeviceLog { + + private Date time; + + private String virtualDeviceId; + + private String virtualDeviceName; + + private int deviceTotal; + + private String result; + +} diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/DbStructureDataImpl.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/DbStructureDataImpl.java new file mode 100644 index 00000000..9a04caf7 --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/DbStructureDataImpl.java @@ -0,0 +1,207 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.service; + +import cc.iotkit.common.utils.JsonUtil; +import cc.iotkit.model.product.ThingModel; +import cc.iotkit.temporal.IDbStructureData; +import cc.iotkit.temporal.ts.config.Constants; +import cc.iotkit.temporal.ts.dao.TsTemplate; +import cc.iotkit.temporal.ts.dm.DbField; +import cc.iotkit.temporal.ts.dm.FieldParser; +import cc.iotkit.temporal.ts.dm.TableManager; +import cc.iotkit.temporal.ts.dm.TsField; +import lombok.extern.slf4j.Slf4j; +import org.jooq.CreateTableColumnStep; +import org.jooq.DSLContext; +import org.jooq.SQLDialect; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.BeanPropertyRowMapper; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.List; +import java.util.stream.Collectors; + +@Slf4j +@Service +public class DbStructureDataImpl implements IDbStructureData { + + @Autowired + private TsTemplate tsTemplate; + + /** + * 根据物模型创建超级表 + */ + @Override + public void defineThingModel(ThingModel thingModel) { + //获取物模型中的属性定义 + List fields = FieldParser.parse(thingModel); + String tbName = Constants.getProductPropertySTableName(thingModel.getProductKey()); + String sql = TableManager.getCreateSTableSql(tbName, + fields); + if (sql == null) { + return; + } + System.out.println(sql); + tsTemplate.execute(sql); + + createHypertable(tbName, "device_id"); + + } + + private void createHypertable(String tbName, String partitionCol) { + String createSTableIndexSql = TableManager.getCreateSTableIndexSql(tbName, partitionCol); + try { + System.out.println(createSTableIndexSql); + + tsTemplate.execute(createSTableIndexSql); + } catch (Exception e) { + log.info("createHypertable error:{}", e.getMessage()); + } + + } + + /** + * 根据物模型更新超级表结构 + */ + @Override + public void updateThingModel(ThingModel thingModel) { + //获取旧字段信息 + String tbName = Constants.getProductPropertySTableName(thingModel.getProductKey()); + String sql = TableManager.getDescTableSql(tbName); + if (sql == null) { + return; + } + + tsTemplate.execute(sql); + + List fieldsInDb = tsTemplate.query(sql, new BeanPropertyRowMapper(DbField.class)); + + List newFields = FieldParser.parse(thingModel); + List oldFields = FieldParser.parse(fieldsInDb); + + //对比差异 + + //找出修改的字段 + List modifyFields = newFields.stream().filter((f) -> oldFields.stream() + .anyMatch(old -> + old.getName().equals(f.getName()) //字段名相同 + //字段类型或长度不同 + && (!old.getType().equals(f.getType()) || old.getLength() != f.getLength()) + )) + .collect(Collectors.toList()); + if (modifyFields.size() > 0) { + sql = TableManager.getModifySTableColumnSql(tbName, modifyFields); + log.info("modify column:{}", sql); + + tsTemplate.execute(sql); + + } + + //找出新增的字段 + List addFields = newFields.stream().filter((f) -> oldFields.stream() + .noneMatch(old -> old.getName().equals(f.getName()))) + .collect(Collectors.toList()); + if (addFields.size() > 0) { + sql = TableManager.getAddSTableColumnSql(tbName, addFields); + log.info("add column:{}", sql); + + tsTemplate.execute(sql); + } + + + //找出删除的字段 + List dropFields = oldFields.stream().filter((f) -> + !"time".equals(f.getName()) && + !"device_id".equals(f.getName()) && newFields.stream() + //字段名不是time且没有相同字段名的 + .noneMatch(n -> n.getName().equals(f.getName()))) + .collect(Collectors.toList()); + if (dropFields.size() > 0) { + + sql = TableManager.getDropSTableColumnSql(tbName, dropFields); + log.info("drop column:{}", sql); + tsTemplate.execute(sql); + + } + } + + /** + * 初始化其它数据结构 + */ + @Override + @PostConstruct + public void initDbStructure() { + //创建规则日志表 + DSLContext dslBuilder = DSL.using(SQLDialect.POSTGRES); + + CreateTableColumnStep ruleLogStep = dslBuilder.createTableIfNotExists("rule_log") + .column("time", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false)) + .column("state1", SQLDataType.VARCHAR(50)) + .column("content", SQLDataType.VARCHAR(1024)) + .column("success", SQLDataType.BOOLEAN) + .column("rule_id", SQLDataType.VARCHAR(50).nullable(false)); + String sql = ruleLogStep.getSQL(); + System.out.println(sql); + tsTemplate.execute(sql); + // 按时间和rule_id分区 + createHypertable("rule_log", "rule_id"); + + + CreateTableColumnStep taskLogStep = dslBuilder.createTableIfNotExists("task_log") + .column("time", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false)) + .column("content", SQLDataType.VARCHAR(1024)) + .column("success", SQLDataType.BOOLEAN) + .column("task_id", SQLDataType.VARCHAR(50).nullable(false)); + String taskLogsql = taskLogStep.getSQL(); + + System.out.println(taskLogsql); + tsTemplate.execute(taskLogsql); + // 按时间和task_id分区 + createHypertable("task_log", "task_id"); + + CreateTableColumnStep thingModelStep = dslBuilder.createTableIfNotExists("thing_model_message") + .column("time", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false)) + .column("mid", SQLDataType.NVARCHAR(50)) + .column("product_key", SQLDataType.NVARCHAR(50)) + .column("device_name", SQLDataType.NVARCHAR(50)) + .column("uid", SQLDataType.NVARCHAR(50)) + .column("type", SQLDataType.NVARCHAR(20)) + .column("identifier", SQLDataType.NVARCHAR(50)) + .column("code", SQLDataType.INTEGER) + .column("data", SQLDataType.NVARCHAR(1024)) + .column("report_time", SQLDataType.NVARCHAR(1024)) + .column("device_id", SQLDataType.NVARCHAR(50)); + + String thingModelsql = thingModelStep.getSQL(); + + System.out.println(thingModelsql); + tsTemplate.execute(thingModelsql); + createHypertable("thing_model_message", "device_id"); + + //创建虚拟设备日志表 + CreateTableColumnStep virtualStep = dslBuilder.createTableIfNotExists("virtual_device_log") + .column("time", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false)) + .column("virtual_device_name", SQLDataType.NVARCHAR(50)) + .column("device_total", SQLDataType.INTEGER) + .column("result", SQLDataType.NVARCHAR(1024)) + .column("virtual_device_id", SQLDataType.NVARCHAR(50)); + + String virtualsql = virtualStep.getSQL(); + System.out.println(virtualsql); + + tsTemplate.execute(virtualsql); + createHypertable("virtual_device_log", "virtual_device_id"); + + } +} diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/DevicePropertyDataImpl.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/DevicePropertyDataImpl.java new file mode 100644 index 00000000..c414ac59 --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/DevicePropertyDataImpl.java @@ -0,0 +1,105 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.service; + +import cc.iotkit.data.IDeviceInfoData; +import cc.iotkit.model.device.DeviceInfo; +import cc.iotkit.model.device.message.DeviceProperty; +import cc.iotkit.temporal.IDevicePropertyData; +import cc.iotkit.temporal.ts.config.Constants; +import cc.iotkit.temporal.ts.dao.TsTemplate; +import cc.iotkit.temporal.ts.model.TsDeviceProperty; +import lombok.extern.slf4j.Slf4j; +import org.jooq.Condition; +import org.jooq.Field; +import org.jooq.InsertValuesStepN; +import org.jooq.Record; +import org.jooq.conf.ParamType; +import org.jooq.impl.DSL; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.jdbc.core.BeanPropertyRowMapper; +import org.springframework.stereotype.Service; + +import java.util.*; +import java.util.stream.Collectors; + +import static org.jooq.impl.DSL.field; + +@Slf4j +@Service +public class DevicePropertyDataImpl implements IDevicePropertyData { + + @Autowired + private TsTemplate tsTemplate; + @Autowired + @Qualifier("deviceInfoDataCache") + private IDeviceInfoData deviceInfoData; + + @Override + public List findDevicePropertyHistory(String deviceId, String name, long start, long end) { + DeviceInfo device = deviceInfoData.findByDeviceId(deviceId); + + String tbName = Constants.getProductPropertySTableName(device.getProductKey()); + Condition con = field("time").greaterOrEqual(new Date(start)).and(field("time").lessOrEqual(new Date(end))) + .and(DSL.field("device_id").eq(deviceId)); + String sql = DSL.select(DSL.field("time"), DSL.field("device_id"), DSL.field(name.toLowerCase()).as("value")) + .from(tbName).where(con) + .getSQL(ParamType.INLINED); + + + List list = tsTemplate.query(sql, new BeanPropertyRowMapper<>(TsDeviceProperty.class)); + + + return list.stream().map( + o->{ + DeviceProperty deviceProperty = new DeviceProperty(); + BeanUtils.copyProperties(o,deviceProperty); + deviceProperty.setTime(o.getTime().getTime()); + return deviceProperty; + } + ).collect(Collectors.toList()); + + } + + @Override + public void addProperties(String deviceId, Map properties, long time) { + DeviceInfo device = deviceInfoData.findByDeviceId(deviceId); + if (device == null) { + return; + } + //获取设备旧属性 + Map oldProperties = deviceInfoData.getProperties(deviceId); + //用新属性覆盖 + oldProperties.putAll(properties); + + List> fields = new ArrayList<>(); + List values = new ArrayList<>(); + + fields.add(DSL.field("time")); + fields.add(DSL.field("device_id")); + values.add(new Date(time)); + values.add(deviceId); + //组织sql + oldProperties.forEach((key, val) -> { + fields.add(DSL.field(key)); + values.add(val); + }); + String tbName = Constants.getProductPropertySTableName(device.getProductKey()); + + //组织sql + InsertValuesStepN step = DSL.insertInto(DSL.table(tbName), (Collection>) fields).values(values); + String sql = step.getSQL(ParamType.INLINED); + tsTemplate.batchUpdate(sql); + + } + +} diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/RuleLogDataImpl.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/RuleLogDataImpl.java new file mode 100644 index 00000000..156fb262 --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/RuleLogDataImpl.java @@ -0,0 +1,80 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.service; + +import cc.iotkit.model.Paging; +import cc.iotkit.model.rule.RuleLog; +import cc.iotkit.temporal.IRuleLogData; +import cc.iotkit.temporal.ts.dao.TsTemplate; +//import cc.iotkit.temporal.ts.dm.TableManager; +import cc.iotkit.temporal.ts.dm.TableManager; +import cc.iotkit.temporal.ts.model.TsRuleLog; +import org.jooq.*; +import org.jooq.conf.ParamType; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.BeanPropertyRowMapper; +import org.springframework.stereotype.Service; + +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +import static org.jooq.impl.DSL.*; + +@Service +public class RuleLogDataImpl implements IRuleLogData { + + + @Autowired + private TsTemplate tsTemplate; + + @Override + public void deleteByRuleId(String ruleId) { + + tsTemplate.update("delete from rule_log where rule_id=?", ruleId); + } + + @Override + public Paging findByRuleId(String ruleId, int page, int size) { + + SelectForUpdateStep> sqlStep = TableManager.getSqlBuilder() + .select(field("time"), field("state1"), field("content"), field("success"), + field("rule_id")) + .from(table("rule_log")) + .where(field("rule_id").eq(ruleId)) + .orderBy(field("time").desc()) + .limit(size) + .offset((page - 1) * size); + List ruleLogs = tsTemplate.query(sqlStep.getSQL(ParamType.INLINED), new BeanPropertyRowMapper<>(TsRuleLog.class), ruleId); + + SelectConditionStep> where = TableManager.getSqlBuilder().selectCount().from(table("rule_log")) + .where(field("rule_id").eq(ruleId)); + Long count = tsTemplate.queryForObject(where.getSQL(ParamType.INLINED), Long.class); + + return new Paging<>(count, ruleLogs.stream().map(r -> + new RuleLog(r.getTime().toString(), ruleId, r.getState1(), + r.getContent(), r.getSuccess(), r.getTime().getTime())) + .collect(Collectors.toList())); + } + + @Override + public void add(RuleLog log) { + //使用 + + InsertValuesStep5 sqlStep = TableManager.getSqlBuilder().insertInto(table("rule_log"), + field("time"), + field("rule_id"), + field("state1"), + field("content"), field("success")).values(new Date(), + log.getRuleId(), log.getState(), log.getContent(), log.getSuccess()); + + tsTemplate.update(sqlStep.getSQL(ParamType.INLINED)); + } +} diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/TaskLogDataImpl.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/TaskLogDataImpl.java new file mode 100644 index 00000000..7ddf01a7 --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/TaskLogDataImpl.java @@ -0,0 +1,82 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.service; + +import cc.iotkit.model.Paging; +import cc.iotkit.model.rule.TaskLog; +import cc.iotkit.temporal.ITaskLogData; +import cc.iotkit.temporal.ts.dao.TsTemplate; +//import cc.iotkit.temporal.ts.dm.TableManager; +import cc.iotkit.temporal.ts.dm.TableManager; +import cc.iotkit.temporal.ts.model.TsTaskLog; +import cc.iotkit.temporal.ts.dao.TsTemplate; +import org.jooq.*; +import org.jooq.conf.ParamType; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.BeanPropertyRowMapper; +import org.springframework.stereotype.Service; + +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.table; + +@Service +public class TaskLogDataImpl implements ITaskLogData { + + @Autowired + private TsTemplate tsTemplate; + + @Override + public void deleteByTaskId(String taskId) { + tsTemplate.update("delete from task_log where task_id=?", taskId); + } + + @Override + public Paging findByTaskId(String taskId, int page, int size) { + SelectForUpdateStep> sqlStep = TableManager.getSqlBuilder() + .select(field("time"), field("content"), field("success"), field("task_id")) + .from(table("task_log")) + .where(field("task_id").eq(taskId)) + .orderBy(field("time").desc()) + .limit(size) + .offset((page - 1) * size); + + // Get the SQL string from the query + String sql = sqlStep.getSQL(ParamType.INLINED); + List taskLogs = tsTemplate.query(sql, new BeanPropertyRowMapper<>(TsTaskLog.class)); + + String whereSql = TableManager.getSqlBuilder().selectCount().from(table("task_log")) + .where(field("task_id").eq(taskId)).getSQL(ParamType.INLINED); + Long count = tsTemplate.queryForObject(whereSql, new BeanPropertyRowMapper<>(Long.class)); + + return new Paging<>(count , taskLogs.stream().map(r -> + new TaskLog(r.getTime().toString(), taskId, + r.getContent(), r.getSuccess(), r.getTime())) + .collect(Collectors.toList())); + } + + @Override + public void add(TaskLog log) { + + InsertValuesStep4 sqlStep + = TableManager.getSqlBuilder().insertInto(table("tag_log"), + field("time"), + field("task_id"), + field("content"), field("success")).values( + new Date(), + log.getTaskId(), + log.getContent(), log.getSuccess()); + + tsTemplate.update(sqlStep.getSQL(ParamType.INLINED)); + } +} diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/ThingModelMessageDataImpl.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/ThingModelMessageDataImpl.java new file mode 100644 index 00000000..43c537bf --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/ThingModelMessageDataImpl.java @@ -0,0 +1,134 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.service; + +import cc.iotkit.common.utils.JsonUtil; +import cc.iotkit.model.Paging; +import cc.iotkit.model.device.message.ThingModelMessage; +import cc.iotkit.model.stats.TimeData; +import cc.iotkit.temporal.IThingModelMessageData; +import cc.iotkit.temporal.ts.dao.TsTemplate; +import cc.iotkit.temporal.ts.dm.TableManager; +import cc.iotkit.temporal.ts.model.TsThingModelMessage; +import cc.iotkit.temporal.ts.model.TsTimeData; +import org.apache.commons.lang3.StringUtils; +import org.jooq.*; +import org.jooq.conf.ParamType; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.BeanPropertyRowMapper; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.table; + +@Service +public class ThingModelMessageDataImpl implements IThingModelMessageData { + + @Autowired + private TsTemplate tsTemplate; + + public Paging findByTypeAndIdentifier(String deviceId, String type, + String identifier, + int page, int size) { + + + Table table = table("thing_model_message"); + Condition whereConditions = field("device_id").eq(deviceId); + SelectJoinStep> step = TableManager.getSqlBuilder().select(field("time"), field("mid"), + field("product_key"), field("device_name"), field("type"), + field("identifier"), field("code"), field("data"), + field("report_time")).from(table); + + + if (StringUtils.isNotBlank(type)) { + whereConditions.and(field("type").eq(type)); + } + if (StringUtils.isNotBlank(identifier)) { + whereConditions.and(field("identifier").eq(identifier)); + } + + String sql = step.where(whereConditions).orderBy(field("time").desc()).limit(size).offset((page - 1) * size).getSQL(ParamType.INLINED); + + List ruleLogs = tsTemplate.query(sql, + new BeanPropertyRowMapper<>(TsThingModelMessage.class) + ); + + String countSql = TableManager.getSqlBuilder().selectCount().from(table).where(whereConditions).getSQL(ParamType.INLINED); + Long count = tsTemplate.queryForObject(countSql, Long.class); + + return new Paging<>(count, ruleLogs.stream().map(r -> + new ThingModelMessage(r.getTime().toString(), r.getMid(), + deviceId, r.getProductKey(), r.getDeviceName(), + r.getUid(), r.getType(), r.getIdentifier(), r.getCode(), + r.getData(), + r.getTime().getTime(), r.getReportTime())) + .collect(Collectors.toList())); + } + + @Override + public List getDeviceMessageStatsWithUid(String uid, long start, long end) { + + Table table = table("thing_model_message"); + + Condition con = field("time").greaterOrEqual(new Date(start)).and(field("time").lessOrEqual(new Date(end))); + if(StringUtils.isNotBlank(uid)){ + con.and(field("uid").eq(uid)); + } + + String sql = TableManager.getSqlBuilder().select(field("date_trunc('hour', \"time\")").as("time"),field("count(*)").as("data")) + .from(table).where(con).groupBy(field("date_trunc('hour', \"time\")")).orderBy(field("time").asc()).getSQL(ParamType.INLINED); + + + List query = tsTemplate.query(sql, new BeanPropertyRowMapper<>(TsTimeData.class)); + return query.stream().map(o -> { + TimeData timeData = new TimeData(); + timeData.setData(o.getData()); + timeData.setTime(o.getTime().getTime()); + return timeData; + + }).collect(Collectors.toList()); + } + + @Override + public void add(ThingModelMessage msg) { + Table table = table("thing_model_message"); + + String sql = TableManager.getSqlBuilder().insertInto(table, + field("time"), + field("device_id"), + field("mid"), + field("product_key"), + field("device_name"), + field("uid"), + field("type"), + field("identifier"), + field("code"), + field("data"), field("report_time")) + .values(new Date(msg.getOccurred()), msg.getDeviceId(), msg.getMid(), + msg.getProductKey(), msg.getDeviceName(), + msg.getUid(), msg.getType(), + msg.getIdentifier(), msg.getCode(), + msg.getData() == null ? "{}" : JsonUtil.toJsonString(msg.getData()), + msg.getTime()).getSQL(ParamType.INLINED); + tsTemplate.update(sql); + } + + @Override + public long count() { + List counts = tsTemplate.queryForList("select count(*) from thing_model_message", Long.class); + return counts.size() > 0 ? counts.get(0) : 0; + } +} diff --git a/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/VirtualDeviceLogDataImpl.java b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/VirtualDeviceLogDataImpl.java new file mode 100644 index 00000000..b1ee1197 --- /dev/null +++ b/iot-data/iot-ts-temporal-service/src/main/java/cc/iotkit/temporal/ts/service/VirtualDeviceLogDataImpl.java @@ -0,0 +1,74 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.temporal.ts.service; + +import cc.iotkit.model.Paging; +import cc.iotkit.model.device.VirtualDeviceLog; +import cc.iotkit.temporal.IVirtualDeviceLogData; +import cc.iotkit.temporal.ts.dao.TsTemplate; +import cc.iotkit.temporal.ts.dm.TableManager; +import cc.iotkit.temporal.ts.model.TsVirtualDeviceLog; +import org.jooq.*; +import org.jooq.conf.ParamType; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.BeanPropertyRowMapper; +import org.springframework.stereotype.Service; + +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.table; + +@Service +public class VirtualDeviceLogDataImpl implements IVirtualDeviceLogData { + + @Autowired + private TsTemplate tsTemplate; + + @Override + public Paging findByVirtualDeviceId(String virtualDeviceId, int page, int size) { + + Table table = table("virtual_device_log"); + + Condition whereConditions = field("virtual_device_id").eq(virtualDeviceId.toLowerCase()); + DSLContext sqlBuilder = TableManager.getSqlBuilder(); + String sql = sqlBuilder.select(field("time"), field("virtual_device_id"), + field("virtual_device_name"), field("device_total"), field("result")).from(table).where(whereConditions) + .orderBy(field("time").desc()).limit(size).offset((page - 1) * size).getSQL(ParamType.INLINED); + + List logs = tsTemplate.query(sql, new BeanPropertyRowMapper<>(TsVirtualDeviceLog.class)); + + String countSql = sqlBuilder.selectCount().from(table).where(whereConditions).getSQL(ParamType.INLINED); + + Long count = tsTemplate.queryForObject(countSql, Long.class); + + return new Paging<>(count, logs.stream().map(r -> + new VirtualDeviceLog(r.getTime().toString(), virtualDeviceId, + r.getVirtualDeviceName(), + r.getDeviceTotal(), r.getResult(), + r.getTime().getTime())) + .collect(Collectors.toList())); + } + + @Override + public void add(VirtualDeviceLog log) { + Table table = table("virtual_device_log"); + + String sql = TableManager.getSqlBuilder().insertInto(table, field("time"), field("virtual_device_id"), + field("virtual_device_name"), + field("device_total"), field("result")) + .values(new Date(), log.getVirtualDeviceId(), log.getVirtualDeviceName(), + log.getDeviceTotal(), log.getResult()).getSQL(ParamType.INLINED); + + tsTemplate.update(sql); + } +} diff --git a/iot-data/pom.xml b/iot-data/pom.xml index c89ad127..ece7abdf 100755 --- a/iot-data/pom.xml +++ b/iot-data/pom.xml @@ -18,6 +18,7 @@ iot-es-temporal-service iot-rdb-data-service iot-td-temporal-service + iot-ts-temporal-service iot-data diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/AlertService.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/AlertService.java index 0704f08e..d88518a4 100755 --- a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/AlertService.java +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/AlertService.java @@ -11,6 +11,7 @@ package cc.iotkit.ruleengine.action; import cc.iotkit.model.device.message.ThingModelMessage; import cc.iotkit.ruleengine.alert.Alerter; +import com.fasterxml.jackson.core.type.TypeReference; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.SneakyThrows; @@ -29,7 +30,8 @@ public class AlertService extends ScriptService { @SneakyThrows public String execute(ThingModelMessage msg) { //执行转换脚本 - Map result = execScript(msg); + Map result = execScript(new TypeReference<>() { + }, msg); if (result == null) { log.warn("execScript result is null"); return "execScript result is null"; diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/DeviceActionService.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/DeviceActionService.java index 68f9c8aa..4088ef51 100755 --- a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/DeviceActionService.java +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/DeviceActionService.java @@ -51,7 +51,8 @@ public class DeviceActionService { public String getType() { //identifier为set固定为属性设置,其它为服务调用 - if (ThingModelMessage.ID_PROPERTY_SET.equals(identifier)) { + if (ThingModelMessage.ID_PROPERTY_SET.equals(identifier) || + ThingModelMessage.ID_PROPERTY_GET.equals(identifier)) { return ThingModelMessage.TYPE_PROPERTY; } return ThingModelMessage.TYPE_SERVICE; diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/HttpService.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/HttpService.java index ff0d6ca7..9f53f1bd 100755 --- a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/HttpService.java +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/HttpService.java @@ -11,6 +11,7 @@ package cc.iotkit.ruleengine.action; import cc.iotkit.common.utils.JsonUtil; import cc.iotkit.model.device.message.ThingModelMessage; +import com.fasterxml.jackson.core.type.TypeReference; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.SneakyThrows; @@ -32,15 +33,13 @@ public class HttpService extends ScriptService { @SneakyThrows public String execute(ThingModelMessage msg) { //执行转换脚本 - Map result = execScript(msg); - if (result == null) { + HttpData httpData = execScript(new TypeReference<>() { + }, msg); + if (httpData == null) { log.warn("execScript result is null"); return "execScript result is null"; } - HttpData httpData = new HttpData(); - BeanUtils.populate(httpData, result); - //组装http请求 String url = this.url + httpData.getPath(); Request.Builder builder = new Request.Builder(); @@ -57,7 +56,7 @@ public class HttpService extends ScriptService { httpData.getBody().toString()); Request request = builder.method(httpData.getMethod().toUpperCase(), requestBody).build(); - String requestDataStr = JsonUtil.toJsonString(result); + String requestDataStr = JsonUtil.toJsonString(httpData); log.info("send http request:{} ,{}", url, requestDataStr); String responseBody = ""; diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/ScriptService.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/ScriptService.java index 9511c5d1..3c2b63aa 100755 --- a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/ScriptService.java +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/ScriptService.java @@ -18,7 +18,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import java.util.Map; @Slf4j @Data @@ -30,15 +29,17 @@ public class ScriptService { private IDeviceInfoData deviceInfoData; - public Map execScript(ThingModelMessage msg) { + public void setScript(String script) { + scriptEngine.setScript(script); + } + + public T execScript(TypeReference type, ThingModelMessage msg) { try { - scriptEngine.setScript(script); //取设备信息 DeviceInfo deviceInfo = deviceInfoData.findByDeviceId(msg.getDeviceId()); //执行转换脚本 - return scriptEngine.invokeMethod(new TypeReference<>() { - }, "translate", msg, deviceInfo); + return scriptEngine.invokeMethod(type, "translate", msg, deviceInfo); } catch (Throwable e) { log.error("run script error", e); return null; diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/kafka/KafkaService.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/kafka/KafkaService.java index 3c4a1a5a..6ebcc1f1 100644 --- a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/kafka/KafkaService.java +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/kafka/KafkaService.java @@ -6,6 +6,7 @@ import cc.iotkit.ruleengine.action.ScriptService; import cc.iotkit.ruleengine.link.LinkFactory; import cc.iotkit.ruleengine.link.LinkService; import cc.iotkit.ruleengine.link.impl.KafkaLink; +import com.fasterxml.jackson.core.type.TypeReference; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; @@ -28,7 +29,8 @@ public class KafkaService extends ScriptService implements LinkService { public String execute(ThingModelMessage msg) { //执行转换脚本 - Map result = execScript(msg); + Map result = execScript(new TypeReference<>() { + }, msg); if (result == null) { log.warn("execScript result is null"); return "execScript result is null"; diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/mqtt/MqttService.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/mqtt/MqttService.java index 17cbedc0..f982539f 100644 --- a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/mqtt/MqttService.java +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/mqtt/MqttService.java @@ -6,6 +6,7 @@ import cc.iotkit.ruleengine.action.ScriptService; import cc.iotkit.ruleengine.link.LinkFactory; import cc.iotkit.ruleengine.link.LinkService; import cc.iotkit.ruleengine.link.impl.MqttClientLink; +import com.fasterxml.jackson.core.type.TypeReference; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; @@ -31,7 +32,8 @@ public class MqttService extends ScriptService implements LinkService { public String execute(ThingModelMessage msg) { //执行转换脚本 - Map result = execScript(msg); + Map result = execScript(new TypeReference<>() { + }, msg); if (result == null) { log.warn("execScript result is null"); return "execScript result is null"; diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/tcp/TcpService.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/tcp/TcpService.java index 692ee779..e503ca55 100644 --- a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/tcp/TcpService.java +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/tcp/TcpService.java @@ -6,6 +6,7 @@ import cc.iotkit.ruleengine.action.ScriptService; import cc.iotkit.ruleengine.link.LinkFactory; import cc.iotkit.ruleengine.link.LinkService; import cc.iotkit.ruleengine.link.impl.TcpClientLink; +import com.fasterxml.jackson.core.type.TypeReference; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; @@ -27,7 +28,8 @@ public class TcpService extends ScriptService implements LinkService { public String execute(ThingModelMessage msg) { //执行转换脚本 - Map result = execScript(msg); + Map result = execScript(new TypeReference<>() { + },msg); if (result == null) { log.warn("execScript result is null"); return "execScript result is null"; diff --git a/iot-script-engine/pom.xml b/iot-script-engine/pom.xml index 36184d9e..28a7d4cf 100644 --- a/iot-script-engine/pom.xml +++ b/iot-script-engine/pom.xml @@ -23,23 +23,25 @@ org.graalvm.sdk graal-sdk + org.graalvm.js js + org.graalvm.js js-scriptengine - + org.apache.commons commons-lang3 - commons-beanutils - commons-beanutils + org.slf4j + slf4j-api @@ -47,6 +49,11 @@ lombok + + commons-beanutils + commons-beanutils + + cc.iotkit iot-common diff --git a/iot-script-engine/src/main/java/cc/iotkit/script/JavaScriptEngine.java b/iot-script-engine/src/main/java/cc/iotkit/script/JavaScriptEngine.java index 5b140a65..3a179d6b 100644 --- a/iot-script-engine/src/main/java/cc/iotkit/script/JavaScriptEngine.java +++ b/iot-script-engine/src/main/java/cc/iotkit/script/JavaScriptEngine.java @@ -1,13 +1,22 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ package cc.iotkit.script; import cc.iotkit.common.utils.JsonUtil; import com.fasterxml.jackson.core.type.TypeReference; +import lombok.extern.slf4j.Slf4j; import org.graalvm.polyglot.Context; import org.graalvm.polyglot.HostAccess; import org.graalvm.polyglot.Value; -import java.util.Objects; - +@Slf4j public class JavaScriptEngine implements IScriptEngine { private final Context context = Context.newBuilder("js").allowHostAccess(HostAccess.ALL).build(); @@ -42,22 +51,22 @@ public class JavaScriptEngine implements IScriptEngine { public T invokeMethod(TypeReference type, String methodName, Object... args) { Value member = jsScript.getMember("invoke"); - if (Objects.isNull(member)) { - return null; - } - + StringBuilder sbArgs = new StringBuilder("["); + //将入参转成json for (int i = 0; i < args.length; i++) { args[i] = JsonUtil.toJsonString(args[i]); + sbArgs.append(i == args.length - 1 ? "," : "").append(args[i]); } + sbArgs.append("]"); //通过调用invoke方法将目标方法返回结果转成json Value rst = member.execute(methodName, args); - if (rst == null) { - return null; - } String json = rst.asString(); - if (json == null) { + log.info("invoke script {},args:{}, result:{}", methodName, sbArgs, json); + + //没有返回值 + if (json == null || "null".equals(json)) { return null; } diff --git a/iot-standalone/pom.xml b/iot-standalone/pom.xml index 50207d7a..86789e65 100755 --- a/iot-standalone/pom.xml +++ b/iot-standalone/pom.xml @@ -164,6 +164,12 @@ iot-es-temporal-service + + + + + + diff --git a/iot-standalone/src/main/java/cc/iotkit/manager/config/SaTokenConfigure.java b/iot-standalone/src/main/java/cc/iotkit/manager/config/SaTokenConfigure.java index 694583f5..cff53e67 100755 --- a/iot-standalone/src/main/java/cc/iotkit/manager/config/SaTokenConfigure.java +++ b/iot-standalone/src/main/java/cc/iotkit/manager/config/SaTokenConfigure.java @@ -38,11 +38,15 @@ public class SaTokenConfigure implements WebMvcConfigurer { "/space/delSpace/**", "/space/saveHome/**", "/space/currentHome/**", + "/space/changCurrentHome/**", + "/space/getUserHomes/**", "/space/myRecentDevices/**", "/space/spaces/**", "/space/myDevices/**", "/space/findDevice/**", "/space/addDevice/**", + "/space/collectDevice/**", + "/space/getCollectDevices/**", "/space/saveDevice", "/space/removeDevice", "/space/setOpenUid", diff --git a/iot-standalone/src/main/java/cc/iotkit/manager/controller/DeviceController.java b/iot-standalone/src/main/java/cc/iotkit/manager/controller/DeviceController.java index fe26dde2..70b9e85f 100755 --- a/iot-standalone/src/main/java/cc/iotkit/manager/controller/DeviceController.java +++ b/iot-standalone/src/main/java/cc/iotkit/manager/controller/DeviceController.java @@ -88,6 +88,14 @@ public class DeviceController { return new InvokeResult(deviceService.invokeService(deviceId, service, args)); } + @PostMapping(Constants.API_DEVICE.INVOKE_SERVICE_PROPERTY_GET) + public InvokeResult invokeServicePropertySet(@PathVariable("deviceId") String deviceId, + @RequestBody List propertyNames) { + if (StringUtils.isBlank(deviceId)) { + throw new RuntimeException("deviceId/service is blank."); + } + return new InvokeResult(deviceService.getProperty(deviceId, propertyNames, true)); + } @PostMapping(Constants.API_DEVICE.SET_PROPERTIES) public InvokeResult setProperty(@PathVariable("deviceId") String deviceId, @RequestBody Map args) { diff --git a/iot-standalone/src/main/java/cc/iotkit/manager/controller/SpaceController.java b/iot-standalone/src/main/java/cc/iotkit/manager/controller/SpaceController.java index 90f598b0..6e0ecec1 100755 --- a/iot-standalone/src/main/java/cc/iotkit/manager/controller/SpaceController.java +++ b/iot-standalone/src/main/java/cc/iotkit/manager/controller/SpaceController.java @@ -42,6 +42,27 @@ public class SpaceController { return homeData.findByUidAndCurrent(AuthUtil.getUserId(), true); } + /** + * 取用户所有家庭 + */ + @GetMapping("/getUserHomes") + public List getUserHomes() { + return homeData.findByUid(AuthUtil.getUserId()); + } + + /** + * 切换用户当前家庭 + */ + @PostMapping("/changCurrentHome") + public void changCurrentHome(Home home) { + Home oldHome=homeData.findByUidAndCurrent(AuthUtil.getUserId(), true); + oldHome.setCurrent(false); + homeData.save(oldHome); + Home newHome=homeData.findById(home.getId()); + newHome.setCurrent(true); + homeData.save(newHome); + } + /** * 保存家庭信息 */ diff --git a/iot-standalone/src/main/java/cc/iotkit/manager/controller/SpaceDeviceController.java b/iot-standalone/src/main/java/cc/iotkit/manager/controller/SpaceDeviceController.java index d6ddfdc9..bd358f61 100755 --- a/iot-standalone/src/main/java/cc/iotkit/manager/controller/SpaceDeviceController.java +++ b/iot-standalone/src/main/java/cc/iotkit/manager/controller/SpaceDeviceController.java @@ -15,6 +15,7 @@ import cc.iotkit.data.*; import cc.iotkit.manager.model.vo.FindDeviceVo; import cc.iotkit.manager.model.vo.SpaceDeviceVo; import cc.iotkit.manager.service.DataOwnerService; +import cc.iotkit.model.space.Home; import cc.iotkit.utils.AuthUtil; import cc.iotkit.model.UserInfo; import cc.iotkit.model.device.DeviceInfo; @@ -52,6 +53,8 @@ public class SpaceDeviceController { @Qualifier("spaceDataCache") private ISpaceData spaceData; @Autowired + private IHomeData homeData; + @Autowired private DataOwnerService dataOwnerService; @Autowired private IUserInfoData userInfoData; @@ -65,6 +68,27 @@ public class SpaceDeviceController { return spaceDevices.stream().map((this::parseSpaceDevice)).collect(Collectors.toList()); } + /** + * 获取用户收藏设备列表 + */ + @GetMapping(Constants.API_SPACE.GET_COLLECT_DEVICES) + public List getCollectDevices() { + Home home=homeData.findByUidAndCurrent(AuthUtil.getUserId(), true); + List spaceDevices = spaceDeviceData.findByHomeIdAndCollect(home.getId(),true); + return spaceDevices.stream().map((this::parseSpaceDevice)).collect(Collectors.toList()); + } + + /** + * + * 收藏/取消收藏设备 + */ + @PostMapping(Constants.API_SPACE.COLLECT_DEVICE) + public void collectDevice(SpaceDevice spaceDevice) { + SpaceDevice oldSpaceDevice=spaceDeviceData.findByDeviceId(spaceDevice.getDeviceId()); + oldSpaceDevice.setCollect(spaceDevice.getCollect()); + spaceDeviceData.save(oldSpaceDevice); + } + /** * 我的空间设备列表-按空间获取 * @@ -107,6 +131,7 @@ public class SpaceDeviceController { .online(state != null && state.isOnline()) .property(device.getProperty()) .uid(sd.getUid()) + .collect(sd.getCollect()) .build(); } @@ -173,7 +198,7 @@ public class SpaceDeviceController { return findDeviceVo; } - /** + /**REMOVE_DEVICE * 往指定房间中添加设备 */ @PostMapping(Constants.API_SPACE.ADD_DEVICE) diff --git a/iot-standalone/src/main/java/cc/iotkit/manager/model/vo/SpaceDeviceVo.java b/iot-standalone/src/main/java/cc/iotkit/manager/model/vo/SpaceDeviceVo.java index f6a84ddd..62784654 100755 --- a/iot-standalone/src/main/java/cc/iotkit/manager/model/vo/SpaceDeviceVo.java +++ b/iot-standalone/src/main/java/cc/iotkit/manager/model/vo/SpaceDeviceVo.java @@ -89,4 +89,9 @@ public class SpaceDeviceVo { * 品类名 */ private String categoryName; + + /** + * 是否收藏 + */ + private Boolean collect; } diff --git a/iot-standalone/src/main/java/cc/iotkit/manager/service/DeviceService.java b/iot-standalone/src/main/java/cc/iotkit/manager/service/DeviceService.java index 4f114883..bb49a870 100755 --- a/iot-standalone/src/main/java/cc/iotkit/manager/service/DeviceService.java +++ b/iot-standalone/src/main/java/cc/iotkit/manager/service/DeviceService.java @@ -27,6 +27,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; +import java.util.List; import java.util.Map; @Slf4j @@ -68,6 +69,17 @@ public class DeviceService { args, ThingModelMessage.TYPE_SERVICE, service); } + /** + * 设备属性获取 + */ + public String getProperty(String deviceId, List properties, + boolean checkOwner) { + DeviceInfo device = getAndCheckDevice(deviceId, checkOwner); + + return send(deviceId, device.getProductKey(), device.getDeviceName(), properties, + ThingModelMessage.TYPE_PROPERTY, ThingModelMessage.ID_PROPERTY_GET); + } + /** * 设备属性设置 */ diff --git a/iot-standalone/src/main/java/cc/iotkit/manager/service/ThingModelService.java b/iot-standalone/src/main/java/cc/iotkit/manager/service/ThingModelService.java index 9efd9242..25c5328f 100755 --- a/iot-standalone/src/main/java/cc/iotkit/manager/service/ThingModelService.java +++ b/iot-standalone/src/main/java/cc/iotkit/manager/service/ThingModelService.java @@ -11,6 +11,7 @@ package cc.iotkit.manager.service; import cc.iotkit.common.thing.ThingService; import cc.iotkit.data.IThingModelData; +import cc.iotkit.model.device.message.ThingModelMessage; import cc.iotkit.model.product.ThingModel; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -37,7 +38,12 @@ public class ThingModelService { if (properties == null) { return; } - params = parseProperties(properties, (Map) service.getParams()); + if(identifier.equals(ThingModelMessage.ID_PROPERTY_GET)){ + params = service.getParams(); + } + else { + params = parseProperties(properties, (Map) service.getParams()); + } } else if (ThingService.TYPE_SERVICE.equals(type)) { //服务调用 Map services = model.serviceMap(); diff --git a/pom.xml b/pom.xml index 7b753fc7..3ec65b8e 100755 --- a/pom.xml +++ b/pom.xml @@ -335,6 +335,11 @@ iot-td-temporal-service ${project.version} + + cc.iotkit + iot-ts-temporal-service + ${project.version} + cc.iotkit