diff --git a/http-plugin/src/main/java/cc/iotkit/plugins/http/service/FakeThingService.java b/http-plugin/src/main/java/cc/iotkit/plugins/http/service/FakeThingService.java index 514985c..7bba4e0 100755 --- a/http-plugin/src/main/java/cc/iotkit/plugins/http/service/FakeThingService.java +++ b/http-plugin/src/main/java/cc/iotkit/plugins/http/service/FakeThingService.java @@ -29,7 +29,7 @@ public class FakeThingService implements IThingService { public Product getProduct(String pk) { return Product.builder() .productKey("cGCrkK7Ex4FESAwe") - .productSecret("aaaaaaaa") + .productSecret("xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU") .build(); } diff --git a/http-plugin/src/test/java/cc/iotkit/test/http/HttpTest.java b/http-plugin/src/test/java/cc/iotkit/test/http/HttpTest.java new file mode 100644 index 0000000..af7c947 --- /dev/null +++ b/http-plugin/src/test/java/cc/iotkit/test/http/HttpTest.java @@ -0,0 +1,36 @@ +package cc.iotkit.test.http; + +import cc.iotkit.common.utils.ThreadUtil; +import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.RandomUtil; +import cn.hutool.http.HttpResponse; +import cn.hutool.http.HttpUtil; +import io.vertx.core.json.JsonObject; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class HttpTest { + + public static void main(String[] args) { + ScheduledThreadPoolExecutor timer = ThreadUtil.newScheduled(1, "http-test"); + timer.scheduleWithFixedDelay(HttpTest::report, 0, 3, TimeUnit.SECONDS); + } + + public static void report() { + HttpResponse rst = HttpUtil.createPost("http://127.0.0.1:9084/sys/cGCrkK7Ex4FESAwe/cz00001/properties") + .header("secret", "mBCr3TKstTj2KeM6") + .body(new JsonObject() + .put("id", IdUtil.fastSimpleUUID()) + .put("params", new JsonObject() + .put("powerstate", RandomUtil.randomInt(0, 2)) + .put("rssi", RandomUtil.randomInt(-127, 127)) + .getMap() + ).encode() + ).execute(); + log.info("send result:status={},body={}", rst.getStatus(), rst.body()); + } + +} diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/FakeThingService.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/FakeThingService.java index 0923414..b8f4265 100755 --- a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/FakeThingService.java +++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/FakeThingService.java @@ -6,6 +6,7 @@ import cc.iotkit.plugin.core.thing.IThingService; import cc.iotkit.plugin.core.thing.actions.ActionResult; import cc.iotkit.plugin.core.thing.actions.IDeviceAction; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import java.util.HashMap; import java.util.Map; @@ -18,6 +19,28 @@ import java.util.Map; @Slf4j public class FakeThingService implements IThingService { + /** + * 添加测试产品 + */ + private static final Map PRODUCTS = Map.of( + "hbtgIA0SuVw9lxjB", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU", + "Rf4QSjbm65X45753", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU", + "cGCrkK7Ex4FESAwe", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU" + ); + + /** + * 添加测试设备 + */ + private static final Map DEVICES = new HashMap<>(); + + static { + for (int i = 0; i < 10; i++) { + DEVICES.put("TEST:GW:" + StringUtils.leftPad(i + "", 6, "0"), "hbtgIA0SuVw9lxjB"); + DEVICES.put("TEST_SW_" + StringUtils.leftPad(i + "", 6, "0"), "Rf4QSjbm65X45753"); + DEVICES.put("TEST_SC_" + StringUtils.leftPad(i + "", 6, "0"), "cGCrkK7Ex4FESAwe"); + } + } + @Override public ActionResult post(String pluginId, IDeviceAction action) { log.info("post action:{}", action); @@ -27,15 +50,15 @@ public class FakeThingService implements IThingService { @Override public Product getProduct(String pk) { return Product.builder() - .productKey("cGCrkK7Ex4FESAwe") - .productSecret("aaaaaaaa") + .productKey(pk) + .productSecret(PRODUCTS.get(pk)) .build(); } @Override public DeviceInfo getDevice(String dn) { return DeviceInfo.builder() - .productKey("cGCrkK7Ex4FESAwe") + .productKey(DEVICES.get(dn)) .deviceName(dn) .build(); } diff --git a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java index 3949138..31c6946 100755 --- a/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java +++ b/mqtt-plugin/src/main/java/cc/iotkit/plugins/mqtt/service/MqttVerticle.java @@ -12,6 +12,7 @@ package cc.iotkit.plugins.mqtt.service; import cc.iotkit.common.enums.ErrCode; import cc.iotkit.common.exception.BizException; import cc.iotkit.common.utils.CodecUtil; +import cc.iotkit.common.utils.StringUtils; import cc.iotkit.common.utils.UniqueIdUtil; import cc.iotkit.model.product.Product; import cc.iotkit.plugin.core.thing.IThingService; @@ -62,6 +63,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler MQTT_CONNECT_POOL = new ConcurrentHashMap<>(); + private static final Map DEVICE_ONLINE = new ConcurrentHashMap<>(); private MqttConfig config; @@ -85,7 +87,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler { + mqttServer.endpointHandler(this).listen(ar -> { if (ar.succeeded()) { log.info("MQTT server is listening on port " + ar.result().actualPort()); } else { @@ -142,7 +144,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler { + // 网络不好时也会出发,但是设备仍然可以发消息 log.warn("client connection closed,clientId:{}", clientId); if (Boolean.FALSE.equals(MQTT_CONNECT_POOL.get(clientId))) { MQTT_CONNECT_POOL.remove(clientId); @@ -183,35 +186,23 @@ public class MqttVerticle extends AbstractVerticle implements Handler { log.info("Received disconnect from client, reason code = {}", disconnectMessage.code()); + if (!MQTT_CONNECT_POOL.get(clientId)) { + return; + } //删除设备与连接关系 endpointMap.remove(deviceName); MQTT_CONNECT_POOL.put(clientId, false); + DEVICE_ONLINE.clear(); }).subscribeHandler(subscribe -> { - //上线 - thingService.post( - pluginInfo.getPluginId(), - fillAction(DeviceStateChange.builder() - .state(DeviceState.ONLINE) - .build() - , productKey, deviceName - ) - ); - List reasonCodes = new ArrayList<>(); for (MqttTopicSubscription s : subscribe.topicSubscriptions()) { log.info("Subscription for {},with QoS {}", s.topicName(), s.qualityOfService()); try { - String topic = s.topicName(); - //topic订阅验证 /sys/{productKey}/{deviceName}/# - String regex = String.format("^/sys/%s/%s/.*", productKey, deviceName); - if (!topic.matches(regex)) { - log.error("subscript topic:{} incorrect,regex:{}", topic, regex); - continue; - } reasonCodes.add(MqttSubAckReasonCode.qosGranted(s.qualityOfService())); } catch (Throwable e) { log.error("subscribe failed,topic:" + s.topicName(), e); @@ -220,7 +211,6 @@ public class MqttVerticle extends AbstractVerticle implements Handler { //下线 thingService.post( @@ -232,57 +222,112 @@ public class MqttVerticle extends AbstractVerticle implements Handler { + String topic = message.topicName(); JsonObject payload = message.payload().toJsonObject(); - log.info("Received message:{}, with QoS {}", payload, + log.info("Received message:topic={},payload={}, with QoS {}", topic, payload, message.qosLevel()); + + if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) { + endpoint.publishAcknowledge(message.messageId()); + } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) { + endpoint.publishReceived(message.messageId()); + } if (payload.isEmpty()) { return; } - String topic = message.topicName(); + + String[] topicParts = topic.split("/"); + if (topicParts.length < 5) { + return; + } + + //网关上线 + online(productKey, deviceName); + + String topicPk = topicParts[2]; + String topicDn = topicParts[3]; + + if (!MQTT_CONNECT_POOL.get(clientId)) { + //保存设备与连接关系 + endpointMap.put(deviceName, endpoint); + MQTT_CONNECT_POOL.put(clientId, true); + log.info("mqtt client reconnect success,clientId:{}", clientId); + } try { JsonObject defParams = JsonObject.mapFrom(new HashMap<>(0)); IDeviceAction action = null; String method = payload.getString("method", ""); + if (StringUtils.isBlank(method)) { + return; + } + JsonObject params = payload.getJsonObject("params", defParams); + + if ("thing.lifetime.register".equalsIgnoreCase(method)) { + //子设备注册 + ActionResult regResult = thingService.post( + pluginInfo.getPluginId(), + fillAction( + DeviceRegister.builder() + .productKey(params.getString("productKey")) + .deviceName(params.getString("deviceName")) + .model(params.getString("model")) + .version("1.0") + .build() + , productKey, deviceName + ) + ); + if (regResult.getCode() == 0) { + //注册成功 + reply(endpoint, topic, payload); + } else { + //注册失败 + reply(endpoint, topic, new JsonObject(), regResult.getCode()); + } + return; + } + if ("thing.event.property.post".equalsIgnoreCase(method)) { + //设备上线处理 + online(topicPk, topicDn); //属性上报 action = PropertyReport.builder() - .params(payload.getJsonObject("params", defParams).getMap()) + .params(params.getMap()) .build(); reply(endpoint, topic, payload); } else if (method.startsWith("thing.event.")) { + //设备上线处理 + online(topicPk, topicDn); //事件上报 action = EventReport.builder() .name(method.replace("thing.event.", "")) .level(EventLevel.INFO) - .params(payload.getJsonObject("params", defParams).getMap()) + .params(params.getMap()) .build(); reply(endpoint, topic, payload); } else if (method.startsWith("thing.service.") && method.endsWith("_reply")) { + //设备上线处理 + online(topicPk, topicDn); //服务回复 action = ServiceReply.builder() .name(method.replaceAll("thing\\.service\\.(.*)_reply", "$1")) .code(payload.getInteger("code", 0)) - .params(payload.getJsonObject("data", defParams).getMap()) + .params(params.getMap()) .build(); } - if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) { - endpoint.publishAcknowledge(message.messageId()); - } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) { - endpoint.publishReceived(message.messageId()); - } if (action == null) { return; } action.setId(payload.getString("id")); - action.setProductKey(productKey); - action.setDeviceName(deviceName); + action.setProductKey(topicPk); + action.setDeviceName(topicDn); action.setTime(System.currentTimeMillis()); thingService.post(pluginInfo.getPluginId(), action); } catch (Throwable e) { @@ -292,14 +337,39 @@ public class MqttVerticle extends AbstractVerticle implements Handler payloadReply = new HashMap<>(); payloadReply.put("id", payload.getString("id")); payloadReply.put("method", payload.getString("method") + "_reply"); - payloadReply.put("code", 0); + payloadReply.put("code", code); + payloadReply.put("data", payload.getJsonObject("params")); endpoint.publish(topic + "_reply", JsonObject.mapFrom(payloadReply).toBuffer(), MqttQoS.AT_LEAST_ONCE, false, false); } @@ -332,6 +402,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler log.info("close mqtt server...")); } diff --git a/mqtt-plugin/src/main/resources/application.yml b/mqtt-plugin/src/main/resources/application.yml index 778daac..edb8497 100755 --- a/mqtt-plugin/src/main/resources/application.yml +++ b/mqtt-plugin/src/main/resources/application.yml @@ -1,5 +1,5 @@ plugin: - runMode: prod + runMode: dev mainPackage: cc.iotkit.plugin mqtt: diff --git a/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/config/Mqtt.java b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/config/Mqtt.java new file mode 100755 index 0000000..2c2c5f4 --- /dev/null +++ b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/config/Mqtt.java @@ -0,0 +1,17 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.test.mqtt.config; + +public class Mqtt { + + public static String brokerHost; + public static int brokerPort = 1883; + +} diff --git a/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/example/TransparentTest.java b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/example/TransparentTest.java new file mode 100755 index 0000000..ea1501e --- /dev/null +++ b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/example/TransparentTest.java @@ -0,0 +1,73 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.test.mqtt.example; + +import cc.iotkit.test.mqtt.config.Mqtt; +import cc.iotkit.test.mqtt.model.Request; +import cc.iotkit.test.mqtt.service.Gateway; +import cc.iotkit.test.mqtt.service.ReportTask; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +/** + * 透传测试 + */ +@Slf4j +public class TransparentTest { + + public static void main(String[] args) throws IOException { + + + if (args.length == 0) { + Mqtt.brokerHost = "127.0.0.1"; +// Mqtt.brokerHost = "120.76.96.206"; +// Mqtt.brokerHost = "172.16.1.109"; + } else { + Mqtt.brokerHost = args[0]; + } + + log.info("start gateway "); + Gateway gateway = new Gateway("hbtgIA0SuVw9lxjB", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU", + "TEST:GW:T0001"); + + gateway.addSubDevice("hbtgIA0SuVw9lxjB", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU", + "TEST_LIGHT_0001", + "M1"); + + gateway.onDeviceOnline(device -> { + String pk = device.getProductKey(); + + //设备上线后添加上报定时任务 + ReportTask reportTask = new ReportTask(gateway.getClient()); + reportTask.addTask(String.format("/sys/%s/%s/s/event/rawReport", + pk, device.getDeviceName()), + () -> { + Request request = new Request(); + request.setId(UUID.randomUUID().toString()); + request.setMethod("thing.event.rawReport"); + Map param = new HashMap<>(); + param.put("model", "M1"); + param.put("deviceName", "TEST_LIGHT_0001"); + param.put("data", "111110011"); + request.setParams(param); + return request; + }); + reportTask.start(10); + }); + + gateway.start(); + + System.in.read(); + } +} diff --git a/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/model/Request.java b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/model/Request.java new file mode 100755 index 0000000..91371ed --- /dev/null +++ b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/model/Request.java @@ -0,0 +1,26 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.test.mqtt.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class Request { + + private String id; + + private String method; + + private Object params; +} \ No newline at end of file diff --git a/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/model/Response.java b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/model/Response.java new file mode 100755 index 0000000..d1b3268 --- /dev/null +++ b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/model/Response.java @@ -0,0 +1,31 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.test.mqtt.model; + + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Map; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class Response { + + private String id; + + private int code; + + private String method; + + private Map data; +} \ No newline at end of file diff --git a/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/performance/ConnectionTest.java b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/performance/ConnectionTest.java new file mode 100755 index 0000000..f40b6ba --- /dev/null +++ b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/performance/ConnectionTest.java @@ -0,0 +1,67 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.test.mqtt.performance; + +import cc.iotkit.test.mqtt.config.Mqtt; +import cc.iotkit.test.mqtt.service.Gateway; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * 连接压力测试 + */ +@Slf4j +public class ConnectionTest { + + public static void main(String[] args) throws IOException { + + if (args.length == 0) { + Mqtt.brokerHost = "127.0.0.1"; +// Mqtt.brokerHost = "120.76.96.206"; + } else { + Mqtt.brokerHost = args[0]; + } + + int total = 10; + if (args.length > 1) { + total = Integer.parseInt(args[1]); + } + + ExecutorService executor = Executors.newCachedThreadPool(); + for (int i = 0; i < total; i++) { + int finalI = i; + executor.submit(() -> { + log.info("start gateway " + (finalI + 1)); + Gateway gateway = new Gateway("hbtgIA0SuVw9lxjB","xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU", + "TEST:GW:T" + StringUtils.leftPad(finalI + "", 6, "0")); + +// gateway.addSubDevice("Rf4QSjbm65X45753", +// "TEST_SW_" + StringUtils.leftPad(finalI + "", 6, "0"), +// "S01"); +// +// gateway.addSubDevice("cGCrkK7Ex4FESAwe", +// "TEST_SC_" + StringUtils.leftPad(finalI + "", 6, "0"), +// "S01"); +// +// gateway.addSubDevice("xpsYHExTKPFaQMS7", +// "TEST_LT_" + StringUtils.leftPad(finalI + "", 6, "0"), +// "L01"); + + gateway.start(); + }); + } + + System.in.read(); + } +} diff --git a/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/performance/ReportTest.java b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/performance/ReportTest.java new file mode 100755 index 0000000..e7f44ea --- /dev/null +++ b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/performance/ReportTest.java @@ -0,0 +1,93 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.test.mqtt.performance; + +import cc.iotkit.test.mqtt.config.Mqtt; +import cc.iotkit.test.mqtt.model.Request; +import cc.iotkit.test.mqtt.service.Gateway; +import cc.iotkit.test.mqtt.service.ReportTask; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * 上报压力测试 + */ +@Slf4j +public class ReportTest { + + public static void main(String[] args) throws IOException { + + + if (args.length == 0) { + Mqtt.brokerHost = "127.0.0.1"; + Mqtt.brokerPort = 1883; +// Mqtt.brokerHost = "120.76.96.206"; +// Mqtt.brokerHost = "172.16.1.109"; + } else { + Mqtt.brokerHost = args[0]; + } + + int total = 10; + if (args.length > 1) { + total = Integer.parseInt(args[1]); + } + + ExecutorService executor = Executors.newCachedThreadPool(); + for (int i = 0; i < total; i++) { + int finalI = i; + executor.submit(() -> { + log.info("start gateway " + (finalI + 1)); + Gateway gateway = new Gateway("hbtgIA0SuVw9lxjB", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU", + "TEST:GW:" + StringUtils.leftPad(finalI + "", 6, "0")); + + gateway.addSubDevice("Rf4QSjbm65X45753", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU", + "TEST_SW_" + StringUtils.leftPad(finalI + "", 6, "0"), + "S01"); + + gateway.addSubDevice("cGCrkK7Ex4FESAwe", "xdkKUymrEGSCYWswqCvSPyRSFvH5j7CU", + "TEST_SC_" + StringUtils.leftPad(finalI + "", 6, "0"), + "S01"); + + gateway.onDeviceOnline((device) -> { + String pk = device.getProductKey(); + if (!"Rf4QSjbm65X45753".equals(pk)) { + return; + } + + //设备上线后添加上报定时任务 + ReportTask reportTask = new ReportTask(gateway.getClient()); + reportTask.addTask(String.format("/sys/%s/%s/s/event/property/post", + pk, device.getDeviceName()), + () -> { + Request request = new Request(); + request.setId(UUID.randomUUID().toString()); + request.setMethod("thing.event.property.post"); + Map param = new HashMap<>(); + param.put("volt", Math.round(Math.random() * 100)); + request.setParams(param); + return request; + }); + reportTask.start(10); + }); + + gateway.start(); + }); + } + + System.in.read(); + } +} diff --git a/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/service/Device.java b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/service/Device.java new file mode 100755 index 0000000..d37f076 --- /dev/null +++ b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/service/Device.java @@ -0,0 +1,28 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.test.mqtt.service; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class Device { + + protected String productKey; + + private String productSecret ; + + protected String deviceName; + + private String model; +} diff --git a/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/service/Gateway.java b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/service/Gateway.java new file mode 100755 index 0000000..7168596 --- /dev/null +++ b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/service/Gateway.java @@ -0,0 +1,155 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.test.mqtt.service; + + +import cc.iotkit.common.utils.CodecUtil; +import cc.iotkit.test.mqtt.config.Mqtt; +import cc.iotkit.test.mqtt.model.Request; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.Json; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.MqttClientOptions; +import io.vertx.mqtt.messages.MqttConnAckMessage; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.digest.DigestUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +@Slf4j +@EqualsAndHashCode(callSuper = true) +@Data +public class Gateway extends Device { + + private List subDevices = new ArrayList<>(); + + private Consumer deviceOnlineListener; + + private MqttClient client; + + private boolean isConnecting; + + public Gateway(String productKey, String productSecret, String deviceName) { + super(productKey, productSecret, deviceName, "GW01"); + } + + @SneakyThrows + public void start() { + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); + executorService.scheduleAtFixedRate(this::connect, 0, 3, TimeUnit.SECONDS); + } + + private void connect() { + if (client != null && client.isConnected()) { + return; + } + + if (isConnecting) { + return; + } + + String clientId = String.format("%s_%s_%s", productKey, deviceName, getModel()); + + try { + isConnecting = true; + MqttClientOptions options = new MqttClientOptions(); + options.setUsername(this.deviceName); + options.setPassword(CodecUtil.md5Str(getProductSecret() + clientId)); + options.setCleanSession(true); + options.setKeepAliveInterval(30); + options.setClientId(clientId); + options.setReconnectInterval(3000); + options.setReconnectAttempts(100); + + client = MqttClient.create(Vertxs.getVertx(), options); + + CountDownLatch countDownLatch = new CountDownLatch(1); + client.connect(Mqtt.brokerPort, Mqtt.brokerHost, s -> { + if (s.succeeded()) { + log.info("mqtt connected,clientId:{}", clientId); + countDownLatch.countDown(); + } else { + log.info("mqtt connect failed,clientId:{}", clientId); + } + }); + countDownLatch.await(); + + // 订阅 + String topic = String.format("/sys/%s/%s/c/#", productKey, deviceName); + log.info("subscribe topic:{}", topic); + + client.subscribe(topic, 1, r -> { + //配置获取 +// String configGetTopic = String.format("/sys/%s/%s/s/config/get", productKey, deviceName); +// Request configRequest = new Request(); +// configRequest.setId(UUID.randomUUID().toString()); +// String configPayload = JsonUtils.toJsonString(configRequest); +// client.publish(configGetTopic, Buffer.buffer(configPayload), MqttQoS.AT_LEAST_ONCE, false, false); +// log.info("publish message,topic:{},payload:{}", configGetTopic, configPayload); + + //注册子设备 + for (Device subDevice : subDevices) { + log.info("start register sub device,pk:{},dn:{}", subDevice.getProductKey(), subDevice.getDeviceName()); + Request request = new Request(); + request.setId(UUID.randomUUID().toString()); + request.setParams(subDevice); + request.setMethod("thing.lifetime.register"); + String registerTopic = String.format("/sys/%s/%s/s/register", productKey, deviceName); + String payload = Json.encode(request); + client.publish(registerTopic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false); + log.info("publish message,topic:{},payload:{}", registerTopic, payload); + } + }); + + client.publishHandler(new MessageHandler(client, this, deviceOnlineListener)); + + client.closeHandler((v) -> { + log.info("{} closed,reconnecting...", deviceName); + client.disconnect(); + }); + + } catch (Throwable e) { + log.error("connect mqtt-broker error", e); + } finally { + isConnecting = false; + } + } + + public void addSubDevice(String productKey, String productSecret, String deviceName, String model) { + subDevices.add(new Device(productKey, productSecret, deviceName, model)); + } + + public void onDeviceOnline(Consumer listener) { + this.deviceOnlineListener = listener; + } + + + public static class OnConnected implements Handler> { + + @Override + public void handle(AsyncResult mqttConnAckMessageAsyncResult) { + + } + } + +} diff --git a/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/service/MessageHandler.java b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/service/MessageHandler.java new file mode 100755 index 0000000..b033429 --- /dev/null +++ b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/service/MessageHandler.java @@ -0,0 +1,100 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.test.mqtt.service; + + +import cc.iotkit.test.mqtt.model.Request; +import cc.iotkit.test.mqtt.model.Response; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.Json; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.messages.MqttPublishMessage; +import lombok.Data; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; + +@Slf4j +@Data +public class MessageHandler implements Handler { + + private MqttClient client; + private Gateway gateway; + private Consumer deviceOnlineListener; + + public MessageHandler(MqttClient client, Gateway gateway, Consumer deviceOnlineListener) { + this.client = client; + this.gateway = gateway; + this.deviceOnlineListener = deviceOnlineListener; + } + + @SneakyThrows + + @Override + public void handle(MqttPublishMessage msg) { + try { + String topic = msg.topicName(); + String payload = msg.payload().toString(); + + log.info("received msg,topic:{},payload:{}", topic, payload); + + if (topic.endsWith("register_reply")) { + Response response = Json.decodeValue(payload, Response.class); + //子设备注册成功 + if (response.getCode() == 0) { + Map data = response.getData(); + String productKey = data.get("productKey").toString(); + String deviceName = data.get("deviceName").toString(); + if (StringUtils.isBlank(productKey)) { + deviceOnlineListener.accept(new Device(productKey, "", deviceName, "")); + return; + } + + //订阅子设备消息 + String subTopic = String.format("/sys/%s/%s/c/#", productKey, deviceName); + log.info("subscribe topic:{}", subTopic); + client.subscribe(subTopic, 1, r -> { + if (deviceOnlineListener != null) { + deviceOnlineListener.accept(new Device(productKey, "", deviceName, "")); + } + }); + } + } + + if (topic.endsWith("_reply")) { + return; + } + Request request = Json.decodeValue(payload, Request.class); + + Response response = new Response(request.getId(), 0, request.getMethod(), new HashMap<>()); + client.publish(topic.replace("/c/", "/s/") + "_reply", + Buffer.buffer(Json.encode(response)), MqttQoS.AT_LEAST_ONCE, false, false); + + //属性设置后上报属性 + String setTopic = "/c/service/property/set"; + if (topic.endsWith(setTopic)) { + request.setId(UUID.randomUUID().toString()); + client.publish(topic.replace(setTopic, "/s/event/property/post"), + Buffer.buffer(Json.encode(request)), MqttQoS.AT_LEAST_ONCE, false, false); + } + } catch (Throwable e) { + log.info("receive msg error", e); + } + } + + +} diff --git a/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/service/ReportTask.java b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/service/ReportTask.java new file mode 100755 index 0000000..063d50b --- /dev/null +++ b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/service/ReportTask.java @@ -0,0 +1,67 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.test.mqtt.service; + +import cc.iotkit.test.mqtt.model.Request; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.Json; +import io.vertx.mqtt.MqttClient; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class ReportTask { + + private final MqttClient client; + private final Map> taskMap = new HashMap<>(); + private ScheduledExecutorService taskService = null; + + public ReportTask(MqttClient client) { + this.client = client; + } + + public void start(int interval) { + if (taskService == null) { + taskService = Executors.newScheduledThreadPool(1); + taskService.scheduleWithFixedDelay(this::send, 3, interval, TimeUnit.SECONDS); + } + } + + private void send() { + taskMap.forEach((topic, action) -> { + try { + Request request = action.call(); + if (request == null) { + return; + } + if (!client.isConnected()) { + return; + } + String msg = Json.encode(request); + log.info("send msg,topic:{},payload:{}", topic, msg); + client.publish(topic, Buffer.buffer(msg), MqttQoS.AT_LEAST_ONCE, false, false); + + } catch (Throwable e) { + log.error("send error", e); + } + }); + } + + public void addTask(String topic, Callable callable) { + taskMap.put(topic, callable); + } +} diff --git a/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/service/Vertxs.java b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/service/Vertxs.java new file mode 100755 index 0000000..99a1389 --- /dev/null +++ b/mqtt-plugin/src/test/java/cc/iotkit/test/mqtt/service/Vertxs.java @@ -0,0 +1,21 @@ +/* + * +---------------------------------------------------------------------- + * | Copyright (c) 奇特物联 2021-2022 All rights reserved. + * +---------------------------------------------------------------------- + * | Licensed 未经许可不能去掉「奇特物联」相关版权 + * +---------------------------------------------------------------------- + * | Author: xw2sy@163.com + * +---------------------------------------------------------------------- + */ +package cc.iotkit.test.mqtt.service; + +import io.vertx.core.Vertx; + +public class Vertxs { + + private static final Vertx INSTANCE = Vertx.vertx(); + + public static Vertx getVertx() { + return INSTANCE; + } +}