feat:mqtt插件增加子设备注册

master
xiwa 2024-02-24 23:32:58 +08:00
parent 24bfc5b85b
commit 9124e5be37
4 changed files with 45 additions and 44 deletions

View File

@ -73,7 +73,7 @@ public class HttpPlugin implements PluginCloseListener {
CountDownLatch wait = new CountDownLatch(1);
Future<Void> future = vertx.undeploy(deployedId);
future.onSuccess(unused -> {
log.info("tcp plugin stopped success");
log.info("http plugin stopped success");
wait.countDown();
});
future.onFailure(h -> {

View File

@ -72,7 +72,7 @@ public class MqttPlugin implements PluginCloseListener, IPlugin {
CountDownLatch wait = new CountDownLatch(1);
Future<Void> future = vertx.undeploy(deployedId);
future.onSuccess(unused -> {
log.info("tcp plugin stopped success");
log.info("mqtt plugin stopped success");
wait.countDown();
});
future.onFailure(h -> {

View File

@ -132,6 +132,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
String productKey = parts[0];
String deviceName = parts[1];
String gwModel = parts[2];
if (!auth.getUsername().equals(deviceName)) {
log.error("username:{}不正确", deviceName);
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
@ -157,10 +158,11 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
pluginInfo.getPluginId(),
fillAction(
DeviceRegister.builder()
.model(parts[2])
.productKey(productKey)
.deviceName(deviceName)
.model(gwModel)
.version("1.0")
.build()
, productKey, deviceName
)
);
if (result.getCode() != 0) {
@ -185,15 +187,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
return;
}
//下线
thingService.post(
pluginInfo.getPluginId(),
fillAction(
DeviceStateChange.builder()
.state(DeviceState.OFFLINE)
.build()
, productKey, deviceName
)
);
offline(productKey, deviceName);
DEVICE_ONLINE.clear();
//删除设备与连接关系
endpointMap.remove(deviceName);
@ -202,6 +196,8 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
if (!MQTT_CONNECT_POOL.get(clientId)) {
return;
}
//下线
offline(productKey, deviceName);
//删除设备与连接关系
endpointMap.remove(deviceName);
MQTT_CONNECT_POOL.put(clientId, false);
@ -230,15 +226,7 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
//删除设备对应连接
endpointMap.remove(device.getDeviceName());
//下线
thingService.post(
pluginInfo.getPluginId(),
fillAction(
DeviceStateChange.builder()
.state(DeviceState.OFFLINE)
.build()
, device.getProductKey(), device.getDeviceName()
)
);
offline(device.getProductKey(), device.getDeviceName());
DEVICE_ONLINE.remove(device.getDeviceName());
}
@ -291,13 +279,20 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
ActionResult regResult = thingService.post(
pluginInfo.getPluginId(),
fillAction(
SubDeviceRegister.builder()
.productKey(productKey)
.deviceName(deviceName)
.model(gwModel)
.version("1.0")
.subs(List.of(
DeviceRegister.builder()
.productKey(subPk)
.deviceName(subDn)
.model(params.getString("model"))
.version("1.0")
.build()
, subPk, subDn
))
.build()
)
);
if (regResult.getCode() == 0) {
@ -357,9 +352,10 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
thingService.post(
pluginInfo.getPluginId(),
fillAction(DeviceStateChange.builder()
.productKey(pk)
.deviceName(dn)
.state(DeviceState.ONLINE)
.build()
, pk, dn
)
);
DEVICE_ONLINE.put(dn, true);
@ -385,10 +381,8 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
endpoint.publish(topic + "_reply", JsonObject.mapFrom(payloadReply).toBuffer(), MqttQoS.AT_LEAST_ONCE, false, false);
}
private IDeviceAction fillAction(IDeviceAction action, String productKey, String deviceName) {
private IDeviceAction fillAction(IDeviceAction action) {
action.setId(UniqueIdUtil.newRequestId());
action.setProductKey(productKey);
action.setDeviceName(deviceName);
action.setTime(System.currentTimeMillis());
return action;
}
@ -403,18 +397,25 @@ public class MqttVerticle extends AbstractVerticle implements Handler<MqttEndpoi
}
//下线
offline(parts[0], parts[1]);
DEVICE_ONLINE.clear();
}
if(mqttServer!=null) {
mqttServer.close();
}
}
private void offline(String productKey, String deviceName) {
thingService.post(
pluginInfo.getPluginId(),
fillAction(
DeviceStateChange.builder()
.productKey(productKey)
.deviceName(deviceName)
.state(DeviceState.OFFLINE)
.build(),
parts[0],
parts[1]
.build()
)
);
DEVICE_ONLINE.clear();
}
}
public void publish(String deviceName, String topic, String msg) {

View File

@ -37,7 +37,7 @@
<dependency>
<groupId>cc.iotkit</groupId>
<artifactId>iot-plugin-core</artifactId>
<version>1.0.2</version>
<version>1.0.3</version>
</dependency>
<dependency>