Merge branch 'dev'

V0.5.x
xiwa 2022-05-29 23:39:18 +08:00
commit 0c3482148d
55 changed files with 1313 additions and 178 deletions

View File

@ -10,6 +10,8 @@ public interface Constants {
String DEVICE_CACHE = "device_cache";
String DEVICE_STATS_CACHE = "device_stats_cache";
String CATEGORY_CACHE = "category_cache";
String SPACE_CACHE = "space_cache";

View File

@ -1,4 +1,4 @@
package cc.iotkit.converter;
package cc.iotkit.common.thing;
import lombok.AllArgsConstructor;
import lombok.Builder;

View File

@ -5,19 +5,26 @@ import lombok.SneakyThrows;
import org.apache.commons.beanutils.BeanMap;
import org.apache.commons.beanutils.BeanUtils;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ReflectUtil {
@SneakyThrows
public static <T> T copyNoNulls(T from, T to) {
public static <T> T copyNoNulls(T from, T to, String... fields) {
List<String> fieldList = Arrays.asList(fields);
Map<String, Object> map = new HashMap<>();
new BeanMap(from).forEach((key, value) -> {
if (value == null) {
return;
}
map.put(key.toString(), value);
String field = key.toString();
if (fields.length == 0 || fieldList.contains(field)) {
map.put(field, value);
}
});
BeanUtils.populate(to, map);
return to;

View File

@ -6,4 +6,8 @@ import org.springframework.stereotype.Repository;
@Repository
public interface CategoryRepository extends MongoRepository<Category, String> {
int countBy();
}

View File

@ -2,17 +2,21 @@ package cc.iotkit.dao;
import cc.iotkit.common.Constants;
import cc.iotkit.model.device.DeviceInfo;
import cc.iotkit.model.stats.DataItem;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Repository;
import javax.annotation.PostConstruct;
import java.util.List;
@Repository
public class DeviceCache {
@Autowired
private DeviceRepository deviceRepository;
@Autowired
private DeviceDao deviceDao;
private static DeviceCache INSTANCE;
@ -35,4 +39,9 @@ public class DeviceCache {
return deviceRepository.findById(deviceId).orElse(null);
}
@Cacheable(value = Constants.DEVICE_STATS_CACHE, key = "#uid")
public List<DataItem> getDeviceStatsByCategory(String uid) {
return deviceDao.getDeviceStatsByCategory(uid);
}
}

View File

@ -2,22 +2,36 @@ package cc.iotkit.dao;
import cc.iotkit.model.Paging;
import cc.iotkit.model.device.DeviceInfo;
import cc.iotkit.model.product.Category;
import cc.iotkit.model.product.Product;
import cc.iotkit.model.stats.DataItem;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.*;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Repository
public class DeviceDao {
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private ProductRepository productRepository;
@Autowired
private CategoryRepository categoryRepository;
public Paging<DeviceInfo> find(Criteria condition, int size, int page) {
Query query = Query.query(condition);
@ -64,4 +78,66 @@ public class DeviceDao {
mongoTemplate.updateFirst(query, update, DeviceInfo.class);
}
/**
*
*/
public List<DataItem> getDeviceStatsByCategory(String uid) {
MatchOperation matchOperation;
if (StringUtils.isBlank(uid)) {
matchOperation = Aggregation.match(new Criteria());
} else {
matchOperation = Aggregation.match(Criteria.where("uid").is(uid));
}
//先按产品分组统计
GroupOperation groupOperation = Aggregation.group("productKey").count().as("total");
ProjectionOperation projectionOperation = Aggregation.project("productKey", "uid");
Aggregation aggregation = Aggregation.newAggregation(projectionOperation, groupOperation, matchOperation);
AggregationResults<Map> result = mongoTemplate.aggregate(aggregation, DeviceInfo.class, Map.class);
List<Map> stats = result.getMappedResults();
//取用户产品列表
List<Product> products;
if (StringUtils.isBlank(uid)) {
products = productRepository.findAll();
} else {
products = productRepository.findByUid(uid);
}
Map<String, String> pkCateMap = new HashMap<>();
for (Product product : products) {
pkCateMap.put(product.getId(), product.getCategory());
}
//取品类
List<Category> categories = categoryRepository.findAll();
Map<String, String> cateNames = new HashMap<>();
for (Category category : categories) {
cateNames.put(category.getId(), category.getName());
}
Map<String, Long> cateStats = new HashMap<>();
for (Map stat : stats) {
String productKey = stat.get("_id").toString();
String cateName = cateNames.get(pkCateMap.get(productKey));
//按品类汇总
long total = cateStats.getOrDefault(cateName, 0L);
total += (Integer) stat.get("total");
cateStats.put(cateName, total);
}
List<DataItem> items = new ArrayList<>();
cateStats.forEach((key, val) -> {
items.add(new DataItem(key, val));
});
return items;
}
/**
*
*/
public List<DataItem> getDeviceStatsByCategory() {
return getDeviceStatsByCategory(null);
}
}

View File

@ -0,0 +1,70 @@
package cc.iotkit.dao;
import cc.iotkit.model.device.message.DeviceReport;
import cc.iotkit.model.stats.TimeData;
import lombok.SneakyThrows;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.clients.elasticsearch7.ElasticsearchAggregations;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Repository;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
@Repository
public class DeviceReportDao {
@Autowired
private ElasticsearchRestTemplate template;
/**
*
*/
public List<TimeData> getDeviceMessageStatsWithUid(String uid, long start, long end) {
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.rangeQuery("time")
.from(start, true).to(end, true));
if (uid != null) {
queryBuilder =
queryBuilder.must(QueryBuilders.termQuery("uid", uid));
}
NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(queryBuilder)
.withAggregations(AggregationBuilders.dateHistogram("agg")
.field("time")
.calendarInterval(DateHistogramInterval.HOUR)
.calendarInterval(DateHistogramInterval.hours(1))
)
.build();
ElasticsearchAggregations result = (ElasticsearchAggregations) template
.search(query, DeviceReport.class).getAggregations();
ParsedDateHistogram histogram = result.aggregations().get("agg");
List<TimeData> data = new ArrayList<>();
for (Histogram.Bucket bucket : histogram.getBuckets()) {
long seconds = ((ZonedDateTime) bucket.getKey()).toInstant().getEpochSecond();
data.add(new TimeData(seconds * 1000, bucket.getDocCount()));
}
return data;
}
/**
*
*/
@SneakyThrows
public List<TimeData> getDeviceMessageStats(long start, long end) {
return getDeviceMessageStatsWithUid(null, start, end);
}
}

View File

@ -0,0 +1,12 @@
package cc.iotkit.dao;
import cc.iotkit.model.device.message.DeviceReport;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface DeviceReportRepository extends ElasticsearchRepository<DeviceReport, String> {
long countByUid(String uid);
}

View File

@ -17,4 +17,6 @@ public interface DeviceRepository extends MongoRepository<DeviceInfo, String> {
List<DeviceInfo> findByDeviceName(String deviceName);
long countByUid(String uid);
}

View File

@ -4,9 +4,12 @@ import cc.iotkit.model.product.Product;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
import java.util.List;
@Repository
public interface ProductRepository extends MongoRepository<Product, String> {
long countByUid(String uid);
List<Product> findByUid(String uid);
}

View File

@ -35,11 +35,10 @@ public class ThingModelMessageDao {
builder.must(QueryBuilders.matchPhraseQuery("identifier", identifier));
}
NativeSearchQuery query = new NativeSearchQueryBuilder().withQuery(builder)
.withPageable(PageRequest.of(page-1, size, Sort.by(Sort.Order.desc("time"))))
.withPageable(PageRequest.of(page - 1, size, Sort.by(Sort.Order.desc("time"))))
.build();
SearchHits<ThingModelMessage> result = template.search(query, ThingModelMessage.class);
return new Paging<>(result.getTotalHits(), result.getSearchHits().stream()
.map(SearchHit::getContent).collect(Collectors.toList()));
}
}

View File

@ -0,0 +1,14 @@
package cc.iotkit.dao;
import cc.iotkit.model.device.VirtualDeviceLog;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface VirtualDeviceLogRepository extends ElasticsearchRepository<VirtualDeviceLog, String> {
Page<VirtualDeviceLog> findByVirtualDeviceId(String virtualDeviceId, Pageable pageable);
}

View File

@ -0,0 +1,20 @@
package cc.iotkit.dao;
import cc.iotkit.model.device.VirtualDevice;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
import java.util.List;
@Repository
public interface VirtualDeviceRepository extends MongoRepository<VirtualDevice, String> {
Page<VirtualDevice> findByUid(String uid, Pageable pageable);
List<VirtualDevice> findByUidAndState(String uid, String state);
List<VirtualDevice> findByTriggerAndState(String trigger, String state);
}

View File

@ -130,26 +130,13 @@
<artifactId>oauth2-server</artifactId>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>virtual-device</artifactId>
</dependency>
</dependencies>
<!-- <build>-->
<!-- <plugins>-->
<!-- <plugin>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-maven-plugin</artifactId>-->
<!-- <configuration>-->
<!-- <excludes>-->
<!-- <exclude>-->
<!-- <groupId>org.projectlombok</groupId>-->
<!-- <artifactId>lombok</artifactId>-->
<!-- </exclude>-->
<!-- </excludes>-->
<!-- </configuration>-->
<!-- </plugin>-->
<!-- </plugins>-->
<!-- </build>-->
<build>
<plugins>

View File

@ -27,6 +27,11 @@ public class CacheConfig {
Caffeine.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.build()
), new CaffeineCache(
Constants.DEVICE_STATS_CACHE,
Caffeine.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.build()
),
new CaffeineCache(
Constants.PRODUCT_CACHE,

View File

@ -56,6 +56,7 @@ public class SaTokenConfigure implements WebMvcConfigurer {
"/**/remove*/**",
"/**/del*/**",
"/**/add*/**",
"/**/create*/**",
"/**/clear*/**",
"/**/set*/**",
"/**/set",

View File

@ -2,6 +2,7 @@ package cc.iotkit.manager.controller;
import cc.iotkit.common.Constants;
import cc.iotkit.common.exception.BizException;
import cc.iotkit.common.utils.CodecUtil;
import cc.iotkit.common.utils.DeviceUtil;
import cc.iotkit.common.utils.UniqueIdUtil;
import cc.iotkit.comps.service.DeviceBehaviourService;
@ -30,6 +31,7 @@ import org.springframework.web.context.request.async.DeferredResult;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
@Slf4j
@RestController
@ -111,16 +113,25 @@ public class DeviceController {
@PostMapping("/create")
public void createDevice(String productKey, String deviceName) {
Optional<Product> productOpt = productRepository.findById(productKey);
if (!productOpt.isPresent()) {
if (productOpt.isEmpty()) {
throw new BizException("the product does not exist");
}
//生成设备密钥
String chars = "ABCDEFGHJKMNPQRSTWXYZabcdefhijkmnprstwxyz2345678";
int maxPos = chars.length();
StringBuilder secret = new StringBuilder();
for (var i = 0; i < 16; i++) {
secret.append(chars.charAt((int) Math.floor(Math.random() * maxPos)));
}
DeviceInfo device = new DeviceInfo();
device.setId(DeviceUtil.newDeviceId(deviceName));
device.setUid(productOpt.get().getUid());
device.setDeviceId(device.getId());
device.setProductKey(productKey);
device.setDeviceName(deviceName);
device.setSecret(secret.toString());
device.setState(new DeviceInfo.State(false, null, null));
device.setCreateAt(System.currentTimeMillis());

View File

@ -0,0 +1,59 @@
package cc.iotkit.manager.controller;
import cc.iotkit.dao.*;
import cc.iotkit.manager.model.stats.MainStats;
import cc.iotkit.utils.AuthUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@RequestMapping("/stats")
public class StatsController {
@Autowired
private CategoryRepository categoryRepository;
@Autowired
private ProductRepository productRepository;
@Autowired
private DeviceRepository deviceRepository;
@Autowired
private DeviceReportRepository deviceReportRepository;
@Autowired
private DeviceReportDao deviceReportDao;
@Autowired
private DeviceCache deviceCache;
@GetMapping("/main")
public MainStats getMainStats() {
MainStats mainStats = new MainStats();
String uid = AuthUtil.getUserId();
long now = System.currentTimeMillis();
if (AuthUtil.isAdmin()) {
mainStats.setCategoryTotal(categoryRepository.count());
mainStats.setProductTotal(productRepository.count());
mainStats.setDeviceTotal(deviceRepository.count());
mainStats.setReportTotal(deviceReportRepository.count());
//上报数据统计
mainStats.setReportDataStats(deviceReportDao.getDeviceMessageStats(now - 48 * 3600 * 1000, now));
//产品数量统计
mainStats.setDeviceStatsOfCategory(deviceCache.getDeviceStatsByCategory(""));
} else {
mainStats.setCategoryTotal(categoryRepository.count());
mainStats.setProductTotal(productRepository.countByUid(uid));
mainStats.setDeviceTotal(deviceRepository.countByUid(uid));
mainStats.setReportTotal(deviceReportRepository.countByUid(uid));
//上报数据统计
mainStats.setReportDataStats(deviceReportDao.getDeviceMessageStatsWithUid(uid, now - 48 * 3600 * 1000, now));
//产品数量统计
mainStats.setDeviceStatsOfCategory(deviceCache.getDeviceStatsByCategory(uid));
}
return mainStats;
}
}

View File

@ -0,0 +1,162 @@
package cc.iotkit.manager.controller;
import cc.iotkit.common.exception.BizException;
import cc.iotkit.common.utils.ReflectUtil;
import cc.iotkit.dao.VirtualDeviceLogRepository;
import cc.iotkit.dao.VirtualDeviceRepository;
import cc.iotkit.manager.service.DataOwnerService;
import cc.iotkit.model.Paging;
import cc.iotkit.model.device.VirtualDevice;
import cc.iotkit.model.device.VirtualDeviceLog;
import cc.iotkit.model.rule.TaskLog;
import cc.iotkit.utils.AuthUtil;
import cc.iotkit.virtualdevice.VirtualManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Optional;
@Slf4j
@RestController
@RequestMapping("/virtual_device")
public class VirtualDeviceController {
@Autowired
private DataOwnerService dataOwnerService;
@Autowired
private VirtualDeviceRepository virtualDeviceRepository;
@Autowired
private VirtualManager virtualManager;
@Autowired
private VirtualDeviceLogRepository virtualDeviceLogRepository;
@PostMapping("/list/{size}/{page}")
public Paging<VirtualDevice> getDevices(
@PathVariable("size") int size,
@PathVariable("page") int page) {
String uid = AuthUtil.getUserId();
Page<VirtualDevice> virtualDevices = virtualDeviceRepository.findByUid(uid,
PageRequest.of(page - 1, size, Sort.by(Sort.Order.desc("createAt"))));
return new Paging<>(virtualDevices.getTotalElements(), virtualDevices.getContent());
}
/**
*
*/
@PostMapping("/add")
public void add(VirtualDevice virtualDevice) {
virtualDevice.setId(null);
virtualDevice.setUid(AuthUtil.getUserId());
virtualDevice.setState(VirtualDevice.STATE_STOPPED);
virtualDevice.setCreateAt(System.currentTimeMillis());
virtualDeviceRepository.save(virtualDevice);
}
/**
*
*/
@PostMapping("/modify")
public void modify(VirtualDevice virtualDevice) {
VirtualDevice oldData = checkOwner(virtualDevice.getId());
ReflectUtil.copyNoNulls(virtualDevice, oldData,
"name", "productKey", "type", "trigger", "triggerExpression");
virtualDevice.setState(VirtualDevice.STATE_STOPPED);
virtualDeviceRepository.save(virtualDevice);
}
/**
*
*/
@GetMapping("/{id}/detail")
public VirtualDevice detail(@PathVariable("id") String id) {
return checkOwner(id);
}
/**
*
*/
@PostMapping("/{id}/setState")
public void setState(@PathVariable("id") String id, String state) {
VirtualDevice oldData = checkOwner(id);
if (!VirtualDevice.STATE_RUNNING.equals(state)
&& !VirtualDevice.STATE_STOPPED.equals(state)) {
throw new BizException("state is illegal");
}
oldData.setState(state);
if (VirtualDevice.STATE_RUNNING.equals(state)) {
virtualManager.add(oldData);
} else {
virtualManager.remove(oldData);
}
virtualDeviceRepository.save(oldData);
}
/**
*
*/
@DeleteMapping("/{id}/delete")
public void delete(@PathVariable("id") String id) {
checkOwner(id);
virtualDeviceRepository.deleteById(id);
}
/**
*
*/
@PostMapping("/{id}/saveScript")
public void saveScript(@PathVariable("id") String id, String script) {
VirtualDevice old = checkOwner(id);
old.setScript(script);
virtualDeviceRepository.save(old);
}
/**
*
*/
@PostMapping("/{id}/saveDevices")
public void saveDevices(@PathVariable("id") String id, @RequestBody List<String> devices) {
VirtualDevice old = checkOwner(id);
old.setDevices(devices);
virtualDeviceRepository.save(old);
}
/**
*
*/
@PostMapping("/{id}/run")
public void run(@PathVariable("id") String id) {
VirtualDevice virtualDevice = checkOwner(id);
virtualManager.run(virtualDevice);
}
/**
*
*/
@PostMapping("/{id}/logs/{size}/{page}")
public Paging<VirtualDeviceLog> getLogs(
@PathVariable("id") String id,
@PathVariable("size") int size,
@PathVariable("page") int page
) {
Page<VirtualDeviceLog> logs = virtualDeviceLogRepository.findByVirtualDeviceId(id,
PageRequest.of(page - 1, size, Sort.by(Sort.Order.desc("logAt"))));
return new Paging<>(logs.getTotalElements(), logs.getContent());
}
private VirtualDevice checkOwner(String id) {
Optional<VirtualDevice> old = virtualDeviceRepository.findById(id);
if (old.isEmpty()) {
throw new BizException("record does not exist");
}
VirtualDevice oldData = old.get();
dataOwnerService.checkOwner(oldData);
return oldData;
}
}

View File

@ -0,0 +1,45 @@
package cc.iotkit.manager.model.stats;
import cc.iotkit.model.stats.DataItem;
import cc.iotkit.model.stats.TimeData;
import lombok.Data;
import java.util.List;
/**
*
*/
@Data
public class MainStats {
/**
*
*/
private long categoryTotal;
/**
*
*/
private long productTotal;
/**
*
*/
private long deviceTotal;
/**
*
*/
private long reportTotal;
/**
*
*/
private List<TimeData> reportDataStats;
/**
*
*/
private List<DataItem> deviceStatsOfCategory;
}

View File

@ -4,11 +4,12 @@ import cc.iotkit.common.exception.NotFoundException;
import cc.iotkit.common.exception.OfflineException;
import cc.iotkit.common.utils.UniqueIdUtil;
import cc.iotkit.comps.DeviceComponentManager;
import cc.iotkit.converter.ThingService;
import cc.iotkit.common.thing.ThingService;
import cc.iotkit.dao.DeviceRepository;
import cc.iotkit.dao.ThingModelMessageRepository;
import cc.iotkit.model.device.DeviceInfo;
import cc.iotkit.model.device.message.ThingModelMessage;
import cc.iotkit.virtualdevice.VirtualManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -29,6 +30,8 @@ public class DeviceService {
private ThingModelService thingModelService;
@Autowired
private ThingModelMessageRepository thingModelMessageRepository;
@Autowired
private VirtualManager virtualManager;
public String invokeService(String deviceId, String service,
Map<String, Object> args) {
@ -103,7 +106,13 @@ public class DeviceService {
.build();
thingModelService.parseParams(thingService);
deviceComponentManager.send(thingService);
if (virtualManager.isVirtual(deviceId)) {
//虚拟设备指令下发
virtualManager.send(thingService);
} else {
//设备指令下发
deviceComponentManager.send(thingService);
}
String mid = thingService.getMid();
//保存设备日志

View File

@ -1,6 +1,6 @@
package cc.iotkit.manager.service;
import cc.iotkit.converter.ThingService;
import cc.iotkit.common.thing.ThingService;
import cc.iotkit.dao.ThingModelRepository;
import cc.iotkit.model.product.ThingModel;
import org.springframework.beans.factory.annotation.Autowired;

View File

@ -25,12 +25,23 @@ public class DeviceInfo implements Owned {
private String deviceId;
/**
* key
*/
private String productKey;
private String deviceName;
/**
*
*/
private String model;
/**
*
*/
private String secret;
private String parentId;
/**

View File

@ -0,0 +1,102 @@
package cc.iotkit.model.device;
import cc.iotkit.model.Owned;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import java.util.ArrayList;
import java.util.List;
/**
*
*/
@Data
@Document
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class VirtualDevice implements Owned {
public static final String STATE_STOPPED = "stopped";
public static final String STATE_RUNNING = "running";
/**
* -
*/
public static final String TYPE_THING_MODEL = "thingModel";
/**
* -
*/
public static final String TYPE_PROTOCOL = "protocol";
/**
* -
*/
public static final String TRIGGER_NONE = "none";
/**
* -
*/
public static final String TRIGGER_CRON = "cron";
/**
* -
*/
public static final String TRIGGER_RANDOM = "random";
@Id
private String id;
/**
*
*/
private String uid;
/**
*
*/
private String name;
/**
* key
*/
private String productKey;
/**
*
*/
private List<String> devices = new ArrayList<>();
/**
*
*/
private String type;
/**
*
*/
private String script;
/**
*
*/
private String trigger;
/**
*
*/
private String triggerExpression;
/**
*
*/
private String state = STATE_STOPPED;
/**
*
*/
private Long createAt = System.currentTimeMillis();
}

View File

@ -0,0 +1,48 @@
package cc.iotkit.model.device;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
/**
*
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Document(indexName = "virtual_device_log")
public class VirtualDeviceLog {
@Id
private String id;
/**
* id
*/
private String virtualDeviceId;
/**
*
*/
private String virtualDeviceName;
/**
*
*/
private int deviceTotal;
/**
*
*/
private String result;
/**
*
*/
private Long logAt = System.currentTimeMillis();
}

View File

@ -0,0 +1,59 @@
package cc.iotkit.model.device.message;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
/**
* -
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Document(indexName = "device_report")
public class DeviceReport {
@Id
private String id;
private String deviceId;
private String productKey;
private String deviceName;
/**
*
*/
private String uid;
/**
*
* lifetime:
* state:
* property:
* event:
* service:
*/
private String type;
private String identifier;
/**
*
*/
private int code;
/**
*
*/
@Field(type = FieldType.Date)
private Long time;
}

View File

@ -0,0 +1,25 @@
package cc.iotkit.model.stats;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DataItem {
/**
*
*/
private String name;
/**
*
*/
private Object value;
}

View File

@ -0,0 +1,25 @@
package cc.iotkit.model.stats;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TimeData {
/**
*
*/
private long time;
/**
*
*/
private Object data;
}

View File

@ -10,5 +10,4 @@ public class GenPwdSecret {
System.out.println(secret);
System.out.println(AuthUtil.checkPwd("guest123", secret));
}
}

View File

@ -12,6 +12,7 @@
<module>protocol-gateway</module>
<module>standalone-package</module>
<module>oauth2-server</module>
<module>virtual-device</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
@ -282,6 +283,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>virtual-device</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -13,7 +13,7 @@ import cc.iotkit.comps.service.DeviceBehaviourService;
import cc.iotkit.converter.Device;
import cc.iotkit.converter.DeviceMessage;
import cc.iotkit.converter.ScriptConverter;
import cc.iotkit.converter.ThingService;
import cc.iotkit.common.thing.ThingService;
import cc.iotkit.dao.DeviceCache;
import cc.iotkit.dao.ProductCache;
import cc.iotkit.dao.ProtocolComponentRepository;

View File

@ -15,8 +15,10 @@ import cc.iotkit.model.product.Product;
import cc.iotkit.model.product.ProductModel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -42,9 +44,6 @@ public class DeviceBehaviourService {
private ServerConfig serverConfig;
@Autowired
private DeviceCache deviceCache;
// @Autowired
private DeviceStateHolder deviceStateHolder;
//旧实现ThingModelMessage序列化失败
//private Producer<ThingModelMessage> deviceMessageProducer;
@ -58,9 +57,9 @@ public class DeviceBehaviourService {
.build();
/**
ThingModelMessage
deviceMessageProducer = client.newProducer(JSONSchema.of(ThingModelMessage.class))
.topic("persistent://iotkit/default/" + Constants.THING_MODEL_MESSAGE_TOPIC)
.create();
deviceMessageProducer = client.newProducer(JSONSchema.of(ThingModelMessage.class))
.topic("persistent://iotkit/default/" + Constants.THING_MODEL_MESSAGE_TOPIC)
.create();
*/
deviceMessageProducer = client.newProducer()
@ -199,7 +198,7 @@ public class DeviceBehaviourService {
boolean online) {
DeviceInfo device = deviceRepository.findByProductKeyAndDeviceName(productKey, deviceName);
if (device == null) {
log.warn(String.format("productKey: %s,device: %s,online: %s",productKey,device,online));
log.warn(String.format("productKey: %s,device: %s,online: %s", productKey, device, online));
throw new BizException("device does not exist");
}
deviceStateChange(device, online);
@ -269,8 +268,7 @@ public class DeviceBehaviourService {
builder.send();
}
catch (PulsarClientException e) {
} catch (PulsarClientException e) {
log.error("send thing model message error", e);
}
}

View File

@ -3,10 +3,10 @@ package cc.iotkit.comps.service;
import cc.iotkit.common.Constants;
import cc.iotkit.common.utils.JsonUtil;
import cc.iotkit.comps.config.ServerConfig;
import cc.iotkit.dao.DeviceDao;
import cc.iotkit.dao.DevicePropertyRepository;
import cc.iotkit.dao.ThingModelMessageRepository;
import cc.iotkit.dao.*;
import cc.iotkit.model.device.DeviceInfo;
import cc.iotkit.model.device.message.DeviceProperty;
import cc.iotkit.model.device.message.DeviceReport;
import cc.iotkit.model.device.message.ThingModelMessage;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@ -17,6 +17,7 @@ import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.UUID;
@Slf4j
@Service
@ -30,8 +31,13 @@ public class DeviceMessageConsumer implements MessageListener<ThingModelMessage>
@Lazy
@Autowired
private DevicePropertyRepository propertyRepository;
@Lazy
@Autowired
private DeviceReportRepository deviceReportRepository;
@Autowired
private DeviceDao deviceDao;
@Autowired
private DeviceCache deviceCache;
@PostConstruct
public void init() throws PulsarClientException {
@ -85,8 +91,11 @@ public class DeviceMessageConsumer implements MessageListener<ThingModelMessage>
}
try {
//todo 存在性能问题,量大可再拆分处理
//设备消息日志入库
messageRepository.save(modelMessage);
//设备上报日志入库
deviceReportRepository.save(getDeviceReport(modelMessage));
} catch (Throwable e) {
log.warn("save device message to es error", e);
}
@ -97,6 +106,21 @@ public class DeviceMessageConsumer implements MessageListener<ThingModelMessage>
consumer.acknowledge(msg);
}
private DeviceReport getDeviceReport(ThingModelMessage message) {
DeviceInfo device = deviceCache.get(message.getDeviceId());
return DeviceReport.builder()
.id(UUID.randomUUID().toString())
.deviceId(message.getDeviceId())
.productKey(message.getProductKey())
.deviceName(message.getDeviceName())
.uid(device.getUid())
.identifier(message.getIdentifier())
.type(message.getType())
.code(message.getCode())
.time(message.getTime())
.build();
}
@Override
public void reachedEndOfTopic(Consumer<ThingModelMessage> consumer) {

View File

@ -1,122 +0,0 @@
package cc.iotkit.comps.service;
import cc.iotkit.common.utils.ThreadUtil;
import cc.iotkit.comps.config.ServerConfig;
import cc.iotkit.dao.DeviceRepository;
import cc.iotkit.model.device.DeviceInfo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 1
*/
@Slf4j
//@Service
public class DeviceStateHolder implements MessageListener<DeviceStateHolder.OfflineMessage> {
private ScheduledThreadPoolExecutor stateHolderTask;
private Set<String> devices = new TreeSet<>();
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private ServerConfig serverConfig;
@Autowired
private DeviceRepository deviceRepository;
private Producer<OfflineMessage> offlineMessageProducer;
@PostConstruct
public void init() throws PulsarClientException {
stateHolderTask = ThreadUtil.newScheduled(4, "thread-device-state-holder");
stateHolderTask.scheduleAtFixedRate(this::hold, 0, 1, TimeUnit.MINUTES);
PulsarClient client = PulsarClient.builder()
.serviceUrl(this.serverConfig.getPulsarBrokerUrl())
.build();
offlineMessageProducer = client.newProducer(Schema.JSON(OfflineMessage.class))
.topic("persistent://iotkit/default/holder_offline")
.create();
client.newConsumer(Schema.JSON(OfflineMessage.class))
.topic("persistent://iotkit/default/holder_offline")
.subscriptionName("holder_offline")
.consumerName("device-state-holder-consumer")
.messageListener(this).subscribe();
}
public void online(String deviceId) {
try {
devices.add(deviceId);
hold(deviceId);
//上线后先产生离线消息
offlineMessageProducer.send(new OfflineMessage(deviceId));
} catch (Throwable e) {
log.error("state holder online error", e);
}
}
public void offline(String deviceId) {
devices.remove(deviceId);
}
private void hold() {
//标识在线
for (String deviceId : devices) {
hold(deviceId);
}
}
private void hold(String deviceId) {
redisTemplate.opsForValue().set("str:device:state:holder:" + deviceId,
"1", 5, TimeUnit.SECONDS);
}
@SneakyThrows
@Override
public void received(Consumer<OfflineMessage> consumer, Message<OfflineMessage> msg) {
String deviceId = msg.getValue().getDeviceId();
//如果设备在线,不处理离线消息
String hold = redisTemplate.opsForValue().get("str:device:state:holder:" + deviceId);
if (hold != null) {
return;
}
//如果设备不在线,则将设备更新为离线
DeviceInfo device = deviceRepository.findByDeviceId(deviceId);
DeviceInfo.State state = device.getState();
state.setOnline(false);
state.setOfflineTime(System.currentTimeMillis());
deviceRepository.save(device);
log.info("device offline,deviceId:{}", deviceId);
consumer.acknowledge(msg);
}
@Override
public void reachedEndOfTopic(Consumer<OfflineMessage> consumer) {
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class OfflineMessage {
private String deviceId;
}
}

View File

@ -1,5 +1,6 @@
package cc.iotkit.converter;
import cc.iotkit.common.thing.ThingService;
import cc.iotkit.model.device.message.ThingModelMessage;
public interface IConverter {

View File

@ -1,5 +1,6 @@
package cc.iotkit.converter;
import cc.iotkit.common.thing.ThingService;
import cc.iotkit.common.utils.JsonUtil;
import cc.iotkit.model.device.message.ThingModelMessage;
import jdk.nashorn.api.scripting.NashornScriptEngine;

View File

@ -8,7 +8,7 @@ import cc.iotkit.comp.IMessageHandler;
import cc.iotkit.comp.model.DeviceState;
import cc.iotkit.comp.utils.SpringUtils;
import cc.iotkit.converter.DeviceMessage;
import cc.iotkit.converter.ThingService;
import cc.iotkit.common.thing.ThingService;
import cc.iotkit.dao.DeviceRepository;
import cc.iotkit.model.device.DeviceInfo;
import cc.iotkit.model.device.message.ThingModelMessage;

View File

@ -1,6 +1,6 @@
package cc.iotkit.comp.emqx;
import cc.iotkit.converter.ThingService;
import cc.iotkit.common.thing.ThingService;
import cc.iotkit.model.device.message.ThingModelMessage;
public interface IScripter {

View File

@ -1,6 +1,6 @@
package cc.iotkit.comp.emqx;
import cc.iotkit.converter.ThingService;
import cc.iotkit.common.thing.ThingService;
import cc.iotkit.model.device.message.ThingModelMessage;
public class JsScripter implements IScripter {

View File

@ -1,6 +1,6 @@
package cc.iotkit.comp.emqx;
import cc.iotkit.converter.ThingService;
import cc.iotkit.common.thing.ThingService;
import cc.iotkit.model.device.message.ThingModelMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.beanutils.BeanUtils;

View File

@ -3,7 +3,7 @@ package cc.iotkit.comp.emqx;
import cc.iotkit.converter.Device;
import cc.iotkit.converter.DeviceMessage;
import cc.iotkit.converter.ThingService;
import cc.iotkit.common.thing.ThingService;
import cc.iotkit.dao.DeviceCache;
import cc.iotkit.dao.ProductCache;
import cc.iotkit.model.device.DeviceInfo;

View File

@ -1,7 +1,6 @@
package cc.iotkit.comp.mqtt;
import cc.iotkit.converter.DeviceMessage;
import cc.iotkit.converter.ThingService;
import cc.iotkit.common.thing.ThingService;
import cc.iotkit.model.device.message.ThingModelMessage;
public interface IScripter {

View File

@ -1,6 +1,6 @@
package cc.iotkit.comp.mqtt;
import cc.iotkit.converter.ThingService;
import cc.iotkit.common.thing.ThingService;
import cc.iotkit.model.device.message.ThingModelMessage;
public class JsScripter implements IScripter {

View File

@ -1,6 +1,6 @@
package cc.iotkit.comp.mqtt;
import cc.iotkit.converter.ThingService;
import cc.iotkit.common.thing.ThingService;
import cc.iotkit.model.device.message.ThingModelMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.beanutils.BeanUtils;

View File

@ -6,7 +6,7 @@ import cc.iotkit.comp.AbstractDeviceComponent;
import cc.iotkit.comp.CompConfig;
import cc.iotkit.comp.model.DeviceState;
import cc.iotkit.converter.DeviceMessage;
import cc.iotkit.converter.ThingService;
import cc.iotkit.common.thing.ThingService;
import cc.iotkit.model.device.message.ThingModelMessage;
import io.vertx.core.Future;
import io.vertx.core.Vertx;

View File

@ -3,7 +3,7 @@ package cc.iotkit.comp.mqtt;
import cc.iotkit.converter.Device;
import cc.iotkit.converter.DeviceMessage;
import cc.iotkit.converter.ThingService;
import cc.iotkit.common.thing.ThingService;
import cc.iotkit.dao.DeviceCache;
import cc.iotkit.dao.ProductCache;
import cc.iotkit.model.device.DeviceInfo;

View File

@ -2,7 +2,7 @@ package cc.iotkit.ruleengine.action;
import cc.iotkit.common.utils.UniqueIdUtil;
import cc.iotkit.comps.DeviceComponentManager;
import cc.iotkit.converter.ThingService;
import cc.iotkit.common.thing.ThingService;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

44
virtual-device/pom.xml Normal file
View File

@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>iotkit-parent</artifactId>
<groupId>cc.iotkit</groupId>
<version>0.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>virtual-device</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>model</artifactId>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>dao</artifactId>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>component-server</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,49 @@
package cc.iotkit.virtualdevice;
import cc.iotkit.model.device.DeviceInfo;
import cc.iotkit.model.device.VirtualDevice;
import cc.iotkit.model.device.VirtualDeviceLog;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@Slf4j
public class VirtualExecutor implements Job {
@Override
public void execute(JobExecutionContext context) {
Map<String, Object> data = context.getMergedJobDataMap();
VirtualManager virtualManager = (VirtualManager) data.get("virtualManager");
VirtualDevice virtualDevice = (VirtualDevice) data.get("virtualDevice");
List<DeviceInfo> devices = (List<DeviceInfo>) data.get("devices");
devices = devices == null ? new ArrayList<>() : devices;
JobDetail jobDetail = context.getJobDetail();
String jobKey = jobDetail.getKey().toString();
VirtualDeviceLog virtualDeviceLog = VirtualDeviceLog.builder()
.id(UUID.randomUUID().toString())
.virtualDeviceId(virtualDevice.getId())
.virtualDeviceName(virtualDevice.getName())
.deviceTotal(devices.size())
.result("success")
.logAt(System.currentTimeMillis())
.build();
try {
for (DeviceInfo device : devices) {
log.info("invoke virtual device report,jobKey:{},deviceId:{}", jobKey, device.getDeviceId());
virtualManager.invokeReport(device);
}
} catch (Throwable e) {
virtualDeviceLog.setResult(e.getMessage());
log.error("execute job error", e);
}
virtualManager.saveLog(virtualDeviceLog);
}
}

View File

@ -0,0 +1,287 @@
package cc.iotkit.virtualdevice;
import cc.iotkit.common.thing.ThingService;
import cc.iotkit.common.utils.JsonUtil;
import cc.iotkit.comps.service.DeviceBehaviourService;
import cc.iotkit.dao.DeviceCache;
import cc.iotkit.dao.VirtualDeviceLogRepository;
import cc.iotkit.dao.VirtualDeviceRepository;
import cc.iotkit.model.device.DeviceInfo;
import cc.iotkit.model.device.VirtualDevice;
import cc.iotkit.model.device.VirtualDeviceLog;
import cc.iotkit.model.device.message.ThingModelMessage;
import cc.iotkit.virtualdevice.trigger.RandomScheduleBuilder;
import jdk.nashorn.api.scripting.NashornScriptEngine;
import jdk.nashorn.api.scripting.ScriptObjectMirror;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.beanutils.BeanUtils;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import javax.script.ScriptEngineManager;
import java.util.*;
@Slf4j
public class VirtualManager {
private final NashornScriptEngine engine = (NashornScriptEngine) (new ScriptEngineManager()).getEngineByName("nashorn");
private final Map<String, Object> virtualScripts = new HashMap<>();
private final Map<String, Set<String>> deviceIdToVirtualId = new HashMap<>();
@Autowired
private VirtualDeviceRepository virtualDeviceRepository;
@Autowired
private DeviceCache deviceCache;
@Autowired
private Scheduler scheduler;
@Autowired
private DeviceBehaviourService deviceBehaviourService;
@Autowired
private VirtualDeviceLogRepository virtualDeviceLogRepository;
@PostConstruct
public void init() {
List<VirtualDevice> virtualDevices = getAllVirtualDevices();
for (VirtualDevice virtualDevice : virtualDevices) {
addTask(virtualDevice);
}
}
/**
*
*/
public boolean isVirtual(String deviceId) {
return deviceIdToVirtualId.containsKey(deviceId);
}
/**
*
*/
public void send(ThingService<?> service) {
DeviceInfo deviceInfo = deviceCache.getDeviceInfo(service.getProductKey(), service.getDeviceName());
String deviceId = deviceInfo.getDeviceId();
//根据设备Id取虚拟设备列表
Set<String> virtualIds = deviceIdToVirtualId.get(deviceId);
for (String virtualId : virtualIds) {
Object scriptObj = virtualScripts.get(virtualId);
Object result = invokeMethod(scriptObj, "receive", service);
for (Object value : ((ScriptObjectMirror) result).values()) {
processReport(value);
}
log.info("virtual device send result:{}", JsonUtil.toJsonString(result));
}
}
/**
*
*/
public void add(VirtualDevice virtualDevice) {
addTask(virtualDevice);
}
/**
*
*/
public void remove(VirtualDevice virtualDevice) {
deleteTask(virtualDevice);
}
/**
*
*/
public void run(VirtualDevice virtualDevice) {
List<String> devices = virtualDevice.getDevices();
VirtualDeviceLog virtualDeviceLog = VirtualDeviceLog.builder()
.id(UUID.randomUUID().toString())
.virtualDeviceId(virtualDevice.getId())
.virtualDeviceName(virtualDevice.getName())
.deviceTotal(devices.size())
.result("success")
.logAt(System.currentTimeMillis())
.build();
try {
Object scriptObj = engine.eval(String.format("new (function () {\n%s})()", virtualDevice.getScript()));
for (String deviceId : devices) {
DeviceInfo device = deviceCache.get(deviceId);
processReport(invokeMethod(scriptObj, "report", device));
}
} catch (Throwable e) {
virtualDeviceLog.setResult(e.getMessage());
log.error("run VirtualDevice error", e);
}
virtualDeviceLogRepository.save(virtualDeviceLog);
}
/**
*
*/
public void update(VirtualDevice virtualDevice) {
remove(virtualDevice);
add(virtualDevice);
}
/**
*
*/
private List<VirtualDevice> getAllVirtualDevices() {
List<VirtualDevice> randomVirtualDevices = virtualDeviceRepository
.findByTriggerAndState(VirtualDevice.TRIGGER_RANDOM, VirtualDevice.STATE_RUNNING);
List<VirtualDevice> cronVirtualDevices = virtualDeviceRepository
.findByTriggerAndState(VirtualDevice.TRIGGER_CRON, VirtualDevice.STATE_RUNNING);
cronVirtualDevices.addAll(randomVirtualDevices);
return cronVirtualDevices;
}
private void addTask(VirtualDevice virtualDevice) {
try {
String id = virtualDevice.getId();
String name = virtualDevice.getName();
String script = virtualDevice.getScript();
log.info("adding virtual device job,id:{},name:{}", id, name);
//添加新的脚本对象
virtualScripts.put(id, engine.eval(String.format("new (function () {\n%s})()", script)));
List<DeviceInfo> devices = new ArrayList<>();
for (String deviceId : virtualDevice.getDevices()) {
devices.add(deviceCache.get(deviceId));
//更新deviceId的虚拟设备Id对应关系
Set<String> virtualIds = deviceIdToVirtualId.getOrDefault(deviceId, new HashSet<>());
virtualIds.add(id);
deviceIdToVirtualId.put(deviceId, virtualIds);
}
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("virtualManager", this);
jobDataMap.put("virtualDevice", virtualDevice);
jobDataMap.put("devices", devices);
JobDetail jobDetail = JobBuilder.newJob(VirtualExecutor.class)
.withIdentity(id, name)
.usingJobData(jobDataMap)
.build();
Trigger trigger = TriggerBuilder
.newTrigger()
.withIdentity("trigger_" + id, "triggerGroup_" + name)
.startNow()
.withSchedule(
getTriggerBuilder(virtualDevice)
).build();
scheduler.scheduleJob(jobDetail, trigger);
if (!scheduler.isShutdown()) {
scheduler.start();
}
} catch (Throwable e) {
log.error("create job failed", e);
}
}
private ScheduleBuilder<?> getTriggerBuilder(VirtualDevice virtualDevice) {
String type = virtualDevice.getTrigger();
if ("random".equals(type)) {
return new RandomScheduleBuilder(virtualDevice.getTriggerExpression());
}
if ("cron".equals(type)) {
return CronScheduleBuilder.cronSchedule(virtualDevice.getTriggerExpression());
}
return null;
}
@SneakyThrows
public void deleteTask(VirtualDevice virtualDevice) {
String id = virtualDevice.getId();
String name = virtualDevice.getName();
//删除脚本对象
virtualScripts.remove(id);
//更新deviceId的虚拟设备Id对应关系
for (String deviceId : deviceIdToVirtualId.keySet()) {
Set<String> virtualIds = deviceIdToVirtualId.get(deviceId);
virtualIds.remove(id);
}
//删除job
TriggerKey triggerKey = new TriggerKey("trigger_" + id, "triggerGroup_" + name);
if (!scheduler.checkExists(triggerKey)) {
return;
}
scheduler.deleteJob(JobKey.jobKey(id, name));
}
/**
* js
*/
public void processReport(Object sourceMsg) {
try {
ScriptObjectMirror result = (ScriptObjectMirror) sourceMsg;
ThingModelMessage modelMessage = new ThingModelMessage();
BeanUtils.populate(modelMessage, result);
deviceBehaviourService.reportMessage(modelMessage);
} catch (Throwable e) {
log.error("process js data error", e);
}
}
/**
* js
*/
private Object invokeMethod(Object scriptObj, String name, Object... args) {
try {
if (((ScriptObjectMirror) scriptObj).get(name) != null) {
return engine.invokeMethod(scriptObj, name, args);
}
return null;
} catch (Throwable e) {
log.error("invoke js method error", e);
}
return null;
}
/**
*
*/
public void invokeReport(DeviceInfo device) {
//设备上线
deviceOnline(device);
String deviceId = device.getDeviceId();
Set<String> virtualIds = deviceIdToVirtualId.get(deviceId);
if (virtualIds == null) {
return;
}
for (String virtualId : virtualIds) {
Object scriptObj = virtualScripts.get(virtualId);
if (scriptObj == null) {
continue;
}
processReport(invokeMethod(scriptObj, "report", device));
}
}
/**
* 线
*/
private void deviceOnline(DeviceInfo device) {
DeviceInfo.State state = device.getState();
if (state == null || !state.isOnline()) {
//设备离线,产生上线消息
deviceBehaviourService.deviceStateChange(device.getProductKey(), device.getDeviceName(), true);
}
}
/**
*
*/
public void saveLog(VirtualDeviceLog log) {
virtualDeviceLogRepository.save(log);
}
}

View File

@ -0,0 +1,15 @@
package cc.iotkit.virtualdevice.config;
import cc.iotkit.virtualdevice.VirtualManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class VirtualConfig {
@Bean
public VirtualManager getVirtualManager() {
return new VirtualManager();
}
}

View File

@ -0,0 +1,18 @@
package cc.iotkit.virtualdevice.trigger;
import org.quartz.ScheduleBuilder;
import org.quartz.spi.MutableTrigger;
public class RandomScheduleBuilder extends ScheduleBuilder<RandomTrigger> {
private final String unit;
public RandomScheduleBuilder(String unit) {
this.unit = unit;
}
public MutableTrigger build() {
return new RandomTrigger(unit);
}
}

View File

@ -0,0 +1,47 @@
package cc.iotkit.virtualdevice.trigger;
import org.apache.commons.lang3.RandomUtils;
import org.quartz.Calendar;
import org.quartz.impl.triggers.SimpleTriggerImpl;
import java.util.Date;
public class RandomTrigger extends SimpleTriggerImpl {
private String unit;
private Date nextFireTime;
public RandomTrigger(String unit) {
this.unit = unit;
}
@Override
public void triggered(Calendar calendar) {
super.triggered(calendar);
nextFireTime = randomTime();
}
@Override
public Date getNextFireTime() {
if (nextFireTime == null) {
nextFireTime = randomTime();
}
return nextFireTime;
}
private Date randomTime() {
Date previousTime = getPreviousFireTime();
if (previousTime == null) {
previousTime = new Date();
}
long time = previousTime.getTime();
if ("second".equals(unit)) {
time = time + RandomUtils.nextInt(0, 60) * 1000;
} else if ("minute".equals(unit)) {
time = time + RandomUtils.nextInt(0, 60) * 1000 * 60;
} else if ("hour".equals(unit)) {
time = time + RandomUtils.nextInt(0, 60) * 1000 * 60 * 60;
}
return new Date(time);
}
}

View File

@ -0,0 +1 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=cc.iotkit.virtualdevice.config.VirtualConfig