文章

理解 FFmpeg avio 中断机制

理解 FFmpeg avio 中断机制

背景

今天来聊聊 avformat_open_input 打开耗时,卡住线程的问题。

阿里云盘字幕大于 5M 就需要走下载接口,并且速度很慢,体现到 FSPlayer 交互上就是切换字幕的时候卡住了,有声音但是画面不刷新,之前测试的字幕有本地的也有网络上的,都没发现卡住的问题。

FSPlayer 的外挂字幕流切换是在渲染视频画面的 vout 线程里调用的,其中就包括了执行 avformat_open_input 函数,除了阿里云盘大于 5M 的,都能很快返回,因为下载速度都很快。

读 FFmpeg 源码

在没有头绪的情况下,在 http_read 里下了断点,通过调用堆栈发现,是从avformat_open_input 方法调用过来的,顺着这个链路尝试解决这个问题,

1
2
3
avformat_open_input -> init_input -> av_probe_input_buffer2 
-> avio_read -> fill_buffer -> read_packet_wrapper 
-> ffurl_read2 -> retry_transfer_wrapper -> http_read

为了简化问题,这里不再逐个分析,但有个问题需要搞清楚,那就是为什么 avio_read 能够调用到 http_read 里来?答案在 ffio_fdopen 函数里,这个函数建立了这个对应关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
int ffio_fdopen(AVIOContext **s, URLContext *h)
{
    uint8_t *buffer = NULL;
    int buffer_size, max_packet_size;

    max_packet_size = h->max_packet_size;
    if (max_packet_size) {
        buffer_size = max_packet_size; /* no need to bufferize more than one packet */
    } else {
        buffer_size = IO_BUFFER_SIZE;
    }
    if (!(h->flags & AVIO_FLAG_WRITE) && h->is_streamed) {
        if (buffer_size > INT_MAX/2)
            return AVERROR(EINVAL);
        buffer_size *= 2;
    }
    buffer = av_malloc(buffer_size);
    if (!buffer)
        return AVERROR(ENOMEM);

    *s = avio_alloc_context(buffer, buffer_size, h->flags & AVIO_FLAG_WRITE, h,
                            ffurl_read2, ffurl_write2, ffurl_seek2);
    if (!*s) {
        av_freep(&buffer);
        return AVERROR(ENOMEM);
    }
    (*s)->protocol_whitelist = av_strdup(h->protocol_whitelist);
    if (!(*s)->protocol_whitelist && h->protocol_whitelist) {
        avio_closep(s);
        return AVERROR(ENOMEM);
    }
    (*s)->protocol_blacklist = av_strdup(h->protocol_blacklist);
    if (!(*s)->protocol_blacklist && h->protocol_blacklist) {
        avio_closep(s);
        return AVERROR(ENOMEM);
    }
    (*s)->direct = h->flags & AVIO_FLAG_DIRECT;

    (*s)->seekable = h->is_streamed ? 0 : AVIO_SEEKABLE_NORMAL;
    (*s)->max_packet_size = max_packet_size;
    (*s)->min_packet_size = h->min_packet_size;
    if(h->prot) {
        (*s)->read_pause = (int (*)(void *, int))h->prot->url_read_pause;
        (*s)->read_seek  =
            (int64_t (*)(void *, int, int64_t, int))h->prot->url_read_seek;

        if (h->prot->url_read_seek)
            (*s)->seekable |= AVIO_SEEKABLE_TIME;
    }
    ((FFIOContext*)(*s))->short_seek_get = ffurl_get_short_seek;
    (*s)->av_class = &ff_avio_class;
    return 0;
}

你会发现读包的时候很多时候都是 32768 这个数,原因是 IO_BUFFER_SIZE 的大小默认是 32768。
创建 avio_alloc_context 时包括了三个函数,分别是 ffurl_read2, ffurl_write2, ffurl_seek2,创建成功后又对 protocol_whitelist,seekable 以及 read_pause,read_seek 等赋值,这样后续调用 avio 的方法时就直接调用 prot 这个协议内部的方法了。

read2,write2,seek2 这三个方法的实现也很清晰,其中 read2 和 write2 调用相同的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
int ffurl_read2(void *urlcontext, uint8_t *buf, int size)
{
    URLContext *h = urlcontext;

    if (!(h->flags & AVIO_FLAG_READ))
        return AVERROR(EIO);
    return retry_transfer_wrapper(h, buf, NULL, size, 1, 1);
}

int ffurl_read_complete(URLContext *h, unsigned char *buf, int size)
{
    if (!(h->flags & AVIO_FLAG_READ))
        return AVERROR(EIO);
    return retry_transfer_wrapper(h, buf, NULL, size, size, 1);
}

#if FF_API_AVIO_WRITE_NONCONST
int ffurl_write2(void *urlcontext, uint8_t *buf, int size)
#else
int ffurl_write2(void *urlcontext, const uint8_t *buf, int size)
#endif
{
    URLContext *h = urlcontext;

    if (!(h->flags & AVIO_FLAG_WRITE))
        return AVERROR(EIO);
    /* avoid sending too big packets */
    if (h->max_packet_size && size > h->max_packet_size)
        return AVERROR(EIO);

    return retry_transfer_wrapper(h, NULL, buf, size, size, 0);
}

