diff --git a/iot-module/iot-manager/src/main/java/cc/iotkit/manager/service/DeviceStateCheckTask.java b/iot-module/iot-manager/src/main/java/cc/iotkit/manager/service/DeviceStateCheckTask.java index f51cd912..3e1ea3c0 100644 --- a/iot-module/iot-manager/src/main/java/cc/iotkit/manager/service/DeviceStateCheckTask.java +++ b/iot-module/iot-manager/src/main/java/cc/iotkit/manager/service/DeviceStateCheckTask.java @@ -11,10 +11,14 @@ package cc.iotkit.manager.service; import cc.iotkit.common.api.PageRequest; import cc.iotkit.common.api.Paging; +import cc.iotkit.common.utils.UniqueIdUtil; import cc.iotkit.data.manager.IDeviceInfoData; import cc.iotkit.data.manager.IProductData; import cc.iotkit.model.device.DeviceInfo; import cc.iotkit.model.product.Product; +import cc.iotkit.plugin.core.thing.IThingService; +import cc.iotkit.plugin.core.thing.actions.DeviceState; +import cc.iotkit.plugin.core.thing.actions.up.DeviceStateChange; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -38,6 +42,9 @@ public class DeviceStateCheckTask { @Qualifier("productDataCache") private IProductData productData; + @Autowired + private IThingService thingService; + @Scheduled(fixedDelay = 10, initialDelay = 20, timeUnit = TimeUnit.SECONDS) public void syncState() { int pn = 1; @@ -61,11 +68,15 @@ public class DeviceStateCheckTask { continue; } log.info("device state check offline,{}", deviceId); - //更新为离线 - DeviceInfo.State state = realTimeDevice.getState(); - state.setOnline(false); - state.setOfflineTime(System.currentTimeMillis()); - deviceInfoData.save(realTimeDevice); + + // 发送设备离线物模型消息 + thingService.post("NONE", DeviceStateChange.builder() + .id(UniqueIdUtil.newRequestId()) + .productKey(realTimeDevice.getProductKey()) + .deviceName(realTimeDevice.getDeviceName()) + .state(DeviceState.OFFLINE) + .time(System.currentTimeMillis()) + .build()); } } diff --git a/iot-module/iot-plugin/iot-plugin-main/src/main/java/cc/iotkit/plugin/main/ThingServiceImpl.java b/iot-module/iot-plugin/iot-plugin-main/src/main/java/cc/iotkit/plugin/main/ThingServiceImpl.java index 04580f4a..e412b841 100644 --- a/iot-module/iot-plugin/iot-plugin-main/src/main/java/cc/iotkit/plugin/main/ThingServiceImpl.java +++ b/iot-module/iot-plugin/iot-plugin-main/src/main/java/cc/iotkit/plugin/main/ThingServiceImpl.java @@ -74,14 +74,7 @@ public class ThingServiceImpl implements IThingService { subRegisterDevice(pluginId, device, (SubDeviceRegister) action); break; case STATE_CHANGE: - publishMsg( - device, action, - ThingModelMessage.builder() - .type(ThingModelMessage.TYPE_STATE) - .identifier(((DeviceStateChange) action).getState().getState()) - .time(System.currentTimeMillis()) - .build() - ); + deviceStateChange(device, (DeviceStateChange) action); break; case EVENT_REPORT: EventReport eventReport = (EventReport) action; @@ -190,6 +183,29 @@ public class ThingServiceImpl implements IThingService { return device.getProperty(); } + + private void deviceStateChange(DeviceInfo device, DeviceStateChange action) { + DeviceState state = action.getState(); + if (state == DeviceState.ONLINE) { + device.getState().setOnline(true); + device.getState().setOnlineTime(System.currentTimeMillis()); + } else { + device.getState().setOnline(false); + device.getState().setOfflineTime(System.currentTimeMillis()); + } + deviceInfoData.save(device); + + publishMsg( + device, action, + ThingModelMessage.builder() + .type(ThingModelMessage.TYPE_STATE) + .identifier(action.getState().getState()) + .time(System.currentTimeMillis()) + .build() + ); + } + + private String registerDevice(DeviceInfo device, DeviceRegister register, String parentId) { String productKey = register.getProductKey(); //指定了pk需验证 diff --git a/iot-module/iot-rule-engine/pom.xml b/iot-module/iot-rule-engine/pom.xml index def04b47..bf30a5e2 100644 --- a/iot-module/iot-rule-engine/pom.xml +++ b/iot-module/iot-rule-engine/pom.xml @@ -108,6 +108,10 @@ io.vertx vertx-kafka-client + + cc.iotkit + iot-plugin-core + diff --git a/iot-module/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/handler/sys/DeviceStateCheckHandler.java b/iot-module/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/handler/sys/DeviceStateCheckHandler.java index 339009e7..613a3b85 100644 --- a/iot-module/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/handler/sys/DeviceStateCheckHandler.java +++ b/iot-module/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/handler/sys/DeviceStateCheckHandler.java @@ -10,14 +10,17 @@ package cc.iotkit.ruleengine.handler.sys; import cc.iotkit.common.thing.ThingModelMessage; +import cc.iotkit.common.utils.UniqueIdUtil; import cc.iotkit.data.manager.IDeviceInfoData; import cc.iotkit.model.device.DeviceInfo; +import cc.iotkit.plugin.core.thing.actions.DeviceState; +import cc.iotkit.plugin.core.thing.actions.up.DeviceStateChange; import cc.iotkit.ruleengine.handler.DeviceMessageHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; - +import cc.iotkit.plugin.core.thing.IThingService; /** * 设备状态检查 @@ -30,6 +33,8 @@ public class DeviceStateCheckHandler implements DeviceMessageHandler { @Qualifier("deviceInfoDataCache") private IDeviceInfoData deviceInfoData; + @Autowired + private IThingService thingService; @Override public void handle(ThingModelMessage msg) { @@ -58,32 +63,25 @@ public class DeviceStateCheckHandler implements DeviceMessageHandler { //过滤oat消息 if (ThingModelMessage.TYPE_CONFIG.equals(type) || ThingModelMessage.TYPE_OTA.equals(type) || - ThingModelMessage.TYPE_LIFETIME.equals(type)) { + ThingModelMessage.TYPE_LIFETIME.equals(type) || + ThingModelMessage.TYPE_STATE.equals(type) + ) { return; } - //当前设备状态 - boolean online = false; - //在离线消息 - if (ThingModelMessage.TYPE_STATE.equals(type)) { - online = ThingModelMessage.ID_ONLINE.equals(identifier); - } else { - //其它消息都认作为在线 - online = true; + // 如果在线,则不处理 + if( deviceInfo.getState().isOnline() ) { + return; } - DeviceInfo.State state = deviceInfo.getState(); - if (state != null && state.isOnline() != online) { - //状态有变更 - state.setOnline(online); - if (online) { - state.setOnlineTime(System.currentTimeMillis()); - } else { - state.setOfflineTime(System.currentTimeMillis()); - } - } - deviceInfoData.save(deviceInfo); - + // 其他消息, 发送设备在线物模型消息 + thingService.post("NONE", DeviceStateChange.builder() + .id(UniqueIdUtil.newRequestId()) + .productKey(deviceInfo.getProductKey()) + .deviceName(deviceInfo.getDeviceName()) + .state(DeviceState.ONLINE) + .time(System.currentTimeMillis()) + .build()); } }