no message

master
zcy 2021-10-26 19:55:55 +08:00
parent 489c84d85b
commit f3fe679e1a
3 changed files with 135 additions and 35 deletions

View File

@ -1,7 +1,7 @@
/* /*
* @Author: your name * @Author: your name
* @Date: 2021-10-21 22:36:25 * @Date: 2021-10-21 22:36:25
* @LastEditTime: 2021-10-22 14:36:26 * @LastEditTime: 2021-10-25 22:41:16
* @LastEditors: Please set LastEditors * @LastEditors: Please set LastEditors
* @Description: In User Settings Edit * @Description: In User Settings Edit
* @FilePath: \webrtc_easy_signal\proto.go * @FilePath: \webrtc_easy_signal\proto.go
@ -26,6 +26,15 @@ const (
REQ_CREATEROOM = 1003 REQ_CREATEROOM = 1003
REQ_LISTROOM = 1004 REQ_LISTROOM = 1004
REQ_SENDSDP = 1005 REQ_SENDSDP = 1005
REQ_SENDCANDIDATE = 1006
RESP_LOGIN = 2000
RESP_INROOM = 2001
RESP_LEAVEROOM = 2002
RESP_CREATEROOM = 2003
RESP_LISTROOM = 2004
RESP_SENDSDP = 2005
RESP_SENDCANDIDATE = 2006
) )
type Peer struct { type Peer struct {
@ -36,12 +45,21 @@ type Peer struct {
type Room struct { type Room struct {
Name string // 房间名称 Name string // 房间名称
ws *WsConnection // websocket连接
Peers map[string]*Peer // 在房间里的节点 Peers map[string]*Peer // 在房间里的节点
} }
var gmapRoom sync.Map var gmapRoom sync.Map
var gmapUser sync.Map var gmapUser sync.Map
func get_room_online(name string) *Room{
room,ok := gmapRoom.Load(name)
if ok{
return room.(*Room)
}
return nil
}
func get_peers_online(name string) *Peer { func get_peers_online(name string) *Peer {
peer, ok := gmapUser.Load(name) peer, ok := gmapUser.Load(name)
if ok { if ok {
@ -57,6 +75,31 @@ func get_interface_string(v interface{}) string {
} }
return "" return ""
} }
func ProtoUnconnect(ws *WsConnection){
if nil == ws{
return
}
foundname := ""
gmapUser.Range(func(key, value interface{}) bool {
if value.(*Peer).Ws == ws{
foundname = value.(*Peer).Name
if value.(*Peer).Room != nil{
delete(value.(*Peer).Room.Peers, foundname)
peers_in_room := map[string]interface{}{}
for _,v := range value.(*Peer).Room.Peers{
peers_in_room[v.Name] = true
}
for _,v := range value.(*Peer).Room.Peers{
v.Ws.SendPayload(RESP_LISTROOM,peers_in_room)
}
}
}
return true
})
gmapUser.Delete(foundname)
}
func ProtoCallBack(ws *WsConnection, dat []byte) { func ProtoCallBack(ws *WsConnection, dat []byte) {
if nil == ws { if nil == ws {
@ -75,34 +118,54 @@ func ProtoCallBack(ws *WsConnection, dat []byte) {
if get_peers_online(name) != nil { if get_peers_online(name) != nil {
log.Print("节点名重复") log.Print("节点名重复")
// todo 返回 // todo 返回
ws.SendPayload(RESP_LOGIN,map[string]interface{}{
"status": -1,
"data":"节点名重复",
})
return return
} }
peer := new(Peer) peer := new(Peer)
peer.Name = name peer.Name = name
peer.Ws = ws peer.Ws = ws
gmapUser.Store(name, peer) gmapUser.Store(name, peer)
ws.SendPayload(RESP_LOGIN,map[string]interface{}{
"status":0,
"data":"success",
})
} }
case REQ_INROOM: case REQ_INROOM:
log.Print(payload.Data["name"]) // 用户名称 name := get_interface_string(payload.Data["name"])
log.Print(payload.Data["room_name"]) // 房间名称 room_name := get_interface_string(payload.Data["room_name"])
if (payload.Data["room_name"] != "") && (payload.Data["name"] != "") { if (room_name != "") && (name != "") {
// 不存在房间就创建房间 // 不存在房间就创建房间
if _, ok := gmapRoom.Load("room_name"); !ok { if _, ok := gmapRoom.Load(payload.Data["room_name"]); !ok {
room := Room{
Name: payload.Data["room_name"].(string), newroom := new(Room)
} newroom.Name = payload.Data["room_name"].(string)
gmapRoom.Store(payload.Data["room_name"], Room{}) gmapRoom.Store(payload.Data["room_name"], newroom)
// //
if peer, ok := gmapUser.Load(payload.Data["name"]); ok { if peer, ok := gmapUser.Load(payload.Data["name"]); ok {
room.Peers[payload.Data["name"].(string)] = peer.(*Peer) newroom.Peers = make(map[string]*Peer)
newroom.Peers[payload.Data["name"].(string)] = peer.(*Peer)
} }
} else { } else {
// 将该用户添加进房间 // 将该用户添加进房间
peer := get_peers_online(name)
peer.Room = get_room_online(room_name)
peer.Room.Peers[name] = peer
}
room := get_room_online(room_name)
peers_in_room := map[string]interface{}{}
if nil != room{
for _,v := range room.Peers{
peers_in_room[v.Name] = true
}
for _,v := range room.Peers{
v.Ws.SendPayload(RESP_LISTROOM,peers_in_room)
}
} }
} }
break break
case REQ_CREATEROOM: case REQ_CREATEROOM:
@ -111,10 +174,49 @@ func ProtoCallBack(ws *WsConnection, dat []byte) {
break break
case REQ_LISTROOM: case REQ_LISTROOM:
// 显示room中其他成员
name := get_interface_string(payload.Data["name"])
peer := get_peers_online(name)
if nil != peer{
}
room_name := get_interface_string(payload.Data["room_name"])
room := get_room_online(room_name)
peers_in_room := map[string]interface{}{}
if nil != room{
for _,v := range room.Peers{
peers_in_room[v.Name] = true
}
}
ws.SendPayload(RESP_LISTROOM,peers_in_room)
break break
case REQ_SENDSDP: case REQ_SENDSDP:
name := get_interface_string(payload.Data["name"])
peer_name := get_interface_string(payload.Data["remote_name"])
sdp := get_interface_string(payload.Data["sdp"])
if((name == "") ||(peer_name == "")){
return
}
peer_remote := get_peers_online(peer_name)
if peer_remote != nil{
peer_remote.Ws.SendPayload(RESP_SENDSDP,map[string]interface{}{
"sdp":sdp,
"remote_name":name,
})
}
break
case REQ_SENDCANDIDATE:
name := get_interface_string(payload.Data["name"])
peer_name := get_interface_string(payload.Data["remote_name"])
sdp := get_interface_string(payload.Data["candidate"])
peer_remote := get_peers_online(peer_name)
if peer_remote != nil{
peer_remote.Ws.SendPayload(RESP_SENDCANDIDATE,map[string]interface{}{
"candidate":sdp,
"remote_name":name,
})
}
break break
} }
} }

View File

@ -1,7 +1,7 @@
/* /*
* @Author: your name * @Author: your name
* @Date: 2021-10-21 20:30:24 * @Date: 2021-10-21 20:30:24
* @LastEditTime: 2021-10-22 10:47:26 * @LastEditTime: 2021-10-22 22:49:21
* @LastEditors: Please set LastEditors * @LastEditors: Please set LastEditors
* @Description: In User Settings Edit * @Description: In User Settings Edit
* @FilePath: \webrtc_easy_signal\main.go * @FilePath: \webrtc_easy_signal\main.go
@ -10,7 +10,6 @@ package main
import ( import (
"fmt" "fmt"
"log"
"net/http" "net/http"
) )
@ -21,7 +20,6 @@ func StartWebsocket(addrPort string) {
http.ListenAndServe(addrPort, nil) http.ListenAndServe(addrPort, nil)
} }
func main() { func main(){
log.Print("server started")
StartWebsocket(fmt.Sprintf("0.0.0.0:9555")) StartWebsocket(fmt.Sprintf("0.0.0.0:9555"))
} }

View File

@ -19,11 +19,10 @@ const (
// Send pings to peer with this period. Must be less than pongWait. // Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10 pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer. // Maximum message size allowed from peer.
maxMessageSize = 512 maxMessageSize = 40960
) )
var maxConnId int64 var maxConnId int64
// 用于广播 // 用于广播
var WsConnAll map[int64]*WsConnection var WsConnAll map[int64]*WsConnection
@ -69,6 +68,7 @@ func WsHandler(resp http.ResponseWriter, req *http.Request) {
log.Println("升级为websocket失败", err.Error()) log.Println("升级为websocket失败", err.Error())
return return
} }
wsSocket.SetReadLimit(40960);
maxConnId++ maxConnId++
// TODO 如果要控制连接数可以计算wsConnAll长度 // TODO 如果要控制连接数可以计算wsConnAll长度
// 连接数保持一定数量,超过的部分不提供服务 // 连接数保持一定数量,超过的部分不提供服务
@ -127,7 +127,7 @@ func (wsConn *WsConnection) processLoop() {
if err != nil { if err != nil {
if err.Error() == "超时" { if err.Error() == "超时" {
// log.Print(wsConn.lastHeartBeatTime.String()) // log.Print(wsConn.lastHeartBeatTime.String())
if wsConn.needHeartBeat { if(wsConn.needHeartBeat){
if time.Now().Sub(wsConn.lastHeartBeatTime) > time.Second*15 { if time.Now().Sub(wsConn.lastHeartBeatTime) > time.Second*15 {
log.Print("心跳超时") log.Print("心跳超时")
wsConn.close() wsConn.close()
@ -137,15 +137,13 @@ func (wsConn *WsConnection) processLoop() {
} }
break break
} }
ProtoCallBack(wsConn, msg.data) ProtoCallBack(wsConn,msg.data)
} }
} }
type Response struct{
type Response struct { Type int `json:"type"`
Type int `json:"type"`
Payload interface{} `json:"data"` Payload interface{} `json:"data"`
} }
// 发送信息 // 发送信息
func (wsConn *WsConnection) SendPayload(ptype int64, v interface{}) error { func (wsConn *WsConnection) SendPayload(ptype int64, v interface{}) error {
var resp Response var resp Response
@ -162,9 +160,9 @@ func (wsConn *WsConnection) SendPayload(ptype int64, v interface{}) error {
// 写入消息到队列中 // 写入消息到队列中
func (wsConn *WsConnection) wsWrite(messageType int, data []byte) error { func (wsConn *WsConnection) wsWrite(messageType int, data []byte) error {
select { select {
case wsConn.outChan <- &wsMessage{messageType, data}: case wsConn.outChan <- &wsMessage{messageType, data}:
case <-wsConn.closeChan: case <-wsConn.closeChan:
return errors.New("连接已经关闭") return errors.New("连接已经关闭")
} }
return nil return nil
} }
@ -181,10 +179,12 @@ func (wsConn *WsConnection) wsReadLoop() {
data, data,
} }
if err != nil { if err != nil {
websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) log.Print(err.Error())
log.Println("消息读取出现错误", err.Error()) if(websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure)){
log.Println("消息读取出现错误", err.Error())
}
wsConn.close() wsConn.close()
return
} }
log.Print(string(data)) log.Print(string(data))
// 放入请求队列,消息入栈 // 放入请求队列,消息入栈
@ -215,7 +215,6 @@ func (wsConn *WsConnection) wsWriteLoop() {
} }
} }
} }
// 关闭连接 // 关闭连接
func (wsConn *WsConnection) close() { func (wsConn *WsConnection) close() {
log.Println("关闭连接被调用了") log.Println("关闭连接被调用了")
@ -229,4 +228,5 @@ func (wsConn *WsConnection) close() {
delete(WsConnAll, wsConn.id) delete(WsConnAll, wsConn.id)
close(wsConn.closeChan) close(wsConn.closeChan)
} }
ProtoUnconnect(wsConn)
} }