diff --git a/client/webrtc_demo/src/main.cpp b/client/webrtc_demo/src/main.cpp index 4541156..736305e 100644 --- a/client/webrtc_demo/src/main.cpp +++ b/client/webrtc_demo/src/main.cpp @@ -34,6 +34,7 @@ BOOLEAN WINAPI SystemFunction036(PVOID, ULONG); #pragma comment(lib, "msdmo.lib") #pragma comment(lib, "d3d11.lib") #pragma comment(lib, "dxgi.lib") +#pragma comment(lib, "Gdi32.lib") void EnumCapture() { @@ -60,27 +61,32 @@ void EnumCapture() //使用索引i创建capture对象 } } +void InitCustomMetaType(){ + qRegisterMetaType>("rtc::scoped_refptr"); + qRegisterMetaType>("rtc::scoped_refptr&"); +} int main(int argc, char *argv[]) { - const size_t kWidth = 1280; - const size_t kHeight = 720; - const size_t kFps = 30; + InitCustomMetaType(); +// const size_t kWidth = 1280; +// const size_t kHeight = 720; +// const size_t kFps = 30; - std::unique_ptr capturer; - std::unique_ptr info( - webrtc::VideoCaptureFactory::CreateDeviceInfo()); - if (!info) { - RTC_LOG(LERROR) << "CreateDeviceInfo failed"; - return -1; - } - int num_devices = info->NumberOfDevices(); - for (int i = 0; i < num_devices; ++i) { - capturer.reset(VcmCapturerTest::Create(kWidth, kHeight, kFps, i)); - if (capturer) { - break; - } - } +// std::unique_ptr capturer; +// std::unique_ptr info( +// webrtc::VideoCaptureFactory::CreateDeviceInfo()); +// if (!info) { +// RTC_LOG(LERROR) << "CreateDeviceInfo failed"; +// return -1; +// } +// int num_devices = info->NumberOfDevices(); +// for (int i = 0; i < num_devices; ++i) { +// capturer.reset(VcmCapturerTest::Create(kWidth, kHeight, kFps, i)); +// if (capturer) { +// break; +// } +// } rtc::WinsockInitializer winsock_init; @@ -91,9 +97,6 @@ int main(int argc, char *argv[]) setbuf(stdout, NULL); - qRegisterMetaType>("rtc::scoped_refptr"); - qRegisterMetaType>("rtc::scoped_refptr&"); - QCoreApplication::setAttribute(Qt::AA_DisableHighDpiScaling); QApplication a(argc, argv); MainWindow w; diff --git a/client/webrtc_demo/src/mainwindow.cpp b/client/webrtc_demo/src/mainwindow.cpp index a425a46..0d4f7d3 100644 --- a/client/webrtc_demo/src/mainwindow.cpp +++ b/client/webrtc_demo/src/mainwindow.cpp @@ -6,14 +6,16 @@ MainWindow::MainWindow(QWidget *parent) : QMainWindow(parent) - , ui(new Ui::MainWindow), - mHandler(new rtc::RefCountedObject()) + , ui(new Ui::MainWindow) + ,mHandler(new rtc::RefCountedObject()) + ,mSignalClient(nullptr) { ui->setupUi(this); ui->openGLWidget->SetImgSize(640,480); ui->openGLWidget->show(); mHandler->InitWebrtc(); + mSignalClient = new SignalClient(QUrl("ws://127.0.0.1:9555/ws"),true,this); } MainWindow::~MainWindow() @@ -33,6 +35,9 @@ void MainWindow::OnUpdateFrame1(uint8_t *dat) ui->openGLWidget->OnCameraData(dat); } +const char kAudioLabel[] = "audio_label"; +const char kVideoLabel[] = "video_label"; +const char kStreamId[] = "stream_id"; int WebrtcHanlder::InitWebrtc() { @@ -68,13 +73,41 @@ int WebrtcHanlder::InitWebrtc() x->show(); exit(0); } + + rtc::scoped_refptr audio_track( + m_peer_connection_factory_->CreateAudioTrack( + kAudioLabel, m_peer_connection_factory_->CreateAudioSource( + cricket::AudioOptions()))); + + auto result_or_error = m_peer_connection_->AddTrack(audio_track, {kStreamId}); + if (!result_or_error.ok()) { + qDebug() << "Failed to add audio track to PeerConnection: " + << result_or_error.error().message(); + } + //rtc::scoped_refptr video_device = + // CapturerTrackSource::Create(); + rtc::scoped_refptr video_device = new rtc::RefCountedObject(); + if (video_device) { + video_device->startCapturer(); + rtc::scoped_refptr video_track_( + m_peer_connection_factory_->CreateVideoTrack(kVideoLabel, video_device)); +// main_wnd_->StartLocalRenderer(video_track_); + + result_or_error = m_peer_connection_->AddTrack(video_track_, { kStreamId }); + if (!result_or_error.ok()) { + qDebug() << "Failed to add video track to PeerConnection: " + << result_or_error.error().message(); + } + } else { + qDebug()<< "OpenVideoCaptureDevice failed"; + } + + + m_peer_connection_->CreateOffer(this, + webrtc::PeerConnectionInterface::RTCOfferAnswerOptions()); } -const char kAudioLabel[] = "audio_label"; -const char kVideoLabel[] = "video_label"; -const char kStreamId[] = "stream_id"; - int WebrtcHanlder::AddTrack() { if (!m_peer_connection_->GetSenders().empty()) { @@ -107,6 +140,10 @@ int WebrtcHanlder::AddTrack() } else { RTC_LOG(LS_ERROR) << "OpenVideoCaptureDevice failed"; } + + webrtc::DataChannelInit config; + // DataChannelの設定 + } WebrtcHanlder::~WebrtcHanlder() @@ -133,25 +170,65 @@ void WebrtcHanlder::OnDataChannel(rtc::scoped_refptrsdp_mid().c_str()); + addr.insert(kCandidateSdpMlineIndexName,candidate->sdp_mline_index()); + std::string sdp; + if (!candidate->ToString(&sdp)) { + RTC_LOG(LS_ERROR) << "Failed to serialize candidate"; + return; + } + addr.insert(kCandidateSdpName,sdp.c_str()); + qDebug()<<"condidate:\r\n"<< QString(QJsonDocument(addr).toJson()); } void WebrtcHanlder::OnIceConnectionReceivingChange(bool receiving) { } + +class DummySetSessionDescriptionObserver + : public webrtc::SetSessionDescriptionObserver { +public: + static DummySetSessionDescriptionObserver* Create() { + return new rtc::RefCountedObject(); + } + virtual void OnSuccess() { RTC_LOG(INFO) << __FUNCTION__; } + virtual void OnFailure(webrtc::RTCError error) { + RTC_LOG(INFO) << __FUNCTION__ << " " << ToString(error.type()) << ": " + << error.message(); + } +}; + +const char kSessionDescriptionTypeName[] = "type"; +const char kSessionDescriptionSdpName[] = "sdp"; void WebrtcHanlder::OnSuccess(webrtc::SessionDescriptionInterface *desc) { qDebug()<type().c_str(); + + std::string sdp; + desc->ToString(&sdp); + + QJsonObject addr; + addr.insert(kSessionDescriptionTypeName,QString(webrtc::SdpTypeToString(desc->GetType()))); + addr.insert(kSessionDescriptionSdpName,sdp.c_str()); + qDebug()<<"sdp : \r\n "<SetLocalDescription( + DummySetSessionDescriptionObserver::Create(), desc); } void WebrtcHanlder::OnFailure(webrtc::RTCError error) diff --git a/client/webrtc_demo/src/mainwindow.h b/client/webrtc_demo/src/mainwindow.h index 010a62d..6316c07 100644 --- a/client/webrtc_demo/src/mainwindow.h +++ b/client/webrtc_demo/src/mainwindow.h @@ -27,6 +27,7 @@ #include "rtc_base/rtc_certificate_generator.h" #include #include "api/video/i420_buffer.h" +#include "signal_client.h" QT_BEGIN_NAMESPACE namespace Ui { class MainWindow; } @@ -87,6 +88,6 @@ public slots: private: Ui::MainWindow *ui; rtc::scoped_refptr mHandler; - + SignalClient *mSignalClient; }; #endif // MAINWINDOW_H diff --git a/client/webrtc_demo/src/signal_client.cpp b/client/webrtc_demo/src/signal_client.cpp new file mode 100644 index 0000000..6ebf120 --- /dev/null +++ b/client/webrtc_demo/src/signal_client.cpp @@ -0,0 +1,45 @@ +#include "signal_client.h" + + +SignalClient::SignalClient(const QUrl &url, bool debug, QObject *parent): + QObject(parent), + m_url(url), + m_debug(debug) +{ + qDebug() << "WebSocket server:" << url; + connect(&m_webSocket, &QWebSocket::connected, this, &SignalClient::onConnected); + connect(&m_webSocket, &QWebSocket::disconnected, this, &SignalClient::onUnConnected); + m_webSocket.open(QUrl(url)); + +} + +void SignalClient::onConnected() +{ + qDebug() << "WebSocket connected"; + connect(&m_webSocket, &QWebSocket::textMessageReceived, + this, &SignalClient::onTextMessageReceived); + QJsonObject addr; + QJsonObject dat; + dat.insert("message","hello world"); + addr.insert("type", 1001); + addr.insert("data", dat); + + m_webSocket.sendTextMessage(QJsonDocument(addr).toJson()); +} + +void SignalClient::onUnConnected() +{ + qDebug() <<"unconnected"; + +} + +void SignalClient::onTextMessageReceived(QString message) +{ + qDebug() << "Message received:" << message; + m_webSocket.close(); +} + +void SignalClient::onSocketError(QAbstractSocket::SocketError error) +{ + qDebug()< +#include +#include +#include +#include + +class SignalClient : public QObject +{ +Q_OBJECT +public: + explicit SignalClient(const QUrl &url, bool debug = false, QObject *parent = Q_NULLPTR); + +Q_SIGNALS: + void closed(); + +private Q_SLOTS: + void onConnected(); + void onUnConnected(); + + void onTextMessageReceived(QString message); + void onSocketError(QAbstractSocket::SocketError error); +private: + QWebSocket m_webSocket; + QUrl m_url; + bool m_debug; +}; + +#endif // SIGNALCLIENT_H diff --git a/client/webrtc_demo/src/video_capture.cpp b/client/webrtc_demo/src/video_capture.cpp index 4c45f52..01e15cd 100644 --- a/client/webrtc_demo/src/video_capture.cpp +++ b/client/webrtc_demo/src/video_capture.cpp @@ -78,9 +78,9 @@ void VcmCapturerTest::OnFrame(const webrtc::VideoFrame& frame) { std::chrono::system_clock::now().time_since_epoch()).count(); static size_t cnt = 0; - RTC_LOG(LS_INFO) << "OnFrame "<type() - <<" stride "<GetI420()->StrideY()<< frame.video_frame_buffer().get()->GetI420()->StrideU() ; +// RTC_LOG(LS_INFO) << "OnFrame "<type() +// <<" stride "<GetI420()->StrideY()<< frame.video_frame_buffer().get()->GetI420()->StrideU() ; int m_height = frame.height(); int m_width = frame.width(); @@ -98,7 +98,7 @@ void VcmCapturerTest::OnFrame(const webrtc::VideoFrame& frame) { auto timestamp_curr = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); if(timestamp_curr - timestamp > 1000) { - RTC_LOG(LS_INFO) << "FPS: " << cnt; +// RTC_LOG(LS_INFO) << "FPS: " << cnt; cnt = 0; timestamp = timestamp_curr; } diff --git a/client/webrtc_demo/webrtc_demo.pro b/client/webrtc_demo/webrtc_demo.pro index 50ed135..6efb41d 100644 --- a/client/webrtc_demo/webrtc_demo.pro +++ b/client/webrtc_demo/webrtc_demo.pro @@ -25,6 +25,7 @@ SOURCES += \ src/cplaywidget.cpp \ src/main.cpp \ src/mainwindow.cpp \ + src/signal_client.cpp \ src/video_capture.cpp \ src/video_capturer_test.cpp @@ -32,6 +33,7 @@ HEADERS += \ src/MyCapturer.h \ src/cplaywidget.h \ src/mainwindow.h \ + src/signal_client.h \ src/video_capture.h \ src/video_capturer_test.h diff --git a/server/webrtc_easy_signal/callbacks.go b/server/webrtc_easy_signal/callbacks.go new file mode 100644 index 0000000..4a73101 --- /dev/null +++ b/server/webrtc_easy_signal/callbacks.go @@ -0,0 +1,58 @@ +/* + * @Author: your name + * @Date: 2021-10-21 22:36:25 + * @LastEditTime: 2021-10-21 23:01:28 + * @LastEditors: Please set LastEditors + * @Description: In User Settings Edit + * @FilePath: \webrtc_easy_signal\proto.go + */ +package main + +import ( + "encoding/json" + "log" + "sync" +) + +type Request struct{ + Type int `json:"type"` + Data map[string]interface{} `json:"data"` +} +const ( + REQ_INROOM = 1001 + REQ_LEAVEROOM = 1002 + REQ_CREATEROOM = 1003 + REQ_LISTROOM = 1004 + REQ_SENDSDP = 1005 +) + +var gmap sync.Map + +func ProtoCallBack(ws *WsConnection,dat []byte) { + if nil == ws{ + return + } + var payload Request + e := json.Unmarshal(dat, &payload) + if nil != e { + log.Print(e.Error()) + } + log.Print(payload) + switch(payload.Type){ + case REQ_INROOM: + log.Print(payload.Data["message"]) + break + case REQ_CREATEROOM: + + break + case REQ_LEAVEROOM: + + break + case REQ_LISTROOM: + + break + case REQ_SENDSDP: + + break + } +} \ No newline at end of file diff --git a/server/webrtc_easy_signal/main.go b/server/webrtc_easy_signal/main.go new file mode 100644 index 0000000..23775a8 --- /dev/null +++ b/server/webrtc_easy_signal/main.go @@ -0,0 +1,25 @@ +/* + * @Author: your name + * @Date: 2021-10-21 20:30:24 + * @LastEditTime: 2021-10-21 22:52:46 + * @LastEditors: Please set LastEditors + * @Description: In User Settings Edit + * @FilePath: \webrtc_easy_signal\main.go + */ +package main + +import ( + "fmt" + "net/http" +) + +// 启动程序 +func StartWebsocket(addrPort string) { + WsConnAll = make(map[int64]*WsConnection) + http.HandleFunc("/ws", WsHandler) + http.ListenAndServe(addrPort, nil) +} + +func main(){ + StartWebsocket(fmt.Sprintf("0.0.0.0:9555")) +} \ No newline at end of file diff --git a/server/webrtc_easy_signal/ws_conection.go b/server/webrtc_easy_signal/ws_conection.go new file mode 100644 index 0000000..8e22675 --- /dev/null +++ b/server/webrtc_easy_signal/ws_conection.go @@ -0,0 +1,228 @@ +package main + +import ( + "encoding/json" + "errors" + "log" + "net/http" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +const ( + // 允许等待的写入时间 + writeWait = 10 * time.Second + // Time allowed to read the next pong message from the peer. + pongWait = 0 + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + // Maximum message size allowed from peer. + maxMessageSize = 512 +) + +var maxConnId int64 +// 用于广播 +var WsConnAll map[int64]*WsConnection + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + // 允许所有的CORS 跨域请求,正式环境可以关闭 + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +// 客户端读写消息 +type wsMessage struct { + // websocket.TextMessage 消息类型 + messageType int + data []byte +} + +// 客户端连接 +type WsConnection struct { + wsSocket *websocket.Conn // 底层websocket + inChan chan *wsMessage // 读队列 + outChan chan *wsMessage // 写队列 + + mutex sync.Mutex // 避免重复关闭管道,加锁处理 + isClosed bool + closeChan chan byte // 关闭通知 + id int64 + lastHeartBeatTime time.Time // 上次心跳包时间 + needHeartBeat bool // 上次心跳 +} + +func init() { + maxConnId = 0 +} + +// 读取消息队列中的消息 +func WsHandler(resp http.ResponseWriter, req *http.Request) { + // 应答客户端告知升级连接为websocket + wsSocket, err := upgrader.Upgrade(resp, req, nil) + if err != nil { + log.Println("升级为websocket失败", err.Error()) + return + } + maxConnId++ + // TODO 如果要控制连接数可以计算,wsConnAll长度 + // 连接数保持一定数量,超过的部分不提供服务 + wsConn := &WsConnection{ + wsSocket: wsSocket, + inChan: make(chan *wsMessage, 50), + outChan: make(chan *wsMessage, 50), + closeChan: make(chan byte), + isClosed: false, + id: maxConnId, + lastHeartBeatTime: time.Now(), + needHeartBeat: false, + } + WsConnAll[maxConnId] = wsConn + log.Println("当前在线人数", len(WsConnAll)) + // 处理器,发送定时信息,避免意外关闭 + go wsConn.processLoop() + // 读协程 + go wsConn.wsReadLoop() + // 写协程 + go wsConn.wsWriteLoop() +} + +// 读取消息队列中的消息 +func (wsConn *WsConnection) wsRead() (*wsMessage, error) { + tick := time.Tick(time.Second * 1) + select { + case msg := <-wsConn.inChan: + // 获取到消息队列中的消息 + return msg, nil + case <-wsConn.closeChan: + case <-tick: + return nil, errors.New("超时") + } + return nil, errors.New("连接已经关闭") +} + +// 处理队列中的消息 +func (this *WsConnection) PayloadParseAndCallback(dat []byte) error { + var payload Request + e := json.Unmarshal(dat, &payload) + if nil != e { + log.Print(e.Error()) + return e + } + log.Print(payload) + return nil +} + +// 处理队列中的消息 +func (wsConn *WsConnection) processLoop() { + // 处理消息队列中的消息 + // 获取到消息队列中的消息,处理完成后,发送消息给客户端 + for { + msg, err := wsConn.wsRead() + if err != nil { + if err.Error() == "超时" { + // log.Print(wsConn.lastHeartBeatTime.String()) + if(wsConn.needHeartBeat){ + if time.Now().Sub(wsConn.lastHeartBeatTime) > time.Second*15 { + log.Print("心跳超时") + wsConn.close() + } + } + continue + } + break + } + ProtoCallBack(wsConn,msg.data) + } +} +type Response struct{ + Type int `json:"type"` + Payload interface{} `json:"data"` +} +// 发送信息 +func (wsConn *WsConnection) SendPayload(ptype int64, v interface{}) error { + var resp Response + resp.Type = int(ptype) + resp.Payload = v + bytes, e := json.Marshal(resp) + if nil != e { + log.Print(e.Error()) + } + wsConn.wsWrite(1, bytes) + return nil +} + +// 写入消息到队列中 +func (wsConn *WsConnection) wsWrite(messageType int, data []byte) error { + select { + case wsConn.outChan <- &wsMessage{messageType, data}: + case <-wsConn.closeChan: + return errors.New("连接已经关闭") + } + return nil +} + +// 处理消息队列中的消息 +func (wsConn *WsConnection) wsReadLoop() { + // 设置消息的最大长度 + wsConn.wsSocket.SetReadLimit(maxMessageSize) + for { + // 读一个message + msgType, data, err := wsConn.wsSocket.ReadMessage() + req := &wsMessage{ + msgType, + data, + } + if err != nil { + websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) + log.Println("消息读取出现错误", err.Error()) + + wsConn.close() + } + log.Print(string(data)) + // 放入请求队列,消息入栈 + select { + case wsConn.inChan <- req: + case <-wsConn.closeChan: + return + } + } +} + +// 发送消息给客户端 +func (wsConn *WsConnection) wsWriteLoop() { + for { + select { + // 取一个应答 + case msg := <-wsConn.outChan: + // 写给websocket + if err := wsConn.wsSocket.WriteMessage(msg.messageType, msg.data); err != nil { + log.Println("发送消息给客户端发生错误", err.Error()) + // 切断服务 + wsConn.close() + return + } + case <-wsConn.closeChan: + // 获取到关闭通知 + return + } + } +} +// 关闭连接 +func (wsConn *WsConnection) close() { + log.Println("关闭连接被调用了") + wsConn.wsSocket.Close() + wsConn.mutex.Lock() + + defer wsConn.mutex.Unlock() + if wsConn.isClosed == false { + wsConn.isClosed = true + // 删除这个连接的变量 + delete(WsConnAll, wsConn.id) + close(wsConn.closeChan) + } +}