no message
parent
dd27f8bf0e
commit
a0fdde18b0
|
@ -34,6 +34,7 @@ BOOLEAN WINAPI SystemFunction036(PVOID, ULONG);
|
|||
#pragma comment(lib, "msdmo.lib")
|
||||
#pragma comment(lib, "d3d11.lib")
|
||||
#pragma comment(lib, "dxgi.lib")
|
||||
#pragma comment(lib, "Gdi32.lib")
|
||||
|
||||
void EnumCapture()
|
||||
{
|
||||
|
@ -60,27 +61,32 @@ void EnumCapture()
|
|||
//使用索引i创建capture对象
|
||||
}
|
||||
}
|
||||
void InitCustomMetaType(){
|
||||
qRegisterMetaType<rtc::scoped_refptr<webrtc::I420BufferInterface>>("rtc::scoped_refptr<webrtc::I420BufferInterface>");
|
||||
qRegisterMetaType<rtc::scoped_refptr<webrtc::I420BufferInterface>>("rtc::scoped_refptr<webrtc::I420BufferInterface>&");
|
||||
|
||||
}
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
const size_t kWidth = 1280;
|
||||
const size_t kHeight = 720;
|
||||
const size_t kFps = 30;
|
||||
InitCustomMetaType();
|
||||
// const size_t kWidth = 1280;
|
||||
// const size_t kHeight = 720;
|
||||
// const size_t kFps = 30;
|
||||
|
||||
std::unique_ptr<VcmCapturerTest> capturer;
|
||||
std::unique_ptr<webrtc::VideoCaptureModule::DeviceInfo> info(
|
||||
webrtc::VideoCaptureFactory::CreateDeviceInfo());
|
||||
if (!info) {
|
||||
RTC_LOG(LERROR) << "CreateDeviceInfo failed";
|
||||
return -1;
|
||||
}
|
||||
int num_devices = info->NumberOfDevices();
|
||||
for (int i = 0; i < num_devices; ++i) {
|
||||
capturer.reset(VcmCapturerTest::Create(kWidth, kHeight, kFps, i));
|
||||
if (capturer) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
// std::unique_ptr<VcmCapturerTest> capturer;
|
||||
// std::unique_ptr<webrtc::VideoCaptureModule::DeviceInfo> info(
|
||||
// webrtc::VideoCaptureFactory::CreateDeviceInfo());
|
||||
// if (!info) {
|
||||
// RTC_LOG(LERROR) << "CreateDeviceInfo failed";
|
||||
// return -1;
|
||||
// }
|
||||
// int num_devices = info->NumberOfDevices();
|
||||
// for (int i = 0; i < num_devices; ++i) {
|
||||
// capturer.reset(VcmCapturerTest::Create(kWidth, kHeight, kFps, i));
|
||||
// if (capturer) {
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
|
||||
|
||||
rtc::WinsockInitializer winsock_init;
|
||||
|
@ -91,9 +97,6 @@ int main(int argc, char *argv[])
|
|||
|
||||
setbuf(stdout, NULL);
|
||||
|
||||
qRegisterMetaType<rtc::scoped_refptr<webrtc::I420BufferInterface>>("rtc::scoped_refptr<webrtc::I420BufferInterface>");
|
||||
qRegisterMetaType<rtc::scoped_refptr<webrtc::I420BufferInterface>>("rtc::scoped_refptr<webrtc::I420BufferInterface>&");
|
||||
|
||||
QCoreApplication::setAttribute(Qt::AA_DisableHighDpiScaling);
|
||||
QApplication a(argc, argv);
|
||||
MainWindow w;
|
||||
|
|
|
@ -6,14 +6,16 @@
|
|||
|
||||
MainWindow::MainWindow(QWidget *parent)
|
||||
: QMainWindow(parent)
|
||||
, ui(new Ui::MainWindow),
|
||||
mHandler(new rtc::RefCountedObject<WebrtcHanlder>())
|
||||
, ui(new Ui::MainWindow)
|
||||
,mHandler(new rtc::RefCountedObject<WebrtcHanlder>())
|
||||
,mSignalClient(nullptr)
|
||||
{
|
||||
ui->setupUi(this);
|
||||
ui->openGLWidget->SetImgSize(640,480);
|
||||
ui->openGLWidget->show();
|
||||
|
||||
mHandler->InitWebrtc();
|
||||
mSignalClient = new SignalClient(QUrl("ws://127.0.0.1:9555/ws"),true,this);
|
||||
}
|
||||
|
||||
MainWindow::~MainWindow()
|
||||
|
@ -33,6 +35,9 @@ void MainWindow::OnUpdateFrame1(uint8_t *dat)
|
|||
ui->openGLWidget->OnCameraData(dat);
|
||||
}
|
||||
|
||||
const char kAudioLabel[] = "audio_label";
|
||||
const char kVideoLabel[] = "video_label";
|
||||
const char kStreamId[] = "stream_id";
|
||||
|
||||
int WebrtcHanlder::InitWebrtc()
|
||||
{
|
||||
|
@ -68,13 +73,41 @@ int WebrtcHanlder::InitWebrtc()
|
|||
x->show();
|
||||
exit(0);
|
||||
}
|
||||
|
||||
rtc::scoped_refptr<webrtc::AudioTrackInterface> audio_track(
|
||||
m_peer_connection_factory_->CreateAudioTrack(
|
||||
kAudioLabel, m_peer_connection_factory_->CreateAudioSource(
|
||||
cricket::AudioOptions())));
|
||||
|
||||
auto result_or_error = m_peer_connection_->AddTrack(audio_track, {kStreamId});
|
||||
if (!result_or_error.ok()) {
|
||||
qDebug() << "Failed to add audio track to PeerConnection: "
|
||||
<< result_or_error.error().message();
|
||||
}
|
||||
//rtc::scoped_refptr<CapturerTrackSource> video_device =
|
||||
// CapturerTrackSource::Create();
|
||||
rtc::scoped_refptr<MyCapturer> video_device = new rtc::RefCountedObject<MyCapturer>();
|
||||
if (video_device) {
|
||||
video_device->startCapturer();
|
||||
rtc::scoped_refptr<webrtc::VideoTrackInterface> video_track_(
|
||||
m_peer_connection_factory_->CreateVideoTrack(kVideoLabel, video_device));
|
||||
// main_wnd_->StartLocalRenderer(video_track_);
|
||||
|
||||
result_or_error = m_peer_connection_->AddTrack(video_track_, { kStreamId });
|
||||
if (!result_or_error.ok()) {
|
||||
qDebug() << "Failed to add video track to PeerConnection: "
|
||||
<< result_or_error.error().message();
|
||||
}
|
||||
} else {
|
||||
qDebug()<< "OpenVideoCaptureDevice failed";
|
||||
}
|
||||
|
||||
|
||||
m_peer_connection_->CreateOffer(this,
|
||||
webrtc::PeerConnectionInterface::RTCOfferAnswerOptions());
|
||||
}
|
||||
|
||||
|
||||
const char kAudioLabel[] = "audio_label";
|
||||
const char kVideoLabel[] = "video_label";
|
||||
const char kStreamId[] = "stream_id";
|
||||
|
||||
int WebrtcHanlder::AddTrack()
|
||||
{
|
||||
if (!m_peer_connection_->GetSenders().empty()) {
|
||||
|
@ -107,6 +140,10 @@ int WebrtcHanlder::AddTrack()
|
|||
} else {
|
||||
RTC_LOG(LS_ERROR) << "OpenVideoCaptureDevice failed";
|
||||
}
|
||||
|
||||
webrtc::DataChannelInit config;
|
||||
// DataChannelの設定
|
||||
|
||||
}
|
||||
|
||||
WebrtcHanlder::~WebrtcHanlder()
|
||||
|
@ -133,25 +170,65 @@ void WebrtcHanlder::OnDataChannel(rtc::scoped_refptr<webrtc::DataChannelInterfac
|
|||
}
|
||||
|
||||
void WebrtcHanlder::OnIceConnectionChange(webrtc::PeerConnectionInterface::IceConnectionState new_state) {
|
||||
|
||||
qDebug()<<"OnIceConnectionChange"<<new_state;
|
||||
}
|
||||
|
||||
void WebrtcHanlder::OnIceGatheringChange(webrtc::PeerConnectionInterface::IceGatheringState new_state) {
|
||||
|
||||
}
|
||||
const char kCandidateSdpMidName[] = "sdpMid";
|
||||
const char kCandidateSdpMlineIndexName[] = "sdpMLineIndex";
|
||||
const char kCandidateSdpName[] = "candidate";
|
||||
|
||||
void WebrtcHanlder::OnIceCandidate(const webrtc::IceCandidateInterface *candidate)
|
||||
{
|
||||
qDebug()<<"on condidate\r\n";
|
||||
|
||||
QJsonObject addr;
|
||||
addr.insert(kCandidateSdpMidName, candidate->sdp_mid().c_str());
|
||||
addr.insert(kCandidateSdpMlineIndexName,candidate->sdp_mline_index());
|
||||
std::string sdp;
|
||||
if (!candidate->ToString(&sdp)) {
|
||||
RTC_LOG(LS_ERROR) << "Failed to serialize candidate";
|
||||
return;
|
||||
}
|
||||
addr.insert(kCandidateSdpName,sdp.c_str());
|
||||
qDebug()<<"condidate:\r\n"<< QString(QJsonDocument(addr).toJson());
|
||||
}
|
||||
|
||||
void WebrtcHanlder::OnIceConnectionReceivingChange(bool receiving) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
class DummySetSessionDescriptionObserver
|
||||
: public webrtc::SetSessionDescriptionObserver {
|
||||
public:
|
||||
static DummySetSessionDescriptionObserver* Create() {
|
||||
return new rtc::RefCountedObject<DummySetSessionDescriptionObserver>();
|
||||
}
|
||||
virtual void OnSuccess() { RTC_LOG(INFO) << __FUNCTION__; }
|
||||
virtual void OnFailure(webrtc::RTCError error) {
|
||||
RTC_LOG(INFO) << __FUNCTION__ << " " << ToString(error.type()) << ": "
|
||||
<< error.message();
|
||||
}
|
||||
};
|
||||
|
||||
const char kSessionDescriptionTypeName[] = "type";
|
||||
const char kSessionDescriptionSdpName[] = "sdp";
|
||||
void WebrtcHanlder::OnSuccess(webrtc::SessionDescriptionInterface *desc)
|
||||
{
|
||||
qDebug()<<desc->type().c_str();
|
||||
|
||||
std::string sdp;
|
||||
desc->ToString(&sdp);
|
||||
|
||||
QJsonObject addr;
|
||||
addr.insert(kSessionDescriptionTypeName,QString(webrtc::SdpTypeToString(desc->GetType())));
|
||||
addr.insert(kSessionDescriptionSdpName,sdp.c_str());
|
||||
qDebug()<<"sdp : \r\n "<<QJsonDocument(addr).toJson();
|
||||
m_peer_connection_->SetLocalDescription(
|
||||
DummySetSessionDescriptionObserver::Create(), desc);
|
||||
}
|
||||
|
||||
void WebrtcHanlder::OnFailure(webrtc::RTCError error)
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include "rtc_base/rtc_certificate_generator.h"
|
||||
#include <QMainWindow>
|
||||
#include "api/video/i420_buffer.h"
|
||||
#include "signal_client.h"
|
||||
|
||||
QT_BEGIN_NAMESPACE
|
||||
namespace Ui { class MainWindow; }
|
||||
|
@ -87,6 +88,6 @@ public slots:
|
|||
private:
|
||||
Ui::MainWindow *ui;
|
||||
rtc::scoped_refptr<WebrtcHanlder> mHandler;
|
||||
|
||||
SignalClient *mSignalClient;
|
||||
};
|
||||
#endif // MAINWINDOW_H
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
#include "signal_client.h"
|
||||
|
||||
|
||||
SignalClient::SignalClient(const QUrl &url, bool debug, QObject *parent):
|
||||
QObject(parent),
|
||||
m_url(url),
|
||||
m_debug(debug)
|
||||
{
|
||||
qDebug() << "WebSocket server:" << url;
|
||||
connect(&m_webSocket, &QWebSocket::connected, this, &SignalClient::onConnected);
|
||||
connect(&m_webSocket, &QWebSocket::disconnected, this, &SignalClient::onUnConnected);
|
||||
m_webSocket.open(QUrl(url));
|
||||
|
||||
}
|
||||
|
||||
void SignalClient::onConnected()
|
||||
{
|
||||
qDebug() << "WebSocket connected";
|
||||
connect(&m_webSocket, &QWebSocket::textMessageReceived,
|
||||
this, &SignalClient::onTextMessageReceived);
|
||||
QJsonObject addr;
|
||||
QJsonObject dat;
|
||||
dat.insert("message","hello world");
|
||||
addr.insert("type", 1001);
|
||||
addr.insert("data", dat);
|
||||
|
||||
m_webSocket.sendTextMessage(QJsonDocument(addr).toJson());
|
||||
}
|
||||
|
||||
void SignalClient::onUnConnected()
|
||||
{
|
||||
qDebug() <<"unconnected";
|
||||
|
||||
}
|
||||
|
||||
void SignalClient::onTextMessageReceived(QString message)
|
||||
{
|
||||
qDebug() << "Message received:" << message;
|
||||
m_webSocket.close();
|
||||
}
|
||||
|
||||
void SignalClient::onSocketError(QAbstractSocket::SocketError error)
|
||||
{
|
||||
qDebug()<<error;
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
#ifndef SIGNALCLIENT_H
|
||||
#define SIGNALCLIENT_H
|
||||
#include <QtCore/QObject>
|
||||
#include <QtWebSockets/QWebSocket>
|
||||
#include <QObject>
|
||||
#include <QJsonObject>
|
||||
#include <QJsonDocument>
|
||||
|
||||
class SignalClient : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
public:
|
||||
explicit SignalClient(const QUrl &url, bool debug = false, QObject *parent = Q_NULLPTR);
|
||||
|
||||
Q_SIGNALS:
|
||||
void closed();
|
||||
|
||||
private Q_SLOTS:
|
||||
void onConnected();
|
||||
void onUnConnected();
|
||||
|
||||
void onTextMessageReceived(QString message);
|
||||
void onSocketError(QAbstractSocket::SocketError error);
|
||||
private:
|
||||
QWebSocket m_webSocket;
|
||||
QUrl m_url;
|
||||
bool m_debug;
|
||||
};
|
||||
|
||||
#endif // SIGNALCLIENT_H
|
|
@ -78,9 +78,9 @@ void VcmCapturerTest::OnFrame(const webrtc::VideoFrame& frame) {
|
|||
std::chrono::system_clock::now().time_since_epoch()).count();
|
||||
static size_t cnt = 0;
|
||||
|
||||
RTC_LOG(LS_INFO) << "OnFrame "<<frame.width()<<" "<<frame.height()<<" "
|
||||
<<frame.size()<<" "<<frame.timestamp()<<frame.video_frame_buffer().get()->type()
|
||||
<<" stride "<<frame.video_frame_buffer().get()->GetI420()->StrideY()<< frame.video_frame_buffer().get()->GetI420()->StrideU() ;
|
||||
// RTC_LOG(LS_INFO) << "OnFrame "<<frame.width()<<" "<<frame.height()<<" "
|
||||
// <<frame.size()<<" "<<frame.timestamp()<<frame.video_frame_buffer().get()->type()
|
||||
// <<" stride "<<frame.video_frame_buffer().get()->GetI420()->StrideY()<< frame.video_frame_buffer().get()->GetI420()->StrideU() ;
|
||||
|
||||
int m_height = frame.height();
|
||||
int m_width = frame.width();
|
||||
|
@ -98,7 +98,7 @@ void VcmCapturerTest::OnFrame(const webrtc::VideoFrame& frame) {
|
|||
auto timestamp_curr = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch()).count();
|
||||
if(timestamp_curr - timestamp > 1000) {
|
||||
RTC_LOG(LS_INFO) << "FPS: " << cnt;
|
||||
// RTC_LOG(LS_INFO) << "FPS: " << cnt;
|
||||
cnt = 0;
|
||||
timestamp = timestamp_curr;
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ SOURCES += \
|
|||
src/cplaywidget.cpp \
|
||||
src/main.cpp \
|
||||
src/mainwindow.cpp \
|
||||
src/signal_client.cpp \
|
||||
src/video_capture.cpp \
|
||||
src/video_capturer_test.cpp
|
||||
|
||||
|
@ -32,6 +33,7 @@ HEADERS += \
|
|||
src/MyCapturer.h \
|
||||
src/cplaywidget.h \
|
||||
src/mainwindow.h \
|
||||
src/signal_client.h \
|
||||
src/video_capture.h \
|
||||
src/video_capturer_test.h
|
||||
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* @Author: your name
|
||||
* @Date: 2021-10-21 22:36:25
|
||||
* @LastEditTime: 2021-10-21 23:01:28
|
||||
* @LastEditors: Please set LastEditors
|
||||
* @Description: In User Settings Edit
|
||||
* @FilePath: \webrtc_easy_signal\proto.go
|
||||
*/
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Request struct{
|
||||
Type int `json:"type"`
|
||||
Data map[string]interface{} `json:"data"`
|
||||
}
|
||||
const (
|
||||
REQ_INROOM = 1001
|
||||
REQ_LEAVEROOM = 1002
|
||||
REQ_CREATEROOM = 1003
|
||||
REQ_LISTROOM = 1004
|
||||
REQ_SENDSDP = 1005
|
||||
)
|
||||
|
||||
var gmap sync.Map
|
||||
|
||||
func ProtoCallBack(ws *WsConnection,dat []byte) {
|
||||
if nil == ws{
|
||||
return
|
||||
}
|
||||
var payload Request
|
||||
e := json.Unmarshal(dat, &payload)
|
||||
if nil != e {
|
||||
log.Print(e.Error())
|
||||
}
|
||||
log.Print(payload)
|
||||
switch(payload.Type){
|
||||
case REQ_INROOM:
|
||||
log.Print(payload.Data["message"])
|
||||
break
|
||||
case REQ_CREATEROOM:
|
||||
|
||||
break
|
||||
case REQ_LEAVEROOM:
|
||||
|
||||
break
|
||||
case REQ_LISTROOM:
|
||||
|
||||
break
|
||||
case REQ_SENDSDP:
|
||||
|
||||
break
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
* @Author: your name
|
||||
* @Date: 2021-10-21 20:30:24
|
||||
* @LastEditTime: 2021-10-21 22:52:46
|
||||
* @LastEditors: Please set LastEditors
|
||||
* @Description: In User Settings Edit
|
||||
* @FilePath: \webrtc_easy_signal\main.go
|
||||
*/
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// 启动程序
|
||||
func StartWebsocket(addrPort string) {
|
||||
WsConnAll = make(map[int64]*WsConnection)
|
||||
http.HandleFunc("/ws", WsHandler)
|
||||
http.ListenAndServe(addrPort, nil)
|
||||
}
|
||||
|
||||
func main(){
|
||||
StartWebsocket(fmt.Sprintf("0.0.0.0:9555"))
|
||||
}
|
|
@ -0,0 +1,228 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
const (
|
||||
// 允许等待的写入时间
|
||||
writeWait = 10 * time.Second
|
||||
// Time allowed to read the next pong message from the peer.
|
||||
pongWait = 0
|
||||
// Send pings to peer with this period. Must be less than pongWait.
|
||||
pingPeriod = (pongWait * 9) / 10
|
||||
// Maximum message size allowed from peer.
|
||||
maxMessageSize = 512
|
||||
)
|
||||
|
||||
var maxConnId int64
|
||||
// 用于广播
|
||||
var WsConnAll map[int64]*WsConnection
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
// 允许所有的CORS 跨域请求,正式环境可以关闭
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
return true
|
||||
},
|
||||
}
|
||||
|
||||
// 客户端读写消息
|
||||
type wsMessage struct {
|
||||
// websocket.TextMessage 消息类型
|
||||
messageType int
|
||||
data []byte
|
||||
}
|
||||
|
||||
// 客户端连接
|
||||
type WsConnection struct {
|
||||
wsSocket *websocket.Conn // 底层websocket
|
||||
inChan chan *wsMessage // 读队列
|
||||
outChan chan *wsMessage // 写队列
|
||||
|
||||
mutex sync.Mutex // 避免重复关闭管道,加锁处理
|
||||
isClosed bool
|
||||
closeChan chan byte // 关闭通知
|
||||
id int64
|
||||
lastHeartBeatTime time.Time // 上次心跳包时间
|
||||
needHeartBeat bool // 上次心跳
|
||||
}
|
||||
|
||||
func init() {
|
||||
maxConnId = 0
|
||||
}
|
||||
|
||||
// 读取消息队列中的消息
|
||||
func WsHandler(resp http.ResponseWriter, req *http.Request) {
|
||||
// 应答客户端告知升级连接为websocket
|
||||
wsSocket, err := upgrader.Upgrade(resp, req, nil)
|
||||
if err != nil {
|
||||
log.Println("升级为websocket失败", err.Error())
|
||||
return
|
||||
}
|
||||
maxConnId++
|
||||
// TODO 如果要控制连接数可以计算,wsConnAll长度
|
||||
// 连接数保持一定数量,超过的部分不提供服务
|
||||
wsConn := &WsConnection{
|
||||
wsSocket: wsSocket,
|
||||
inChan: make(chan *wsMessage, 50),
|
||||
outChan: make(chan *wsMessage, 50),
|
||||
closeChan: make(chan byte),
|
||||
isClosed: false,
|
||||
id: maxConnId,
|
||||
lastHeartBeatTime: time.Now(),
|
||||
needHeartBeat: false,
|
||||
}
|
||||
WsConnAll[maxConnId] = wsConn
|
||||
log.Println("当前在线人数", len(WsConnAll))
|
||||
// 处理器,发送定时信息,避免意外关闭
|
||||
go wsConn.processLoop()
|
||||
// 读协程
|
||||
go wsConn.wsReadLoop()
|
||||
// 写协程
|
||||
go wsConn.wsWriteLoop()
|
||||
}
|
||||
|
||||
// 读取消息队列中的消息
|
||||
func (wsConn *WsConnection) wsRead() (*wsMessage, error) {
|
||||
tick := time.Tick(time.Second * 1)
|
||||
select {
|
||||
case msg := <-wsConn.inChan:
|
||||
// 获取到消息队列中的消息
|
||||
return msg, nil
|
||||
case <-wsConn.closeChan:
|
||||
case <-tick:
|
||||
return nil, errors.New("超时")
|
||||
}
|
||||
return nil, errors.New("连接已经关闭")
|
||||
}
|
||||
|
||||
// 处理队列中的消息
|
||||
func (this *WsConnection) PayloadParseAndCallback(dat []byte) error {
|
||||
var payload Request
|
||||
e := json.Unmarshal(dat, &payload)
|
||||
if nil != e {
|
||||
log.Print(e.Error())
|
||||
return e
|
||||
}
|
||||
log.Print(payload)
|
||||
return nil
|
||||
}
|
||||
|
||||
// 处理队列中的消息
|
||||
func (wsConn *WsConnection) processLoop() {
|
||||
// 处理消息队列中的消息
|
||||
// 获取到消息队列中的消息,处理完成后,发送消息给客户端
|
||||
for {
|
||||
msg, err := wsConn.wsRead()
|
||||
if err != nil {
|
||||
if err.Error() == "超时" {
|
||||
// log.Print(wsConn.lastHeartBeatTime.String())
|
||||
if(wsConn.needHeartBeat){
|
||||
if time.Now().Sub(wsConn.lastHeartBeatTime) > time.Second*15 {
|
||||
log.Print("心跳超时")
|
||||
wsConn.close()
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
ProtoCallBack(wsConn,msg.data)
|
||||
}
|
||||
}
|
||||
type Response struct{
|
||||
Type int `json:"type"`
|
||||
Payload interface{} `json:"data"`
|
||||
}
|
||||
// 发送信息
|
||||
func (wsConn *WsConnection) SendPayload(ptype int64, v interface{}) error {
|
||||
var resp Response
|
||||
resp.Type = int(ptype)
|
||||
resp.Payload = v
|
||||
bytes, e := json.Marshal(resp)
|
||||
if nil != e {
|
||||
log.Print(e.Error())
|
||||
}
|
||||
wsConn.wsWrite(1, bytes)
|
||||
return nil
|
||||
}
|
||||
|
||||
// 写入消息到队列中
|
||||
func (wsConn *WsConnection) wsWrite(messageType int, data []byte) error {
|
||||
select {
|
||||
case wsConn.outChan <- &wsMessage{messageType, data}:
|
||||
case <-wsConn.closeChan:
|
||||
return errors.New("连接已经关闭")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 处理消息队列中的消息
|
||||
func (wsConn *WsConnection) wsReadLoop() {
|
||||
// 设置消息的最大长度
|
||||
wsConn.wsSocket.SetReadLimit(maxMessageSize)
|
||||
for {
|
||||
// 读一个message
|
||||
msgType, data, err := wsConn.wsSocket.ReadMessage()
|
||||
req := &wsMessage{
|
||||
msgType,
|
||||
data,
|
||||
}
|
||||
if err != nil {
|
||||
websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure)
|
||||
log.Println("消息读取出现错误", err.Error())
|
||||
|
||||
wsConn.close()
|
||||
}
|
||||
log.Print(string(data))
|
||||
// 放入请求队列,消息入栈
|
||||
select {
|
||||
case wsConn.inChan <- req:
|
||||
case <-wsConn.closeChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 发送消息给客户端
|
||||
func (wsConn *WsConnection) wsWriteLoop() {
|
||||
for {
|
||||
select {
|
||||
// 取一个应答
|
||||
case msg := <-wsConn.outChan:
|
||||
// 写给websocket
|
||||
if err := wsConn.wsSocket.WriteMessage(msg.messageType, msg.data); err != nil {
|
||||
log.Println("发送消息给客户端发生错误", err.Error())
|
||||
// 切断服务
|
||||
wsConn.close()
|
||||
return
|
||||
}
|
||||
case <-wsConn.closeChan:
|
||||
// 获取到关闭通知
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
// 关闭连接
|
||||
func (wsConn *WsConnection) close() {
|
||||
log.Println("关闭连接被调用了")
|
||||
wsConn.wsSocket.Close()
|
||||
wsConn.mutex.Lock()
|
||||
|
||||
defer wsConn.mutex.Unlock()
|
||||
if wsConn.isClosed == false {
|
||||
wsConn.isClosed = true
|
||||
// 删除这个连接的变量
|
||||
delete(WsConnAll, wsConn.id)
|
||||
close(wsConn.closeChan)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue