fix:添加客户端测试

master
xiwa 2023-10-16 08:01:35 +08:00
parent 4818498ac6
commit 3d8e7ef524
16 changed files with 846 additions and 38 deletions

View File

@ -29,7 +29,7 @@ public class FakeThingService implements IThingService {
public Product getProduct(String pk) {
return Product.builder()
.productKey("cGCrkK7Ex4FESAwe")
.productSecret("aaaaaaaa")
.productSecret("xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU")
.build();
}

View File

@ -0,0 +1,36 @@
package cc.iotkit.test.http;
import cc.iotkit.common.utils.ThreadUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import io.vertx.core.json.JsonObject;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
public class HttpTest {
public static void main(String[] args) {
ScheduledThreadPoolExecutor timer = ThreadUtil.newScheduled(1, "http-test");
timer.scheduleWithFixedDelay(HttpTest::report, 0, 3, TimeUnit.SECONDS);
}
public static void report() {
HttpResponse rst = HttpUtil.createPost("http://127.0.0.1:9084/sys/cGCrkK7Ex4FESAwe/cz00001/properties")
.header("secret", "mBCr3TKstTj2KeM6")
.body(new JsonObject()
.put("id", IdUtil.fastSimpleUUID())
.put("params", new JsonObject()
.put("powerstate", RandomUtil.randomInt(0, 2))
.put("rssi", RandomUtil.randomInt(-127, 127))
.getMap()
).encode()
).execute();
log.info("send result:status={},body={}", rst.getStatus(), rst.body());
}
}

View File

@ -6,6 +6,7 @@ import cc.iotkit.plugin.core.thing.IThingService;
import cc.iotkit.plugin.core.thing.actions.ActionResult;
import cc.iotkit.plugin.core.thing.actions.IDeviceAction;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.Map;
@ -18,6 +19,28 @@ import java.util.Map;
@Slf4j
public class FakeThingService implements IThingService {
/**
*
*/
private static final Map<String, String> PRODUCTS = Map.of(
"hbtgIA0SuVw9lxjB", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU",
"Rf4QSjbm65X45753", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU",
"cGCrkK7Ex4FESAwe", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU"
);
/**
*
*/
private static final Map<String, String> DEVICES = new HashMap<>();
static {
for (int i = 0; i < 10; i++) {
DEVICES.put("TEST:GW:" + StringUtils.leftPad(i + "", 6, "0"), "hbtgIA0SuVw9lxjB");
DEVICES.put("TEST_SW_" + StringUtils.leftPad(i + "", 6, "0"), "Rf4QSjbm65X45753");
DEVICES.put("TEST_SC_" + StringUtils.leftPad(i + "", 6, "0"), "cGCrkK7Ex4FESAwe");
}
}
@Override
public ActionResult post(String pluginId, IDeviceAction action) {
log.info("post action:{}", action);
@ -27,15 +50,15 @@ public class FakeThingService implements IThingService {
@Override
public Product getProduct(String pk) {
return Product.builder()
.productKey("cGCrkK7Ex4FESAwe")
.productSecret("aaaaaaaa")
.productKey(pk)
.productSecret(PRODUCTS.get(pk))
.build();
}
@Override
public DeviceInfo getDevice(String dn) {
return DeviceInfo.builder()
.productKey("cGCrkK7Ex4FESAwe")
.productKey(DEVICES.get(dn))
.deviceName(dn)
.build();
}

View File

@ -12,6 +12,7 @@ package cc.iotkit.plugins.mqtt.service;
import cc.iotkit.common.enums.ErrCode;
import cc.iotkit.common.exception.BizException;
import cc.iotkit.common.utils.CodecUtil;
import cc.iotkit.common.utils.StringUtils;
import cc.iotkit.common.utils.UniqueIdUtil;
import cc.iotkit.model.product.Product;
import cc.iotkit.plugin.core.thing.IThingService;
@ -62,6 +63,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
* clientid-mqttmqtthandler线
*/
private static final Map<String, Boolean> MQTT_CONNECT_POOL = new ConcurrentHashMap<>();
private static final Map<String, Boolean> DEVICE_ONLINE = new ConcurrentHashMap<>();
private MqttConfig config;
@ -85,7 +87,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
options.setUseWebSocket(config.isUseWebSocket());
mqttServer = MqttServer.create(vertx, options);
mqttServer.endpointHandler(this::handle).listen(ar -> {
mqttServer.endpointHandler(this).listen(ar -> {
if (ar.succeeded()) {
log.info("MQTT server is listening on port " + ar.result().actualPort());
} else {
@ -142,7 +144,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
return;
}
//设备注册
//网关设备注册
ActionResult result = thingService.post(
pluginInfo.getPluginId(),
fillAction(
@ -168,6 +170,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
endpoint.accept(false);
endpoint.closeHandler((v) -> {
// 网络不好时也会出发,但是设备仍然可以发消息
log.warn("client connection closed,clientId:{}", clientId);
if (Boolean.FALSE.equals(MQTT_CONNECT_POOL.get(clientId))) {
MQTT_CONNECT_POOL.remove(clientId);
@ -183,35 +186,23 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
, productKey, deviceName
)
);
DEVICE_ONLINE.clear();
//删除设备与连接关系
endpointMap.remove(deviceName);
}).disconnectMessageHandler(disconnectMessage -> {
log.info("Received disconnect from client, reason code = {}", disconnectMessage.code());
if (!MQTT_CONNECT_POOL.get(clientId)) {
return;
}
//删除设备与连接关系
endpointMap.remove(deviceName);
MQTT_CONNECT_POOL.put(clientId, false);
DEVICE_ONLINE.clear();
}).subscribeHandler(subscribe -> {
//上线
thingService.post(
pluginInfo.getPluginId(),
fillAction(DeviceStateChange.builder()
.state(DeviceState.ONLINE)
.build()
, productKey, deviceName
)
);
List<MqttSubAckReasonCode> reasonCodes = new ArrayList<>();
for (MqttTopicSubscription s : subscribe.topicSubscriptions()) {
log.info("Subscription for {},with QoS {}", s.topicName(), s.qualityOfService());
try {
String topic = s.topicName();
//topic订阅验证 /sys/{productKey}/{deviceName}/#
String regex = String.format("^/sys/%s/%s/.*", productKey, deviceName);
if (!topic.matches(regex)) {
log.error("subscript topic:{} incorrect,regex:{}", topic, regex);
continue;
}
reasonCodes.add(MqttSubAckReasonCode.qosGranted(s.qualityOfService()));
} catch (Throwable e) {
log.error("subscribe failed,topic:" + s.topicName(), e);
@ -220,7 +211,6 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
}
// ack the subscriptions request
endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);
}).unsubscribeHandler(unsubscribe -> {
//下线
thingService.post(
@ -232,57 +222,112 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
, productKey, deviceName
)
);
DEVICE_ONLINE.clear();
// ack the subscriptions request
endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
}).publishHandler(message -> {
String topic = message.topicName();
JsonObject payload = message.payload().toJsonObject();
log.info("Received message:{}, with QoS {}", payload,
log.info("Received message:topic={},payload={}, with QoS {}", topic, payload,
message.qosLevel());
if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
endpoint.publishAcknowledge(message.messageId());
} else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
endpoint.publishReceived(message.messageId());
}
if (payload.isEmpty()) {
return;
}
String topic = message.topicName();
String[] topicParts = topic.split("/");
if (topicParts.length < 5) {
return;
}
//网关上线
online(productKey, deviceName);
String topicPk = topicParts[2];
String topicDn = topicParts[3];
if (!MQTT_CONNECT_POOL.get(clientId)) {
//保存设备与连接关系
endpointMap.put(deviceName, endpoint);
MQTT_CONNECT_POOL.put(clientId, true);
log.info("mqtt client reconnect success,clientId:{}", clientId);
}
try {
JsonObject defParams = JsonObject.mapFrom(new HashMap<>(0));
IDeviceAction action = null;
String method = payload.getString("method", "");
if (StringUtils.isBlank(method)) {
return;
}
JsonObject params = payload.getJsonObject("params", defParams);
if ("thing.lifetime.register".equalsIgnoreCase(method)) {
//子设备注册
ActionResult regResult = thingService.post(
pluginInfo.getPluginId(),
fillAction(
DeviceRegister.builder()
.productKey(params.getString("productKey"))
.deviceName(params.getString("deviceName"))
.model(params.getString("model"))
.version("1.0")
.build()
, productKey, deviceName
)
);
if (regResult.getCode() == 0) {
//注册成功
reply(endpoint, topic, payload);
} else {
//注册失败
reply(endpoint, topic, new JsonObject(), regResult.getCode());
}
return;
}
if ("thing.event.property.post".equalsIgnoreCase(method)) {
//设备上线处理
online(topicPk, topicDn);
//属性上报
action = PropertyReport.builder()
.params(payload.getJsonObject("params", defParams).getMap())
.params(params.getMap())
.build();
reply(endpoint, topic, payload);
} else if (method.startsWith("thing.event.")) {
//设备上线处理
online(topicPk, topicDn);
//事件上报
action = EventReport.builder()
.name(method.replace("thing.event.", ""))
.level(EventLevel.INFO)
.params(payload.getJsonObject("params", defParams).getMap())
.params(params.getMap())
.build();
reply(endpoint, topic, payload);
} else if (method.startsWith("thing.service.") && method.endsWith("_reply")) {
//设备上线处理
online(topicPk, topicDn);
//服务回复
action = ServiceReply.builder()
.name(method.replaceAll("thing\\.service\\.(.*)_reply", "$1"))
.code(payload.getInteger("code", 0))
.params(payload.getJsonObject("data", defParams).getMap())
.params(params.getMap())
.build();
}
if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
endpoint.publishAcknowledge(message.messageId());
} else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
endpoint.publishReceived(message.messageId());
}
if (action == null) {
return;
}
action.setId(payload.getString("id"));
action.setProductKey(productKey);
action.setDeviceName(deviceName);
action.setProductKey(topicPk);
action.setDeviceName(topicDn);
action.setTime(System.currentTimeMillis());
thingService.post(pluginInfo.getPluginId(), action);
} catch (Throwable e) {
@ -292,14 +337,39 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
}
public void online(String pk, String dn) {
if (Boolean.TRUE.equals(DEVICE_ONLINE.get(dn))) {
return;
}
//上线
thingService.post(
pluginInfo.getPluginId(),
fillAction(DeviceStateChange.builder()
.state(DeviceState.ONLINE)
.build()
, pk, dn
)
);
DEVICE_ONLINE.put(dn, true);
}
/**
*
*/
private void reply(MqttEndpoint endpoint, String topic, JsonObject payload) {
reply(endpoint, topic, payload, 0);
}
/**
*
*/
private void reply(MqttEndpoint endpoint, String topic, JsonObject payload, int code) {
Map<String, Object> payloadReply = new HashMap<>();
payloadReply.put("id", payload.getString("id"));
payloadReply.put("method", payload.getString("method") + "_reply");
payloadReply.put("code", 0);
payloadReply.put("code", code);
payloadReply.put("data", payload.getJsonObject("params"));
endpoint.publish(topic + "_reply", JsonObject.mapFrom(payloadReply).toBuffer(), MqttQoS.AT_LEAST_ONCE, false, false);
}
@ -332,6 +402,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
parts[1]
)
);
DEVICE_ONLINE.clear();
}
mqttServer.close(voidAsyncResult -> log.info("close mqtt server..."));
}

View File

@ -1,5 +1,5 @@
plugin:
runMode: prod
runMode: dev
mainPackage: cc.iotkit.plugin
mqtt:

View File

@ -0,0 +1,17 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.test.mqtt.config;
public class Mqtt {
public static String brokerHost;
public static int brokerPort = 1883;
}

View File

@ -0,0 +1,73 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.test.mqtt.example;
import cc.iotkit.test.mqtt.config.Mqtt;
import cc.iotkit.test.mqtt.model.Request;
import cc.iotkit.test.mqtt.service.Gateway;
import cc.iotkit.test.mqtt.service.ReportTask;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
*
*/
@Slf4j
public class TransparentTest {
public static void main(String[] args) throws IOException {
if (args.length == 0) {
Mqtt.brokerHost = "127.0.0.1";
// Mqtt.brokerHost = "120.76.96.206";
// Mqtt.brokerHost = "172.16.1.109";
} else {
Mqtt.brokerHost = args[0];
}
log.info("start gateway ");
Gateway gateway = new Gateway("hbtgIA0SuVw9lxjB", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU",
"TEST:GW:T0001");
gateway.addSubDevice("hbtgIA0SuVw9lxjB", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU",
"TEST_LIGHT_0001",
"M1");
gateway.onDeviceOnline(device -> {
String pk = device.getProductKey();
//设备上线后添加上报定时任务
ReportTask reportTask = new ReportTask(gateway.getClient());
reportTask.addTask(String.format("/sys/%s/%s/s/event/rawReport",
pk, device.getDeviceName()),
() -> {
Request request = new Request();
request.setId(UUID.randomUUID().toString());
request.setMethod("thing.event.rawReport");
Map<String, Object> param = new HashMap<>();
param.put("model", "M1");
param.put("deviceName", "TEST_LIGHT_0001");
param.put("data", "111110011");
request.setParams(param);
return request;
});
reportTask.start(10);
});
gateway.start();
System.in.read();
}
}

View File

@ -0,0 +1,26 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.test.mqtt.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Request {
private String id;
private String method;
private Object params;
}

View File

@ -0,0 +1,31 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.test.mqtt.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Response {
private String id;
private int code;
private String method;
private Map<String, Object> data;
}

View File

@ -0,0 +1,67 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.test.mqtt.performance;
import cc.iotkit.test.mqtt.config.Mqtt;
import cc.iotkit.test.mqtt.service.Gateway;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*
*/
@Slf4j
public class ConnectionTest {
public static void main(String[] args) throws IOException {
if (args.length == 0) {
Mqtt.brokerHost = "127.0.0.1";
// Mqtt.brokerHost = "120.76.96.206";
} else {
Mqtt.brokerHost = args[0];
}
int total = 10;
if (args.length > 1) {
total = Integer.parseInt(args[1]);
}
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < total; i++) {
int finalI = i;
executor.submit(() -> {
log.info("start gateway " + (finalI + 1));
Gateway gateway = new Gateway("hbtgIA0SuVw9lxjB","xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU",
"TEST:GW:T" + StringUtils.leftPad(finalI + "", 6, "0"));
// gateway.addSubDevice("Rf4QSjbm65X45753",
// "TEST_SW_" + StringUtils.leftPad(finalI + "", 6, "0"),
// "S01");
//
// gateway.addSubDevice("cGCrkK7Ex4FESAwe",
// "TEST_SC_" + StringUtils.leftPad(finalI + "", 6, "0"),
// "S01");
//
// gateway.addSubDevice("xpsYHExTKPFaQMS7",
// "TEST_LT_" + StringUtils.leftPad(finalI + "", 6, "0"),
// "L01");
gateway.start();
});
}
System.in.read();
}
}

View File

@ -0,0 +1,93 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.test.mqtt.performance;
import cc.iotkit.test.mqtt.config.Mqtt;
import cc.iotkit.test.mqtt.model.Request;
import cc.iotkit.test.mqtt.service.Gateway;
import cc.iotkit.test.mqtt.service.ReportTask;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*
*/
@Slf4j
public class ReportTest {
public static void main(String[] args) throws IOException {
if (args.length == 0) {
Mqtt.brokerHost = "127.0.0.1";
Mqtt.brokerPort = 1883;
// Mqtt.brokerHost = "120.76.96.206";
// Mqtt.brokerHost = "172.16.1.109";
} else {
Mqtt.brokerHost = args[0];
}
int total = 10;
if (args.length > 1) {
total = Integer.parseInt(args[1]);
}
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < total; i++) {
int finalI = i;
executor.submit(() -> {
log.info("start gateway " + (finalI + 1));
Gateway gateway = new Gateway("hbtgIA0SuVw9lxjB", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU",
"TEST:GW:" + StringUtils.leftPad(finalI + "", 6, "0"));
gateway.addSubDevice("Rf4QSjbm65X45753", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU",
"TEST_SW_" + StringUtils.leftPad(finalI + "", 6, "0"),
"S01");
gateway.addSubDevice("cGCrkK7Ex4FESAwe", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU",
"TEST_SC_" + StringUtils.leftPad(finalI + "", 6, "0"),
"S01");
gateway.onDeviceOnline((device) -> {
String pk = device.getProductKey();
if (!"Rf4QSjbm65X45753".equals(pk)) {
return;
}
//设备上线后添加上报定时任务
ReportTask reportTask = new ReportTask(gateway.getClient());
reportTask.addTask(String.format("/sys/%s/%s/s/event/property/post",
pk, device.getDeviceName()),
() -> {
Request request = new Request();
request.setId(UUID.randomUUID().toString());
request.setMethod("thing.event.property.post");
Map<String, Object> param = new HashMap<>();
param.put("volt", Math.round(Math.random() * 100));
request.setParams(param);
return request;
});
reportTask.start(10);
});
gateway.start();
});
}
System.in.read();
}
}

View File

@ -0,0 +1,28 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.test.mqtt.service;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Device {
protected String productKey;
private String productSecret ;
protected String deviceName;
private String model;
}

View File

@ -0,0 +1,155 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.test.mqtt.service;
import cc.iotkit.common.utils.CodecUtil;
import cc.iotkit.test.mqtt.config.Mqtt;
import cc.iotkit.test.mqtt.model.Request;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.messages.MqttConnAckMessage;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@Slf4j
@EqualsAndHashCode(callSuper = true)
@Data
public class Gateway extends Device {
private List<Device> subDevices = new ArrayList<>();
private Consumer<Device> deviceOnlineListener;
private MqttClient client;
private boolean isConnecting;
public Gateway(String productKey, String productSecret, String deviceName) {
super(productKey, productSecret, deviceName, "GW01");
}
@SneakyThrows
public void start() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(this::connect, 0, 3, TimeUnit.SECONDS);
}
private void connect() {
if (client != null && client.isConnected()) {
return;
}
if (isConnecting) {
return;
}
String clientId = String.format("%s_%s_%s", productKey, deviceName, getModel());
try {
isConnecting = true;
MqttClientOptions options = new MqttClientOptions();
options.setUsername(this.deviceName);
options.setPassword(CodecUtil.md5Str(getProductSecret() + clientId));
options.setCleanSession(true);
options.setKeepAliveInterval(30);
options.setClientId(clientId);
options.setReconnectInterval(3000);
options.setReconnectAttempts(100);
client = MqttClient.create(Vertxs.getVertx(), options);
CountDownLatch countDownLatch = new CountDownLatch(1);
client.connect(Mqtt.brokerPort, Mqtt.brokerHost, s -> {
if (s.succeeded()) {
log.info("mqtt connected,clientId:{}", clientId);
countDownLatch.countDown();
} else {
log.info("mqtt connect failed,clientId:{}", clientId);
}
});
countDownLatch.await();
// 订阅
String topic = String.format("/sys/%s/%s/c/#", productKey, deviceName);
log.info("subscribe topic:{}", topic);
client.subscribe(topic, 1, r -> {
//配置获取
// String configGetTopic = String.format("/sys/%s/%s/s/config/get", productKey, deviceName);
// Request configRequest = new Request();
// configRequest.setId(UUID.randomUUID().toString());
// String configPayload = JsonUtils.toJsonString(configRequest);
// client.publish(configGetTopic, Buffer.buffer(configPayload), MqttQoS.AT_LEAST_ONCE, false, false);
// log.info("publish message,topic:{},payload:{}", configGetTopic, configPayload);
//注册子设备
for (Device subDevice : subDevices) {
log.info("start register sub device,pk:{},dn:{}", subDevice.getProductKey(), subDevice.getDeviceName());
Request request = new Request();
request.setId(UUID.randomUUID().toString());
request.setParams(subDevice);
request.setMethod("thing.lifetime.register");
String registerTopic = String.format("/sys/%s/%s/s/register", productKey, deviceName);
String payload = Json.encode(request);
client.publish(registerTopic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false);
log.info("publish message,topic:{},payload:{}", registerTopic, payload);
}
});
client.publishHandler(new MessageHandler(client, this, deviceOnlineListener));
client.closeHandler((v) -> {
log.info("{} closed,reconnecting...", deviceName);
client.disconnect();
});
} catch (Throwable e) {
log.error("connect mqtt-broker error", e);
} finally {
isConnecting = false;
}
}
public void addSubDevice(String productKey, String productSecret, String deviceName, String model) {
subDevices.add(new Device(productKey, productSecret, deviceName, model));
}
public void onDeviceOnline(Consumer<Device> listener) {
this.deviceOnlineListener = listener;
}
public static class OnConnected implements Handler<AsyncResult<MqttConnAckMessage>> {
@Override
public void handle(AsyncResult<MqttConnAckMessage> mqttConnAckMessageAsyncResult) {
}
}
}

