添加tcp插件
parent
e9bb44221c
commit
d2ba881e40
|
@ -13,6 +13,11 @@ import lombok.Data;
|
|||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* http配置
|
||||
*
|
||||
* @author sjg
|
||||
*/
|
||||
@Data
|
||||
@Component
|
||||
@ConfigurationProperties(prefix = "http")
|
||||
|
|
|
@ -18,7 +18,6 @@ import org.springframework.stereotype.Service;
|
|||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
/**
|
||||
* @author sjg
|
||||
|
@ -39,7 +38,6 @@ public class HttpPlugin implements PluginCloseListener {
|
|||
private IPluginConfig pluginConfig;
|
||||
|
||||
private Vertx vertx;
|
||||
private CountDownLatch countDownLatch;
|
||||
private String deployedId;
|
||||
|
||||
@PostConstruct
|
||||
|
@ -52,20 +50,16 @@ public class HttpPlugin implements PluginCloseListener {
|
|||
BeanUtil.copyProperties(config, httpConfig, CopyOptions.create().ignoreNullValue());
|
||||
httpVerticle.setConfig(httpConfig);
|
||||
|
||||
countDownLatch = new CountDownLatch(1);
|
||||
Future<String> future = vertx.deployVerticle(httpVerticle);
|
||||
future.onSuccess((s -> {
|
||||
deployedId = s;
|
||||
countDownLatch.countDown();
|
||||
log.info("http plugin startup success");
|
||||
}));
|
||||
future.onFailure((e) -> {
|
||||
countDownLatch.countDown();
|
||||
log.error("start http plugin failed", e);
|
||||
log.error("http plugin startup failed", e);
|
||||
});
|
||||
countDownLatch.await();
|
||||
future.succeeded();
|
||||
} catch (Throwable e) {
|
||||
log.error("start http plugin error.", e);
|
||||
log.error("http plugin startup error", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,16 +68,16 @@ public class HttpPlugin implements PluginCloseListener {
|
|||
try {
|
||||
httpVerticle.stop();
|
||||
Future<Void> future = vertx.undeploy(deployedId);
|
||||
future.onSuccess(unused -> log.info("stop http plugin success"));
|
||||
future.onSuccess(unused -> log.info("http plugin stopped success"));
|
||||
if (closeType == PluginCloseType.UNINSTALL) {
|
||||
log.info("插件被卸载了:{}", pluginInfo.getPluginId());
|
||||
log.info("http plugin UNINSTALL:{}", pluginInfo.getPluginId());
|
||||
} else if (closeType == PluginCloseType.STOP) {
|
||||
log.info("插件被关闭了:{}", pluginInfo.getPluginId());
|
||||
log.info("http plugin STOP:{}", pluginInfo.getPluginId());
|
||||
} else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) {
|
||||
log.info("插件被升级卸载了:{}", pluginInfo.getPluginId());
|
||||
log.info("http plugin UPGRADE_UNINSTALL:{}", pluginInfo.getPluginId());
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
log.error("stop http plugin error.", e);
|
||||
log.error("http plugin stop error", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -124,14 +124,14 @@ public class ModbusPlugin implements PluginCloseListener {
|
|||
try {
|
||||
master.disconnect();
|
||||
if (closeType == PluginCloseType.UNINSTALL) {
|
||||
log.info("插件被卸载了:{}", pluginInfo.getPluginId());
|
||||
log.info("modbus plugin UNINSTALL:{}", pluginInfo.getPluginId());
|
||||
} else if (closeType == PluginCloseType.STOP) {
|
||||
log.info("插件被关闭了:{}", pluginInfo.getPluginId());
|
||||
log.info("modbus plugin STOP:{}", pluginInfo.getPluginId());
|
||||
} else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) {
|
||||
log.info("插件被升级卸载了:{}", pluginInfo.getPluginId());
|
||||
log.info("modbus plugin UPGRADE_UNINSTALL:{}", pluginInfo.getPluginId());
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
log.error("stop mqtt plugin error.", e);
|
||||
log.error("modbus plugin stop error", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package cc.iotkit.plugins.mqtt.service;
|
||||
|
||||
import cc.iotkit.common.utils.JsonUtils;
|
||||
import cc.iotkit.plugin.core.IPlugin;
|
||||
import cc.iotkit.plugin.core.IPluginConfig;
|
||||
import cc.iotkit.plugins.mqtt.conf.MqttConfig;
|
||||
|
@ -19,7 +18,6 @@ import org.springframework.stereotype.Service;
|
|||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
/**
|
||||
* @author sjg
|
||||
|
@ -40,7 +38,6 @@ public class MqttPlugin implements PluginCloseListener, IPlugin {
|
|||
private IPluginConfig pluginConfig;
|
||||
|
||||
private Vertx vertx;
|
||||
private CountDownLatch countDownLatch;
|
||||
private String deployedId;
|
||||
|
||||
@PostConstruct
|
||||
|
@ -49,23 +46,19 @@ public class MqttPlugin implements PluginCloseListener, IPlugin {
|
|||
try {
|
||||
//获取插件最新配置替换当前配置
|
||||
Map<String, Object> config = pluginConfig.getConfig(pluginInfo.getPluginId());
|
||||
BeanUtil.copyProperties(config,mqttConfig, CopyOptions.create().ignoreNullValue());
|
||||
BeanUtil.copyProperties(config, mqttConfig, CopyOptions.create().ignoreNullValue());
|
||||
mqttVerticle.setConfig(mqttConfig);
|
||||
|
||||
countDownLatch = new CountDownLatch(1);
|
||||
Future<String> future = vertx.deployVerticle(mqttVerticle);
|
||||
future.onSuccess((s -> {
|
||||
deployedId = s;
|
||||
countDownLatch.countDown();
|
||||
log.info("mqtt plugin started success");
|
||||
}));
|
||||
future.onFailure((e) -> {
|
||||
countDownLatch.countDown();
|
||||
log.error("start mqtt plugin failed", e);
|
||||
log.error("mqtt plugin startup failed", e);
|
||||
});
|
||||
countDownLatch.await();
|
||||
future.succeeded();
|
||||
} catch (Throwable e) {
|
||||
log.error("start mqtt plugin error.", e);
|
||||
log.error("mqtt plugin startup error", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,16 +67,16 @@ public class MqttPlugin implements PluginCloseListener, IPlugin {
|
|||
try {
|
||||
mqttVerticle.stop();
|
||||
Future<Void> future = vertx.undeploy(deployedId);
|
||||
future.onSuccess(unused -> log.info("stop mqtt plugin success"));
|
||||
future.onSuccess(unused -> log.info("mqtt plugin stopped success"));
|
||||
if (closeType == PluginCloseType.UNINSTALL) {
|
||||
log.info("插件被卸载了:{}", pluginInfo.getPluginId());
|
||||
log.info("mqtt plugin UNINSTALL:{}", pluginInfo.getPluginId());
|
||||
} else if (closeType == PluginCloseType.STOP) {
|
||||
log.info("插件被关闭了:{}", pluginInfo.getPluginId());
|
||||
log.info("mqtt plugin STOP:{}", pluginInfo.getPluginId());
|
||||
} else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) {
|
||||
log.info("插件被升级卸载了:{}", pluginInfo.getPluginId());
|
||||
log.info("mqtt plugin UPGRADE_UNINSTALL:{}", pluginInfo.getPluginId());
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
log.error("stop mqtt plugin error.", e);
|
||||
log.error("mqtt plugin stop error", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
[
|
||||
{
|
||||
"id": "port",
|
||||
"name": "端口",
|
||||
"type": "number",
|
||||
"value": 1883,
|
||||
"desc": "mqtt端口,默认为1883"
|
||||
}
|
||||
]
|
1
pom.xml
1
pom.xml
|
@ -7,6 +7,7 @@
|
|||
<module>mqtt-plugin</module>
|
||||
<module>http-plugin</module>
|
||||
<module>modbus-plugin</module>
|
||||
<module>tcp-plugin</module>
|
||||
</modules>
|
||||
|
||||
<parent>
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>iot-iita-plugins</artifactId>
|
||||
<groupId>cc.iotkit.plugins</groupId>
|
||||
<version>1.0.0</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>tcp-plugin</artifactId>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-core</artifactId>
|
||||
<version>${vertx.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>dev</id>
|
||||
<activation>
|
||||
<activeByDefault>true</activeByDefault>
|
||||
</activation>
|
||||
<properties>
|
||||
<plugin.build.mode>dev</plugin.build.mode>
|
||||
</properties>
|
||||
</profile>
|
||||
|
||||
<profile>
|
||||
<id>prod</id>
|
||||
<properties>
|
||||
<plugin.build.mode>prod</plugin.build.mode>
|
||||
</properties>
|
||||
</profile>
|
||||
</profiles>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>com.gitee.starblues</groupId>
|
||||
<artifactId>spring-brick-maven-packager</artifactId>
|
||||
<configuration>
|
||||
<mode>${plugin.build.mode}</mode>
|
||||
<pluginInfo>
|
||||
<id>tcp-plugin</id>
|
||||
<bootstrapClass>cc.iotkit.plugins.tcp.Application</bootstrapClass>
|
||||
<version>${project.version}</version>
|
||||
<provider>iita</provider>
|
||||
<description>tcp示例插件</description>
|
||||
<configFileName>application.yml</configFileName>
|
||||
</pluginInfo>
|
||||
<prodConfig>
|
||||
<packageType>jar</packageType>
|
||||
</prodConfig>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>repackage</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,19 @@
|
|||
package cc.iotkit.plugins.tcp;
|
||||
|
||||
import com.gitee.starblues.bootstrap.SpringPluginBootstrap;
|
||||
import com.gitee.starblues.bootstrap.annotation.OneselfConfig;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
|
||||
/**
|
||||
* @author sjg
|
||||
*/
|
||||
@SpringBootApplication(scanBasePackages = {"cc.iotkit.plugin.core", "cc.iotkit.plugins.tcp"})
|
||||
@OneselfConfig(mainConfigFileName = {"application.yml"})
|
||||
@EnableConfigurationProperties
|
||||
public class Application extends SpringPluginBootstrap {
|
||||
|
||||
public static void main(String[] args) {
|
||||
new Application().run(Application.class, args);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,95 @@
|
|||
package cc.iotkit.plugins.tcp.cilent;
|
||||
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.net.NetSocket;
|
||||
import io.vertx.core.parsetools.RecordParser;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
@Slf4j
|
||||
public class VertxTcpClient {
|
||||
@Getter
|
||||
private String id;
|
||||
public NetSocket socket;
|
||||
@Setter
|
||||
private long keepAliveTimeoutMs = Duration.ofSeconds(30).toMillis();
|
||||
private volatile long lastKeepAliveTime = System.currentTimeMillis();
|
||||
|
||||
@Setter
|
||||
private RecordParser parser;
|
||||
|
||||
public VertxTcpClient(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public void keepAlive() {
|
||||
lastKeepAliveTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
public boolean isOnline() {
|
||||
return System.currentTimeMillis() - lastKeepAliveTime < keepAliveTimeoutMs;
|
||||
}
|
||||
|
||||
public void setSocket(NetSocket socket) {
|
||||
synchronized (this) {
|
||||
if (this.socket != null && this.socket != socket) {
|
||||
this.socket.close();
|
||||
}
|
||||
|
||||
this.socket = socket
|
||||
.closeHandler(v -> shutdown())
|
||||
.handler(buffer -> {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("handle tcp client[{}] payload:[{}]",
|
||||
socket.remoteAddress(),
|
||||
Hex.encodeHexString(buffer.getBytes()));
|
||||
}
|
||||
keepAlive();
|
||||
parser.handle(buffer);
|
||||
if (this.socket != socket) {
|
||||
log.warn("tcp client [{}] memory leak ", socket.remoteAddress());
|
||||
socket.close();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
log.debug("tcp client [{}] disconnect", getId());
|
||||
synchronized (this) {
|
||||
if (null != socket) {
|
||||
execute(socket::close);
|
||||
this.socket = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void sendMessage(Buffer buffer) {
|
||||
log.info("wirte data:{}", buffer.toString());
|
||||
socket.write(buffer, r -> {
|
||||
keepAlive();
|
||||
if (r.succeeded()) {
|
||||
log.info("client msg send success:{}", buffer.toString());
|
||||
} else {
|
||||
log.error("client msg send failed", r.cause());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void execute(Runnable runnable) {
|
||||
try {
|
||||
runnable.run();
|
||||
} catch (Exception e) {
|
||||
log.warn("close tcp client error", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package cc.iotkit.plugins.tcp.conf;
|
||||
|
||||
import cc.iotkit.plugin.core.IPluginConfig;
|
||||
import cc.iotkit.plugin.core.IPluginScript;
|
||||
import cc.iotkit.plugin.core.LocalPluginConfig;
|
||||
import cc.iotkit.plugin.core.LocalPluginScript;
|
||||
import cc.iotkit.plugin.core.thing.IThingService;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author sjg
|
||||
*/
|
||||
@Component
|
||||
public class BeanConfig {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev")
|
||||
IThingService getThingService() {
|
||||
return new FakeThingService();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev")
|
||||
IPluginScript getPluginScript() {
|
||||
return new LocalPluginScript("test.js");
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(name = "plugin.runMode", havingValue = "dev")
|
||||
IPluginConfig getPluginConfig(){
|
||||
return new LocalPluginConfig();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
package cc.iotkit.plugins.tcp.conf;
|
||||
|
||||
import cc.iotkit.model.device.DeviceInfo;
|
||||
import cc.iotkit.model.product.Product;
|
||||
import cc.iotkit.plugin.core.thing.IThingService;
|
||||
import cc.iotkit.plugin.core.thing.actions.ActionResult;
|
||||
import cc.iotkit.plugin.core.thing.actions.IDeviceAction;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 测试服务
|
||||
*
|
||||
* @author sjg
|
||||
*/
|
||||
@Slf4j
|
||||
public class FakeThingService implements IThingService {
|
||||
|
||||
@Override
|
||||
public ActionResult post(String pluginId, IDeviceAction action) {
|
||||
log.info("post action:{}", action);
|
||||
return ActionResult.builder().code(0).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Product getProduct(String pk) {
|
||||
return Product.builder()
|
||||
.productKey("cGCrkK7Ex4FESAwe")
|
||||
.productSecret("aaaaaaaa")
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeviceInfo getDevice(String dn) {
|
||||
return DeviceInfo.builder()
|
||||
.productKey("cGCrkK7Ex4FESAwe")
|
||||
.deviceName(dn)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, ?> getProperty(String dn) {
|
||||
return new HashMap<>(0);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package cc.iotkit.plugins.tcp.conf;
|
||||
|
||||
import io.vertx.core.net.SocketAddress;
|
||||
import lombok.Data;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
@Data
|
||||
@Component
|
||||
@ConfigurationProperties(prefix = "tcp")
|
||||
public class TcpServerConfig {
|
||||
|
||||
private String host;
|
||||
|
||||
private int port;
|
||||
|
||||
/**
|
||||
* 服务实例数量(线程数)
|
||||
*/
|
||||
private int instance = Runtime.getRuntime().availableProcessors();
|
||||
|
||||
public SocketAddress createSocketAddress() {
|
||||
if (StringUtils.isEmpty(host)) {
|
||||
host = "localhost";
|
||||
}
|
||||
return SocketAddress.inetSocketAddress(port, host);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
package cc.iotkit.plugins.tcp.parser;
|
||||
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* 数据解码
|
||||
*
|
||||
* @author sjg
|
||||
*/
|
||||
@Slf4j
|
||||
public class DataDecoder {
|
||||
|
||||
public static DataPackage decode(Buffer buffer) {
|
||||
DataPackage data = new DataPackage();
|
||||
data.setAddr(buffer.getBuffer(0, 6).toString());
|
||||
data.setCode(buffer.getShort(6));
|
||||
data.setMid(buffer.getShort(8));
|
||||
data.setPayload(buffer.getBytes(10, buffer.length()));
|
||||
return data;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package cc.iotkit.plugins.tcp.parser;
|
||||
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* 数据编码
|
||||
*
|
||||
* @author sjg
|
||||
*/
|
||||
@Slf4j
|
||||
public class DataEncoder {
|
||||
|
||||
public static Buffer encode(DataPackage data) {
|
||||
Buffer buffer = Buffer.buffer();
|
||||
//设备地址(6byte) + 功能码(2byte) + 消息序号(2byte) + 包体(不定长度)
|
||||
buffer.appendInt(6+2+2+data.getPayload().length);
|
||||
buffer.appendBytes(data.getAddr().getBytes());
|
||||
buffer.appendShort(data.getCode());
|
||||
buffer.appendShort(data.getMid());
|
||||
buffer.appendBytes(data.getPayload());
|
||||
return buffer;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
package cc.iotkit.plugins.tcp.parser;
|
||||
|
||||
import cn.hutool.core.util.HexUtil;
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.databind.JsonSerializer;
|
||||
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* 数据包
|
||||
*
|
||||
* @author sjg
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
public class DataPackage {
|
||||
|
||||
public static final short FLAG = 0x8D;
|
||||
|
||||
public static final int HEAD_LEN = 2 + 6 + 2 + 2 + 4;
|
||||
|
||||
public static final short CODE_REGISTER = 0x10;
|
||||
public static final short CODE_REGISTER_REPLY = 0x11;
|
||||
public static final short CODE_HEARTBEAT = 0x20;
|
||||
public static final short CODE_DATA_UP = 0x30;
|
||||
public static final short CODE_DATA_DOWN = 0x40;
|
||||
|
||||
/**
|
||||
* 设备地址
|
||||
*/
|
||||
private String addr;
|
||||
|
||||
/**
|
||||
* 功能码
|
||||
*/
|
||||
private short code;
|
||||
|
||||
/**
|
||||
* 消息序号
|
||||
*/
|
||||
private short mid;
|
||||
|
||||
/**
|
||||
* 包体数据
|
||||
*/
|
||||
@JsonSerialize(using = BufferSerializer.class)
|
||||
private byte[] payload;
|
||||
|
||||
|
||||
public static class BufferSerializer extends JsonSerializer<byte[]> {
|
||||
|
||||
@Override
|
||||
public void serialize(byte[] value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
|
||||
jgen.writeString(HexUtil.encodeHexStr(value));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
package cc.iotkit.plugins.tcp.parser;
|
||||
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.parsetools.RecordParser;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* 数据包读取器
|
||||
*
|
||||
* @author sjg
|
||||
*/
|
||||
public class DataReader {
|
||||
|
||||
public static RecordParser getParser(Consumer<Buffer> receiveHandler) {
|
||||
RecordParser parser = RecordParser.newFixed(4);
|
||||
// 设置处理器
|
||||
parser.setOutput(new Handler<>() {
|
||||
// 表示当前数据长度
|
||||
int size = -1;
|
||||
|
||||
@Override
|
||||
public void handle(Buffer buffer) {
|
||||
//-1表示当前还没有长度信息,需要从收到的数据中取出长度
|
||||
if (-1 == size) {
|
||||
//取出长度
|
||||
size = buffer.getInt(0);
|
||||
//动态修改长度
|
||||
parser.fixedSizeMode(size);
|
||||
} else {
|
||||
//如果size != -1, 说明已经接受到长度信息了,接下来的数据就是protobuf可识别的字节数组
|
||||
byte[] buf = buffer.getBytes();
|
||||
receiveHandler.accept(Buffer.buffer(buf));
|
||||
//处理完后要将长度改回
|
||||
parser.fixedSizeMode(4);
|
||||
//重置size变量
|
||||
size = -1;
|
||||
}
|
||||
}
|
||||
});
|
||||
return parser;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,87 @@
|
|||
package cc.iotkit.plugins.tcp.server;
|
||||
|
||||
import cc.iotkit.common.enums.ErrCode;
|
||||
import cc.iotkit.common.exception.BizException;
|
||||
import cc.iotkit.plugin.core.thing.IDevice;
|
||||
import cc.iotkit.plugin.core.thing.actions.ActionResult;
|
||||
import cc.iotkit.plugin.core.thing.actions.down.DeviceConfig;
|
||||
import cc.iotkit.plugin.core.thing.actions.down.PropertyGet;
|
||||
import cc.iotkit.plugin.core.thing.actions.down.PropertySet;
|
||||
import cc.iotkit.plugin.core.thing.actions.down.ServiceInvoke;
|
||||
import cc.iotkit.plugins.tcp.parser.DataEncoder;
|
||||
import cc.iotkit.plugins.tcp.parser.DataPackage;
|
||||
import cc.iotkit.script.IScriptEngine;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* tcp设备下行接口
|
||||
*
|
||||
* @author sjg
|
||||
*/
|
||||
@Service
|
||||
public class TcpDevice implements IDevice {
|
||||
|
||||
@Autowired
|
||||
private TcpServerVerticle tcpServerVerticle;
|
||||
|
||||
private final AtomicInteger atMid = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public ActionResult config(DeviceConfig action) {
|
||||
return ActionResult.builder().code(0).reason("").build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionResult propertyGet(PropertyGet action) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionResult propertySet(PropertySet action) {
|
||||
IScriptEngine scriptEngine = tcpServerVerticle.getScriptEngine();
|
||||
//使用转换脚本转换参数部分内容
|
||||
String payload = scriptEngine.invokeMethod(new TypeReference<>() {
|
||||
}, "encode", action.getParams());
|
||||
|
||||
if (payload == null) {
|
||||
return ActionResult.builder().code(ErrCode.MSG_CONVERT_ERROR.getKey()).build();
|
||||
}
|
||||
|
||||
if (atMid.compareAndSet(Short.MAX_VALUE / 2 - 1, 0)) {
|
||||
atMid.set(0);
|
||||
}
|
||||
byte[] bytes = payload.getBytes();
|
||||
|
||||
//构造数据包
|
||||
DataPackage data = DataPackage.builder()
|
||||
.addr(action.getDeviceName())
|
||||
.code(DataPackage.CODE_DATA_DOWN)
|
||||
.mid((short) atMid.getAndIncrement())
|
||||
.payload(bytes)
|
||||
.build();
|
||||
|
||||
return send(action.getDeviceName(), DataEncoder.encode(data));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionResult serviceInvoke(ServiceInvoke action) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
private ActionResult send(String deviceName, Buffer msg) {
|
||||
try {
|
||||
tcpServerVerticle.sendMsg(deviceName, msg);
|
||||
return ActionResult.builder().code(0).reason("").build();
|
||||
} catch (BizException e) {
|
||||
return ActionResult.builder().code(e.getCode()).reason(e.getMessage()).build();
|
||||
} catch (Exception e) {
|
||||
return ActionResult.builder().code(ErrCode.UNKNOWN_EXCEPTION.getKey()).reason(e.getMessage()).build();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,90 @@
|
|||
package cc.iotkit.plugins.tcp.server;
|
||||
|
||||
import cc.iotkit.plugin.core.IPlugin;
|
||||
import cc.iotkit.plugin.core.IPluginConfig;
|
||||
import cc.iotkit.plugins.tcp.conf.TcpServerConfig;
|
||||
import cn.hutool.core.bean.BeanUtil;
|
||||
import cn.hutool.core.bean.copier.CopyOptions;
|
||||
import com.gitee.starblues.bootstrap.annotation.AutowiredType;
|
||||
import com.gitee.starblues.bootstrap.realize.PluginCloseListener;
|
||||
import com.gitee.starblues.core.PluginCloseType;
|
||||
import com.gitee.starblues.core.PluginInfo;
|
||||
import io.vertx.core.Future;
|
||||
import io.vertx.core.Vertx;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* tcp插件
|
||||
*
|
||||
* @author sjg
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class TcpPlugin implements PluginCloseListener, IPlugin {
|
||||
|
||||
@Autowired
|
||||
private PluginInfo pluginInfo;
|
||||
@Autowired
|
||||
private TcpServerVerticle tcpServerVerticle;
|
||||
|
||||
@Autowired
|
||||
private TcpServerConfig tcpConfig;
|
||||
|
||||
@Autowired
|
||||
@AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
|
||||
private IPluginConfig pluginConfig;
|
||||
|
||||
private Vertx vertx;
|
||||
private String deployedId;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
vertx = Vertx.vertx();
|
||||
try {
|
||||
//获取插件最新配置替换当前配置
|
||||
Map<String, Object> config = pluginConfig.getConfig(pluginInfo.getPluginId());
|
||||
BeanUtil.copyProperties(config, tcpConfig, CopyOptions.create().ignoreNullValue());
|
||||
tcpServerVerticle.setConfig(tcpConfig);
|
||||
|
||||
Future<String> future = vertx.deployVerticle(tcpServerVerticle);
|
||||
future.onSuccess((s -> {
|
||||
deployedId = s;
|
||||
log.info("tcp plugin started success");
|
||||
}));
|
||||
future.onFailure((e) -> {
|
||||
log.error("tcp plugin startup failed", e);
|
||||
});
|
||||
} catch (Throwable e) {
|
||||
log.error("tcp plugin startup error", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(GenericApplicationContext applicationContext, PluginInfo pluginInfo, PluginCloseType closeType) {
|
||||
try {
|
||||
tcpServerVerticle.stop();
|
||||
Future<Void> future = vertx.undeploy(deployedId);
|
||||
future.onSuccess(unused -> log.info("tcp plugin stopped success"));
|
||||
if (closeType == PluginCloseType.UNINSTALL) {
|
||||
log.info("tcp plugin UNINSTALL:{}", pluginInfo.getPluginId());
|
||||
} else if (closeType == PluginCloseType.STOP) {
|
||||
log.info("tcp plugin STOP:{}", pluginInfo.getPluginId());
|
||||
} else if (closeType == PluginCloseType.UPGRADE_UNINSTALL) {
|
||||
log.info("tcp plugin UPGRADE_UNINSTALL:{}", pluginInfo.getPluginId());
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
log.error("tcp plugin stop error", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getLinkInfo(String pk, String dn) {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,303 @@
|
|||
package cc.iotkit.plugins.tcp.server;
|
||||
|
||||
|
||||
import cc.iotkit.plugin.core.IPluginScript;
|
||||
import cc.iotkit.plugin.core.thing.IThingService;
|
||||
import cc.iotkit.plugin.core.thing.actions.ActionResult;
|
||||
import cc.iotkit.plugin.core.thing.actions.DeviceState;
|
||||
import cc.iotkit.plugin.core.thing.actions.up.DeviceRegister;
|
||||
import cc.iotkit.plugin.core.thing.actions.up.DeviceStateChange;
|
||||
import cc.iotkit.plugin.core.thing.actions.up.PropertyReport;
|
||||
import cc.iotkit.plugins.tcp.cilent.VertxTcpClient;
|
||||
import cc.iotkit.plugins.tcp.conf.TcpServerConfig;
|
||||
import cc.iotkit.plugins.tcp.parser.DataDecoder;
|
||||
import cc.iotkit.plugins.tcp.parser.DataEncoder;
|
||||
import cc.iotkit.plugins.tcp.parser.DataPackage;
|
||||
import cc.iotkit.plugins.tcp.parser.DataReader;
|
||||
import cc.iotkit.script.IScriptEngine;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.gitee.starblues.bootstrap.annotation.AutowiredType;
|
||||
import com.gitee.starblues.core.PluginInfo;
|
||||
import io.vertx.core.AbstractVerticle;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.net.NetServer;
|
||||
import io.vertx.core.net.NetServerOptions;
|
||||
import io.vertx.core.net.NetSocket;
|
||||
import io.vertx.core.parsetools.RecordParser;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @author huangwenl
|
||||
* @date 2022-10-13
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class TcpServerVerticle extends AbstractVerticle {
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
private TcpServerConfig config;
|
||||
|
||||
private VertxTcpServer tcpServer;
|
||||
|
||||
private final Map<String, VertxTcpClient> clientMap = new ConcurrentHashMap<>();
|
||||
|
||||
private final Map<String, String> dnToPk = new HashMap<>();
|
||||
|
||||
private final Map<String, Long> heartbeatDevice = new HashMap<>();
|
||||
|
||||
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
|
||||
|
||||
private ScheduledThreadPoolExecutor offlineCheckExecutor;
|
||||
|
||||
@Setter
|
||||
private long keepAliveTimeout = Duration.ofSeconds(30).toMillis();
|
||||
|
||||
private Collection<NetServer> tcpServers;
|
||||
|
||||
@Getter
|
||||
private IScriptEngine scriptEngine;
|
||||
|
||||
@Autowired
|
||||
private PluginInfo pluginInfo;
|
||||
|
||||
@Autowired
|
||||
@AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
|
||||
private IPluginScript pluginScript;
|
||||
|
||||
@Autowired
|
||||
@AutowiredType(AutowiredType.Type.MAIN_PLUGIN)
|
||||
private IThingService thingService;
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
tcpServer = new VertxTcpServer();
|
||||
initConfig();
|
||||
initTcpServer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
tcpServer.shutdown();
|
||||
scheduledThreadPoolExecutor.shutdown();
|
||||
offlineCheckExecutor.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建配置文件
|
||||
*/
|
||||
public void initConfig() {
|
||||
//获取脚本引擎
|
||||
scriptEngine = pluginScript.getScriptEngine(pluginInfo.getPluginId());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 初始TCP服务
|
||||
*/
|
||||
private void initTcpServer() {
|
||||
int instance = Math.max(2, config.getInstance());
|
||||
List<NetServer> instances = new ArrayList<>(instance);
|
||||
for (int i = 0; i < instance; i++) {
|
||||
instances.add(vertx.createNetServer(
|
||||
new NetServerOptions().setHost(config.getHost())
|
||||
.setPort(config.getPort())
|
||||
));
|
||||
}
|
||||
// 根据解析类型配置数据解析器
|
||||
tcpServer.setServer(instances);
|
||||
// 针对JVM做的多路复用优化
|
||||
// 多个server listen同一个端口,每个client连接的时候vertx会分配
|
||||
// 一个connection只能在一个server中处理
|
||||
for (NetServer netServer : instances) {
|
||||
netServer.listen(config.createSocketAddress(), result -> {
|
||||
if (result.succeeded()) {
|
||||
log.info("tcp server startup on {}", result.result().actualPort());
|
||||
} else {
|
||||
log.error("tcp server startup error", result.cause());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void sendMsg(String addr, Buffer msg) {
|
||||
VertxTcpClient tcpClient = clientMap.get(addr);
|
||||
if (tcpClient != null) {
|
||||
tcpClient.sendMessage(msg);
|
||||
}
|
||||
}
|
||||
|
||||
@Scheduled(fixedRate = 40, timeUnit = TimeUnit.SECONDS)
|
||||
private void offlineCheckTask() {
|
||||
log.info("keepClientTask");
|
||||
Set<String> clients = new HashSet<>(clientMap.keySet());
|
||||
for (String key : clients) {
|
||||
VertxTcpClient client = clientMap.get(key);
|
||||
if (!client.isOnline()) {
|
||||
client.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
heartbeatDevice.keySet().iterator().forEachRemaining(addr -> {
|
||||
Long time = heartbeatDevice.get(addr);
|
||||
//心跳超时,判定为离线
|
||||
if (System.currentTimeMillis() - time > keepAliveTimeout * 2) {
|
||||
heartbeatDevice.remove(addr);
|
||||
//离线上报
|
||||
thingService.post(pluginInfo.getPluginId(), DeviceStateChange.builder()
|
||||
.id(IdUtil.simpleUUID())
|
||||
.productKey(dnToPk.get(addr))
|
||||
.deviceName(addr)
|
||||
.state(DeviceState.OFFLINE)
|
||||
.time(System.currentTimeMillis())
|
||||
.build());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
class VertxTcpServer {
|
||||
|
||||
/**
|
||||
* 为每个NetServer添加connectHandler
|
||||
*
|
||||
* @param servers 创建的所有NetServer
|
||||
*/
|
||||
public void setServer(Collection<NetServer> servers) {
|
||||
if (tcpServers != null && !tcpServers.isEmpty()) {
|
||||
shutdown();
|
||||
}
|
||||
tcpServers = servers;
|
||||
for (NetServer tcpServer : tcpServers) {
|
||||
tcpServer.connectHandler(this::acceptTcpConnection);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* TCP连接处理逻辑
|
||||
*
|
||||
* @param socket socket
|
||||
*/
|
||||
protected void acceptTcpConnection(NetSocket socket) {
|
||||
// 客户端连接处理
|
||||
String clientId = IdUtil.simpleUUID() + "_" + socket.remoteAddress();
|
||||
VertxTcpClient client = new VertxTcpClient(clientId);
|
||||
client.setKeepAliveTimeoutMs(keepAliveTimeout);
|
||||
try {
|
||||
// TCP异常和关闭处理
|
||||
socket.exceptionHandler(err -> log.error("tcp server client [{}] error", socket.remoteAddress(), err)).closeHandler(nil -> {
|
||||
log.debug("tcp server client [{}] closed", socket.remoteAddress());
|
||||
client.shutdown();
|
||||
});
|
||||
// 这个地方是在TCP服务初始化的时候设置的 parserSupplier
|
||||
client.setKeepAliveTimeoutMs(keepAliveTimeout);
|
||||
client.setSocket(socket);
|
||||
RecordParser parser = DataReader.getParser(buffer -> {
|
||||
try {
|
||||
DataPackage data = DataDecoder.decode(buffer);
|
||||
String addr = data.getAddr();
|
||||
int code = data.getCode();
|
||||
if (code == 0x10) {
|
||||
clientMap.put(addr, client);
|
||||
//设备注册
|
||||
String pk = dnToPk.put(addr, new String(data.getPayload()));
|
||||
ActionResult rst = thingService.post(pluginInfo.getPluginId(), DeviceRegister.builder()
|
||||
.id(IdUtil.simpleUUID())
|
||||
.productKey(pk)
|
||||
.deviceName(addr)
|
||||
.time(System.currentTimeMillis())
|
||||
.build());
|
||||
if (rst.getCode() == 0) {
|
||||
//回复注册成功
|
||||
sendMsg(addr, DataEncoder.encode(
|
||||
DataPackage.builder()
|
||||
.addr(addr)
|
||||
.code(DataPackage.CODE_REGISTER_REPLY)
|
||||
.mid(data.getMid())
|
||||
.payload(Buffer.buffer().appendInt(0).getBytes())
|
||||
.build()
|
||||
));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (code == 0x20) {
|
||||
//心跳
|
||||
if (!heartbeatDevice.containsKey(addr)) {
|
||||
//第一次心跳,上线
|
||||
thingService.post(pluginInfo.getPluginId(), DeviceStateChange.builder()
|
||||
.id(IdUtil.simpleUUID())
|
||||
.productKey(dnToPk.get(addr))
|
||||
.deviceName(addr)
|
||||
.state(DeviceState.ONLINE)
|
||||
.time(System.currentTimeMillis())
|
||||
.build());
|
||||
}
|
||||
heartbeatDevice.put(addr, System.currentTimeMillis());
|
||||
return;
|
||||
}
|
||||
|
||||
if (code == 0x30) {
|
||||
//设备数据上报
|
||||
//数据上报也作为心跳
|
||||
heartbeatDevice.put(addr, System.currentTimeMillis());
|
||||
//调用脚本解码
|
||||
Map<String, Object> rst = scriptEngine.invokeMethod(new TypeReference<>() {
|
||||
}, "decode", data);
|
||||
if (rst == null) {
|
||||
return;
|
||||
}
|
||||
//属性上报
|
||||
thingService.post(pluginInfo.getPluginId(), PropertyReport.builder()
|
||||
.id(IdUtil.simpleUUID())
|
||||
.productKey(dnToPk.get(addr))
|
||||
.deviceName(addr)
|
||||
.params(rst)
|
||||
.time(System.currentTimeMillis())
|
||||
.build());
|
||||
}
|
||||
|
||||
//未注册断开连接
|
||||
if (!clientMap.containsKey(data.getAddr())) {
|
||||
socket.close();
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("handler error", e);
|
||||
}
|
||||
});
|
||||
client.setParser(parser);
|
||||
log.debug("accept tcp client [{}] connection", socket.remoteAddress());
|
||||
} catch (Exception e) {
|
||||
log.error("create tcp server client error", e);
|
||||
client.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
if (tcpServers == null) {
|
||||
return;
|
||||
}
|
||||
for (NetServer tcpServer : tcpServers) {
|
||||
try {
|
||||
tcpServer.close();
|
||||
} catch (Exception e) {
|
||||
log.warn("close tcp server error", e);
|
||||
}
|
||||
}
|
||||
tcpServers = null;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,8 @@
|
|||
plugin:
|
||||
runMode: dev
|
||||
# runMode: prod
|
||||
mainPackage: cc.iotkit.plugin
|
||||
|
||||
tcp:
|
||||
host: 127.0.0.1
|
||||
port: 6883
|
|
@ -0,0 +1,16 @@
|
|||
[
|
||||
{
|
||||
"id": "host",
|
||||
"name": "绑定ip",
|
||||
"type": "text",
|
||||
"value": "127.0.0.1",
|
||||
"desc": "tcp绑定ip,默认为127.0.0.1"
|
||||
},
|
||||
{
|
||||
"id": "port",
|
||||
"name": "端口",
|
||||
"type": "number",
|
||||
"value": 6883,
|
||||
"desc": "tcp端口,默认为6883"
|
||||
}
|
||||
]
|
|
@ -0,0 +1,21 @@
|
|||
function hexToByte(hexString) {
|
||||
if (hexString.length % 2 !== 0) {
|
||||
throw new Error('Invalid hex string. String must have an even number of characters.');
|
||||
}
|
||||
|
||||
let byteArray = [];
|
||||
for (let i = 0; i < hexString.length; i += 4) {
|
||||
byteArray.push(parseInt(hexString.substr(i, 4), 16));
|
||||
}
|
||||
|
||||
return byteArray;
|
||||
}
|
||||
|
||||
this.decode=function(data){
|
||||
hex=data.payload;
|
||||
const bytes=hexToByte(hex);
|
||||
return {
|
||||
"rssi":bytes[0],
|
||||
"powerstate":bytes[1]
|
||||
};
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
package cc.iotkit.test;
|
||||
|
||||
import io.vertx.core.Future;
|
||||
import io.vertx.core.Vertx;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class TcpClientTest {
|
||||
|
||||
public static void main(String[] args) {
|
||||
Vertx vertx = Vertx.vertx();
|
||||
Future<String> future = vertx.deployVerticle(new TcpClientVerticle());
|
||||
future.onSuccess((s -> {
|
||||
log.info("tcp client started success");
|
||||
}));
|
||||
future.onFailure((e) -> {
|
||||
log.error("tcp client startup failed", e);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,134 @@
|
|||
package cc.iotkit.test;
|
||||
|
||||
import cc.iotkit.common.utils.ThreadUtil;
|
||||
import cc.iotkit.plugins.tcp.parser.DataDecoder;
|
||||
import cc.iotkit.plugins.tcp.parser.DataEncoder;
|
||||
import cc.iotkit.plugins.tcp.parser.DataPackage;
|
||||
import cc.iotkit.plugins.tcp.parser.DataReader;
|
||||
import cn.hutool.core.util.HexUtil;
|
||||
import cn.hutool.core.util.RandomUtil;
|
||||
import io.vertx.core.AbstractVerticle;
|
||||
import io.vertx.core.AsyncResult;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.net.NetClient;
|
||||
import io.vertx.core.net.NetClientOptions;
|
||||
import io.vertx.core.net.NetSocket;
|
||||
import io.vertx.core.parsetools.RecordParser;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* @author huangwenlong
|
||||
* @version 1.0
|
||||
* @date 2022/10/23 13:08
|
||||
*/
|
||||
@Slf4j
|
||||
public class TcpClientVerticle extends AbstractVerticle {
|
||||
|
||||
private NetClient netClient;
|
||||
|
||||
private NetSocket socket;
|
||||
|
||||
private String addr = "060101";
|
||||
|
||||
private String pk = "cGCrkK7Ex4FESAwe";
|
||||
|
||||
private AtomicInteger atMid = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
initClient();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (null != netClient) {
|
||||
netClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void initClient() {
|
||||
NetClientOptions options = new NetClientOptions();
|
||||
options.setReconnectAttempts(Integer.MAX_VALUE);
|
||||
options.setReconnectInterval(20000L);
|
||||
netClient = vertx.createNetClient(options);
|
||||
RecordParser parser = DataReader.getParser(this::handle);
|
||||
|
||||
netClient.connect(6883, "127.0.0.1", result -> {
|
||||
if (result.succeeded()) {
|
||||
log.debug("connect tcp success");
|
||||
socket = result.result();
|
||||
socket.handler(parser);
|
||||
//注册
|
||||
byte[] pkBytes = pk.getBytes();
|
||||
send(DataPackage.builder()
|
||||
.addr(addr)
|
||||
.code(DataPackage.CODE_REGISTER)
|
||||
.mid(getMid())
|
||||
.payload(pkBytes)
|
||||
.build());
|
||||
} else {
|
||||
log.error("connect tcp error", result.cause());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private short getMid() {
|
||||
atMid.compareAndSet(254, 0);
|
||||
return (short) atMid.getAndIncrement();
|
||||
}
|
||||
|
||||
private void send(DataPackage data) {
|
||||
Buffer buffer = DataEncoder.encode(data);
|
||||
log.info("send data:{}", HexUtil.encodeHexStr(buffer.getBytes()));
|
||||
socket.write(buffer);
|
||||
}
|
||||
|
||||
public void handle(Buffer buffer) {
|
||||
log.info("receive server data:{}", buffer.toString());
|
||||
DataPackage data = DataDecoder.decode(buffer);
|
||||
if (data.getCode() == DataPackage.CODE_REGISTER_REPLY) {
|
||||
int rst = Buffer.buffer(data.getPayload()).getInt(0);
|
||||
if (rst == 0) {
|
||||
log.info("device:{} register success", data.getAddr());
|
||||
//定时心跳
|
||||
ThreadUtil.newScheduled(1, "heartbeat")
|
||||
.scheduleWithFixedDelay(this::heartbeat, 10, 30, TimeUnit.SECONDS);
|
||||
//随机上报数据
|
||||
ThreadUtil.newScheduled(1, "reportData")
|
||||
.scheduleWithFixedDelay(this::reportData, 20, 3, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void heartbeat() {
|
||||
send(DataPackage.builder()
|
||||
.addr(addr)
|
||||
.code(DataPackage.CODE_HEARTBEAT)
|
||||
.mid(getMid())
|
||||
.build());
|
||||
}
|
||||
|
||||
private void reportData() {
|
||||
if (RandomUtil.randomInt() % 3 == 0) {
|
||||
//随机
|
||||
return;
|
||||
}
|
||||
send(DataPackage.builder()
|
||||
.addr(addr)
|
||||
.code(DataPackage.CODE_DATA_UP)
|
||||
.mid(getMid())
|
||||
.payload(Buffer.buffer()
|
||||
//rssi
|
||||
.appendShort((short) RandomUtil.randomInt(0, 127))
|
||||
//powerstate
|
||||
.appendByte((byte) (RandomUtil.randomInt() % 2 == 0 ? 1 : 0))
|
||||
.getBytes()
|
||||
)
|
||||
.build());
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue