协议网关修改

V0.5.x
xiwa 2022-03-23 18:58:29 +08:00
parent 55154d615f
commit 5c4694b080
43 changed files with 848 additions and 243 deletions

BIN
.DS_Store vendored

Binary file not shown.

View File

@ -2,7 +2,7 @@ package cc.iotkit.common;
public interface Constants {
String MQTT_SECRET = "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU";
String PRODUCT_SECRET = "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU";
String ACCOUNT_SECRET = "3n1z33kzvpgz1foijpkepyd3e8tw84us";

View File

@ -36,7 +36,7 @@ public class Gateway extends Device {
// MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(this.deviceName);
connOpts.setPassword(DigestUtils.md5Hex(Constants.MQTT_SECRET + clientId).toCharArray());
connOpts.setPassword(DigestUtils.md5Hex(Constants.PRODUCT_SECRET + clientId).toCharArray());
// 保留会话
connOpts.setCleanSession(true);

View File

@ -53,7 +53,7 @@ public class MqttAuthController {
return false;
}
clientId = clientId.replaceFirst("su_", "");
return CodecUtil.aesDecrypt(clientId, Constants.MQTT_SECRET).startsWith("admin_");
return CodecUtil.aesDecrypt(clientId, Constants.PRODUCT_SECRET).startsWith("admin_");
} catch (Throwable e) {
log.error("aesDecrypt error.", e);
return false;

View File

@ -24,7 +24,7 @@ public class DeviceAuthService {
String clientId = auth.getClientid();
String[] pkDnAndModel = getPkDnAndModel(clientId);
String hmac = DigestUtils.md5Hex(Constants.MQTT_SECRET + clientId);
String hmac = DigestUtils.md5Hex(Constants.PRODUCT_SECRET + clientId);
if (!hmac.equalsIgnoreCase(auth.getPassword())) {
throw new RuntimeException("password is illegal.");
}

View File

@ -73,7 +73,7 @@ public class MqttManager implements MqttCallback, IMqttMessageListener {
if (mqttClient == null) {
MemoryPersistence persistence = new MemoryPersistence();
String clientId = "mqtt-server-consumer-" + env;
clientId = "su_" + CodecUtil.aesEncrypt("admin_" + clientId, Constants.MQTT_SECRET);
clientId = "su_" + CodecUtil.aesEncrypt("admin_" + clientId, Constants.PRODUCT_SECRET);
mqttClient = new MqttClient(url, clientId, persistence);
mqttClient.setCallback(this);
}

View File

@ -149,6 +149,16 @@
<artifactId>protocol-server</artifactId>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>component-server</artifactId>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>mqtt-component</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -2,6 +2,9 @@ package cc.iotkit.manager.controller;
import cc.iotkit.common.exception.BizException;
import cc.iotkit.common.utils.ReflectUtil;
import cc.iotkit.comp.mqtt.MqttComponent;
import cc.iotkit.comps.ComponentManager;
import cc.iotkit.converter.ScriptConverter;
import cc.iotkit.dao.ProtocolGatewayRepository;
import cc.iotkit.dao.UserInfoRepository;
import cc.iotkit.manager.service.DataOwnerService;
@ -18,6 +21,7 @@ import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.web.bind.annotation.*;
import javax.annotation.PostConstruct;
import java.util.Optional;
@Slf4j
@ -40,6 +44,9 @@ public class ProtocolController {
@Autowired
private UserInfoRepository userInfoRepository;
@Autowired
private ComponentManager componentManager;
@PostMapping("/addGateway")
public void addGateway(ProtocolGateway gateway) {
Optional<ProtocolGateway> optGateway = gatewayRepository.findById(gateway.getId());
@ -123,4 +130,13 @@ public class ProtocolController {
return new Paging<>(gateways.getTotalElements(), gateways.getContent());
}
@PostConstruct
public void init() {
MqttComponent component = new MqttComponent();
ScriptConverter converter = new ScriptConverter();
converter.setScript("");
component.setConverter(converter);
componentManager.register("123", component);
componentManager.start("123", "");
}
}

14
pom.xml
View File

@ -12,8 +12,6 @@
<module>dao</module>
<module>tppa-server</module>
<module>protocol-gateway</module>
<module>protocol-gateway/mqtt-component</module>
<module>protocol-gateway/component</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
@ -253,6 +251,18 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>component-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>mqtt-component</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

BIN
protocol-gateway/.DS_Store vendored Normal file

Binary file not shown.

View File

@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>iotkit-parent</artifactId>
<groupId>cc.iotkit</groupId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>component-server</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-all</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>common</artifactId>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>converter</artifactId>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>model</artifactId>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>dao</artifactId>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>component</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,47 @@
package cc.iotkit.comps;
import cc.iotkit.comp.IComponent;
import cc.iotkit.comps.service.DeviceBehaviourService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class ComponentManager {
private final Map<String, IComponent> components = new HashMap<>();
@Autowired
private DeviceBehaviourService deviceBehaviourService;
public void register(String id, IComponent component) {
components.put(id, component);
}
public void deRegister(String id) {
IComponent component = components.remove(id);
component.destroy();
}
public void start(String id, String script) {
IComponent component = components.get(id);
if (component == null) {
return;
}
component.setHandler(new MessageHandler(script, component.getConverter(),
deviceBehaviourService));
component.start();
}
public void stop(String id) {
IComponent component = components.get(id);
if (component == null) {
return;
}
component.stop();
}
}

View File

@ -0,0 +1,132 @@
package cc.iotkit.comps;
import cc.iotkit.comp.IMessageHandler;
import cc.iotkit.comp.model.DeviceMessage;
import cc.iotkit.comps.model.AuthInfo;
import cc.iotkit.comps.model.DeviceState;
import cc.iotkit.comps.model.RegisterInfo;
import cc.iotkit.comps.service.DeviceBehaviourService;
import cc.iotkit.converter.IConverter;
import jdk.nashorn.api.scripting.NashornScriptEngine;
import jdk.nashorn.api.scripting.ScriptObjectMirror;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.beanutils.BeanUtils;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.util.Map;
@Slf4j
@Data
public class MessageHandler implements IMessageHandler {
private final NashornScriptEngine engine = (NashornScriptEngine) (new ScriptEngineManager()).getEngineByName("nashorn");
private final String script;
private final IConverter converter;
private final DeviceBehaviourService deviceBehaviourService;
@SneakyThrows
public MessageHandler(String script, IConverter converter,
DeviceBehaviourService deviceBehaviourService) {
this.script = script;
this.converter = converter;
this.deviceBehaviourService = deviceBehaviourService;
engine.eval(script);
}
public void register(Map<String, Object> head, String msg) {
}
public void auth(Map<String, Object> head, String msg) {
}
public void state(Map<String, Object> head, String msg) {
}
public void onReceive(Map<String, Object> head, String type, String msg) {
try {
ScriptObjectMirror result = (ScriptObjectMirror) engine.invokeFunction("onReceive", head, type, msg);
Object rstType = result.get("type");
if (rstType == null) {
return;
}
//取脚本执行后返回的数据
Object data = result.get("data");
if (!(data instanceof Map)) {
return;
}
Map<String, Object> dataMap = (Map) data;
if ("register".equals(rstType)) {
//注册数据
RegisterInfo regInfo = new RegisterInfo();
BeanUtils.populate(regInfo, dataMap);
doRegister(regInfo);
} else if ("auth".equals(rstType)) {
//设备认证
AuthInfo authInfo = new AuthInfo();
BeanUtils.populate(authInfo, dataMap);
doAuth(authInfo);
} else if ("state".equals(rstType)) {
//设备状态变更
DeviceState state = new DeviceState();
BeanUtils.populate(state, dataMap);
doStateChange(state);
} else if ("report".equals(rstType)) {
//上报数据
DeviceMessage message = new DeviceMessage();
BeanUtils.populate(message, dataMap);
doReport(message);
}
} catch (Throwable e) {
log.error("onReceive error", e);
}
}
private void doRegister(RegisterInfo reg) throws ScriptException, NoSuchMethodException {
try {
deviceBehaviourService.register(reg);
engine.invokeFunction("onRegistered", reg, true);
} catch (Throwable e) {
log.error("register error", e);
engine.invokeFunction("onRegistered", reg, false);
}
}
private void doAuth(AuthInfo auth) throws ScriptException, NoSuchMethodException {
try {
deviceBehaviourService.deviceAuth(auth.getProductKey(),
auth.getDeviceName(),
auth.getProductSecret(),
auth.getDeviceSecret());
engine.invokeFunction("onAuthed", auth, true);
} catch (Throwable e) {
log.error("device auth error", e);
engine.invokeFunction("onAuthed", auth, false);
}
}
private void doStateChange(DeviceState state) {
try {
deviceBehaviourService.deviceStateChange(state.getProductKey(),
state.getDeviceName(),
DeviceState.STATE_ONLINE.equals(state.getState()));
} catch (Throwable e) {
log.error("device state change error", e);
}
}
private void doReport(DeviceMessage message) {
try {
deviceBehaviourService.reportMessage(message);
} catch (Throwable e) {
log.error("report device message error", e);
}
}
}

View File

@ -1,4 +1,4 @@
package cc.iotkit.protocol.server.config;
package cc.iotkit.comps.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;

View File

@ -0,0 +1,15 @@
package cc.iotkit.comps.model;
import lombok.Data;
@Data
public class AuthInfo {
private String productKey;
private String deviceName;
private String productSecret;
private String deviceSecret;
}

View File

@ -0,0 +1,17 @@
package cc.iotkit.comps.model;
import lombok.Data;
@Data
public class DeviceState {
public static final String STATE_ONLINE = "online";
public static final String STATE_OFFLINE = "offline";
private String productKey;
private String deviceName;
private String state;
}

View File

@ -0,0 +1,49 @@
package cc.iotkit.comps.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
import java.util.Map;
/**
*
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RegisterInfo {
private String productKey;
private String deviceName;
private String model;
private Map<String, Object> tag;
private List<SubDevice> subDevices;
public RegisterInfo(String productKey, String deviceName, String model) {
this.productKey = productKey;
this.deviceName = deviceName;
this.model = model;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class SubDevice {
private String productKey;
private String deviceName;
private String model;
private Map<String, Object> tag;
}
}

View File

@ -0,0 +1,188 @@
package cc.iotkit.comps.service;
import cc.iotkit.common.Constants;
import cc.iotkit.common.exception.BizException;
import cc.iotkit.common.utils.JsonUtil;
import cc.iotkit.comp.model.DeviceMessage;
import cc.iotkit.comps.config.ServerConfig;
import cc.iotkit.comps.model.RegisterInfo;
import cc.iotkit.dao.DeviceRepository;
import cc.iotkit.dao.ProductRepository;
import cc.iotkit.model.device.DeviceInfo;
import cc.iotkit.model.product.Product;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@Slf4j
@Service
public class DeviceBehaviourService {
@Autowired
private ProductRepository productRepository;
@Autowired
private DeviceRepository deviceRepository;
@Autowired
private ServerConfig serverConfig;
private Producer<DeviceMessage> deviceMessageProducer;
@PostConstruct
public void init() throws PulsarClientException {
//初始化pulsar客户端
PulsarClient client = PulsarClient.builder()
.serviceUrl(serverConfig.getPulsarBrokerUrl())
.build();
deviceMessageProducer = client.newProducer(JSONSchema.of(DeviceMessage.class))
.topic("persistent://public/default/device_raw")
.create();
}
public void register(RegisterInfo info) {
try {
DeviceInfo deviceInfo = register(null, info);
//子设备注册
List<RegisterInfo.SubDevice> subDevices = info.getSubDevices();
if (subDevices != null && subDevices.size() != 0) {
for (RegisterInfo.SubDevice subDevice : subDevices) {
register(deviceInfo.getDeviceId(),
new RegisterInfo(subDevice.getProductKey(),
subDevice.getDeviceName(),
subDevice.getModel(),
subDevice.getTag(), null));
}
}
//todo 产生设备注册事件
} catch (BizException e) {
log.error("register device error", e);
throw e;
} catch (Throwable e) {
log.error("register device error", e);
throw new BizException("register device error", e);
}
}
public DeviceInfo register(String parentId, RegisterInfo info) {
String pk = info.getProductKey();
Optional<Product> optProduct = productRepository.findById(pk);
if (!optProduct.isPresent()) {
throw new BizException("Product does not exist");
}
String uid = optProduct.get().getUid();
DeviceInfo device = deviceRepository.findByProductKeyAndDeviceName(pk, info.getDeviceName());
if (device != null) {
//更新设备信息
device.setParentId(parentId);
device.setUid(uid);
Map<String, Object> tag = info.getTag();
Map<String, Object> oldTag = device.getTag();
if (oldTag == null) {
oldTag = new HashMap<>();
}
if (tag != null) {
oldTag.putAll(tag);
}
device.setTag(oldTag);
} else {
//不存在,注册新设备
device = new DeviceInfo();
device.setParentId(parentId);
device.setUid(uid);
device.setDeviceId(newDeviceId(info.getDeviceName()));
device.setProductKey(info.getProductKey());
device.setDeviceName(info.getDeviceName());
device.setTag(info.getTag());
device.setState(new DeviceInfo.State(false, null, null));
device.setCreateAt(System.currentTimeMillis());
}
deviceRepository.save(device);
log.info("device registered:{}", JsonUtil.toJsonString(device));
return device;
}
/**
* 1-13
* 14-29 deviceNae16016mac1616
* 30-31 mac2
* 32 0-f
*/
public static String newDeviceId(String deviceNae) {
int maxDnLen = 16;
String dn = deviceNae.replaceAll("[^0-9A-Za-z]", "");
if (dn.length() > maxDnLen) {
dn = dn.substring(dn.length() - maxDnLen);
} else {
dn = (dn + "00000000000000000000").substring(0, maxDnLen);
}
String len = StringUtils.leftPad(deviceNae.length() + "", 2, '0');
String rnd = Integer.toHexString(RandomUtils.nextInt(0, 16));
return (System.currentTimeMillis() + "0" + dn + len + rnd).toLowerCase();
}
public void deviceAuth(String productKey,
String deviceName,
String productSecret,
String deviceSecret) {
DeviceInfo deviceInfo = deviceRepository.findByProductKeyAndDeviceName(productKey, deviceName);
if (deviceInfo == null) {
throw new BizException("device does not exist");
}
if (!Constants.PRODUCT_SECRET.equals(productSecret)) {
throw new BizException("incorrect productSecret");
}
//todo 按产品ProductSecret认证子设备需要父设备认证后可通过验证
// Optional<Product> optProduct = productRepository.findById(productKey);
// if (!optProduct.isPresent()) {
// throw new BizException("product does not exist");
// }
// Product product = optProduct.get();
// if (product.getNodeType()) {
//
// }
}
public void deviceStateChange(String productKey,
String deviceName,
boolean online) {
DeviceInfo device = deviceRepository.findByProductKeyAndDeviceName(productKey, deviceName);
if (device == null) {
throw new BizException("device does not exist");
}
if (online) {
device.getState().setOnline(true);
device.getState().setOnlineTime(System.currentTimeMillis());
} else {
device.getState().setOnline(false);
device.getState().setOfflineTime(System.currentTimeMillis());
}
deviceRepository.save(device);
//todo 产生在离线事件
}
public void reportMessage(DeviceMessage message) throws PulsarClientException {
deviceMessageProducer.send(message);
}
}

View File

@ -0,0 +1,75 @@
package cc.iotkit.comps.service;
import cc.iotkit.common.Constants;
import cc.iotkit.common.utils.JsonUtil;
import cc.iotkit.comps.config.ServerConfig;
import cc.iotkit.dao.ThingModelMessageRepository;
import cc.iotkit.dao.UserInfoRepository;
import cc.iotkit.model.UserInfo;
import cc.iotkit.model.device.message.ThingModelMessage;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@Service
public class DeviceMessageConsumer implements MessageListener<ThingModelMessage> {
private final ServerConfig serverConfig;
private final ThingModelMessageRepository messageRepository;
private final UserInfoRepository userInfoRepository;
@SneakyThrows
@Autowired
public DeviceMessageConsumer(ServerConfig serverConfig,
ThingModelMessageRepository messageRepository,
UserInfoRepository userInfoRepository) {
this.serverConfig = serverConfig;
this.messageRepository = messageRepository;
this.userInfoRepository = userInfoRepository;
PulsarClient client = PulsarClient.builder()
.serviceUrl(this.serverConfig.getPulsarBrokerUrl())
.build();
String topicFormat = "persistent://%s/default/" + Constants.THING_MODEL_MESSAGE_TOPIC;
List<UserInfo> platformUsers = userInfoRepository.findByType(UserInfo.USER_TYPE_PLATFORM);
List<String> topics = platformUsers.stream().map(u -> String.format(topicFormat, u.getUid()))
.collect(Collectors.toList());
log.info("subscribe device_thing topic:{}", JsonUtil.toJsonString(topics));
client.newConsumer(Schema.JSON(ThingModelMessage.class))
.topics(topics)
.subscriptionName("thing-model-message")
.consumerName("thing-model-message-consumer")
.messageListener(this).subscribe();
}
@SneakyThrows
@Override
public void received(Consumer<ThingModelMessage> consumer, Message<ThingModelMessage> msg) {
ThingModelMessage modelMessage = msg.getValue();
log.info("receive message:{}", JsonUtil.toJsonString(modelMessage));
//设备消息日志入库
messageRepository.save(modelMessage);
messageRepository.findAll().forEach(m -> {
log.info(JsonUtil.toJsonString(m));
});
consumer.acknowledge(msg);
}
@Override
public void reachedEndOfTopic(Consumer<ThingModelMessage> consumer) {
}
}

23
protocol-gateway/component/pom.xml Executable file → Normal file
View File

@ -3,32 +3,31 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>iotkit-parent</artifactId>
<artifactId>protocol-gateway</artifactId>
<groupId>cc.iotkit</groupId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>component</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>common</artifactId>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>converter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,13 +1,13 @@
package cc.iotkit.comp;
import cc.iotkit.converter.Converter;
import cc.iotkit.converter.IConverter;
import lombok.Data;
@Data
public abstract class AbstractComponent implements Component {
public abstract class AbstractComponent implements IComponent {
protected MessageHandler handler;
protected IMessageHandler handler;
protected Converter converter;
protected IConverter converter;
}

View File

@ -1,21 +0,0 @@
package cc.iotkit.comp;
import cc.iotkit.converter.Converter;
public interface Component {
void create(String config);
void start();
void stop();
void destroy();
void setHandler(MessageHandler handler);
void setConverter(Converter converter);
Converter getConverter();
}

View File

@ -1,37 +0,0 @@
package cc.iotkit.comp;
import java.util.HashMap;
import java.util.Map;
public class ComponentManager {
private final Map<String, Component> components = new HashMap<>();
public void register(String id, Component component) {
components.put(id, component);
}
public void deRegister(String id) {
Component component = components.remove(id);
component.destroy();
}
public void start(String id, String script) {
Component component = components.get(id);
if (component == null) {
return;
}
component.setHandler(new MessageHandler(script, component.getConverter()));
component.start();
}
public void stop(String id) {
Component component = components.get(id);
if (component == null) {
return;
}
component.stop();
}
}

View File

@ -0,0 +1,21 @@
package cc.iotkit.comp;
import cc.iotkit.converter.IConverter;
public interface IComponent {
void create(String config);
void start();
void stop();
void destroy();
void setHandler(IMessageHandler handler);
void setConverter(IConverter converter);
IConverter getConverter();
}

View File

@ -0,0 +1,14 @@
package cc.iotkit.comp;
import java.util.Map;
public interface IMessageHandler {
void register(Map<String, Object> head, String msg);
void auth(Map<String, Object> head, String msg);
void state(Map<String, Object> head, String msg);
void onReceive(Map<String, Object> head, String type, String msg);
}

View File

@ -1,68 +0,0 @@
package cc.iotkit.comp;
import cc.iotkit.common.utils.JsonUtil;
import cc.iotkit.comp.model.RegisterInfo;
import cc.iotkit.converter.Converter;
import jdk.nashorn.api.scripting.NashornScriptEngine;
import jdk.nashorn.api.scripting.ScriptObjectMirror;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import javax.script.ScriptEngineManager;
import java.util.Map;
@Slf4j
@Data
public class MessageHandler {
private final NashornScriptEngine engine = (NashornScriptEngine) (new ScriptEngineManager()).getEngineByName("nashorn");
private final String script;
private final Converter converter;
@SneakyThrows
public MessageHandler(String script, Converter converter) {
this.script = script;
this.converter = converter;
engine.eval(script);
}
public void register(Map<String, Object> head, String msg) {
}
public void auth(Map<String, Object> head, String msg) {
}
public void state(Map<String, Object> head, String msg) {
}
public void onReceive(Map<String, Object> head, String type, String msg) {
try {
ScriptObjectMirror obj = (ScriptObjectMirror) engine.invokeFunction("onReceive", head, type, msg);
Object rstType = obj.get("type");
if (rstType == null) {
return;
}
//取脚本执行后返回的数据
Object data = obj.get("data");
if ("register".equals(rstType)) {
//注册数据
RegisterInfo regInfo = getData(data, RegisterInfo.class);
} else if ("report".equals(rstType)) {
//上报数据
}
} catch (Throwable e) {
log.error("onReceive error", e);
}
}
private <T> T getData(Object data, Class<T> cls) {
return JsonUtil.parse(JsonUtil.toJsonString(data), cls);
}
}

View File

@ -0,0 +1,12 @@
package cc.iotkit.comp.model;
import lombok.Data;
@Data
public class AuthInfo {
private String productKey;
private String deviceName;
}

View File

@ -0,0 +1,15 @@
package cc.iotkit.comp.model;
import lombok.Data;
@Data
public class DeviceMessage {
private String productKey;
private String deviceName;
private String mid;
private String content;
}

View File

@ -18,6 +18,21 @@
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
</dependency>
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>model</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>

View File

@ -1,8 +0,0 @@
package cc.iotkit.converter;
public interface Converter {
ThingModelMessage decode(String msg);
}

View File

@ -1,7 +1,9 @@
package cc.iotkit.comp;
package cc.iotkit.converter;
import lombok.Data;
@Data
public class Device {
}

View File

@ -0,0 +1,20 @@
package cc.iotkit.converter;
import lombok.Data;
@Data
public class DeviceService<T> {
private String mid;
private String productKey;
private String deviceName;
private String type;
private String identifier;
private T params;
}

View File

@ -0,0 +1,13 @@
package cc.iotkit.converter;
import cc.iotkit.model.device.message.ThingModelMessage;
public interface IConverter {
void setScript(String script);
ThingModelMessage decode(String msg);
String encode(DeviceService<?> service, Device device);
}

View File

@ -1,8 +1,51 @@
package cc.iotkit.converter;
public class ScriptConverter implements Converter {
import cc.iotkit.model.device.message.ThingModelMessage;
import jdk.nashorn.api.scripting.NashornScriptEngine;
import jdk.nashorn.api.scripting.ScriptObjectMirror;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.beanutils.BeanUtils;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
@Slf4j
@Data
public class ScriptConverter implements IConverter {
private final NashornScriptEngine engine = (NashornScriptEngine) (new ScriptEngineManager()).getEngineByName("nashorn");
private String script;
public void setScript(String script) {
this.script = script;
try {
engine.eval(script);
} catch (ScriptException e) {
log.error("eval converter script error", e);
}
}
public ThingModelMessage decode(String msg) {
try {
ScriptObjectMirror result = (ScriptObjectMirror) engine.invokeFunction("decode", msg);
ThingModelMessage modelMessage = new ThingModelMessage();
BeanUtils.populate(modelMessage, result);
return modelMessage;
} catch (Throwable e) {
log.error("execute decode script error", e);
}
return null;
}
@Override
public String encode(DeviceService<?> service, Device device) {
try {
ScriptObjectMirror result = (ScriptObjectMirror) engine.invokeFunction("encode", service, device);
return result.toString();
} catch (Throwable e) {
log.error("execute encode script error", e);
}
return null;
}

View File

@ -1,62 +0,0 @@
package cc.iotkit.converter;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.Map;
/**
*
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class ThingModelMessage {
private String productKey;
private String deviceName;
private String mid;
private String identifier;
private Map<String, Object> data;
/**
*
*/
private Long occur;
/**
*
*/
private Long time;
public static ThingModelMessage from(Map<?,?> map) {
ThingModelMessage message = new ThingModelMessage();
message.setProductKey(getStr(map, "productKey"));
message.setDeviceName(getStr(map, "deviceName"));
message.setMid(getStr(map, "mid"));
message.setIdentifier(getStr(map, "identifier"));
Object data = map.get("data");
if (data instanceof Map) {
message.setData((Map<String, Object>) data);
} else {
message.setData(new HashMap<>());
}
return message;
}
private static String getStr(Map<?,?> map, String key) {
Object val = map.get(key);
if (val == null) {
return null;
}
return val.toString();
}
}

View File

@ -1,6 +1,6 @@
package cc.iotkit.comp.mqtt;
import cc.iotkit.comp.MessageHandler;
import cc.iotkit.comp.IMessageHandler;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
@ -26,9 +26,9 @@ public class MqttVerticle extends AbstractVerticle {
private final MqttConfig config;
private final MessageHandler executor;
private final IMessageHandler executor;
public MqttVerticle(MqttConfig config, MessageHandler executor) {
public MqttVerticle(MqttConfig config, IMessageHandler executor) {
this.config = config;
this.executor = executor;
}

View File

@ -15,12 +15,10 @@
<module>gateway-client</module>
<module>protocol-server</module>
<module>decode-function</module>
<module>component-server</module>
<module>converter</module>
<module>mqtt-component</module>
<module>component</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>

View File

@ -0,0 +1,17 @@
package cc.iotkit.protocol.server.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
@Data
public class ProtocolConfig {
@Value("${pulsar.broker}")
private String pulsarBrokerUrl;
@Value("${pulsar.service}")
private String pulsarServiceUrl;
}

View File

@ -3,11 +3,10 @@ package cc.iotkit.protocol.server.controller;
import cc.iotkit.common.utils.JsonUtil;
import cc.iotkit.protocol.*;
import cc.iotkit.protocol.client.DeviceBehaviourClient;
import cc.iotkit.protocol.server.config.ServerConfig;
import cc.iotkit.protocol.server.service.DeviceBehaviourService;
import cc.iotkit.protocol.server.config.ProtocolConfig;
import cc.iotkit.protocol.server.service.BehaviourService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
@Slf4j
@ -16,10 +15,10 @@ import org.springframework.web.bind.annotation.*;
public class DeviceBehaviourController implements DeviceBehaviour {
@Autowired
private ServerConfig serverConfig;
private ProtocolConfig serverConfig;
@Autowired
private DeviceBehaviourService behaviourService;
private BehaviourService behaviourService;
@Override

View File

@ -21,7 +21,7 @@ import java.util.Optional;
@Slf4j
@Service
public class DeviceBehaviourService {
public class BehaviourService {
@Autowired
private ProductRepository productRepository;

View File

@ -6,7 +6,7 @@ import cc.iotkit.dao.ThingModelMessageRepository;
import cc.iotkit.dao.UserInfoRepository;
import cc.iotkit.model.UserInfo;
import cc.iotkit.model.device.message.ThingModelMessage;
import cc.iotkit.protocol.server.config.ServerConfig;
import cc.iotkit.protocol.server.config.ProtocolConfig;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
@ -17,10 +17,10 @@ import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@Service
//@Service
public class DeviceMessageConsumer implements MessageListener<ThingModelMessage> {
private final ServerConfig serverConfig;
private final ProtocolConfig serverConfig;
private final ThingModelMessageRepository messageRepository;
@ -28,7 +28,7 @@ public class DeviceMessageConsumer implements MessageListener<ThingModelMessage>
@SneakyThrows
@Autowired
public DeviceMessageConsumer(ServerConfig serverConfig,
public DeviceMessageConsumer(ProtocolConfig serverConfig,
ThingModelMessageRepository messageRepository,
UserInfoRepository userInfoRepository) {
this.serverConfig = serverConfig;

View File

@ -3,7 +3,7 @@ package cc.iotkit.protocol.server.service;
import cc.iotkit.common.Constants;
import cc.iotkit.common.utils.JsonUtil;
import cc.iotkit.protocol.function.DecodeFunction;
import cc.iotkit.protocol.server.config.ServerConfig;
import cc.iotkit.protocol.server.config.ProtocolConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@ -22,7 +22,7 @@ public class GatewayService {
private PulsarAdmin pulsarAdmin;
@Autowired
private ServerConfig serverConfig;
private ProtocolConfig serverConfig;
private PulsarAdmin getPulsarAdmin() throws PulsarClientException {
if (pulsarAdmin == null) {

View File

@ -79,7 +79,7 @@ public class RuleConfiguration {
@Bean
public MessageProducer inbound() {
String clientId = "rule-consumer-" + env;
clientId = "su_" + CodecUtil.aesEncrypt("admin_" + clientId, Constants.MQTT_SECRET);
clientId = "su_" + CodecUtil.aesEncrypt("admin_" + clientId, Constants.PRODUCT_SECRET);
adapter = new MqttPahoMessageDrivenChannelAdapter(
clientId, mqttClientFactory());
adapter.setCompletionTimeout(5000);