fix: 更新状态检查物模型消息,更新上下线规则引擎bug

V0.5.x
gaoyoulong 2024-04-29 11:22:45 +08:00
parent 993f05b75e
commit 4bee56623d
4 changed files with 64 additions and 35 deletions

View File

@ -11,10 +11,14 @@ package cc.iotkit.manager.service;
import cc.iotkit.common.api.PageRequest; import cc.iotkit.common.api.PageRequest;
import cc.iotkit.common.api.Paging; import cc.iotkit.common.api.Paging;
import cc.iotkit.common.utils.UniqueIdUtil;
import cc.iotkit.data.manager.IDeviceInfoData; import cc.iotkit.data.manager.IDeviceInfoData;
import cc.iotkit.data.manager.IProductData; import cc.iotkit.data.manager.IProductData;
import cc.iotkit.model.device.DeviceInfo; import cc.iotkit.model.device.DeviceInfo;
import cc.iotkit.model.product.Product; import cc.iotkit.model.product.Product;
import cc.iotkit.plugin.core.thing.IThingService;
import cc.iotkit.plugin.core.thing.actions.DeviceState;
import cc.iotkit.plugin.core.thing.actions.up.DeviceStateChange;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
@ -38,6 +42,9 @@ public class DeviceStateCheckTask {
@Qualifier("productDataCache") @Qualifier("productDataCache")
private IProductData productData; private IProductData productData;
@Autowired
private IThingService thingService;
@Scheduled(fixedDelay = 10, initialDelay = 20, timeUnit = TimeUnit.SECONDS) @Scheduled(fixedDelay = 10, initialDelay = 20, timeUnit = TimeUnit.SECONDS)
public void syncState() { public void syncState() {
int pn = 1; int pn = 1;
@ -61,11 +68,15 @@ public class DeviceStateCheckTask {
continue; continue;
} }
log.info("device state check offline,{}", deviceId); log.info("device state check offline,{}", deviceId);
//更新为离线
DeviceInfo.State state = realTimeDevice.getState(); // 发送设备离线物模型消息
state.setOnline(false); thingService.post("NONE", DeviceStateChange.builder()
state.setOfflineTime(System.currentTimeMillis()); .id(UniqueIdUtil.newRequestId())
deviceInfoData.save(realTimeDevice); .productKey(realTimeDevice.getProductKey())
.deviceName(realTimeDevice.getDeviceName())
.state(DeviceState.OFFLINE)
.time(System.currentTimeMillis())
.build());
} }
} }

View File

@ -74,14 +74,7 @@ public class ThingServiceImpl implements IThingService {
subRegisterDevice(pluginId, device, (SubDeviceRegister) action); subRegisterDevice(pluginId, device, (SubDeviceRegister) action);
break; break;
case STATE_CHANGE: case STATE_CHANGE:
publishMsg( deviceStateChange(device, (DeviceStateChange) action);
device, action,
ThingModelMessage.builder()
.type(ThingModelMessage.TYPE_STATE)
.identifier(((DeviceStateChange) action).getState().getState())
.time(System.currentTimeMillis())
.build()
);
break; break;
case EVENT_REPORT: case EVENT_REPORT:
EventReport eventReport = (EventReport) action; EventReport eventReport = (EventReport) action;
@ -190,6 +183,29 @@ public class ThingServiceImpl implements IThingService {
return device.getProperty(); return device.getProperty();
} }
private void deviceStateChange(DeviceInfo device, DeviceStateChange action) {
DeviceState state = action.getState();
if (state == DeviceState.ONLINE) {
device.getState().setOnline(true);
device.getState().setOnlineTime(System.currentTimeMillis());
} else {
device.getState().setOnline(false);
device.getState().setOfflineTime(System.currentTimeMillis());
}
deviceInfoData.save(device);
publishMsg(
device, action,
ThingModelMessage.builder()
.type(ThingModelMessage.TYPE_STATE)
.identifier(action.getState().getState())
.time(System.currentTimeMillis())
.build()
);
}
private String registerDevice(DeviceInfo device, DeviceRegister register, String parentId) { private String registerDevice(DeviceInfo device, DeviceRegister register, String parentId) {
String productKey = register.getProductKey(); String productKey = register.getProductKey();
//指定了pk需验证 //指定了pk需验证

View File

@ -108,6 +108,10 @@
<groupId>io.vertx</groupId> <groupId>io.vertx</groupId>
<artifactId>vertx-kafka-client</artifactId> <artifactId>vertx-kafka-client</artifactId>
</dependency> </dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>iot-plugin-core</artifactId>
</dependency>
</dependencies> </dependencies>

View File

@ -10,14 +10,17 @@
package cc.iotkit.ruleengine.handler.sys; package cc.iotkit.ruleengine.handler.sys;
import cc.iotkit.common.thing.ThingModelMessage; import cc.iotkit.common.thing.ThingModelMessage;
import cc.iotkit.common.utils.UniqueIdUtil;
import cc.iotkit.data.manager.IDeviceInfoData; import cc.iotkit.data.manager.IDeviceInfoData;
import cc.iotkit.model.device.DeviceInfo; import cc.iotkit.model.device.DeviceInfo;
import cc.iotkit.plugin.core.thing.actions.DeviceState;
import cc.iotkit.plugin.core.thing.actions.up.DeviceStateChange;
import cc.iotkit.ruleengine.handler.DeviceMessageHandler; import cc.iotkit.ruleengine.handler.DeviceMessageHandler;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import cc.iotkit.plugin.core.thing.IThingService;
/** /**
* *
@ -30,6 +33,8 @@ public class DeviceStateCheckHandler implements DeviceMessageHandler {
@Qualifier("deviceInfoDataCache") @Qualifier("deviceInfoDataCache")
private IDeviceInfoData deviceInfoData; private IDeviceInfoData deviceInfoData;
@Autowired
private IThingService thingService;
@Override @Override
public void handle(ThingModelMessage msg) { public void handle(ThingModelMessage msg) {
@ -58,32 +63,25 @@ public class DeviceStateCheckHandler implements DeviceMessageHandler {
//过滤oat消息 //过滤oat消息
if (ThingModelMessage.TYPE_CONFIG.equals(type) || if (ThingModelMessage.TYPE_CONFIG.equals(type) ||
ThingModelMessage.TYPE_OTA.equals(type) || ThingModelMessage.TYPE_OTA.equals(type) ||
ThingModelMessage.TYPE_LIFETIME.equals(type)) { ThingModelMessage.TYPE_LIFETIME.equals(type) ||
ThingModelMessage.TYPE_STATE.equals(type)
) {
return; return;
} }
//当前设备状态 // 如果在线,则不处理
boolean online = false; if( deviceInfo.getState().isOnline() ) {
//在离线消息 return;
if (ThingModelMessage.TYPE_STATE.equals(type)) {
online = ThingModelMessage.ID_ONLINE.equals(identifier);
} else {
//其它消息都认作为在线
online = true;
} }
DeviceInfo.State state = deviceInfo.getState(); // 其他消息, 发送设备在线物模型消息
if (state != null && state.isOnline() != online) { thingService.post("NONE", DeviceStateChange.builder()
//状态有变更 .id(UniqueIdUtil.newRequestId())
state.setOnline(online); .productKey(deviceInfo.getProductKey())
if (online) { .deviceName(deviceInfo.getDeviceName())
state.setOnlineTime(System.currentTimeMillis()); .state(DeviceState.ONLINE)
} else { .time(System.currentTimeMillis())
state.setOfflineTime(System.currentTimeMillis()); .build());
}
}
deviceInfoData.save(deviceInfo);
} }
} }