文章

HLS 读包流程分析

HLS 读包流程分析

在 FFmpeg 众多的解复用器中,HLS 应该是最为复杂的一个了,因为它不仅套娃,还自定义了 avio 读取数据、加载 Segment 重试机制、刷新 m3u8 列表机制等。今天记录走读 hls.c 读包的详细过程。

发起读包操作

fsplayer 在 read_thread 线程里发起读包操作,调用顺序如下

1
2
3
4
ret = av_read_frame(ic, pkt); 
      -> read_frame_internal 
         -> ff_read_packet
            -> err = s->iformat->read_packet(s, pkt); 

这里的 s 就是传入的 ic, s->iformat 就是 hls 解复用器(AVInputFormat)对象,根据其在 hls.c 里定义的结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
/* Demuxer descriptor for Apple HTTP Live Streaming (HLS).
 * AVFMT_NOGENSEARCH / AVFMT_NO_BYTE_SEEK: seeking is timestamp-based only;
 * AVFMT_TS_DISCONT: timestamps may jump between segments.
 * NOTE(review): upstream FFmpeg wires .read_header; the read_header2 variant
 * here looks fork-specific (fsplayer/ijkplayer) — confirm against the tree. */
const AVInputFormat ff_hls_demuxer = {
    .name           = "hls",
    .long_name      = NULL_IF_CONFIG_SMALL("Apple HTTP Live Streaming"),
    .priv_class     = &hls_class,
    .priv_data_size = sizeof(HLSContext),
    .flags          = AVFMT_NOGENSEARCH | AVFMT_TS_DISCONT | AVFMT_NO_BYTE_SEEK,
    .flags_internal = FF_FMT_INIT_CLEANUP,
    .read_probe     = hls_probe,
    .read_header2    = hls_read_header2,
    .read_packet    = hls_read_packet,
    .read_close     = hls_close,
    .read_seek      = hls_read_seek,
};

可以确定调用 iformat->read_packet() 实际上就是在调用 hls_read_packet 函数,大致功能是从列表里读 packet,读取成功就转换下时基返回,读取失败时如果是 EOF 就返回 EOF,如果 avio 没有 EOF 那就返回 ret 这个具体错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/**
 * Read the next packet for the output context.
 *
 * Keeps one buffered packet per needed playlist (read via each playlist's
 * subdemuxer), then returns the packet with the lowest dts across playlists,
 * remapping its stream index from the subdemuxer to the main stream.
 *
 * Returns 0 on success, AVERROR_EOF when all playlists are drained, or a
 * negative error code from the subdemuxer / stream update.
 */
