From 3e0977e1ca425ad428eddb58a92765326e9b6324 Mon Sep 17 00:00:00 2001 From: zcy <290198252@qq.com> Date: Mon, 6 Dec 2021 10:07:20 +0800 Subject: [PATCH] tcp client libevent test --- general/src/net/tcp_client.cpp | 193 +++++++++++++++++++++----------- general/src/net/tcp_client.h | 105 +++++++++-------- test/src/cpp11/template.cpp | 9 +- test/src/cpp11/thread_usage.cpp | 3 +- test/src/cpp11/threadpool.cpp | 2 - 5 files changed, 195 insertions(+), 117 deletions(-) diff --git a/general/src/net/tcp_client.cpp b/general/src/net/tcp_client.cpp index 37f8c92..a4f585d 100644 --- a/general/src/net/tcp_client.cpp +++ b/general/src/net/tcp_client.cpp @@ -1,51 +1,42 @@ -// + // Created by 29019 on 2020/4/18. // - +#define _WSPIAPI_H_ +#define _WINSOCKAPI_ #include "tcp_client.h" +#include +#include +#include +#include -void conn_writecb(struct bufferevent *, void *); -void conn_readcb(struct bufferevent *, void *); -void conn_eventcb(struct bufferevent *, short, void *); +using namespace std::chrono; + +static void conn_writecb(struct bufferevent *, void *); +static void conn_readcb(struct bufferevent *, void *); +static void conn_eventcb(struct bufferevent *, short, void *); void delay(int ms); int ThreadRun(TcpClientLibevent *p); -//conn_writecwritecb函数将在bufferevent中的output evbuffer缓冲区发送完成后被调用。 -//此时evbuffer_get_length(output) = 0,说明output evbuffer缓冲区被清空。 -//假设发现有10000条记录要发送出去,1次发送10000条将占用大量内存,所以,我们要分批发送 -//先发送100条数据,假设每条数据为1024字节bufferevent_write(bev,buf,1024 *100); -//系统在这100条记录发送完成后将调用conn_writecbb回调函数,然后在该函数中循环发送剩下的 -//数据 void conn_writecb(struct bufferevent *bev, void *user_data) { - // struct evbuffer *output = bufferevent_get_output(bev); - // if (evbuffer_get_length(output) == 0) - // { - // printf("Output evbuffer is flushed\n"); - // bufferevent_free(bev); - // } - //delay 1 second - //delay(1000); - //static int msg_num = 1; - //char reply_msg[1000] = { '\0' }; - //char *str = "I receive a message from client "; - //memcpy(reply_msg, str, strlen(str)); - //sprintf(reply_msg + strlen(str), "%d", msg_num); - //bufferevent_write(bev, reply_msg, strlen(reply_msg)); - //msg_num++; + TcpClientLibevent *client = (TcpClientLibevent*)user_data; + struct evbuffer *output = bufferevent_get_output(bev); + size_t sz = evbuffer_get_length(output); + std::cout<<"write data length: "<mStatus = TcpClientLibevent::UNCONNECTED; - int ret = p->Dispatch(); - if (0 > ret){ - } - while ((p->mStatus != TcpClientLibevent::UNCONNECTED )) + while (true) { - if (p->mStatus == TcpClientLibevent::FAIL) { //连接失败,如果有设置自动重连就一直重连 + if ((p->mStatus == TcpClientLibevent::STOP)){ + Sleep(100); + continue; + } + if ((p->mStatus == TcpClientLibevent::FAIL) || + (p->mStatus == TcpClientLibevent::UNCONNECTED)){ //连接失败,如果有设置自动重连就一直重连 p->ConnectServer(); #ifdef _WIN32 Sleep(100); @@ -53,16 +44,18 @@ int ThreadRun(TcpClientLibevent *p) { //todo linux版本sleep #endif } - ret = p->Dispatch(); + int ret = p->Dispatch(); + if(ret < 0){ + break; + } } } - p->mStatus = TcpClientLibevent::UNCONNECTED; return 0; } void conn_readcb(struct bufferevent *bev, void *user_data) { - TcpClientLibevent *server = (TcpClientLibevent*)user_data; + TcpClientLibevent *client = (TcpClientLibevent*)user_data; struct evbuffer *input = bufferevent_get_input(bev); size_t sz = evbuffer_get_length(input); if (sz > 0) @@ -70,9 +63,9 @@ void conn_readcb(struct bufferevent *bev, void *user_data) uint8_t *msg = new uint8_t[sz]; int ret = bufferevent_read(bev, msg, sz); printf("%s\n", msg); - if(server->mObserver != nullptr){ + if(client->mObserver != nullptr){ + client->mObserver->OnData(msg, ret); } - server->mObserver->OnData(msg,ret); delete[] msg; } } @@ -81,29 +74,30 @@ void conn_eventcb(struct bufferevent *bev, short events, void *user_data) { TcpClientLibevent *p; p = (TcpClientLibevent *)user_data; - if (events & BEV_EVENT_EOF) - { + if (p == nullptr) { + return; + } + if (events & BEV_EVENT_EOF) { if (nullptr != p->mObserver) - p->mObserver->OnDisConnected(); + p->mObserver->OnDisConnected("服务器主动断开连接"); if (p != nullptr) p->mStatus = TcpClientLibevent::UNCONNECTED; printf("Connection closed\n"); } - else if (events & BEV_EVENT_ERROR) - { + else if (events & BEV_EVENT_ERROR) { printf("Got an error on the connection: %s\n", strerror(errno)); if (nullptr != p->mObserver) - p->mObserver->OnDisConnected(); + p->mObserver->OnDisConnected("连接失败"); p->mStatus = TcpClientLibevent::FAIL; - } - else if (events & BEV_EVENT_CONNECTED) - { - printf("Connect succeed\n"); + } + else if (events & BEV_EVENT_CONNECTED) { + + p->mSocketFD = (uint64_t)event_get_fd(&(bev->ev_read)); //客户端链接成功后,给服务器发送第一条消息 + std::cout << "连接成功 socket fd" << p->mSocketFD << std::endl; if (nullptr != p->mObserver) p->mObserver->OnConnected(); - if (p != nullptr) - p->mStatus = TcpClientLibevent::UNCONNECTED; + p->mStatus = TcpClientLibevent::CONNECTED; return; } bufferevent_free(bev); @@ -116,12 +110,13 @@ void delay(int ms) } bool TcpClientLibevent::Connected() { - return (((mStatus != UNCONNECTED)&& (mStatus != FAIL) )?true : false); + return ((mStatus == CONNECTED)?true:false); } TcpClientLibevent::TcpClientLibevent(std::string addrinfo, int port, TcpClientLibevent::TcpClientObserver *p) : - mStatus(UNCONNECTED), - mObserver(nullptr) + mStatus(UNCONNECTED), + mObserver(nullptr), + mBev(nullptr) { memset(&mSrv, 0, sizeof(mSrv)); #ifdef linux @@ -129,12 +124,10 @@ TcpClientLibevent::TcpClientLibevent(std::string addrinfo, int port, TcpClientLi mSrv.sin_family = AF_INET; #endif #ifdef _WIN32 - mSrv.sin_addr.S_un.S_addr = inet_addr(addrinfo.c_str()); mSrv.sin_family = AF_INET; #endif mSrv.sin_port = htons(port); - mBase = event_base_new(); if (!mBase) { @@ -146,32 +139,75 @@ TcpClientLibevent::TcpClientLibevent(std::string addrinfo, int port, TcpClientLi #else evthread_use_pthreads(); #endif - ConnectServer(); this->mThread = new thread(ThreadRun,this); this->mObserver = p; + mByteRecv = 0; + mByteSend = 0; + this->mStatus = TcpClientLibevent::Status::STOP; } int TcpClientLibevent::ConnectServer() { - printf("connect server\r\n"); + printf("server conecting...\r\n"); + if(this->mStatus == TcpClientLibevent::CONNECTED) { // 已经连接 + return 0; + } + if(this->mStatus == TcpClientLibevent::CONNECTING) { // 正在连接等待连接成功 + return -1; + } + evthread_make_base_notifiable(mBase); - bev = bufferevent_socket_new(mBase, -1, + mBev = bufferevent_socket_new(mBase, -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); - if (nullptr == bev) { + if (nullptr == mBev) { this->mStatus = TcpClientLibevent::FAIL; return - 1; } - bufferevent_setcb(bev, conn_readcb, conn_writecb, conn_eventcb, this); - int flag = bufferevent_socket_connect(bev, (struct sockaddr *)&mSrv, sizeof(mSrv)); - bufferevent_enable(bev, EV_READ | EV_WRITE); - if (-1 == flag) - { + bufferevent_setcb(mBev, conn_readcb, conn_writecb, conn_eventcb, this); + int flag = bufferevent_socket_connect(mBev, (struct sockaddr *)&mSrv, sizeof(mSrv)); + bufferevent_enable(mBev, EV_READ | EV_WRITE); + if (-1 == flag) { this->mStatus = TcpClientLibevent::FAIL; - bufferevent_free(bev); - bev = nullptr; + bufferevent_free(mBev); + mBev = nullptr; printf("Connect failed\n"); return -1; } - this->mStatus = TcpClientLibevent::CONNECTED; + this->mStatus = TcpClientLibevent::CONNECTING; + return 0; +} + +int TcpClientLibevent::ConnectServerSync() +{ + evthread_make_base_notifiable(mBase); + if (nullptr != mBev) { + delete mBev; + } + mBev = bufferevent_socket_new(mBase, -1, + BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); + if (nullptr == mBev) { + this->mStatus = TcpClientLibevent::FAIL; + return -1; + } + bufferevent_setcb(mBev, conn_readcb, conn_writecb, conn_eventcb, this); + int flag = bufferevent_socket_connect(mBev, (struct sockaddr*)&mSrv, sizeof(mSrv)); + bufferevent_enable(mBev, EV_READ | EV_WRITE); + if (-1 == flag) { + this->mStatus = TcpClientLibevent::FAIL; + bufferevent_free(mBev); + mBev = nullptr; + printf("Connect failed\n"); + return -1; + } + + this->mStatus = TcpClientLibevent::CONNECTING; + auto start = system_clock::to_time_t(system_clock::now()); + while (this->mStatus != TcpClientLibevent::CONNECTED) { + auto end = system_clock::to_time_t(system_clock::now()); + if ((end - start) > 5) { + this->mStatus = TcpClientLibevent::FAIL; + break; + } + } return 0; } @@ -189,8 +225,31 @@ int TcpClientLibevent::Dispatch() { return event_base_dispatch(mBase);; } - int TcpClientLibevent::Close() { event_base_free(mBase); return 0; } + +int TcpClientLibevent::SendDataAsync(const char* data, int len) +{ + if(data == nullptr){ + return -1; + } + int res; + //将data开始size大小的字节接到输出缓冲区的尾部 + res = evbuffer_add(bufferevent_get_output(mBev), data, len); + //调用失败 + if (res == -1) + return (res); + /* If everything is okay, we need to schedule a write */ + //注册写事件 + if (len > 0 && (mBev->enabled & EV_WRITE)) { + // event_active(&mBev->ev_write,EV_WRITE,1); + } + return (res); +} + +uint64_t TcpClientLibevent::SocketFd() +{ + return mSocketFD; +} diff --git a/general/src/net/tcp_client.h b/general/src/net/tcp_client.h index f432d8c..409a916 100644 --- a/general/src/net/tcp_client.h +++ b/general/src/net/tcp_client.h @@ -1,86 +1,101 @@ /* * @Author: your name - * @Date: 2021-06-12 14:42:28 - * @LastEditTime: 2021-07-24 00:47:43 + * @Date: 2021-06-30 10:02:08 + * @LastEditTime: 2021-12-06 10:00:18 * @LastEditors: Please set LastEditors - * @Description: In User Settings Edit - * @FilePath: \generallib\general\src\net\tcp_client.h + * @Description: 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE + * @FilePath: \server\tcp_client.h */ // // Created by 29019 on 2020/4/18. // - -#ifndef GENERAL_TCPCLIENT_H -#define GENERAL_TCPCLIENT_H +#pragma once #ifndef _WIN32_WINNT #define _WIN32_WINNT 0x0500 #endif + #ifdef linux -#include -#include -#include +#include +#include +#include #define EVENT__HAVE_PTHREADS #endif -extern "C" -{ -#include "third/include/event2/bufferevent.h" -#include "third/include/event2/buffer.h" -#include "third/include/event2/listener.h" -#include "third/include/event2/util.h" -#include "third/include/event2/event.h" -#include "third/include/event2/thread.h" +extern "C"{ + #include "event2/bufferevent.h" + #include "event2/bufferevent_struct.h" + #include "event2/buffer.h" + #include "event2/listener.h" + #include "event2/util.h" + #include "event2/event.h" + #include "event2/thread.h" + /* For int types. */ + #include + /* For struct event */ + #include + }; +#include #include -#include "package_receiver.h" #include #include -using namespace std; +#include -class TcpClientLibevent -{ +using namespace std; + +class TcpClientLibevent { public: - typedef enum - { - UNCONNECTED, // 未连接 - CONNECTED, //已经连接 - FAIL, // 连接失败 - } Status; - class TcpClientObserver - { + typedef enum { + UNCONNECTED, // 未连接 + CONNECTING, //已经连接 + CONNECTED, //已经连接 + FAIL, // 连接失败 + STOP, // 初始状态 + }Status; + + class TcpClientObserver { public: virtual ~TcpClientObserver() { return; } mutex mMux; virtual void OnConnected() { return; }; - virtual void OnDisConnected() { return; }; - virtual void OnData(uint8_t *dat, uint64_t len) { return; }; + virtual void OnDisConnected(std::string) { return; }; + virtual void OnData(uint8_t* dat, uint64_t len) { return; }; virtual void OnClose() { return; }; }; - TcpClientLibevent(std::string addrinfo, int port, TcpClientObserver *p); - ~TcpClientLibevent() - { + TcpClientLibevent(std::string addrinfo, int port, TcpClientObserver* p); + ~TcpClientLibevent() { event_base_free(mBase); }; + + friend void conn_eventcb(struct bufferevent*, short, void*); + int ConnectServer(); + int ConnectServerSync(); + bool Connected(); int Dispatch(); - int OnTCPPackage(uint8_t *, uint16_t); + int OnTCPPackage(uint8_t*, uint16_t); int SetReconnect(bool); - int SetObserver(TcpClientObserver *); + int SetObserver(TcpClientObserver*); int Close(); - Status mStatus; - TcpClientObserver *mObserver; + int SendDataAsync(const char*, int len); + uint64_t SocketFd(); + Status mStatus; + TcpClientObserver* mObserver; private: bool mReConnect = false; - int sendData(void *, size_t); - struct event_base *mBase; - struct bufferevent *bev; + int sendData(void*, size_t); + struct event_base* mBase; + struct bufferevent* mBev; struct sockaddr_in mSrv; - std::thread *mThread; - mutex mLock; + std::thread* mThread; + mutex mLock; // 互斥锁 + uint64_t mByteSend; // 发送字节数 + uint64_t mByteRecv; // 接收字节数 + evutil_socket_t mSocketFD; // 操作系统原生socket }; -#endif //GENERAL_TCPCLIENT_H + diff --git a/test/src/cpp11/template.cpp b/test/src/cpp11/template.cpp index 828ab56..8592479 100644 --- a/test/src/cpp11/template.cpp +++ b/test/src/cpp11/template.cpp @@ -1,3 +1,11 @@ +/* + * @Author: your name + * @Date: 2021-10-09 10:03:45 + * @LastEditTime: 2021-12-01 14:48:38 + * @LastEditors: your name + * @Description: 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE + * @FilePath: \cpp11\template.cpp + */ #include "iostream" #include #include "pattern/ringbuffer.hpp" @@ -43,5 +51,4 @@ void TestRingBuffer(){ printf("%d ",in[i]); } - } \ No newline at end of file diff --git a/test/src/cpp11/thread_usage.cpp b/test/src/cpp11/thread_usage.cpp index fc2c816..f818457 100644 --- a/test/src/cpp11/thread_usage.cpp +++ b/test/src/cpp11/thread_usage.cpp @@ -305,11 +305,10 @@ int main() { int result =200; ASyncProcess,int,int> process([](int p) -> int{ - Sleep(1000); return 20; },result,150); while(process.Finish() == false){ - std::cout<mThreadCnt = num; mStarted = false; @@ -62,7 +61,6 @@ namespace general{ } void CThreadPool::Process(int id) { - // std::cout << "thread id " << id << " started " << std::endl; while (mStarted) { Task *task = PopTask();