diff --git a/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpPlugin.java b/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpPlugin.java index 0b9942a..824902e 100755 --- a/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpPlugin.java +++ b/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpPlugin.java @@ -18,6 +18,8 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * @author sjg @@ -66,15 +68,20 @@ public class HttpPlugin implements PluginCloseListener { @Override public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) { try { - httpVerticle.stop(); - Future future = vertx.undeploy(deployedId); - future.onSuccess(unused -> log.info("http plugin stopped success")); - if (closeType == PluginCloseType.UNINSTALL) { - log.info("http plugin UNINSTALL:{}", pluginInfo.getPluginId()); - } else if (closeType == PluginCloseType.STOP) { - log.info("http plugin STOP:{}", pluginInfo.getPluginId()); - } else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) { - log.info("http plugin UPGRADE_UNINSTALL:{}", pluginInfo.getPluginId()); + log.info("plugin close,type:{},pluginId:{}", closeType, pluginInfo.getPluginId()); + if (deployedId != null) { + CountDownLatch wait = new CountDownLatch(1); + Future future = vertx.undeploy(deployedId); + future.onSuccess(unused -> { + log.info("tcp plugin stopped success"); + wait.countDown(); + }); + future.onFailure(h -> { + log.info("tcp plugin stopped failed"); + h.printStackTrace(); + wait.countDown(); + }); + wait.await(5, TimeUnit.SECONDS); } } catch (Throwable e) { log.error("http plugin stop error", e); diff --git a/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpVerticle.java b/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpVerticle.java index 3b945a3..20e5ba4 100755 --- a/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpVerticle.java +++ b/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpVerticle.java @@ -78,8 +78,7 @@ public class HttpVerticle extends AbstractVerticle implements Handler log.info("http server close result:{}", r.succeeded())); + public void stop() { } @Override diff --git a/modbus-plugin/src/main/java/cc/iotkit/plugins/modbus/service/ModbusPlugin.java b/modbus-plugin/src/main/java/cc/iotkit/plugins/modbus/service/ModbusPlugin.java index f87f9e7..3f4fed9 100755 --- a/modbus-plugin/src/main/java/cc/iotkit/plugins/modbus/service/ModbusPlugin.java +++ b/modbus-plugin/src/main/java/cc/iotkit/plugins/modbus/service/ModbusPlugin.java @@ -122,14 +122,8 @@ public class ModbusPlugin implements PluginCloseListener { @Override public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) { try { + log.info("plugin close,type:{},pluginId:{}", closeType, pluginInfo.getPluginId()); master.disconnect(); - if (closeType == PluginCloseType.UNINSTALL) { - log.info("modbus plugin UNINSTALL:{}", pluginInfo.getPluginId()); - } else if (closeType == PluginCloseType.STOP) { - log.info("modbus plugin STOP:{}", pluginInfo.getPluginId()); - } else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) { - log.info("modbus plugin UPGRADE_UNINSTALL:{}", pluginInfo.getPluginId()); - } } catch (Throwable e) { log.error("modbus plugin stop error", e); } diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttPlugin.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttPlugin.java index 28ca096..c0d1bcf 100755 --- a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttPlugin.java +++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttPlugin.java @@ -18,6 +18,8 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * @author sjg @@ -65,15 +67,20 @@ public class MqttPlugin implements PluginCloseListener, IPlugin { @Override public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) { try { - mqttVerticle.stop(); - Future future = vertx.undeploy(deployedId); - future.onSuccess(unused -> log.info("mqtt plugin stopped success")); - if (closeType == PluginCloseType.UNINSTALL) { - log.info("mqtt plugin UNINSTALL:{}", pluginInfo.getPluginId()); - } else if (closeType == PluginCloseType.STOP) { - log.info("mqtt plugin STOP:{}", pluginInfo.getPluginId()); - } else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) { - log.info("mqtt plugin UPGRADE_UNINSTALL:{}", pluginInfo.getPluginId()); + log.info("plugin close,type:{},pluginId:{}", closeType, pluginInfo.getPluginId()); + if (deployedId != null) { + CountDownLatch wait = new CountDownLatch(1); + Future future = vertx.undeploy(deployedId); + future.onSuccess(unused -> { + log.info("tcp plugin stopped success"); + wait.countDown(); + }); + future.onFailure(h -> { + log.info("tcp plugin stopped failed"); + h.printStackTrace(); + wait.countDown(); + }); + wait.await(5, TimeUnit.SECONDS); } } catch (Throwable e) { log.error("mqtt plugin stop error", e); diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java index 31c6946..44d5b56 100755 --- a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java +++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java @@ -404,7 +404,6 @@ public class MqttVerticle extends AbstractVerticle implements Handler log.info("close mqtt server...")); } public void publish(String deviceName, String topic, String msg) { diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/parser/DataPackage.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/parser/DataPackage.java index d079736..e749a89 100644 --- a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/parser/DataPackage.java +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/parser/DataPackage.java @@ -23,10 +23,6 @@ import java.io.IOException; @Builder public class DataPackage { - public static final short FLAG = 0x8D; - - public static final int HEAD_LEN = 2 + 6 + 2 + 2 + 4; - public static final short CODE_REGISTER = 0x10; public static final short CODE_REGISTER_REPLY = 0x11; public static final short CODE_HEARTBEAT = 0x20; diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpPlugin.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpPlugin.java index 60c013d..46e36a8 100755 --- a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpPlugin.java +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpPlugin.java @@ -11,6 +11,7 @@ import com.gitee.starblues.core.PluginCloseType; import com.gitee.starblues.core.PluginInfo; import io.vertx.core.Future; import io.vertx.core.Vertx; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.support.GenericApplicationContext; @@ -18,6 +19,8 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * tcp插件 @@ -57,25 +60,30 @@ public class TcpPlugin implements PluginCloseListener, IPlugin { deployedId = s; log.info("tcp plugin started success"); })); - future.onFailure((e) -> { - log.error("tcp plugin startup failed", e); - }); + future.onFailure(Throwable::printStackTrace); } catch (Throwable e) { - log.error("tcp plugin startup error", e); + e.printStackTrace(); } } + + @SneakyThrows @Override public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) { - tcpServerVerticle.stop(); - Future future = vertx.undeploy(deployedId); - future.onSuccess(unused -> log.info("tcp plugin stopped success")); - if (closeType == PluginCloseType.UNINSTALL) { - log.info("tcp plugin UNINSTALL:{}", pluginInfo.getPluginId()); - } else if (closeType == PluginCloseType.STOP) { - log.info("tcp plugin STOP:{}", pluginInfo.getPluginId()); - } else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) { - log.info("tcp plugin UPGRADE_UNINSTALL:{}", pluginInfo.getPluginId()); + log.info("plugin close,type:{},pluginId:{}", closeType, pluginInfo.getPluginId()); + if (deployedId != null) { + CountDownLatch wait = new CountDownLatch(1); + Future future = vertx.undeploy(deployedId); + future.onSuccess(unused -> { + log.info("tcp plugin stopped success"); + wait.countDown(); + }); + future.onFailure(h -> { + log.info("tcp plugin stopped failed"); + h.printStackTrace(); + wait.countDown(); + }); + wait.await(5, TimeUnit.SECONDS); } } diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpServerVerticle.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpServerVerticle.java index ae2542a..ade0819 100644 --- a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpServerVerticle.java +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpServerVerticle.java @@ -49,8 +49,6 @@ public class TcpServerVerticle extends AbstractVerticle { @Setter private TcpServerConfig config; - private VertxTcpServer tcpServer; - private final Map clientMap = new ConcurrentHashMap<>(); private final Map dnToPk = new HashMap<>(); @@ -60,7 +58,7 @@ public class TcpServerVerticle extends AbstractVerticle { @Setter private long keepAliveTimeout = Duration.ofSeconds(30).toMillis(); - private Collection tcpServers; + private NetServer netServer; @Getter private IScriptEngine scriptEngine; @@ -78,18 +76,18 @@ public class TcpServerVerticle extends AbstractVerticle { @Override public void start() { - tcpServer = new VertxTcpServer(); try { initConfig(); initTcpServer(); } catch (Exception e) { - log.error("init tcp server failed", e); + log.info("init tcp server failed"); + e.printStackTrace(); } } @Override public void stop() { - tcpServer.shutdown(); + log.info("tcp server stopped"); } /** @@ -104,29 +102,18 @@ public class TcpServerVerticle extends AbstractVerticle { /** * 初始TCP服务 */ - private void initTcpServer() { - int instance = Math.max(2, config.getInstance()); - List instances = new ArrayList<>(instance); - for (int i = 0; i < instance; i++) { - instances.add(vertx.createNetServer( - new NetServerOptions().setHost(config.getHost()) - .setPort(config.getPort()) - )); - } - // 根据解析类型配置数据解析器 - tcpServer.setServer(instances); - // 针对JVM做的多路复用优化 - // 多个server listen同一个端口,每个client连接的时候vertx会分配 - // 一个connection只能在一个server中处理 - for (NetServer netServer : instances) { - netServer.listen(config.createSocketAddress(), result -> { - if (result.succeeded()) { - log.info("tcp server startup on {}", result.result().actualPort()); - } else { - log.error("tcp server startup error", result.cause()); - } - }); - } + private void initTcpServer() throws Exception { + netServer = vertx.createNetServer( + new NetServerOptions().setHost(config.getHost()) + .setPort(config.getPort())); + netServer.connectHandler(this::acceptTcpConnection); + netServer.listen(config.createSocketAddress(), result -> { + if (result.succeeded()) { + log.info("tcp server startup on {}", result.result().actualPort()); + } else { + result.cause().printStackTrace(); + } + }); } public void sendMsg(String addr, Buffer msg) { @@ -164,142 +151,110 @@ public class TcpServerVerticle extends AbstractVerticle { }); } - class VertxTcpServer { - - /** - * 为每个NetServer添加connectHandler - * - * @param servers 创建的所有NetServer - */ - public void setServer(Collection servers) { - if (tcpServers != null && !tcpServers.isEmpty()) { - shutdown(); - } - tcpServers = servers; - for (NetServer tcpServer : tcpServers) { - tcpServer.connectHandler(this::acceptTcpConnection); - } - } - - /** - * TCP连接处理逻辑 - * - * @param socket socket - */ - protected void acceptTcpConnection(NetSocket socket) { - // 客户端连接处理 - String clientId = IdUtil.simpleUUID() + "_" + socket.remoteAddress(); - VertxTcpClient client = new VertxTcpClient(clientId); - client.setKeepAliveTimeoutMs(keepAliveTimeout); - try { - // TCP异常和关闭处理 - socket.exceptionHandler(err -> log.error("tcp server client [{}] error", socket.remoteAddress(), err)).closeHandler(nil -> { - log.debug("tcp server client [{}] closed", socket.remoteAddress()); - client.shutdown(); - }); - // 这个地方是在TCP服务初始化的时候设置的 parserSupplier - client.setKeepAliveTimeoutMs(keepAliveTimeout); - client.setSocket(socket); - RecordParser parser = DataReader.getParser(buffer -> { - try { - DataPackage data = DataDecoder.decode(buffer); - String addr = data.getAddr(); - int code = data.getCode(); - if (code == DataPackage.CODE_REGISTER) { - clientMap.put(addr, client); - //设备注册 - String pk = dnToPk.put(addr, new String(data.getPayload())); - ActionResult rst = thingService.post(pluginInfo.getPluginId(), DeviceRegister.builder() - .id(IdUtil.simpleUUID()) - .productKey(pk) - .deviceName(addr) - .time(System.currentTimeMillis()) - .build()); - if (rst.getCode() == 0) { - //回复注册成功给客户端 - sendMsg(addr, DataEncoder.encode( - DataPackage.builder() - .addr(addr) - .code(DataPackage.CODE_REGISTER_REPLY) - .mid(data.getMid()) - .payload(Buffer.buffer().appendInt(0).getBytes()) - .build() - )); - } - return; - } - - if (code == DataPackage.CODE_HEARTBEAT) { - //心跳 - online(addr); - heartbeatDevice.put(addr, System.currentTimeMillis()); - return; - } - - if (code == DataPackage.CODE_DATA_UP) { - //设备数据上报 - online(addr); - //数据上报也作为心跳 - heartbeatDevice.put(addr, System.currentTimeMillis()); - //这里可以直接解码,或调用脚本解码(用脚本解码不用修改程序重新打包) - Map rst = scriptEngine.invokeMethod(new TypeReference<>() { - }, "decode", data); - if (rst == null) { - return; - } - //属性上报 - thingService.post(pluginInfo.getPluginId(), PropertyReport.builder() - .id(IdUtil.simpleUUID()) - .productKey(dnToPk.get(addr)) - .deviceName(addr) - .params(rst) - .time(System.currentTimeMillis()) - .build()); - } - - //未注册断开连接 - if (!clientMap.containsKey(data.getAddr())) { - socket.close(); - } - - } catch (Exception e) { - log.error("handler error", e); - } - }); - client.setParser(parser); - log.debug("accept tcp client [{}] connection", socket.remoteAddress()); - } catch (Exception e) { - log.error("create tcp server client error", e); + /** + * TCP连接处理逻辑 + * + * @param socket socket + */ + protected void acceptTcpConnection(NetSocket socket) { + // 客户端连接处理 + String clientId = IdUtil.simpleUUID() + "_" + socket.remoteAddress(); + VertxTcpClient client = new VertxTcpClient(clientId); + client.setKeepAliveTimeoutMs(keepAliveTimeout); + try { + // TCP异常和关闭处理 + socket.exceptionHandler(Throwable::printStackTrace).closeHandler(nil -> { + log.debug("tcp server client [{}] closed", socket.remoteAddress()); client.shutdown(); - } - } - - private void online(String addr) { - if (!heartbeatDevice.containsKey(addr)) { - //第一次心跳,上线 - thingService.post(pluginInfo.getPluginId(), DeviceStateChange.builder() - .id(IdUtil.simpleUUID()) - .productKey(dnToPk.get(addr)) - .deviceName(addr) - .state(DeviceState.ONLINE) - .time(System.currentTimeMillis()) - .build()); - } - } - - public void shutdown() { - if (tcpServers == null) { - return; - } - for (NetServer tcpServer : tcpServers) { + }); + // 这个地方是在TCP服务初始化的时候设置的 parserSupplier + client.setKeepAliveTimeoutMs(keepAliveTimeout); + client.setSocket(socket); + RecordParser parser = DataReader.getParser(buffer -> { try { - tcpServer.close(); - } catch (Exception e) { - log.warn("close tcp server error", e); - } - } - tcpServers = null; - } + DataPackage data = DataDecoder.decode(buffer); + String addr = data.getAddr(); + int code = data.getCode(); + if (code == DataPackage.CODE_REGISTER) { + clientMap.put(addr, client); + //设备注册 + String pk = dnToPk.put(addr, new String(data.getPayload())); + ActionResult rst = thingService.post(pluginInfo.getPluginId(), DeviceRegister.builder() + .id(IdUtil.simpleUUID()) + .productKey(pk) + .deviceName(addr) + .time(System.currentTimeMillis()) + .build()); + if (rst.getCode() == 0) { + //回复注册成功给客户端 + sendMsg(addr, DataEncoder.encode( + DataPackage.builder() + .addr(addr) + .code(DataPackage.CODE_REGISTER_REPLY) + .mid(data.getMid()) + .payload(Buffer.buffer().appendInt(0).getBytes()) + .build() + )); + } + return; + } + if (code == DataPackage.CODE_HEARTBEAT) { + //心跳 + online(addr); + heartbeatDevice.put(addr, System.currentTimeMillis()); + return; + } + + if (code == DataPackage.CODE_DATA_UP) { + //设备数据上报 + online(addr); + //数据上报也作为心跳 + heartbeatDevice.put(addr, System.currentTimeMillis()); + //这里可以直接解码,或调用脚本解码(用脚本解码不用修改程序重新打包) + Map rst = scriptEngine.invokeMethod(new TypeReference<>() { + }, "decode", data); + if (rst == null) { + return; + } + //属性上报 + thingService.post(pluginInfo.getPluginId(), PropertyReport.builder() + .id(IdUtil.simpleUUID()) + .productKey(dnToPk.get(addr)) + .deviceName(addr) + .params(rst) + .time(System.currentTimeMillis()) + .build()); + } + + //未注册断开连接 + if (!clientMap.containsKey(data.getAddr())) { + socket.close(); + } + + } catch (Exception e) { + e.printStackTrace(); + } + }); + client.setParser(parser); + log.debug("accept tcp client [{}] connection", socket.remoteAddress()); + } catch (Exception e) { + e.printStackTrace(); + client.shutdown(); + } } + + private void online(String addr) { + if (!heartbeatDevice.containsKey(addr)) { + //第一次心跳,上线 + thingService.post(pluginInfo.getPluginId(), DeviceStateChange.builder() + .id(IdUtil.simpleUUID()) + .productKey(dnToPk.get(addr)) + .deviceName(addr) + .state(DeviceState.ONLINE) + .time(System.currentTimeMillis()) + .build()); + } + } + } diff --git a/tcp-plugin/src/test/java/cc/iotkit/test/TcpClientVerticle.java b/tcp-plugin/src/test/java/cc/iotkit/test/TcpClientVerticle.java index 78be938..f048d5c 100644 --- a/tcp-plugin/src/test/java/cc/iotkit/test/TcpClientVerticle.java +++ b/tcp-plugin/src/test/java/cc/iotkit/test/TcpClientVerticle.java @@ -55,7 +55,7 @@ public class TcpClientVerticle extends AbstractVerticle { netClient = vertx.createNetClient(options); RecordParser parser = DataReader.getParser(this::handle); - netClient.connect(6884, "127.0.0.1", result -> { + netClient.connect(6883, "127.0.0.1", result -> { if (result.succeeded()) { log.debug("connect tcp success"); socket = result.result();