diff --git a/iot-dao/iot-temporal-service/src/main/java/cc/iotkit/temporal/IThingModelMessageData.java b/iot-dao/iot-temporal-service/src/main/java/cc/iotkit/temporal/IThingModelMessageData.java index 2155c135..d8c005ec 100644 --- a/iot-dao/iot-temporal-service/src/main/java/cc/iotkit/temporal/IThingModelMessageData.java +++ b/iot-dao/iot-temporal-service/src/main/java/cc/iotkit/temporal/IThingModelMessageData.java @@ -38,6 +38,25 @@ public interface IThingModelMessageData { */ List getDeviceMessageStatsWithUid(String uid, long start, long end); + + /** + * 按用户统计时间段内上行消息 + * @param uid 用户id + * @param start 开始时间戳 + * @param end 结束时间戳 + */ + List getDeviceUpMessageStatsWithUid(String uid, Long start, Long end); + + /** + * 按用户统计时间段内下行 + * @param uid 用户id + * @param start 开始时间戳 + * @param end 结束时间戳 + */ + List getDeviceDownMessageStatsWithUid(String uid, Long start, Long end); + + + void add(ThingModelMessage msg); long count(); diff --git a/iot-dao/iot-temporal-serviceImpl-es/src/main/java/cc/iotkit/temporal/es/service/ThingModelMessageDataImpl.java b/iot-dao/iot-temporal-serviceImpl-es/src/main/java/cc/iotkit/temporal/es/service/ThingModelMessageDataImpl.java index f303588b..4ab7fe3c 100644 --- a/iot-dao/iot-temporal-serviceImpl-es/src/main/java/cc/iotkit/temporal/es/service/ThingModelMessageDataImpl.java +++ b/iot-dao/iot-temporal-serviceImpl-es/src/main/java/cc/iotkit/temporal/es/service/ThingModelMessageDataImpl.java @@ -16,6 +16,7 @@ import cc.iotkit.model.stats.TimeData; import cc.iotkit.temporal.IThingModelMessageData; import cc.iotkit.temporal.es.dao.ThingModelMessageRepository; import cc.iotkit.temporal.es.document.DocThingModelMessage; +import cn.hutool.core.util.ObjectUtil; import org.apache.commons.lang3.StringUtils; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -28,6 +29,7 @@ import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; import org.springframework.data.elasticsearch.core.ElasticsearchAggregations; import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate; +import org.springframework.data.elasticsearch.core.SearchHit; import org.springframework.data.elasticsearch.core.SearchHits; import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; @@ -100,6 +102,92 @@ public class ThingModelMessageDataImpl implements IThingModelMessageData { return data; } + @Override + public List getDeviceUpMessageStatsWithUid(String uid, Long start, Long end) { + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); + if (ObjectUtil.isNotEmpty(start) && ObjectUtil.isNotEmpty(end)) { + queryBuilder.must(QueryBuilders.rangeQuery("time") + .from(start, true).to(end, true)); + } + + if ( ObjectUtil.isNotEmpty(uid) ) { + queryBuilder = + queryBuilder.must(QueryBuilders.termQuery("uid", uid)); + } + + // 查询字段type='property' and identifier='report', 或者 type='event' 的数据 + queryBuilder = queryBuilder.must(QueryBuilders.boolQuery() + .should(QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery("type", "property")) + .must(QueryBuilders.termQuery("identifier", "report"))) + .should(QueryBuilders.termQuery("type", "event"))); + + NativeSearchQuery query = new NativeSearchQueryBuilder() + .withQuery(queryBuilder) + .withAggregations(AggregationBuilders.dateHistogram("agg") + .field("time") + .calendarInterval(DateHistogramInterval.HOUR) + .calendarInterval(DateHistogramInterval.hours(1)) + ) + .build(); + + ElasticsearchAggregations result = (ElasticsearchAggregations) template + .search(query, DocThingModelMessage.class).getAggregations(); + ParsedDateHistogram histogram = result.aggregations().get("agg"); + + List data = new ArrayList<>(); + for (Histogram.Bucket bucket : histogram.getBuckets()) { + long seconds = ((ZonedDateTime) bucket.getKey()).toInstant().getEpochSecond(); + data.add(new TimeData(seconds * 1000, bucket.getDocCount())); + } + + return data; + } + + @Override + public List getDeviceDownMessageStatsWithUid(String uid, Long start, Long end) { + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); + if (ObjectUtil.isNotEmpty(start) && ObjectUtil.isNotEmpty(end)) { + queryBuilder.must(QueryBuilders.rangeQuery("time") + .from(start, true).to(end, true)); + } + + if ( ObjectUtil.isNotEmpty(uid) ) { + queryBuilder = + queryBuilder.must(QueryBuilders.termQuery("uid", uid)); + } + + // 查询字段type='property' and identifie!='report', 或者 type='service' 或者 type= 'config' + queryBuilder = queryBuilder.must(QueryBuilders.boolQuery() + .should(QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery("type", "property")) + .must(QueryBuilders.boolQuery() + .mustNot(QueryBuilders.termQuery("identifier", "report")))) + .should(QueryBuilders.termQuery("type", "service")) + .should(QueryBuilders.termQuery("type", "config"))); + + NativeSearchQuery query = new NativeSearchQueryBuilder() + .withQuery(queryBuilder) + .withAggregations(AggregationBuilders.dateHistogram("agg") + .field("time") + .calendarInterval(DateHistogramInterval.HOUR) + .calendarInterval(DateHistogramInterval.hours(1)) + ) + .build(); + + ElasticsearchAggregations result = (ElasticsearchAggregations) template + .search(query, DocThingModelMessage.class).getAggregations(); + ParsedDateHistogram histogram = result.aggregations().get("agg"); + + List data = new ArrayList<>(); + for (Histogram.Bucket bucket : histogram.getBuckets()) { + long seconds = ((ZonedDateTime) bucket.getKey()).toInstant().getEpochSecond(); + data.add(new TimeData(seconds * 1000, bucket.getDocCount())); + } + + return data; + } + @Override public void add(ThingModelMessage msg) { thingModelMessageRepository.save(MapstructUtils.convert(msg, DocThingModelMessage.class)); diff --git a/iot-dao/iot-temporal-serviceImpl-iotdb/src/main/java/cc/iotkit/temporal/iotdb/service/ThingModelMessageDataImpl.java b/iot-dao/iot-temporal-serviceImpl-iotdb/src/main/java/cc/iotkit/temporal/iotdb/service/ThingModelMessageDataImpl.java index b97fc6e3..29dd1df4 100644 --- a/iot-dao/iot-temporal-serviceImpl-iotdb/src/main/java/cc/iotkit/temporal/iotdb/service/ThingModelMessageDataImpl.java +++ b/iot-dao/iot-temporal-serviceImpl-iotdb/src/main/java/cc/iotkit/temporal/iotdb/service/ThingModelMessageDataImpl.java @@ -33,6 +33,16 @@ public class ThingModelMessageDataImpl implements IThingModelMessageData { return new ArrayList<>(); } + @Override + public List getDeviceUpMessageStatsWithUid(String uid, Long start, Long end) { + return null; + } + + @Override + public List getDeviceDownMessageStatsWithUid(String uid, Long start, Long end) { + return null; + } + @Override public void add(ThingModelMessage msg) { } diff --git a/iot-dao/iot-temporal-serviceImpl-td/src/main/java/cc/iotkit/temporal/td/service/ThingModelMessageDataImpl.java b/iot-dao/iot-temporal-serviceImpl-td/src/main/java/cc/iotkit/temporal/td/service/ThingModelMessageDataImpl.java index 85ff48a3..d3828a56 100644 --- a/iot-dao/iot-temporal-serviceImpl-td/src/main/java/cc/iotkit/temporal/td/service/ThingModelMessageDataImpl.java +++ b/iot-dao/iot-temporal-serviceImpl-td/src/main/java/cc/iotkit/temporal/td/service/ThingModelMessageDataImpl.java @@ -16,6 +16,7 @@ import cc.iotkit.model.stats.TimeData; import cc.iotkit.temporal.IThingModelMessageData; import cc.iotkit.temporal.td.dao.TdTemplate; import cc.iotkit.temporal.td.model.TbThingModelMessage; +import cn.hutool.core.util.ObjectUtil; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.BeanPropertyRowMapper; @@ -89,6 +90,56 @@ public class ThingModelMessageDataImpl implements IThingModelMessageData { return tdTemplate.query(sql, new BeanPropertyRowMapper<>(TimeData.class), args.toArray()); } + @Override + public List getDeviceUpMessageStatsWithUid(String uid, Long start, Long end) { + String sql = "select time,count(*) as data from(" + + "select TIMETRUNCATE(time,1h) as time from thing_model_message " + + "where (type='property' and identifier='report') or type='event' "; + StringBuilder sqlBuffer = new StringBuilder(); + sqlBuffer.append(sql); + + List args = new ArrayList<>(); + if (ObjectUtil.isNotEmpty(uid)) { + sqlBuffer.append(" and uid=?"); + args.add(uid); + } + + if (ObjectUtil.isNotEmpty(start) && ObjectUtil.isNotEmpty(end)) { + sqlBuffer.append(" and time>=? and time<=?"); + args.add(start); + args.add(end); + } + + sqlBuffer.append(") a group by time order by time asc"); + + return tdTemplate.query(sqlBuffer.toString(), new BeanPropertyRowMapper<>(TimeData.class), args.toArray()); + } + + @Override + public List getDeviceDownMessageStatsWithUid(String uid, Long start, Long end) { + String sql = "select time,count(*) as data from(" + + "select TIMETRUNCATE(time,1h) as time from thing_model_message " + + "where (type='property' and identifier!='report') or type='service' or type= 'config' "; + StringBuilder sqlBuffer = new StringBuilder(); + sqlBuffer.append(sql); + + List args = new ArrayList<>(); + if (ObjectUtil.isNotEmpty(uid)) { + sqlBuffer.append(" and uid=?"); + args.add(uid); + } + + if (ObjectUtil.isNotEmpty(start) && ObjectUtil.isNotEmpty(end)) { + sqlBuffer.append(" and time>=? and time<=?"); + args.add(start); + args.add(end); + } + + sqlBuffer.append(") a group by time order by time asc"); + + return tdTemplate.query(sqlBuffer.toString(), new BeanPropertyRowMapper<>(TimeData.class), args.toArray()); + } + @Override public void add(ThingModelMessage msg) { //使用deviceId作表名 diff --git a/iot-module/iot-manager/src/main/java/cc/iotkit/manager/controller/StatsController.java b/iot-module/iot-manager/src/main/java/cc/iotkit/manager/controller/StatsController.java index 1f84ab57..f766edab 100644 --- a/iot-module/iot-manager/src/main/java/cc/iotkit/manager/controller/StatsController.java +++ b/iot-module/iot-manager/src/main/java/cc/iotkit/manager/controller/StatsController.java @@ -66,8 +66,10 @@ public class StatsController { mainStats.setNeverOnlineTotal(deviceInfoData.findNeverUsedDevices().size()); mainStats.setReportTotal(thingModelMessageData.count()); - //上报数据统计 - mainStats.setReportDataStats(thingModelMessageData.getDeviceMessageStatsWithUid(null, now - 48 * 3600 * 1000, now)); + //上行数据统计 + mainStats.setDeviceUpMessageStats(thingModelMessageData.getDeviceUpMessageStatsWithUid(null, null, null)); + // 下行数据统计 + mainStats.setDeviceDownMessageStats(thingModelMessageData.getDeviceDownMessageStatsWithUid(null, null, null)); //产品数量统计 mainStats.setDeviceStatsOfCategory(deviceInfoData.getDeviceStatsByCategory("")); } else { diff --git a/iot-module/iot-manager/src/main/java/cc/iotkit/manager/model/stats/MainStats.java b/iot-module/iot-manager/src/main/java/cc/iotkit/manager/model/stats/MainStats.java index 09195e12..33ecc9d6 100644 --- a/iot-module/iot-manager/src/main/java/cc/iotkit/manager/model/stats/MainStats.java +++ b/iot-module/iot-manager/src/main/java/cc/iotkit/manager/model/stats/MainStats.java @@ -61,6 +61,18 @@ public class MainStats { */ private List reportDataStats; + + /** + * 上行数据数量统计 + */ + private List deviceUpMessageStats; + + /** + * 下行数据数量统计 + */ + private List deviceDownMessageStats; + + /** * 按品类统计的设备数量 */