diff --git a/server/webrtc_easy_signal/callbacks.go b/server/webrtc_easy_signal/callbacks.go index 55d34c4..23a7c83 100644 --- a/server/webrtc_easy_signal/callbacks.go +++ b/server/webrtc_easy_signal/callbacks.go @@ -1,7 +1,7 @@ /* * @Author: your name * @Date: 2021-10-21 22:36:25 - * @LastEditTime: 2021-10-22 14:36:26 + * @LastEditTime: 2021-10-25 22:41:16 * @LastEditors: Please set LastEditors * @Description: In User Settings Edit * @FilePath: \webrtc_easy_signal\proto.go @@ -26,6 +26,15 @@ const ( REQ_CREATEROOM = 1003 REQ_LISTROOM = 1004 REQ_SENDSDP = 1005 + REQ_SENDCANDIDATE = 1006 + + RESP_LOGIN = 2000 + RESP_INROOM = 2001 + RESP_LEAVEROOM = 2002 + RESP_CREATEROOM = 2003 + RESP_LISTROOM = 2004 + RESP_SENDSDP = 2005 + RESP_SENDCANDIDATE = 2006 ) type Peer struct { @@ -36,12 +45,21 @@ type Peer struct { type Room struct { Name string // 房间名称 + ws *WsConnection // websocket连接 Peers map[string]*Peer // 在房间里的节点 } var gmapRoom sync.Map var gmapUser sync.Map +func get_room_online(name string) *Room{ + room,ok := gmapRoom.Load(name) + if ok{ + return room.(*Room) + } + return nil +} + func get_peers_online(name string) *Peer { peer, ok := gmapUser.Load(name) if ok { @@ -57,6 +75,31 @@ func get_interface_string(v interface{}) string { } return "" } +func ProtoUnconnect(ws *WsConnection){ + if nil == ws{ + return + } + foundname := "" + gmapUser.Range(func(key, value interface{}) bool { + if value.(*Peer).Ws == ws{ + foundname = value.(*Peer).Name + if value.(*Peer).Room != nil{ + delete(value.(*Peer).Room.Peers, foundname) + + peers_in_room := map[string]interface{}{} + for _,v := range value.(*Peer).Room.Peers{ + peers_in_room[v.Name] = true + } + for _,v := range value.(*Peer).Room.Peers{ + v.Ws.SendPayload(RESP_LISTROOM,peers_in_room) + } + } + } + return true + }) + gmapUser.Delete(foundname) + +} func ProtoCallBack(ws *WsConnection, dat []byte) { if nil == ws { @@ -75,34 +118,54 @@ func ProtoCallBack(ws *WsConnection, dat []byte) { if get_peers_online(name) != nil { log.Print("节点名重复") // todo 返回 + ws.SendPayload(RESP_LOGIN,map[string]interface{}{ + "status": -1, + "data":"节点名重复", + }) return } peer := new(Peer) peer.Name = name peer.Ws = ws gmapUser.Store(name, peer) + ws.SendPayload(RESP_LOGIN,map[string]interface{}{ + "status":0, + "data":"success", + }) } case REQ_INROOM: - log.Print(payload.Data["name"]) // 用户名称 - log.Print(payload.Data["room_name"]) // 房间名称 - if (payload.Data["room_name"] != "") && (payload.Data["name"] != "") { + name := get_interface_string(payload.Data["name"]) + room_name := get_interface_string(payload.Data["room_name"]) + if (room_name != "") && (name != "") { // 不存在房间就创建房间 - if _, ok := gmapRoom.Load("room_name"); !ok { - room := Room{ - Name: payload.Data["room_name"].(string), - } - gmapRoom.Store(payload.Data["room_name"], Room{}) + if _, ok := gmapRoom.Load(payload.Data["room_name"]); !ok { + + newroom := new(Room) + newroom.Name = payload.Data["room_name"].(string) + gmapRoom.Store(payload.Data["room_name"], newroom) // if peer, ok := gmapUser.Load(payload.Data["name"]); ok { - room.Peers[payload.Data["name"].(string)] = peer.(*Peer) - + newroom.Peers = make(map[string]*Peer) + newroom.Peers[payload.Data["name"].(string)] = peer.(*Peer) } } else { // 将该用户添加进房间 - + peer := get_peers_online(name) + peer.Room = get_room_online(room_name) + peer.Room.Peers[name] = peer + } + room := get_room_online(room_name) + peers_in_room := map[string]interface{}{} + + if nil != room{ + for _,v := range room.Peers{ + peers_in_room[v.Name] = true + } + for _,v := range room.Peers{ + v.Ws.SendPayload(RESP_LISTROOM,peers_in_room) + } } } - break case REQ_CREATEROOM: @@ -111,10 +174,49 @@ func ProtoCallBack(ws *WsConnection, dat []byte) { break case REQ_LISTROOM: + // 显示room中其他成员 + name := get_interface_string(payload.Data["name"]) + peer := get_peers_online(name) + if nil != peer{ + } + room_name := get_interface_string(payload.Data["room_name"]) + room := get_room_online(room_name) + peers_in_room := map[string]interface{}{} + + if nil != room{ + for _,v := range room.Peers{ + peers_in_room[v.Name] = true + } + } + ws.SendPayload(RESP_LISTROOM,peers_in_room) break case REQ_SENDSDP: - + name := get_interface_string(payload.Data["name"]) + peer_name := get_interface_string(payload.Data["remote_name"]) + sdp := get_interface_string(payload.Data["sdp"]) + if((name == "") ||(peer_name == "")){ + return + } + peer_remote := get_peers_online(peer_name) + if peer_remote != nil{ + peer_remote.Ws.SendPayload(RESP_SENDSDP,map[string]interface{}{ + "sdp":sdp, + "remote_name":name, + }) + } break + case REQ_SENDCANDIDATE: + name := get_interface_string(payload.Data["name"]) + peer_name := get_interface_string(payload.Data["remote_name"]) + sdp := get_interface_string(payload.Data["candidate"]) + peer_remote := get_peers_online(peer_name) + if peer_remote != nil{ + peer_remote.Ws.SendPayload(RESP_SENDCANDIDATE,map[string]interface{}{ + "candidate":sdp, + "remote_name":name, + }) + } + break } } diff --git a/server/webrtc_easy_signal/main.go b/server/webrtc_easy_signal/main.go index 1a6f302..2cec57e 100644 --- a/server/webrtc_easy_signal/main.go +++ b/server/webrtc_easy_signal/main.go @@ -1,7 +1,7 @@ /* * @Author: your name * @Date: 2021-10-21 20:30:24 - * @LastEditTime: 2021-10-22 10:47:26 + * @LastEditTime: 2021-10-22 22:49:21 * @LastEditors: Please set LastEditors * @Description: In User Settings Edit * @FilePath: \webrtc_easy_signal\main.go @@ -10,7 +10,6 @@ package main import ( "fmt" - "log" "net/http" ) @@ -21,7 +20,6 @@ func StartWebsocket(addrPort string) { http.ListenAndServe(addrPort, nil) } -func main() { - log.Print("server started") +func main(){ StartWebsocket(fmt.Sprintf("0.0.0.0:9555")) -} +} \ No newline at end of file diff --git a/server/webrtc_easy_signal/ws_conection.go b/server/webrtc_easy_signal/ws_conection.go index af8c89b..1bb5e99 100644 --- a/server/webrtc_easy_signal/ws_conection.go +++ b/server/webrtc_easy_signal/ws_conection.go @@ -19,11 +19,10 @@ const ( // Send pings to peer with this period. Must be less than pongWait. pingPeriod = (pongWait * 9) / 10 // Maximum message size allowed from peer. - maxMessageSize = 512 + maxMessageSize = 40960 ) var maxConnId int64 - // 用于广播 var WsConnAll map[int64]*WsConnection @@ -69,6 +68,7 @@ func WsHandler(resp http.ResponseWriter, req *http.Request) { log.Println("升级为websocket失败", err.Error()) return } + wsSocket.SetReadLimit(40960); maxConnId++ // TODO 如果要控制连接数可以计算,wsConnAll长度 // 连接数保持一定数量,超过的部分不提供服务 @@ -127,7 +127,7 @@ func (wsConn *WsConnection) processLoop() { if err != nil { if err.Error() == "超时" { // log.Print(wsConn.lastHeartBeatTime.String()) - if wsConn.needHeartBeat { + if(wsConn.needHeartBeat){ if time.Now().Sub(wsConn.lastHeartBeatTime) > time.Second*15 { log.Print("心跳超时") wsConn.close() @@ -137,15 +137,13 @@ func (wsConn *WsConnection) processLoop() { } break } - ProtoCallBack(wsConn, msg.data) + ProtoCallBack(wsConn,msg.data) } } - -type Response struct { - Type int `json:"type"` +type Response struct{ + Type int `json:"type"` Payload interface{} `json:"data"` } - // 发送信息 func (wsConn *WsConnection) SendPayload(ptype int64, v interface{}) error { var resp Response @@ -162,9 +160,9 @@ func (wsConn *WsConnection) SendPayload(ptype int64, v interface{}) error { // 写入消息到队列中 func (wsConn *WsConnection) wsWrite(messageType int, data []byte) error { select { - case wsConn.outChan <- &wsMessage{messageType, data}: - case <-wsConn.closeChan: - return errors.New("连接已经关闭") + case wsConn.outChan <- &wsMessage{messageType, data}: + case <-wsConn.closeChan: + return errors.New("连接已经关闭") } return nil } @@ -181,10 +179,12 @@ func (wsConn *WsConnection) wsReadLoop() { data, } if err != nil { - websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) - log.Println("消息读取出现错误", err.Error()) - + log.Print(err.Error()) + if(websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure)){ + log.Println("消息读取出现错误", err.Error()) + } wsConn.close() + return } log.Print(string(data)) // 放入请求队列,消息入栈 @@ -215,13 +215,12 @@ func (wsConn *WsConnection) wsWriteLoop() { } } } - // 关闭连接 func (wsConn *WsConnection) close() { log.Println("关闭连接被调用了") wsConn.wsSocket.Close() wsConn.mutex.Lock() - + defer wsConn.mutex.Unlock() if wsConn.isClosed == false { wsConn.isClosed = true @@ -229,4 +228,5 @@ func (wsConn *WsConnection) close() { delete(WsConnAll, wsConn.id) close(wsConn.closeChan) } + ProtoUnconnect(wsConn) }