commit
d35fe5a02c
|
@ -71,4 +71,14 @@ public class ComponentClassLoader {
|
|||
return componentClass.getDeclaredConstructor().newInstance();
|
||||
}
|
||||
|
||||
public static void closeClassLoader(String name) {
|
||||
try {
|
||||
URLClassLoader classLoader = classLoaders.get(name);
|
||||
if (classLoader != null){
|
||||
classLoader.close();
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,6 +71,7 @@ public class BizComponentManager {
|
|||
try {
|
||||
componentInstance = ComponentClassLoader.getComponent(component.getId(), file);
|
||||
} catch (Throwable e) {
|
||||
ComponentClassLoader.closeClassLoader(component.getId());
|
||||
throw new BizException("get component instance error");
|
||||
}
|
||||
try {
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
package cc.iotkit.comps;
|
||||
|
||||
|
||||
import cc.iotkit.common.ComponentClassLoader;
|
||||
import cc.iotkit.model.protocol.ProtocolComponent;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
@ -38,6 +39,8 @@ public class ComponentManager {
|
|||
public void deRegister(String id) {
|
||||
bizComponentManager.deRegister(id);
|
||||
deviceComponentManager.deRegister(id);
|
||||
// 手动卸载jar应用,避免重新上传jar被占用
|
||||
ComponentClassLoader.closeClassLoader(id);
|
||||
}
|
||||
|
||||
public void start(String id) {
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>iot-components</artifactId>
|
||||
<groupId>cc.iotkit</groupId>
|
||||
<version>0.4.2-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>iot-component-tcp</artifactId>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>11</maven.compiler.source>
|
||||
<maven.compiler.target>11</maven.compiler.target>
|
||||
<hsweb.expands.version>3.0.2</hsweb.expands.version>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-core</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.luaj</groupId>
|
||||
<artifactId>luaj-jse</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cc.iotkit</groupId>
|
||||
<artifactId>iot-common</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cc.iotkit</groupId>
|
||||
<artifactId>iot-component-base</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cc.iotkit</groupId>
|
||||
<artifactId>iot-data-service</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>3.2.4</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<artifactSet>
|
||||
<includes>
|
||||
<include>io.vertx:vertx-core</include>
|
||||
<include>org.luaj:luaj-jse</include>
|
||||
</includes>
|
||||
</artifactSet>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<source>11</source>
|
||||
<target>11</target>
|
||||
<forceJavacCompilerUse>true</forceJavacCompilerUse>
|
||||
<useIncrementalCompilation>false</useIncrementalCompilation>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
|
@ -0,0 +1,67 @@
|
|||
package cc.iotkit.comp;
|
||||
|
||||
|
||||
import cc.iotkit.common.utils.JsonUtil;
|
||||
import cc.iotkit.comp.model.DeviceState;
|
||||
import cc.iotkit.comp.tcp.cilent.TcpClientDeviceComponent;
|
||||
import cc.iotkit.comp.tcp.server.TcpServerDeviceComponent;
|
||||
import cc.iotkit.converter.DeviceMessage;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
@Slf4j
|
||||
public class TcpDeviceComponent extends AbstractDeviceComponent {
|
||||
|
||||
private AbstractDeviceComponent tcpVerticle;
|
||||
|
||||
@Override
|
||||
public void create(CompConfig config) {
|
||||
Map maps = JsonUtil.parse(config.getOther(), Map.class);
|
||||
String type = maps.get("type").toString();
|
||||
if ("server".equals(type)) {
|
||||
tcpVerticle = new TcpServerDeviceComponent();
|
||||
} else {
|
||||
tcpVerticle = new TcpClientDeviceComponent();
|
||||
}
|
||||
tcpVerticle.create(config);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return tcpVerticle.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
tcpVerticle.setHandler(getHandler());
|
||||
tcpVerticle.start();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
tcpVerticle.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
tcpVerticle.destroy();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDeviceStateChange(DeviceState state) {
|
||||
tcpVerticle.onDeviceStateChange(state);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public DeviceMessage send(DeviceMessage message) {
|
||||
return tcpVerticle.send(message);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
package cc.iotkit.comp.tcp.cilent;
|
||||
|
||||
import cc.iotkit.common.exception.BizException;
|
||||
import cc.iotkit.common.utils.JsonUtil;
|
||||
import cc.iotkit.comp.AbstractDeviceComponent;
|
||||
import cc.iotkit.comp.CompConfig;
|
||||
import cc.iotkit.comp.model.DeviceState;
|
||||
import cc.iotkit.converter.DeviceMessage;
|
||||
import io.vertx.core.Future;
|
||||
import io.vertx.core.Vertx;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
@Slf4j
|
||||
public class TcpClientDeviceComponent extends AbstractDeviceComponent {
|
||||
private Vertx vertx;
|
||||
private TcpClientVerticle tcpClientVerticle;
|
||||
private String deployedId;
|
||||
|
||||
public void create(CompConfig config) {
|
||||
super.create(config);
|
||||
vertx = Vertx.vertx();
|
||||
TcpClinetConfig tcpClinetConfig = JsonUtil.parse(config.getOther(), TcpClinetConfig.class);
|
||||
tcpClientVerticle = new TcpClientVerticle(tcpClinetConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
try {
|
||||
tcpClientVerticle.setExecutor(getHandler());
|
||||
Future<String> future = vertx.deployVerticle(tcpClientVerticle);
|
||||
future.onSuccess((s -> {
|
||||
deployedId = s;
|
||||
log.info("tcp client start success, deployId:{}", s);
|
||||
}))
|
||||
.onFailure((e -> {
|
||||
log.error("tcp client start fail");
|
||||
e.printStackTrace();
|
||||
}));
|
||||
future.succeeded();
|
||||
} catch (Throwable e) {
|
||||
throw new BizException("start client component error", e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
tcpClientVerticle.stop();
|
||||
Future<Void> future = vertx.undeploy(deployedId);
|
||||
future.onSuccess(unused -> log.info("stop tcpserver component success"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDeviceStateChange(DeviceState state) {
|
||||
if (DeviceState.STATE_OFFLINE.equals(state.getState())) {
|
||||
tcpClientVerticle.offlineDevice(state.getDeviceName());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public DeviceMessage send(DeviceMessage message) {
|
||||
tcpClientVerticle.sendMsg(message);
|
||||
return message;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,157 @@
|
|||
package cc.iotkit.comp.tcp.cilent;
|
||||
|
||||
import cc.iotkit.comp.IMessageHandler;
|
||||
import cc.iotkit.comp.tcp.parser.ParserStrategyBuilder;
|
||||
import cc.iotkit.converter.DeviceMessage;
|
||||
import io.vertx.core.AbstractVerticle;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.net.NetClient;
|
||||
import io.vertx.core.net.NetClientOptions;
|
||||
import io.vertx.core.net.NetSocket;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* @author huangwenlong
|
||||
* @version 1.0
|
||||
* @date 2022/10/23 13:08
|
||||
*/
|
||||
@Slf4j
|
||||
public class TcpClientVerticle extends AbstractVerticle {
|
||||
|
||||
private TcpClinetConfig config;
|
||||
|
||||
private IMessageHandler executor;
|
||||
|
||||
private VertxTcpClient tcpClient;
|
||||
|
||||
private NetClient netClient;
|
||||
|
||||
private Map<String, ClientDevice> deviceMap = new ConcurrentHashMap();
|
||||
|
||||
private boolean stopAction = false;
|
||||
|
||||
public TcpClientVerticle(TcpClinetConfig config) {
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
public void setExecutor(IMessageHandler executor) {
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
tcpClient = new VertxTcpClient(UUID.randomUUID().toString(), false);
|
||||
initConfig();
|
||||
initClient();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (null != tcpClient) {
|
||||
stopAction = true;
|
||||
tcpClient.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public void sendMsg(DeviceMessage msg) {
|
||||
if (tcpClient != null) {
|
||||
tcpClient.sendMessage(Buffer.buffer(msg.getContent().toString()));
|
||||
}
|
||||
}
|
||||
|
||||
public void offlineDevice(String deviceName) {
|
||||
ClientDevice remove = deviceMap.remove(deviceName);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建配置文件
|
||||
* 未链接成功就一直重连(每隔1分钟)
|
||||
*/
|
||||
public void initConfig() {
|
||||
if (config.getOptions() == null) {
|
||||
NetClientOptions options = new NetClientOptions();
|
||||
options.setReconnectAttempts(Integer.MAX_VALUE);
|
||||
options.setReconnectInterval(20000L);
|
||||
config.setOptions(options);
|
||||
}
|
||||
if (config.isSsl()) {
|
||||
// 证书
|
||||
}
|
||||
}
|
||||
|
||||
private void initClient() {
|
||||
netClient = vertx.createNetClient(config.getOptions());
|
||||
tcpClient.setKeepAliveTimeoutMs(Duration.ofMinutes(10).toMillis());
|
||||
tcpClient.onDisconnect(() -> {
|
||||
// 所有设备都离线
|
||||
for (String deviceName : deviceMap.keySet()) {
|
||||
// 发送离线消息
|
||||
executor.onReceive(null, "disconnect", deviceName);
|
||||
}
|
||||
});
|
||||
// 连接
|
||||
toConnection();
|
||||
// 设置收到消息处理
|
||||
tcpClient.setReceiveHandler(buffer -> {
|
||||
try {
|
||||
executor.onReceive(null, "", buffer.toString(),
|
||||
result -> {
|
||||
if (!deviceMap.containsKey(result.getDeviceName())) {
|
||||
deviceMap.put(result.getDeviceName(), new ClientDevice(result.getDeviceName(), result.getProductKey()));
|
||||
// 有些设备并没有连接时报文,所以模拟一次 online
|
||||
HashMap<String, Object> map = new HashMap<>();
|
||||
map.put("deviceName", result.getDeviceName());
|
||||
executor.onReceive(map, "connect", buffer.toString());
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void toConnection() {
|
||||
netClient.connect(config.getPort(), config.getHost(), result -> {
|
||||
if (result.succeeded()) {
|
||||
log.debug("connect tcp [{}:{}] success", config.getHost(), config.getPort());
|
||||
tcpClient.setRecordParser(ParserStrategyBuilder.build(config.getParserType(), config.getParserConfiguration()));
|
||||
NetSocket socket = result.result();
|
||||
tcpClient.setSocket(socket);
|
||||
socket.closeHandler((nil) -> {
|
||||
tcpClient.shutdown();
|
||||
// 重连自动断开重连,收到停止组件不重连
|
||||
try {
|
||||
if (!stopAction) {
|
||||
Thread.sleep(5000L);
|
||||
toConnection();
|
||||
}else{
|
||||
netClient.close();
|
||||
netClient = null;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
log.error("connect tcp [{}:{}] error", config.getHost(), config.getPort(), result.cause());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
class ClientDevice {
|
||||
private String deviceName = "";
|
||||
private String productKey = "";
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package cc.iotkit.comp.tcp.cilent;
|
||||
|
||||
import io.vertx.core.net.NetClientOptions;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
@Data
|
||||
public class TcpClinetConfig {
|
||||
|
||||
private String id;
|
||||
|
||||
private NetClientOptions options;
|
||||
|
||||
private String host;
|
||||
|
||||
private int port;
|
||||
|
||||
private boolean ssl;
|
||||
|
||||
private String parserType;
|
||||
|
||||
// 解析参数
|
||||
private Map<String, Object> parserConfiguration = new HashMap<>();
|
||||
|
||||
//服务实例数量(线程数)
|
||||
private int instance = Runtime.getRuntime().availableProcessors();
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,167 @@
|
|||
package cc.iotkit.comp.tcp.cilent;
|
||||
|
||||
import cc.iotkit.comp.IMessageHandler;
|
||||
import cc.iotkit.comp.tcp.parser.PayloadParser;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.net.NetClient;
|
||||
import io.vertx.core.net.NetSocket;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
@Slf4j
|
||||
public class VertxTcpClient {
|
||||
@Getter
|
||||
private String id;
|
||||
@Getter
|
||||
private String deviceName = "";
|
||||
@Setter
|
||||
@Getter
|
||||
private String parentName = "";
|
||||
@Getter
|
||||
private String productKey = "";
|
||||
// 是否是服务端的连接客户端
|
||||
private final boolean serverClient;
|
||||
volatile PayloadParser payloadParser;
|
||||
public NetSocket socket;
|
||||
private final List<Runnable> disconnectListener = new CopyOnWriteArrayList<>();
|
||||
private IMessageHandler executor;
|
||||
private Consumer<Buffer> receiveHandler;
|
||||
@Setter
|
||||
private long keepAliveTimeoutMs = Duration.ofMinutes(10).toMillis();
|
||||
private volatile long lastKeepAliveTime = System.currentTimeMillis();
|
||||
|
||||
public VertxTcpClient(String id, boolean serverClient) {
|
||||
this.id = id;
|
||||
this.serverClient = serverClient;
|
||||
}
|
||||
|
||||
|
||||
public void keepAlive() {
|
||||
lastKeepAliveTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
public boolean isOnline() {
|
||||
return System.currentTimeMillis() - lastKeepAliveTime < keepAliveTimeoutMs;
|
||||
}
|
||||
|
||||
public void setDeviceInfo(String deviceName, String productKey) {
|
||||
this.deviceName = deviceName;
|
||||
this.productKey = productKey;
|
||||
}
|
||||
|
||||
public void setSocket(NetSocket socket) {
|
||||
synchronized (this) {
|
||||
// Objects.requireNonNull(payloadParser);
|
||||
if (this.socket != null && this.socket != socket) {
|
||||
this.socket.close();
|
||||
}
|
||||
this.socket = socket
|
||||
.closeHandler(v -> shutdown())
|
||||
.handler(buffer -> {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("handle tcp client[{}] payload:[{}]",
|
||||
socket.remoteAddress(),
|
||||
Hex.encodeHexString(buffer.getBytes()));
|
||||
}
|
||||
keepAlive();
|
||||
payloadParser.handle(buffer);
|
||||
if (this.socket != socket) {
|
||||
log.warn("tcp client [{}] memory leak ", socket.remoteAddress());
|
||||
socket.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置客户端消息解析器
|
||||
*
|
||||
* @param payloadParser 消息解析器
|
||||
*/
|
||||
public void setRecordParser(PayloadParser payloadParser) {
|
||||
synchronized (this) {
|
||||
if (null != this.payloadParser && this.payloadParser != payloadParser) {
|
||||
this.payloadParser.close();
|
||||
}
|
||||
this.payloadParser = payloadParser;
|
||||
this.payloadParser
|
||||
.handlePayload()
|
||||
.onErrorContinue((err, res) -> {
|
||||
log.error(err.getMessage(), err);
|
||||
System.out.println(err.getMessage());
|
||||
})
|
||||
.subscribe(buffer -> {
|
||||
System.out.println(buffer.toString());
|
||||
receiveHandler.accept(buffer);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void onDisconnect(Runnable disconnected) {
|
||||
disconnectListener.add(disconnected);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 设置消息处理器
|
||||
*/
|
||||
public void setReceiveHandler(Consumer<Buffer> receiveHandler) {
|
||||
this.receiveHandler = receiveHandler;
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
log.debug("tcp client [{}] disconnect", getId());
|
||||
synchronized (this) {
|
||||
if (null != socket) {
|
||||
execute(socket::close);
|
||||
this.socket = null;
|
||||
}
|
||||
// 粘包处理器
|
||||
if (null != payloadParser) {
|
||||
execute(payloadParser::close);
|
||||
payloadParser = null;
|
||||
}
|
||||
}
|
||||
for (Runnable runnable : disconnectListener) {
|
||||
execute(runnable);
|
||||
}
|
||||
disconnectListener.clear();
|
||||
}
|
||||
|
||||
public void sendMessage(Buffer buffer) {
|
||||
socket.write(buffer, r -> {
|
||||
keepAlive();
|
||||
if (r.succeeded()) {
|
||||
log.info("下行消息成功:{}", buffer.toString());
|
||||
} else {
|
||||
log.error("下行消息失败", r.cause());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void execute(Runnable runnable) {
|
||||
try {
|
||||
runnable.run();
|
||||
} catch (Exception e) {
|
||||
log.warn("close tcp client error", e);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* 是否有父设备
|
||||
*/
|
||||
public boolean hasParent() {
|
||||
return StringUtils.isNotEmpty(parentName);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
package cc.iotkit.comp.tcp.parser;
|
||||
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.parsetools.RecordParser;
|
||||
import org.apache.commons.lang3.StringEscapeUtils;
|
||||
import reactor.core.publisher.EmitterProcessor;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* 分隔符
|
||||
*
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
public class DelimitedPayloadParser implements PayloadParser {
|
||||
|
||||
private String delimited;
|
||||
private final EmitterProcessor<Buffer> processor = EmitterProcessor.create(false);
|
||||
private final FluxSink<Buffer> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
|
||||
|
||||
private RecordParser recordParser;
|
||||
|
||||
|
||||
public PayloadParser init(Object delimited) {
|
||||
this.delimited = StringEscapeUtils.unescapeJava(delimited.toString());
|
||||
this.reset();
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void handle(Buffer buffer) {
|
||||
recordParser.handle(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Buffer> handlePayload() {
|
||||
return processor.map(Function.identity());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
this.recordParser = RecordParser.newDelimited(delimited);
|
||||
// 塞入 skin pusher
|
||||
this.recordParser.handler(sink::next);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
processor.onComplete();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
package cc.iotkit.comp.tcp.parser;
|
||||
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import reactor.core.publisher.EmitterProcessor;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* 不处理
|
||||
*
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
public class DirectPayloadParser implements PayloadParser {
|
||||
|
||||
|
||||
EmitterProcessor<Buffer> processor = EmitterProcessor.create(false);
|
||||
|
||||
|
||||
@Override
|
||||
public PayloadParser init(Object param) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(Buffer buffer) {
|
||||
processor.onNext(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Buffer> handlePayload() {
|
||||
return processor.map(Function.identity());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
processor.onComplete();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
package cc.iotkit.comp.tcp.parser;
|
||||
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.parsetools.RecordParser;
|
||||
import reactor.core.publisher.EmitterProcessor;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* 固定长度
|
||||
*
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
public class FixPayloadParser implements PayloadParser {
|
||||
|
||||
private int size;
|
||||
private final EmitterProcessor<Buffer> processor = EmitterProcessor.create(false);
|
||||
private final FluxSink<Buffer> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
|
||||
|
||||
private RecordParser recordParser;
|
||||
|
||||
|
||||
public PayloadParser init(Object size) {
|
||||
this.size = (int) size;
|
||||
this.reset();
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void handle(Buffer buffer) {
|
||||
recordParser.handle(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Buffer> handlePayload() {
|
||||
return processor.map(Function.identity());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
this.recordParser = RecordParser.newFixed(size);
|
||||
this.recordParser.handler(sink::next);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
processor.onComplete();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package cc.iotkit.comp.tcp.parser;
|
||||
|
||||
import cc.iotkit.comp.tcp.parser.builder.DelimitedPayloadBuilder;
|
||||
import cc.iotkit.comp.tcp.parser.builder.DirectPayloadBuilder;
|
||||
import cc.iotkit.comp.tcp.parser.builder.FixPayloadBuilder;
|
||||
import cc.iotkit.comp.tcp.parser.builder.ScriptPayloadBuilder;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
public class ParserStrategyBuilder {
|
||||
private static Map<String, PayloadParserBuilderStrategy> strategyMap = new ConcurrentHashMap<>();
|
||||
|
||||
static {
|
||||
register(new DelimitedPayloadBuilder());
|
||||
register(new DirectPayloadBuilder());
|
||||
register(new FixPayloadBuilder());
|
||||
register(new ScriptPayloadBuilder());
|
||||
}
|
||||
|
||||
public static PayloadParser build(String type, Map<String, Object> configuration) {
|
||||
return Optional.ofNullable(strategyMap.get(type))
|
||||
.map(builder -> builder.build(configuration))
|
||||
.orElseThrow(() -> new UnsupportedOperationException("unsupported parser:" + type));
|
||||
}
|
||||
|
||||
private static void register(PayloadParserBuilderStrategy strategy) {
|
||||
strategyMap.put(strategy.getType().getText(), strategy);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
package cc.iotkit.comp.tcp.parser;
|
||||
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
public interface PayloadParser {
|
||||
|
||||
|
||||
PayloadParser init(Object param) throws Exception;
|
||||
|
||||
void handle(Buffer buffer);
|
||||
|
||||
/**
|
||||
* 订阅完整的数据包流,每一个元素为一个完整的数据包
|
||||
*
|
||||
* @return 完整数据包流
|
||||
*/
|
||||
Flux<Buffer> handlePayload();
|
||||
|
||||
|
||||
/**
|
||||
* 重置规则
|
||||
*/
|
||||
default void reset() {
|
||||
}
|
||||
|
||||
void close();
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
package cc.iotkit.comp.tcp.parser;
|
||||
|
||||
|
||||
|
||||
import cc.iotkit.comp.tcp.parser.enums.PayloadParserType;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
public interface PayloadParserBuilderStrategy {
|
||||
PayloadParserType getType();
|
||||
|
||||
PayloadParser build(Map<String, Object> parserConfiguration);
|
||||
}
|
|
@ -0,0 +1,188 @@
|
|||
package cc.iotkit.comp.tcp.parser;
|
||||
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.parsetools.RecordParser;
|
||||
import jdk.nashorn.api.scripting.NashornScriptEngine;
|
||||
import jdk.nashorn.api.scripting.ScriptObjectMirror;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import reactor.core.publisher.EmitterProcessor;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
|
||||
import javax.script.ScriptEngineManager;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* 固定长度
|
||||
*
|
||||
* @copy jetLink
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
|
||||
/**
|
||||
* <pre>
|
||||
* PipePayloadParser parser = new PipePayloadParser();
|
||||
* parser.fixed(4)
|
||||
* .handler(buffer -> {
|
||||
* int len = BytesUtils.highBytes2Int(buffer.getBytes());
|
||||
* parser.fixed(len);
|
||||
* })
|
||||
* .handler(buffer -> parser.result(buffer.toString("UTF-8")).complete());
|
||||
* </pre>
|
||||
*/
|
||||
@Slf4j
|
||||
public class ScriptPayloadParser implements PayloadParser {
|
||||
|
||||
|
||||
private final NashornScriptEngine engine = (NashornScriptEngine) (new ScriptEngineManager())
|
||||
.getEngineByName("nashorn");
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public PayloadParser init(Object param) {
|
||||
String script = (String) param;
|
||||
;
|
||||
ScriptObjectMirror scriptObject = (ScriptObjectMirror) engine.eval("new (function(){" + script + "})()");
|
||||
//执行转换脚本
|
||||
engine.invokeMethod(scriptObject, "payloadParser", this);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void handle(Buffer buffer) {
|
||||
if (recordParser == null && directMapper == null) {
|
||||
log.error("record parser not init");
|
||||
return;
|
||||
}
|
||||
if (recordParser != null) {
|
||||
recordParser.handle(buffer);
|
||||
return;
|
||||
}
|
||||
Buffer buf = directMapper.apply(buffer);
|
||||
if (null != buf) {
|
||||
sink.next(buf);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Buffer> handlePayload() {
|
||||
return processor.map(Function.identity());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
this.result.clear();
|
||||
complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
processor.onComplete();
|
||||
currentPipe.set(0);
|
||||
this.result.clear();
|
||||
}
|
||||
|
||||
private final EmitterProcessor<Buffer> processor = EmitterProcessor.create(true);
|
||||
|
||||
private final FluxSink<Buffer> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
|
||||
|
||||
private final List<Consumer<Buffer>> pipe = new CopyOnWriteArrayList<>();
|
||||
|
||||
private final List<Buffer> result = new CopyOnWriteArrayList<>();
|
||||
|
||||
private volatile RecordParser recordParser;
|
||||
|
||||
private Function<Buffer, Buffer> directMapper;
|
||||
|
||||
private Consumer<RecordParser> firstInit;
|
||||
|
||||
private final AtomicInteger currentPipe = new AtomicInteger();
|
||||
|
||||
public Buffer newBuffer() {
|
||||
return Buffer.buffer();
|
||||
}
|
||||
|
||||
public ScriptPayloadParser result(String buffer) {
|
||||
return result(Buffer.buffer(buffer));
|
||||
}
|
||||
|
||||
public ScriptPayloadParser result(byte[] buffer) {
|
||||
return result(Buffer.buffer(buffer));
|
||||
}
|
||||
|
||||
public ScriptPayloadParser handler(Consumer<Buffer> handler) {
|
||||
pipe.add(handler);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ScriptPayloadParser delimited(String delimited) {
|
||||
if (recordParser == null) {
|
||||
setParser(RecordParser.newDelimited(delimited));
|
||||
firstInit = (parser -> parser.delimitedMode(delimited));
|
||||
return this;
|
||||
}
|
||||
recordParser.delimitedMode(delimited);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ScriptPayloadParser fixed(int size) {
|
||||
if (size == 0) {
|
||||
complete();
|
||||
return this;
|
||||
}
|
||||
if (recordParser == null) {
|
||||
setParser(RecordParser.newFixed(size));
|
||||
firstInit = (parser -> parser.fixedSizeMode(size));
|
||||
return this;
|
||||
}
|
||||
recordParser.fixedSizeMode(size);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ScriptPayloadParser direct(Function<Buffer, Buffer> mapper) {
|
||||
this.directMapper = mapper;
|
||||
return this;
|
||||
}
|
||||
|
||||
private Consumer<Buffer> getNextHandler() {
|
||||
int i = currentPipe.getAndIncrement();
|
||||
if (i < pipe.size()) {
|
||||
return pipe.get(i);
|
||||
}
|
||||
currentPipe.set(0);
|
||||
return pipe.get(0);
|
||||
}
|
||||
|
||||
private void setParser(RecordParser parser) {
|
||||
this.recordParser = parser;
|
||||
this.recordParser.handler(buffer -> getNextHandler().accept(buffer));
|
||||
}
|
||||
|
||||
public ScriptPayloadParser complete() {
|
||||
currentPipe.set(0);
|
||||
if (recordParser != null) {
|
||||
firstInit.accept(recordParser);
|
||||
}
|
||||
if (!this.result.isEmpty()) {
|
||||
Buffer buffer = Buffer.buffer();
|
||||
for (Buffer buf : this.result) {
|
||||
buffer.appendBuffer(buf);
|
||||
}
|
||||
this.result.clear();
|
||||
sink.next(buffer);
|
||||
}
|
||||
return this;
|
||||
|
||||
}
|
||||
|
||||
public ScriptPayloadParser result(Buffer buffer) {
|
||||
this.result.add(buffer);
|
||||
return this;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package cc.iotkit.comp.tcp.parser.builder;
|
||||
|
||||
import cc.iotkit.comp.tcp.parser.DelimitedPayloadParser;
|
||||
import cc.iotkit.comp.tcp.parser.PayloadParser;
|
||||
import cc.iotkit.comp.tcp.parser.PayloadParserBuilderStrategy;
|
||||
import cc.iotkit.comp.tcp.parser.enums.PayloadParserType;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
public class DelimitedPayloadBuilder implements PayloadParserBuilderStrategy {
|
||||
@Override
|
||||
public PayloadParserType getType() {
|
||||
return PayloadParserType.DELIMITED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PayloadParser build(Map<String, Object> parserConfiguration) {
|
||||
return new DelimitedPayloadParser().init(parserConfiguration.get("delimited"));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package cc.iotkit.comp.tcp.parser.builder;
|
||||
|
||||
import cc.iotkit.comp.tcp.parser.DirectPayloadParser;
|
||||
import cc.iotkit.comp.tcp.parser.PayloadParser;
|
||||
import cc.iotkit.comp.tcp.parser.PayloadParserBuilderStrategy;
|
||||
import cc.iotkit.comp.tcp.parser.enums.PayloadParserType;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
public class DirectPayloadBuilder implements PayloadParserBuilderStrategy {
|
||||
@Override
|
||||
public PayloadParserType getType() {
|
||||
return PayloadParserType.DIRECT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PayloadParser build(Map<String, Object> parserConfiguration) {
|
||||
return new DirectPayloadParser().init(null);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package cc.iotkit.comp.tcp.parser.builder;
|
||||
|
||||
import cc.iotkit.comp.tcp.parser.FixPayloadParser;
|
||||
import cc.iotkit.comp.tcp.parser.PayloadParser;
|
||||
import cc.iotkit.comp.tcp.parser.PayloadParserBuilderStrategy;
|
||||
import cc.iotkit.comp.tcp.parser.enums.PayloadParserType;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
public class FixPayloadBuilder implements PayloadParserBuilderStrategy {
|
||||
@Override
|
||||
public PayloadParserType getType() {
|
||||
return PayloadParserType.FIXED_LENGTH;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PayloadParser build(Map<String, Object> parserConfiguration) {
|
||||
return new FixPayloadParser().init(parserConfiguration.get("fix"));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package cc.iotkit.comp.tcp.parser.builder;
|
||||
|
||||
import cc.iotkit.comp.tcp.parser.PayloadParser;
|
||||
import cc.iotkit.comp.tcp.parser.PayloadParserBuilderStrategy;
|
||||
import cc.iotkit.comp.tcp.parser.ScriptPayloadParser;
|
||||
import cc.iotkit.comp.tcp.parser.enums.PayloadParserType;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
public class ScriptPayloadBuilder implements PayloadParserBuilderStrategy {
|
||||
@Override
|
||||
public PayloadParserType getType() {
|
||||
return PayloadParserType.SCRIPT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PayloadParser build(Map<String, Object> parserConfiguration) {
|
||||
return new ScriptPayloadParser().init(parserConfiguration.get("script"));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
package cc.iotkit.comp.tcp.parser.enums;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum PayloadParserType {
|
||||
DIRECT("不处理"),
|
||||
|
||||
FIXED_LENGTH("固定长度"),
|
||||
|
||||
DELIMITED("分隔符"),
|
||||
|
||||
SCRIPT("自定义脚本")
|
||||
;
|
||||
|
||||
private String text;
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
package cc.iotkit.comp.tcp.server;
|
||||
|
||||
import io.vertx.core.net.NetServerOptions;
|
||||
import io.vertx.core.net.SocketAddress;
|
||||
import lombok.Data;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
@Data
|
||||
public class TcpServerConfig {
|
||||
|
||||
private String id;
|
||||
|
||||
private NetServerOptions options;
|
||||
|
||||
private String host;
|
||||
|
||||
private int port;
|
||||
|
||||
private boolean ssl;
|
||||
|
||||
private String parserType;
|
||||
|
||||
// 解析参数
|
||||
private Map<String, Object> parserConfiguration = new HashMap<>();
|
||||
|
||||
//服务实例数量(线程数)
|
||||
private int instance = Runtime.getRuntime().availableProcessors();
|
||||
|
||||
public SocketAddress createSocketAddress() {
|
||||
if (StringUtils.isEmpty(host)) {
|
||||
host = "localhost";
|
||||
}
|
||||
return SocketAddress.inetSocketAddress(port, host);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
package cc.iotkit.comp.tcp.server;
|
||||
|
||||
import cc.iotkit.common.exception.BizException;
|
||||
import cc.iotkit.common.utils.JsonUtil;
|
||||
import cc.iotkit.comp.AbstractDeviceComponent;
|
||||
import cc.iotkit.comp.CompConfig;
|
||||
import cc.iotkit.comp.model.DeviceState;
|
||||
import cc.iotkit.converter.DeviceMessage;
|
||||
import io.vertx.core.Future;
|
||||
import io.vertx.core.Vertx;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
@Slf4j
|
||||
public class TcpServerDeviceComponent extends AbstractDeviceComponent {
|
||||
|
||||
private Vertx vertx;
|
||||
private TcpServerVerticle tcpServerVerticle;
|
||||
private String deployedId;
|
||||
|
||||
@Override
|
||||
public void create(CompConfig config) {
|
||||
super.create(config);
|
||||
vertx = Vertx.vertx();
|
||||
TcpServerConfig serverConfig = JsonUtil.parse(config.getOther(), TcpServerConfig.class);
|
||||
tcpServerVerticle = new TcpServerVerticle(serverConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
try {
|
||||
tcpServerVerticle.setExecutor(getHandler());
|
||||
Future<String> future = vertx.deployVerticle(tcpServerVerticle);
|
||||
future.onSuccess((s -> {
|
||||
deployedId = s;
|
||||
log.info("tcp server start success, deployId:{}", s);
|
||||
}))
|
||||
.onFailure((e -> {
|
||||
log.error("tcp server start fail");
|
||||
e.printStackTrace();
|
||||
}));
|
||||
future.succeeded();
|
||||
}catch (Throwable e){
|
||||
throw new BizException("start tcpserver component error", e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
tcpServerVerticle.stop();
|
||||
Future<Void> future = vertx.undeploy(deployedId);
|
||||
future.onSuccess(unused -> log.info("stop tcpserver component success"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDeviceStateChange(DeviceState state) {
|
||||
if (DeviceState.STATE_OFFLINE.equals(state.getState())){
|
||||
tcpServerVerticle.offlineDevice(state.getDeviceName());
|
||||
}else if(DeviceState.STATE_ONLINE.equals(state.getState())){
|
||||
tcpServerVerticle.onlineDevice(state.getDeviceName(),
|
||||
state.getParent() != null ? state.getParent().getDeviceName() : null);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public DeviceMessage send(DeviceMessage message) {
|
||||
tcpServerVerticle.sendMsg(message);
|
||||
return message;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,261 @@
|
|||
package cc.iotkit.comp.tcp.server;
|
||||
|
||||
|
||||
import cc.iotkit.comp.IMessageHandler;
|
||||
import cc.iotkit.comp.tcp.cilent.VertxTcpClient;
|
||||
import cc.iotkit.comp.tcp.parser.ParserStrategyBuilder;
|
||||
import cc.iotkit.comp.tcp.parser.PayloadParser;
|
||||
import cc.iotkit.converter.DeviceMessage;
|
||||
import io.vertx.core.AbstractVerticle;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.net.NetServer;
|
||||
import io.vertx.core.net.NetServerOptions;
|
||||
import io.vertx.core.net.NetSocket;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
@Slf4j
|
||||
public class TcpServerVerticle extends AbstractVerticle {
|
||||
|
||||
@Getter
|
||||
private TcpServerConfig config;
|
||||
|
||||
private IMessageHandler executor;
|
||||
|
||||
private VertxTcpServer tcpServer;
|
||||
|
||||
private String id;
|
||||
|
||||
private Map<String, VertxTcpClient> clientMap = new ConcurrentHashMap();
|
||||
|
||||
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
|
||||
|
||||
@Setter
|
||||
private long keepAliveTimeout = Duration.ofMinutes(10).toMillis();
|
||||
|
||||
private Collection<NetServer> tcpServers;
|
||||
|
||||
public TcpServerVerticle(TcpServerConfig config) {
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
public void setExecutor(IMessageHandler executor) {
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
tcpServer = new VertxTcpServer();
|
||||
initConfig();
|
||||
initTcpServer();
|
||||
keepClientTask();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
tcpServer.shutdown();
|
||||
scheduledThreadPoolExecutor.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建配置文件
|
||||
*/
|
||||
public void initConfig() {
|
||||
if (config.getOptions() == null) {
|
||||
config.setOptions(new NetServerOptions());
|
||||
}
|
||||
if (config.isSsl()) {
|
||||
// 证书
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 初始TCP服务
|
||||
*/
|
||||
private void initTcpServer() {
|
||||
int instance = Math.max(2, config.getInstance());
|
||||
List<NetServer> instances = new ArrayList<>(instance);
|
||||
for (int i = 0; i < instance; i++) {
|
||||
instances.add(vertx.createNetServer(config.getOptions()));
|
||||
}
|
||||
// 根据解析类型配置数据解析器
|
||||
tcpServer.setParserSupplier(() -> ParserStrategyBuilder.build(config.getParserType(), config.getParserConfiguration()));
|
||||
tcpServer.setServer(instances);
|
||||
// 针对JVM做的多路复用优化
|
||||
// 多个server listen同一个端口,每个client连接的时候vertx会分配
|
||||
// 一个connection只能在一个server中处理
|
||||
for (NetServer netServer : instances) {
|
||||
netServer.listen(config.createSocketAddress(), result -> {
|
||||
if (result.succeeded()) {
|
||||
log.info("tcp server startup on {}", result.result().actualPort());
|
||||
} else {
|
||||
log.error("startup tcp server error", result.cause());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void offlineDevice(String deviceName) {
|
||||
VertxTcpClient client = clientMap.get(deviceName);
|
||||
if (client != null) {
|
||||
client.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public void onlineDevice(String deviceName, String parentName) {
|
||||
VertxTcpClient client = clientMap.get(deviceName);
|
||||
if (client != null) {
|
||||
client.setParentName(parentName);
|
||||
}
|
||||
}
|
||||
|
||||
public void sendMsg(DeviceMessage msg) {
|
||||
VertxTcpClient tcpClient = clientMap.get(msg.getDeviceName());
|
||||
if (tcpClient != null) {
|
||||
tcpClient.sendMessage(Buffer.buffer(msg.getContent().toString()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保活定时任务
|
||||
*/
|
||||
public void keepClientTask() {
|
||||
scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
|
||||
|
||||
scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
|
||||
log.info("保活任务开始!");
|
||||
Set<String> clients = new HashSet(clientMap.keySet());
|
||||
for (String key : clients) {
|
||||
VertxTcpClient client = clientMap.get(key);
|
||||
if (!client.isOnline()) {
|
||||
client.shutdown();
|
||||
}
|
||||
}
|
||||
}, 1000, keepAliveTimeout, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
|
||||
class VertxTcpServer {
|
||||
|
||||
private Supplier<PayloadParser> parserSupplier;
|
||||
|
||||
/**
|
||||
* 为每个NetServer添加connectHandler
|
||||
*
|
||||
* @param servers 创建的所有NetServer
|
||||
*/
|
||||
public void setServer(Collection<NetServer> servers) {
|
||||
if (tcpServers != null && !tcpServers.isEmpty()) {
|
||||
shutdown();
|
||||
}
|
||||
tcpServers = servers;
|
||||
for (NetServer tcpServer : tcpServers) {
|
||||
tcpServer.connectHandler(this::acceptTcpConnection);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* TCP连接处理逻辑
|
||||
*
|
||||
* @param socket socket
|
||||
*/
|
||||
protected void acceptTcpConnection(NetSocket socket) {
|
||||
// 客户端连接处理
|
||||
String clientId = id + "_" + socket.remoteAddress();
|
||||
VertxTcpClient client = new VertxTcpClient(clientId, true);
|
||||
client.setKeepAliveTimeoutMs(keepAliveTimeout);
|
||||
try {
|
||||
// TCP异常和关闭处理
|
||||
socket.exceptionHandler(err -> {
|
||||
log.error("tcp server client [{}] error", socket.remoteAddress(), err);
|
||||
}).closeHandler((nil) -> {
|
||||
log.debug("tcp server client [{}] closed", socket.remoteAddress());
|
||||
client.shutdown();
|
||||
});
|
||||
// 这个地方是在TCP服务初始化的时候设置的 parserSupplier
|
||||
client.setKeepAliveTimeoutMs(keepAliveTimeout);
|
||||
client.setRecordParser(parserSupplier.get());
|
||||
client.setSocket(socket);
|
||||
client.onDisconnect(() -> {
|
||||
clientDisconnect(client.getDeviceName());
|
||||
});
|
||||
// 设置收到消息处理
|
||||
client.setReceiveHandler(buffer -> {
|
||||
System.out.println(buffer.toString());
|
||||
try {
|
||||
executor.onReceive(null, "", buffer.toString(),
|
||||
result -> {
|
||||
if (result != null && !clientMap.containsKey(result.getDeviceName())) {
|
||||
client.setDeviceInfo(result.getDeviceName(), result.getProductKey());
|
||||
clientMap.put(result.getDeviceName(), client);
|
||||
// 有些设备并没有连接时报文,所以模拟一次 online
|
||||
HashMap<String, Object> map = new HashMap<>();
|
||||
map.put("deviceName", result.getDeviceName());
|
||||
executor.onReceive(map, "connect", buffer.toString());
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
});
|
||||
log.debug("accept tcp client [{}] connection", socket.remoteAddress());
|
||||
} catch (Exception e) {
|
||||
log.error("create tcp server client error", e);
|
||||
client.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public void setParserSupplier(Supplier<PayloadParser> parserSupplier) {
|
||||
this.parserSupplier = parserSupplier;
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
if (null != tcpServers) {
|
||||
for (NetServer tcpServer : tcpServers) {
|
||||
execute(tcpServer::close);
|
||||
}
|
||||
tcpServers = null;
|
||||
}
|
||||
}
|
||||
|
||||
private void execute(Runnable runnable) {
|
||||
try {
|
||||
runnable.run();
|
||||
} catch (Exception e) {
|
||||
log.warn("close tcp server error", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 断开连接,并移除子设备
|
||||
*/
|
||||
private void clientDisconnect(String deviceName) {
|
||||
VertxTcpClient remove = clientMap.remove(deviceName);
|
||||
if (null != remove) {
|
||||
// 发送离线消息
|
||||
executor.onReceive(null, "disconnect", deviceName);
|
||||
// 移除子设备
|
||||
if (remove.hasParent()) {
|
||||
List<VertxTcpClient> childClients = clientMap.values().stream().filter(cl -> cl.hasParent() && cl.getParentName()
|
||||
.equals(remove.getParentName())).collect(Collectors.toList());
|
||||
childClients.forEach(child -> clientMap.remove(child.getDeviceName()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,160 @@
|
|||
package cc.iotkit.comp.tcp.server;//package cc.iotkit.comp.tcp.server;
|
||||
//
|
||||
//import cc.iotkit.comp.IMessageHandler;
|
||||
//import cc.iotkit.comp.tcp.cilent.VertxTcpClient;
|
||||
//import cc.iotkit.comp.tcp.parser.PayloadParser;
|
||||
//import cc.iotkit.converter.DeviceMessage;
|
||||
//import io.vertx.core.buffer.Buffer;
|
||||
//import io.vertx.core.net.NetServer;
|
||||
//import io.vertx.core.net.NetSocket;
|
||||
//import lombok.Setter;
|
||||
//import lombok.extern.slf4j.Slf4j;
|
||||
//
|
||||
//import java.time.Duration;
|
||||
//import java.util.Collection;
|
||||
//import java.util.HashMap;
|
||||
//import java.util.List;
|
||||
//import java.util.Map;
|
||||
//import java.util.concurrent.ConcurrentHashMap;
|
||||
//import java.util.function.Supplier;
|
||||
//import java.util.stream.Collectors;
|
||||
//
|
||||
///**
|
||||
// * @author huangwenl
|
||||
// * @date 2022-10-13
|
||||
// */
|
||||
//@Slf4j
|
||||
//public class VertxTcpServer {
|
||||
//
|
||||
// private String id;
|
||||
//
|
||||
// private Supplier<PayloadParser> parserSupplier;
|
||||
//
|
||||
// private Map<String, VertxTcpClient> clientMap = new ConcurrentHashMap();
|
||||
// private IMessageHandler executor;
|
||||
// private Collection<NetServer> tcpServers;
|
||||
// @Setter
|
||||
// private long keepAliveTimeout = Duration.ofMinutes(10).toMillis();
|
||||
//
|
||||
// public VertxTcpServer(String id) {
|
||||
// this.id = id;
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 为每个NetServer添加connectHandler
|
||||
// *
|
||||
// * @param servers 创建的所有NetServer
|
||||
// */
|
||||
// public void setServer(Collection<NetServer> servers) {
|
||||
// if (this.tcpServers != null && !this.tcpServers.isEmpty()) {
|
||||
// shutdown();
|
||||
// }
|
||||
// this.tcpServers = servers;
|
||||
//
|
||||
// for (NetServer tcpServer : this.tcpServers) {
|
||||
// tcpServer.connectHandler(this::acceptTcpConnection);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * TCP连接处理逻辑
|
||||
// *
|
||||
// * @param socket socket
|
||||
// */
|
||||
// protected void acceptTcpConnection(NetSocket socket) {
|
||||
// // 客户端连接处理
|
||||
// String clientId = id + "_" + socket.remoteAddress();
|
||||
// VertxTcpClient client = new VertxTcpClient(clientId, true);
|
||||
// client.setKeepAliveTimeoutMs(keepAliveTimeout);
|
||||
// try {
|
||||
// // TCP异常和关闭处理
|
||||
// socket.exceptionHandler(err -> {
|
||||
// log.error("tcp server client [{}] error", socket.remoteAddress(), err);
|
||||
// }).closeHandler((nil) -> {
|
||||
// log.debug("tcp server client [{}] closed", socket.remoteAddress());
|
||||
// client.shutdown();
|
||||
// });
|
||||
// // 这个地方是在TCP服务初始化的时候设置的 parserSupplier
|
||||
// client.setRecordParser(parserSupplier.get());
|
||||
// client.setSocket(socket);
|
||||
// client.onDisconnect(() -> {
|
||||
// clientDisconnect(client.getDeviceName());
|
||||
// });
|
||||
// // 设置收到消息处理
|
||||
// client.setReceiveHandler(buffer -> {
|
||||
// executor.onReceive(null, "", buffer.toString(),
|
||||
// result -> {
|
||||
// if (!clientMap.containsKey(result.getDeviceName())) {
|
||||
// client.setDeviceInfo(result.getDeviceName(), result.getData().getParent().getDeviceName(),
|
||||
// result.getProductKey());
|
||||
// clientMap.put(result.getDeviceName(), client);
|
||||
// // 有些设备并没有连接时报文,所以模拟一次 online
|
||||
// HashMap<String, Object> map = new HashMap<>();
|
||||
// map.put("deviceName", result.getDeviceName());
|
||||
// executor.onReceive(map, "online", "");
|
||||
// }
|
||||
// });
|
||||
// });
|
||||
//// clientMap.put(clientId, client);
|
||||
// log.debug("accept tcp client [{}] connection", socket.remoteAddress());
|
||||
// } catch (Exception e) {
|
||||
// log.error("create tcp server client error", e);
|
||||
// client.shutdown();
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// public void setParserSupplier(Supplier<PayloadParser> parserSupplier) {
|
||||
// this.parserSupplier = parserSupplier;
|
||||
// }
|
||||
//
|
||||
// public void shutdown() {
|
||||
// if (null != tcpServers) {
|
||||
// for (NetServer tcpServer : tcpServers) {
|
||||
// execute(tcpServer::close);
|
||||
// }
|
||||
// tcpServers = null;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// private void execute(Runnable runnable) {
|
||||
// try {
|
||||
// runnable.run();
|
||||
// } catch (Exception e) {
|
||||
// log.warn("close tcp server error", e);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
//
|
||||
// public void sendMsg(DeviceMessage message) {
|
||||
// VertxTcpClient client = clientMap.get(message.getDeviceName());
|
||||
// if (client != null) {
|
||||
// client.sendMessage(Buffer.buffer(message.getContent().toString()));
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 递归断开连接
|
||||
// */
|
||||
// private void clientDisconnect(String deviceName) {
|
||||
// VertxTcpClient remove = clientMap.remove(deviceName);
|
||||
// if (null != remove) {
|
||||
// executor.onReceive(null, "disconnect", deviceName);
|
||||
// if (remove.hasParent()) {
|
||||
// List<VertxTcpClient> childClients = clientMap.values().stream().filter(cl -> cl.hasParent() && cl.getParentName()
|
||||
// .equals(remove.getParentName())).collect(Collectors.toList());
|
||||
// childClients.forEach(child -> clientDisconnect(child.getDeviceName()));
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// public void offlineDev(String deviceName) {
|
||||
// VertxTcpClient remove = clientMap.remove(deviceName);
|
||||
// if (null != remove) {
|
||||
// if (remove.hasParent()) {
|
||||
// List<VertxTcpClient> childClients = clientMap.values().stream().filter(cl -> cl.hasParent() && cl.getParentName()
|
||||
// .equals(remove.getParentName())).collect(Collectors.toList());
|
||||
// childClients.forEach(child -> offlineDev(child.getDeviceName()));
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//}
|
|
@ -0,0 +1 @@
|
|||
cc.iotkit.comp.TcpDeviceComponent
|
|
@ -0,0 +1,6 @@
|
|||
this.payloadParser = function (parser) {
|
||||
parser.delimited("\r\n")
|
||||
.handler(function(buffer){
|
||||
parser.result(buffer.toString("UTF-8")).complete();
|
||||
});
|
||||
}
|
|
@ -18,6 +18,7 @@
|
|||
<module>iot-emqx-component</module>
|
||||
<module>iot-component-base</module>
|
||||
<module>iot-http-biz-component</module>
|
||||
<module>iot-component-tcp</module>
|
||||
<!-- <module>iot-ctwing-component</module>-->
|
||||
</modules>
|
||||
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
package cc.iotkit.ruleengine.action.tcp;
|
||||
|
||||
import cc.iotkit.model.device.message.ThingModelMessage;
|
||||
import cc.iotkit.ruleengine.action.Action;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-12-14
|
||||
*/
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Data
|
||||
public class TcpAction implements Action<TcpService> {
|
||||
public static final String TYPE = "tcp";
|
||||
|
||||
|
||||
private List<TcpService> services;
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public List<String> execute(ThingModelMessage msg) {
|
||||
List<String> results = new ArrayList<>();
|
||||
for (TcpService service : services) {
|
||||
results.add(service.execute(msg));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,62 @@
|
|||
package cc.iotkit.ruleengine.action.tcp;
|
||||
|
||||
import cc.iotkit.common.utils.FIUtil;
|
||||
import cc.iotkit.model.device.message.ThingModelMessage;
|
||||
import cc.iotkit.ruleengine.action.ScriptService;
|
||||
import cc.iotkit.ruleengine.link.LinkFactory;
|
||||
import cc.iotkit.ruleengine.link.LinkService;
|
||||
import cc.iotkit.ruleengine.link.impl.TcpClientLink;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-12-14
|
||||
*/
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Slf4j
|
||||
@Data
|
||||
public class TcpService extends ScriptService implements LinkService {
|
||||
private String host;
|
||||
private int port;
|
||||
|
||||
public String execute(ThingModelMessage msg) {
|
||||
//执行转换脚本
|
||||
Map result = execScript(msg);
|
||||
if (result == null) {
|
||||
log.warn("execScript result is null");
|
||||
return "execScript result is null";
|
||||
}
|
||||
boolean initResult = LinkFactory.initLink(getKey(), TcpClientLink.LINK_TYPE, getLinkConf());
|
||||
|
||||
AtomicReference<String> data = new AtomicReference<>("");
|
||||
FIUtil.isTotF(initResult).handler(
|
||||
() -> LinkFactory.sendMsg(getKey(), result, data::set),
|
||||
() -> data.set("创建连接失败!")
|
||||
);
|
||||
return data.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKey() {
|
||||
return String.format("tcp_%s_%d", host, port);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLinkType() {
|
||||
return TcpClientLink.LINK_TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getLinkConf() {
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put(TcpClientLink.HOST, host);
|
||||
config.put(TcpClientLink.PORT, port);
|
||||
return config;
|
||||
}
|
||||
}
|
|
@ -3,6 +3,7 @@ package cc.iotkit.ruleengine.link;
|
|||
import cc.iotkit.common.utils.FIUtil;
|
||||
import cc.iotkit.ruleengine.link.impl.KafkaLink;
|
||||
import cc.iotkit.ruleengine.link.impl.MqttClientLink;
|
||||
import cc.iotkit.ruleengine.link.impl.TcpClientLink;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.HashSet;
|
||||
|
@ -133,6 +134,9 @@ public class LinkFactory {
|
|||
case KafkaLink.LINK_TYPE:
|
||||
link = new KafkaLink();
|
||||
break;
|
||||
case TcpClientLink.LINK_TYPE:
|
||||
link = new TcpClientLink();
|
||||
break;
|
||||
}
|
||||
if (link != null) {
|
||||
success = link.open(conf);
|
||||
|
|
|
@ -0,0 +1,119 @@
|
|||
package cc.iotkit.ruleengine.link.impl;
|
||||
|
||||
import cc.iotkit.common.utils.FIUtil;
|
||||
import cc.iotkit.ruleengine.link.BaseSinkLink;
|
||||
import io.vertx.core.Future;
|
||||
import io.vertx.core.Vertx;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.net.NetClient;
|
||||
import io.vertx.core.net.NetClientOptions;
|
||||
import io.vertx.core.net.NetSocket;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-12-14
|
||||
*/
|
||||
@Slf4j
|
||||
public class TcpClientLink implements BaseSinkLink {
|
||||
public static final String LINK_TYPE = "tcp";
|
||||
public static final String HOST = "host";
|
||||
public static final String PORT = "port";
|
||||
public static final String PAYLOAD = "payload";
|
||||
|
||||
private Consumer<Void> closeHandler;
|
||||
private NetClient client;
|
||||
private NetSocket socket;
|
||||
private String host;
|
||||
private int port;
|
||||
private boolean connecting;
|
||||
private boolean normal;
|
||||
|
||||
@Override
|
||||
public boolean open(Map<String, Object> config) {
|
||||
try {
|
||||
AtomicReference<Vertx> vertx = new AtomicReference<>();
|
||||
FIUtil.isTotF(Vertx.currentContext() == null).handler(
|
||||
() -> vertx.set(Vertx.vertx()),
|
||||
() -> vertx.set(Vertx.currentContext().owner())
|
||||
);
|
||||
NetClientOptions options = new NetClientOptions().setConnectTimeout(10000);
|
||||
client = vertx.get().createNetClient(options);
|
||||
port = (int) config.get(PORT);
|
||||
host = (String) config.get(HOST);
|
||||
connecting = true;
|
||||
client.connect(port, host, res -> {
|
||||
connecting = false;
|
||||
if (res.succeeded()) {
|
||||
log.info("连接成功:{}, {}", port,host);
|
||||
socket = res.result();
|
||||
normal = true;
|
||||
socket.closeHandler(Void -> normal = false);
|
||||
} else {
|
||||
closeHandler.accept(null);
|
||||
log.info("连接失败:{}, {}", port,host);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
connecting = false;
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Map<String, Object> msg, Consumer<String> consumer) {
|
||||
FIUtil.isTotF(normal).handler(
|
||||
() -> {
|
||||
Future<Void> publish = socket.write(Buffer.buffer(msg.get(PAYLOAD).toString()));
|
||||
try {
|
||||
publish.toCompletionStage().toCompletableFuture().get(300L, TimeUnit.MILLISECONDS);
|
||||
FIUtil.isTotF(publish.succeeded()).handler(
|
||||
() -> consumer.accept(String.format("tcp,发送成功:%s", msg.get(PAYLOAD).toString())),
|
||||
() -> consumer.accept(String.format("tcp,发送失败:%s", msg.get(PAYLOAD).toString()))
|
||||
);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
consumer.accept(String.format("tcp,发送异常:%s", msg.get(PAYLOAD).toString()));
|
||||
}
|
||||
},
|
||||
() -> {
|
||||
consumer.accept("tcp,连接断开,发送失败");
|
||||
if (!connecting) {
|
||||
log.info("tcp重连!");
|
||||
connecting = true;
|
||||
client.connect(port, host, res -> {
|
||||
connecting = false;
|
||||
if (res.succeeded()) {
|
||||
log.info("连接成功:{}, {}", port,host);
|
||||
socket = res.result();
|
||||
normal = true;
|
||||
socket.closeHandler(Void -> normal = false);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
try {
|
||||
socket.close();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
closeHandler.accept(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closeHandler(Consumer<Void> consumer) {
|
||||
this.closeHandler = consumer;
|
||||
}
|
||||
}
|
|
@ -20,6 +20,8 @@ import cc.iotkit.ruleengine.action.kafka.KafkaAction;
|
|||
import cc.iotkit.ruleengine.action.kafka.KafkaService;
|
||||
import cc.iotkit.ruleengine.action.mqtt.MqttAction;
|
||||
import cc.iotkit.ruleengine.action.mqtt.MqttService;
|
||||
import cc.iotkit.ruleengine.action.tcp.TcpAction;
|
||||
import cc.iotkit.ruleengine.action.tcp.TcpService;
|
||||
import cc.iotkit.ruleengine.config.RuleConfiguration;
|
||||
import cc.iotkit.ruleengine.filter.DeviceFilter;
|
||||
import cc.iotkit.ruleengine.filter.Filter;
|
||||
|
@ -165,6 +167,13 @@ public class RuleManager {
|
|||
service.initLink(ruleId);
|
||||
}
|
||||
return kafkaAction;
|
||||
} else if (TcpAction.TYPE.equals(type)) {
|
||||
TcpAction tcpAction = parse(config, TcpAction.class);
|
||||
for (TcpService service : tcpAction.getServices()) {
|
||||
service.setDeviceInfoData(deviceInfoData);
|
||||
service.initLink(ruleId);
|
||||
}
|
||||
return tcpAction;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue