设备数据es索引策略调整

V0.5.x
xiwa 2022-11-03 23:46:58 +08:00
parent 42c19b568e
commit 29c0ca9d3f
3 changed files with 50 additions and 14 deletions

View File

@ -39,6 +39,11 @@
<artifactId>iot-temporal-service</artifactId>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>iot-data-cache</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -18,6 +18,4 @@ public interface DevicePropertyMapper {
DevicePropertyMapper M = Mappers.getMapper(DevicePropertyMapper.class);
DeviceProperty toDto(DocDeviceProperty vo);
DocDeviceProperty toVo(DeviceProperty dto);
}

View File

@ -9,24 +9,27 @@
*/
package cc.iotkit.temporal.es.service;
import cc.iotkit.data.IDeviceInfoData;
import cc.iotkit.model.device.DeviceInfo;
import cc.iotkit.model.device.message.DeviceProperty;
import cc.iotkit.temporal.IDevicePropertyData;
import cc.iotkit.temporal.es.document.DevicePropertyMapper;
import cc.iotkit.temporal.es.document.DocDeviceProperty;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.stream.Collectors;
@Service
@ -35,18 +38,24 @@ public class DevicePropertyDataImpl implements IDevicePropertyData {
@Autowired
private ElasticsearchRestTemplate template;
@Autowired
@Qualifier("deviceInfoDataCache")
private IDeviceInfoData deviceInfoData;
private final Set<String> indexSet = new HashSet<>();
public List<DeviceProperty> findDevicePropertyHistory(String deviceId, String name, long start, long end) {
String index = getIndex(deviceId, name);
NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("deviceId", deviceId))
.must(QueryBuilders.termQuery("name", name.toLowerCase()))
.must(QueryBuilders.rangeQuery("time")
.from(start, true).to(end, true))
)
.withSorts(new FieldSortBuilder("time").order(SortOrder.ASC))
.build();
SearchHits<DocDeviceProperty> result = template.search(query, DocDeviceProperty.class);
SearchHits<DocDeviceProperty> result = template.search(query, DocDeviceProperty.class, IndexCoordinates.of(index));
return result.getSearchHits().stream()
.map(h -> DevicePropertyMapper.M.toDto(h.getContent()))
.collect(Collectors.toList());
@ -54,13 +63,37 @@ public class DevicePropertyDataImpl implements IDevicePropertyData {
@Override
public void addProperties(String deviceId, Map<String, Object> properties, long time) {
List<DocDeviceProperty> deviceProperties = new ArrayList<>();
properties.forEach((key, val) -> deviceProperties.add(
new DocDeviceProperty(UUID.randomUUID().toString(), deviceId, key, val, time)
));
template.save(deviceProperties);
properties.forEach((key, val) -> {
String index = getIndex(deviceId, key);
template.save(
new DocDeviceProperty(UUID.randomUUID().toString(), deviceId, key, val, time),
IndexCoordinates.of(index)
);
});
}
private String getIndex(String deviceId, String name) {
DeviceInfo deviceInfo = deviceInfoData.findByDeviceId(deviceId);
if (deviceInfo == null) {
return null;
}
String pk = deviceInfo.getProductKey().toLowerCase();
String index = String.format("device_property_%s_%s", pk, name);
if (null == index || StringUtils.isBlank(index)) {
return null;
}
if (!indexSet.contains(index)) {
IndexCoordinates indexCoordinates = IndexCoordinates.of(index);
if (!template.indexOps(indexCoordinates).exists()) {
// 根据索引实体获取mapping字段
Document mapping = template.indexOps(indexCoordinates).createMapping(DocDeviceProperty.class);
template.indexOps(indexCoordinates).create();
// 创建索引mapping
template.indexOps(indexCoordinates).putMapping(mapping);
}
indexSet.add(index);
}
return index;
}
}