static int hls_read_packet(AVFormatContext *s, AVPacket *pkt)
{
    HLSContext *c = s->priv_data;
    int ret, i, minplaylist = -1;

    recheck_discard_flags(s, c->first_packet);
    c->first_packet = 0;

    for (i = 0; i < c->n_playlists; i++) {
        struct playlist *pls = c->playlists[i];
        /* Make sure we've got one buffered packet from each open playlist
         * stream */
        if (pls->needed && !pls->pkt->data) {
            while (1) {
                int64_t pkt_ts = AV_NOPTS_VALUE;
                int64_t ts_diff;
                AVRational tb;
                struct segment *seg = NULL;
                /* Read one packet from this playlist's subdemuxer. */
                ret = av_read_frame(pls->ctx, pls->pkt);
                if (ret < 0) {
                    /* Only propagate errors that are not plain end-of-data. */
                    if (!avio_feof(&pls->pb.pub) && ret != AVERROR_EOF)
                        return ret;
                    break;
                } else {
                    /* stream_index check prevents matching picture attachments etc. */
                    if (pls->is_id3_timestamped && pls->pkt->stream_index == 0) {
                        /* audio elementary streams are id3 timestamped */
                        fill_timing_for_id3_timestamped_stream(pls);
                    } else {
                        /* Discontinuity handling: offset dts/pts by the
                         * accumulated duration of the preceding segments. */
                        if (pls->finished) {
                            int seq_no = pls->cur_seq_no - pls->start_seq_no;
                            /* BUGFIX: index with pls->pkt->stream_index (the
                             * packet just demuxed); the output pkt has not been
                             * filled in yet at this point, so pkt->stream_index
                             * was stale/uninitialized.  Also bound-check the
                             * index against s->nb_streams.
                             * NOTE(review): this assumes subdemuxer stream
                             * indices line up with s->streams — true for a
                             * single playlist; verify for multi-variant. */
                            if (seq_no < pls->n_segments &&
                                pls->pkt->stream_index < (int)s->nb_streams &&
                                s->streams[pls->pkt->stream_index]) {
                                struct segment *dseg = pls->segments[seq_no];
                                if (dseg->previous_duration > 0) {
                                    AVRational st_tb = s->streams[pls->pkt->stream_index]->time_base;
                                    int64_t pred = av_rescale_q(dseg->previous_duration,
                                                                AV_TIME_BASE_Q, st_tb);
                                    /* EXTINF durations are not precise; allow
                                     * 2 seconds of slack.  BUGFIX: add the
                                     * slack before rescaling so it is in
                                     * AV_TIME_BASE units — the original added
                                     * 2*AV_TIME_BASE raw ticks to a value
                                     * already in the stream timebase. */
                                    int64_t max_ts = av_rescale_q(dseg->start_time + dseg->duration +
                                                                  2 * AV_TIME_BASE,
                                                                  AV_TIME_BASE_Q, st_tb);
                                    if (s->start_time > 0)
                                        max_ts += av_rescale_q(s->start_time,
                                                               AV_TIME_BASE_Q, st_tb);
                                    /* Only shift timestamps still inside the
                                     * expected window of this segment. */
                                    if (pls->pkt->dts != AV_NOPTS_VALUE && pls->pkt->dts + pred < max_ts)
                                        pls->pkt->dts += pred;
                                    if (pls->pkt->pts != AV_NOPTS_VALUE && pls->pkt->pts + pred < max_ts)
                                        pls->pkt->pts += pred;
                                }
                            }
                        }
                    }

                    if (pls->pkt->pts != AV_NOPTS_VALUE)
                        pkt_ts = pls->pkt->pts;
                    else if (pls->pkt->dts != AV_NOPTS_VALUE)
                        pkt_ts = pls->pkt->dts;

                    /* Remember the first timestamp ever seen, in AV_TIME_BASE units. */
                    if (c->first_timestamp == AV_NOPTS_VALUE && pkt_ts != AV_NOPTS_VALUE)
                        c->first_timestamp = av_rescale_q(pkt_ts, get_timebase(pls), AV_TIME_BASE_Q);
                }

                /* SAMPLE-AES content: decrypt in place unless the subdemuxer
                 * (mov) already handles decryption itself. */
                seg = current_segment(pls);
                if (seg && seg->key_type == KEY_SAMPLE_AES && !strstr(pls->ctx->iformat->name, "mov")) {
                    enum AVCodecID codec_id = pls->ctx->streams[pls->pkt->stream_index]->codecpar->codec_id;
                    memcpy(c->crypto_ctx.iv, seg->iv, sizeof(seg->iv));
                    memcpy(c->crypto_ctx.key, pls->key, sizeof(pls->key));
                    ff_hls_senc_decrypt_frame(codec_id, &c->crypto_ctx, pls->pkt);
                }

                /* Not seeking: keep this packet buffered. */
                if (pls->seek_timestamp == AV_NOPTS_VALUE)
                    break;

                if (pls->seek_stream_index < 0 ||
                    pls->seek_stream_index == pls->pkt->stream_index) {

                    if (pkt_ts == AV_NOPTS_VALUE) {
                        pls->seek_timestamp = AV_NOPTS_VALUE;
                        break;
                    }

                    tb = get_timebase(pls);
                    ts_diff = av_rescale_rnd(pkt_ts, AV_TIME_BASE,
                                            tb.den, AV_ROUND_DOWN) -
                            pls->seek_timestamp;
                    /* Drop packets until the seek target is reached; unless
                     * AVSEEK_FLAG_ANY, also require a keyframe. */
                    if (ts_diff >= 0 && (pls->seek_flags  & AVSEEK_FLAG_ANY ||
                                        pls->pkt->flags & AV_PKT_FLAG_KEY)) {
                        pls->seek_timestamp = AV_NOPTS_VALUE;
                        break;
                    }
                }
                av_packet_unref(pls->pkt);
            }
        }
        /* Check if this stream has the packet with the lowest dts */
        if (pls->pkt->data) {
            struct playlist *minpls = minplaylist < 0 ?
                                     NULL : c->playlists[minplaylist];
            if (minplaylist < 0) {
                minplaylist = i;
            } else {
                int64_t dts     =    pls->pkt->dts;
                int64_t mindts  = minpls->pkt->dts;

                if (dts == AV_NOPTS_VALUE ||
                    (mindts != AV_NOPTS_VALUE && compare_ts_with_wrapdetect(dts, pls, mindts, minpls) < 0))
                    minplaylist = i;
            }
        }
    }

    /* If we got a packet, return it */
    if (minplaylist >= 0) {
        struct playlist *pls = c->playlists[minplaylist];
        AVStream *ist;
        AVStream *st;

        ret = update_streams_from_subdemuxer(s, pls);
        if (ret < 0) {
            av_packet_unref(pls->pkt);
            return ret;
        }

        /* If the sub-demuxer reports updated metadata, copy it to the first
         * stream and set its AVSTREAM_EVENT_FLAG_METADATA_UPDATED flag. */
        if (pls->ctx->event_flags & AVFMT_EVENT_FLAG_METADATA_UPDATED) {
            if (pls->n_main_streams) {
                st = pls->main_streams[0];
                av_dict_copy(&st->metadata, pls->ctx->metadata, 0);
                st->event_flags |= AVSTREAM_EVENT_FLAG_METADATA_UPDATED;
            }
            pls->ctx->event_flags &= ~AVFMT_EVENT_FLAG_METADATA_UPDATED;
        }

        /* check if noheader flag has been cleared by the subdemuxer */
        if (pls->has_noheader_flag && !(pls->ctx->ctx_flags & AVFMTCTX_NOHEADER)) {
            pls->has_noheader_flag = 0;
            update_noheader_flag(s);
        }

        if (pls->pkt->stream_index >= pls->n_main_streams) {
            av_log(s, AV_LOG_ERROR, "stream index inconsistency: index %d, %d main streams, %d subdemuxer streams\n",
                   pls->pkt->stream_index, pls->n_main_streams, pls->ctx->nb_streams);
            av_packet_unref(pls->pkt);
            return AVERROR_BUG;
        }

        ist = pls->ctx->streams[pls->pkt->stream_index];
        st = pls->main_streams[pls->pkt->stream_index];

        /* Hand the packet to the caller and translate the subdemuxer stream
         * index to the corresponding main (output) stream index. */
        av_packet_move_ref(pkt, pls->pkt);
        pkt->stream_index = st->index;

        if (pkt->dts != AV_NOPTS_VALUE)
            c->cur_timestamp = av_rescale_q(pkt->dts,
                                            ist->time_base,
                                            AV_TIME_BASE_Q);

        /* There may be more situations where this would be useful, but this at least
         * handles newly probed codecs properly (i.e. request_probe by mpegts). */
        if (ist->codecpar->codec_id != st->codecpar->codec_id) {
            ret = set_stream_info_from_input_stream(st, pls, ist);
            if (ret < 0) {
                return ret;
            }
        }

        return 0;
    }
    return AVERROR_EOF;
}