View File

@ -0,0 +1,100 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.test.mqtt.service;
import cc.iotkit.test.mqtt.model.Request;
import cc.iotkit.test.mqtt.model.Response;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
@Slf4j
@Data
public class MessageHandler implements Handler<MqttPublishMessage> {
private MqttClient client;
private Gateway gateway;
private Consumer<Device> deviceOnlineListener;
public MessageHandler(MqttClient client, Gateway gateway, Consumer<Device> deviceOnlineListener) {
this.client = client;
this.gateway = gateway;
this.deviceOnlineListener = deviceOnlineListener;
}
@SneakyThrows
@Override
public void handle(MqttPublishMessage msg) {
try {
String topic = msg.topicName();
String payload = msg.payload().toString();
log.info("received msg,topic:{},payload:{}", topic, payload);
if (topic.endsWith("register_reply")) {
Response response = Json.decodeValue(payload, Response.class);
//子设备注册成功
if (response.getCode() == 0) {
Map<String, Object> data = response.getData();
String productKey = data.get("productKey").toString();
String deviceName = data.get("deviceName").toString();
if (StringUtils.isBlank(productKey)) {
deviceOnlineListener.accept(new Device(productKey, "", deviceName, ""));
return;
}
//订阅子设备消息
String subTopic = String.format("/sys/%s/%s/c/#", productKey, deviceName);
log.info("subscribe topic:{}", subTopic);
client.subscribe(subTopic, 1, r -> {
if (deviceOnlineListener != null) {
deviceOnlineListener.accept(new Device(productKey, "", deviceName, ""));
}
});
}
}
if (topic.endsWith("_reply")) {
return;
}
Request request = Json.decodeValue(payload, Request.class);
Response response = new Response(request.getId(), 0, request.getMethod(), new HashMap<>());
client.publish(topic.replace("/c/", "/s/") + "_reply",
Buffer.buffer(Json.encode(response)), MqttQoS.AT_LEAST_ONCE, false, false);
//属性设置后上报属性
String setTopic = "/c/service/property/set";
if (topic.endsWith(setTopic)) {
request.setId(UUID.randomUUID().toString());
client.publish(topic.replace(setTopic, "/s/event/property/post"),
Buffer.buffer(Json.encode(request)), MqttQoS.AT_LEAST_ONCE, false, false);
}
} catch (Throwable e) {
log.info("receive msg error", e);
}
}
}

