1.tcp网络组件

2.组件停止,jar仍然被占用解决
V0.5.x
huangwenlong 2022-12-14 19:04:28 +08:00
parent c1071b71ff
commit fabdb00c4b
17 changed files with 451 additions and 37 deletions

View File

@ -71,4 +71,14 @@ public class ComponentClassLoader {
return componentClass.getDeclaredConstructor().newInstance();
}
public static void closeClassLoader(String name) {
try {
URLClassLoader classLoader = classLoaders.get(name);
if (classLoader != null){
classLoader.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}

View File

@ -71,6 +71,7 @@ public class BizComponentManager {
try {
componentInstance = ComponentClassLoader.getComponent(component.getId(), file);
} catch (Throwable e) {
ComponentClassLoader.closeClassLoader(component.getId());
throw new BizException("get component instance error");
}
try {

View File

@ -10,6 +10,7 @@
package cc.iotkit.comps;
import cc.iotkit.common.ComponentClassLoader;
import cc.iotkit.model.protocol.ProtocolComponent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -38,6 +39,8 @@ public class ComponentManager {
public void deRegister(String id) {
bizComponentManager.deRegister(id);
deviceComponentManager.deRegister(id);
// 手动卸载jar应用避免重新上传jar被占用
ComponentClassLoader.closeClassLoader(id);
}
public void start(String id) {

View File

@ -22,11 +22,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hswebframework</groupId>
<artifactId>hsweb-expands-script</artifactId>
<version>${hsweb.expands.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>

View File

@ -19,6 +19,7 @@ public class TcpDeviceComponent extends AbstractDeviceComponent {
private AbstractDeviceComponent tcpVerticle;
@Override
public void create(CompConfig config) {
Map maps = JsonUtil.parse(config.getOther(), Map.class);
String type = maps.get("type").toString();
@ -30,11 +31,18 @@ public class TcpDeviceComponent extends AbstractDeviceComponent {
tcpVerticle.create(config);
}
@Override
public String getId() {
return tcpVerticle.getId();
}
@Override
public void start() {
tcpVerticle.setHandler(getHandler());
tcpVerticle.start();
}
@Override
public void stop() {
tcpVerticle.stop();
@ -51,6 +59,7 @@ public class TcpDeviceComponent extends AbstractDeviceComponent {
}
@Override
public DeviceMessage send(DeviceMessage message) {
return tcpVerticle.send(message);

View File

@ -16,7 +16,6 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
public class TcpClientDeviceComponent extends AbstractDeviceComponent {
private Vertx vertx;
private TcpClientVerticle tcpClientVerticle;
private String deployedId;

View File

@ -7,6 +7,7 @@ import io.vertx.core.AbstractVerticle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@ -32,8 +33,12 @@ public class TcpClientVerticle extends AbstractVerticle {
private VertxTcpClient tcpClient;
private NetClient netClient;
private Map<String, ClientDevice> deviceMap = new ConcurrentHashMap();
private boolean stopAction = false;
public TcpClientVerticle(TcpClinetConfig config) {
this.config = config;
}
@ -52,6 +57,7 @@ public class TcpClientVerticle extends AbstractVerticle {
@Override
public void stop() {
if (null != tcpClient) {
stopAction = true;
tcpClient.shutdown();
}
}
@ -74,7 +80,7 @@ public class TcpClientVerticle extends AbstractVerticle {
if (config.getOptions() == null) {
NetClientOptions options = new NetClientOptions();
options.setReconnectAttempts(Integer.MAX_VALUE);
options.setReconnectInterval(60000L);
options.setReconnectInterval(20000L);
config.setOptions(options);
}
if (config.isSsl()) {
@ -83,8 +89,7 @@ public class TcpClientVerticle extends AbstractVerticle {
}
private void initClient() {
NetClient netClient = vertx.createNetClient(config.getOptions());
tcpClient.setClient(netClient);
netClient = vertx.createNetClient(config.getOptions());
tcpClient.setKeepAliveTimeoutMs(Duration.ofMinutes(10).toMillis());
tcpClient.onDisconnect(() -> {
// 所有设备都离线
@ -93,15 +98,8 @@ public class TcpClientVerticle extends AbstractVerticle {
executor.onReceive(null, "disconnect", deviceName);
}
});
netClient.connect(config.getPort(), config.getHost(), result -> {
if (result.succeeded()) {
log.debug("connect tcp [{}:{}] success", config.getHost(), config.getPort());
tcpClient.setRecordParser(ParserStrategyBuilder.build(config.getParserType(), config.getParserConfiguration()));
tcpClient.setSocket(result.result());
} else {
log.error("connect tcp [{}:{}] error", config.getHost(), config.getPort(), result.cause());
}
});
// 连接
toConnection();
// 设置收到消息处理
tcpClient.setReceiveHandler(buffer -> {
try {
@ -121,6 +119,34 @@ public class TcpClientVerticle extends AbstractVerticle {
});
}
public void toConnection() {
netClient.connect(config.getPort(), config.getHost(), result -> {
if (result.succeeded()) {
log.debug("connect tcp [{}:{}] success", config.getHost(), config.getPort());
tcpClient.setRecordParser(ParserStrategyBuilder.build(config.getParserType(), config.getParserConfiguration()));
NetSocket socket = result.result();
tcpClient.setSocket(socket);
socket.closeHandler((nil) -> {
tcpClient.shutdown();
// 重连自动断开重连,收到停止组件不重连
try {
if (!stopAction) {
Thread.sleep(5000L);
toConnection();
}else{
netClient.close();
netClient = null;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
} else {
log.error("connect tcp [{}:{}] error", config.getHost(), config.getPort(), result.cause());
}
});
}
@Data
@AllArgsConstructor
@NoArgsConstructor

View File

@ -34,7 +34,6 @@ public class VertxTcpClient {
// 是否是服务端的连接客户端
private final boolean serverClient;
volatile PayloadParser payloadParser;
public volatile NetClient client;
public NetSocket socket;
private final List<Runnable> disconnectListener = new CopyOnWriteArrayList<>();
private IMessageHandler executor;
@ -125,10 +124,6 @@ public class VertxTcpClient {
public void shutdown() {
log.debug("tcp client [{}] disconnect", getId());
synchronized (this) {
if (null != client) {
execute(client::close);
client = null;
}
if (null != socket) {
execute(socket::close);
this.socket = null;
@ -163,15 +158,6 @@ public class VertxTcpClient {
log.warn("close tcp client error", e);
}
}
public void setClient(NetClient client) {
if (this.client != null && this.client != client) {
this.client.close();
}
keepAlive();
this.client = client;
}
/**
*
*/

View File

@ -10,7 +10,7 @@ import reactor.core.publisher.Flux;
public interface PayloadParser {
PayloadParser init(Object param);
PayloadParser init(Object param) throws Exception;
void handle(Buffer buffer);

View File

@ -1,31 +1,73 @@
package cc.iotkit.comp.tcp.parser;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
import jdk.nashorn.api.scripting.NashornScriptEngine;
import jdk.nashorn.api.scripting.ScriptObjectMirror;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import javax.script.ScriptEngineManager;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
/**
*
*
* @copy jetLink
* @author huangwenl
* @date 2022-10-13
*/
/**
* <pre>
* PipePayloadParser parser = new PipePayloadParser();
* parser.fixed(4)
* .handler(buffer -> {
* int len = BytesUtils.highBytes2Int(buffer.getBytes());
* parser.fixed(len);
* })
* .handler(buffer -> parser.result(buffer.toString("UTF-8")).complete());
* </pre>
*/
@Slf4j
public class ScriptPayloadParser implements PayloadParser {
EmitterProcessor<Buffer> processor = EmitterProcessor.create(false);
private final NashornScriptEngine engine = (NashornScriptEngine) (new ScriptEngineManager())
.getEngineByName("nashorn");
@SneakyThrows
@Override
public PayloadParser init(Object param) {
String script = (String) param;
;
ScriptObjectMirror scriptObject = (ScriptObjectMirror) engine.eval("new (function(){" + script + "})()");
//执行转换脚本
engine.invokeMethod(scriptObject, "payloadParser", this);
return this;
}
@Override
public void handle(Buffer buffer) {
processor.onNext(buffer);
public synchronized void handle(Buffer buffer) {
if (recordParser == null && directMapper == null) {
log.error("record parser not init");
return;
}
if (recordParser != null) {
recordParser.handle(buffer);
return;
}
Buffer buf = directMapper.apply(buffer);
if (null != buf) {
sink.next(buf);
}
}
@Override
@ -33,9 +75,114 @@ public class ScriptPayloadParser implements PayloadParser {
return processor.map(Function.identity());
}
@Override
public void reset() {
this.result.clear();
complete();
}
@Override
public void close() {
processor.onComplete();
currentPipe.set(0);
this.result.clear();
}
private final EmitterProcessor<Buffer> processor = EmitterProcessor.create(true);
private final FluxSink<Buffer> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
private final List<Consumer<Buffer>> pipe = new CopyOnWriteArrayList<>();
private final List<Buffer> result = new CopyOnWriteArrayList<>();
private volatile RecordParser recordParser;
private Function<Buffer, Buffer> directMapper;
private Consumer<RecordParser> firstInit;
private final AtomicInteger currentPipe = new AtomicInteger();
public Buffer newBuffer() {
return Buffer.buffer();
}
public ScriptPayloadParser result(String buffer) {
return result(Buffer.buffer(buffer));
}
public ScriptPayloadParser result(byte[] buffer) {
return result(Buffer.buffer(buffer));
}
public ScriptPayloadParser handler(Consumer<Buffer> handler) {
pipe.add(handler);
return this;
}
public ScriptPayloadParser delimited(String delimited) {
if (recordParser == null) {
setParser(RecordParser.newDelimited(delimited));
firstInit = (parser -> parser.delimitedMode(delimited));
return this;
}
recordParser.delimitedMode(delimited);
return this;
}
public ScriptPayloadParser fixed(int size) {
if (size == 0) {
complete();
return this;
}
if (recordParser == null) {
setParser(RecordParser.newFixed(size));
firstInit = (parser -> parser.fixedSizeMode(size));
return this;
}
recordParser.fixedSizeMode(size);
return this;
}
public ScriptPayloadParser direct(Function<Buffer, Buffer> mapper) {
this.directMapper = mapper;
return this;
}
private Consumer<Buffer> getNextHandler() {
int i = currentPipe.getAndIncrement();
if (i < pipe.size()) {
return pipe.get(i);
}
currentPipe.set(0);
return pipe.get(0);
}
private void setParser(RecordParser parser) {
this.recordParser = parser;
this.recordParser.handler(buffer -> getNextHandler().accept(buffer));
}
public ScriptPayloadParser complete() {
currentPipe.set(0);
if (recordParser != null) {
firstInit.accept(recordParser);
}
if (!this.result.isEmpty()) {
Buffer buffer = Buffer.buffer();
for (Buffer buf : this.result) {
buffer.appendBuffer(buf);
}
this.result.clear();
sink.next(buffer);
}
return this;
}
public ScriptPayloadParser result(Buffer buffer) {
this.result.add(buffer);
return this;
}
}

View File

@ -21,6 +21,7 @@ public class TcpServerDeviceComponent extends AbstractDeviceComponent {
private TcpServerVerticle tcpServerVerticle;
private String deployedId;
@Override
public void create(CompConfig config) {
super.create(config);
vertx = Vertx.vertx();

View File

@ -0,0 +1,4 @@
parser.delimited("\r\n")
.handler(function(buffer){
parser.result(parser.newBuffer().toString("UTF-8")).complete();
});

View File

@ -0,0 +1,39 @@
package cc.iotkit.ruleengine.action.tcp;
import cc.iotkit.model.device.message.ThingModelMessage;
import cc.iotkit.ruleengine.action.Action;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.List;
/**
* @author huangwenl
* @date 2022-12-14
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
public class TcpAction implements Action<TcpService> {
public static final String TYPE = "tcp";
private List<TcpService> services;
@Override
public String getType() {
return TYPE;
}
@Override
public List<String> execute(ThingModelMessage msg) {
List<String> results = new ArrayList<>();
for (TcpService service : services) {
results.add(service.execute(msg));
}
return results;
}
}

View File

@ -0,0 +1,62 @@
package cc.iotkit.ruleengine.action.tcp;
import cc.iotkit.common.utils.FIUtil;
import cc.iotkit.model.device.message.ThingModelMessage;
import cc.iotkit.ruleengine.action.ScriptService;
import cc.iotkit.ruleengine.link.LinkFactory;
import cc.iotkit.ruleengine.link.LinkService;
import cc.iotkit.ruleengine.link.impl.TcpClientLink;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author huangwenl
* @date 2022-12-14
*/
@EqualsAndHashCode(callSuper = true)
@Slf4j
@Data
public class TcpService extends ScriptService implements LinkService {
private String host;
private int port;
public String execute(ThingModelMessage msg) {
//执行转换脚本
Map result = execScript(msg);
if (result == null) {
log.warn("execScript result is null");
return "execScript result is null";
}
boolean initResult = LinkFactory.initLink(getKey(), TcpClientLink.LINK_TYPE, getLinkConf());
AtomicReference<String> data = new AtomicReference<>("");
FIUtil.isTotF(initResult).handler(
() -> LinkFactory.sendMsg(getKey(), result, data::set),
() -> data.set("创建连接失败!")
);
return data.get();
}
@Override
public String getKey() {
return String.format("tcp_%s_%d", host, port);
}
@Override
public String getLinkType() {
return TcpClientLink.LINK_TYPE;
}
@Override
public Map<String, Object> getLinkConf() {
Map<String, Object> config = new HashMap<>();
config.put(TcpClientLink.HOST, host);
config.put(TcpClientLink.PORT, port);
return config;
}
}

View File

@ -3,6 +3,7 @@ package cc.iotkit.ruleengine.link;
import cc.iotkit.common.utils.FIUtil;
import cc.iotkit.ruleengine.link.impl.KafkaLink;
import cc.iotkit.ruleengine.link.impl.MqttClientLink;
import cc.iotkit.ruleengine.link.impl.TcpClientLink;
import lombok.extern.slf4j.Slf4j;
import java.util.HashSet;
@ -133,6 +134,9 @@ public class LinkFactory {
case KafkaLink.LINK_TYPE:
link = new KafkaLink();
break;
case TcpClientLink.LINK_TYPE:
link = new TcpClientLink();
break;
}
if (link != null) {
success = link.open(conf);

View File

@ -0,0 +1,119 @@
package cc.iotkit.ruleengine.link.impl;
import cc.iotkit.common.utils.FIUtil;
import cc.iotkit.ruleengine.link.BaseSinkLink;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
/**
* @author huangwenl
* @date 2022-12-14
*/
@Slf4j
public class TcpClientLink implements BaseSinkLink {
public static final String LINK_TYPE = "tcp";
public static final String HOST = "host";
public static final String PORT = "port";
public static final String PAYLOAD = "payload";
private Consumer<Void> closeHandler;
private NetClient client;
private NetSocket socket;
private String host;
private int port;
private boolean connecting;
private boolean normal;
@Override
public boolean open(Map<String, Object> config) {
try {
AtomicReference<Vertx> vertx = new AtomicReference<>();
FIUtil.isTotF(Vertx.currentContext() == null).handler(
() -> vertx.set(Vertx.vertx()),
() -> vertx.set(Vertx.currentContext().owner())
);
NetClientOptions options = new NetClientOptions().setConnectTimeout(10000);
client = vertx.get().createNetClient(options);
port = (int) config.get(PORT);
host = (String) config.get(HOST);
connecting = true;
client.connect(port, host, res -> {
connecting = false;
if (res.succeeded()) {
log.info("连接成功:{}, {}", port,host);
socket = res.result();
normal = true;
socket.closeHandler(Void -> normal = false);
} else {
closeHandler.accept(null);
log.info("连接失败:{}, {}", port,host);
}
});
} catch (Exception e) {
e.printStackTrace();
connecting = false;
return false;
}
return true;
}
@Override
public void send(Map<String, Object> msg, Consumer<String> consumer) {
FIUtil.isTotF(normal).handler(
() -> {
Future<Void> publish = socket.write(Buffer.buffer(msg.get(PAYLOAD).toString()));
try {
publish.toCompletionStage().toCompletableFuture().get(300L, TimeUnit.MILLISECONDS);
FIUtil.isTotF(publish.succeeded()).handler(
() -> consumer.accept(String.format("tcp,发送成功:%s", msg.get(PAYLOAD).toString())),
() -> consumer.accept(String.format("tcp,发送失败:%s", msg.get(PAYLOAD).toString()))
);
} catch (Exception e) {
e.printStackTrace();
consumer.accept(String.format("tcp,发送异常:%s", msg.get(PAYLOAD).toString()));
}
},
() -> {
consumer.accept("tcp,连接断开,发送失败");
if (!connecting) {
log.info("tcp重连");
connecting = true;
client.connect(port, host, res -> {
connecting = false;
if (res.succeeded()) {
log.info("连接成功:{}, {}", port,host);
socket = res.result();
normal = true;
socket.closeHandler(Void -> normal = false);
}
});
}
});
}
@Override
public void close() {
try {
socket.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
closeHandler.accept(null);
}
}
@Override
public void closeHandler(Consumer<Void> consumer) {
this.closeHandler = consumer;
}
}

View File

@ -20,6 +20,8 @@ import cc.iotkit.ruleengine.action.kafka.KafkaAction;
import cc.iotkit.ruleengine.action.kafka.KafkaService;
import cc.iotkit.ruleengine.action.mqtt.MqttAction;
import cc.iotkit.ruleengine.action.mqtt.MqttService;
import cc.iotkit.ruleengine.action.tcp.TcpAction;
import cc.iotkit.ruleengine.action.tcp.TcpService;
import cc.iotkit.ruleengine.config.RuleConfiguration;
import cc.iotkit.ruleengine.filter.DeviceFilter;
import cc.iotkit.ruleengine.filter.Filter;
@ -165,6 +167,13 @@ public class RuleManager {
service.initLink(ruleId);
}
return kafkaAction;
} else if (TcpAction.TYPE.equals(type)) {
TcpAction tcpAction = parse(config, TcpAction.class);
for (TcpService service : tcpAction.getServices()) {
service.setDeviceInfoData(deviceInfoData);
service.initLink(ruleId);
}
return tcpAction;
}
return null;
}