#include "RtmpPuller2.h" #include "Debuger.h" RtmpPuller2::RtmpPuller2() { mAccBuffer = new uint8_t[3000]; } RtmpPuller2::~RtmpPuller2() { } int ThreadPull(RtmpPuller2*p) { while (p->Status() == RtmpPuller2::CAP_STATUS::RUNNING) { p->PullData(); Sleep(10); } return 0; } // 关闭拉流 int RtmpPuller2::StopPull() { mStatus = STOP; this->mThread->join(); RTMP_Close(mRtmp); RTMP_Free(mRtmp); return 0; } int RtmpPuller2::StartPull() { if(this->mStatus == CONNECTED) { mStatus = RUNNING; this->mThread = new std::thread(ThreadPull, this); this->mThread->get_id(); } else { } return 0; } FILE *fp = nullptr; int RtmpPuller2::PullData() { RTMPPacket packet = { 0 }; // Parse rtmp stream to h264 and aac uint8_t nalu_header[4] = { 0x00, 0x00, 0x00, 0x01 }; int ret = RTMP_ReadPacket(mRtmp, &packet); if (ret < 0) return ret; if (nullptr == fp) { fp = fopen("src.aac", "wb"); } if (RTMPPacket_IsReady(&packet)) { // Process packet, eg: set chunk size, set bw, ... RTMP_ClientPacket(mRtmp, &packet); if (packet.m_packetType == RTMP_PACKET_TYPE_VIDEO) { bool keyframe = 0x17 == packet.m_body[0] ? true : false; bool sequence = 0x00 == packet.m_body[1]; printf("keyframe=%s, sequence=%s\n", keyframe ? "true" : "false", sequence ? "true" : "false"); // SPS/PPS sequence if (sequence) { uint32_t offset = 10; uint32_t sps_num = packet.m_body[offset++] & 0x1f; for (int i = 0; i < sps_num; i++) { uint8_t ch0 = packet.m_body[offset]; uint8_t ch1 = packet.m_body[offset + 1]; uint32_t sps_len = ((ch0 << 8) | ch1); offset += 2; packet.m_body[offset - 1] = 0x01; packet.m_body[offset - 2] = 0x00; packet.m_body[offset - 3] = 0x00; packet.m_body[offset - 4] = 0x00; if (mObserver.size() > 0) { for (auto itr = this->mObserver.begin(); itr != mObserver.end(); itr++) { RtmpPullObserver *p = (RtmpPullObserver *)*itr; if (p->mObserverType == RtmpPullObserver::Observer_Video) { p->OnRtmpFrame(packet.m_body + offset - 4, sps_len + 4); } } } // Write sps data //fwrite(nalu_header, sizeof(uint8_t), 4, _file_ptr); //fwrite(packet.m_body + offset, sizeof(uint8_t), sps_len, _file_ptr); offset += sps_len; } uint32_t pps_num = packet.m_body[offset++] & 0x1f; for (int i = 0; i < pps_num; i++) { uint8_t ch0 = packet.m_body[offset]; uint8_t ch1 = packet.m_body[offset + 1]; uint32_t pps_len = ((ch0 << 8) | ch1); offset += 2; packet.m_body[offset - 1] = 0x01; packet.m_body[offset - 2] = 0x00; packet.m_body[offset - 3] = 0x00; packet.m_body[offset - 4] = 0x00; if (mObserver.size() > 0) { for (auto itr = this->mObserver.begin(); itr != mObserver.end(); itr++) { RtmpPullObserver *p = (RtmpPullObserver *)*itr; if (p->mObserverType == RtmpPullObserver::Observer_Video) { p->OnRtmpFrame(packet.m_body + offset - 4, pps_len + 4); } } } // Write pps data offset += pps_len; } } // Nalu frames else { uint32_t offset = 5; uint8_t ch0 = packet.m_body[offset]; uint8_t ch1 = packet.m_body[offset + 1]; uint8_t ch2 = packet.m_body[offset + 2]; uint8_t ch3 = packet.m_body[offset + 3]; uint32_t data_len = ((ch0 << 24) | (ch1 << 16) | (ch2 << 8) | ch3); offset += 4; packet.m_body[offset - 1] = 0x01; packet.m_body[offset - 2] = 0x00; packet.m_body[offset - 3] = 0x00; packet.m_body[offset - 4] = 0x00; if (mObserver.size() > 0) { for (auto itr = this->mObserver.begin(); itr != mObserver.end(); itr++) { RtmpPullObserver *p = (RtmpPullObserver *)*itr; if (p->mObserverType == RtmpPullObserver::Observer_Video) { p->OnRtmpFrame(packet.m_body + offset - 4, data_len + 4); } } } // Write nalu data(already started with '0x00,0x00,0x00,0x01') //fwrite(nalu_header, sizeof(uint8_t), 4, _file_ptr); offset += data_len; } } else if (packet.m_packetType == RTMP_PACKET_TYPE_AUDIO) { bool sequence = 0x00 == packet.m_body[1]; printf("sequence=%s\n", sequence ? "true" : "false"); // AAC sequence if (sequence) { uint8_t format = (packet.m_body[0] & 0xf0) >> 4; uint8_t samplerate = (packet.m_body[0] & 0x0c) >> 2; uint8_t sampledepth = (packet.m_body[0] & 0x02) >> 1; uint8_t type = packet.m_body[0] & 0x01; // sequence = packet.m_body[1]; // AAC(AudioSpecificConfig) if (format == 10) { ch0 = packet.m_body[2]; ch1 = packet.m_body[3]; config = ((ch0 << 8) | ch1); object_type = (config & 0xF800) >> 11; sample_frequency_index = (config & 0x0780) >> 7; channels = (config & 0x78) >> 3; frame_length_flag = (config & 0x04) >> 2; depend_on_core_coder = (config & 0x02) >> 1; extension_flag = config & 0x01; } // Speex(Fix data here, so no need to parse...) else if (format == 11) { // 16 KHz, mono, 16bit/sample type = 0; sampledepth = 1; samplerate = 4; } } // Audio frames else { // ADTS(7 bytes) + AAC data uint32_t data_len = packet.m_nBodySize - 2 + 7; uint8_t adts[7]; adts[0] = 0xff; adts[1] = 0xf1; adts[2] = ((object_type - 1) << 6) | (sample_frequency_index << 2) | (channels >> 2); adts[3] = ((channels & 3) << 6) + (data_len >> 11); adts[4] = (data_len & 0x7FF) >> 3; adts[5] = ((data_len & 7) << 5) + 0x1F; adts[6] = 0xfc; // Write audio frames fwrite(adts, sizeof(uint8_t), 7, fp); fwrite(packet.m_body + 2, sizeof(uint8_t), packet.m_nBodySize - 2, fp); fflush(fp); memcpy(mAccBuffer, adts, 7); memcpy(mAccBuffer + 7, packet.m_body + 2, packet.m_nBodySize - 2); if (mObserver.size() > 0) { for (auto itr = this->mObserver.begin(); itr != mObserver.end(); itr++) { RtmpPullObserver *p = (RtmpPullObserver *)*itr; if (p->mObserverType == RtmpPullObserver::Observer_Audio) { p->OnRtmpFrame(mAccBuffer, packet.m_nBodySize - 2 + 7); } } } } } RTMPPacket_Free(&packet); } return 0; } int RtmpPuller2::SetObserver(RtmpPuller2::RtmpPullObserver *p) { if (nullptr == p) return -1; mMux.lock(); for (auto itr = this->mObserver.begin(); itr != mObserver.end(); itr++) { if (p == *itr) return 0; } this->mObserver.push_back(p); mMux.unlock(); return 0; } RtmpPuller2::CAP_STATUS RtmpPuller2::Status() { return mStatus; } int RtmpPuller2::ConnectServer(string url) { mRtmp = RTMP_Alloc(); RTMP_Init(mRtmp); if (RTMP_SetupURL(mRtmp, (char*)url.c_str()) == FALSE) { RTMP_Free(mRtmp); mStatus = FAIL; return -1; } /*连接服务器*/ if (RTMP_Connect(mRtmp, NULL) == FALSE) { RTMP_Free(mRtmp); mStatus = FAIL; return -1; } /*连接流*/ if (RTMP_ConnectStream(mRtmp, 0) == FALSE) { RTMP_Close(mRtmp); RTMP_Free(mRtmp); mStatus = FAIL; return -1; } mStatus = CONNECTED; return 0; }