diff --git a/iot-data/iot-data-cache/src/main/java/cc/iotkit/data/service/DeviceInfoDataCache.java b/iot-data/iot-data-cache/src/main/java/cc/iotkit/data/service/DeviceInfoDataCache.java index 8201238a..6cef5fab 100755 --- a/iot-data/iot-data-cache/src/main/java/cc/iotkit/data/service/DeviceInfoDataCache.java +++ b/iot-data/iot-data-cache/src/main/java/cc/iotkit/data/service/DeviceInfoDataCache.java @@ -103,15 +103,13 @@ public class DeviceInfoDataCache implements IDeviceInfoData, SmartInitializingSi @Override @Cacheable(value = Constants.CACHE_DEVICE_INFO, key = "#root.method.name+#deviceId", unless = "#result == null") public DeviceInfo findByDeviceId(String deviceId) { - //不需要查数据库,在数据变更时更新到缓存 - return null; + return deviceInfoData.findByDeviceId(deviceId); } @Override @Cacheable(value = Constants.CACHE_DEVICE_INFO, key = "#root.method.name+#productKey+#deviceName", unless = "#result == null") public DeviceInfo findByProductKeyAndDeviceName(String productKey, String deviceName) { - //不需要查数据库,在数据变更时更新到缓存 - return null; + return deviceInfoData.findByProductKeyAndDeviceName(productKey, deviceName); } @Override diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/rule/RuleMessageHandler.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/rule/RuleMessageHandler.java index e32e87a2..003ae265 100755 --- a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/rule/RuleMessageHandler.java +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/rule/RuleMessageHandler.java @@ -21,7 +21,7 @@ import java.util.*; @Component public class RuleMessageHandler implements DeviceMessageHandler { - private Map> deviceRuleMap = new HashMap<>(); + private final Map> deviceRuleMap = new HashMap<>(); @Autowired private RuleExecutor ruleExecutor; diff --git a/iot-standalone/.DS_Store b/iot-standalone/.DS_Store index 3a821f73..43bda51d 100755 Binary files a/iot-standalone/.DS_Store and b/iot-standalone/.DS_Store differ diff --git a/iot-test-tool/iot-test-mqtt/src/main/java/cc/iotkit/test/mqtt/performance/ConnectionTest.java b/iot-test-tool/iot-test-mqtt/src/main/java/cc/iotkit/test/mqtt/performance/ConnectionTest.java index b2cb5a44..0be221a2 100755 --- a/iot-test-tool/iot-test-mqtt/src/main/java/cc/iotkit/test/mqtt/performance/ConnectionTest.java +++ b/iot-test-tool/iot-test-mqtt/src/main/java/cc/iotkit/test/mqtt/performance/ConnectionTest.java @@ -33,7 +33,7 @@ public class ConnectionTest { Mqtt.brokerHost = args[0]; } - int total = 1000; + int total = 100; if (args.length > 1) { total = Integer.parseInt(args[1]); } @@ -44,7 +44,7 @@ public class ConnectionTest { executor.submit(() -> { log.info("start gateway " + (finalI + 1)); Gateway gateway = new Gateway("hbtgIA0SuVw9lxjB", - "TEST:GW:" + StringUtils.leftPad(finalI + "", 6, "0")); + "TEST:GW:T" + StringUtils.leftPad(finalI + "", 6, "0")); // gateway.addSubDevice("Rf4QSjbm65X45753", // "TEST_SW_" + StringUtils.leftPad(finalI + "", 6, "0"), diff --git a/iot-test-tool/iot-test-mqtt/src/main/java/cc/iotkit/test/mqtt/service/Gateway.java b/iot-test-tool/iot-test-mqtt/src/main/java/cc/iotkit/test/mqtt/service/Gateway.java index 44478be0..751fa3eb 100755 --- a/iot-test-tool/iot-test-mqtt/src/main/java/cc/iotkit/test/mqtt/service/Gateway.java +++ b/iot-test-tool/iot-test-mqtt/src/main/java/cc/iotkit/test/mqtt/service/Gateway.java @@ -30,6 +30,9 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @Slf4j @@ -43,15 +46,31 @@ public class Gateway extends Device { private MqttClient client; + private boolean isConnecting; + public Gateway(String productKey, String deviceName) { super(productKey, deviceName, "GW01"); } @SneakyThrows public void start() { + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); + executorService.scheduleAtFixedRate(this::connect, 0, 3, TimeUnit.SECONDS); + } + + private void connect() { + if (client != null && client.isConnected()) { + return; + } + + if(isConnecting){ + return; + } + String clientId = String.format("%s_%s_%s", productKey, deviceName, getModel()); try { + isConnecting = true; MqttClientOptions options = new MqttClientOptions(); options.setUsername(this.deviceName); options.setPassword(DigestUtils.md5Hex(Constants.PRODUCT_SECRET + clientId)); @@ -103,11 +122,14 @@ public class Gateway extends Device { client.publishHandler(new MessageHandler(client, this, deviceOnlineListener)); client.closeHandler((v) -> { - log.info("{} closed", deviceName); + log.info("{} closed,reconnecting...", deviceName); + client.disconnect(); }); } catch (Throwable e) { log.error("connect mqtt-broker error", e); + } finally { + isConnecting = false; } }