View File

@ -0,0 +1,67 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.test.mqtt.service;
import cc.iotkit.test.mqtt.model.Request;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import io.vertx.mqtt.MqttClient;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ReportTask {
private final MqttClient client;
private final Map<String, Callable<Request>> taskMap = new HashMap<>();
private ScheduledExecutorService taskService = null;
public ReportTask(MqttClient client) {
this.client = client;
}
public void start(int interval) {
if (taskService == null) {
taskService = Executors.newScheduledThreadPool(1);
taskService.scheduleWithFixedDelay(this::send, 3, interval, TimeUnit.SECONDS);
}
}
private void send() {
taskMap.forEach((topic, action) -> {
try {
Request request = action.call();
if (request == null) {
return;
}
if (!client.isConnected()) {
return;
}
String msg = Json.encode(request);
log.info("send msg,topic:{},payload:{}", topic, msg);
client.publish(topic, Buffer.buffer(msg), MqttQoS.AT_LEAST_ONCE, false, false);
} catch (Throwable e) {
log.error("send error", e);
}
});
}
public void addTask(String topic, Callable<Request> callable) {
taskMap.put(topic, callable);
}
}

View File

@ -0,0 +1,21 @@
/*
* +----------------------------------------------------------------------
* | Copyright (c) 2021-2022 All rights reserved.
* +----------------------------------------------------------------------
* | Licensed
* +----------------------------------------------------------------------
* | Author: xw2sy@163.com
* +----------------------------------------------------------------------
*/
package cc.iotkit.test.mqtt.service;
import io.vertx.core.Vertx;
public class Vertxs {
private static final Vertx INSTANCE = Vertx.vertx();
public static Vertx getVertx() {
return INSTANCE;
}
}