设备缓存增加查库

V0.5.x
xiwa 2022-11-21 20:10:06 +08:00
parent 6329b7685b
commit c6675b21df
5 changed files with 28 additions and 8 deletions

View File

@ -103,15 +103,13 @@ public class DeviceInfoDataCache implements IDeviceInfoData, SmartInitializingSi
@Override
@Cacheable(value = Constants.CACHE_DEVICE_INFO, key = "#root.method.name+#deviceId", unless = "#result == null")
public DeviceInfo findByDeviceId(String deviceId) {
//不需要查数据库,在数据变更时更新到缓存
return null;
return deviceInfoData.findByDeviceId(deviceId);
}
@Override
@Cacheable(value = Constants.CACHE_DEVICE_INFO, key = "#root.method.name+#productKey+#deviceName", unless = "#result == null")
public DeviceInfo findByProductKeyAndDeviceName(String productKey, String deviceName) {
//不需要查数据库,在数据变更时更新到缓存
return null;
return deviceInfoData.findByProductKeyAndDeviceName(productKey, deviceName);
}
@Override

View File

@ -21,7 +21,7 @@ import java.util.*;
@Component
public class RuleMessageHandler implements DeviceMessageHandler {
private Map<String, List<Rule>> deviceRuleMap = new HashMap<>();
private final Map<String, List<Rule>> deviceRuleMap = new HashMap<>();
@Autowired
private RuleExecutor ruleExecutor;

Binary file not shown.

View File

@ -33,7 +33,7 @@ public class ConnectionTest {
Mqtt.brokerHost = args[0];
}
int total = 1000;
int total = 100;
if (args.length > 1) {
total = Integer.parseInt(args[1]);
}
@ -44,7 +44,7 @@ public class ConnectionTest {
executor.submit(() -> {
log.info("start gateway " + (finalI + 1));
Gateway gateway = new Gateway("hbtgIA0SuVw9lxjB",
"TEST:GW:" + StringUtils.leftPad(finalI + "", 6, "0"));
"TEST:GW:T" + StringUtils.leftPad(finalI + "", 6, "0"));
// gateway.addSubDevice("Rf4QSjbm65X45753",
// "TEST_SW_" + StringUtils.leftPad(finalI + "", 6, "0"),

View File

@ -30,6 +30,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@Slf4j
@ -43,15 +46,31 @@ public class Gateway extends Device {
private MqttClient client;
private boolean isConnecting;
public Gateway(String productKey, String deviceName) {
super(productKey, deviceName, "GW01");
}
@SneakyThrows
public void start() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(this::connect, 0, 3, TimeUnit.SECONDS);
}
private void connect() {
if (client != null && client.isConnected()) {
return;
}
if(isConnecting){
return;
}
String clientId = String.format("%s_%s_%s", productKey, deviceName, getModel());
try {
isConnecting = true;
MqttClientOptions options = new MqttClientOptions();
options.setUsername(this.deviceName);
options.setPassword(DigestUtils.md5Hex(Constants.PRODUCT_SECRET + clientId));
@ -103,11 +122,14 @@ public class Gateway extends Device {
client.publishHandler(new MessageHandler(client, this, deviceOnlineListener));
client.closeHandler((v) -> {
log.info("{} closed", deviceName);
log.info("{} closed,reconnecting...", deviceName);
client.disconnect();
});
} catch (Throwable e) {
log.error("connect mqtt-broker error", e);
} finally {
isConnecting = false;
}
}