理解 FFmpeg avio 中断机制
背景
今天来聊聊 avformat_open_input 打开耗时,卡住线程的问题。
阿里云盘字幕大于 5M 就需要走下载接口,并且速度很慢,体现到 FSPlayer 交互上就是切换字幕的时候卡住了,有声音但是画面不刷新,之前测试的字幕有本地的也有网络上的,都没发现卡住的问题。
FSPlayer 的外挂字幕流切换是在渲染视频画面的 vout 线程里调用的,其中就包括了执行 avformat_open_input 函数,除了阿里云盘大于 5M 的,都能很快返回,因为下载速度都很快。
读 FFmpeg 源码
在没有头绪的情况下,在 http_read 里下了断点,通过调用堆栈发现,是从avformat_open_input 方法调用过来的,顺着这个链路尝试解决这个问题,
1
2
3
avformat_open_input -> init_input -> av_probe_input_buffer2
-> avio_read -> fill_buffer -> read_packet_wrapper
-> ffurl_read2 -> retry_transfer_wrapper -> http_read
为了简化问题,这里不再逐个分析,但有个问题需要搞清楚,那就是为什么 avio_read 能够调用到 http_read 里来?答案在 ffio_fdopen 函数里,这个函数建立了这个对应关系:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
int ffio_fdopen(AVIOContext **s, URLContext *h)
{
uint8_t *buffer = NULL;
int buffer_size, max_packet_size;
max_packet_size = h->max_packet_size;
if (max_packet_size) {
buffer_size = max_packet_size; /* no need to bufferize more than one packet */
} else {
buffer_size = IO_BUFFER_SIZE;
}
if (!(h->flags & AVIO_FLAG_WRITE) && h->is_streamed) {
if (buffer_size > INT_MAX/2)
return AVERROR(EINVAL);
buffer_size *= 2;
}
buffer = av_malloc(buffer_size);
if (!buffer)
return AVERROR(ENOMEM);
*s = avio_alloc_context(buffer, buffer_size, h->flags & AVIO_FLAG_WRITE, h,
ffurl_read2, ffurl_write2, ffurl_seek2);
if (!*s) {
av_freep(&buffer);
return AVERROR(ENOMEM);
}
(*s)->protocol_whitelist = av_strdup(h->protocol_whitelist);
if (!(*s)->protocol_whitelist && h->protocol_whitelist) {
avio_closep(s);
return AVERROR(ENOMEM);
}
(*s)->protocol_blacklist = av_strdup(h->protocol_blacklist);
if (!(*s)->protocol_blacklist && h->protocol_blacklist) {
avio_closep(s);
return AVERROR(ENOMEM);
}
(*s)->direct = h->flags & AVIO_FLAG_DIRECT;
(*s)->seekable = h->is_streamed ? 0 : AVIO_SEEKABLE_NORMAL;
(*s)->max_packet_size = max_packet_size;
(*s)->min_packet_size = h->min_packet_size;
if(h->prot) {
(*s)->read_pause = (int (*)(void *, int))h->prot->url_read_pause;
(*s)->read_seek =
(int64_t (*)(void *, int, int64_t, int))h->prot->url_read_seek;
if (h->prot->url_read_seek)
(*s)->seekable |= AVIO_SEEKABLE_TIME;
}
((FFIOContext*)(*s))->short_seek_get = ffurl_get_short_seek;
(*s)->av_class = &ff_avio_class;
return 0;
}
你会发现读包的时候很多时候都是 32768 这个数,原因是 IO_BUFFER_SIZE 的大小默认是 32768。
创建 avio_alloc_context 时包括了三个函数,分别是 ffurl_read2, ffurl_write2, ffurl_seek2,创建成功后又对 protocol_whitelist,seekable 以及 read_pause,read_seek 等赋值,这样后续调用 avio 的方法时就直接调用 prot 这个协议内部的方法了。
read2,write2,seek2 这三个方法的实现也很清晰,其中 read2 和 write2 调用相同的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
int ffurl_read2(void *urlcontext, uint8_t *buf, int size)
{
URLContext *h = urlcontext;
if (!(h->flags & AVIO_FLAG_READ))
return AVERROR(EIO);
return retry_transfer_wrapper(h, buf, NULL, size, 1, 1);
}
int ffurl_read_complete(URLContext *h, unsigned char *buf, int size)
{
if (!(h->flags & AVIO_FLAG_READ))
return AVERROR(EIO);
return retry_transfer_wrapper(h, buf, NULL, size, size, 1);
}
#if FF_API_AVIO_WRITE_NONCONST
int ffurl_write2(void *urlcontext, uint8_t *buf, int size)
#else
int ffurl_write2(void *urlcontext, const uint8_t *buf, int size)
#endif
{
URLContext *h = urlcontext;
if (!(h->flags & AVIO_FLAG_WRITE))
return AVERROR(EIO);
/* avoid sending too big packets */
if (h->max_packet_size && size > h->max_packet_size)
return AVERROR(EIO);
return retry_transfer_wrapper(h, NULL, buf, size, size, 0);
}
int64_t ffurl_seek2(void *urlcontext, int64_t pos, int whence)
{
URLContext *h = urlcontext;
int64_t ret;
if (!h->prot->url_seek)
return AVERROR(ENOSYS);
ret = h->prot->url_seek(h, pos, whence & ~AVSEEK_FORCE);
return ret;
}
可以看到 avio 的 seek 直接调用到了 URL protocol 层的 url_seek 函数。但是 read 却不是这样,而是多了一层 retry_transfer_wrapper 的封装:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static inline int retry_transfer_wrapper(URLContext *h, uint8_t *buf,
const uint8_t *cbuf,
int size, int size_min,
int read)
{
int ret, len;
int fast_retries = 5;
int64_t wait_since = 0;
len = 0;
while (len < size_min) {
if (ff_check_interrupt(&h->interrupt_callback))
return AVERROR_EXIT;
ret = read ? h->prot->url_read (h, buf + len, size - len):
h->prot->url_write(h, cbuf + len, size - len);
if (ret == AVERROR(EINTR))
continue;
if (h->flags & AVIO_FLAG_NONBLOCK)
return ret;
if (ret == AVERROR(EAGAIN)) {
ret = 0;
if (fast_retries) {
fast_retries--;
} else {
if (h->rw_timeout) {
if (!wait_since)
wait_since = av_gettime_relative();
else if (av_gettime_relative() > wait_since + h->rw_timeout)
return AVERROR(EIO);
}
av_usleep(1000);
}
} else if (ret == AVERROR_EOF)
return (len > 0) ? len : AVERROR_EOF;
else if (ret < 0)
return ret;
if (ret) {
fast_retries = FFMAX(fast_retries, 2);
wait_since = 0;
}
len += ret;
}
return len;
}
这里面有个今天要讲的重点 ff_check_interrupt,在读(写)之前都会检查是否要求中断,这个逻辑的作用主要是给上层调用者快速取消的机会,否则 ffmpeg 一旦发起请求则无法取消,因为这个过程是同步的。特别是网络状况不好的情况下会卡很久,我就遇到了阿里云盘限速导致切换字幕卡顿严重的问题,因为切换时要先关闭上一个字幕流,关闭逻辑里会等待读包线程结束,但是读包线程调用的 read_frame 是一个同步方法,其内部调用 avio 层之后调用到 http 层的 read 函数,不把包读完或者出错是不可能返回的,这就导致了上层需要等待底层读包结束,由于下载网速非常慢,从而导致等待很长时间。
在这个场景下是可以通过设定 interrupt_callback 解决的,我们需要先找到如何给 h->interrupt_callback 赋值。
使用 interrupt_callback
查看调用堆栈发现:
在 io_open_default 方法里,获取了 AVFormatContext 的 interrupt_callback 属性的值,然后就一直往下传递,最终在 url_alloc_for_protocol 方法里,将这个值赋值给了 URLContext 结构体。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static int io_open_default(AVFormatContext *s, AVIOContext **pb,
const char *url, int flags, AVDictionary **options)
{
int loglevel;
if (!strcmp(url, s->url) ||
s->iformat && !strcmp(s->iformat->name, "image2") ||
s->oformat && !strcmp(s->oformat->name, "image2")
) {
loglevel = AV_LOG_DEBUG;
} else
loglevel = AV_LOG_INFO;
av_log(s, loglevel, "Opening \'%s\' for %s\n", url, flags & AVIO_FLAG_WRITE ? "writing" : "reading");
return ffio_open_whitelist(pb, url, flags, &s->interrupt_callback, options, s->protocol_whitelist, s->protocol_blacklist);
}
static int url_alloc_for_protocol(URLContext **puc, const URLProtocol *up,
const char *filename, int flags,
const AVIOInterruptCB *int_cb)
{
//...
if (int_cb)
uc->interrupt_callback = *int_cb;
//...
}
因此我们需要在 avformat_open_input之前,就创建好一个 AVFormatContext 对象,然后填充上 interrupt_callback,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static int ex_read_interrupt_cb(void *ctx)
{
FFExSubtitle *sub = ctx;
return sub->abort_request;
}
int ret = 0;
AVFormatContext* ic = avformat_alloc_context();
if (ic) {
ic->interrupt_callback.callback = ex_read_interrupt_cb;
ic->interrupt_callback.opaque = sub;
} else {
ret = AVERROR(ENOMEM);
goto fail;
}
if (avformat_open_input(&ic, file_name, NULL, &sub->opts) < 0) {
ret = -1;
goto fail;
}
这样底层协议在每次读写包之前都会通过 ex_read_interrupt_cb 函数和上层业务逻辑沟通,询问是否要继续读数据,以防止同步读写导致长时间占用线程卡住问题。
结束语
我们再次学习了 avio 的部分源码,后续还会继续学习,因为 avio 是个聊不完的话题。