!7 数据流转添加 mqtt、kafka Action

Merge pull request !7 from yljun/master
V0.5.x
xiwa 2022-11-19 13:19:45 +00:00 committed by Gitee
commit 4b36a3a62a
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
14 changed files with 693 additions and 2 deletions

View File

@ -0,0 +1,11 @@
package cc.iotkit.common.function;
/**
* @author huangwenl
* @date 2022-11-10
*/
@FunctionalInterface
public interface IfHandler {
void handler(Runnable tHandler, Runnable fHandler);
}

View File

@ -0,0 +1,21 @@
package cc.iotkit.common.utils;
import cc.iotkit.common.function.IfHandler;
/**
* @author huangwenl
* @date 2022-11-10
*/
public class FIUtil {
public static IfHandler isTotF(boolean param) {
return (tHandler, fHandler) -> {
if (param) {
tHandler.run();
} else {
fHandler.run();
}
};
}
}

View File

@ -71,6 +71,17 @@
<artifactId>iot-message-core</artifactId>
</dependency>
<!-- mqtt-->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mqtt</artifactId>
</dependency>
<!-- kafka-->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-kafka-client</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,40 @@
package cc.iotkit.ruleengine.action.kafka;
import cc.iotkit.model.device.message.ThingModelMessage;
import cc.iotkit.ruleengine.action.Action;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.List;
/**
* @author huangwenl
* @date 2022-11-11
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
public class KafkaAction implements Action<KafkaService> {
public static final String TYPE = "kafka";
private List<KafkaService> services;
@Override
public String getType() {
return TYPE;
}
@Override
public List<String> execute(ThingModelMessage msg) {
List<String> results = new ArrayList<>();
for (KafkaService service : services) {
results.add(service.execute(msg));
}
return results;
}
}

View File

@ -0,0 +1,64 @@
package cc.iotkit.ruleengine.action.kafka;
import cc.iotkit.common.utils.FIUtil;
import cc.iotkit.model.device.message.ThingModelMessage;
import cc.iotkit.ruleengine.action.ScriptService;
import cc.iotkit.ruleengine.link.LinkFactory;
import cc.iotkit.ruleengine.link.LinkService;
import cc.iotkit.ruleengine.link.impl.KafkaLink;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author huangwenl
* @date 2022-11-11
*/
@EqualsAndHashCode(callSuper = true)
@Slf4j
@Data
public class KafkaService extends ScriptService implements LinkService {
private String services;
private String ack;
public String execute(ThingModelMessage msg) {
//执行转换脚本
Map result = execScript(msg);
if (result == null) {
log.warn("execScript result is null");
return "execScript result is null";
}
boolean initResult = LinkFactory.initLink(getKey(), KafkaLink.LINK_TYPE, getLinkConf());
AtomicReference<String> data = new AtomicReference<>("");
FIUtil.isTotF(initResult).handler(
() -> LinkFactory.sendMsg(getKey(), result, data::set),
() -> data.set("创建连接失败!")
);
return data.get();
}
@Override
public String getKey() {
return String.format("kafka_%s", services);
}
@Override
public String getLinkType() {
return KafkaLink.LINK_TYPE;
}
@Override
public Map<String, Object> getLinkConf() {
Map<String, Object> config = new HashMap<>();
config.put(KafkaLink.SERVERS, services);
config.put(KafkaLink.ACK, ack);
return config;
}
}

View File

@ -0,0 +1,39 @@
package cc.iotkit.ruleengine.action.mqtt;
import cc.iotkit.model.device.message.ThingModelMessage;
import cc.iotkit.ruleengine.action.Action;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.List;
/**
* @author huangwenl
* @date 2022-11-10
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
public class MqttAction implements Action<MqttService> {
public static final String TYPE = "mqtt";
private List<MqttService> services;
@Override
public String getType() {
return TYPE;
}
@Override
public List<String> execute(ThingModelMessage msg) {
List<String> results = new ArrayList<>();
for (MqttService service : services) {
results.add(service.execute(msg));
}
return results;
}
}

View File

@ -0,0 +1,68 @@
package cc.iotkit.ruleengine.action.mqtt;
import cc.iotkit.common.utils.FIUtil;
import cc.iotkit.model.device.message.ThingModelMessage;
import cc.iotkit.ruleengine.action.ScriptService;
import cc.iotkit.ruleengine.link.LinkFactory;
import cc.iotkit.ruleengine.link.LinkService;
import cc.iotkit.ruleengine.link.impl.MqttClientLink;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author huangwenl
* @date 2022-11-09
*/
@EqualsAndHashCode(callSuper = true)
@Slf4j
@Data
public class MqttService extends ScriptService implements LinkService {
private String username;
private String password;
private String host;
private int port;
public String execute(ThingModelMessage msg) {
//执行转换脚本
Map result = execScript(msg);
if (result == null) {
log.warn("execScript result is null");
return "execScript result is null";
}
boolean initResult = LinkFactory.initLink(getKey(), MqttClientLink.LINK_TYPE, getLinkConf());
AtomicReference<String> data = new AtomicReference<>("");
FIUtil.isTotF(initResult).handler(
() -> LinkFactory.sendMsg(getKey(), result, data::set),
() -> data.set("创建连接失败!")
);
return data.get();
}
@Override
public String getKey() {
return String.format("mqtt_%s_%d", host, port);
}
@Override
public String getLinkType() {
return MqttClientLink.LINK_TYPE;
}
@Override
public Map<String, Object> getLinkConf() {
Map<String, Object> config = new HashMap<>();
config.put(MqttClientLink.HOST, host);
config.put(MqttClientLink.PORT, port);
config.put(MqttClientLink.USERNAME, username);
config.put(MqttClientLink.PASSWORD, password);
return config;
}
}