int64_t ffurl_seek2(void *urlcontext, int64_t pos, int whence)
{
    URLContext *h = urlcontext;
    int64_t ret;

    if (!h->prot->url_seek)
        return AVERROR(ENOSYS);
    ret = h->prot->url_seek(h, pos, whence & ~AVSEEK_FORCE);
    return ret;
}

可以看到 avio 的 seek 直接调用到了 URL protocol 层的 url_seek 函数。但是 read 却不是这样,而是多了一层 retry_transfer_wrapper 的封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static inline int retry_transfer_wrapper(URLContext *h, uint8_t *buf,
                                         const uint8_t *cbuf,
                                         int size, int size_min,
                                         int read)
{
    int ret, len;
    int fast_retries = 5;
    int64_t wait_since = 0;

    len = 0;
    while (len < size_min) {
        if (ff_check_interrupt(&h->interrupt_callback))
            return AVERROR_EXIT;
        ret = read ? h->prot->url_read (h, buf + len, size - len):
                     h->prot->url_write(h, cbuf + len, size - len);
        if (ret == AVERROR(EINTR))
            continue;
        if (h->flags & AVIO_FLAG_NONBLOCK)
            return ret;
        if (ret == AVERROR(EAGAIN)) {
            ret = 0;
            if (fast_retries) {
                fast_retries--;
            } else {
                if (h->rw_timeout) {
                    if (!wait_since)
                        wait_since = av_gettime_relative();
                    else if (av_gettime_relative() > wait_since + h->rw_timeout)
                        return AVERROR(EIO);
                }
                av_usleep(1000);
            }
        } else if (ret == AVERROR_EOF)
            return (len > 0) ? len : AVERROR_EOF;
        else if (ret < 0)
            return ret;
        if (ret) {
            fast_retries = FFMAX(fast_retries, 2);
            wait_since = 0;
        }
        len += ret;
    }
    return len;
}

这里面有个今天要讲的重点 ff_check_interrupt,在读(写)之前都会检查是否要求中断,这个逻辑的作用主要是给上层调用者快速取消的机会,否则 ffmpeg 一旦发起请求则无法取消,因为这个过程是同步的。特别是网络状况不好的情况下会卡很久,我就遇到了阿里云盘限速导致切换字幕卡顿严重的问题,因为切换时要先关闭上一个字幕流,关闭逻辑里会等待读包线程结束,但是读包线程调用的 read_frame 是一个同步方法,其内部调用 avio 层之后调用到 http 层的 read 函数,不把包读完或者出错是不可能返回的,这就导致了上层需要等待底层读包结束,由于下载网速非常慢,从而导致等待很长时间。

在这个场景下是可以通过设定 interrupt_callback 解决的,我们需要先找到如何给 h->interrupt_callback 赋值。

使用 interrupt_callback

查看调用堆栈发现:

在 io_open_default 方法里,获取了 AVFormatContext 的 interrupt_callback 属性的值,然后就一直往下传递,最终在 url_alloc_for_protocol 方法里,将这个值赋值给了 URLContext 结构体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static int io_open_default(AVFormatContext *s, AVIOContext **pb,
                           const char *url, int flags, AVDictionary **options)
{
    int loglevel;

    if (!strcmp(url, s->url) ||
        s->iformat && !strcmp(s->iformat->name, "image2") ||
        s->oformat && !strcmp(s->oformat->name, "image2")
    ) {
        loglevel = AV_LOG_DEBUG;
    } else
        loglevel = AV_LOG_INFO;

    av_log(s, loglevel, "Opening \'%s\' for %s\n", url, flags & AVIO_FLAG_WRITE ? "writing" : "reading");

    return ffio_open_whitelist(pb, url, flags, &s->interrupt_callback, options, s->protocol_whitelist, s->protocol_blacklist);
}

static int url_alloc_for_protocol(URLContext **puc, const URLProtocol *up,
                                  const char *filename, int flags,
                                  const AVIOInterruptCB *int_cb)
{
//...
if (int_cb)
        uc->interrupt_callback = *int_cb;
//...
}

因此我们需要在 avformat_open_input之前,就创建好一个 AVFormatContext 对象,然后填充上 interrupt_callback,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static int ex_read_interrupt_cb(void *ctx)
{
    FFExSubtitle *sub = ctx;
    return sub->abort_request;
}

int ret = 0;
    AVFormatContext* ic = avformat_alloc_context();
    
    if (ic) {
        ic->interrupt_callback.callback = ex_read_interrupt_cb;
        ic->interrupt_callback.opaque = sub;
    } else {
        ret = AVERROR(ENOMEM);
        goto fail;
    }
    
    if (avformat_open_input(&ic, file_name, NULL, &sub->opts) < 0) {
        ret = -1;
        goto fail;
    }

这样底层协议在每次读写包之前都会通过 ex_read_interrupt_cb 函数和上层业务逻辑沟通,询问是否要继续读数据,以防止同步读写导致长时间占用线程卡住问题。

结束语

我们再次学习了 avio 的部分源码,后续还会继续学习,因为 avio 是个聊不完的话题。

本文由作者按照 CC BY 4.0 进行授权