From 29c0ca9d3f6972d65c2334ffbf37fa9352109623 Mon Sep 17 00:00:00 2001 From: xiwa Date: Thu, 3 Nov 2022 23:46:58 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=BE=E5=A4=87=E6=95=B0=E6=8D=AEes=E7=B4=A2?= =?UTF-8?q?=E5=BC=95=E7=AD=96=E7=95=A5=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- iot-data/iot-es-temporal-service/pom.xml | 5 ++ .../es/document/DevicePropertyMapper.java | 2 - .../es/service/DevicePropertyDataImpl.java | 57 +++++++++++++++---- 3 files changed, 50 insertions(+), 14 deletions(-) diff --git a/iot-data/iot-es-temporal-service/pom.xml b/iot-data/iot-es-temporal-service/pom.xml index 5d35b908..c74629d1 100755 --- a/iot-data/iot-es-temporal-service/pom.xml +++ b/iot-data/iot-es-temporal-service/pom.xml @@ -39,6 +39,11 @@ iot-temporal-service + + cc.iotkit + iot-data-cache + + diff --git a/iot-data/iot-es-temporal-service/src/main/java/cc/iotkit/temporal/es/document/DevicePropertyMapper.java b/iot-data/iot-es-temporal-service/src/main/java/cc/iotkit/temporal/es/document/DevicePropertyMapper.java index c54f5756..f99ad055 100755 --- a/iot-data/iot-es-temporal-service/src/main/java/cc/iotkit/temporal/es/document/DevicePropertyMapper.java +++ b/iot-data/iot-es-temporal-service/src/main/java/cc/iotkit/temporal/es/document/DevicePropertyMapper.java @@ -18,6 +18,4 @@ public interface DevicePropertyMapper { DevicePropertyMapper M = Mappers.getMapper(DevicePropertyMapper.class); DeviceProperty toDto(DocDeviceProperty vo); - - DocDeviceProperty toVo(DeviceProperty dto); } diff --git a/iot-data/iot-es-temporal-service/src/main/java/cc/iotkit/temporal/es/service/DevicePropertyDataImpl.java b/iot-data/iot-es-temporal-service/src/main/java/cc/iotkit/temporal/es/service/DevicePropertyDataImpl.java index ef13a430..a33f07e1 100755 --- a/iot-data/iot-es-temporal-service/src/main/java/cc/iotkit/temporal/es/service/DevicePropertyDataImpl.java +++ b/iot-data/iot-es-temporal-service/src/main/java/cc/iotkit/temporal/es/service/DevicePropertyDataImpl.java @@ -9,24 +9,27 @@ */ package cc.iotkit.temporal.es.service; +import cc.iotkit.data.IDeviceInfoData; +import cc.iotkit.model.device.DeviceInfo; import cc.iotkit.model.device.message.DeviceProperty; import cc.iotkit.temporal.IDevicePropertyData; import cc.iotkit.temporal.es.document.DevicePropertyMapper; import cc.iotkit.temporal.es.document.DocDeviceProperty; +import org.apache.commons.lang3.StringUtils; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate; import org.springframework.data.elasticsearch.core.SearchHits; +import org.springframework.data.elasticsearch.core.document.Document; +import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; import org.springframework.stereotype.Service; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.stream.Collectors; @Service @@ -35,18 +38,24 @@ public class DevicePropertyDataImpl implements IDevicePropertyData { @Autowired private ElasticsearchRestTemplate template; + @Autowired + @Qualifier("deviceInfoDataCache") + private IDeviceInfoData deviceInfoData; + + private final Set indexSet = new HashSet<>(); + public List findDevicePropertyHistory(String deviceId, String name, long start, long end) { + String index = getIndex(deviceId, name); NativeSearchQuery query = new NativeSearchQueryBuilder() .withQuery( QueryBuilders.boolQuery() .must(QueryBuilders.termQuery("deviceId", deviceId)) - .must(QueryBuilders.termQuery("name", name.toLowerCase())) .must(QueryBuilders.rangeQuery("time") .from(start, true).to(end, true)) ) .withSorts(new FieldSortBuilder("time").order(SortOrder.ASC)) .build(); - SearchHits result = template.search(query, DocDeviceProperty.class); + SearchHits result = template.search(query, DocDeviceProperty.class, IndexCoordinates.of(index)); return result.getSearchHits().stream() .map(h -> DevicePropertyMapper.M.toDto(h.getContent())) .collect(Collectors.toList()); @@ -54,13 +63,37 @@ public class DevicePropertyDataImpl implements IDevicePropertyData { @Override public void addProperties(String deviceId, Map properties, long time) { - List deviceProperties = new ArrayList<>(); - properties.forEach((key, val) -> deviceProperties.add( - new DocDeviceProperty(UUID.randomUUID().toString(), deviceId, key, val, time) - )); - - template.save(deviceProperties); + properties.forEach((key, val) -> { + String index = getIndex(deviceId, key); + template.save( + new DocDeviceProperty(UUID.randomUUID().toString(), deviceId, key, val, time), + IndexCoordinates.of(index) + ); + }); } + private String getIndex(String deviceId, String name) { + DeviceInfo deviceInfo = deviceInfoData.findByDeviceId(deviceId); + if (deviceInfo == null) { + return null; + } + String pk = deviceInfo.getProductKey().toLowerCase(); + String index = String.format("device_property_%s_%s", pk, name); + if (null == index || StringUtils.isBlank(index)) { + return null; + } + if (!indexSet.contains(index)) { + IndexCoordinates indexCoordinates = IndexCoordinates.of(index); + if (!template.indexOps(indexCoordinates).exists()) { + // 根据索引实体,获取mapping字段 + Document mapping = template.indexOps(indexCoordinates).createMapping(DocDeviceProperty.class); + template.indexOps(indexCoordinates).create(); + // 创建索引mapping + template.indexOps(indexCoordinates).putMapping(mapping); + } + indexSet.add(index); + } + return index; + } }