View File

@ -0,0 +1,35 @@
package cc.iotkit.ruleengine.link;
import java.util.Map;
import java.util.function.Consumer;
/**
* @author huangwenl
* @date 2022-11-10
*/
public interface BaseSinkLink {
/**
*
* @param config
*/
boolean open(Map<String, Object> config);
/**
*
* @param msg
* @param consumer
*/
void send(Map<String, Object> msg, Consumer<String> consumer);
/**
*
*/
void close();
/**
*
* @param closeHandler
*/
void closeHandler(Consumer<Void> closeHandler);
}

View File

@ -0,0 +1,146 @@
package cc.iotkit.ruleengine.link;
import cc.iotkit.common.utils.FIUtil;
import cc.iotkit.ruleengine.link.impl.KafkaLink;
import cc.iotkit.ruleengine.link.impl.MqttClientLink;
import lombok.extern.slf4j.Slf4j;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
/**
* @author huangwenl
* @date 2022-11-10
*/
@Slf4j
public class LinkFactory {
private static final Map<String, BaseSinkLink> linkMap = new ConcurrentHashMap<>();
private static final Map<String, Set<String>> ruleLink = new ConcurrentHashMap<>();
/**
*
*
* @param key
* @param type
* @param config
* @return
*/
public static boolean initLink(String key, String type, Map<String, Object> config) {
AtomicBoolean exist = new AtomicBoolean(false);
FIUtil.isTotF(linkMap.containsKey(key)).handler(
() -> exist.set(true),
() -> exist.set(buildLink(key, type, config))
);
return exist.get();
}
/**
*
*
* @param ruleId
* @param ruleId ID
* @param key
* @param type
* @param config
* @return
*/
public static boolean initLink(String ruleId, String key, String type, Map<String, Object> config) {
boolean result = initLink(key, type, config);
if (result) {
Set<String> linkKeys = Optional.ofNullable(ruleLink.get(ruleId)).orElse(new HashSet<>());
linkKeys.add(key);
ruleLink.put(ruleId, linkKeys);
}
return result;
}
/**
*
*
* @param key
* @param link
*/
public static void register(String key, BaseSinkLink link) {
log.info("连接器{}注册连接", key);
linkMap.put(key, link);
link.closeHandler(Void -> {
linkMap.remove(key);
log.info("连接器{}断开连接", key);
});
}
/**
*
*
* @param key
* @param msg
* @param consumer
*/
public static void sendMsg(String key, Map<String, Object> msg, Consumer<String> consumer) {
try {
BaseSinkLink sinkLink = linkMap.get(key);
FIUtil.isTotF(sinkLink != null).handler(() -> sinkLink.send(msg, consumer),
() -> consumer.accept(String.format("key:%s, 没有连接!", key)));
} catch (Exception e) {
e.printStackTrace();
consumer.accept(String.format("key:%s,发送异常:%s", key, e.getMessage()));
}
}
/**
*
*
* @param ruleId
*/
public static void ruleClose(String ruleId) {
Set<String> linkKeys = ruleLink.remove(ruleId);
// 排除其他规则也在用这个 link的
if (linkKeys != null && !linkKeys.isEmpty()) {
Set<String> existKey = new HashSet<>();
ruleLink.forEach((key, value) -> existKey.addAll(value));
linkKeys.removeAll(existKey);
linkKeys.forEach(LinkFactory::close);
}
}
/**
*
*
* @param key
*/
public static void close(String key) {
BaseSinkLink link = linkMap.get(key);
if (link != null) {
link.close();
}
}
private static boolean buildLink(String key, String type, Map<String, Object> conf) {
boolean success = false;
BaseSinkLink link = null;
switch (type) {
case MqttClientLink.LINK_TYPE:
link = new MqttClientLink();
break;
case KafkaLink.LINK_TYPE:
link = new KafkaLink();
break;
}
if (link != null) {
success = link.open(conf);
}
if (success) {
register(key, link);
}
return success;
}
}

