diff --git a/iot-common/src/main/java/cc/iotkit/common/function/IfHandler.java b/iot-common/src/main/java/cc/iotkit/common/function/IfHandler.java new file mode 100644 index 00000000..320f6696 --- /dev/null +++ b/iot-common/src/main/java/cc/iotkit/common/function/IfHandler.java @@ -0,0 +1,11 @@ +package cc.iotkit.common.function; + +/** + * @author huangwenl + * @date 2022-11-10 + */ +@FunctionalInterface +public interface IfHandler { + + void handler(Runnable tHandler, Runnable fHandler); +} diff --git a/iot-common/src/main/java/cc/iotkit/common/utils/FIUtil.java b/iot-common/src/main/java/cc/iotkit/common/utils/FIUtil.java new file mode 100644 index 00000000..bee86309 --- /dev/null +++ b/iot-common/src/main/java/cc/iotkit/common/utils/FIUtil.java @@ -0,0 +1,21 @@ +package cc.iotkit.common.utils; + +import cc.iotkit.common.function.IfHandler; + +/** + * @author huangwenl + * @date 2022-11-10 + */ +public class FIUtil { + + + public static IfHandler isTotF(boolean param) { + return (tHandler, fHandler) -> { + if (param) { + tHandler.run(); + } else { + fHandler.run(); + } + }; + } +} diff --git a/iot-rule-engine/pom.xml b/iot-rule-engine/pom.xml index b80a2ef8..3744e437 100755 --- a/iot-rule-engine/pom.xml +++ b/iot-rule-engine/pom.xml @@ -71,6 +71,17 @@ iot-message-core + + + io.vertx + vertx-mqtt + + + + + io.vertx + vertx-kafka-client + diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/kafka/KafkaAction.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/kafka/KafkaAction.java new file mode 100644 index 00000000..050b934f --- /dev/null +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/kafka/KafkaAction.java @@ -0,0 +1,40 @@ +package cc.iotkit.ruleengine.action.kafka; + +import cc.iotkit.model.device.message.ThingModelMessage; +import cc.iotkit.ruleengine.action.Action; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author huangwenl + * @date 2022-11-11 + */ +@NoArgsConstructor +@AllArgsConstructor +@Data +public class KafkaAction implements Action { + + public static final String TYPE = "kafka"; + + + private List services; + + @Override + public String getType() { + return TYPE; + } + + + @Override + public List execute(ThingModelMessage msg) { + List results = new ArrayList<>(); + for (KafkaService service : services) { + results.add(service.execute(msg)); + } + return results; + } +} diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/kafka/KafkaService.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/kafka/KafkaService.java new file mode 100644 index 00000000..3c4a1a5a --- /dev/null +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/kafka/KafkaService.java @@ -0,0 +1,64 @@ +package cc.iotkit.ruleengine.action.kafka; + +import cc.iotkit.common.utils.FIUtil; +import cc.iotkit.model.device.message.ThingModelMessage; +import cc.iotkit.ruleengine.action.ScriptService; +import cc.iotkit.ruleengine.link.LinkFactory; +import cc.iotkit.ruleengine.link.LinkService; +import cc.iotkit.ruleengine.link.impl.KafkaLink; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +/** + * @author huangwenl + * @date 2022-11-11 + */ +@EqualsAndHashCode(callSuper = true) +@Slf4j +@Data +public class KafkaService extends ScriptService implements LinkService { + + private String services; + private String ack; + + public String execute(ThingModelMessage msg) { + //执行转换脚本 + Map result = execScript(msg); + if (result == null) { + log.warn("execScript result is null"); + return "execScript result is null"; + } + boolean initResult = LinkFactory.initLink(getKey(), KafkaLink.LINK_TYPE, getLinkConf()); + + AtomicReference data = new AtomicReference<>(""); + FIUtil.isTotF(initResult).handler( + () -> LinkFactory.sendMsg(getKey(), result, data::set), + () -> data.set("创建连接失败!") + ); + + return data.get(); + } + + @Override + public String getKey() { + return String.format("kafka_%s", services); + } + + @Override + public String getLinkType() { + return KafkaLink.LINK_TYPE; + } + + @Override + public Map getLinkConf() { + Map config = new HashMap<>(); + config.put(KafkaLink.SERVERS, services); + config.put(KafkaLink.ACK, ack); + return config; + } +} diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/mqtt/MqttAction.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/mqtt/MqttAction.java new file mode 100644 index 00000000..8e6b8c01 --- /dev/null +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/mqtt/MqttAction.java @@ -0,0 +1,39 @@ +package cc.iotkit.ruleengine.action.mqtt; + +import cc.iotkit.model.device.message.ThingModelMessage; +import cc.iotkit.ruleengine.action.Action; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author huangwenl + * @date 2022-11-10 + */ +@NoArgsConstructor +@AllArgsConstructor +@Data +public class MqttAction implements Action { + public static final String TYPE = "mqtt"; + + + private List services; + + @Override + public String getType() { + return TYPE; + } + + + @Override + public List execute(ThingModelMessage msg) { + List results = new ArrayList<>(); + for (MqttService service : services) { + results.add(service.execute(msg)); + } + return results; + } +} diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/mqtt/MqttService.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/mqtt/MqttService.java new file mode 100644 index 00000000..17cbedc0 --- /dev/null +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/action/mqtt/MqttService.java @@ -0,0 +1,68 @@ +package cc.iotkit.ruleengine.action.mqtt; + +import cc.iotkit.common.utils.FIUtil; +import cc.iotkit.model.device.message.ThingModelMessage; +import cc.iotkit.ruleengine.action.ScriptService; +import cc.iotkit.ruleengine.link.LinkFactory; +import cc.iotkit.ruleengine.link.LinkService; +import cc.iotkit.ruleengine.link.impl.MqttClientLink; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +/** + * @author huangwenl + * @date 2022-11-09 + */ + +@EqualsAndHashCode(callSuper = true) +@Slf4j +@Data +public class MqttService extends ScriptService implements LinkService { + + private String username; + private String password; + private String host; + private int port; + + public String execute(ThingModelMessage msg) { + //执行转换脚本 + Map result = execScript(msg); + if (result == null) { + log.warn("execScript result is null"); + return "execScript result is null"; + } + boolean initResult = LinkFactory.initLink(getKey(), MqttClientLink.LINK_TYPE, getLinkConf()); + + AtomicReference data = new AtomicReference<>(""); + FIUtil.isTotF(initResult).handler( + () -> LinkFactory.sendMsg(getKey(), result, data::set), + () -> data.set("创建连接失败!") + ); + return data.get(); + } + + @Override + public String getKey() { + return String.format("mqtt_%s_%d", host, port); + } + + @Override + public String getLinkType() { + return MqttClientLink.LINK_TYPE; + } + + @Override + public Map getLinkConf() { + Map config = new HashMap<>(); + config.put(MqttClientLink.HOST, host); + config.put(MqttClientLink.PORT, port); + config.put(MqttClientLink.USERNAME, username); + config.put(MqttClientLink.PASSWORD, password); + return config; + } +} diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/BaseSinkLink.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/BaseSinkLink.java new file mode 100644 index 00000000..559cea38 --- /dev/null +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/BaseSinkLink.java @@ -0,0 +1,35 @@ +package cc.iotkit.ruleengine.link; + +import java.util.Map; +import java.util.function.Consumer; + +/** + * @author huangwenl + * @date 2022-11-10 + */ +public interface BaseSinkLink { + + /** + * 建立连接 + * @param config 连接配置信息 + */ + boolean open(Map config); + + /** + * 发送消息 + * @param msg 消息内容 + * @param consumer 发送回调 + */ + void send(Map msg, Consumer consumer); + + /** + * 关闭连接 + */ + void close(); + + /** + * 连接监听 + * @param closeHandler + */ + void closeHandler(Consumer closeHandler); +} diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/LinkFactory.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/LinkFactory.java new file mode 100644 index 00000000..a82fe2db --- /dev/null +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/LinkFactory.java @@ -0,0 +1,146 @@ +package cc.iotkit.ruleengine.link; + +import cc.iotkit.common.utils.FIUtil; +import cc.iotkit.ruleengine.link.impl.KafkaLink; +import cc.iotkit.ruleengine.link.impl.MqttClientLink; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +/** + * @author huangwenl + * @date 2022-11-10 + */ +@Slf4j +public class LinkFactory { + + private static final Map linkMap = new ConcurrentHashMap<>(); + private static final Map> ruleLink = new ConcurrentHashMap<>(); + + /** + * 发送消息前,初始化连接 + * + * @param key 连接标识 + * @param type 连接类型 + * @param config 连接配置 + * @return + */ + public static boolean initLink(String key, String type, Map config) { + AtomicBoolean exist = new AtomicBoolean(false); + FIUtil.isTotF(linkMap.containsKey(key)).handler( + () -> exist.set(true), + () -> exist.set(buildLink(key, type, config)) + ); + return exist.get(); + } + + /** + * 启动规则时,初始化连接 + * + * @param ruleId + * @param ruleId 规则ID + * @param key 连接标识 + * @param type 连接类型 + * @param config 连接配置 + * @return + */ + public static boolean initLink(String ruleId, String key, String type, Map config) { + boolean result = initLink(key, type, config); + if (result) { + Set linkKeys = Optional.ofNullable(ruleLink.get(ruleId)).orElse(new HashSet<>()); + linkKeys.add(key); + ruleLink.put(ruleId, linkKeys); + } + return result; + } + + /** + * 注册连接 + * + * @param key 连接标识 + * @param link 连接 + */ + public static void register(String key, BaseSinkLink link) { + log.info("连接器{}注册连接", key); + linkMap.put(key, link); + + link.closeHandler(Void -> { + linkMap.remove(key); + log.info("连接器{}断开连接", key); + }); + } + + /** + * 发送消息 + * + * @param key 连接标识 + * @param msg 消息 + * @param consumer 发送回调 + */ + public static void sendMsg(String key, Map msg, Consumer consumer) { + try { + BaseSinkLink sinkLink = linkMap.get(key); + FIUtil.isTotF(sinkLink != null).handler(() -> sinkLink.send(msg, consumer), + () -> consumer.accept(String.format("key:%s, 没有连接!", key))); + } catch (Exception e) { + e.printStackTrace(); + consumer.accept(String.format("key:%s,发送异常:%s", key, e.getMessage())); + } + + } + + /** + * 停用规则 + * + * @param ruleId + */ + public static void ruleClose(String ruleId) { + Set linkKeys = ruleLink.remove(ruleId); + // 排除其他规则也在用这个 link的 + if (linkKeys != null && !linkKeys.isEmpty()) { + Set existKey = new HashSet<>(); + ruleLink.forEach((key, value) -> existKey.addAll(value)); + linkKeys.removeAll(existKey); + linkKeys.forEach(LinkFactory::close); + } + } + + /** + * 关闭连接 + * + * @param key + */ + public static void close(String key) { + BaseSinkLink link = linkMap.get(key); + if (link != null) { + link.close(); + } + } + + private static boolean buildLink(String key, String type, Map conf) { + boolean success = false; + BaseSinkLink link = null; + switch (type) { + case MqttClientLink.LINK_TYPE: + link = new MqttClientLink(); + break; + case KafkaLink.LINK_TYPE: + link = new KafkaLink(); + break; + } + if (link != null) { + success = link.open(conf); + } + if (success) { + register(key, link); + } + return success; + } + +} diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/LinkService.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/LinkService.java new file mode 100644 index 00000000..45a16279 --- /dev/null +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/LinkService.java @@ -0,0 +1,20 @@ +package cc.iotkit.ruleengine.link; + +import java.util.Map; + +/** + * @author huangwenl + * @date 2022-11-11 + */ +public interface LinkService { + + default boolean initLink(String ruleId) { + return LinkFactory.initLink(ruleId, getKey(), getLinkType(), getLinkConf()); + } + + String getKey(); + + String getLinkType(); + + Map getLinkConf(); +} diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/impl/KafkaLink.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/impl/KafkaLink.java new file mode 100644 index 00000000..f19be415 --- /dev/null +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/impl/KafkaLink.java @@ -0,0 +1,84 @@ +package cc.iotkit.ruleengine.link.impl; + +import cc.iotkit.common.utils.FIUtil; +import cc.iotkit.ruleengine.link.BaseSinkLink; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +/** + * kafka 连接器 + * 支持自定义topic 和 分区, ack + * k-v 只支持String + * + * @author huangwenl + * @date 2022-11-11 + */ +public class KafkaLink implements BaseSinkLink { + public static final String LINK_TYPE = "kafka"; + public static final String TOPIC = "topic"; + public static final String PAYLOAD = "payload"; + public static final String PARTITION = "partition"; + + public static final String SERVERS = "servers"; + public static final String ACK = "ack"; + + private KafkaProducer producer; + private Consumer closeHandler; + + @Override + public boolean open(Map config) { + try { + AtomicReference vertx = new AtomicReference<>(); + FIUtil.isTotF(Vertx.currentContext() == null).handler( + () -> vertx.set(Vertx.vertx()), + () -> vertx.set(Vertx.currentContext().owner()) + ); + Map kafkaConfig = new HashMap<>(); + kafkaConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, (String) config.get(SERVERS)); + kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + kafkaConfig.put(ProducerConfig.ACKS_CONFIG, (String) config.get(ACK)); + producer = KafkaProducer.create(vertx.get(), kafkaConfig); + } catch (Exception e) { + e.printStackTrace(); + return false; + } + return true; + } + + @Override + public void send(Map msg, Consumer consumer) { + AtomicReference> record = new AtomicReference<>(); + FIUtil.isTotF(msg.containsKey(PARTITION)).handler( + () -> record.set(KafkaProducerRecord.create((String) msg.get(TOPIC), "", msg.get(PAYLOAD).toString(), (Integer) msg.get(PARTITION))), + () -> record.set(KafkaProducerRecord.create((String) msg.get(TOPIC), msg.get(PAYLOAD).toString()))); + // todo 异步发送(不能确认是否成功) + producer.write(record.get()); + consumer.accept(String.format("kafka,topic[%s],发送成功:%s", msg.get(TOPIC), msg.get(PAYLOAD).toString())); + } + + @Override + public void close() { + try { + producer.close(); + producer = null; + } catch (Exception e) { + e.printStackTrace(); + } finally { + closeHandler.accept(null); + } + } + + @Override + public void closeHandler(Consumer consumer) { + this.closeHandler = consumer; + } +} diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/impl/MqttClientLink.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/impl/MqttClientLink.java new file mode 100644 index 00000000..a8fb6d90 --- /dev/null +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/link/impl/MqttClientLink.java @@ -0,0 +1,75 @@ +package cc.iotkit.ruleengine.link.impl; + +import cc.iotkit.common.utils.FIUtil; +import cc.iotkit.ruleengine.link.BaseSinkLink; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.MqttClientOptions; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +/** + * @author huangwenl + * @date 2022-11-10 + */ +public class MqttClientLink implements BaseSinkLink { + public static final String LINK_TYPE = "mqtt"; + public static final String TOPIC = "topic"; + public static final String PASSWORD = "password"; + public static final String USERNAME = "username"; + public static final String HOST = "host"; + public static final String PORT = "port"; + public static final String PAYLOAD = "payload"; + + private MqttClient mqttClient; + private Consumer closeHandler; + + + @Override + public boolean open(Map config) { + try { + AtomicReference vertx = new AtomicReference<>(); + FIUtil.isTotF(Vertx.currentContext() == null).handler( + () -> vertx.set(Vertx.vertx()), + () -> vertx.set(Vertx.currentContext().owner()) + ); + MqttClientOptions clientOptions = new MqttClientOptions(); + clientOptions.setUsername((String) config.get(USERNAME)); + clientOptions.setPassword((String) config.get(PASSWORD)); + mqttClient = MqttClient.create(vertx.get(), clientOptions); + mqttClient.connect((int) config.get(PORT), (String) config.get(HOST)); + mqttClient.closeHandler(Void -> closeHandler.accept(null)); + } catch (Exception e) { + e.printStackTrace(); + return false; + } + return true; + } + + @Override + public void send(Map msg, Consumer consumer) { + FIUtil.isTotF(mqttClient.isConnected()).handler( + () -> { + mqttClient.publish((String) msg.get(TOPIC), + Buffer.buffer(msg.get(PAYLOAD).toString()), + MqttQoS.AT_MOST_ONCE, false, false); + consumer.accept(String.format("mqtt, topic:[%s],发送成功:,%s", msg.get(TOPIC), msg.get(PAYLOAD).toString())); + }, + () -> consumer.accept("mqtt,连接断开,发送失败")); + } + + @Override + public void close() { + mqttClient.disconnect(); + mqttClient = null; + } + + @Override + public void closeHandler(Consumer consumer) { + this.closeHandler = consumer; + } +} diff --git a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/rule/RuleManager.java b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/rule/RuleManager.java index 8537533a..e56b2011 100755 --- a/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/rule/RuleManager.java +++ b/iot-rule-engine/src/main/java/cc/iotkit/ruleengine/rule/RuleManager.java @@ -16,9 +16,14 @@ import cc.iotkit.model.Paging; import cc.iotkit.model.rule.RuleAction; import cc.iotkit.model.rule.RuleInfo; import cc.iotkit.ruleengine.action.*; +import cc.iotkit.ruleengine.action.kafka.KafkaAction; +import cc.iotkit.ruleengine.action.kafka.KafkaService; +import cc.iotkit.ruleengine.action.mqtt.MqttAction; +import cc.iotkit.ruleengine.action.mqtt.MqttService; import cc.iotkit.ruleengine.config.RuleConfiguration; import cc.iotkit.ruleengine.filter.DeviceFilter; import cc.iotkit.ruleengine.filter.Filter; +import cc.iotkit.ruleengine.link.LinkFactory; import cc.iotkit.ruleengine.listener.DeviceListener; import cc.iotkit.ruleengine.listener.Listener; import lombok.SneakyThrows; @@ -90,6 +95,8 @@ public class RuleManager { public void remove(String ruleId) { ruleMessageHandler.removeRule(ruleId); + // 移出link连接 + LinkFactory.ruleClose(ruleId); } public void pause(String ruleId) { @@ -111,7 +118,7 @@ public class RuleManager { } List> actions = new ArrayList<>(); for (RuleAction action : ruleInfo.getActions()) { - actions.add(parseAction(action.getType(), action.getConfig())); + actions.add(parseAction(ruleInfo.getId(), action.getType(), action.getConfig())); } return new Rule(ruleInfo.getId(), ruleInfo.getName(), listeners, filters, actions); @@ -133,7 +140,7 @@ public class RuleManager { return null; } - private Action parseAction(String type, String config) { + private Action parseAction(String ruleId, String type, String config) { if (DeviceAction.TYPE.equals(type)) { DeviceAction action = parse(config, DeviceAction.class); action.setDeviceActionService(deviceActionService); @@ -144,6 +151,20 @@ public class RuleManager { service.setDeviceInfoData(deviceInfoData); } return httpAction; + } else if (MqttAction.TYPE.equals(type)) { + MqttAction mqttAction = parse(config, MqttAction.class); + for (MqttService service : mqttAction.getServices()) { + service.setDeviceInfoData(deviceInfoData); + service.initLink(ruleId); + } + return mqttAction; + } else if (KafkaAction.TYPE.equals(type)) { + KafkaAction kafkaAction = parse(config, KafkaAction.class); + for (KafkaService service : kafkaAction.getServices()) { + service.setDeviceInfoData(deviceInfoData); + service.initLink(ruleId); + } + return kafkaAction; } return null; } diff --git a/pom.xml b/pom.xml index e9aa7769..f83ed53e 100755 --- a/pom.xml +++ b/pom.xml @@ -184,6 +184,12 @@ import + + io.vertx + vertx-kafka-client + ${vertx.version} + + io.vertx vertx-core