接下来就开始套娃了,因为在 hls 里要调用 av_read_frame 了,还记得开篇时 av_read_frame 是从 read_thread 线程发起的么。

这里的 pls->ctx 是 AVFormatContext 对象,它何时创建的呢?在 hls_read_header2 函数里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if (!(pls->ctx = avformat_alloc_context()))
            return AVERROR(ENOMEM);

pls->read_buffer = av_malloc(INITIAL_BUFFER_SIZE);
        if (!pls->read_buffer){
            avformat_free_context(pls->ctx);
            pls->ctx = NULL;
            return AVERROR(ENOMEM);
        }

        ffio_init_context(&pls->pb, pls->read_buffer, INITIAL_BUFFER_SIZE, 0, pls,
                          read_data, NULL, NULL);


pls->ctx->pb       = &pls->pb.pub;
pls->ctx->io_open  = nested_io_open;
pls->ctx->flags   |= s->flags & ~AVFMT_FLAG_CUSTOM_IO;

这个 ctx 可不简单啊,跟平时用的不一样,自定义了 pb,什么是 pb ? pb 是 AVIOContext,用来接管 ctx 默认读取的方式,这里只接管了读,所以 ctx 需要读数据时就会调用 hls.c 里的 read_data 函数了。这个大前提知道后,继续看套娃逻辑:

开始套娃

跟 read_thread 线程一样,调用顺序如下:

1
2
3
4
ret = av_read_frame(pls->ctx, pls->pkt); 
      -> read_frame_internal 
           -> ff_read_packet 
            -> err = s->iformat->read_packet(s, pkt);

这里的 s 就是传入的 pls->ctx,如果是 ts 片段,这里的 iformat 就是 mpegts 解复用器(AVInputFormat), 根据结构体的定义:

1
2
3
4
5
6
7
8
9
10
11
12
/* Demuxer descriptor for MPEG-TS, used as the HLS subdemuxer for .ts
 * segments.  AVFMT_TS_DISCONT: timestamps may jump; AVFMT_SHOW_IDS:
 * stream ids (PIDs) are meaningful to the user. */
