fix:tcp启动问题修复

master
xiwa 2023-10-29 12:55:55 +08:00
parent 9550c091eb
commit 1ddaa6045d
9 changed files with 174 additions and 209 deletions

View File

@ -18,6 +18,8 @@ import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/** /**
* @author sjg * @author sjg
@ -66,15 +68,20 @@ public class HttpPlugin implements PluginCloseListener {
@Override @Override
public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) { public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) {
try { try {
httpVerticle.stop(); log.info("plugin close,type:{},pluginId:{}", closeType, pluginInfo.getPluginId());
Future<Void> future = vertx.undeploy(deployedId); if (deployedId != null) {
future.onSuccess(unused -> log.info("http plugin stopped success")); CountDownLatch wait = new CountDownLatch(1);
if (closeType == PluginCloseType.UNINSTALL) { Future<Void> future = vertx.undeploy(deployedId);
log.info("http plugin UNINSTALL{}", pluginInfo.getPluginId()); future.onSuccess(unused -> {
} else if (closeType == PluginCloseType.STOP) { log.info("tcp plugin stopped success");
log.info("http plugin STOP{}", pluginInfo.getPluginId()); wait.countDown();
} else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) { });
log.info("http plugin UPGRADE_UNINSTALL{}", pluginInfo.getPluginId()); future.onFailure(h -> {
log.info("tcp plugin stopped failed");
h.printStackTrace();
wait.countDown();
});
wait.await(5, TimeUnit.SECONDS);
} }
} catch (Throwable e) { } catch (Throwable e) {
log.error("http plugin stop error", e); log.error("http plugin stop error", e);

View File

@ -78,8 +78,7 @@ public class HttpVerticle extends AbstractVerticle implements Handler<RoutingCon
} }
@Override @Override
public void stop() throws Exception { public void stop() {
httpServer.close((r) -> log.info("http server close result:{}", r.succeeded()));
} }
@Override @Override

View File

@ -122,14 +122,8 @@ public class ModbusPlugin implements PluginCloseListener {
@Override @Override
public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) { public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) {
try { try {
log.info("plugin close,type:{},pluginId:{}", closeType, pluginInfo.getPluginId());
master.disconnect(); master.disconnect();
if (closeType == PluginCloseType.UNINSTALL) {
log.info("modbus plugin UNINSTALL{}", pluginInfo.getPluginId());
} else if (closeType == PluginCloseType.STOP) {
log.info("modbus plugin STOP{}", pluginInfo.getPluginId());
} else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) {
log.info("modbus plugin UPGRADE_UNINSTALL{}", pluginInfo.getPluginId());
}
} catch (Throwable e) { } catch (Throwable e) {
log.error("modbus plugin stop error", e); log.error("modbus plugin stop error", e);
} }

View File

@ -18,6 +18,8 @@ import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/** /**
* @author sjg * @author sjg
@ -65,15 +67,20 @@ public class MqttPlugin implements PluginCloseListener, IPlugin {
@Override @Override
public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) { public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) {
try { try {
mqttVerticle.stop(); log.info("plugin close,type:{},pluginId:{}", closeType, pluginInfo.getPluginId());
Future<Void> future = vertx.undeploy(deployedId); if (deployedId != null) {
future.onSuccess(unused -> log.info("mqtt plugin stopped success")); CountDownLatch wait = new CountDownLatch(1);
if (closeType == PluginCloseType.UNINSTALL) { Future<Void> future = vertx.undeploy(deployedId);
log.info("mqtt plugin UNINSTALL{}", pluginInfo.getPluginId()); future.onSuccess(unused -> {
} else if (closeType == PluginCloseType.STOP) { log.info("tcp plugin stopped success");
log.info("mqtt plugin STOP{}", pluginInfo.getPluginId()); wait.countDown();
} else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) { });
log.info("mqtt plugin UPGRADE_UNINSTALL{}", pluginInfo.getPluginId()); future.onFailure(h -> {
log.info("tcp plugin stopped failed");
h.printStackTrace();
wait.countDown();
});
wait.await(5, TimeUnit.SECONDS);
} }
} catch (Throwable e) { } catch (Throwable e) {
log.error("mqtt plugin stop error", e); log.error("mqtt plugin stop error", e);

View File

@ -404,7 +404,6 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
); );
DEVICE_ONLINE.clear(); DEVICE_ONLINE.clear();
} }
mqttServer.close(voidAsyncResult -> log.info("close mqtt server..."));
} }
public void publish(String deviceName, String topic, String msg) { public void publish(String deviceName, String topic, String msg) {

View File

@ -23,10 +23,6 @@ import java.io.IOException;
@Builder @Builder
public class DataPackage { public class DataPackage {
public static final short FLAG = 0x8D;
public static final int HEAD_LEN = 2 + 6 + 2 + 2 + 4;
public static final short CODE_REGISTER = 0x10; public static final short CODE_REGISTER = 0x10;
public static final short CODE_REGISTER_REPLY = 0x11; public static final short CODE_REGISTER_REPLY = 0x11;
public static final short CODE_HEARTBEAT = 0x20; public static final short CODE_HEARTBEAT = 0x20;

View File

@ -11,6 +11,7 @@ import com.gitee.starblues.core.PluginCloseType;
import com.gitee.starblues.core.PluginInfo; import com.gitee.starblues.core.PluginInfo;
import io.vertx.core.Future; import io.vertx.core.Future;
import io.vertx.core.Vertx; import io.vertx.core.Vertx;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.GenericApplicationContext; import org.springframework.context.support.GenericApplicationContext;
@ -18,6 +19,8 @@ import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/** /**
* tcp * tcp
@ -57,25 +60,30 @@ public class TcpPlugin implements PluginCloseListener, IPlugin {
deployedId = s; deployedId = s;
log.info("tcp plugin started success"); log.info("tcp plugin started success");
})); }));
future.onFailure((e) -> { future.onFailure(Throwable::printStackTrace);
log.error("tcp plugin startup failed", e);
});
} catch (Throwable e) { } catch (Throwable e) {
log.error("tcp plugin startup error", e); e.printStackTrace();
} }
} }
@SneakyThrows
@Override @Override
public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) { public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) {
tcpServerVerticle.stop(); log.info("plugin close,type:{},pluginId:{}", closeType, pluginInfo.getPluginId());
Future<Void> future = vertx.undeploy(deployedId); if (deployedId != null) {
future.onSuccess(unused -> log.info("tcp plugin stopped success")); CountDownLatch wait = new CountDownLatch(1);
if (closeType == PluginCloseType.UNINSTALL) { Future<Void> future = vertx.undeploy(deployedId);
log.info("tcp plugin UNINSTALL{}", pluginInfo.getPluginId()); future.onSuccess(unused -> {
} else if (closeType == PluginCloseType.STOP) { log.info("tcp plugin stopped success");
log.info("tcp plugin STOP{}", pluginInfo.getPluginId()); wait.countDown();
} else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) { });
log.info("tcp plugin UPGRADE_UNINSTALL{}", pluginInfo.getPluginId()); future.onFailure(h -> {
log.info("tcp plugin stopped failed");
h.printStackTrace();
wait.countDown();
});
wait.await(5, TimeUnit.SECONDS);
} }
} }

View File

@ -49,8 +49,6 @@ public class TcpServerVerticle extends AbstractVerticle {
@Setter @Setter
private TcpServerConfig config; private TcpServerConfig config;
private VertxTcpServer tcpServer;
private final Map<String, VertxTcpClient> clientMap = new ConcurrentHashMap<>(); private final Map<String, VertxTcpClient> clientMap = new ConcurrentHashMap<>();
private final Map<String, String> dnToPk = new HashMap<>(); private final Map<String, String> dnToPk = new HashMap<>();
@ -60,7 +58,7 @@ public class TcpServerVerticle extends AbstractVerticle {
@Setter @Setter
private long keepAliveTimeout = Duration.ofSeconds(30).toMillis(); private long keepAliveTimeout = Duration.ofSeconds(30).toMillis();
private Collection<NetServer> tcpServers; private NetServer netServer;
@Getter @Getter
private IScriptEngine scriptEngine; private IScriptEngine scriptEngine;
@ -78,18 +76,18 @@ public class TcpServerVerticle extends AbstractVerticle {
@Override @Override
public void start() { public void start() {
tcpServer = new VertxTcpServer();
try { try {
initConfig(); initConfig();
initTcpServer(); initTcpServer();
} catch (Exception e) { } catch (Exception e) {
log.error("init tcp server failed", e); log.info("init tcp server failed");
e.printStackTrace();
} }
} }
@Override @Override
public void stop() { public void stop() {
tcpServer.shutdown(); log.info("tcp server stopped");
} }
/** /**
@ -104,29 +102,18 @@ public class TcpServerVerticle extends AbstractVerticle {
/** /**
* TCP * TCP
*/ */
private void initTcpServer() { private void initTcpServer() throws Exception {
int instance = Math.max(2, config.getInstance()); netServer = vertx.createNetServer(
List<NetServer> instances = new ArrayList<>(instance); new NetServerOptions().setHost(config.getHost())
for (int i = 0; i < instance; i++) { .setPort(config.getPort()));
instances.add(vertx.createNetServer( netServer.connectHandler(this::acceptTcpConnection);
new NetServerOptions().setHost(config.getHost()) netServer.listen(config.createSocketAddress(), result -> {
.setPort(config.getPort()) if (result.succeeded()) {
)); log.info("tcp server startup on {}", result.result().actualPort());
} } else {
// 根据解析类型配置数据解析器 result.cause().printStackTrace();
tcpServer.setServer(instances); }
// 针对JVM做的多路复用优化 });
// 多个server listen同一个端口每个client连接的时候vertx会分配
// 一个connection只能在一个server中处理
for (NetServer netServer : instances) {
netServer.listen(config.createSocketAddress(), result -> {
if (result.succeeded()) {
log.info("tcp server startup on {}", result.result().actualPort());
} else {
log.error("tcp server startup error", result.cause());
}
});
}
} }
public void sendMsg(String addr, Buffer msg) { public void sendMsg(String addr, Buffer msg) {
@ -164,142 +151,110 @@ public class TcpServerVerticle extends AbstractVerticle {
}); });
} }
class VertxTcpServer { /**
* TCP
/** *
* NetServerconnectHandler * @param socket socket
* */
* @param servers NetServer protected void acceptTcpConnection(NetSocket socket) {
*/ // 客户端连接处理
public void setServer(Collection<NetServer> servers) { String clientId = IdUtil.simpleUUID() + "_" + socket.remoteAddress();
if (tcpServers != null && !tcpServers.isEmpty()) { VertxTcpClient client = new VertxTcpClient(clientId);
shutdown(); client.setKeepAliveTimeoutMs(keepAliveTimeout);
} try {
tcpServers = servers; // TCP异常和关闭处理
for (NetServer tcpServer : tcpServers) { socket.exceptionHandler(Throwable::printStackTrace).closeHandler(nil -> {
tcpServer.connectHandler(this::acceptTcpConnection); log.debug("tcp server client [{}] closed", socket.remoteAddress());
}
}
/**
* TCP
*
* @param socket socket
*/
protected void acceptTcpConnection(NetSocket socket) {
// 客户端连接处理
String clientId = IdUtil.simpleUUID() + "_" + socket.remoteAddress();
VertxTcpClient client = new VertxTcpClient(clientId);
client.setKeepAliveTimeoutMs(keepAliveTimeout);
try {
// TCP异常和关闭处理
socket.exceptionHandler(err -> log.error("tcp server client [{}] error", socket.remoteAddress(), err)).closeHandler(nil -> {
log.debug("tcp server client [{}] closed", socket.remoteAddress());
client.shutdown();
});
// 这个地方是在TCP服务初始化的时候设置的 parserSupplier
client.setKeepAliveTimeoutMs(keepAliveTimeout);
client.setSocket(socket);
RecordParser parser = DataReader.getParser(buffer -> {
try {
DataPackage data = DataDecoder.decode(buffer);
String addr = data.getAddr();
int code = data.getCode();
if (code == DataPackage.CODE_REGISTER) {
clientMap.put(addr, client);
//设备注册
String pk = dnToPk.put(addr, new String(data.getPayload()));
ActionResult rst = thingService.post(pluginInfo.getPluginId(), DeviceRegister.builder()
.id(IdUtil.simpleUUID())
.productKey(pk)
.deviceName(addr)
.time(System.currentTimeMillis())
.build());
if (rst.getCode() == 0) {
//回复注册成功给客户端
sendMsg(addr, DataEncoder.encode(
DataPackage.builder()
.addr(addr)
.code(DataPackage.CODE_REGISTER_REPLY)
.mid(data.getMid())
.payload(Buffer.buffer().appendInt(0).getBytes())
.build()
));
}
return;
}
if (code == DataPackage.CODE_HEARTBEAT) {
//心跳
online(addr);
heartbeatDevice.put(addr, System.currentTimeMillis());
return;
}
if (code == DataPackage.CODE_DATA_UP) {
//设备数据上报
online(addr);
//数据上报也作为心跳
heartbeatDevice.put(addr, System.currentTimeMillis());
//这里可以直接解码,或调用脚本解码(用脚本解码不用修改程序重新打包)
Map<String, Object> rst = scriptEngine.invokeMethod(new TypeReference<>() {
}, "decode", data);
if (rst == null) {
return;
}
//属性上报
thingService.post(pluginInfo.getPluginId(), PropertyReport.builder()
.id(IdUtil.simpleUUID())
.productKey(dnToPk.get(addr))
.deviceName(addr)
.params(rst)
.time(System.currentTimeMillis())
.build());
}
//未注册断开连接
if (!clientMap.containsKey(data.getAddr())) {
socket.close();
}
} catch (Exception e) {
log.error("handler error", e);
}
});
client.setParser(parser);
log.debug("accept tcp client [{}] connection", socket.remoteAddress());
} catch (Exception e) {
log.error("create tcp server client error", e);
client.shutdown(); client.shutdown();
} });
} // 这个地方是在TCP服务初始化的时候设置的 parserSupplier
client.setKeepAliveTimeoutMs(keepAliveTimeout);
private void online(String addr) { client.setSocket(socket);
if (!heartbeatDevice.containsKey(addr)) { RecordParser parser = DataReader.getParser(buffer -> {
//第一次心跳,上线
thingService.post(pluginInfo.getPluginId(), DeviceStateChange.builder()
.id(IdUtil.simpleUUID())
.productKey(dnToPk.get(addr))
.deviceName(addr)
.state(DeviceState.ONLINE)
.time(System.currentTimeMillis())
.build());
}
}
public void shutdown() {
if (tcpServers == null) {
return;
}
for (NetServer tcpServer : tcpServers) {
try { try {
tcpServer.close(); DataPackage data = DataDecoder.decode(buffer);
} catch (Exception e) { String addr = data.getAddr();
log.warn("close tcp server error", e); int code = data.getCode();
} if (code == DataPackage.CODE_REGISTER) {
} clientMap.put(addr, client);
tcpServers = null; //设备注册
} String pk = dnToPk.put(addr, new String(data.getPayload()));
ActionResult rst = thingService.post(pluginInfo.getPluginId(), DeviceRegister.builder()
.id(IdUtil.simpleUUID())
.productKey(pk)
.deviceName(addr)
.time(System.currentTimeMillis())
.build());
if (rst.getCode() == 0) {
//回复注册成功给客户端
sendMsg(addr, DataEncoder.encode(
DataPackage.builder()
.addr(addr)
.code(DataPackage.CODE_REGISTER_REPLY)
.mid(data.getMid())
.payload(Buffer.buffer().appendInt(0).getBytes())
.build()
));
}
return;
}
if (code == DataPackage.CODE_HEARTBEAT) {
//心跳
online(addr);
heartbeatDevice.put(addr, System.currentTimeMillis());
return;
}
if (code == DataPackage.CODE_DATA_UP) {
//设备数据上报
online(addr);
//数据上报也作为心跳
heartbeatDevice.put(addr, System.currentTimeMillis());
//这里可以直接解码,或调用脚本解码(用脚本解码不用修改程序重新打包)
Map<String, Object> rst = scriptEngine.invokeMethod(new TypeReference<>() {
}, "decode", data);
if (rst == null) {
return;
}
//属性上报
thingService.post(pluginInfo.getPluginId(), PropertyReport.builder()
.id(IdUtil.simpleUUID())
.productKey(dnToPk.get(addr))
.deviceName(addr)
.params(rst)
.time(System.currentTimeMillis())
.build());
}
//未注册断开连接
if (!clientMap.containsKey(data.getAddr())) {
socket.close();
}
} catch (Exception e) {
e.printStackTrace();
}
});
client.setParser(parser);
log.debug("accept tcp client [{}] connection", socket.remoteAddress());
} catch (Exception e) {
e.printStackTrace();
client.shutdown();
}
} }
private void online(String addr) {
if (!heartbeatDevice.containsKey(addr)) {
//第一次心跳,上线
thingService.post(pluginInfo.getPluginId(), DeviceStateChange.builder()
.id(IdUtil.simpleUUID())
.productKey(dnToPk.get(addr))
.deviceName(addr)
.state(DeviceState.ONLINE)
.time(System.currentTimeMillis())
.build());
}
}
} }

View File

@ -55,7 +55,7 @@ public class TcpClientVerticle extends AbstractVerticle {
netClient = vertx.createNetClient(options); netClient = vertx.createNetClient(options);
RecordParser parser = DataReader.getParser(this::handle); RecordParser parser = DataReader.getParser(this::handle);
netClient.connect(6884, "127.0.0.1", result -> { netClient.connect(6883, "127.0.0.1", result -> {
if (result.succeeded()) { if (result.succeeded()) {
log.debug("connect tcp success"); log.debug("connect tcp success");
socket = result.result(); socket = result.result();