View File

@ -0,0 +1,20 @@
package cc.iotkit.ruleengine.link;
import java.util.Map;
/**
* @author huangwenl
* @date 2022-11-11
*/
public interface LinkService {
default boolean initLink(String ruleId) {
return LinkFactory.initLink(ruleId, getKey(), getLinkType(), getLinkConf());
}
String getKey();
String getLinkType();
Map<String, Object> getLinkConf();
}

View File

@ -0,0 +1,96 @@
package cc.iotkit.ruleengine.link.impl;
import cc.iotkit.common.utils.FIUtil;
import cc.iotkit.ruleengine.link.BaseSinkLink;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
/**
* kafka
* topic , ack
* k-v String
*
* @author huangwenl
* @date 2022-11-11
*/
@Slf4j
public class KafkaLink implements BaseSinkLink {
public static final String LINK_TYPE = "kafka";
public static final String TOPIC = "topic";
public static final String PAYLOAD = "payload";
public static final String PARTITION = "partition";
public static final String SERVERS = "servers";
public static final String ACK = "ack";
private KafkaProducer<String, String> producer;
private Consumer<Void> closeHandler;
@Override
public boolean open(Map<String, Object> config) {
try {
AtomicReference<Vertx> vertx = new AtomicReference<>();
FIUtil.isTotF(Vertx.currentContext() == null).handler(
() -> vertx.set(Vertx.vertx()),
() -> vertx.set(Vertx.currentContext().owner())
);
Map<String, String> kafkaConfig = new HashMap<>();
kafkaConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, (String) config.get(SERVERS));
kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
kafkaConfig.put(ProducerConfig.ACKS_CONFIG, (String) config.get(ACK));
producer = KafkaProducer.create(vertx.get(), kafkaConfig);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
@Override
public void send(Map<String, Object> msg, Consumer<String> consumer) {
AtomicReference<KafkaProducerRecord<String, String>> record = new AtomicReference<>();
FIUtil.isTotF(msg.containsKey(PARTITION)).handler(
() -> record.set(KafkaProducerRecord.create((String) msg.get(TOPIC), "", msg.get(PAYLOAD).toString(), (Integer) msg.get(PARTITION))),
() -> record.set(KafkaProducerRecord.create((String) msg.get(TOPIC), msg.get(PAYLOAD).toString())));
// 同步等待结果
try {
Future<Void> write = producer.write(record.get());
write.toCompletionStage().toCompletableFuture().get();
FIUtil.isTotF(write.succeeded()).handler(
() -> consumer.accept(String.format("kafka,topic[%s],发送成功:%s", msg.get(TOPIC), msg.get(PAYLOAD).toString())),
() -> consumer.accept(String.format("kafka,topic[%s],发送失败:%s", msg.get(TOPIC), msg.get(PAYLOAD).toString()))
);
} catch (Exception e) {
e.printStackTrace();
consumer.accept(String.format("kafka,topic[%s],发送异常:%s", msg.get(TOPIC), msg.get(PAYLOAD).toString()));
}
}
@Override
public void close() {
try {
producer.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
closeHandler.accept(null);
}
}
@Override
public void closeHandler(Consumer<Void> consumer) {
this.closeHandler = consumer;
}
}

View File

@ -0,0 +1,113 @@
package cc.iotkit.ruleengine.link.impl;
import cc.iotkit.common.utils.FIUtil;
import cc.iotkit.ruleengine.link.BaseSinkLink;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
/**
* @author huangwenl
* @date 2022-11-10
*/
@Slf4j
public class MqttClientLink implements BaseSinkLink {
public static final String LINK_TYPE = "mqtt";
public static final String TOPIC = "topic";
public static final String PASSWORD = "password";
public static final String USERNAME = "username";
public static final String HOST = "host";
public static final String PORT = "port";
public static final String PAYLOAD = "payload";
private MqttClient mqttClient;
private Consumer<Void> closeHandler;
private MqttClientOptions clientOptions;
private String host;
private int port;
private boolean connecting;
@Override
public boolean open(Map<String, Object> config) {
try {
AtomicReference<Vertx> vertx = new AtomicReference<>();
FIUtil.isTotF(Vertx.currentContext() == null).handler(
() -> vertx.set(Vertx.vertx()),
() -> vertx.set(Vertx.currentContext().owner())
);
clientOptions = new MqttClientOptions();
clientOptions.setUsername((String) config.get(USERNAME));
clientOptions.setPassword((String) config.get(PASSWORD));
mqttClient = MqttClient.create(vertx.get(), clientOptions);
host = (String) config.get(HOST);
port = (int) config.get(PORT);
mqttClient = MqttClient.create(vertx.get(), clientOptions);
connecting = true;
mqttClient.connect(port, host,
s -> {
connecting = false;
if (!s.succeeded()) {
closeHandler.accept(null);
}
});
} catch (Exception e) {
e.printStackTrace();
connecting = false;
return false;
}
return true;
}
@Override
public void send(Map<String, Object> msg, Consumer<String> consumer) {
FIUtil.isTotF(mqttClient.isConnected()).handler(
() -> {
Future<Integer> publish = mqttClient.publish((String) msg.get(TOPIC),
Buffer.buffer(msg.get(PAYLOAD).toString()),
MqttQoS.AT_MOST_ONCE, false, false);
try {
publish.toCompletionStage().toCompletableFuture().get();
FIUtil.isTotF(publish.succeeded()).handler(
() -> consumer.accept(String.format("mqtt,topic[%s],发送成功:%s", msg.get(TOPIC), msg.get(PAYLOAD).toString())),
() -> consumer.accept(String.format("mqtt,topic[%s],发送失败:%s", msg.get(TOPIC), msg.get(PAYLOAD).toString()))
);
} catch (Exception e) {
e.printStackTrace();
consumer.accept(String.format("mqtt,topic[%s],发送异常:%s", msg.get(TOPIC), msg.get(PAYLOAD).toString()));
}
},
() -> {
consumer.accept("mqtt,连接断开,发送失败");
if (!connecting) {
log.info("mqtt重连");
connecting = true;
mqttClient.connect(port, host, s -> connecting = false);
}
});
}
@Override
public void close() {
try {
mqttClient.disconnect();
} catch (Exception e) {
e.printStackTrace();
} finally {
closeHandler.accept(null);
}
}
@Override
public void closeHandler(Consumer<Void> consumer) {
this.closeHandler = consumer;
}
}

View File

@ -16,9 +16,14 @@ import cc.iotkit.model.Paging;
import cc.iotkit.model.rule.RuleAction;
import cc.iotkit.model.rule.RuleInfo;
import cc.iotkit.ruleengine.action.*;
import cc.iotkit.ruleengine.action.kafka.KafkaAction;
import cc.iotkit.ruleengine.action.kafka.KafkaService;
import cc.iotkit.ruleengine.action.mqtt.MqttAction;
import cc.iotkit.ruleengine.action.mqtt.MqttService;
import cc.iotkit.ruleengine.config.RuleConfiguration;
import cc.iotkit.ruleengine.filter.DeviceFilter;
import cc.iotkit.ruleengine.filter.Filter;
import cc.iotkit.ruleengine.link.LinkFactory;
import cc.iotkit.ruleengine.listener.DeviceListener;
import cc.iotkit.ruleengine.listener.Listener;
import lombok.SneakyThrows;
@ -90,6 +95,8 @@ public class RuleManager {
public void remove(String ruleId) {
ruleMessageHandler.removeRule(ruleId);
// 移出link连接
LinkFactory.ruleClose(ruleId);
}
public void pause(String ruleId) {
@ -111,7 +118,7 @@ public class RuleManager {
}
List<Action<?>> actions = new ArrayList<>();
for (RuleAction action : ruleInfo.getActions()) {
actions.add(parseAction(action.getType(), action.getConfig()));
actions.add(parseAction(ruleInfo.getId(), action.getType(), action.getConfig()));
}
return new Rule(ruleInfo.getId(), ruleInfo.getName(), listeners, filters, actions);
@ -133,7 +140,7 @@ public class RuleManager {
return null;
}
private Action<?> parseAction(String type, String config) {
private Action<?> parseAction(String ruleId, String type, String config) {
if (DeviceAction.TYPE.equals(type)) {
DeviceAction action = parse(config, DeviceAction.class);
action.setDeviceActionService(deviceActionService);
@ -144,6 +151,20 @@ public class RuleManager {
service.setDeviceInfoData(deviceInfoData);
}
return httpAction;
} else if (MqttAction.TYPE.equals(type)) {
MqttAction mqttAction = parse(config, MqttAction.class);
for (MqttService service : mqttAction.getServices()) {
service.setDeviceInfoData(deviceInfoData);
service.initLink(ruleId);
}
return mqttAction;
} else if (KafkaAction.TYPE.equals(type)) {
KafkaAction kafkaAction = parse(config, KafkaAction.class);
for (KafkaService service : kafkaAction.getServices()) {
service.setDeviceInfoData(deviceInfoData);
service.initLink(ruleId);
}
return kafkaAction;
}
return null;
}

View File

@ -184,6 +184,12 @@
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-kafka-client</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>