proto-debuger/protoDebuger/tcp_server_libevent.cpp

359 lines
7.6 KiB
C++

#define _WSPIAPI_H_
#define _WINSOCKAPI_
#include "tcp_server_libevent.h"
#include <cstring>
/**
* @description:
* @param {*}
* @return {*}
*/
ConnectionLibevent::ConnectionLibevent(TcpServerLibevent *p,struct bufferevent *ev,uint32_t fd,struct sockaddr_in *p1):
m_parent_server(nullptr),
m_event(nullptr),
m_fd(-1),
m_addr(nullptr)
{
m_parent_server = p;
m_event = ev;
m_fd = fd;
m_addr = p1;
}
/**
* @description:
* @param {*}
* @return {*}
*/
ConnectionLibevent::ConnectionLibevent(struct bufferevent *ev,uint32_t fd,struct sockaddr_in *p1):
m_parent_server(nullptr),
m_event(nullptr),
m_fd(-1),
m_addr(nullptr)
{
m_event = ev;
m_fd = fd;
m_addr = p1;
}
/**
* @description:
* @param {*}
* @return {*}
*/
ConnectionLibevent* defaultConnAccept(struct bufferevent *ev,uint32_t fd,struct sockaddr_in *p1){
return new ConnectionLibevent(ev,fd,p1);
}
/**
* @description:
* @param {*}
* @return {*}
*/
int ConnectionLibevent::OnRecv(char *p,uint32_t len){
std::cout<<"OnRecv "<<p<<std::endl;
m_bytes_recv += len;
return 0;
}
/**
* @description:
* @param {*}
* @return {*}
*/
int ConnectionLibevent::OnClose(){
std::cout<<"close "<<this->m_fd << " "<<this->IpAddress()<<std::endl;
return 0;
}
/**
* @description:
* @param {*}
* @return {*}
*/
int ConnectionLibevent::OnWrite(){
return 0;
}
/**
* @description:
* @param {*}
* @return {*}
*/
int ConnectionLibevent::WriteData(const char *p,uint16_t len){
if(nullptr == p){
return -1;
}
return bufferevent_write(this->m_event,p,len);
}
uint32_t ConnectionLibevent::SocketFd(){
return m_fd;
}
/**
* @description:
* @param {*}
* @return {*}
*/
int ConnectionLibevent::SetServer(TcpServerLibevent *p){
if(nullptr != p){
this->m_parent_server = p;
return 0;
}
return -1;
}
string ConnectionLibevent::IpAddress(){
if(nullptr != m_addr){
return string(inet_ntoa(m_addr->sin_addr));
}
return "";
}
TcpServerLibevent *ConnectionLibevent::Server(){
return m_parent_server;
}
void read_cb(struct bufferevent *bev, void *arg)
{
char buf[1024] = {0};
ConnectionLibevent* conn = (ConnectionLibevent*)arg;
bufferevent_read(bev, buf, sizeof(buf));
cout << "client " << conn->IpAddress() << " say:" << buf << endl;
conn->OnRecv(buf,sizeof(buf));
}
void write_cb(struct bufferevent *bev, void *arg)
{
ConnectionLibevent* conn = (ConnectionLibevent*)arg;
std::cout<<"connection "<<conn->IpAddress()<<" sended data success"<< std::endl;
}
/**
* @description:
* @param {*}
* @return {*}
*/
void event_cb(struct bufferevent *bev, short events, void *arg)
{
ConnectionLibevent *conn = (ConnectionLibevent*)(arg);
TcpServerLibevent *server = conn->Server();
if (events & BEV_EVENT_EOF)
{
cout << "connection closed: " << conn->IpAddress() << " " << conn->SocketFd() << endl;
conn->OnClose();
bufferevent_free(bev);
server->RemoveConnection(conn->SocketFd());
}
else if (events & BEV_EVENT_ERROR)
{
conn->OnClose();
bufferevent_free(bev);
server->RemoveConnection(conn->SocketFd());
cout << "some other error !" << endl;
}
delete conn;
}
/**
* @description:
* @param {*}
* @return {*}
*/
void cb_listener(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *addr, int len, void *ptr) {
struct sockaddr_in* client = (sockaddr_in*)addr ;
cout << "connect new client: " << inet_ntoa(client->sin_addr) << " " << fd << " ::"<< ntohs(client->sin_port)<< endl;
TcpServerLibevent *server = (TcpServerLibevent*)ptr;
if(server != nullptr){
struct bufferevent *bev;
bev = bufferevent_socket_new(server->m_event_base, fd, BEV_OPT_CLOSE_ON_FREE);
ConnectionLibevent *conn = server->m_handle_accept(bev,ntohs(client->sin_port),client);
conn->SetServer(server);
server->AddConnection(ntohs(client->sin_port),conn);
bufferevent_setcb(bev, read_cb, write_cb, event_cb, conn);
bufferevent_enable(bev, EV_READ);
}
}
/**
* @description:
* @param {*}
* @return {*}
*/
void server_run(TcpServerLibevent *p){
if(nullptr != p){
while(p->m_status == TcpServerLibevent::RUNNING){
int ret =event_base_loop(p->m_event_base,EVLOOP_NONBLOCK);
if(ret < 0){
p->m_status = TcpServerLibevent::FAIL;
break;
}
}
}
}
/**
* @description:
* @param {*}
* @return {*}
*/
TcpServerLibevent::SERVER_STATUS TcpServerLibevent::Status(){
return m_status;
}
/**
* @description:
* @param {*}
* @return {*}
*/
int TcpServerLibevent::AddConnection(uint32_t fd,ConnectionLibevent *p){
if( m_map_client.find(fd) == m_map_client.end()){
if(nullptr != p)
m_map_client[fd] = p;
else
return -1;
}
return 0;
}
/**
* @description:
* @param {*}
* @return {*}
*/
int TcpServerLibevent::RemoveConnection(uint32_t fd){
if( m_map_client.find(fd) != m_map_client.end()){
m_map_client.erase(fd);
return 0;
}else{
return -1;
}
}
/**
* @description:
* @param {*}
* @return {*}
*/
int TcpServerLibevent::SetNewConnectionHandle(OnAccept p){
m_handle_accept = p;
return 0;
}
/**
* @description:
* @param {int} ports
* @param {string} bindip
* @return {*}
*/
TcpServerLibevent::TcpServerLibevent(int port,string bindip) :
m_thread(nullptr),
m_event_base(nullptr),
m_event_listener(nullptr)
{
m_handle_accept = defaultConnAccept;
m_backlog = 10000;
this->m_bind_ip = bindip;
this->m_port = port;
memset(&m_server_addr, 0, sizeof(m_server_addr));
m_server_addr.sin_family = AF_INET;
m_server_addr.sin_port = htons(port);
m_server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
// 创建 event_base
m_event_base = event_base_new();
if(NULL == m_event_base){
return;
}
m_event_listener = evconnlistener_new_bind(m_event_base,
cb_listener,
this,
LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE|LEV_OPT_THREADSAFE,
m_backlog,
(struct sockaddr*)&m_server_addr,
sizeof(m_server_addr));
if(NULL == m_event_listener)
{
m_status = FAIL;
}
m_status = STOP;
}
/**
* @description: start server synchronous
* @param {*}
* @return {*}
*/
int TcpServerLibevent::StartServerSync(){
if(m_status == STOP){
m_status = RUNNING;
if(-1 == event_base_dispatch(m_event_base)){
qDebug()<<"error ";
}
event_base_free(m_event_base);
return 0;
}
return -1;
}
/**
* @description: start server asynchronous
* @param {*}
* @return {*}
*/
int TcpServerLibevent::StartServerAsync(){
if(m_status == STOP){
#ifdef WIN32
evthread_use_windows_threads();
#endif
#ifdef linux
evthread_use_pthreads();
#endif
m_status = RUNNING;
m_thread = new std::thread(server_run,this);
m_thread->detach();
return 0;
}
return -1;
}
int TcpServerLibevent::StopServer()
{
struct timeval v;
v.tv_usec = 1000;
v.tv_sec = 0;
if(m_status == RUNNING){
int ret = event_base_loopexit(m_event_base,&v);
if (ret < 0){
// todo write log
}
evconnlistener_free(this->m_event_listener);
if(ret < 0){
// todo write log
qDebug()<<"evconnlistener_disable"<<ret;
}
m_status = STOP;
return ret;
}
return -1;
}
/**
* @description: start server asynchronous
* @param {*}
* @return {*}
*/
TcpServerLibevent::~TcpServerLibevent(){
if(this->m_status == RUNNING){
m_thread->detach();
struct timeval v;
v.tv_usec = 1000;
v.tv_sec = 1;
int ret = event_base_loopexit(m_event_base,&v);
this->m_status = STOP;
}
if(nullptr != m_event_base){
event_base_free(m_event_base);
delete m_event_base;
m_event_base = nullptr;
}
if(nullptr != m_event_listener){
delete m_event_listener;
m_event_listener = nullptr;
}
}