webocket通讯协议服务端优化

V0.5.x
tangfudong 2023-03-20 13:41:39 +08:00
parent 27fc96d567
commit 99c2f1d881
1 changed files with 24 additions and 16 deletions

View File

@ -12,6 +12,7 @@ import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.net.PemKeyCertOptions;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.Map;
@ -44,29 +45,34 @@ public class WebSocketServerVerticle extends AbstractDeviceVerticle {
}
httpServer = vertx.createHttpServer(options).webSocketHandler(wsClient -> {
log.info("webSocket client connect sessionId:{},path={}", wsClient.textHandlerID(), wsClient.path());
String deviceKey = wsClient.headers().get("deviceKey");
String deviceKey = wsClient.path().replace("/","");
if(StringUtils.isBlank(deviceKey)||deviceKey.split("_").length<2){
wsClient.reject();
log.warn("陌生连接,拒绝");
return;
}
Map<String,String> deviceKeyObj=new HashMap<>();
deviceKeyObj.put("deviceKey",deviceKey);
executor.onReceive(new HashMap<>(), "auth", JsonUtil.toJsonString(deviceKeyObj), (r) -> {
if (r == null) {
//认证失败
log.warn("认证失败,拒绝");
wsClient.reject();
return;
}
//保存设备与连接关系
wsClients.put(getDeviceKey(r), wsClient);
});
wsClient.textMessageHandler(message -> {
executor.onReceive(new HashMap<>(), "auth", deviceKey, (r) -> {
if (r == null) {
//认证失败
wsClient.reject();
return;
}
//保存设备与连接关系
wsClients.put(getDeviceKey(r), wsClient);
executor.onReceive(new HashMap<>(), "", message);
});
executor.onReceive(new HashMap<>(), "", message);
});
wsClient.closeHandler(c -> {
log.warn("client connection closed,deviceKey:{}", deviceKey);
executor.onReceive(new HashMap<>(), "disconnect", deviceKey, (r) -> {
executor.onReceive(new HashMap<>(), "disconnect", JsonUtil.toJsonString(deviceKeyObj), (r) -> {
//删除设备与连接关系
wsClients.remove(getDeviceKey(r));
});
});
wsClient.endHandler(e -> {
log.warn("webSocket client connection end,deviceKey:{}", deviceKey);
});
wsClient.exceptionHandler(ex -> {
log.warn("webSocket client connection exception,deviceKey:{}", deviceKey);
});
@ -82,7 +88,9 @@ public class WebSocketServerVerticle extends AbstractDeviceVerticle {
@Override
public void stop() throws Exception {
for (String deviceKey : wsClients.keySet()) {
executor.onReceive(null, "disconnect", deviceKey);
Map<String,String> deviceKeyObj=new HashMap<>();
deviceKeyObj.put("deviceKey",deviceKey);
executor.onReceive(null, "disconnect", JsonUtil.toJsonString(deviceKeyObj));
}
httpServer.close(voidAsyncResult -> log.info("close webocket server..."));
}