From aad06fcea60bd51ac0aa8fbc7e805ec2a5eac5d5 Mon Sep 17 00:00:00 2001 From: xiwa Date: Sun, 29 May 2022 23:15:14 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=99=9A=E6=8B=9F=E8=AE=BE?= =?UTF-8?q?=E5=A4=87=E5=8A=9F=E8=83=BD=E5=92=8C=E6=95=B0=E6=8D=AE=E7=BB=9F?= =?UTF-8?q?=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/cc/iotkit/common/Constants.java | 2 + .../cc/iotkit/common/thing}/ThingService.java | 2 +- .../cc/iotkit/common/utils/ReflectUtil.java | 11 +- .../cc/iotkit/dao/CategoryRepository.java | 4 + .../main/java/cc/iotkit/dao/DeviceCache.java | 9 + .../main/java/cc/iotkit/dao/DeviceDao.java | 76 +++++ .../java/cc/iotkit/dao/DeviceReportDao.java | 70 +++++ .../cc/iotkit/dao/DeviceReportRepository.java | 12 + .../java/cc/iotkit/dao/DeviceRepository.java | 2 + .../java/cc/iotkit/dao/ProductRepository.java | 5 +- .../cc/iotkit/dao/ThingModelMessageDao.java | 3 +- .../dao/VirtualDeviceLogRepository.java | 14 + .../iotkit/dao/VirtualDeviceRepository.java | 20 ++ manager/pom.xml | 23 +- .../cc/iotkit/manager/config/CacheConfig.java | 5 + .../manager/config/SaTokenConfigure.java | 1 + .../manager/controller/DeviceController.java | 13 +- .../manager/controller/StatsController.java | 59 ++++ .../controller/VirtualDeviceController.java | 162 ++++++++++ .../iotkit/manager/model/stats/MainStats.java | 45 +++ .../iotkit/manager/service/DeviceService.java | 13 +- .../manager/service/ThingModelService.java | 2 +- .../cc/iotkit/model/device/DeviceInfo.java | 11 + .../cc/iotkit/model/device/VirtualDevice.java | 102 +++++++ .../iotkit/model/device/VirtualDeviceLog.java | 48 +++ .../model/device/message/DeviceReport.java | 59 ++++ .../java/cc/iotkit/model/stats/DataItem.java | 25 ++ .../java/cc/iotkit/model/stats/TimeData.java | 25 ++ oauth2-server/src/test/java/GenPwdSecret.java | 1 - pom.xml | 7 + .../iotkit/comps/DeviceComponentManager.java | 2 +- .../comps/service/DeviceBehaviourService.java | 20 +- .../comps/service/DeviceMessageConsumer.java | 30 +- .../comps/service/DeviceStateHolder.java | 122 -------- .../cc/iotkit/converter/DeviceMessage.java | 0 .../java/cc/iotkit/converter/IConverter.java | 1 + .../cc/iotkit/converter/ScriptConverter.java | 1 + .../iotkit/comp/emqx/EmqxDeviceComponent.java | 2 +- .../java/cc/iotkit/comp/emqx/IScripter.java | 2 +- .../java/cc/iotkit/comp/emqx/JsScripter.java | 2 +- .../java/cc/iotkit/comp/emqx/LuaScripter.java | 2 +- .../comp/emqx/TransparentConverter.java | 2 +- .../java/cc/iotkit/comp/mqtt/IScripter.java | 3 +- .../java/cc/iotkit/comp/mqtt/JsScripter.java | 2 +- .../java/cc/iotkit/comp/mqtt/LuaScripter.java | 2 +- .../iotkit/comp/mqtt/MqttDeviceComponent.java | 2 +- .../comp/mqtt/TransparentConverter.java | 2 +- .../action/DeviceActionService.java | 2 +- virtual-device/pom.xml | 44 +++ .../iotkit/virtualdevice/VirtualExecutor.java | 49 +++ .../iotkit/virtualdevice/VirtualManager.java | 287 ++++++++++++++++++ .../virtualdevice/config/VirtualConfig.java | 15 + .../trigger/RandomScheduleBuilder.java | 18 ++ .../virtualdevice/trigger/RandomTrigger.java | 47 +++ .../src/main/resources/spring.factories | 1 + 55 files changed, 1313 insertions(+), 178 deletions(-) rename {protocol-gateway/converter/src/main/java/cc/iotkit/converter => common/src/main/java/cc/iotkit/common/thing}/ThingService.java (93%) mode change 100755 => 100644 create mode 100644 dao/src/main/java/cc/iotkit/dao/DeviceReportDao.java create mode 100644 dao/src/main/java/cc/iotkit/dao/DeviceReportRepository.java create mode 100644 dao/src/main/java/cc/iotkit/dao/VirtualDeviceLogRepository.java create mode 100644 dao/src/main/java/cc/iotkit/dao/VirtualDeviceRepository.java create mode 100644 manager/src/main/java/cc/iotkit/manager/controller/StatsController.java create mode 100644 manager/src/main/java/cc/iotkit/manager/controller/VirtualDeviceController.java create mode 100644 manager/src/main/java/cc/iotkit/manager/model/stats/MainStats.java create mode 100644 model/src/main/java/cc/iotkit/model/device/VirtualDevice.java create mode 100644 model/src/main/java/cc/iotkit/model/device/VirtualDeviceLog.java create mode 100644 model/src/main/java/cc/iotkit/model/device/message/DeviceReport.java create mode 100644 model/src/main/java/cc/iotkit/model/stats/DataItem.java create mode 100644 model/src/main/java/cc/iotkit/model/stats/TimeData.java delete mode 100755 protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceStateHolder.java mode change 100755 => 100644 protocol-gateway/converter/src/main/java/cc/iotkit/converter/DeviceMessage.java create mode 100644 virtual-device/pom.xml create mode 100644 virtual-device/src/main/java/cc/iotkit/virtualdevice/VirtualExecutor.java create mode 100644 virtual-device/src/main/java/cc/iotkit/virtualdevice/VirtualManager.java create mode 100644 virtual-device/src/main/java/cc/iotkit/virtualdevice/config/VirtualConfig.java create mode 100644 virtual-device/src/main/java/cc/iotkit/virtualdevice/trigger/RandomScheduleBuilder.java create mode 100644 virtual-device/src/main/java/cc/iotkit/virtualdevice/trigger/RandomTrigger.java create mode 100644 virtual-device/src/main/resources/spring.factories diff --git a/common/src/main/java/cc/iotkit/common/Constants.java b/common/src/main/java/cc/iotkit/common/Constants.java index 37788e1c..75cd3619 100755 --- a/common/src/main/java/cc/iotkit/common/Constants.java +++ b/common/src/main/java/cc/iotkit/common/Constants.java @@ -10,6 +10,8 @@ public interface Constants { String DEVICE_CACHE = "device_cache"; + String DEVICE_STATS_CACHE = "device_stats_cache"; + String CATEGORY_CACHE = "category_cache"; String SPACE_CACHE = "space_cache"; diff --git a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/ThingService.java b/common/src/main/java/cc/iotkit/common/thing/ThingService.java old mode 100755 new mode 100644 similarity index 93% rename from protocol-gateway/converter/src/main/java/cc/iotkit/converter/ThingService.java rename to common/src/main/java/cc/iotkit/common/thing/ThingService.java index 633856d3..c1a94234 --- a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/ThingService.java +++ b/common/src/main/java/cc/iotkit/common/thing/ThingService.java @@ -1,4 +1,4 @@ -package cc.iotkit.converter; +package cc.iotkit.common.thing; import lombok.AllArgsConstructor; import lombok.Builder; diff --git a/common/src/main/java/cc/iotkit/common/utils/ReflectUtil.java b/common/src/main/java/cc/iotkit/common/utils/ReflectUtil.java index ed4f85a2..2ce923f6 100755 --- a/common/src/main/java/cc/iotkit/common/utils/ReflectUtil.java +++ b/common/src/main/java/cc/iotkit/common/utils/ReflectUtil.java @@ -5,19 +5,26 @@ import lombok.SneakyThrows; import org.apache.commons.beanutils.BeanMap; import org.apache.commons.beanutils.BeanUtils; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; public class ReflectUtil { @SneakyThrows - public static T copyNoNulls(T from, T to) { + public static T copyNoNulls(T from, T to, String... fields) { + List fieldList = Arrays.asList(fields); + Map map = new HashMap<>(); new BeanMap(from).forEach((key, value) -> { if (value == null) { return; } - map.put(key.toString(), value); + String field = key.toString(); + if (fields.length == 0 || fieldList.contains(field)) { + map.put(field, value); + } }); BeanUtils.populate(to, map); return to; diff --git a/dao/src/main/java/cc/iotkit/dao/CategoryRepository.java b/dao/src/main/java/cc/iotkit/dao/CategoryRepository.java index ea9d9608..e01d471a 100755 --- a/dao/src/main/java/cc/iotkit/dao/CategoryRepository.java +++ b/dao/src/main/java/cc/iotkit/dao/CategoryRepository.java @@ -6,4 +6,8 @@ import org.springframework.stereotype.Repository; @Repository public interface CategoryRepository extends MongoRepository { + + int countBy(); + + } diff --git a/dao/src/main/java/cc/iotkit/dao/DeviceCache.java b/dao/src/main/java/cc/iotkit/dao/DeviceCache.java index 1e022d68..431619ee 100755 --- a/dao/src/main/java/cc/iotkit/dao/DeviceCache.java +++ b/dao/src/main/java/cc/iotkit/dao/DeviceCache.java @@ -2,17 +2,21 @@ package cc.iotkit.dao; import cc.iotkit.common.Constants; import cc.iotkit.model.device.DeviceInfo; +import cc.iotkit.model.stats.DataItem; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.Cacheable; import org.springframework.stereotype.Repository; import javax.annotation.PostConstruct; +import java.util.List; @Repository public class DeviceCache { @Autowired private DeviceRepository deviceRepository; + @Autowired + private DeviceDao deviceDao; private static DeviceCache INSTANCE; @@ -35,4 +39,9 @@ public class DeviceCache { return deviceRepository.findById(deviceId).orElse(null); } + @Cacheable(value = Constants.DEVICE_STATS_CACHE, key = "#uid") + public List getDeviceStatsByCategory(String uid) { + return deviceDao.getDeviceStatsByCategory(uid); + } + } diff --git a/dao/src/main/java/cc/iotkit/dao/DeviceDao.java b/dao/src/main/java/cc/iotkit/dao/DeviceDao.java index 831fa9aa..bf1344d0 100755 --- a/dao/src/main/java/cc/iotkit/dao/DeviceDao.java +++ b/dao/src/main/java/cc/iotkit/dao/DeviceDao.java @@ -2,22 +2,36 @@ package cc.iotkit.dao; import cc.iotkit.model.Paging; import cc.iotkit.model.device.DeviceInfo; +import cc.iotkit.model.product.Category; +import cc.iotkit.model.product.Product; +import cc.iotkit.model.stats.DataItem; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.aggregation.*; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; import org.springframework.stereotype.Repository; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; @Repository public class DeviceDao { @Autowired private MongoTemplate mongoTemplate; + @Autowired + private ProductRepository productRepository; + @Autowired + private CategoryRepository categoryRepository; public Paging find(Criteria condition, int size, int page) { Query query = Query.query(condition); @@ -64,4 +78,66 @@ public class DeviceDao { mongoTemplate.updateFirst(query, update, DeviceInfo.class); } + /** + * 获取按品类统计的用户设备数 + */ + public List getDeviceStatsByCategory(String uid) { + MatchOperation matchOperation; + if (StringUtils.isBlank(uid)) { + matchOperation = Aggregation.match(new Criteria()); + } else { + matchOperation = Aggregation.match(Criteria.where("uid").is(uid)); + } + + //先按产品分组统计 + GroupOperation groupOperation = Aggregation.group("productKey").count().as("total"); + ProjectionOperation projectionOperation = Aggregation.project("productKey", "uid"); + Aggregation aggregation = Aggregation.newAggregation(projectionOperation, groupOperation, matchOperation); + AggregationResults result = mongoTemplate.aggregate(aggregation, DeviceInfo.class, Map.class); + List stats = result.getMappedResults(); + + //取用户产品列表 + List products; + if (StringUtils.isBlank(uid)) { + products = productRepository.findAll(); + } else { + products = productRepository.findByUid(uid); + } + Map pkCateMap = new HashMap<>(); + for (Product product : products) { + pkCateMap.put(product.getId(), product.getCategory()); + } + + //取品类 + List categories = categoryRepository.findAll(); + Map cateNames = new HashMap<>(); + for (Category category : categories) { + cateNames.put(category.getId(), category.getName()); + } + + Map cateStats = new HashMap<>(); + for (Map stat : stats) { + String productKey = stat.get("_id").toString(); + String cateName = cateNames.get(pkCateMap.get(productKey)); + //按品类汇总 + long total = cateStats.getOrDefault(cateName, 0L); + total += (Integer) stat.get("total"); + cateStats.put(cateName, total); + } + + List items = new ArrayList<>(); + cateStats.forEach((key, val) -> { + items.add(new DataItem(key, val)); + }); + + return items; + } + + /** + * 获取按品类统计的设备数 + */ + public List getDeviceStatsByCategory() { + return getDeviceStatsByCategory(null); + } + } diff --git a/dao/src/main/java/cc/iotkit/dao/DeviceReportDao.java b/dao/src/main/java/cc/iotkit/dao/DeviceReportDao.java new file mode 100644 index 00000000..e3020dc8 --- /dev/null +++ b/dao/src/main/java/cc/iotkit/dao/DeviceReportDao.java @@ -0,0 +1,70 @@ +package cc.iotkit.dao; + +import cc.iotkit.model.device.message.DeviceReport; +import cc.iotkit.model.stats.TimeData; +import lombok.SneakyThrows; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate; +import org.springframework.data.elasticsearch.core.clients.elasticsearch7.ElasticsearchAggregations; +import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; +import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; +import org.springframework.stereotype.Repository; + +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; + +@Repository +public class DeviceReportDao { + + @Autowired + private ElasticsearchRestTemplate template; + + /** + * 按用户统计时间段内上报次数 + */ + public List getDeviceMessageStatsWithUid(String uid, long start, long end) { + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders.rangeQuery("time") + .from(start, true).to(end, true)); + if (uid != null) { + queryBuilder = + queryBuilder.must(QueryBuilders.termQuery("uid", uid)); + } + + NativeSearchQuery query = new NativeSearchQueryBuilder() + .withQuery(queryBuilder) + .withAggregations(AggregationBuilders.dateHistogram("agg") + .field("time") + .calendarInterval(DateHistogramInterval.HOUR) + .calendarInterval(DateHistogramInterval.hours(1)) + ) + .build(); + + ElasticsearchAggregations result = (ElasticsearchAggregations) template + .search(query, DeviceReport.class).getAggregations(); + ParsedDateHistogram histogram = result.aggregations().get("agg"); + + List data = new ArrayList<>(); + for (Histogram.Bucket bucket : histogram.getBuckets()) { + long seconds = ((ZonedDateTime) bucket.getKey()).toInstant().getEpochSecond(); + data.add(new TimeData(seconds * 1000, bucket.getDocCount())); + } + + return data; + } + + /** + * 统计时间段内上报次数 + */ + @SneakyThrows + public List getDeviceMessageStats(long start, long end) { + return getDeviceMessageStatsWithUid(null, start, end); + } +} diff --git a/dao/src/main/java/cc/iotkit/dao/DeviceReportRepository.java b/dao/src/main/java/cc/iotkit/dao/DeviceReportRepository.java new file mode 100644 index 00000000..6d8648fb --- /dev/null +++ b/dao/src/main/java/cc/iotkit/dao/DeviceReportRepository.java @@ -0,0 +1,12 @@ +package cc.iotkit.dao; + +import cc.iotkit.model.device.message.DeviceReport; +import org.springframework.data.elasticsearch.repository.ElasticsearchRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface DeviceReportRepository extends ElasticsearchRepository { + + long countByUid(String uid); + +} diff --git a/dao/src/main/java/cc/iotkit/dao/DeviceRepository.java b/dao/src/main/java/cc/iotkit/dao/DeviceRepository.java index 5f4012e5..b94f98d3 100755 --- a/dao/src/main/java/cc/iotkit/dao/DeviceRepository.java +++ b/dao/src/main/java/cc/iotkit/dao/DeviceRepository.java @@ -17,4 +17,6 @@ public interface DeviceRepository extends MongoRepository { List findByDeviceName(String deviceName); + long countByUid(String uid); + } diff --git a/dao/src/main/java/cc/iotkit/dao/ProductRepository.java b/dao/src/main/java/cc/iotkit/dao/ProductRepository.java index c096db93..ba68602c 100755 --- a/dao/src/main/java/cc/iotkit/dao/ProductRepository.java +++ b/dao/src/main/java/cc/iotkit/dao/ProductRepository.java @@ -4,9 +4,12 @@ import cc.iotkit.model.product.Product; import org.springframework.data.mongodb.repository.MongoRepository; import org.springframework.stereotype.Repository; +import java.util.List; + @Repository public interface ProductRepository extends MongoRepository { + long countByUid(String uid); - + List findByUid(String uid); } diff --git a/dao/src/main/java/cc/iotkit/dao/ThingModelMessageDao.java b/dao/src/main/java/cc/iotkit/dao/ThingModelMessageDao.java index 901712b4..ba2b447d 100755 --- a/dao/src/main/java/cc/iotkit/dao/ThingModelMessageDao.java +++ b/dao/src/main/java/cc/iotkit/dao/ThingModelMessageDao.java @@ -35,11 +35,10 @@ public class ThingModelMessageDao { builder.must(QueryBuilders.matchPhraseQuery("identifier", identifier)); } NativeSearchQuery query = new NativeSearchQueryBuilder().withQuery(builder) - .withPageable(PageRequest.of(page-1, size, Sort.by(Sort.Order.desc("time")))) + .withPageable(PageRequest.of(page - 1, size, Sort.by(Sort.Order.desc("time")))) .build(); SearchHits result = template.search(query, ThingModelMessage.class); return new Paging<>(result.getTotalHits(), result.getSearchHits().stream() .map(SearchHit::getContent).collect(Collectors.toList())); } - } diff --git a/dao/src/main/java/cc/iotkit/dao/VirtualDeviceLogRepository.java b/dao/src/main/java/cc/iotkit/dao/VirtualDeviceLogRepository.java new file mode 100644 index 00000000..646407d1 --- /dev/null +++ b/dao/src/main/java/cc/iotkit/dao/VirtualDeviceLogRepository.java @@ -0,0 +1,14 @@ +package cc.iotkit.dao; + +import cc.iotkit.model.device.VirtualDeviceLog; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; +import org.springframework.data.elasticsearch.repository.ElasticsearchRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface VirtualDeviceLogRepository extends ElasticsearchRepository { + + Page findByVirtualDeviceId(String virtualDeviceId, Pageable pageable); + +} diff --git a/dao/src/main/java/cc/iotkit/dao/VirtualDeviceRepository.java b/dao/src/main/java/cc/iotkit/dao/VirtualDeviceRepository.java new file mode 100644 index 00000000..3844f8a4 --- /dev/null +++ b/dao/src/main/java/cc/iotkit/dao/VirtualDeviceRepository.java @@ -0,0 +1,20 @@ +package cc.iotkit.dao; + +import cc.iotkit.model.device.VirtualDevice; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; +import org.springframework.data.mongodb.repository.MongoRepository; +import org.springframework.stereotype.Repository; + +import java.util.List; + +@Repository +public interface VirtualDeviceRepository extends MongoRepository { + + Page findByUid(String uid, Pageable pageable); + + List findByUidAndState(String uid, String state); + + List findByTriggerAndState(String trigger, String state); + +} diff --git a/manager/pom.xml b/manager/pom.xml index aeed57d0..5f9cc1bd 100755 --- a/manager/pom.xml +++ b/manager/pom.xml @@ -130,26 +130,13 @@ oauth2-server + + cc.iotkit + virtual-device + + - - - - - - - - - - - - - - - - - - diff --git a/manager/src/main/java/cc/iotkit/manager/config/CacheConfig.java b/manager/src/main/java/cc/iotkit/manager/config/CacheConfig.java index d302f6f5..0f5d02e5 100755 --- a/manager/src/main/java/cc/iotkit/manager/config/CacheConfig.java +++ b/manager/src/main/java/cc/iotkit/manager/config/CacheConfig.java @@ -27,6 +27,11 @@ public class CacheConfig { Caffeine.newBuilder() .expireAfterWrite(5, TimeUnit.MINUTES) .build() + ), new CaffeineCache( + Constants.DEVICE_STATS_CACHE, + Caffeine.newBuilder() + .expireAfterWrite(5, TimeUnit.MINUTES) + .build() ), new CaffeineCache( Constants.PRODUCT_CACHE, diff --git a/manager/src/main/java/cc/iotkit/manager/config/SaTokenConfigure.java b/manager/src/main/java/cc/iotkit/manager/config/SaTokenConfigure.java index 2970de92..42364006 100755 --- a/manager/src/main/java/cc/iotkit/manager/config/SaTokenConfigure.java +++ b/manager/src/main/java/cc/iotkit/manager/config/SaTokenConfigure.java @@ -56,6 +56,7 @@ public class SaTokenConfigure implements WebMvcConfigurer { "/**/remove*/**", "/**/del*/**", "/**/add*/**", + "/**/create*/**", "/**/clear*/**", "/**/set*/**", "/**/set", diff --git a/manager/src/main/java/cc/iotkit/manager/controller/DeviceController.java b/manager/src/main/java/cc/iotkit/manager/controller/DeviceController.java index 9677bd3b..a9a94b0e 100755 --- a/manager/src/main/java/cc/iotkit/manager/controller/DeviceController.java +++ b/manager/src/main/java/cc/iotkit/manager/controller/DeviceController.java @@ -2,6 +2,7 @@ package cc.iotkit.manager.controller; import cc.iotkit.common.Constants; import cc.iotkit.common.exception.BizException; +import cc.iotkit.common.utils.CodecUtil; import cc.iotkit.common.utils.DeviceUtil; import cc.iotkit.common.utils.UniqueIdUtil; import cc.iotkit.comps.service.DeviceBehaviourService; @@ -30,6 +31,7 @@ import org.springframework.web.context.request.async.DeferredResult; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; @Slf4j @RestController @@ -111,16 +113,25 @@ public class DeviceController { @PostMapping("/create") public void createDevice(String productKey, String deviceName) { Optional productOpt = productRepository.findById(productKey); - if (!productOpt.isPresent()) { + if (productOpt.isEmpty()) { throw new BizException("the product does not exist"); } + //生成设备密钥 + String chars = "ABCDEFGHJKMNPQRSTWXYZabcdefhijkmnprstwxyz2345678"; + int maxPos = chars.length(); + StringBuilder secret = new StringBuilder(); + for (var i = 0; i < 16; i++) { + secret.append(chars.charAt((int) Math.floor(Math.random() * maxPos))); + } + DeviceInfo device = new DeviceInfo(); device.setId(DeviceUtil.newDeviceId(deviceName)); device.setUid(productOpt.get().getUid()); device.setDeviceId(device.getId()); device.setProductKey(productKey); device.setDeviceName(deviceName); + device.setSecret(secret.toString()); device.setState(new DeviceInfo.State(false, null, null)); device.setCreateAt(System.currentTimeMillis()); diff --git a/manager/src/main/java/cc/iotkit/manager/controller/StatsController.java b/manager/src/main/java/cc/iotkit/manager/controller/StatsController.java new file mode 100644 index 00000000..d4ebed2a --- /dev/null +++ b/manager/src/main/java/cc/iotkit/manager/controller/StatsController.java @@ -0,0 +1,59 @@ +package cc.iotkit.manager.controller; + +import cc.iotkit.dao.*; +import cc.iotkit.manager.model.stats.MainStats; +import cc.iotkit.utils.AuthUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@Slf4j +@RestController +@RequestMapping("/stats") +public class StatsController { + + @Autowired + private CategoryRepository categoryRepository; + @Autowired + private ProductRepository productRepository; + @Autowired + private DeviceRepository deviceRepository; + @Autowired + private DeviceReportRepository deviceReportRepository; + @Autowired + private DeviceReportDao deviceReportDao; + @Autowired + private DeviceCache deviceCache; + + @GetMapping("/main") + public MainStats getMainStats() { + MainStats mainStats = new MainStats(); + String uid = AuthUtil.getUserId(); + + long now = System.currentTimeMillis(); + if (AuthUtil.isAdmin()) { + mainStats.setCategoryTotal(categoryRepository.count()); + mainStats.setProductTotal(productRepository.count()); + mainStats.setDeviceTotal(deviceRepository.count()); + mainStats.setReportTotal(deviceReportRepository.count()); + //上报数据统计 + mainStats.setReportDataStats(deviceReportDao.getDeviceMessageStats(now - 48 * 3600 * 1000, now)); + //产品数量统计 + mainStats.setDeviceStatsOfCategory(deviceCache.getDeviceStatsByCategory("")); + } else { + mainStats.setCategoryTotal(categoryRepository.count()); + mainStats.setProductTotal(productRepository.countByUid(uid)); + mainStats.setDeviceTotal(deviceRepository.countByUid(uid)); + mainStats.setReportTotal(deviceReportRepository.countByUid(uid)); + //上报数据统计 + mainStats.setReportDataStats(deviceReportDao.getDeviceMessageStatsWithUid(uid, now - 48 * 3600 * 1000, now)); + //产品数量统计 + mainStats.setDeviceStatsOfCategory(deviceCache.getDeviceStatsByCategory(uid)); + } + + return mainStats; + } + +} diff --git a/manager/src/main/java/cc/iotkit/manager/controller/VirtualDeviceController.java b/manager/src/main/java/cc/iotkit/manager/controller/VirtualDeviceController.java new file mode 100644 index 00000000..ff433a17 --- /dev/null +++ b/manager/src/main/java/cc/iotkit/manager/controller/VirtualDeviceController.java @@ -0,0 +1,162 @@ +package cc.iotkit.manager.controller; + +import cc.iotkit.common.exception.BizException; +import cc.iotkit.common.utils.ReflectUtil; +import cc.iotkit.dao.VirtualDeviceLogRepository; +import cc.iotkit.dao.VirtualDeviceRepository; +import cc.iotkit.manager.service.DataOwnerService; +import cc.iotkit.model.Paging; +import cc.iotkit.model.device.VirtualDevice; +import cc.iotkit.model.device.VirtualDeviceLog; +import cc.iotkit.model.rule.TaskLog; +import cc.iotkit.utils.AuthUtil; +import cc.iotkit.virtualdevice.VirtualManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Sort; +import org.springframework.web.bind.annotation.*; + +import java.util.List; +import java.util.Optional; + +@Slf4j +@RestController +@RequestMapping("/virtual_device") +public class VirtualDeviceController { + + @Autowired + private DataOwnerService dataOwnerService; + @Autowired + private VirtualDeviceRepository virtualDeviceRepository; + @Autowired + private VirtualManager virtualManager; + @Autowired + private VirtualDeviceLogRepository virtualDeviceLogRepository; + + @PostMapping("/list/{size}/{page}") + public Paging getDevices( + @PathVariable("size") int size, + @PathVariable("page") int page) { + String uid = AuthUtil.getUserId(); + Page virtualDevices = virtualDeviceRepository.findByUid(uid, + PageRequest.of(page - 1, size, Sort.by(Sort.Order.desc("createAt")))); + return new Paging<>(virtualDevices.getTotalElements(), virtualDevices.getContent()); + } + + /** + * 添加虚拟设备 + */ + @PostMapping("/add") + public void add(VirtualDevice virtualDevice) { + virtualDevice.setId(null); + virtualDevice.setUid(AuthUtil.getUserId()); + virtualDevice.setState(VirtualDevice.STATE_STOPPED); + virtualDevice.setCreateAt(System.currentTimeMillis()); + virtualDeviceRepository.save(virtualDevice); + } + + /** + * 修改虚拟设备 + */ + @PostMapping("/modify") + public void modify(VirtualDevice virtualDevice) { + VirtualDevice oldData = checkOwner(virtualDevice.getId()); + ReflectUtil.copyNoNulls(virtualDevice, oldData, + "name", "productKey", "type", "trigger", "triggerExpression"); + virtualDevice.setState(VirtualDevice.STATE_STOPPED); + virtualDeviceRepository.save(virtualDevice); + } + + /** + * 获取虚拟设备详情 + */ + @GetMapping("/{id}/detail") + public VirtualDevice detail(@PathVariable("id") String id) { + return checkOwner(id); + } + + /** + * 设置虚拟设备状态 + */ + @PostMapping("/{id}/setState") + public void setState(@PathVariable("id") String id, String state) { + VirtualDevice oldData = checkOwner(id); + if (!VirtualDevice.STATE_RUNNING.equals(state) + && !VirtualDevice.STATE_STOPPED.equals(state)) { + throw new BizException("state is illegal"); + } + oldData.setState(state); + if (VirtualDevice.STATE_RUNNING.equals(state)) { + virtualManager.add(oldData); + } else { + virtualManager.remove(oldData); + } + virtualDeviceRepository.save(oldData); + } + + /** + * 删除 + */ + @DeleteMapping("/{id}/delete") + public void delete(@PathVariable("id") String id) { + checkOwner(id); + virtualDeviceRepository.deleteById(id); + } + + /** + * 保存脚本 + */ + @PostMapping("/{id}/saveScript") + public void saveScript(@PathVariable("id") String id, String script) { + VirtualDevice old = checkOwner(id); + old.setScript(script); + virtualDeviceRepository.save(old); + } + + /** + * 保存关联设备 + */ + @PostMapping("/{id}/saveDevices") + public void saveDevices(@PathVariable("id") String id, @RequestBody List devices) { + VirtualDevice old = checkOwner(id); + old.setDevices(devices); + virtualDeviceRepository.save(old); + } + + /** + * 手动执行虚拟设备 + */ + @PostMapping("/{id}/run") + public void run(@PathVariable("id") String id) { + VirtualDevice virtualDevice = checkOwner(id); + virtualManager.run(virtualDevice); + } + + /** + * 取虚拟设备执行日志 + */ + @PostMapping("/{id}/logs/{size}/{page}") + public Paging getLogs( + @PathVariable("id") String id, + @PathVariable("size") int size, + @PathVariable("page") int page + ) { + Page logs = virtualDeviceLogRepository.findByVirtualDeviceId(id, + PageRequest.of(page - 1, size, Sort.by(Sort.Order.desc("logAt")))); + return new Paging<>(logs.getTotalElements(), logs.getContent()); + } + + private VirtualDevice checkOwner(String id) { + Optional old = virtualDeviceRepository.findById(id); + if (old.isEmpty()) { + throw new BizException("record does not exist"); + } + VirtualDevice oldData = old.get(); + + dataOwnerService.checkOwner(oldData); + return oldData; + } + +} diff --git a/manager/src/main/java/cc/iotkit/manager/model/stats/MainStats.java b/manager/src/main/java/cc/iotkit/manager/model/stats/MainStats.java new file mode 100644 index 00000000..dbb3da00 --- /dev/null +++ b/manager/src/main/java/cc/iotkit/manager/model/stats/MainStats.java @@ -0,0 +1,45 @@ +package cc.iotkit.manager.model.stats; + +import cc.iotkit.model.stats.DataItem; +import cc.iotkit.model.stats.TimeData; +import lombok.Data; + +import java.util.List; + +/** + * 首页数据统计 + */ +@Data +public class MainStats { + + /** + * 品类数量 + */ + private long categoryTotal; + + /** + * 产品数量 + */ + private long productTotal; + + /** + * 设备数量 + */ + private long deviceTotal; + + /** + * 上报数量 + */ + private long reportTotal; + + /** + * 上报数据数量统计 + */ + private List reportDataStats; + + /** + * 按品类统计的设备数量 + */ + private List deviceStatsOfCategory; + +} diff --git a/manager/src/main/java/cc/iotkit/manager/service/DeviceService.java b/manager/src/main/java/cc/iotkit/manager/service/DeviceService.java index 79579f10..988f0131 100755 --- a/manager/src/main/java/cc/iotkit/manager/service/DeviceService.java +++ b/manager/src/main/java/cc/iotkit/manager/service/DeviceService.java @@ -4,11 +4,12 @@ import cc.iotkit.common.exception.NotFoundException; import cc.iotkit.common.exception.OfflineException; import cc.iotkit.common.utils.UniqueIdUtil; import cc.iotkit.comps.DeviceComponentManager; -import cc.iotkit.converter.ThingService; +import cc.iotkit.common.thing.ThingService; import cc.iotkit.dao.DeviceRepository; import cc.iotkit.dao.ThingModelMessageRepository; import cc.iotkit.model.device.DeviceInfo; import cc.iotkit.model.device.message.ThingModelMessage; +import cc.iotkit.virtualdevice.VirtualManager; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -29,6 +30,8 @@ public class DeviceService { private ThingModelService thingModelService; @Autowired private ThingModelMessageRepository thingModelMessageRepository; + @Autowired + private VirtualManager virtualManager; public String invokeService(String deviceId, String service, Map args) { @@ -103,7 +106,13 @@ public class DeviceService { .build(); thingModelService.parseParams(thingService); - deviceComponentManager.send(thingService); + if (virtualManager.isVirtual(deviceId)) { + //虚拟设备指令下发 + virtualManager.send(thingService); + } else { + //设备指令下发 + deviceComponentManager.send(thingService); + } String mid = thingService.getMid(); //保存设备日志 diff --git a/manager/src/main/java/cc/iotkit/manager/service/ThingModelService.java b/manager/src/main/java/cc/iotkit/manager/service/ThingModelService.java index 2383038d..cf648a3e 100755 --- a/manager/src/main/java/cc/iotkit/manager/service/ThingModelService.java +++ b/manager/src/main/java/cc/iotkit/manager/service/ThingModelService.java @@ -1,6 +1,6 @@ package cc.iotkit.manager.service; -import cc.iotkit.converter.ThingService; +import cc.iotkit.common.thing.ThingService; import cc.iotkit.dao.ThingModelRepository; import cc.iotkit.model.product.ThingModel; import org.springframework.beans.factory.annotation.Autowired; diff --git a/model/src/main/java/cc/iotkit/model/device/DeviceInfo.java b/model/src/main/java/cc/iotkit/model/device/DeviceInfo.java index fb9649df..21ddfbc1 100755 --- a/model/src/main/java/cc/iotkit/model/device/DeviceInfo.java +++ b/model/src/main/java/cc/iotkit/model/device/DeviceInfo.java @@ -25,12 +25,23 @@ public class DeviceInfo implements Owned { private String deviceId; + /** + * 产品key + */ private String productKey; private String deviceName; + /** + * 设备型号 + */ private String model; + /** + * 设备密钥 + */ + private String secret; + private String parentId; /** diff --git a/model/src/main/java/cc/iotkit/model/device/VirtualDevice.java b/model/src/main/java/cc/iotkit/model/device/VirtualDevice.java new file mode 100644 index 00000000..2481877b --- /dev/null +++ b/model/src/main/java/cc/iotkit/model/device/VirtualDevice.java @@ -0,0 +1,102 @@ +package cc.iotkit.model.device; + +import cc.iotkit.model.Owned; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; + +import java.util.ArrayList; +import java.util.List; + +/** + * 虚拟设备 + */ +@Data +@Document +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class VirtualDevice implements Owned { + + public static final String STATE_STOPPED = "stopped"; + public static final String STATE_RUNNING = "running"; + + /** + * 虚拟类型-基于物模型模拟 + */ + public static final String TYPE_THING_MODEL = "thingModel"; + /** + * 虚拟类型-基于设备协议模拟 + */ + public static final String TYPE_PROTOCOL = "protocol"; + + /** + * 触发执行-无(手动) + */ + public static final String TRIGGER_NONE = "none"; + /** + * 触发执行-定时执行 + */ + public static final String TRIGGER_CRON = "cron"; + /** + * 触发执行-随机执行 + */ + public static final String TRIGGER_RANDOM = "random"; + + @Id + private String id; + + /** + * 所属用户 + */ + private String uid; + + /** + * 虚拟设备名称 + */ + private String name; + + /** + * 产品key + */ + private String productKey; + + /** + * 虚拟的目标设备列表 + */ + private List devices = new ArrayList<>(); + + /** + * 虚拟类型 + */ + private String type; + + /** + * 设备行为脚本 + */ + private String script; + + /** + * 触发方式执行方式 + */ + private String trigger; + + /** + * 触发表达式 + */ + private String triggerExpression; + + /** + * 运行状态 + */ + private String state = STATE_STOPPED; + + /** + * 创建时间 + */ + private Long createAt = System.currentTimeMillis(); + +} diff --git a/model/src/main/java/cc/iotkit/model/device/VirtualDeviceLog.java b/model/src/main/java/cc/iotkit/model/device/VirtualDeviceLog.java new file mode 100644 index 00000000..ff231f0e --- /dev/null +++ b/model/src/main/java/cc/iotkit/model/device/VirtualDeviceLog.java @@ -0,0 +1,48 @@ +package cc.iotkit.model.device; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.data.annotation.Id; +import org.springframework.data.elasticsearch.annotations.Document; + + +/** + * 虚拟设备日志 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +@Document(indexName = "virtual_device_log") +public class VirtualDeviceLog { + + @Id + private String id; + + /** + * 虚拟设备id + */ + private String virtualDeviceId; + + /** + * 虚拟设备名称 + */ + private String virtualDeviceName; + + /** + * 关联设备数量 + */ + private int deviceTotal; + + /** + * 虚拟设备执行结果 + */ + private String result; + + /** + * 创建时间 + */ + private Long logAt = System.currentTimeMillis(); +} diff --git a/model/src/main/java/cc/iotkit/model/device/message/DeviceReport.java b/model/src/main/java/cc/iotkit/model/device/message/DeviceReport.java new file mode 100644 index 00000000..d027043d --- /dev/null +++ b/model/src/main/java/cc/iotkit/model/device/message/DeviceReport.java @@ -0,0 +1,59 @@ +package cc.iotkit.model.device.message; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.data.annotation.Id; +import org.springframework.data.elasticsearch.annotations.Document; +import org.springframework.data.elasticsearch.annotations.Field; +import org.springframework.data.elasticsearch.annotations.FieldType; + +/** + * 设备上报消息-用于统计 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +@Document(indexName = "device_report") +public class DeviceReport { + + @Id + private String id; + + private String deviceId; + + private String productKey; + + private String deviceName; + + /** + * 设备所属用户 + */ + private String uid; + + /** + * 消息类型 + * lifetime:生命周期 + * state:状态 + * property:属性 + * event:事件 + * service:服务 + */ + private String type; + + private String identifier; + + /** + * 消息状态码 + */ + private int code; + + /** + * 消息上报时间 + */ + @Field(type = FieldType.Date) + private Long time; + +} diff --git a/model/src/main/java/cc/iotkit/model/stats/DataItem.java b/model/src/main/java/cc/iotkit/model/stats/DataItem.java new file mode 100644 index 00000000..adbba6a2 --- /dev/null +++ b/model/src/main/java/cc/iotkit/model/stats/DataItem.java @@ -0,0 +1,25 @@ +package cc.iotkit.model.stats; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 统计的数据项 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class DataItem { + + /** + * 数据项名 + */ + private String name; + + /** + * 数据项值 + */ + private Object value; + +} diff --git a/model/src/main/java/cc/iotkit/model/stats/TimeData.java b/model/src/main/java/cc/iotkit/model/stats/TimeData.java new file mode 100644 index 00000000..a1b7a48b --- /dev/null +++ b/model/src/main/java/cc/iotkit/model/stats/TimeData.java @@ -0,0 +1,25 @@ +package cc.iotkit.model.stats; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 统计的时间数据 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TimeData { + + /** + * 时间 + */ + private long time; + + /** + * 数据值 + */ + private Object data; + +} diff --git a/oauth2-server/src/test/java/GenPwdSecret.java b/oauth2-server/src/test/java/GenPwdSecret.java index 7fc226e0..9fe866d5 100644 --- a/oauth2-server/src/test/java/GenPwdSecret.java +++ b/oauth2-server/src/test/java/GenPwdSecret.java @@ -10,5 +10,4 @@ public class GenPwdSecret { System.out.println(secret); System.out.println(AuthUtil.checkPwd("guest123", secret)); } - } diff --git a/pom.xml b/pom.xml index 8a95a4a7..42a455b8 100755 --- a/pom.xml +++ b/pom.xml @@ -12,6 +12,7 @@ protocol-gateway standalone-package oauth2-server + virtual-device org.springframework.boot @@ -282,6 +283,12 @@ ${project.version} + + cc.iotkit + virtual-device + ${project.version} + + diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/DeviceComponentManager.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/DeviceComponentManager.java index 9e3927ff..141e503e 100755 --- a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/DeviceComponentManager.java +++ b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/DeviceComponentManager.java @@ -13,7 +13,7 @@ import cc.iotkit.comps.service.DeviceBehaviourService; import cc.iotkit.converter.Device; import cc.iotkit.converter.DeviceMessage; import cc.iotkit.converter.ScriptConverter; -import cc.iotkit.converter.ThingService; +import cc.iotkit.common.thing.ThingService; import cc.iotkit.dao.DeviceCache; import cc.iotkit.dao.ProductCache; import cc.iotkit.dao.ProtocolComponentRepository; diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java index f716331d..04d270e1 100755 --- a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java +++ b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceBehaviourService.java @@ -15,8 +15,10 @@ import cc.iotkit.model.product.Product; import cc.iotkit.model.product.ProductModel; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.api.*; -import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.TypedMessageBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -42,9 +44,6 @@ public class DeviceBehaviourService { private ServerConfig serverConfig; @Autowired private DeviceCache deviceCache; -// @Autowired - private DeviceStateHolder deviceStateHolder; - //旧实现,ThingModelMessage序列化失败 //private Producer deviceMessageProducer; @@ -58,9 +57,9 @@ public class DeviceBehaviourService { .build(); /** 旧实现,ThingModelMessage序列化失败 - deviceMessageProducer = client.newProducer(JSONSchema.of(ThingModelMessage.class)) - .topic("persistent://iotkit/default/" + Constants.THING_MODEL_MESSAGE_TOPIC) - .create(); + deviceMessageProducer = client.newProducer(JSONSchema.of(ThingModelMessage.class)) + .topic("persistent://iotkit/default/" + Constants.THING_MODEL_MESSAGE_TOPIC) + .create(); */ deviceMessageProducer = client.newProducer() @@ -199,7 +198,7 @@ public class DeviceBehaviourService { boolean online) { DeviceInfo device = deviceRepository.findByProductKeyAndDeviceName(productKey, deviceName); if (device == null) { - log.warn(String.format("productKey: %s,device: %s,online: %s",productKey,device,online)); + log.warn(String.format("productKey: %s,device: %s,online: %s", productKey, device, online)); throw new BizException("device does not exist"); } deviceStateChange(device, online); @@ -269,8 +268,7 @@ public class DeviceBehaviourService { builder.send(); - } - catch (PulsarClientException e) { + } catch (PulsarClientException e) { log.error("send thing model message error", e); } } diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceMessageConsumer.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceMessageConsumer.java index d26ff35b..0443165a 100755 --- a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceMessageConsumer.java +++ b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceMessageConsumer.java @@ -3,10 +3,10 @@ package cc.iotkit.comps.service; import cc.iotkit.common.Constants; import cc.iotkit.common.utils.JsonUtil; import cc.iotkit.comps.config.ServerConfig; -import cc.iotkit.dao.DeviceDao; -import cc.iotkit.dao.DevicePropertyRepository; -import cc.iotkit.dao.ThingModelMessageRepository; +import cc.iotkit.dao.*; +import cc.iotkit.model.device.DeviceInfo; import cc.iotkit.model.device.message.DeviceProperty; +import cc.iotkit.model.device.message.DeviceReport; import cc.iotkit.model.device.message.ThingModelMessage; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -17,6 +17,7 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.Map; +import java.util.UUID; @Slf4j @Service @@ -30,8 +31,13 @@ public class DeviceMessageConsumer implements MessageListener @Lazy @Autowired private DevicePropertyRepository propertyRepository; + @Lazy + @Autowired + private DeviceReportRepository deviceReportRepository; @Autowired private DeviceDao deviceDao; + @Autowired + private DeviceCache deviceCache; @PostConstruct public void init() throws PulsarClientException { @@ -85,8 +91,11 @@ public class DeviceMessageConsumer implements MessageListener } try { + //todo 存在性能问题,量大可再拆分处理 //设备消息日志入库 messageRepository.save(modelMessage); + //设备上报日志入库 + deviceReportRepository.save(getDeviceReport(modelMessage)); } catch (Throwable e) { log.warn("save device message to es error", e); } @@ -97,6 +106,21 @@ public class DeviceMessageConsumer implements MessageListener consumer.acknowledge(msg); } + private DeviceReport getDeviceReport(ThingModelMessage message) { + DeviceInfo device = deviceCache.get(message.getDeviceId()); + return DeviceReport.builder() + .id(UUID.randomUUID().toString()) + .deviceId(message.getDeviceId()) + .productKey(message.getProductKey()) + .deviceName(message.getDeviceName()) + .uid(device.getUid()) + .identifier(message.getIdentifier()) + .type(message.getType()) + .code(message.getCode()) + .time(message.getTime()) + .build(); + } + @Override public void reachedEndOfTopic(Consumer consumer) { diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceStateHolder.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceStateHolder.java deleted file mode 100755 index a1a954ca..00000000 --- a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/service/DeviceStateHolder.java +++ /dev/null @@ -1,122 +0,0 @@ -package cc.iotkit.comps.service; - -import cc.iotkit.common.utils.ThreadUtil; -import cc.iotkit.comps.config.ServerConfig; -import cc.iotkit.dao.DeviceRepository; -import cc.iotkit.model.device.DeviceInfo; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.*; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Service; - -import javax.annotation.PostConstruct; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * 设备状态维持,每1分钟更新一次心跳 - */ -@Slf4j -//@Service -public class DeviceStateHolder implements MessageListener { - - private ScheduledThreadPoolExecutor stateHolderTask; - - private Set devices = new TreeSet<>(); - - @Autowired - private StringRedisTemplate redisTemplate; - @Autowired - private ServerConfig serverConfig; - @Autowired - private DeviceRepository deviceRepository; - - private Producer offlineMessageProducer; - - @PostConstruct - public void init() throws PulsarClientException { - stateHolderTask = ThreadUtil.newScheduled(4, "thread-device-state-holder"); - stateHolderTask.scheduleAtFixedRate(this::hold, 0, 1, TimeUnit.MINUTES); - - PulsarClient client = PulsarClient.builder() - .serviceUrl(this.serverConfig.getPulsarBrokerUrl()) - .build(); - - offlineMessageProducer = client.newProducer(Schema.JSON(OfflineMessage.class)) - .topic("persistent://iotkit/default/holder_offline") - .create(); - - client.newConsumer(Schema.JSON(OfflineMessage.class)) - .topic("persistent://iotkit/default/holder_offline") - .subscriptionName("holder_offline") - .consumerName("device-state-holder-consumer") - .messageListener(this).subscribe(); - } - - public void online(String deviceId) { - try { - devices.add(deviceId); - hold(deviceId); - //上线后先产生离线消息 - offlineMessageProducer.send(new OfflineMessage(deviceId)); - } catch (Throwable e) { - log.error("state holder online error", e); - } - } - - public void offline(String deviceId) { - devices.remove(deviceId); - } - - private void hold() { - //标识在线 - for (String deviceId : devices) { - hold(deviceId); - } - } - - private void hold(String deviceId) { - redisTemplate.opsForValue().set("str:device:state:holder:" + deviceId, - "1", 5, TimeUnit.SECONDS); - } - - @SneakyThrows - @Override - public void received(Consumer consumer, Message msg) { - String deviceId = msg.getValue().getDeviceId(); - //如果设备在线,不处理离线消息 - String hold = redisTemplate.opsForValue().get("str:device:state:holder:" + deviceId); - if (hold != null) { - return; - } - //如果设备不在线,则将设备更新为离线 - DeviceInfo device = deviceRepository.findByDeviceId(deviceId); - DeviceInfo.State state = device.getState(); - state.setOnline(false); - state.setOfflineTime(System.currentTimeMillis()); - deviceRepository.save(device); - log.info("device offline,deviceId:{}", deviceId); - - consumer.acknowledge(msg); - } - - @Override - public void reachedEndOfTopic(Consumer consumer) { - - } - - - @Data - @NoArgsConstructor - @AllArgsConstructor - public static class OfflineMessage { - private String deviceId; - } -} diff --git a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/DeviceMessage.java b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/DeviceMessage.java old mode 100755 new mode 100644 diff --git a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/IConverter.java b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/IConverter.java index 6acf883e..1f1392fb 100755 --- a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/IConverter.java +++ b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/IConverter.java @@ -1,5 +1,6 @@ package cc.iotkit.converter; +import cc.iotkit.common.thing.ThingService; import cc.iotkit.model.device.message.ThingModelMessage; public interface IConverter { diff --git a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/ScriptConverter.java b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/ScriptConverter.java index 15d2cf2b..4281e3b0 100755 --- a/protocol-gateway/converter/src/main/java/cc/iotkit/converter/ScriptConverter.java +++ b/protocol-gateway/converter/src/main/java/cc/iotkit/converter/ScriptConverter.java @@ -1,5 +1,6 @@ package cc.iotkit.converter; +import cc.iotkit.common.thing.ThingService; import cc.iotkit.common.utils.JsonUtil; import cc.iotkit.model.device.message.ThingModelMessage; import jdk.nashorn.api.scripting.NashornScriptEngine; diff --git a/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/EmqxDeviceComponent.java b/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/EmqxDeviceComponent.java index c66aa41f..c548c738 100755 --- a/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/EmqxDeviceComponent.java +++ b/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/EmqxDeviceComponent.java @@ -8,7 +8,7 @@ import cc.iotkit.comp.IMessageHandler; import cc.iotkit.comp.model.DeviceState; import cc.iotkit.comp.utils.SpringUtils; import cc.iotkit.converter.DeviceMessage; -import cc.iotkit.converter.ThingService; +import cc.iotkit.common.thing.ThingService; import cc.iotkit.dao.DeviceRepository; import cc.iotkit.model.device.DeviceInfo; import cc.iotkit.model.device.message.ThingModelMessage; diff --git a/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/IScripter.java b/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/IScripter.java index 7f4e58ef..2dd9b1d1 100644 --- a/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/IScripter.java +++ b/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/IScripter.java @@ -1,6 +1,6 @@ package cc.iotkit.comp.emqx; -import cc.iotkit.converter.ThingService; +import cc.iotkit.common.thing.ThingService; import cc.iotkit.model.device.message.ThingModelMessage; public interface IScripter { diff --git a/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/JsScripter.java b/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/JsScripter.java index 90dbd02e..188bfc97 100644 --- a/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/JsScripter.java +++ b/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/JsScripter.java @@ -1,6 +1,6 @@ package cc.iotkit.comp.emqx; -import cc.iotkit.converter.ThingService; +import cc.iotkit.common.thing.ThingService; import cc.iotkit.model.device.message.ThingModelMessage; public class JsScripter implements IScripter { diff --git a/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/LuaScripter.java b/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/LuaScripter.java index ec3477b6..db256b0c 100644 --- a/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/LuaScripter.java +++ b/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/LuaScripter.java @@ -1,6 +1,6 @@ package cc.iotkit.comp.emqx; -import cc.iotkit.converter.ThingService; +import cc.iotkit.common.thing.ThingService; import cc.iotkit.model.device.message.ThingModelMessage; import lombok.extern.slf4j.Slf4j; import org.apache.commons.beanutils.BeanUtils; diff --git a/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/TransparentConverter.java b/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/TransparentConverter.java index e52f882d..48a62975 100644 --- a/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/TransparentConverter.java +++ b/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/TransparentConverter.java @@ -3,7 +3,7 @@ package cc.iotkit.comp.emqx; import cc.iotkit.converter.Device; import cc.iotkit.converter.DeviceMessage; -import cc.iotkit.converter.ThingService; +import cc.iotkit.common.thing.ThingService; import cc.iotkit.dao.DeviceCache; import cc.iotkit.dao.ProductCache; import cc.iotkit.model.device.DeviceInfo; diff --git a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/IScripter.java b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/IScripter.java index 412a4cfe..7bb6d05d 100755 --- a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/IScripter.java +++ b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/IScripter.java @@ -1,7 +1,6 @@ package cc.iotkit.comp.mqtt; -import cc.iotkit.converter.DeviceMessage; -import cc.iotkit.converter.ThingService; +import cc.iotkit.common.thing.ThingService; import cc.iotkit.model.device.message.ThingModelMessage; public interface IScripter { diff --git a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/JsScripter.java b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/JsScripter.java index 380cd44c..a1582478 100755 --- a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/JsScripter.java +++ b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/JsScripter.java @@ -1,6 +1,6 @@ package cc.iotkit.comp.mqtt; -import cc.iotkit.converter.ThingService; +import cc.iotkit.common.thing.ThingService; import cc.iotkit.model.device.message.ThingModelMessage; public class JsScripter implements IScripter { diff --git a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/LuaScripter.java b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/LuaScripter.java index 376bcd18..3e00e932 100755 --- a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/LuaScripter.java +++ b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/LuaScripter.java @@ -1,6 +1,6 @@ package cc.iotkit.comp.mqtt; -import cc.iotkit.converter.ThingService; +import cc.iotkit.common.thing.ThingService; import cc.iotkit.model.device.message.ThingModelMessage; import lombok.extern.slf4j.Slf4j; import org.apache.commons.beanutils.BeanUtils; diff --git a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttDeviceComponent.java b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttDeviceComponent.java index 9463fac0..60f29e21 100755 --- a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttDeviceComponent.java +++ b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttDeviceComponent.java @@ -6,7 +6,7 @@ import cc.iotkit.comp.AbstractDeviceComponent; import cc.iotkit.comp.CompConfig; import cc.iotkit.comp.model.DeviceState; import cc.iotkit.converter.DeviceMessage; -import cc.iotkit.converter.ThingService; +import cc.iotkit.common.thing.ThingService; import cc.iotkit.model.device.message.ThingModelMessage; import io.vertx.core.Future; import io.vertx.core.Vertx; diff --git a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/TransparentConverter.java b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/TransparentConverter.java index 6f5956e7..f0bc05cb 100755 --- a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/TransparentConverter.java +++ b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/TransparentConverter.java @@ -3,7 +3,7 @@ package cc.iotkit.comp.mqtt; import cc.iotkit.converter.Device; import cc.iotkit.converter.DeviceMessage; -import cc.iotkit.converter.ThingService; +import cc.iotkit.common.thing.ThingService; import cc.iotkit.dao.DeviceCache; import cc.iotkit.dao.ProductCache; import cc.iotkit.model.device.DeviceInfo; diff --git a/rule-engine/src/main/java/cc/iotkit/ruleengine/action/DeviceActionService.java b/rule-engine/src/main/java/cc/iotkit/ruleengine/action/DeviceActionService.java index 252acda6..b09a8ffd 100755 --- a/rule-engine/src/main/java/cc/iotkit/ruleengine/action/DeviceActionService.java +++ b/rule-engine/src/main/java/cc/iotkit/ruleengine/action/DeviceActionService.java @@ -2,7 +2,7 @@ package cc.iotkit.ruleengine.action; import cc.iotkit.common.utils.UniqueIdUtil; import cc.iotkit.comps.DeviceComponentManager; -import cc.iotkit.converter.ThingService; +import cc.iotkit.common.thing.ThingService; import lombok.Data; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; diff --git a/virtual-device/pom.xml b/virtual-device/pom.xml new file mode 100644 index 00000000..bc68c613 --- /dev/null +++ b/virtual-device/pom.xml @@ -0,0 +1,44 @@ + + + + iotkit-parent + cc.iotkit + 0.2.0-SNAPSHOT + + 4.0.0 + + virtual-device + + + + + org.springframework.boot + spring-boot-starter + + + + org.quartz-scheduler + quartz + 2.3.2 + + + + cc.iotkit + model + + + + cc.iotkit + dao + + + + cc.iotkit + component-server + + + + + \ No newline at end of file diff --git a/virtual-device/src/main/java/cc/iotkit/virtualdevice/VirtualExecutor.java b/virtual-device/src/main/java/cc/iotkit/virtualdevice/VirtualExecutor.java new file mode 100644 index 00000000..b7b36b8e --- /dev/null +++ b/virtual-device/src/main/java/cc/iotkit/virtualdevice/VirtualExecutor.java @@ -0,0 +1,49 @@ +package cc.iotkit.virtualdevice; + +import cc.iotkit.model.device.DeviceInfo; +import cc.iotkit.model.device.VirtualDevice; +import cc.iotkit.model.device.VirtualDeviceLog; +import lombok.extern.slf4j.Slf4j; +import org.quartz.Job; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +@Slf4j +public class VirtualExecutor implements Job { + + @Override + public void execute(JobExecutionContext context) { + Map data = context.getMergedJobDataMap(); + VirtualManager virtualManager = (VirtualManager) data.get("virtualManager"); + VirtualDevice virtualDevice = (VirtualDevice) data.get("virtualDevice"); + List devices = (List) data.get("devices"); + devices = devices == null ? new ArrayList<>() : devices; + JobDetail jobDetail = context.getJobDetail(); + String jobKey = jobDetail.getKey().toString(); + + VirtualDeviceLog virtualDeviceLog = VirtualDeviceLog.builder() + .id(UUID.randomUUID().toString()) + .virtualDeviceId(virtualDevice.getId()) + .virtualDeviceName(virtualDevice.getName()) + .deviceTotal(devices.size()) + .result("success") + .logAt(System.currentTimeMillis()) + .build(); + + try { + for (DeviceInfo device : devices) { + log.info("invoke virtual device report,jobKey:{},deviceId:{}", jobKey, device.getDeviceId()); + virtualManager.invokeReport(device); + } + } catch (Throwable e) { + virtualDeviceLog.setResult(e.getMessage()); + log.error("execute job error", e); + } + virtualManager.saveLog(virtualDeviceLog); + } +} diff --git a/virtual-device/src/main/java/cc/iotkit/virtualdevice/VirtualManager.java b/virtual-device/src/main/java/cc/iotkit/virtualdevice/VirtualManager.java new file mode 100644 index 00000000..fc711547 --- /dev/null +++ b/virtual-device/src/main/java/cc/iotkit/virtualdevice/VirtualManager.java @@ -0,0 +1,287 @@ +package cc.iotkit.virtualdevice; + +import cc.iotkit.common.thing.ThingService; +import cc.iotkit.common.utils.JsonUtil; +import cc.iotkit.comps.service.DeviceBehaviourService; +import cc.iotkit.dao.DeviceCache; +import cc.iotkit.dao.VirtualDeviceLogRepository; +import cc.iotkit.dao.VirtualDeviceRepository; +import cc.iotkit.model.device.DeviceInfo; +import cc.iotkit.model.device.VirtualDevice; +import cc.iotkit.model.device.VirtualDeviceLog; +import cc.iotkit.model.device.message.ThingModelMessage; +import cc.iotkit.virtualdevice.trigger.RandomScheduleBuilder; +import jdk.nashorn.api.scripting.NashornScriptEngine; +import jdk.nashorn.api.scripting.ScriptObjectMirror; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.beanutils.BeanUtils; +import org.quartz.*; +import org.springframework.beans.factory.annotation.Autowired; + +import javax.annotation.PostConstruct; +import javax.script.ScriptEngineManager; +import java.util.*; + +@Slf4j +public class VirtualManager { + private final NashornScriptEngine engine = (NashornScriptEngine) (new ScriptEngineManager()).getEngineByName("nashorn"); + + private final Map virtualScripts = new HashMap<>(); + private final Map> deviceIdToVirtualId = new HashMap<>(); + + @Autowired + private VirtualDeviceRepository virtualDeviceRepository; + @Autowired + private DeviceCache deviceCache; + @Autowired + private Scheduler scheduler; + @Autowired + private DeviceBehaviourService deviceBehaviourService; + @Autowired + private VirtualDeviceLogRepository virtualDeviceLogRepository; + + + @PostConstruct + public void init() { + List virtualDevices = getAllVirtualDevices(); + for (VirtualDevice virtualDevice : virtualDevices) { + addTask(virtualDevice); + } + } + + /** + * 判断设备是否应用了虚拟设备 + */ + public boolean isVirtual(String deviceId) { + return deviceIdToVirtualId.containsKey(deviceId); + } + + /** + * 调用虚拟设备下发 + */ + public void send(ThingService service) { + DeviceInfo deviceInfo = deviceCache.getDeviceInfo(service.getProductKey(), service.getDeviceName()); + String deviceId = deviceInfo.getDeviceId(); + + //根据设备Id取虚拟设备列表 + Set virtualIds = deviceIdToVirtualId.get(deviceId); + for (String virtualId : virtualIds) { + Object scriptObj = virtualScripts.get(virtualId); + Object result = invokeMethod(scriptObj, "receive", service); + for (Object value : ((ScriptObjectMirror) result).values()) { + processReport(value); + } + log.info("virtual device send result:{}", JsonUtil.toJsonString(result)); + } + } + + + /** + * 添加虚拟设备 + */ + public void add(VirtualDevice virtualDevice) { + addTask(virtualDevice); + } + + /** + * 删除虚拟设备 + */ + public void remove(VirtualDevice virtualDevice) { + deleteTask(virtualDevice); + } + + /** + * 立即执行一次虚拟设备上报 + */ + public void run(VirtualDevice virtualDevice) { + List devices = virtualDevice.getDevices(); + VirtualDeviceLog virtualDeviceLog = VirtualDeviceLog.builder() + .id(UUID.randomUUID().toString()) + .virtualDeviceId(virtualDevice.getId()) + .virtualDeviceName(virtualDevice.getName()) + .deviceTotal(devices.size()) + .result("success") + .logAt(System.currentTimeMillis()) + .build(); + try { + Object scriptObj = engine.eval(String.format("new (function () {\n%s})()", virtualDevice.getScript())); + for (String deviceId : devices) { + DeviceInfo device = deviceCache.get(deviceId); + processReport(invokeMethod(scriptObj, "report", device)); + } + } catch (Throwable e) { + virtualDeviceLog.setResult(e.getMessage()); + log.error("run VirtualDevice error", e); + } + virtualDeviceLogRepository.save(virtualDeviceLog); + } + + /** + * 更新虚拟设备 + */ + public void update(VirtualDevice virtualDevice) { + remove(virtualDevice); + add(virtualDevice); + } + + /** + * 获取所有虚拟设备 + */ + private List getAllVirtualDevices() { + List randomVirtualDevices = virtualDeviceRepository + .findByTriggerAndState(VirtualDevice.TRIGGER_RANDOM, VirtualDevice.STATE_RUNNING); + List cronVirtualDevices = virtualDeviceRepository + .findByTriggerAndState(VirtualDevice.TRIGGER_CRON, VirtualDevice.STATE_RUNNING); + cronVirtualDevices.addAll(randomVirtualDevices); + return cronVirtualDevices; + } + + private void addTask(VirtualDevice virtualDevice) { + try { + String id = virtualDevice.getId(); + String name = virtualDevice.getName(); + String script = virtualDevice.getScript(); + log.info("adding virtual device job,id:{},name:{}", id, name); + + //添加新的脚本对象 + virtualScripts.put(id, engine.eval(String.format("new (function () {\n%s})()", script))); + List devices = new ArrayList<>(); + for (String deviceId : virtualDevice.getDevices()) { + devices.add(deviceCache.get(deviceId)); + //更新deviceId的虚拟设备Id对应关系 + Set virtualIds = deviceIdToVirtualId.getOrDefault(deviceId, new HashSet<>()); + virtualIds.add(id); + deviceIdToVirtualId.put(deviceId, virtualIds); + } + + JobDataMap jobDataMap = new JobDataMap(); + jobDataMap.put("virtualManager", this); + jobDataMap.put("virtualDevice", virtualDevice); + jobDataMap.put("devices", devices); + + JobDetail jobDetail = JobBuilder.newJob(VirtualExecutor.class) + .withIdentity(id, name) + .usingJobData(jobDataMap) + .build(); + + Trigger trigger = TriggerBuilder + .newTrigger() + .withIdentity("trigger_" + id, "triggerGroup_" + name) + .startNow() + .withSchedule( + getTriggerBuilder(virtualDevice) + ).build(); + + scheduler.scheduleJob(jobDetail, trigger); + if (!scheduler.isShutdown()) { + scheduler.start(); + } + } catch (Throwable e) { + log.error("create job failed", e); + } + } + + private ScheduleBuilder getTriggerBuilder(VirtualDevice virtualDevice) { + String type = virtualDevice.getTrigger(); + if ("random".equals(type)) { + return new RandomScheduleBuilder(virtualDevice.getTriggerExpression()); + } + if ("cron".equals(type)) { + return CronScheduleBuilder.cronSchedule(virtualDevice.getTriggerExpression()); + } + return null; + } + + @SneakyThrows + public void deleteTask(VirtualDevice virtualDevice) { + String id = virtualDevice.getId(); + String name = virtualDevice.getName(); + + //删除脚本对象 + virtualScripts.remove(id); + + //更新deviceId的虚拟设备Id对应关系 + for (String deviceId : deviceIdToVirtualId.keySet()) { + Set virtualIds = deviceIdToVirtualId.get(deviceId); + virtualIds.remove(id); + } + + //删除job + TriggerKey triggerKey = new TriggerKey("trigger_" + id, "triggerGroup_" + name); + if (!scheduler.checkExists(triggerKey)) { + return; + } + scheduler.deleteJob(JobKey.jobKey(id, name)); + } + + /** + * 处理js上报方法返回结果 + */ + public void processReport(Object sourceMsg) { + try { + ScriptObjectMirror result = (ScriptObjectMirror) sourceMsg; + ThingModelMessage modelMessage = new ThingModelMessage(); + BeanUtils.populate(modelMessage, result); + deviceBehaviourService.reportMessage(modelMessage); + } catch (Throwable e) { + log.error("process js data error", e); + } + } + + /** + * 调用js方法 + */ + private Object invokeMethod(Object scriptObj, String name, Object... args) { + try { + if (((ScriptObjectMirror) scriptObj).get(name) != null) { + return engine.invokeMethod(scriptObj, name, args); + } + return null; + } catch (Throwable e) { + log.error("invoke js method error", e); + } + return null; + } + + /** + * 调用脚本中上报方法 + */ + public void invokeReport(DeviceInfo device) { + //设备上线 + deviceOnline(device); + + String deviceId = device.getDeviceId(); + Set virtualIds = deviceIdToVirtualId.get(deviceId); + if (virtualIds == null) { + return; + } + + for (String virtualId : virtualIds) { + Object scriptObj = virtualScripts.get(virtualId); + if (scriptObj == null) { + continue; + } + processReport(invokeMethod(scriptObj, "report", device)); + } + } + + /** + * 设备上线 + */ + private void deviceOnline(DeviceInfo device) { + DeviceInfo.State state = device.getState(); + if (state == null || !state.isOnline()) { + //设备离线,产生上线消息 + deviceBehaviourService.deviceStateChange(device.getProductKey(), device.getDeviceName(), true); + } + } + + /** + * 保存虚拟设备日志 + */ + public void saveLog(VirtualDeviceLog log) { + virtualDeviceLogRepository.save(log); + } + +} diff --git a/virtual-device/src/main/java/cc/iotkit/virtualdevice/config/VirtualConfig.java b/virtual-device/src/main/java/cc/iotkit/virtualdevice/config/VirtualConfig.java new file mode 100644 index 00000000..19d513c9 --- /dev/null +++ b/virtual-device/src/main/java/cc/iotkit/virtualdevice/config/VirtualConfig.java @@ -0,0 +1,15 @@ +package cc.iotkit.virtualdevice.config; + +import cc.iotkit.virtualdevice.VirtualManager; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class VirtualConfig { + + @Bean + public VirtualManager getVirtualManager() { + return new VirtualManager(); + } + +} diff --git a/virtual-device/src/main/java/cc/iotkit/virtualdevice/trigger/RandomScheduleBuilder.java b/virtual-device/src/main/java/cc/iotkit/virtualdevice/trigger/RandomScheduleBuilder.java new file mode 100644 index 00000000..4177e883 --- /dev/null +++ b/virtual-device/src/main/java/cc/iotkit/virtualdevice/trigger/RandomScheduleBuilder.java @@ -0,0 +1,18 @@ +package cc.iotkit.virtualdevice.trigger; + +import org.quartz.ScheduleBuilder; +import org.quartz.spi.MutableTrigger; + +public class RandomScheduleBuilder extends ScheduleBuilder { + + private final String unit; + + public RandomScheduleBuilder(String unit) { + this.unit = unit; + } + + public MutableTrigger build() { + return new RandomTrigger(unit); + } + +} diff --git a/virtual-device/src/main/java/cc/iotkit/virtualdevice/trigger/RandomTrigger.java b/virtual-device/src/main/java/cc/iotkit/virtualdevice/trigger/RandomTrigger.java new file mode 100644 index 00000000..47ae84d1 --- /dev/null +++ b/virtual-device/src/main/java/cc/iotkit/virtualdevice/trigger/RandomTrigger.java @@ -0,0 +1,47 @@ +package cc.iotkit.virtualdevice.trigger; + +import org.apache.commons.lang3.RandomUtils; +import org.quartz.Calendar; +import org.quartz.impl.triggers.SimpleTriggerImpl; + +import java.util.Date; + +public class RandomTrigger extends SimpleTriggerImpl { + + private String unit; + private Date nextFireTime; + + public RandomTrigger(String unit) { + this.unit = unit; + } + + @Override + public void triggered(Calendar calendar) { + super.triggered(calendar); + nextFireTime = randomTime(); + } + + @Override + public Date getNextFireTime() { + if (nextFireTime == null) { + nextFireTime = randomTime(); + } + return nextFireTime; + } + + private Date randomTime() { + Date previousTime = getPreviousFireTime(); + if (previousTime == null) { + previousTime = new Date(); + } + long time = previousTime.getTime(); + if ("second".equals(unit)) { + time = time + RandomUtils.nextInt(0, 60) * 1000; + } else if ("minute".equals(unit)) { + time = time + RandomUtils.nextInt(0, 60) * 1000 * 60; + } else if ("hour".equals(unit)) { + time = time + RandomUtils.nextInt(0, 60) * 1000 * 60 * 60; + } + return new Date(time); + } +} diff --git a/virtual-device/src/main/resources/spring.factories b/virtual-device/src/main/resources/spring.factories new file mode 100644 index 00000000..68e7ceb9 --- /dev/null +++ b/virtual-device/src/main/resources/spring.factories @@ -0,0 +1 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=cc.iotkit.virtualdevice.config.VirtualConfig