const AVInputFormat ff_mpegts_demuxer = {
    .name           = "mpegts",
    .long_name      = NULL_IF_CONFIG_SMALL("MPEG-TS (MPEG-2 Transport Stream)"),
    .priv_data_size = sizeof(MpegTSContext),
    .read_probe     = mpegts_probe,
    .read_header    = mpegts_read_header,
    .read_packet    = mpegts_read_packet,
    .read_close     = mpegts_read_close,
    .read_timestamp = mpegts_get_dts,
    .flags          = AVFMT_SHOW_IDS | AVFMT_TS_DISCONT,
    .priv_class     = &mpegts_class,
};

可以确定调用 iformat->read_packet() 实际上就是在调用 mpegts_read_packet 函数,其内部调用如下:

1
ret = handle_packets(ts, 0); -> ret = read_packet(s, packet, ts->raw_packet_size, &data);

下面是 read_packet 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* Fetch one raw 188-byte TS packet from the (possibly custom) AVIO layer.
 * On success *data points at the packet payload (either inside the AVIO
 * buffer or at buf) and 0 is returned; otherwise a negative error code. */
static int read_packet(AVFormatContext *s, uint8_t *buf, int raw_packet_size,
                       const uint8_t **data)
{
    AVIOContext *pb = s->pb;

    for (;;) {
        int len = ffio_read_indirect(pb, buf, TS_PACKET_SIZE, data);

        /* A short read means error or end of stream. */
        if (len != TS_PACKET_SIZE)
            return len < 0 ? len : AVERROR_EOF;

        /* Packet starts with the TS sync byte: we are done. */
        if ((*data)[0] == 0x47)
            return 0;

        /* Lost sync: hunt for the next packet boundary and retry. */
        if (mpegts_resync(s, raw_packet_size, *data) < 0)
            return AVERROR(EAGAIN);
    }
}

这里的 s 是从 hls 传过来的 pls->ctx,这里的 pb 就是在 hls 里自定义的 AVIO 对象,紧接着会调用 ffio_read_indirect 函数:

1
2
3
4
5
6
7
8
9
10
11
/* Read `size` bytes, avoiding a copy when possible: if the AVIO buffer
 * already holds enough data, *data points straight into it; otherwise the
 * bytes are read into buf and *data points at buf. */
int ffio_read_indirect(AVIOContext *s, unsigned char *buf, int size, const unsigned char **data)
{
    /* Slow path: not enough buffered data (or a write context) — fall back
     * to a normal copying read. */
    if (s->write_flag || s->buf_end - s->buf_ptr < size) {
        *data = buf;
        return avio_read(s, buf, size);
    }

    /* Fast path: hand out a pointer into the internal buffer, zero-copy. */
    *data = s->buf_ptr;
    s->buf_ptr += size;
    return size;
}

当 buffer不够时,就会调用 avio_read(s, buf, size); -> fill_buffer(s); 来填充

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/* Refill the AVIOContext read buffer by invoking the read_packet callback
 * (for the HLS nested context this is hls.c's read_data).  Appends to the
 * existing buffer when there is room, otherwise restarts at the front. */
static void fill_buffer(AVIOContext *s)
{
    FFIOContext *const ctx = (FFIOContext *)s;
    int max_buffer_size = s->max_packet_size ?
                          s->max_packet_size : IO_BUFFER_SIZE;
    /* Append after buf_end if a max-size packet still fits; else rewind. */
    uint8_t *dst        = s->buf_end - s->buffer + max_buffer_size <= s->buffer_size ?
                          s->buf_end : s->buffer;
    int len             = s->buffer_size - (dst - s->buffer);

    /* can't fill the buffer without read_packet, just set EOF if appropriate */
    if (!s->read_packet && s->buf_ptr >= s->buf_end)
        s->eof_reached = 1;

    /* Note this logic: once EOF has been flagged, no further reads happen. */
    /* no need to do anything if EOF already reached */
    if (s->eof_reached)
        return;

    if (s->update_checksum && dst == s->buffer) {
        if (s->buf_end > s->checksum_ptr)
            s->checksum = s->update_checksum(s->checksum, s->checksum_ptr,
                                             s->buf_end - s->checksum_ptr);
        s->checksum_ptr = s->buffer;
    }

    /* make buffer smaller in case it ended up large after probing */
    if (s->read_packet && ctx->orig_buffer_size &&
        s->buffer_size > ctx->orig_buffer_size  && len >= ctx->orig_buffer_size) {
        if (dst == s->buffer && s->buf_ptr != dst) {
            int ret = set_buf_size(s, ctx->orig_buffer_size);
            if (ret < 0)
                av_log(s, AV_LOG_WARNING, "Failed to decrease buffer size\n");

            s->checksum_ptr = dst = s->buffer;
        }
        len = ctx->orig_buffer_size;
    }
    /* Perform the actual read via the read_packet callback. */
    len = read_packet_wrapper(s, dst, len);
    if (len == AVERROR_EOF) {
        /* do not modify buffer if EOF reached so that a seek back can
           be done without rereading data */
        s->eof_reached = 1;
    } else if (len < 0) {
        /* Pitfall: ANY read error is also treated as EOF here (the error
         * code is preserved in s->error, but eof_reached is still set). */
        s->eof_reached = 1;
        s->error= len;
    } else {
        /* Successful read: advance position and expose the new window. */
        s->pos += len;
        s->buf_ptr = dst;
        s->buf_end = dst + len;
        ffiocontext(s)->bytes_read += len;
        s->bytes_read = ffiocontext(s)->bytes_read;
    }

}

