diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/ComponentClassLoader.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/ComponentClassLoader.java index bd205160..cb875e7b 100755 --- a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/ComponentClassLoader.java +++ b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/ComponentClassLoader.java @@ -11,34 +11,44 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URL; import java.net.URLClassLoader; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; @Slf4j public class ComponentClassLoader { + private static final Map classLoaders = new HashMap<>(); - protected static Class findClass(String name) throws ClassNotFoundException { - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - return (Class) classLoader.loadClass(name); + protected static Class findClass(String name, String clsName) throws ClassNotFoundException { + ClassLoader classLoader = classLoaders.get(name); + return (Class) classLoader.loadClass(clsName); } - private static String addUrl(File jarPath) throws NoSuchMethodException, InvocationTargetException, + private static String addUrl(String name, File jarPath) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, IOException { - URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader(); + URLClassLoader classLoader = classLoaders.get(name); + if (classLoader != null) { + classLoader.close(); + } + + classLoader = URLClassLoader.newInstance(new URL[]{jarPath.toURI().toURL()}, ClassLoader.getSystemClassLoader()); + classLoaders.put(name, classLoader); + Method method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class); if (!method.isAccessible()) { method.setAccessible(true); } + URL url = jarPath.toURI().toURL(); method.invoke(classLoader, url); - ClassLoader loader = Thread.currentThread().getContextClassLoader(); - InputStream is = loader.getResourceAsStream("component.spi"); - return StreamUtils.copyToString(is, Charset.forName("UTF-8")); + InputStream is = classLoader.getResourceAsStream("component.spi"); + return StreamUtils.copyToString(is, StandardCharsets.UTF_8); } - public static IComponent getComponent(File jarFile) { + public static IComponent getComponent(String name, File jarFile) { try { - String className = addUrl(jarFile); - Class componentClass = findClass(className); + String className = addUrl(name, jarFile); + Class componentClass = findClass(name, className); return componentClass.newInstance(); } catch (Throwable e) { log.error("instance component from jar error", e); diff --git a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/ComponentManager.java b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/ComponentManager.java index 95f954fd..ae193199 100755 --- a/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/ComponentManager.java +++ b/protocol-gateway/component-server/src/main/java/cc/iotkit/comps/ComponentManager.java @@ -68,7 +68,7 @@ public class ComponentManager { Path path = componentConfig.getComponentFilePath(id); File file = path.resolve(component.getJarFile()).toAbsolutePath().toFile(); - IComponent componentInstance = ComponentClassLoader.getComponent(file); + IComponent componentInstance = ComponentClassLoader.getComponent(component.getId(), file); if (componentInstance == null) { throw new BizException("instance component failed"); } diff --git a/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/EmqxComponent.java b/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/EmqxComponent.java index b698e745..e1ee27f4 100755 --- a/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/EmqxComponent.java +++ b/protocol-gateway/emqx-component/src/main/java/cc/iotkit/comp/emqx/EmqxComponent.java @@ -76,9 +76,9 @@ public class EmqxComponent extends AbstractComponent { } client.publishHandler(s -> { - String topic = s.topicName(); - String payload = s.payload().toString(); - log.info("receive message,topic:{},payload:{}", topic, payload); + String topic = s.topicName(); + String payload = s.payload().toString(); + log.info("receive message,topic:{},payload:{}", topic, payload); // // //取消订阅 @@ -105,10 +105,10 @@ public class EmqxComponent extends AbstractComponent { // if (topic.endsWith("/register")) { - Map head = new HashMap<>(); - head.put("topic", topic); - getHandler().onReceive(head, "", payload); - }).subscribe(subscribes).onSuccess(a -> log.info("subscribe topic success")) + Map head = new HashMap<>(); + head.put("topic", topic); + getHandler().onReceive(head, "", payload); + }).subscribe(subscribes).onSuccess(a -> log.info("subscribe topic success")) .onFailure(e -> log.error("subscribe topic failed", e)); } catch (Throwable e) { diff --git a/protocol-gateway/mqtt-client-simulator/src/main/java/cc/iotkit/simulator/Application.java b/protocol-gateway/mqtt-client-simulator/src/main/java/cc/iotkit/simulator/Application.java index b693614c..d66aad78 100755 --- a/protocol-gateway/mqtt-client-simulator/src/main/java/cc/iotkit/simulator/Application.java +++ b/protocol-gateway/mqtt-client-simulator/src/main/java/cc/iotkit/simulator/Application.java @@ -11,8 +11,8 @@ public class Application { public static void main(String[] args) throws IOException { if (args.length == 0) { -// Mqtt.broker = "tcp://127.0.0.1:1883"; - Mqtt.broker = "tcp://120.76.96.206:1883"; + Mqtt.broker = "tcp://127.0.0.1:1883"; +// Mqtt.broker = "tcp://120.76.96.206:1883"; } else { Mqtt.broker = args[0]; } diff --git a/protocol-gateway/mqtt-client-simulator/src/main/java/cc/iotkit/simulator/service/Gateway.java b/protocol-gateway/mqtt-client-simulator/src/main/java/cc/iotkit/simulator/service/Gateway.java index 0f232147..ff98e17f 100755 --- a/protocol-gateway/mqtt-client-simulator/src/main/java/cc/iotkit/simulator/service/Gateway.java +++ b/protocol-gateway/mqtt-client-simulator/src/main/java/cc/iotkit/simulator/service/Gateway.java @@ -41,6 +41,7 @@ public class Gateway extends Device { connOpts.setPassword(DigestUtils.md5Hex(Constants.PRODUCT_SECRET + clientId).toCharArray()); // 保留会话 connOpts.setCleanSession(true); + connOpts.setKeepAliveInterval(10); // 设置回调 client.setCallback(new OnMessageCallback(client, this)); @@ -71,9 +72,12 @@ public class Gateway extends Device { Request request = new Request(); request.setId(UUID.randomUUID().toString()); request.setParams(subDevice); + topic = String.format("/sys/%s/%s/s/register", productKey, deviceName); + String payload = JsonUtil.toJsonString(request); client.publish(String.format("/sys/%s/%s/s/register", productKey, deviceName), - new MqttMessage(JsonUtil.toJsonString(request).getBytes()) + new MqttMessage(payload.getBytes()) ); + log.info("publish message,topic:{},payload:{}", topic, payload); } } } catch (Throwable e) { diff --git a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttComponent.java b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttComponent.java index ee1b2100..0b8a3074 100755 --- a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttComponent.java +++ b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttComponent.java @@ -34,6 +34,7 @@ public class MqttComponent extends AbstractComponent { public void start() { try { + System.out.println("start:======2222222222"); mqttVerticle.setExecutor(getHandler()); countDownLatch = new CountDownLatch(1); Future future = vertx.deployVerticle(mqttVerticle); diff --git a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java index 50d05621..1fd224c4 100755 --- a/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java +++ b/protocol-gateway/mqtt-component/src/main/java/cc/iotkit/comp/mqtt/MqttVerticle.java @@ -79,7 +79,12 @@ public class MqttVerticle extends AbstractVerticle { log.info("MQTT client keep alive timeout = {} ", endpoint.keepAliveTimeSeconds()); endpoint.accept(false); - endpoint.disconnectMessageHandler(disconnectMessage -> { + endpoint.closeHandler((v) -> { + log.warn("client connection closed,clientId:{}", clientId); + ReceiveResult result = executor.onReceive(new HashMap<>(), "disconnect", clientId); + //删除设备与连接关系 + endpointMap.remove(getEndpointKey(result)); + }).disconnectMessageHandler(disconnectMessage -> { log.info("Received disconnect from client, reason code = {}", disconnectMessage.code()); ReceiveResult result = executor.onReceive(new HashMap<>(), "disconnect", clientId); //删除设备与连接关系