diff --git a/http-plugin/src/main/java/cc/iotkit/plugins/http/conf/HttpConfig.java b/http-plugin/src/main/java/cc/iotkit/plugins/http/conf/HttpConfig.java index e4cb217..f0108db 100755 --- a/http-plugin/src/main/java/cc/iotkit/plugins/http/conf/HttpConfig.java +++ b/http-plugin/src/main/java/cc/iotkit/plugins/http/conf/HttpConfig.java @@ -13,6 +13,11 @@ import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; +/** + * http配置 + * + * @author sjg + */ @Data @Component @ConfigurationProperties(prefix = "http") diff --git a/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpPlugin.java b/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpPlugin.java index 7faa65b..0b9942a 100755 --- a/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpPlugin.java +++ b/http-plugin/src/main/java/cc/iotkit/plugins/http/service/HttpPlugin.java @@ -18,7 +18,6 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.Map; -import java.util.concurrent.CountDownLatch; /** * @author sjg @@ -39,7 +38,6 @@ public class HttpPlugin implements PluginCloseListener { private IPluginConfig pluginConfig; private Vertx vertx; - private CountDownLatch countDownLatch; private String deployedId; @PostConstruct @@ -52,20 +50,16 @@ public class HttpPlugin implements PluginCloseListener { BeanUtil.copyProperties(config, httpConfig, CopyOptions.create().ignoreNullValue()); httpVerticle.setConfig(httpConfig); - countDownLatch = new CountDownLatch(1); Future future = vertx.deployVerticle(httpVerticle); future.onSuccess((s -> { deployedId = s; - countDownLatch.countDown(); + log.info("http plugin startup success"); })); future.onFailure((e) -> { - countDownLatch.countDown(); - log.error("start http plugin failed", e); + log.error("http plugin startup failed", e); }); - countDownLatch.await(); - future.succeeded(); } catch (Throwable e) { - log.error("start http plugin error.", e); + log.error("http plugin startup error", e); } } @@ -74,16 +68,16 @@ public class HttpPlugin implements PluginCloseListener { try { httpVerticle.stop(); Future future = vertx.undeploy(deployedId); - future.onSuccess(unused -> log.info("stop http plugin success")); + future.onSuccess(unused -> log.info("http plugin stopped success")); if (closeType == PluginCloseType.UNINSTALL) { - log.info("插件被卸载了:{}", pluginInfo.getPluginId()); + log.info("http plugin UNINSTALL:{}", pluginInfo.getPluginId()); } else if (closeType == PluginCloseType.STOP) { - log.info("插件被关闭了:{}", pluginInfo.getPluginId()); + log.info("http plugin STOP:{}", pluginInfo.getPluginId()); } else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) { - log.info("插件被升级卸载了:{}", pluginInfo.getPluginId()); + log.info("http plugin UPGRADE_UNINSTALL:{}", pluginInfo.getPluginId()); } } catch (Throwable e) { - log.error("stop http plugin error.", e); + log.error("http plugin stop error", e); } } diff --git a/modbus-plugin/src/main/java/cc/iotkit/plugins/modbus/service/ModbusPlugin.java b/modbus-plugin/src/main/java/cc/iotkit/plugins/modbus/service/ModbusPlugin.java index 5700d63..f87f9e7 100755 --- a/modbus-plugin/src/main/java/cc/iotkit/plugins/modbus/service/ModbusPlugin.java +++ b/modbus-plugin/src/main/java/cc/iotkit/plugins/modbus/service/ModbusPlugin.java @@ -124,14 +124,14 @@ public class ModbusPlugin implements PluginCloseListener { try { master.disconnect(); if (closeType == PluginCloseType.UNINSTALL) { - log.info("插件被卸载了:{}", pluginInfo.getPluginId()); + log.info("modbus plugin UNINSTALL:{}", pluginInfo.getPluginId()); } else if (closeType == PluginCloseType.STOP) { - log.info("插件被关闭了:{}", pluginInfo.getPluginId()); + log.info("modbus plugin STOP:{}", pluginInfo.getPluginId()); } else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) { - log.info("插件被升级卸载了:{}", pluginInfo.getPluginId()); + log.info("modbus plugin UPGRADE_UNINSTALL:{}", pluginInfo.getPluginId()); } } catch (Throwable e) { - log.error("stop mqtt plugin error.", e); + log.error("modbus plugin stop error", e); } } diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttPlugin.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttPlugin.java index 829d108..28ca096 100755 --- a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttPlugin.java +++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttPlugin.java @@ -1,6 +1,5 @@ package cc.iotkit.plugins.mqtt.service; -import cc.iotkit.common.utils.JsonUtils; import cc.iotkit.plugin.core.IPlugin; import cc.iotkit.plugin.core.IPluginConfig; import cc.iotkit.plugins.mqtt.conf.MqttConfig; @@ -19,7 +18,6 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.Map; -import java.util.concurrent.CountDownLatch; /** * @author sjg @@ -40,7 +38,6 @@ public class MqttPlugin implements PluginCloseListener, IPlugin { private IPluginConfig pluginConfig; private Vertx vertx; - private CountDownLatch countDownLatch; private String deployedId; @PostConstruct @@ -49,23 +46,19 @@ public class MqttPlugin implements PluginCloseListener, IPlugin { try { //获取插件最新配置替换当前配置 Map config = pluginConfig.getConfig(pluginInfo.getPluginId()); - BeanUtil.copyProperties(config,mqttConfig, CopyOptions.create().ignoreNullValue()); + BeanUtil.copyProperties(config, mqttConfig, CopyOptions.create().ignoreNullValue()); mqttVerticle.setConfig(mqttConfig); - countDownLatch = new CountDownLatch(1); Future future = vertx.deployVerticle(mqttVerticle); future.onSuccess((s -> { deployedId = s; - countDownLatch.countDown(); + log.info("mqtt plugin started success"); })); future.onFailure((e) -> { - countDownLatch.countDown(); - log.error("start mqtt plugin failed", e); + log.error("mqtt plugin startup failed", e); }); - countDownLatch.await(); - future.succeeded(); } catch (Throwable e) { - log.error("start mqtt plugin error.", e); + log.error("mqtt plugin startup error", e); } } @@ -74,16 +67,16 @@ public class MqttPlugin implements PluginCloseListener, IPlugin { try { mqttVerticle.stop(); Future future = vertx.undeploy(deployedId); - future.onSuccess(unused -> log.info("stop mqtt plugin success")); + future.onSuccess(unused -> log.info("mqtt plugin stopped success")); if (closeType == PluginCloseType.UNINSTALL) { - log.info("插件被卸载了:{}", pluginInfo.getPluginId()); + log.info("mqtt plugin UNINSTALL:{}", pluginInfo.getPluginId()); } else if (closeType == PluginCloseType.STOP) { - log.info("插件被关闭了:{}", pluginInfo.getPluginId()); + log.info("mqtt plugin STOP:{}", pluginInfo.getPluginId()); } else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) { - log.info("插件被升级卸载了:{}", pluginInfo.getPluginId()); + log.info("mqtt plugin UPGRADE_UNINSTALL:{}", pluginInfo.getPluginId()); } } catch (Throwable e) { - log.error("stop mqtt plugin error.", e); + log.error("mqtt plugin stop error", e); } } diff --git a/mqtt-plugin/src/main/resources/config.json b/mqtt-plugin/src/main/resources/config.json new file mode 100644 index 0000000..a029bb3 --- /dev/null +++ b/mqtt-plugin/src/main/resources/config.json @@ -0,0 +1,9 @@ +[ + { + "id": "port", + "name": "端口", + "type": "number", + "value": 1883, + "desc": "mqtt端口,默认为1883" + } +] \ No newline at end of file diff --git a/pom.xml b/pom.xml index c989671..4d276e5 100755 --- a/pom.xml +++ b/pom.xml @@ -7,6 +7,7 @@ mqtt-plugin http-plugin modbus-plugin + tcp-plugin diff --git a/tcp-plugin/pom.xml b/tcp-plugin/pom.xml new file mode 100644 index 0000000..5befca2 --- /dev/null +++ b/tcp-plugin/pom.xml @@ -0,0 +1,78 @@ + + + + iot-iita-plugins + cc.iotkit.plugins + 1.0.0 + + 4.0.0 + + tcp-plugin + + + + + io.vertx + vertx-core + ${vertx.version} + + + + org.slf4j + slf4j-api + + + + + + + dev + + true + + + dev + + + + + prod + + prod + + + + + + + + com.gitee.starblues + spring-brick-maven-packager + + ${plugin.build.mode} + + tcp-plugin + cc.iotkit.plugins.tcp.Application + ${project.version} + iita + tcp示例插件 + application.yml + + + jar + + + + + + repackage + + + + + + + + \ No newline at end of file diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/Application.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/Application.java new file mode 100755 index 0000000..75fbef2 --- /dev/null +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/Application.java @@ -0,0 +1,19 @@ +package cc.iotkit.plugins.tcp; + +import com.gitee.starblues.bootstrap.SpringPluginBootstrap; +import com.gitee.starblues.bootstrap.annotation.OneselfConfig; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + +/** + * @author sjg + */ +@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.tcp"}) +@OneselfConfig(mainConfigFileName = {"application.yml"}) +@EnableConfigurationProperties +public class Application extends SpringPluginBootstrap { + + public static void main(String[] args) { + new Application().run(Application.class, args); + } +} diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/cilent/VertxTcpClient.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/cilent/VertxTcpClient.java new file mode 100644 index 0000000..285b365 --- /dev/null +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/cilent/VertxTcpClient.java @@ -0,0 +1,95 @@ +package cc.iotkit.plugins.tcp.cilent; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetSocket; +import io.vertx.core.parsetools.RecordParser; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.binary.Hex; + +import java.time.Duration; + +/** + * @author huangwenl + * @date 2022-10-13 + */ +@Slf4j +public class VertxTcpClient { + @Getter + private String id; + public NetSocket socket; + @Setter + private long keepAliveTimeoutMs = Duration.ofSeconds(30).toMillis(); + private volatile long lastKeepAliveTime = System.currentTimeMillis(); + + @Setter + private RecordParser parser; + + public VertxTcpClient(String id) { + this.id = id; + } + + public void keepAlive() { + lastKeepAliveTime = System.currentTimeMillis(); + } + + public boolean isOnline() { + return System.currentTimeMillis() - lastKeepAliveTime < keepAliveTimeoutMs; + } + + public void setSocket(NetSocket socket) { + synchronized (this) { + if (this.socket != null && this.socket != socket) { + this.socket.close(); + } + + this.socket = socket + .closeHandler(v -> shutdown()) + .handler(buffer -> { + if (log.isDebugEnabled()) { + log.debug("handle tcp client[{}] payload:[{}]", + socket.remoteAddress(), + Hex.encodeHexString(buffer.getBytes())); + } + keepAlive(); + parser.handle(buffer); + if (this.socket != socket) { + log.warn("tcp client [{}] memory leak ", socket.remoteAddress()); + socket.close(); + } + }); + } + } + + public void shutdown() { + log.debug("tcp client [{}] disconnect", getId()); + synchronized (this) { + if (null != socket) { + execute(socket::close); + this.socket = null; + } + } + } + + public void sendMessage(Buffer buffer) { + log.info("wirte data:{}", buffer.toString()); + socket.write(buffer, r -> { + keepAlive(); + if (r.succeeded()) { + log.info("client msg send success:{}", buffer.toString()); + } else { + log.error("client msg send failed", r.cause()); + } + }); + } + + private void execute(Runnable runnable) { + try { + runnable.run(); + } catch (Exception e) { + log.warn("close tcp client error", e); + } + } + +} diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/conf/BeanConfig.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/conf/BeanConfig.java new file mode 100755 index 0000000..61353f4 --- /dev/null +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/conf/BeanConfig.java @@ -0,0 +1,35 @@ +package cc.iotkit.plugins.tcp.conf; + +import cc.iotkit.plugin.core.IPluginConfig; +import cc.iotkit.plugin.core.IPluginScript; +import cc.iotkit.plugin.core.LocalPluginConfig; +import cc.iotkit.plugin.core.LocalPluginScript; +import cc.iotkit.plugin.core.thing.IThingService; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +/** + * @author sjg + */ +@Component +public class BeanConfig { + + @Bean + @ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev") + IThingService getThingService() { + return new FakeThingService(); + } + + @Bean + @ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev") + IPluginScript getPluginScript() { + return new LocalPluginScript("test.js"); + } + + @Bean + @ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev") + IPluginConfig getPluginConfig(){ + return new LocalPluginConfig(); + } +} diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/conf/FakeThingService.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/conf/FakeThingService.java new file mode 100755 index 0000000..6885e3f --- /dev/null +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/conf/FakeThingService.java @@ -0,0 +1,47 @@ +package cc.iotkit.plugins.tcp.conf; + +import cc.iotkit.model.device.DeviceInfo; +import cc.iotkit.model.product.Product; +import cc.iotkit.plugin.core.thing.IThingService; +import cc.iotkit.plugin.core.thing.actions.ActionResult; +import cc.iotkit.plugin.core.thing.actions.IDeviceAction; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; + +/** + * 测试服务 + * + * @author sjg + */ +@Slf4j +public class FakeThingService implements IThingService { + + @Override + public ActionResult post(String pluginId, IDeviceAction action) { + log.info("post action:{}", action); + return ActionResult.builder().code(0).build(); + } + + @Override + public Product getProduct(String pk) { + return Product.builder() + .productKey("cGCrkK7Ex4FESAwe") + .productSecret("aaaaaaaa") + .build(); + } + + @Override + public DeviceInfo getDevice(String dn) { + return DeviceInfo.builder() + .productKey("cGCrkK7Ex4FESAwe") + .deviceName(dn) + .build(); + } + + @Override + public Map getProperty(String dn) { + return new HashMap<>(0); + } +} diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/conf/TcpServerConfig.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/conf/TcpServerConfig.java new file mode 100644 index 0000000..0bf07ab --- /dev/null +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/conf/TcpServerConfig.java @@ -0,0 +1,34 @@ +package cc.iotkit.plugins.tcp.conf; + +import io.vertx.core.net.SocketAddress; +import lombok.Data; +import org.apache.commons.lang3.StringUtils; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * @author huangwenl + * @date 2022-10-13 + */ +@Data +@Component +@ConfigurationProperties(prefix = "tcp") +public class TcpServerConfig { + + private String host; + + private int port; + + /** + * 服务实例数量(线程数) + */ + private int instance = Runtime.getRuntime().availableProcessors(); + + public SocketAddress createSocketAddress() { + if (StringUtils.isEmpty(host)) { + host = "localhost"; + } + return SocketAddress.inetSocketAddress(port, host); + } + +} diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/parser/DataDecoder.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/parser/DataDecoder.java new file mode 100644 index 0000000..59fd31d --- /dev/null +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/parser/DataDecoder.java @@ -0,0 +1,23 @@ +package cc.iotkit.plugins.tcp.parser; + +import io.vertx.core.buffer.Buffer; +import lombok.extern.slf4j.Slf4j; + +/** + * 数据解码 + * + * @author sjg + */ +@Slf4j +public class DataDecoder { + + public static DataPackage decode(Buffer buffer) { + DataPackage data = new DataPackage(); + data.setAddr(buffer.getBuffer(0, 6).toString()); + data.setCode(buffer.getShort(6)); + data.setMid(buffer.getShort(8)); + data.setPayload(buffer.getBytes(10, buffer.length())); + return data; + } + +} diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/parser/DataEncoder.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/parser/DataEncoder.java new file mode 100644 index 0000000..88d2dd1 --- /dev/null +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/parser/DataEncoder.java @@ -0,0 +1,24 @@ +package cc.iotkit.plugins.tcp.parser; + +import io.vertx.core.buffer.Buffer; +import lombok.extern.slf4j.Slf4j; + +/** + * 数据编码 + * + * @author sjg + */ +@Slf4j +public class DataEncoder { + + public static Buffer encode(DataPackage data) { + Buffer buffer = Buffer.buffer(); + //设备地址(6byte) + 功能码(2byte) + 消息序号(2byte) + 包体(不定长度) + buffer.appendInt(6+2+2+data.getPayload().length); + buffer.appendBytes(data.getAddr().getBytes()); + buffer.appendShort(data.getCode()); + buffer.appendShort(data.getMid()); + buffer.appendBytes(data.getPayload()); + return buffer; + } +} diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/parser/DataPackage.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/parser/DataPackage.java new file mode 100644 index 0000000..0201dc6 --- /dev/null +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/parser/DataPackage.java @@ -0,0 +1,66 @@ +package cc.iotkit.plugins.tcp.parser; + +import cn.hutool.core.util.HexUtil; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import io.vertx.core.buffer.Buffer; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.IOException; + +/** + * 数据包 + * + * @author sjg + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class DataPackage { + + public static final short FLAG = 0x8D; + + public static final int HEAD_LEN = 2 + 6 + 2 + 2 + 4; + + public static final short CODE_REGISTER = 0x10; + public static final short CODE_REGISTER_REPLY = 0x11; + public static final short CODE_HEARTBEAT = 0x20; + public static final short CODE_DATA_UP = 0x30; + public static final short CODE_DATA_DOWN = 0x40; + + /** + * 设备地址 + */ + private String addr; + + /** + * 功能码 + */ + private short code; + + /** + * 消息序号 + */ + private short mid; + + /** + * 包体数据 + */ + @JsonSerialize(using = BufferSerializer.class) + private byte[] payload; + + + public static class BufferSerializer extends JsonSerializer { + + @Override + public void serialize(byte[] value, JsonGenerator jgen, SerializerProvider provider) throws IOException { + jgen.writeString(HexUtil.encodeHexStr(value)); + } + } +} diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/parser/DataReader.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/parser/DataReader.java new file mode 100644 index 0000000..9961863 --- /dev/null +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/parser/DataReader.java @@ -0,0 +1,44 @@ +package cc.iotkit.plugins.tcp.parser; + +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.parsetools.RecordParser; + +import java.util.function.Consumer; + +/** + * 数据包读取器 + * + * @author sjg + */ +public class DataReader { + + public static RecordParser getParser(Consumer receiveHandler) { + RecordParser parser = RecordParser.newFixed(4); + // 设置处理器 + parser.setOutput(new Handler<>() { + // 表示当前数据长度 + int size = -1; + + @Override + public void handle(Buffer buffer) { + //-1表示当前还没有长度信息,需要从收到的数据中取出长度 + if (-1 == size) { + //取出长度 + size = buffer.getInt(0); + //动态修改长度 + parser.fixedSizeMode(size); + } else { + //如果size != -1, 说明已经接受到长度信息了,接下来的数据就是protobuf可识别的字节数组 + byte[] buf = buffer.getBytes(); + receiveHandler.accept(Buffer.buffer(buf)); + //处理完后要将长度改回 + parser.fixedSizeMode(4); + //重置size变量 + size = -1; + } + } + }); + return parser; + } +} diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpDevice.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpDevice.java new file mode 100755 index 0000000..604e805 --- /dev/null +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpDevice.java @@ -0,0 +1,87 @@ +package cc.iotkit.plugins.tcp.server; + +import cc.iotkit.common.enums.ErrCode; +import cc.iotkit.common.exception.BizException; +import cc.iotkit.plugin.core.thing.IDevice; +import cc.iotkit.plugin.core.thing.actions.ActionResult; +import cc.iotkit.plugin.core.thing.actions.down.DeviceConfig; +import cc.iotkit.plugin.core.thing.actions.down.PropertyGet; +import cc.iotkit.plugin.core.thing.actions.down.PropertySet; +import cc.iotkit.plugin.core.thing.actions.down.ServiceInvoke; +import cc.iotkit.plugins.tcp.parser.DataEncoder; +import cc.iotkit.plugins.tcp.parser.DataPackage; +import cc.iotkit.script.IScriptEngine; +import com.fasterxml.jackson.core.type.TypeReference; +import io.vertx.core.buffer.Buffer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * tcp设备下行接口 + * + * @author sjg + */ +@Service +public class TcpDevice implements IDevice { + + @Autowired + private TcpServerVerticle tcpServerVerticle; + + private final AtomicInteger atMid = new AtomicInteger(0); + + @Override + public ActionResult config(DeviceConfig action) { + return ActionResult.builder().code(0).reason("").build(); + } + + @Override + public ActionResult propertyGet(PropertyGet action) { + throw new UnsupportedOperationException(); + } + + @Override + public ActionResult propertySet(PropertySet action) { + IScriptEngine scriptEngine = tcpServerVerticle.getScriptEngine(); + //使用转换脚本转换参数部分内容 + String payload = scriptEngine.invokeMethod(new TypeReference<>() { + }, "encode", action.getParams()); + + if (payload == null) { + return ActionResult.builder().code(ErrCode.MSG_CONVERT_ERROR.getKey()).build(); + } + + if (atMid.compareAndSet(Short.MAX_VALUE / 2 - 1, 0)) { + atMid.set(0); + } + byte[] bytes = payload.getBytes(); + + //构造数据包 + DataPackage data = DataPackage.builder() + .addr(action.getDeviceName()) + .code(DataPackage.CODE_DATA_DOWN) + .mid((short) atMid.getAndIncrement()) + .payload(bytes) + .build(); + + return send(action.getDeviceName(), DataEncoder.encode(data)); + } + + @Override + public ActionResult serviceInvoke(ServiceInvoke action) { + throw new UnsupportedOperationException(); + } + + private ActionResult send(String deviceName, Buffer msg) { + try { + tcpServerVerticle.sendMsg(deviceName, msg); + return ActionResult.builder().code(0).reason("").build(); + } catch (BizException e) { + return ActionResult.builder().code(e.getCode()).reason(e.getMessage()).build(); + } catch (Exception e) { + return ActionResult.builder().code(ErrCode.UNKNOWN_EXCEPTION.getKey()).reason(e.getMessage()).build(); + } + } + +} diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpPlugin.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpPlugin.java new file mode 100755 index 0000000..53a7728 --- /dev/null +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpPlugin.java @@ -0,0 +1,90 @@ +package cc.iotkit.plugins.tcp.server; + +import cc.iotkit.plugin.core.IPlugin; +import cc.iotkit.plugin.core.IPluginConfig; +import cc.iotkit.plugins.tcp.conf.TcpServerConfig; +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.bean.copier.CopyOptions; +import com.gitee.starblues.bootstrap.annotation.AutowiredType; +import com.gitee.starblues.bootstrap.realize.PluginCloseListener; +import com.gitee.starblues.core.PluginCloseType; +import com.gitee.starblues.core.PluginInfo; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.Map; + +/** + * tcp插件 + * + * @author sjg + */ +@Slf4j +@Service +public class TcpPlugin implements PluginCloseListener, IPlugin { + + @Autowired + private PluginInfo pluginInfo; + @Autowired + private TcpServerVerticle tcpServerVerticle; + + @Autowired + private TcpServerConfig tcpConfig; + + @Autowired + @AutowiredType(AutowiredType.Type.MAIN_PLUGIN) + private IPluginConfig pluginConfig; + + private Vertx vertx; + private String deployedId; + + @PostConstruct + public void init() { + vertx = Vertx.vertx(); + try { + //获取插件最新配置替换当前配置 + Map config = pluginConfig.getConfig(pluginInfo.getPluginId()); + BeanUtil.copyProperties(config, tcpConfig, CopyOptions.create().ignoreNullValue()); + tcpServerVerticle.setConfig(tcpConfig); + + Future future = vertx.deployVerticle(tcpServerVerticle); + future.onSuccess((s -> { + deployedId = s; + log.info("tcp plugin started success"); + })); + future.onFailure((e) -> { + log.error("tcp plugin startup failed", e); + }); + } catch (Throwable e) { + log.error("tcp plugin startup error", e); + } + } + + @Override + public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) { + try { + tcpServerVerticle.stop(); + Future future = vertx.undeploy(deployedId); + future.onSuccess(unused -> log.info("tcp plugin stopped success")); + if (closeType == PluginCloseType.UNINSTALL) { + log.info("tcp plugin UNINSTALL:{}", pluginInfo.getPluginId()); + } else if (closeType == PluginCloseType.STOP) { + log.info("tcp plugin STOP:{}", pluginInfo.getPluginId()); + } else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) { + log.info("tcp plugin UPGRADE_UNINSTALL:{}", pluginInfo.getPluginId()); + } + } catch (Throwable e) { + log.error("tcp plugin stop error", e); + } + } + + @Override + public Map getLinkInfo(String pk, String dn) { + return null; + } +} diff --git a/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpServerVerticle.java b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpServerVerticle.java new file mode 100644 index 0000000..02c9cc3 --- /dev/null +++ b/tcp-plugin/src/main/java/cc/iotkit/plugins/tcp/server/TcpServerVerticle.java @@ -0,0 +1,303 @@ +package cc.iotkit.plugins.tcp.server; + + +import cc.iotkit.plugin.core.IPluginScript; +import cc.iotkit.plugin.core.thing.IThingService; +import cc.iotkit.plugin.core.thing.actions.ActionResult; +import cc.iotkit.plugin.core.thing.actions.DeviceState; +import cc.iotkit.plugin.core.thing.actions.up.DeviceRegister; +import cc.iotkit.plugin.core.thing.actions.up.DeviceStateChange; +import cc.iotkit.plugin.core.thing.actions.up.PropertyReport; +import cc.iotkit.plugins.tcp.cilent.VertxTcpClient; +import cc.iotkit.plugins.tcp.conf.TcpServerConfig; +import cc.iotkit.plugins.tcp.parser.DataDecoder; +import cc.iotkit.plugins.tcp.parser.DataEncoder; +import cc.iotkit.plugins.tcp.parser.DataPackage; +import cc.iotkit.plugins.tcp.parser.DataReader; +import cc.iotkit.script.IScriptEngine; +import cn.hutool.core.util.IdUtil; +import com.fasterxml.jackson.core.type.TypeReference; +import com.gitee.starblues.bootstrap.annotation.AutowiredType; +import com.gitee.starblues.core.PluginInfo; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetServer; +import io.vertx.core.net.NetServerOptions; +import io.vertx.core.net.NetSocket; +import io.vertx.core.parsetools.RecordParser; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * @author huangwenl + * @date 2022-10-13 + */ +@Slf4j +@Service +public class TcpServerVerticle extends AbstractVerticle { + + @Getter + @Setter + private TcpServerConfig config; + + private VertxTcpServer tcpServer; + + private final Map clientMap = new ConcurrentHashMap<>(); + + private final Map dnToPk = new HashMap<>(); + + private final Map heartbeatDevice = new HashMap<>(); + + private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; + + private ScheduledThreadPoolExecutor offlineCheckExecutor; + + @Setter + private long keepAliveTimeout = Duration.ofSeconds(30).toMillis(); + + private Collection tcpServers; + + @Getter + private IScriptEngine scriptEngine; + + @Autowired + private PluginInfo pluginInfo; + + @Autowired + @AutowiredType(AutowiredType.Type.MAIN_PLUGIN) + private IPluginScript pluginScript; + + @Autowired + @AutowiredType(AutowiredType.Type.MAIN_PLUGIN) + private IThingService thingService; + + @Override + public void start() { + tcpServer = new VertxTcpServer(); + initConfig(); + initTcpServer(); + } + + @Override + public void stop() { + tcpServer.shutdown(); + scheduledThreadPoolExecutor.shutdown(); + offlineCheckExecutor.shutdown(); + } + + /** + * 创建配置文件 + */ + public void initConfig() { + //获取脚本引擎 + scriptEngine = pluginScript.getScriptEngine(pluginInfo.getPluginId()); + } + + + /** + * 初始TCP服务 + */ + private void initTcpServer() { + int instance = Math.max(2, config.getInstance()); + List instances = new ArrayList<>(instance); + for (int i = 0; i < instance; i++) { + instances.add(vertx.createNetServer( + new NetServerOptions().setHost(config.getHost()) + .setPort(config.getPort()) + )); + } + // 根据解析类型配置数据解析器 + tcpServer.setServer(instances); + // 针对JVM做的多路复用优化 + // 多个server listen同一个端口,每个client连接的时候vertx会分配 + // 一个connection只能在一个server中处理 + for (NetServer netServer : instances) { + netServer.listen(config.createSocketAddress(), result -> { + if (result.succeeded()) { + log.info("tcp server startup on {}", result.result().actualPort()); + } else { + log.error("tcp server startup error", result.cause()); + } + }); + } + } + + public void sendMsg(String addr, Buffer msg) { + VertxTcpClient tcpClient = clientMap.get(addr); + if (tcpClient != null) { + tcpClient.sendMessage(msg); + } + } + + @Scheduled(fixedRate = 40, timeUnit = TimeUnit.SECONDS) + private void offlineCheckTask() { + log.info("keepClientTask"); + Set clients = new HashSet<>(clientMap.keySet()); + for (String key : clients) { + VertxTcpClient client = clientMap.get(key); + if (!client.isOnline()) { + client.shutdown(); + } + } + + heartbeatDevice.keySet().iterator().forEachRemaining(addr -> { + Long time = heartbeatDevice.get(addr); + //心跳超时,判定为离线 + if (System.currentTimeMillis() - time > keepAliveTimeout * 2) { + heartbeatDevice.remove(addr); + //离线上报 + thingService.post(pluginInfo.getPluginId(), DeviceStateChange.builder() + .id(IdUtil.simpleUUID()) + .productKey(dnToPk.get(addr)) + .deviceName(addr) + .state(DeviceState.OFFLINE) + .time(System.currentTimeMillis()) + .build()); + } + }); + } + + class VertxTcpServer { + + /** + * 为每个NetServer添加connectHandler + * + * @param servers 创建的所有NetServer + */ + public void setServer(Collection servers) { + if (tcpServers != null && !tcpServers.isEmpty()) { + shutdown(); + } + tcpServers = servers; + for (NetServer tcpServer : tcpServers) { + tcpServer.connectHandler(this::acceptTcpConnection); + } + } + + /** + * TCP连接处理逻辑 + * + * @param socket socket + */ + protected void acceptTcpConnection(NetSocket socket) { + // 客户端连接处理 + String clientId = IdUtil.simpleUUID() + "_" + socket.remoteAddress(); + VertxTcpClient client = new VertxTcpClient(clientId); + client.setKeepAliveTimeoutMs(keepAliveTimeout); + try { + // TCP异常和关闭处理 + socket.exceptionHandler(err -> log.error("tcp server client [{}] error", socket.remoteAddress(), err)).closeHandler(nil -> { + log.debug("tcp server client [{}] closed", socket.remoteAddress()); + client.shutdown(); + }); + // 这个地方是在TCP服务初始化的时候设置的 parserSupplier + client.setKeepAliveTimeoutMs(keepAliveTimeout); + client.setSocket(socket); + RecordParser parser = DataReader.getParser(buffer -> { + try { + DataPackage data = DataDecoder.decode(buffer); + String addr = data.getAddr(); + int code = data.getCode(); + if (code == 0x10) { + clientMap.put(addr, client); + //设备注册 + String pk = dnToPk.put(addr, new String(data.getPayload())); + ActionResult rst = thingService.post(pluginInfo.getPluginId(), DeviceRegister.builder() + .id(IdUtil.simpleUUID()) + .productKey(pk) + .deviceName(addr) + .time(System.currentTimeMillis()) + .build()); + if (rst.getCode() == 0) { + //回复注册成功 + sendMsg(addr, DataEncoder.encode( + DataPackage.builder() + .addr(addr) + .code(DataPackage.CODE_REGISTER_REPLY) + .mid(data.getMid()) + .payload(Buffer.buffer().appendInt(0).getBytes()) + .build() + )); + } + return; + } + + if (code == 0x20) { + //心跳 + if (!heartbeatDevice.containsKey(addr)) { + //第一次心跳,上线 + thingService.post(pluginInfo.getPluginId(), DeviceStateChange.builder() + .id(IdUtil.simpleUUID()) + .productKey(dnToPk.get(addr)) + .deviceName(addr) + .state(DeviceState.ONLINE) + .time(System.currentTimeMillis()) + .build()); + } + heartbeatDevice.put(addr, System.currentTimeMillis()); + return; + } + + if (code == 0x30) { + //设备数据上报 + //数据上报也作为心跳 + heartbeatDevice.put(addr, System.currentTimeMillis()); + //调用脚本解码 + Map rst = scriptEngine.invokeMethod(new TypeReference<>() { + }, "decode", data); + if (rst == null) { + return; + } + //属性上报 + thingService.post(pluginInfo.getPluginId(), PropertyReport.builder() + .id(IdUtil.simpleUUID()) + .productKey(dnToPk.get(addr)) + .deviceName(addr) + .params(rst) + .time(System.currentTimeMillis()) + .build()); + } + + //未注册断开连接 + if (!clientMap.containsKey(data.getAddr())) { + socket.close(); + } + + } catch (Exception e) { + log.error("handler error", e); + } + }); + client.setParser(parser); + log.debug("accept tcp client [{}] connection", socket.remoteAddress()); + } catch (Exception e) { + log.error("create tcp server client error", e); + client.shutdown(); + } + } + + public void shutdown() { + if (tcpServers == null) { + return; + } + for (NetServer tcpServer : tcpServers) { + try { + tcpServer.close(); + } catch (Exception e) { + log.warn("close tcp server error", e); + } + } + tcpServers = null; + } + + } +} diff --git a/tcp-plugin/src/main/resources/application.yml b/tcp-plugin/src/main/resources/application.yml new file mode 100755 index 0000000..1c19ef5 --- /dev/null +++ b/tcp-plugin/src/main/resources/application.yml @@ -0,0 +1,8 @@ +plugin: + runMode: dev +# runMode: prod + mainPackage: cc.iotkit.plugin + +tcp: + host: 127.0.0.1 + port: 6883 diff --git a/tcp-plugin/src/main/resources/config.json b/tcp-plugin/src/main/resources/config.json new file mode 100644 index 0000000..50753b8 --- /dev/null +++ b/tcp-plugin/src/main/resources/config.json @@ -0,0 +1,16 @@ +[ + { + "id": "host", + "name": "绑定ip", + "type": "text", + "value": "127.0.0.1", + "desc": "tcp绑定ip,默认为127.0.0.1" + }, + { + "id": "port", + "name": "端口", + "type": "number", + "value": 6883, + "desc": "tcp端口,默认为6883" + } +] \ No newline at end of file diff --git a/tcp-plugin/src/main/resources/test.js b/tcp-plugin/src/main/resources/test.js new file mode 100644 index 0000000..4a96abf --- /dev/null +++ b/tcp-plugin/src/main/resources/test.js @@ -0,0 +1,21 @@ +function hexToByte(hexString) { + if (hexString.length % 2 !== 0) { + throw new Error('Invalid hex string. String must have an even number of characters.'); + } + + let byteArray = []; + for (let i = 0; i < hexString.length; i += 4) { + byteArray.push(parseInt(hexString.substr(i, 4), 16)); + } + + return byteArray; +} + +this.decode=function(data){ + hex=data.payload; + const bytes=hexToByte(hex); + return { + "rssi":bytes[0], + "powerstate":bytes[1] + }; +} \ No newline at end of file diff --git a/tcp-plugin/src/test/java/cc/iotkit/test/TcpClientTest.java b/tcp-plugin/src/test/java/cc/iotkit/test/TcpClientTest.java new file mode 100644 index 0000000..6d35aa0 --- /dev/null +++ b/tcp-plugin/src/test/java/cc/iotkit/test/TcpClientTest.java @@ -0,0 +1,21 @@ +package cc.iotkit.test; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TcpClientTest { + + public static void main(String[] args) { + Vertx vertx = Vertx.vertx(); + Future future = vertx.deployVerticle(new TcpClientVerticle()); + future.onSuccess((s -> { + log.info("tcp client started success"); + })); + future.onFailure((e) -> { + log.error("tcp client startup failed", e); + }); + } + +} diff --git a/tcp-plugin/src/test/java/cc/iotkit/test/TcpClientVerticle.java b/tcp-plugin/src/test/java/cc/iotkit/test/TcpClientVerticle.java new file mode 100644 index 0000000..c89d985 --- /dev/null +++ b/tcp-plugin/src/test/java/cc/iotkit/test/TcpClientVerticle.java @@ -0,0 +1,134 @@ +package cc.iotkit.test; + +import cc.iotkit.common.utils.ThreadUtil; +import cc.iotkit.plugins.tcp.parser.DataDecoder; +import cc.iotkit.plugins.tcp.parser.DataEncoder; +import cc.iotkit.plugins.tcp.parser.DataPackage; +import cc.iotkit.plugins.tcp.parser.DataReader; +import cn.hutool.core.util.HexUtil; +import cn.hutool.core.util.RandomUtil; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetClientOptions; +import io.vertx.core.net.NetSocket; +import io.vertx.core.parsetools.RecordParser; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author huangwenlong + * @version 1.0 + * @date 2022/10/23 13:08 + */ +@Slf4j +public class TcpClientVerticle extends AbstractVerticle { + + private NetClient netClient; + + private NetSocket socket; + + private String addr = "060101"; + + private String pk = "cGCrkK7Ex4FESAwe"; + + private AtomicInteger atMid = new AtomicInteger(0); + + @Override + public void start() { + initClient(); + } + + @Override + public void stop() { + if (null != netClient) { + netClient.close(); + } + } + + private void initClient() { + NetClientOptions options = new NetClientOptions(); + options.setReconnectAttempts(Integer.MAX_VALUE); + options.setReconnectInterval(20000L); + netClient = vertx.createNetClient(options); + RecordParser parser = DataReader.getParser(this::handle); + + netClient.connect(6883, "127.0.0.1", result -> { + if (result.succeeded()) { + log.debug("connect tcp success"); + socket = result.result(); + socket.handler(parser); + //注册 + byte[] pkBytes = pk.getBytes(); + send(DataPackage.builder() + .addr(addr) + .code(DataPackage.CODE_REGISTER) + .mid(getMid()) + .payload(pkBytes) + .build()); + } else { + log.error("connect tcp error", result.cause()); + } + }); + } + + private short getMid() { + atMid.compareAndSet(254, 0); + return (short) atMid.getAndIncrement(); + } + + private void send(DataPackage data) { + Buffer buffer = DataEncoder.encode(data); + log.info("send data:{}", HexUtil.encodeHexStr(buffer.getBytes())); + socket.write(buffer); + } + + public void handle(Buffer buffer) { + log.info("receive server data:{}", buffer.toString()); + DataPackage data = DataDecoder.decode(buffer); + if (data.getCode() == DataPackage.CODE_REGISTER_REPLY) { + int rst = Buffer.buffer(data.getPayload()).getInt(0); + if (rst == 0) { + log.info("device:{} register success", data.getAddr()); + //定时心跳 + ThreadUtil.newScheduled(1, "heartbeat") + .scheduleWithFixedDelay(this::heartbeat, 10, 30, TimeUnit.SECONDS); + //随机上报数据 + ThreadUtil.newScheduled(1, "reportData") + .scheduleWithFixedDelay(this::reportData, 20, 3, TimeUnit.SECONDS); + } + } + } + + private void heartbeat() { + send(DataPackage.builder() + .addr(addr) + .code(DataPackage.CODE_HEARTBEAT) + .mid(getMid()) + .build()); + } + + private void reportData() { + if (RandomUtil.randomInt() % 3 == 0) { + //随机 + return; + } + send(DataPackage.builder() + .addr(addr) + .code(DataPackage.CODE_DATA_UP) + .mid(getMid()) + .payload(Buffer.buffer() + //rssi + .appendShort((short) RandomUtil.randomInt(0, 127)) + //powerstate + .appendByte((byte) (RandomUtil.randomInt() % 2 == 0 ? 1 : 0)) + .getBytes() + ) + .build()); + } + +} \ No newline at end of file