读数据走到了 len = read_packet_wrapper(s, dst, len); 别着急,好戏马上开始了:

1
2
3
4
5
6
7
8
9
10
11
/* Thin guard around the user-supplied read callback (for the HLS nested
 * context this calls hls.c's read_data via s->opaque). */
static int read_packet_wrapper(AVIOContext *s, uint8_t *buf, int size)
{
    int (*read_cb)(void *, uint8_t *, int) = s->read_packet;
    int nread;

    /* No callback installed: nothing to read from. */
    if (!read_cb)
        return AVERROR(EINVAL);

    nread = read_cb(s->opaque, buf, size);
    /* A zero-byte success is only legal for packetized streams. */
    av_assert2(nread || s->max_packet_size);
    return nread;
}

这个 s->read_packet 会走向哪里呢?我们需要知道它是什么时候赋值的,答案是在 hls 里初始化的时候:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/* Initialize an FFIOContext over a caller-supplied buffer with custom I/O
 * callbacks.  hls.c calls this with read_packet = read_data, which is how
 * the nested demuxer's reads end up back inside hls.c.
 * The context is seekable only if a seek callback is provided. */
void ffio_init_context(FFIOContext *ctx,
                  unsigned char *buffer,
                  int buffer_size,
                  int write_flag,
                  void *opaque,
                  int (*read_packet)(void *opaque, uint8_t *buf, int buf_size),
#if FF_API_AVIO_WRITE_NONCONST
                  int (*write_packet)(void *opaque, uint8_t *buf, int buf_size),
#else
                  int (*write_packet)(void *opaque, const uint8_t *buf, int buf_size),
#endif
                  int64_t (*seek)(void *opaque, int64_t offset, int whence))
{
    AVIOContext *const s = &ctx->pub;

    memset(ctx, 0, sizeof(*ctx));

    s->buffer      = buffer;
    ctx->orig_buffer_size =
    s->buffer_size = buffer_size;
    s->buf_ptr     = buffer;
    s->buf_ptr_max = buffer;
    /* opaque is handed back to every callback (for HLS: the playlist). */
    s->opaque      = opaque;
    s->direct      = 0;

    url_resetbuf(s, write_flag ? AVIO_FLAG_WRITE : AVIO_FLAG_READ);

    /* Install the custom I/O callbacks; read_packet is what
     * read_packet_wrapper() later invokes. */
    s->write_packet    = write_packet;
    s->read_packet     = read_packet;
    s->seek            = seek;
    s->pos             = 0;
    s->eof_reached     = 0;
    s->error           = 0;
    s->seekable        = seek ? AVIO_SEEKABLE_NORMAL : 0;
    s->min_packet_size = 0;
    s->max_packet_size = 0;
    s->update_checksum = NULL;
    ctx->short_seek_threshold = SHORT_SEEK_THRESHOLD;

    /* No read callback and not writing: treat the supplied buffer as the
     * entire (pre-filled) stream content. */
    if (!read_packet && !write_flag) {
        s->pos     = buffer_size;
        s->buf_end = s->buffer + buffer_size;
    }
    s->read_pause = NULL;
    s->read_seek  = NULL;

    s->write_data_type       = NULL;
    s->ignore_boundary_point = 0;
    ctx->current_type        = AVIO_DATA_MARKER_UNKNOWN;
    ctx->last_time           = AV_NOPTS_VALUE;
    ctx->short_seek_get      = NULL;

}

还记得吧,在 hls.c 里初始化 avio 的时候传进来的,它就是 hls.c 里的 read_data 函数。你就说这套逻辑牛不牛吧,不看几遍根本搞不懂,函数跳来跳去,最终又回来了。以上就是与 hls.c 相关的读包流程,记录下来没事了看看。

本文由作者按照 CC BY 4.0 进行授权