feat:产品增加保活时长、离线状态更新
parent
a172cee183
commit
cfc30500cc
|
@ -86,6 +86,10 @@ public class DeviceInfo implements Owned<String> {
|
||||||
|
|
||||||
private Long createAt;
|
private Long createAt;
|
||||||
|
|
||||||
|
public boolean isOnline() {
|
||||||
|
return state != null && state.isOnline();
|
||||||
|
}
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
|
@ -100,6 +104,7 @@ public class DeviceInfo implements Owned<String> {
|
||||||
@Data
|
@Data
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
|
@Builder
|
||||||
public static class State {
|
public static class State {
|
||||||
|
|
||||||
private boolean online;
|
private boolean online;
|
||||||
|
|
|
@ -60,6 +60,11 @@ public class Product extends TenantModel implements Id<Long>, Serializable {
|
||||||
*/
|
*/
|
||||||
private String locateUpdateType;
|
private String locateUpdateType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 保活时长(秒)
|
||||||
|
*/
|
||||||
|
private Long keepAliveTime;
|
||||||
|
|
||||||
private Long createAt;
|
private Long createAt;
|
||||||
|
|
||||||
public boolean isTransparent() {
|
public boolean isTransparent() {
|
||||||
|
|
|
@ -35,6 +35,14 @@ public interface IDeviceInfoData extends IOwnedData<DeviceInfo, String> {
|
||||||
*/
|
*/
|
||||||
Map<String, DevicePropertyCache> getProperties(String deviceId);
|
Map<String, DevicePropertyCache> getProperties(String deviceId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取设备属性更新时间
|
||||||
|
*
|
||||||
|
* @param deviceId 设备id
|
||||||
|
* @return timestamp
|
||||||
|
*/
|
||||||
|
long getPropertyUpdateTime(String deviceId);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 根据设备ID取设备信息
|
* 根据设备ID取设备信息
|
||||||
*
|
*
|
||||||
|
|
|
@ -20,6 +20,9 @@ import cc.iotkit.model.device.DeviceInfo;
|
||||||
import cc.iotkit.model.device.message.DevicePropertyCache;
|
import cc.iotkit.model.device.message.DevicePropertyCache;
|
||||||
import cc.iotkit.model.stats.DataItem;
|
import cc.iotkit.model.stats.DataItem;
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.springframework.beans.factory.SmartInitializingSingleton;
|
import org.springframework.beans.factory.SmartInitializingSingleton;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
@ -94,7 +97,9 @@ public class DeviceInfoDataCache implements IDeviceInfoData, SmartInitializingSi
|
||||||
public void saveProperties(String deviceId, Map<String, DevicePropertyCache> properties) {
|
public void saveProperties(String deviceId, Map<String, DevicePropertyCache> properties) {
|
||||||
Map<String, DevicePropertyCache> old = getProperties(deviceId);
|
Map<String, DevicePropertyCache> old = getProperties(deviceId);
|
||||||
old.putAll(properties);
|
old.putAll(properties);
|
||||||
redisTemplate.opsForValue().set(getPropertyCacheKey(deviceId), JsonUtils.toJsonString(old));
|
redisTemplate.opsForValue().set(getPropertyCacheKey(deviceId),
|
||||||
|
JsonUtils.toJsonString(new PropertyCacheInfo(System.currentTimeMillis(), old))
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -106,14 +111,23 @@ public class DeviceInfoDataCache implements IDeviceInfoData, SmartInitializingSi
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, DevicePropertyCache> getProperties(String deviceId) {
|
public Map<String, DevicePropertyCache> getProperties(String deviceId) {
|
||||||
|
return getPropertyCacheInfo(deviceId).getProperties();
|
||||||
|
}
|
||||||
|
|
||||||
|
private PropertyCacheInfo getPropertyCacheInfo(String deviceId) {
|
||||||
String json = redisTemplate.opsForValue().get(getPropertyCacheKey(deviceId));
|
String json = redisTemplate.opsForValue().get(getPropertyCacheKey(deviceId));
|
||||||
if (StringUtils.isBlank(json)) {
|
if (StringUtils.isBlank(json)) {
|
||||||
return new HashMap<>();
|
return new PropertyCacheInfo(0, new HashMap<>());
|
||||||
}
|
}
|
||||||
return JsonUtils.parseObject(json, new TypeReference<>() {
|
return JsonUtils.parseObject(json, new TypeReference<>() {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getPropertyUpdateTime(String deviceId) {
|
||||||
|
return getPropertyCacheInfo(deviceId).getUpdateTime();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Cacheable(value = Constants.CACHE_DEVICE_INFO, key = "#root.method.name+#deviceId", unless = "#result == null")
|
@Cacheable(value = Constants.CACHE_DEVICE_INFO, key = "#root.method.name+#deviceId", unless = "#result == null")
|
||||||
public DeviceInfo findByDeviceId(String deviceId) {
|
public DeviceInfo findByDeviceId(String deviceId) {
|
||||||
|
@ -304,4 +318,15 @@ public class DeviceInfoDataCache implements IDeviceInfoData, SmartInitializingSi
|
||||||
List<String> subDeviceIds = deviceInfoData.findSubDeviceIds(parentId);
|
List<String> subDeviceIds = deviceInfoData.findSubDeviceIds(parentId);
|
||||||
deviceInfoCachePut.findSubDeviceIds(parentId, subDeviceIds);
|
deviceInfoCachePut.findSubDeviceIds(parentId, subDeviceIds);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
private static class PropertyCacheInfo {
|
||||||
|
|
||||||
|
private long updateTime;
|
||||||
|
|
||||||
|
private Map<String, DevicePropertyCache> properties;
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,6 +100,11 @@ public class DeviceInfoPropertyDataCache implements IDeviceInfoData {
|
||||||
return deviceInfoData.getProperties(deviceId);
|
return deviceInfoData.getProperties(deviceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getPropertyUpdateTime(String deviceId) {
|
||||||
|
return deviceInfoData.getPropertyUpdateTime(deviceId);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DeviceInfo findByDeviceId(String deviceId) {
|
public DeviceInfo findByDeviceId(String deviceId) {
|
||||||
DeviceInfo deviceInfo = deviceInfoData.findByDeviceId(deviceId);
|
DeviceInfo deviceInfo = deviceInfoData.findByDeviceId(deviceId);
|
||||||
|
|
|
@ -77,6 +77,9 @@ public class TbProduct implements TenantAware {
|
||||||
@ApiModelProperty(value = "定位更新方式")
|
@ApiModelProperty(value = "定位更新方式")
|
||||||
private String locateUpdateType;
|
private String locateUpdateType;
|
||||||
|
|
||||||
|
@ApiModelProperty(value = "保活时长(秒)")
|
||||||
|
private Long keepAliveTime;
|
||||||
|
|
||||||
@ApiModelProperty(value = "创建时间")
|
@ApiModelProperty(value = "创建时间")
|
||||||
private Long createAt;
|
private Long createAt;
|
||||||
|
|
||||||
|
|
|
@ -84,6 +84,11 @@ public class DeviceInfoDataImpl implements IDeviceInfoData, IJPACommData<DeviceI
|
||||||
return new HashMap<>();
|
return new HashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getPropertyUpdateTime(String deviceId) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DeviceInfo findByDeviceId(String deviceId) {
|
public DeviceInfo findByDeviceId(String deviceId) {
|
||||||
TbDeviceInfo tbDeviceInfo = deviceInfoRepository.findByDeviceId(deviceId);
|
TbDeviceInfo tbDeviceInfo = deviceInfoRepository.findByDeviceId(deviceId);
|
||||||
|
@ -96,9 +101,9 @@ public class DeviceInfoDataImpl implements IDeviceInfoData, IJPACommData<DeviceI
|
||||||
/**
|
/**
|
||||||
* 填充设备其它信息
|
* 填充设备其它信息
|
||||||
*/
|
*/
|
||||||
private void fillDeviceInfo(String deviceId, TbDeviceInfo vo, DeviceInfo dto) {
|
private DeviceInfo fillDeviceInfo(String deviceId, TbDeviceInfo vo, DeviceInfo dto) {
|
||||||
if (vo == null || dto == null) {
|
if (vo == null || dto == null) {
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
//取子关联用户
|
//取子关联用户
|
||||||
dto.setSubUid(deviceSubUserRepository.findByDeviceId(deviceId).stream()
|
dto.setSubUid(deviceSubUserRepository.findByDeviceId(deviceId).stream()
|
||||||
|
@ -126,6 +131,7 @@ public class DeviceInfoDataImpl implements IDeviceInfoData, IJPACommData<DeviceI
|
||||||
|
|
||||||
//将设备状态从vo转为dto的
|
//将设备状态从vo转为dto的
|
||||||
parseStateToDto(vo, dto);
|
parseStateToDto(vo, dto);
|
||||||
|
return dto;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -445,7 +451,11 @@ public class DeviceInfoDataImpl implements IDeviceInfoData, IJPACommData<DeviceI
|
||||||
@Override
|
@Override
|
||||||
public Paging<DeviceInfo> findAll(PageRequest<DeviceInfo> pageRequest) {
|
public Paging<DeviceInfo> findAll(PageRequest<DeviceInfo> pageRequest) {
|
||||||
Page<TbDeviceInfo> ret = deviceInfoRepository.findAll(PageBuilder.toPageable(pageRequest));
|
Page<TbDeviceInfo> ret = deviceInfoRepository.findAll(PageBuilder.toPageable(pageRequest));
|
||||||
return new Paging<>(ret.getTotalElements(), MapstructUtils.convert(ret.getContent(), DeviceInfo.class));
|
List<DeviceInfo> list = new ArrayList<>();
|
||||||
|
for (TbDeviceInfo deviceInfo : ret.getContent()) {
|
||||||
|
list.add(fillDeviceInfo(deviceInfo.getDeviceId(), deviceInfo, MapstructUtils.convert(deviceInfo, DeviceInfo.class)));
|
||||||
|
}
|
||||||
|
return new Paging<>(ret.getTotalElements(), list);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -5,6 +5,8 @@ import cc.iotkit.model.product.Product;
|
||||||
import io.github.linpeilie.annotations.AutoMapper;
|
import io.github.linpeilie.annotations.AutoMapper;
|
||||||
import io.swagger.annotations.ApiModel;
|
import io.swagger.annotations.ApiModel;
|
||||||
import io.swagger.annotations.ApiModelProperty;
|
import io.swagger.annotations.ApiModelProperty;
|
||||||
|
import jakarta.validation.constraints.Min;
|
||||||
|
import jakarta.validation.constraints.NotBlank;
|
||||||
import jakarta.validation.constraints.Size;
|
import jakarta.validation.constraints.Size;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
|
@ -59,4 +61,9 @@ public class ProductBo extends BaseDto {
|
||||||
@Size(max = 255, message = "产品密钥长度不正确")
|
@Size(max = 255, message = "产品密钥长度不正确")
|
||||||
private String productSecret;
|
private String productSecret;
|
||||||
|
|
||||||
|
@ApiModelProperty(value = "保活时长")
|
||||||
|
@NotBlank(message = "保活时长不能为空")
|
||||||
|
@Min(value = 10, message = "保活时长(秒)必须大于10")
|
||||||
|
private Long keepAliveTime;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,4 +67,8 @@ public class ProductVo implements Serializable {
|
||||||
@ExcelProperty(value = "用户ID")
|
@ExcelProperty(value = "用户ID")
|
||||||
private String uid;
|
private String uid;
|
||||||
|
|
||||||
|
@ApiModelProperty(value = "保活时长(秒)")
|
||||||
|
@ExcelProperty(value = "保活时长(秒)")
|
||||||
|
private Long keepAliveTime;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,16 +9,71 @@
|
||||||
*/
|
*/
|
||||||
package cc.iotkit.manager.service;
|
package cc.iotkit.manager.service;
|
||||||
|
|
||||||
|
import cc.iotkit.common.api.PageRequest;
|
||||||
|
import cc.iotkit.common.api.Paging;
|
||||||
|
import cc.iotkit.data.manager.IDeviceInfoData;
|
||||||
|
import cc.iotkit.data.manager.IProductData;
|
||||||
|
import cc.iotkit.model.device.DeviceInfo;
|
||||||
|
import cc.iotkit.model.product.Product;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 设备状态检查定时任务
|
* 设备状态检查定时任务
|
||||||
*/
|
*/
|
||||||
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
public class DeviceStateCheckTask {
|
public class DeviceStateCheckTask {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@Qualifier("deviceInfoDataCache")
|
||||||
|
private IDeviceInfoData deviceInfoData;
|
||||||
|
|
||||||
private void checkClientStateFromEmq() {
|
@Autowired
|
||||||
|
@Qualifier("productDataCache")
|
||||||
|
private IProductData productData;
|
||||||
|
|
||||||
|
@Scheduled(fixedDelay = 10, initialDelay = 20, timeUnit = TimeUnit.SECONDS)
|
||||||
|
public void syncState() {
|
||||||
|
int pn = 1;
|
||||||
|
Paging<DeviceInfo> all;
|
||||||
|
while (true) {
|
||||||
|
//取出数据库中所有在线设备
|
||||||
|
all = deviceInfoData.findByConditions("","","","",true,"",pn,1000);
|
||||||
|
//判断属性更新时间是否大于产品定义保活时长
|
||||||
|
for (DeviceInfo device : all.getRows()) {
|
||||||
|
Product product = productData.findByProductKey(device.getProductKey());
|
||||||
|
Long keepAliveTime = product.getKeepAliveTime();
|
||||||
|
if (keepAliveTime == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
String deviceId = device.getDeviceId();
|
||||||
|
long updateTime = deviceInfoData.getPropertyUpdateTime(deviceId);
|
||||||
|
//最后更新时间超时保活时长1.1倍认为设备离线了
|
||||||
|
if (System.currentTimeMillis() - updateTime > keepAliveTime * 1000 * 1.1) {
|
||||||
|
DeviceInfo realTimeDevice = deviceInfoData.findByDeviceId(deviceId);
|
||||||
|
if (!realTimeDevice.isOnline()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
log.info("device state check offline,{}", deviceId);
|
||||||
|
//更新为离线
|
||||||
|
DeviceInfo.State state = realTimeDevice.getState();
|
||||||
|
state.setOnline(false);
|
||||||
|
state.setOfflineTime(System.currentTimeMillis());
|
||||||
|
deviceInfoData.save(realTimeDevice);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (all.getRows().size() < 1000) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
pn++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -46,7 +46,7 @@ public class SysOssConfigServiceImpl implements ISysOssConfigService {
|
||||||
private final ISysOssConfigData baseData;
|
private final ISysOssConfigData baseData;
|
||||||
|
|
||||||
@Scheduled(fixedRate = 10, timeUnit = TimeUnit.SECONDS)
|
@Scheduled(fixedRate = 10, timeUnit = TimeUnit.SECONDS)
|
||||||
private void keepAlive() {
|
private void checkOssConfig() {
|
||||||
String configKey = RedisUtils.getCacheObject(OssConstant.DEFAULT_CONFIG_KEY);
|
String configKey = RedisUtils.getCacheObject(OssConstant.DEFAULT_CONFIG_KEY);
|
||||||
if(configKey==null){
|
if(configKey==null){
|
||||||
init();
|
init();
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2024 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.config;
|
||||||
|
|
||||||
|
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@EnableScheduling
|
||||||
|
public class SchedulerConfig {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
|
||||||
|
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
|
||||||
|
scheduler.setPoolSize(10); // 设置线程池大小
|
||||||
|
scheduler.setThreadNamePrefix("spring-scheduled-"); // 设置线程名前缀
|
||||||
|
scheduler.initialize();
|
||||||
|
return scheduler;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue