add timescale时序数据库支持
parent
b84d4789c8
commit
4ff091f68a
|
@ -0,0 +1,97 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<parent>
|
||||||
|
<groupId>cc.iotkit</groupId>
|
||||||
|
<artifactId>iot-data</artifactId>
|
||||||
|
<version>0.4.2-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<artifactId>iot-ts-temporal-service</artifactId>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<maven.compiler.source>11</maven.compiler.source>
|
||||||
|
<maven.compiler.target>11</maven.compiler.target>
|
||||||
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework</groupId>
|
||||||
|
<artifactId>spring-context</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-jdbc</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.projectlombok</groupId>
|
||||||
|
<artifactId>lombok</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
|
<artifactId>jackson-annotations</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.mapstruct</groupId>
|
||||||
|
<artifactId>mapstruct</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>cn.hutool</groupId>
|
||||||
|
<artifactId>hutool-http</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>cc.iotkit</groupId>
|
||||||
|
<artifactId>iot-data-cache</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.jooq</groupId>
|
||||||
|
<artifactId>jooq-meta</artifactId>
|
||||||
|
<version>3.14.15</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.jooq</groupId>
|
||||||
|
<artifactId>jooq</artifactId>
|
||||||
|
<version>3.14.15</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.postgresql</groupId>
|
||||||
|
<artifactId>postgresql</artifactId>
|
||||||
|
<version>42.5.4</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework</groupId>
|
||||||
|
<artifactId>spring-jdbc</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>cc.iotkit</groupId>
|
||||||
|
<artifactId>iot-model</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>cc.iotkit</groupId>
|
||||||
|
<artifactId>iot-model</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>cc.iotkit</groupId>
|
||||||
|
<artifactId>iot-temporal-service</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
</dependencies>
|
||||||
|
</project>
|
|
@ -0,0 +1,5 @@
|
||||||
|
### 时序数据库服务接口的TimescaleDB实现
|
||||||
|
|
||||||
|
postgrep 14
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.config;
|
||||||
|
|
||||||
|
public interface Constants {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据产品key获取产品属性超级表名
|
||||||
|
*/
|
||||||
|
static String getProductPropertySTableName(String productKey) {
|
||||||
|
return String.format("product_property_%s", productKey.toLowerCase());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据deviceId获取设备属性表名
|
||||||
|
*/
|
||||||
|
static String getDevicePropertyTableName(String deviceId) {
|
||||||
|
return String.format("device_property_%s", deviceId.toLowerCase());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,43 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.config;
|
||||||
|
|
||||||
|
import cc.iotkit.temporal.ts.dao.TsTemplate;
|
||||||
|
import com.zaxxer.hikari.HikariDataSource;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class TsDatasourceConfig {
|
||||||
|
|
||||||
|
@Value("${spring.ts-datasource.url}")
|
||||||
|
private String url;
|
||||||
|
|
||||||
|
@Value("${spring.ts-datasource.driverClassName}")
|
||||||
|
private String driverClassName;
|
||||||
|
|
||||||
|
@Value("${spring.ts-datasource.username}")
|
||||||
|
private String username;
|
||||||
|
|
||||||
|
@Value("${spring.ts-datasource.password}")
|
||||||
|
private String password;
|
||||||
|
|
||||||
|
@Bean("tsJdbcTemplate")
|
||||||
|
public TsTemplate tdJdbcTemplate() {
|
||||||
|
HikariDataSource dataSource = new HikariDataSource();
|
||||||
|
dataSource.setJdbcUrl(url);
|
||||||
|
dataSource.setUsername(username);
|
||||||
|
dataSource.setPassword(password);
|
||||||
|
dataSource.setDriverClassName(driverClassName);
|
||||||
|
return new TsTemplate(dataSource);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,19 @@
|
||||||
|
package cc.iotkit.temporal.ts.dao;
|
||||||
|
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
|
||||||
|
import javax.sql.DataSource;
|
||||||
|
|
||||||
|
public class TsTemplate extends JdbcTemplate {
|
||||||
|
|
||||||
|
public TsTemplate() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public TsTemplate(DataSource dataSource) {
|
||||||
|
super(dataSource);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TsTemplate(DataSource dataSource, boolean lazyInit) {
|
||||||
|
super(dataSource, lazyInit);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
package cc.iotkit.temporal.ts.dm;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class DbField {
|
||||||
|
private String name;
|
||||||
|
private String type;
|
||||||
|
|
||||||
|
private int length;
|
||||||
|
}
|
|
@ -0,0 +1,118 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.dm;
|
||||||
|
|
||||||
|
|
||||||
|
import cc.iotkit.model.product.ThingModel;
|
||||||
|
import org.jooq.DataType;
|
||||||
|
import org.jooq.Field;
|
||||||
|
import org.jooq.impl.DSL;
|
||||||
|
import org.jooq.impl.SQLDataType;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
public class FieldParser {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 物模型到td数据类型映射
|
||||||
|
*/
|
||||||
|
private static final Map<String, DataType> TYPE_MAPPING = Map.of(
|
||||||
|
"int32", SQLDataType.INTEGER,
|
||||||
|
"float", SQLDataType.FLOAT,
|
||||||
|
"bool", SQLDataType.BOOLEAN,
|
||||||
|
"enum", SQLDataType.INTEGER,
|
||||||
|
"text", SQLDataType.NVARCHAR,
|
||||||
|
"date", SQLDataType.DATE
|
||||||
|
);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* td数据类型到物模型映射
|
||||||
|
*/
|
||||||
|
|
||||||
|
private static final Map<String, DataType> DB2TYPE_MAPPING = Map.of(
|
||||||
|
"int",SQLDataType.INTEGER,
|
||||||
|
"float", SQLDataType.FLOAT,
|
||||||
|
"bool", SQLDataType.BOOLEAN,
|
||||||
|
"char",SQLDataType.NVARCHAR,
|
||||||
|
"date", SQLDataType.DATE,
|
||||||
|
"timestamptz", SQLDataType.TIMESTAMPWITHTIMEZONE
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
private static DataType getFieldType(final String type) {
|
||||||
|
Set<String> keys = DB2TYPE_MAPPING.keySet();
|
||||||
|
String lowerCase = type.toLowerCase();
|
||||||
|
for(String key:keys){
|
||||||
|
if(lowerCase.contains(key)){
|
||||||
|
return DB2TYPE_MAPPING.get(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 将物模型字段转换为td字段
|
||||||
|
*/
|
||||||
|
public static TsField parse(ThingModel.Property property) {
|
||||||
|
String filedName = property.getIdentifier().toLowerCase();
|
||||||
|
ThingModel.DataType dataType = property.getDataType();
|
||||||
|
String type = dataType.getType();
|
||||||
|
|
||||||
|
//将物模型字段类型映射为td字段类型
|
||||||
|
DataType fType = TYPE_MAPPING.get(type);
|
||||||
|
Object specs = dataType.getSpecs();
|
||||||
|
int len = -1;
|
||||||
|
if (specs instanceof Map) {
|
||||||
|
Object objLen = ((Map<?, ?>) specs).get("length");
|
||||||
|
if (objLen != null) {
|
||||||
|
len = Integer.parseInt(objLen.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new TsField(filedName, fType, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取物模型中的字段列表
|
||||||
|
*/
|
||||||
|
public static List<TsField> parse(ThingModel thingModel) {
|
||||||
|
return thingModel.getModel().getProperties().stream().map(FieldParser::parse).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 将从库中查出来的字段信息转换为td字段对象
|
||||||
|
*/
|
||||||
|
public static List<TsField> parse(List<DbField> rows) {
|
||||||
|
return (List<TsField>) rows.stream().map((r) -> {
|
||||||
|
|
||||||
|
return new TsField(
|
||||||
|
r.getName(),
|
||||||
|
getFieldType(r.getType()).length(r.getLength()),r.getLength());
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取字段字义
|
||||||
|
*/
|
||||||
|
public static Field getFieldDefine(TsField field) {
|
||||||
|
int length = field.getLength();
|
||||||
|
DataType type = field.getType();
|
||||||
|
|
||||||
|
if(length>0){
|
||||||
|
type.length(length);
|
||||||
|
}
|
||||||
|
return DSL.field(field.getName(),type);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,161 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.dm;
|
||||||
|
|
||||||
|
import org.jooq.*;
|
||||||
|
import org.jooq.conf.ParamType;
|
||||||
|
import org.jooq.impl.DSL;
|
||||||
|
import org.jooq.impl.SQLDataType;
|
||||||
|
import org.jooq.tools.StringUtils;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.jooq.impl.DSL.*;
|
||||||
|
|
||||||
|
public class TableManager {
|
||||||
|
|
||||||
|
private static final DSLContext sqlBuilder = DSL.using(SQLDialect.POSTGRES);
|
||||||
|
|
||||||
|
public static DSLContext getSqlBuilder() {
|
||||||
|
return sqlBuilder;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取创建表sql
|
||||||
|
*/
|
||||||
|
public static String getCreateSTableSql(String tbName, List<TsField> fields) {
|
||||||
|
if (fields.size() == 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
CreateTableColumnStep tableColumnStep = sqlBuilder.createTable(tbName)
|
||||||
|
.column("time", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false))
|
||||||
|
.column(field("device_id", SQLDataType.NCHAR.length(50).nullable(false)));
|
||||||
|
|
||||||
|
//生成字段片段
|
||||||
|
|
||||||
|
for (TsField field : fields) {
|
||||||
|
tableColumnStep.column(FieldParser.getFieldDefine(field));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
return tableColumnStep.getSQL(ParamType.INLINED);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getCreateSTableIndexSql(String tbName, String partitionCol) {
|
||||||
|
//根据时间和设备纬度分区
|
||||||
|
String sql = null;
|
||||||
|
if(StringUtils.isBlank(partitionCol)){
|
||||||
|
// 只根据时间分区
|
||||||
|
sql= String.format(" SELECT create_hypertable('%s', 'time') ;", tbName);
|
||||||
|
}else{
|
||||||
|
sql= String.format(" SELECT * FROM create_hypertable('%s', 'time'," +
|
||||||
|
" partitioning_column => '%s'," +
|
||||||
|
" number_partitions => 4" +
|
||||||
|
") ;", tbName, partitionCol );
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return sql;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getCreateTableIndexSql(String tbName) {
|
||||||
|
|
||||||
|
CreateIndexIncludeStep step = sqlBuilder.createIndexIfNotExists(tbName + "_index").on(
|
||||||
|
table(name(tbName)),
|
||||||
|
field(name("device_id")),
|
||||||
|
field(name("time")).desc());
|
||||||
|
|
||||||
|
return step.getSQL(ParamType.INLINED);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 取正确的表名
|
||||||
|
*
|
||||||
|
* @param name 表象
|
||||||
|
*/
|
||||||
|
public static String rightTbName(String name) {
|
||||||
|
return name.toLowerCase().replace("-", "_");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取表详情的sql
|
||||||
|
*/
|
||||||
|
public static String getDescTableSql(String tbName) {
|
||||||
|
|
||||||
|
|
||||||
|
String sql =String.format( " select a.attname as name," +
|
||||||
|
" t.typname as type, " +
|
||||||
|
"a.attlen as length," +
|
||||||
|
" case when a.attnotnull='t' then '1' else '0' end as nullable," +
|
||||||
|
" case when b.pk='t' then '1' else '0' end as isPk " +
|
||||||
|
"from pg_class e, pg_attribute a left join pg_type t on a.atttypid = t.oid " +
|
||||||
|
"left join (select pg_constraint.conname,pg_constraint.contype,pg_attribute.attname as pk " +
|
||||||
|
"from pg_constraint " +
|
||||||
|
" inner join pg_class on pg_constraint.conrelid = pg_class.oid" +
|
||||||
|
" inner join pg_attribute on pg_attribute.attrelid = pg_class.oid " +
|
||||||
|
" and pg_attribute.attnum = any(pg_constraint.conkey) where contype='p')" +
|
||||||
|
" b on a.attname=b.pk where e.relname = '%s'" +
|
||||||
|
" and a.attnum > 0 and a.attrelid = e.oid and t.typname is not null ;",tbName);
|
||||||
|
|
||||||
|
return sql;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取添加字段sql
|
||||||
|
*/
|
||||||
|
public static String getAddSTableColumnSql(String tbName, List<TsField> fields) {
|
||||||
|
|
||||||
|
AlterTableStep alterTableStep = sqlBuilder.alterTable(tbName);
|
||||||
|
|
||||||
|
AlterTableFinalStep addStep = null;
|
||||||
|
for (TsField o : fields) {
|
||||||
|
addStep = alterTableStep.add(FieldParser.getFieldDefine(o));
|
||||||
|
}
|
||||||
|
return addStep.getSQL();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取修改字段sql
|
||||||
|
*/
|
||||||
|
public static String getModifySTableColumnSql(String tbName, List<TsField> fields) {
|
||||||
|
AlterTableStep alterTableStep = sqlBuilder.alterTable(tbName);
|
||||||
|
AlterTableFinalStep step = null;
|
||||||
|
for (TsField o : fields) {
|
||||||
|
Field fieldDefine = FieldParser.getFieldDefine(o);
|
||||||
|
step = alterTableStep.alterColumn(o.getName()).set(fieldDefine.getDataType());
|
||||||
|
}
|
||||||
|
return step.getSQL();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取删除字段sql
|
||||||
|
*/
|
||||||
|
public static String getDropSTableColumnSql(String tbName, List<TsField> fields) {
|
||||||
|
|
||||||
|
AlterTableStep alterTableStep = sqlBuilder.alterTable(tbName);
|
||||||
|
|
||||||
|
|
||||||
|
AlterTableFinalStep step = null;
|
||||||
|
for (TsField o : fields) {
|
||||||
|
step = alterTableStep.dropColumnIfExists(FieldParser.getFieldDefine(o));
|
||||||
|
}
|
||||||
|
|
||||||
|
return step.getSQL();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,24 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.dm;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
import org.jooq.DataType;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class TsField {
|
||||||
|
private String name;
|
||||||
|
private DataType type;
|
||||||
|
private int length;
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.model;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class TsDeviceProperty {
|
||||||
|
|
||||||
|
private Date time;
|
||||||
|
|
||||||
|
private String deviceId;
|
||||||
|
|
||||||
|
private String name;
|
||||||
|
|
||||||
|
private Object value;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,33 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.model;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class TsRuleLog {
|
||||||
|
|
||||||
|
private Date time;
|
||||||
|
|
||||||
|
private String ruleId;
|
||||||
|
|
||||||
|
private String state1;
|
||||||
|
|
||||||
|
private String content;
|
||||||
|
|
||||||
|
private Boolean success;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.model;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class TsTaskLog {
|
||||||
|
|
||||||
|
private Long time;
|
||||||
|
|
||||||
|
private String taskId;
|
||||||
|
|
||||||
|
private String content;
|
||||||
|
|
||||||
|
private Boolean success;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.model;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class TsThingModelMessage {
|
||||||
|
|
||||||
|
private Date time;
|
||||||
|
|
||||||
|
private String mid;
|
||||||
|
|
||||||
|
private String deviceId;
|
||||||
|
|
||||||
|
private String productKey;
|
||||||
|
|
||||||
|
private String deviceName;
|
||||||
|
|
||||||
|
private String uid;
|
||||||
|
|
||||||
|
private String type;
|
||||||
|
|
||||||
|
private String identifier;
|
||||||
|
|
||||||
|
private int code;
|
||||||
|
|
||||||
|
private String data;
|
||||||
|
|
||||||
|
private Long reportTime;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,36 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.model;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 统计的时间数据
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class TsTimeData {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 时间
|
||||||
|
*/
|
||||||
|
private Date time;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 数据值
|
||||||
|
*/
|
||||||
|
private Object data;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,33 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.model;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class TsVirtualDeviceLog {
|
||||||
|
|
||||||
|
private Date time;
|
||||||
|
|
||||||
|
private String virtualDeviceId;
|
||||||
|
|
||||||
|
private String virtualDeviceName;
|
||||||
|
|
||||||
|
private int deviceTotal;
|
||||||
|
|
||||||
|
private String result;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,207 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.service;
|
||||||
|
|
||||||
|
import cc.iotkit.common.utils.JsonUtil;
|
||||||
|
import cc.iotkit.model.product.ThingModel;
|
||||||
|
import cc.iotkit.temporal.IDbStructureData;
|
||||||
|
import cc.iotkit.temporal.ts.config.Constants;
|
||||||
|
import cc.iotkit.temporal.ts.dao.TsTemplate;
|
||||||
|
import cc.iotkit.temporal.ts.dm.DbField;
|
||||||
|
import cc.iotkit.temporal.ts.dm.FieldParser;
|
||||||
|
import cc.iotkit.temporal.ts.dm.TableManager;
|
||||||
|
import cc.iotkit.temporal.ts.dm.TsField;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.jooq.CreateTableColumnStep;
|
||||||
|
import org.jooq.DSLContext;
|
||||||
|
import org.jooq.SQLDialect;
|
||||||
|
import org.jooq.impl.DSL;
|
||||||
|
import org.jooq.impl.SQLDataType;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.jdbc.core.BeanPropertyRowMapper;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
public class DbStructureDataImpl implements IDbStructureData {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TsTemplate tsTemplate;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据物模型创建超级表
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void defineThingModel(ThingModel thingModel) {
|
||||||
|
//获取物模型中的属性定义
|
||||||
|
List<TsField> fields = FieldParser.parse(thingModel);
|
||||||
|
String tbName = Constants.getProductPropertySTableName(thingModel.getProductKey());
|
||||||
|
String sql = TableManager.getCreateSTableSql(tbName,
|
||||||
|
fields);
|
||||||
|
if (sql == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
System.out.println(sql);
|
||||||
|
tsTemplate.execute(sql);
|
||||||
|
|
||||||
|
createHypertable(tbName, "device_id");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createHypertable(String tbName, String partitionCol) {
|
||||||
|
String createSTableIndexSql = TableManager.getCreateSTableIndexSql(tbName, partitionCol);
|
||||||
|
try {
|
||||||
|
System.out.println(createSTableIndexSql);
|
||||||
|
|
||||||
|
tsTemplate.execute(createSTableIndexSql);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.info("createHypertable error:{}", e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据物模型更新超级表结构
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void updateThingModel(ThingModel thingModel) {
|
||||||
|
//获取旧字段信息
|
||||||
|
String tbName = Constants.getProductPropertySTableName(thingModel.getProductKey());
|
||||||
|
String sql = TableManager.getDescTableSql(tbName);
|
||||||
|
if (sql == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsTemplate.execute(sql);
|
||||||
|
|
||||||
|
List<DbField> fieldsInDb = tsTemplate.query(sql, new BeanPropertyRowMapper<DbField>(DbField.class));
|
||||||
|
|
||||||
|
List<TsField> newFields = FieldParser.parse(thingModel);
|
||||||
|
List<TsField> oldFields = FieldParser.parse(fieldsInDb);
|
||||||
|
|
||||||
|
//对比差异
|
||||||
|
|
||||||
|
//找出修改的字段
|
||||||
|
List<TsField> modifyFields = newFields.stream().filter((f) -> oldFields.stream()
|
||||||
|
.anyMatch(old ->
|
||||||
|
old.getName().equals(f.getName()) //字段名相同
|
||||||
|
//字段类型或长度不同
|
||||||
|
&& (!old.getType().equals(f.getType()) || old.getLength() != f.getLength())
|
||||||
|
))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
if (modifyFields.size() > 0) {
|
||||||
|
sql = TableManager.getModifySTableColumnSql(tbName, modifyFields);
|
||||||
|
log.info("modify column:{}", sql);
|
||||||
|
|
||||||
|
tsTemplate.execute(sql);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
//找出新增的字段
|
||||||
|
List<TsField> addFields = newFields.stream().filter((f) -> oldFields.stream()
|
||||||
|
.noneMatch(old -> old.getName().equals(f.getName())))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
if (addFields.size() > 0) {
|
||||||
|
sql = TableManager.getAddSTableColumnSql(tbName, addFields);
|
||||||
|
log.info("add column:{}", sql);
|
||||||
|
|
||||||
|
tsTemplate.execute(sql);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//找出删除的字段
|
||||||
|
List<TsField> dropFields = oldFields.stream().filter((f) ->
|
||||||
|
!"time".equals(f.getName()) &&
|
||||||
|
!"device_id".equals(f.getName()) && newFields.stream()
|
||||||
|
//字段名不是time且没有相同字段名的
|
||||||
|
.noneMatch(n -> n.getName().equals(f.getName())))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
if (dropFields.size() > 0) {
|
||||||
|
|
||||||
|
sql = TableManager.getDropSTableColumnSql(tbName, dropFields);
|
||||||
|
log.info("drop column:{}", sql);
|
||||||
|
tsTemplate.execute(sql);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 初始化其它数据结构
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
@PostConstruct
|
||||||
|
public void initDbStructure() {
|
||||||
|
//创建规则日志表
|
||||||
|
DSLContext dslBuilder = DSL.using(SQLDialect.POSTGRES);
|
||||||
|
|
||||||
|
CreateTableColumnStep ruleLogStep = dslBuilder.createTableIfNotExists("rule_log")
|
||||||
|
.column("time", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false))
|
||||||
|
.column("state1", SQLDataType.INTEGER)
|
||||||
|
.column("content", SQLDataType.VARCHAR(1024))
|
||||||
|
.column("success", SQLDataType.BOOLEAN)
|
||||||
|
.column("rule_id", SQLDataType.VARCHAR(50).nullable(false));
|
||||||
|
String sql = ruleLogStep.getSQL();
|
||||||
|
System.out.println(sql);
|
||||||
|
tsTemplate.execute(sql);
|
||||||
|
// 按时间和rule_id分区
|
||||||
|
createHypertable("rule_log", "rule_id");
|
||||||
|
|
||||||
|
|
||||||
|
CreateTableColumnStep taskLogStep = dslBuilder.createTableIfNotExists("task_log")
|
||||||
|
.column("time", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false))
|
||||||
|
.column("content", SQLDataType.VARCHAR(1024))
|
||||||
|
.column("success", SQLDataType.BOOLEAN)
|
||||||
|
.column("task_id", SQLDataType.VARCHAR(50).nullable(false));
|
||||||
|
String taskLogsql = taskLogStep.getSQL();
|
||||||
|
|
||||||
|
System.out.println(taskLogsql);
|
||||||
|
tsTemplate.execute(taskLogsql);
|
||||||
|
// 按时间和task_id分区
|
||||||
|
createHypertable("task_log", "task_id");
|
||||||
|
|
||||||
|
CreateTableColumnStep thingModelStep = dslBuilder.createTableIfNotExists("thing_model_message")
|
||||||
|
.column("time", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false))
|
||||||
|
.column("mid", SQLDataType.NCHAR(50))
|
||||||
|
.column("product_key", SQLDataType.NCHAR(50))
|
||||||
|
.column("device_name", SQLDataType.NCHAR(50))
|
||||||
|
.column("uid", SQLDataType.NCHAR(50))
|
||||||
|
.column("type", SQLDataType.NCHAR(20))
|
||||||
|
.column("identifier", SQLDataType.NVARCHAR(50))
|
||||||
|
.column("code", SQLDataType.INTEGER)
|
||||||
|
.column("data", SQLDataType.NVARCHAR(1024))
|
||||||
|
.column("report_time", SQLDataType.NVARCHAR(1024))
|
||||||
|
.column("device_id", SQLDataType.NCHAR(50));
|
||||||
|
|
||||||
|
String thingModelsql = thingModelStep.getSQL();
|
||||||
|
|
||||||
|
System.out.println(thingModelsql);
|
||||||
|
tsTemplate.execute(thingModelsql);
|
||||||
|
createHypertable("thing_model_message", "device_id");
|
||||||
|
|
||||||
|
//创建虚拟设备日志表
|
||||||
|
CreateTableColumnStep virtualStep = dslBuilder.createTableIfNotExists("virtual_device_log")
|
||||||
|
.column("time", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false))
|
||||||
|
.column("virtual_device_name", SQLDataType.NCHAR(50))
|
||||||
|
.column("device_total", SQLDataType.INTEGER)
|
||||||
|
.column("result", SQLDataType.NVARCHAR(1024))
|
||||||
|
.column("virtual_device_id", SQLDataType.NCHAR(50));
|
||||||
|
|
||||||
|
String virtualsql = virtualStep.getSQL();
|
||||||
|
System.out.println(virtualsql);
|
||||||
|
|
||||||
|
tsTemplate.execute(virtualsql);
|
||||||
|
createHypertable("virtual_device_log", "virtual_device_id");
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,105 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.service;
|
||||||
|
|
||||||
|
import cc.iotkit.data.IDeviceInfoData;
|
||||||
|
import cc.iotkit.model.device.DeviceInfo;
|
||||||
|
import cc.iotkit.model.device.message.DeviceProperty;
|
||||||
|
import cc.iotkit.temporal.IDevicePropertyData;
|
||||||
|
import cc.iotkit.temporal.ts.config.Constants;
|
||||||
|
import cc.iotkit.temporal.ts.dao.TsTemplate;
|
||||||
|
import cc.iotkit.temporal.ts.model.TsDeviceProperty;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.jooq.Condition;
|
||||||
|
import org.jooq.Field;
|
||||||
|
import org.jooq.InsertValuesStepN;
|
||||||
|
import org.jooq.Record;
|
||||||
|
import org.jooq.conf.ParamType;
|
||||||
|
import org.jooq.impl.DSL;
|
||||||
|
import org.springframework.beans.BeanUtils;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.jdbc.core.BeanPropertyRowMapper;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.jooq.impl.DSL.field;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
public class DevicePropertyDataImpl implements IDevicePropertyData {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TsTemplate tsTemplate;
|
||||||
|
@Autowired
|
||||||
|
@Qualifier("deviceInfoDataCache")
|
||||||
|
private IDeviceInfoData deviceInfoData;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<DeviceProperty> findDevicePropertyHistory(String deviceId, String name, long start, long end) {
|
||||||
|
DeviceInfo device = deviceInfoData.findByDeviceId(deviceId);
|
||||||
|
|
||||||
|
String tbName = Constants.getProductPropertySTableName(device.getProductKey());
|
||||||
|
Condition con = field("time").greaterOrEqual(new Date(start)).and(field("time").lessOrEqual(new Date(end)))
|
||||||
|
.and(DSL.field("device_id").eq(deviceId));
|
||||||
|
String sql = DSL.select(DSL.field("time"), DSL.field("device_id"), DSL.field(name.toLowerCase()).as("value"))
|
||||||
|
.from(tbName).where(con)
|
||||||
|
.getSQL(ParamType.INLINED);
|
||||||
|
|
||||||
|
|
||||||
|
List<TsDeviceProperty> list = tsTemplate.query(sql, new BeanPropertyRowMapper<>(TsDeviceProperty.class));
|
||||||
|
|
||||||
|
|
||||||
|
return list.stream().map(
|
||||||
|
o->{
|
||||||
|
DeviceProperty deviceProperty = new DeviceProperty();
|
||||||
|
BeanUtils.copyProperties(o,deviceProperty);
|
||||||
|
deviceProperty.setTime(o.getTime().getTime());
|
||||||
|
return deviceProperty;
|
||||||
|
}
|
||||||
|
).collect(Collectors.toList());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addProperties(String deviceId, Map<String, Object> properties, long time) {
|
||||||
|
DeviceInfo device = deviceInfoData.findByDeviceId(deviceId);
|
||||||
|
if (device == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
//获取设备旧属性
|
||||||
|
Map<String, Object> oldProperties = deviceInfoData.getProperties(deviceId);
|
||||||
|
//用新属性覆盖
|
||||||
|
oldProperties.putAll(properties);
|
||||||
|
|
||||||
|
List<Field<Object>> fields = new ArrayList<>();
|
||||||
|
List<Object> values = new ArrayList<>();
|
||||||
|
|
||||||
|
fields.add(DSL.field("time"));
|
||||||
|
fields.add(DSL.field("device_id"));
|
||||||
|
values.add(new Date(time));
|
||||||
|
values.add(deviceId);
|
||||||
|
//组织sql
|
||||||
|
oldProperties.forEach((key, val) -> {
|
||||||
|
fields.add(DSL.field(key));
|
||||||
|
values.add(val);
|
||||||
|
});
|
||||||
|
String tbName = Constants.getProductPropertySTableName(device.getProductKey());
|
||||||
|
|
||||||
|
//组织sql
|
||||||
|
InsertValuesStepN<Record> step = DSL.insertInto(DSL.table(tbName), (Collection<? extends Field<Object>>) fields).values(values);
|
||||||
|
String sql = step.getSQL(ParamType.INLINED);
|
||||||
|
tsTemplate.batchUpdate(sql);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,80 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.service;
|
||||||
|
|
||||||
|
import cc.iotkit.model.Paging;
|
||||||
|
import cc.iotkit.model.rule.RuleLog;
|
||||||
|
import cc.iotkit.temporal.IRuleLogData;
|
||||||
|
import cc.iotkit.temporal.ts.dao.TsTemplate;
|
||||||
|
//import cc.iotkit.temporal.ts.dm.TableManager;
|
||||||
|
import cc.iotkit.temporal.ts.dm.TableManager;
|
||||||
|
import cc.iotkit.temporal.ts.model.TsRuleLog;
|
||||||
|
import org.jooq.*;
|
||||||
|
import org.jooq.conf.ParamType;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.jdbc.core.BeanPropertyRowMapper;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.jooq.impl.DSL.*;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class RuleLogDataImpl implements IRuleLogData {
|
||||||
|
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TsTemplate tsTemplate;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteByRuleId(String ruleId) {
|
||||||
|
|
||||||
|
tsTemplate.update("delete from rule_log where rule_id=?", ruleId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Paging<RuleLog> findByRuleId(String ruleId, int page, int size) {
|
||||||
|
|
||||||
|
SelectForUpdateStep<Record5<Object, Object, Object, Object, Object>> sqlStep = TableManager.getSqlBuilder()
|
||||||
|
.select(field("time"), field("state1"), field("content"), field("success"),
|
||||||
|
field("rule_id"))
|
||||||
|
.from(table("rule_log"))
|
||||||
|
.where(field("rule_id").eq(ruleId))
|
||||||
|
.orderBy(field("time").desc())
|
||||||
|
.limit(size)
|
||||||
|
.offset((page - 1) * size);
|
||||||
|
List<TsRuleLog> ruleLogs = tsTemplate.query(sqlStep.getSQL(ParamType.INLINED), new BeanPropertyRowMapper<>(TsRuleLog.class), ruleId);
|
||||||
|
|
||||||
|
SelectConditionStep<Record1<Integer>> where = TableManager.getSqlBuilder().selectCount().from(table("rule_log"))
|
||||||
|
.where(field("rule_id").eq(ruleId));
|
||||||
|
Long count = tsTemplate.queryForObject(where.getSQL(ParamType.INLINED), Long.class);
|
||||||
|
|
||||||
|
return new Paging<>(count, ruleLogs.stream().map(r ->
|
||||||
|
new RuleLog(r.getTime().toString(), ruleId, r.getState1(),
|
||||||
|
r.getContent(), r.getSuccess(), r.getTime().getTime()))
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void add(RuleLog log) {
|
||||||
|
//使用
|
||||||
|
|
||||||
|
InsertValuesStep5<Record, Object, Object, Object, Object, Object> sqlStep = TableManager.getSqlBuilder().insertInto(table("rule_log"),
|
||||||
|
field("time"),
|
||||||
|
field("rule_id"),
|
||||||
|
field("state1"),
|
||||||
|
field("content"), field("success")).values(new Date(),
|
||||||
|
log.getRuleId(), log.getState(), log.getContent(), log.getSuccess());
|
||||||
|
|
||||||
|
tsTemplate.update(sqlStep.getSQL(ParamType.INLINED));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,82 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.service;
|
||||||
|
|
||||||
|
import cc.iotkit.model.Paging;
|
||||||
|
import cc.iotkit.model.rule.TaskLog;
|
||||||
|
import cc.iotkit.temporal.ITaskLogData;
|
||||||
|
import cc.iotkit.temporal.ts.dao.TsTemplate;
|
||||||
|
//import cc.iotkit.temporal.ts.dm.TableManager;
|
||||||
|
import cc.iotkit.temporal.ts.dm.TableManager;
|
||||||
|
import cc.iotkit.temporal.ts.model.TsTaskLog;
|
||||||
|
import cc.iotkit.temporal.ts.dao.TsTemplate;
|
||||||
|
import org.jooq.*;
|
||||||
|
import org.jooq.conf.ParamType;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.jdbc.core.BeanPropertyRowMapper;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.jooq.impl.DSL.field;
|
||||||
|
import static org.jooq.impl.DSL.table;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class TaskLogDataImpl implements ITaskLogData {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TsTemplate tsTemplate;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteByTaskId(String taskId) {
|
||||||
|
tsTemplate.update("delete from task_log where task_id=?", taskId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Paging<TaskLog> findByTaskId(String taskId, int page, int size) {
|
||||||
|
SelectForUpdateStep<Record4<Object, Object, Object, Object>> sqlStep = TableManager.getSqlBuilder()
|
||||||
|
.select(field("time"), field("content"), field("success"), field("task_id"))
|
||||||
|
.from(table("task_log"))
|
||||||
|
.where(field("task_id").eq(taskId))
|
||||||
|
.orderBy(field("time").desc())
|
||||||
|
.limit(size)
|
||||||
|
.offset((page - 1) * size);
|
||||||
|
|
||||||
|
// Get the SQL string from the query
|
||||||
|
String sql = sqlStep.getSQL(ParamType.INLINED);
|
||||||
|
List<TsTaskLog> taskLogs = tsTemplate.query(sql, new BeanPropertyRowMapper<>(TsTaskLog.class));
|
||||||
|
|
||||||
|
String whereSql = TableManager.getSqlBuilder().selectCount().from(table("task_log"))
|
||||||
|
.where(field("task_id").eq(taskId)).getSQL(ParamType.INLINED);
|
||||||
|
Long count = tsTemplate.queryForObject(whereSql, new BeanPropertyRowMapper<>(Long.class));
|
||||||
|
|
||||||
|
return new Paging<>(count , taskLogs.stream().map(r ->
|
||||||
|
new TaskLog(r.getTime().toString(), taskId,
|
||||||
|
r.getContent(), r.getSuccess(), r.getTime()))
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void add(TaskLog log) {
|
||||||
|
|
||||||
|
InsertValuesStep4<Record, Object, Object, Object, Object> sqlStep
|
||||||
|
= TableManager.getSqlBuilder().insertInto(table("tag_log"),
|
||||||
|
field("time"),
|
||||||
|
field("task_id"),
|
||||||
|
field("content"), field("success")).values(
|
||||||
|
new Date(),
|
||||||
|
log.getTaskId(),
|
||||||
|
log.getContent(), log.getSuccess());
|
||||||
|
|
||||||
|
tsTemplate.update(sqlStep.getSQL(ParamType.INLINED));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,134 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.service;
|
||||||
|
|
||||||
|
import cc.iotkit.common.utils.JsonUtil;
|
||||||
|
import cc.iotkit.model.Paging;
|
||||||
|
import cc.iotkit.model.device.message.ThingModelMessage;
|
||||||
|
import cc.iotkit.model.stats.TimeData;
|
||||||
|
import cc.iotkit.temporal.IThingModelMessageData;
|
||||||
|
import cc.iotkit.temporal.ts.dao.TsTemplate;
|
||||||
|
import cc.iotkit.temporal.ts.dm.TableManager;
|
||||||
|
import cc.iotkit.temporal.ts.model.TsThingModelMessage;
|
||||||
|
import cc.iotkit.temporal.ts.model.TsTimeData;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.jooq.*;
|
||||||
|
import org.jooq.conf.ParamType;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.jdbc.core.BeanPropertyRowMapper;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.jooq.impl.DSL.field;
|
||||||
|
import static org.jooq.impl.DSL.table;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class ThingModelMessageDataImpl implements IThingModelMessageData {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TsTemplate tsTemplate;
|
||||||
|
|
||||||
|
public Paging<ThingModelMessage> findByTypeAndIdentifier(String deviceId, String type,
|
||||||
|
String identifier,
|
||||||
|
int page, int size) {
|
||||||
|
|
||||||
|
|
||||||
|
Table<Record> table = table("thing_model_message");
|
||||||
|
Condition whereConditions = field("device_id").eq(deviceId);
|
||||||
|
SelectJoinStep<Record9<Object, Object, Object, Object, Object, Object, Object, Object, Object>> step = TableManager.getSqlBuilder().select(field("time"), field("mid"),
|
||||||
|
field("product_key"), field("device_name"), field("type"),
|
||||||
|
field("identifier"), field("code"), field("data"),
|
||||||
|
field("report_time")).from(table);
|
||||||
|
|
||||||
|
|
||||||
|
if (StringUtils.isNotBlank(type)) {
|
||||||
|
whereConditions.and(field("type").eq(type));
|
||||||
|
}
|
||||||
|
if (StringUtils.isNotBlank(identifier)) {
|
||||||
|
whereConditions.and(field("identifier").eq(identifier));
|
||||||
|
}
|
||||||
|
|
||||||
|
String sql = step.where(whereConditions).orderBy(field("time").desc()).limit(size).offset((page - 1) * size).getSQL(ParamType.INLINED);
|
||||||
|
|
||||||
|
List<TsThingModelMessage> ruleLogs = tsTemplate.query(sql,
|
||||||
|
new BeanPropertyRowMapper<>(TsThingModelMessage.class)
|
||||||
|
);
|
||||||
|
|
||||||
|
String countSql = TableManager.getSqlBuilder().selectCount().from(table).where(whereConditions).getSQL(ParamType.INLINED);
|
||||||
|
Long count = tsTemplate.queryForObject(countSql, Long.class);
|
||||||
|
|
||||||
|
return new Paging<>(count, ruleLogs.stream().map(r ->
|
||||||
|
new ThingModelMessage(r.getTime().toString(), r.getMid(),
|
||||||
|
deviceId, r.getProductKey(), r.getDeviceName(),
|
||||||
|
r.getUid(), r.getType(), r.getIdentifier(), r.getCode(),
|
||||||
|
JsonUtil.parse(r.getData(), Map.class),
|
||||||
|
r.getTime().getTime(), r.getReportTime()))
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<TimeData> getDeviceMessageStatsWithUid(String uid, long start, long end) {
|
||||||
|
|
||||||
|
Table<Record> table = table("thing_model_message");
|
||||||
|
|
||||||
|
Condition con = field("time").greaterOrEqual(new Date(start)).and(field("time").lessOrEqual(new Date(end)));
|
||||||
|
if(StringUtils.isNotBlank(uid)){
|
||||||
|
con.and(field("uid").eq(uid));
|
||||||
|
}
|
||||||
|
|
||||||
|
String sql = TableManager.getSqlBuilder().select(field("date_trunc('hour', \"time\")").as("time"),field("count(*)").as("data"))
|
||||||
|
.from(table).where(con).groupBy(field("date_trunc('hour', \"time\")")).orderBy(field("time").asc()).getSQL(ParamType.INLINED);
|
||||||
|
|
||||||
|
|
||||||
|
List<TsTimeData> query = tsTemplate.query(sql, new BeanPropertyRowMapper<>(TsTimeData.class));
|
||||||
|
return query.stream().map(o -> {
|
||||||
|
TimeData timeData = new TimeData();
|
||||||
|
timeData.setData(o.getData());
|
||||||
|
timeData.setTime(o.getTime().getTime());
|
||||||
|
return timeData;
|
||||||
|
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void add(ThingModelMessage msg) {
|
||||||
|
Table<Record> table = table("thing_model_message");
|
||||||
|
|
||||||
|
String sql = TableManager.getSqlBuilder().insertInto(table,
|
||||||
|
field("time"),
|
||||||
|
field("device_id"),
|
||||||
|
field("mid"),
|
||||||
|
field("product_key"),
|
||||||
|
field("device_name"),
|
||||||
|
field("uid"),
|
||||||
|
field("type"),
|
||||||
|
field("identifier"),
|
||||||
|
field("code"),
|
||||||
|
field("data"), field("report_time"))
|
||||||
|
.values(new Date(msg.getOccurred()), msg.getDeviceId(), msg.getMid(),
|
||||||
|
msg.getProductKey(), msg.getDeviceName(),
|
||||||
|
msg.getUid(), msg.getType(),
|
||||||
|
msg.getIdentifier(), msg.getCode(),
|
||||||
|
msg.getData() == null ? "{}" : JsonUtil.toJsonString(msg.getData()),
|
||||||
|
msg.getTime()).getSQL(ParamType.INLINED);
|
||||||
|
tsTemplate.update(sql);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long count() {
|
||||||
|
List<Long> counts = tsTemplate.queryForList("select count(*) from thing_model_message", Long.class);
|
||||||
|
return counts.size() > 0 ? counts.get(0) : 0;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,74 @@
|
||||||
|
/*
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Copyright (c) 奇特物联 2021-2022 All rights reserved.
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Licensed 未经许可不能去掉「奇特物联」相关版权
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
* | Author: xw2sy@163.com
|
||||||
|
* +----------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
package cc.iotkit.temporal.ts.service;
|
||||||
|
|
||||||
|
import cc.iotkit.model.Paging;
|
||||||
|
import cc.iotkit.model.device.VirtualDeviceLog;
|
||||||
|
import cc.iotkit.temporal.IVirtualDeviceLogData;
|
||||||
|
import cc.iotkit.temporal.ts.dao.TsTemplate;
|
||||||
|
import cc.iotkit.temporal.ts.dm.TableManager;
|
||||||
|
import cc.iotkit.temporal.ts.model.TsVirtualDeviceLog;
|
||||||
|
import org.jooq.*;
|
||||||
|
import org.jooq.conf.ParamType;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.jdbc.core.BeanPropertyRowMapper;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.jooq.impl.DSL.field;
|
||||||
|
import static org.jooq.impl.DSL.table;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class VirtualDeviceLogDataImpl implements IVirtualDeviceLogData {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TsTemplate tsTemplate;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Paging<VirtualDeviceLog> findByVirtualDeviceId(String virtualDeviceId, int page, int size) {
|
||||||
|
|
||||||
|
Table<Record> table = table("virtual_device_log");
|
||||||
|
|
||||||
|
Condition whereConditions = field("virtual_device_id").eq(virtualDeviceId.toLowerCase());
|
||||||
|
DSLContext sqlBuilder = TableManager.getSqlBuilder();
|
||||||
|
String sql = sqlBuilder.select(field("time"), field("virtual_device_id"),
|
||||||
|
field("virtual_device_name"), field("device_total"), field("result")).from(table).where(whereConditions)
|
||||||
|
.orderBy(field("time").desc()).limit(size).offset((page - 1) * size).getSQL(ParamType.INLINED);
|
||||||
|
|
||||||
|
List<TsVirtualDeviceLog> logs = tsTemplate.query(sql, new BeanPropertyRowMapper<>(TsVirtualDeviceLog.class));
|
||||||
|
|
||||||
|
String countSql = sqlBuilder.selectCount().from(table).where(whereConditions).getSQL(ParamType.INLINED);
|
||||||
|
|
||||||
|
Long count = tsTemplate.queryForObject(countSql, Long.class);
|
||||||
|
|
||||||
|
return new Paging<>(count, logs.stream().map(r ->
|
||||||
|
new VirtualDeviceLog(r.getTime().toString(), virtualDeviceId,
|
||||||
|
r.getVirtualDeviceName(),
|
||||||
|
r.getDeviceTotal(), r.getResult(),
|
||||||
|
r.getTime().getTime()))
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void add(VirtualDeviceLog log) {
|
||||||
|
Table<Record> table = table("virtual_device_log");
|
||||||
|
|
||||||
|
String sql = TableManager.getSqlBuilder().insertInto(table, field("time"), field("virtual_device_id"),
|
||||||
|
field("virtual_device_name"),
|
||||||
|
field("device_total"), field("result"))
|
||||||
|
.values(new Date(), log.getVirtualDeviceId(), log.getVirtualDeviceName(),
|
||||||
|
log.getDeviceTotal(), log.getResult()).getSQL(ParamType.INLINED);
|
||||||
|
|
||||||
|
tsTemplate.update(sql);
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,6 +18,7 @@
|
||||||
<module>iot-es-temporal-service</module>
|
<module>iot-es-temporal-service</module>
|
||||||
<module>iot-rdb-data-service</module>
|
<module>iot-rdb-data-service</module>
|
||||||
<module>iot-td-temporal-service</module>
|
<module>iot-td-temporal-service</module>
|
||||||
|
<module>iot-ts-temporal-service</module>
|
||||||
</modules>
|
</modules>
|
||||||
|
|
||||||
<artifactId>iot-data</artifactId>
|
<artifactId>iot-data</artifactId>
|
||||||
|
|
|
@ -164,6 +164,12 @@
|
||||||
<artifactId>iot-es-temporal-service</artifactId>
|
<artifactId>iot-es-temporal-service</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!--打开注释 启用timescale数据库-->
|
||||||
|
<!-- <dependency>-->
|
||||||
|
<!-- <groupId>cc.iotkit</groupId>-->
|
||||||
|
<!-- <artifactId>iot-ts-temporal-service</artifactId>-->
|
||||||
|
<!-- </dependency>-->
|
||||||
|
|
||||||
<!--打开注释 启用tdengine数据库-->
|
<!--打开注释 启用tdengine数据库-->
|
||||||
<!-- <dependency>-->
|
<!-- <dependency>-->
|
||||||
<!-- <groupId>cc.iotkit</groupId>-->
|
<!-- <groupId>cc.iotkit</groupId>-->
|
||||||
|
|
5
pom.xml
5
pom.xml
|
@ -316,6 +316,11 @@
|
||||||
<artifactId>iot-td-temporal-service</artifactId>
|
<artifactId>iot-td-temporal-service</artifactId>
|
||||||
<version>${project.version}</version>
|
<version>${project.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>cc.iotkit</groupId>
|
||||||
|
<artifactId>iot-ts-temporal-service</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>cc.iotkit</groupId>
|
<groupId>cc.iotkit</groupId>
|
||||||
|
|
Loading…
Reference in New Issue