组件重新加载问题修复

V0.5.x
xiwa 2022-04-14 19:57:32 +08:00
parent 30f5a36663
commit 5b0fdc4794
7 changed files with 44 additions and 24 deletions

View File

@ -11,34 +11,44 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@Slf4j
public class ComponentClassLoader {
private static final Map<String, URLClassLoader> classLoaders = new HashMap<>();
protected static Class<IComponent> findClass(String name) throws ClassNotFoundException {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
return (Class<IComponent>) classLoader.loadClass(name);
protected static Class<IComponent> findClass(String name, String clsName) throws ClassNotFoundException {
ClassLoader classLoader = classLoaders.get(name);
return (Class<IComponent>) classLoader.loadClass(clsName);
}
private static String addUrl(File jarPath) throws NoSuchMethodException, InvocationTargetException,
private static String addUrl(String name, File jarPath) throws NoSuchMethodException, InvocationTargetException,
IllegalAccessException, IOException {
URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
URLClassLoader classLoader = classLoaders.get(name);
if (classLoader != null) {
classLoader.close();
}
classLoader = URLClassLoader.newInstance(new URL[]{jarPath.toURI().toURL()}, ClassLoader.getSystemClassLoader());
classLoaders.put(name, classLoader);
Method method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
if (!method.isAccessible()) {
method.setAccessible(true);
}
URL url = jarPath.toURI().toURL();
method.invoke(classLoader, url);
ClassLoader loader = Thread.currentThread().getContextClassLoader();
InputStream is = loader.getResourceAsStream("component.spi");
return StreamUtils.copyToString(is, Charset.forName("UTF-8"));
InputStream is = classLoader.getResourceAsStream("component.spi");
return StreamUtils.copyToString(is, StandardCharsets.UTF_8);
}
public static IComponent getComponent(File jarFile) {
public static IComponent getComponent(String name, File jarFile) {
try {
String className = addUrl(jarFile);
Class<IComponent> componentClass = findClass(className);
String className = addUrl(name, jarFile);
Class<IComponent> componentClass = findClass(name, className);
return componentClass.newInstance();
} catch (Throwable e) {
log.error("instance component from jar error", e);

View File

@ -68,7 +68,7 @@ public class ComponentManager {
Path path = componentConfig.getComponentFilePath(id);
File file = path.resolve(component.getJarFile()).toAbsolutePath().toFile();
IComponent componentInstance = ComponentClassLoader.getComponent(file);
IComponent componentInstance = ComponentClassLoader.getComponent(component.getId(), file);
if (componentInstance == null) {
throw new BizException("instance component failed");
}

View File

@ -76,9 +76,9 @@ public class EmqxComponent extends AbstractComponent {
}
client.publishHandler(s -> {
String topic = s.topicName();
String payload = s.payload().toString();
log.info("receive message,topic:{},payload:{}", topic, payload);
String topic = s.topicName();
String payload = s.payload().toString();
log.info("receive message,topic:{},payload:{}", topic, payload);
//
// //取消订阅
@ -105,10 +105,10 @@ public class EmqxComponent extends AbstractComponent {
// if (topic.endsWith("/register")) {
Map<String, Object> head = new HashMap<>();
head.put("topic", topic);
getHandler().onReceive(head, "", payload);
}).subscribe(subscribes).onSuccess(a -> log.info("subscribe topic success"))
Map<String, Object> head = new HashMap<>();
head.put("topic", topic);
getHandler().onReceive(head, "", payload);
}).subscribe(subscribes).onSuccess(a -> log.info("subscribe topic success"))
.onFailure(e -> log.error("subscribe topic failed", e));
} catch (Throwable e) {

View File

@ -11,8 +11,8 @@ public class Application {
public static void main(String[] args) throws IOException {
if (args.length == 0) {
// Mqtt.broker = "tcp://127.0.0.1:1883";
Mqtt.broker = "tcp://120.76.96.206:1883";
Mqtt.broker = "tcp://127.0.0.1:1883";
// Mqtt.broker = "tcp://120.76.96.206:1883";
} else {
Mqtt.broker = args[0];
}

View File

@ -41,6 +41,7 @@ public class Gateway extends Device {
connOpts.setPassword(DigestUtils.md5Hex(Constants.PRODUCT_SECRET + clientId).toCharArray());
// 保留会话
connOpts.setCleanSession(true);
connOpts.setKeepAliveInterval(10);
// 设置回调
client.setCallback(new OnMessageCallback(client, this));
@ -71,9 +72,12 @@ public class Gateway extends Device {
Request request = new Request();
request.setId(UUID.randomUUID().toString());
request.setParams(subDevice);
topic = String.format("/sys/%s/%s/s/register", productKey, deviceName);
String payload = JsonUtil.toJsonString(request);
client.publish(String.format("/sys/%s/%s/s/register", productKey, deviceName),
new MqttMessage(JsonUtil.toJsonString(request).getBytes())
new MqttMessage(payload.getBytes())
);
log.info("publish message,topic:{},payload:{}", topic, payload);
}
}
} catch (Throwable e) {

View File

@ -34,6 +34,7 @@ public class MqttComponent extends AbstractComponent {
public void start() {
try {
System.out.println("start:======2222222222");
mqttVerticle.setExecutor(getHandler());
countDownLatch = new CountDownLatch(1);
Future<String> future = vertx.deployVerticle(mqttVerticle);

View File

@ -79,7 +79,12 @@ public class MqttVerticle extends AbstractVerticle {
log.info("MQTT client keep alive timeout = {} ", endpoint.keepAliveTimeSeconds());
endpoint.accept(false);
endpoint.disconnectMessageHandler(disconnectMessage -> {
endpoint.closeHandler((v) -> {
log.warn("client connection closed,clientId:{}", clientId);
ReceiveResult result = executor.onReceive(new HashMap<>(), "disconnect", clientId);
//删除设备与连接关系
endpointMap.remove(getEndpointKey(result));
}).disconnectMessageHandler(disconnectMessage -> {
log.info("Received disconnect from client, reason code = {}", disconnectMessage.code());
ReceiveResult result = executor.onReceive(new HashMap<>(), "disconnect", clientId);
//删除设备与连接关系