ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
《[音视频应用开发系列文章目录](https://blog.csdn.net/KayChanGEEK/article/details/103319415)》

## 推流器设计思路

生产者-消费者模式。一个线程负责拉取原始音视频流然后存储在队列,一个线程负责从音视频流队列获取流然后推送,视频帧需要按照实际的帧率间隔发送,否则服务器可能会受不了

![](https://img-blog.csdnimg.cn/20191226145526982.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0theUNoYW5HRUVL,size_16,color_FFFFFF,t_70)

## 设计AvPusher类

### AvPusher.h

```cpp
#ifndef _AV_PUSHER_H_
#define _AV_PUSHER_H_
/***********************************************************
** Author:kaychan
** Date:2019-11-04
** Mail:1203375695@qq.com
** Explain:an audio/video pusher
***********************************************************/
#include <atomic>
#include <cstdint>
#include <string>
#include <thread>

#include "AvCommon.h"

// Error callback. `error` is the text produced by av_strerror() for the
// failing FFmpeg call; `priv` is the pointer handed to AvPusher::start().
// The callback runs on one of the pusher's worker threads, so it must not
// call AvPusher::stop() (stop() joins those threads).
typedef void(*on_error)(std::string error, void *priv);

// State shared by the pull thread, the push thread and the owning AvPusher.
typedef struct AvPusherParamters_S {
public:
    void *priv = NULL;                    // passed back to av_on_error
    std::string istream = "";             // input (pull) URL
    std::string ostream = "";             // output (push) URL
    std::atomic<bool> run{ true };        // cleared to ask both threads to exit
    std::atomic<bool> ready{ false };     // set by the pull thread once the output
                                          // context is fully configured
    AVFormatContext *ifmt_ctx = NULL;
    AVFormatContext *ofmt_ctx = NULL;
    int64_t video_udelay = 0;             // sleep before each video packet, in microseconds
    int64_t cache_packets_size = 0;       // last value read from av_packets.size()
    int stream_mapping_size = 0;          // number of entries in stream_mapping
    int *stream_mapping = NULL;           // input stream index -> output stream index, -1 = dropped
    int video_index = -1;                 // input index of the video stream, -1 if none
    AvPacketSafeQueue av_packets;         // packets handed from the pull thread to the push thread
    std::atomic<int64_t> flowrate{ 0 };   // sum of the sizes of the packets taken for pushing
    on_error av_on_error = NULL;
}AvPusherParamters;

class AvPusher {
public:
    AvPusher();
    // Stops the worker threads and releases the FFmpeg contexts.
    ~AvPusher();
    // Start the pusher. If it is already running it is stopped first.
    // istream: input (pull) URL
    // ostream: output (push) URL; "udp://" URLs are muxed as mpegts,
    //          everything else as flv (RTMP)
    // av_on_error: error callback, may be NULL
    // priv: opaque pointer passed back to the callback
    void start(const std::string istream, const std::string ostream, on_error av_on_error, void *priv = NULL);
    // Stop the pusher: joins both threads and frees all resources.
    // Must not be called from inside the error callback.
    void stop();
    // Total size of the packets taken from the queue for pushing since start().
    int64_t flowrate();

private:
    std::thread tsk_pull_;
    std::thread tsk_push_;
    AvPusherParamters paramters_;
};

#endif
```

### AvPusher.cpp

```cpp
#include "AvPusher.h"

// Stops both worker loops and reports `errcode` through the user callback
// (if one was supplied).
static inline void emit_error(int errcode, AvPusherParamters &paramters) {
    paramters.run = false;
    char error[256] = { 0 };
    av_strerror(errcode, error, sizeof(error));
    if (paramters.av_on_error) {
        paramters.av_on_error(error, paramters.priv);
    }
}

// Pull thread: opens the input, builds the matching output context, then
// reads packets from the input and queues them for the push thread.
void AvPusher_pull(AvPusherParamters &paramters) {
    // deal with input stream
    AVDictionary *dict = NULL;
    av_dict_set(&dict, "stimeout", "2000000", 0);
    av_dict_set(&dict, "buffer_size", "4096000", 0);
    av_dict_set(&dict, "recv_buffer_size", "4096000", 0);
    int r = avformat_open_input(&paramters.ifmt_ctx, paramters.istream.c_str(), NULL, &dict);
    // Whatever options were not consumed are left in `dict`; the caller owns them.
    av_dict_free(&dict);
    if (r != 0) {
        emit_error(r, paramters);
        return;
    }
    r = avformat_find_stream_info(paramters.ifmt_ctx, NULL);
    if (r < 0) {
        emit_error(r, paramters);
        return;
    }

    // deal with output stream
    const char *ofmt_name = "flv"; // default is rtmp
    if (paramters.ostream.find("udp://") != std::string::npos) {
        ofmt_name = "mpegts";
    }
    r = avformat_alloc_output_context2(&paramters.ofmt_ctx, NULL, ofmt_name, paramters.ostream.c_str());
    if (r < 0) {
        emit_error(r, paramters);
        return;
    }
    paramters.stream_mapping_size = static_cast<int>(paramters.ifmt_ctx->nb_streams);
    paramters.stream_mapping = static_cast<int *>(av_mallocz_array(paramters.stream_mapping_size, sizeof(int)));
    if (!paramters.stream_mapping) {
        emit_error(AVERROR(ENOMEM), paramters);
        return;
    }
    int stream_index = 0;
    for (unsigned int ii = 0; ii < paramters.ifmt_ctx->nb_streams; ii++) {
        AVStream *istream = paramters.ifmt_ctx->streams[ii];
        enum AVMediaType mt = istream->codecpar->codec_type;
        if (mt != AVMEDIA_TYPE_AUDIO && mt != AVMEDIA_TYPE_VIDEO) {
            // only audio and video are forwarded
            paramters.stream_mapping[ii] = -1;
            continue;
        }
        AVStream *ostream = avformat_new_stream(paramters.ofmt_ctx, NULL);
        if (!ostream) {
            // Without the output stream the mapping below would point past
            // ofmt_ctx->streams, so this has to be fatal.
            emit_error(AVERROR(ENOMEM), paramters);
            return;
        }
        paramters.stream_mapping[ii] = stream_index++;
        r = avcodec_parameters_copy(ostream->codecpar, istream->codecpar);
        if (r < 0) {
            emit_error(r, paramters);
            return;
        }
        ostream->codecpar->codec_tag = 0;
        // if video, calc push delay (one frame duration in microseconds)
        if (mt == AVMEDIA_TYPE_VIDEO) {
            paramters.video_index = static_cast<int>(ii);
            AVRational frame_rate = av_guess_frame_rate(paramters.ifmt_ctx, istream, NULL);
            AVRational reciprocal = { frame_rate.den, frame_rate.num };
            double duration = (reciprocal.num && reciprocal.den) ? av_q2d(reciprocal) : (0);
            paramters.video_udelay = (int64_t)(duration * AV_TIME_BASE);
        }
    }
    // if flags AVFMT_NOFILE is set, AVIOContext *pb will be NULL
    // should open first, otherwise is opened.
    if (!(paramters.ofmt_ctx->oformat->flags & AVFMT_NOFILE)) {
        r = avio_open(&paramters.ofmt_ctx->pb, paramters.ostream.c_str(), AVIO_FLAG_WRITE);
        if (r < 0) {
            emit_error(r, paramters);
            return;
        }
    }
    // Only now is the output context complete; let the push thread write the header.
    paramters.ready = true;

    // start pull
    while (paramters.run) {
        paramters.cache_packets_size = paramters.av_packets.size();
        if (paramters.cache_packets_size >= 1 * MB) {
            // queue is full enough, give the push thread time to drain it
            CPP11_MSLEEP(30);
            continue;
        }
        AVPacket packet;
        r = av_read_frame(paramters.ifmt_ctx, &packet);
        if (r < 0) {
            // maybe rtsp/rtmp has disconnected
            // maybe file is eof
            break;
        }
        if (packet.stream_index >= paramters.stream_mapping_size ||
            paramters.stream_mapping[packet.stream_index] < 0) {
            av_packet_unref(&packet);
            continue;
        }
        paramters.av_packets.eq(packet);
    }
}

// Push thread: waits until the pull thread has prepared the output context,
// writes the header, then muxes queued packets until asked to stop or a
// write fails.
void AvPusher_push(AvPusherParamters &paramters) {
    // Wait for the fully configured output context, not merely a non-NULL
    // pointer: streams and the AVIOContext are attached after allocation.
    while (!paramters.ready) {
        if (!paramters.run) return;
        CPP11_MSLEEP(30);
    }
    int r = avformat_write_header(paramters.ofmt_ctx, NULL);
    if (r < 0) {
        emit_error(r, paramters);
        return;
    }
    // start push
    while (paramters.run) {
        if (paramters.av_packets.empty()) {
            CPP11_MSLEEP(30);
            continue;
        }
        AVPacket p = paramters.av_packets.dq();
        paramters.flowrate += p.size;
        AVStream *istream = paramters.ifmt_ctx->streams[p.stream_index];
        if (p.stream_index == paramters.video_index) {
            // pace video packets at the source frame rate
            av_usleep(paramters.video_udelay);
        }
        p.stream_index = paramters.stream_mapping[p.stream_index];
        AVStream *ostream = paramters.ofmt_ctx->streams[p.stream_index];
        // update packet timestamp
        av_packet_rescale_ts(&p, istream->time_base, ostream->time_base);
        p.pos = -1;
        r = av_interleaved_write_frame(paramters.ofmt_ctx, &p);
        av_packet_unref(&p);
        if (r < 0) {
            // report the failure and stop the pull thread as well
            emit_error(r, paramters);
            break;
        }
    }
    av_write_trailer(paramters.ofmt_ctx);
}

AvPusher::AvPusher() {

}

AvPusher::~AvPusher() {
    // A joinable std::thread must not be destroyed.
    stop();
}

void AvPusher::start(const std::string istream, const std::string ostream, on_error av_on_error, void *priv) {
    // Assigning to a joinable std::thread terminates the program, so make
    // sure any previous session is fully torn down first.
    stop();
    paramters_.istream = istream;
    paramters_.ostream = ostream;
    paramters_.av_on_error = av_on_error;
    paramters_.priv = priv;
    paramters_.flowrate = 0;
    paramters_.run = true;
    tsk_pull_ = std::thread(AvPusher_pull, std::ref(paramters_));
    tsk_push_ = std::thread(AvPusher_push, std::ref(paramters_));
}

void AvPusher::stop() {
    paramters_.run = false;
    if (tsk_pull_.joinable()) tsk_pull_.join();
    if (tsk_push_.joinable()) tsk_push_.join();
    paramters_.ready = false;
    paramters_.av_packets.clear();
    if (paramters_.ofmt_ctx && !(paramters_.ofmt_ctx->oformat->flags & AVFMT_NOFILE)) {
        avio_closep(&paramters_.ofmt_ctx->pb);
    }
    if (paramters_.ifmt_ctx) {
        avformat_close_input(&paramters_.ifmt_ctx);
        paramters_.ifmt_ctx = NULL;
    }
    if (paramters_.ofmt_ctx) {
        avformat_free_context(paramters_.ofmt_ctx);
        paramters_.ofmt_ctx = NULL;
    }
    if (paramters_.stream_mapping) {
        av_freep(&paramters_.stream_mapping);
        paramters_.stream_mapping = NULL;
    }
    // Reset per-session values so a later start() begins from a clean state.
    paramters_.stream_mapping_size = 0;
    paramters_.video_index = -1;
    paramters_.video_udelay = 0;
}

int64_t AvPusher::flowrate() {
    return paramters_.flowrate;
}
```