From c8eb591688cc64c3da68ef34729b2720fd94a99b Mon Sep 17 00:00:00 2001 From: Josep Torra Date: Mon, 2 Feb 2009 23:12:07 +0100 Subject: Add pull mode to mpegpsdemux and report duration reading first and last PTS. Some random cleanups. --- gst/mpegdemux/gstmpegdemux.c | 910 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 829 insertions(+), 81 deletions(-) (limited to 'gst/mpegdemux/gstmpegdemux.c') diff --git a/gst/mpegdemux/gstmpegdemux.c b/gst/mpegdemux/gstmpegdemux.c index 9b5dbe99..3a0727ea 100644 --- a/gst/mpegdemux/gstmpegdemux.c +++ b/gst/mpegdemux/gstmpegdemux.c @@ -50,8 +50,17 @@ #include "gstmpegdefs.h" #include "gstmpegdemux.h" -#define MAX_DVD_AUDIO_STREAMS 8 -#define MAX_DVD_SUBPICTURE_STREAMS 32 +#define MAX_DVD_AUDIO_STREAMS 8 +#define MAX_DVD_SUBPICTURE_STREAMS 32 +#define BLOCK_SZ 4096 +#define SCAN_SZ 80 + +typedef enum +{ + SCAN_SCR, + SCAN_DTS, + SCAN_PTS +} SCAN_MODE; /* We clamp scr delta with 0 so negative bytes won't be possible */ #define GSTTIME_TO_BYTES(time) \ @@ -179,9 +188,16 @@ static void gst_flups_demux_finalize (GstFluPSDemux * demux); static void gst_flups_demux_reset (GstFluPSDemux * demux); static gboolean gst_flups_demux_sink_event (GstPad * pad, GstEvent * event); +static GstFlowReturn gst_flups_demux_chain (GstPad * pad, GstBuffer * buffer); +static gboolean gst_flups_demux_sink_activate (GstPad * sinkpad); +static gboolean gst_flups_demux_sink_activate_push (GstPad * sinkpad, + gboolean active); +static gboolean gst_flups_demux_sink_activate_pull (GstPad * sinkpad, + gboolean active); +static void gst_flups_demux_loop (GstPad * pad); + static gboolean gst_flups_demux_src_event (GstPad * pad, GstEvent * event); static gboolean gst_flups_demux_src_query (GstPad * pad, GstQuery * query); -static GstFlowReturn gst_flups_demux_chain (GstPad * pad, GstBuffer * buffer); static GstStateChangeReturn gst_flups_demux_change_state (GstElement * element, GstStateChange transition); @@ -259,8 +275,17 @@ gst_flups_demux_init (GstFluPSDemux * demux) GstFluPSDemuxClass *klass = GST_FLUPS_DEMUX_GET_CLASS (demux); demux->sinkpad = gst_pad_new_from_template (klass->sink_template, "sink"); - gst_pad_set_event_function (demux->sinkpad, gst_flups_demux_sink_event); - gst_pad_set_chain_function (demux->sinkpad, gst_flups_demux_chain); + gst_pad_set_event_function (demux->sinkpad, + GST_DEBUG_FUNCPTR (gst_flups_demux_sink_event)); + gst_pad_set_chain_function (demux->sinkpad, + GST_DEBUG_FUNCPTR (gst_flups_demux_chain)); + gst_pad_set_activate_function (demux->sinkpad, + GST_DEBUG_FUNCPTR (gst_flups_demux_sink_activate)); + gst_pad_set_activatepull_function (demux->sinkpad, + GST_DEBUG_FUNCPTR (gst_flups_demux_sink_activate_pull)); + gst_pad_set_activatepush_function (demux->sinkpad, + GST_DEBUG_FUNCPTR (gst_flups_demux_sink_activate_push)); + gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad); demux->streams = @@ -392,8 +417,10 @@ gst_flups_demux_create_stream (GstFluPSDemux * demux, gint id, gint stream_type) stream->notlinked = FALSE; stream->type = stream_type; stream->pad = gst_pad_new_from_template (template, name); - gst_pad_set_event_function (stream->pad, gst_flups_demux_src_event); - gst_pad_set_query_function (stream->pad, gst_flups_demux_src_query); + gst_pad_set_event_function (stream->pad, + GST_DEBUG_FUNCPTR (gst_flups_demux_src_event)); + gst_pad_set_query_function (stream->pad, + GST_DEBUG_FUNCPTR (gst_flups_demux_src_query)); gst_pad_use_fixed_caps (stream->pad); gst_pad_set_caps (stream->pad, caps); gst_caps_unref (caps); @@ -510,12 +537,19 @@ gst_flups_demux_send_data (GstFluPSDemux * demux, GstFluPSStream * stream, gst_pad_push_event (stream->pad, newsegment); stream->need_segment = FALSE; + if (!demux->is_segment_open) { + GST_DEBUG_OBJECT (demux, "segment opened"); + demux->is_segment_open = TRUE; + } } /* OK, sent new segment now prepare the buffer for sending */ /* caps */ gst_buffer_set_caps (buf, GST_PAD_CAPS (stream->pad)); GST_BUFFER_TIMESTAMP (buf) = timestamp; + if (GST_CLOCK_TIME_IS_VALID (timestamp)) { + gst_segment_set_last_stop (&demux->src_segment, GST_FORMAT_TIME, timestamp); + } /* Set the buffer discont flag, and clear discont state on the stream */ if (stream->discont) { @@ -682,6 +716,52 @@ gst_flups_demux_handle_dvd_event (GstFluPSDemux * demux, GstEvent * event) return TRUE; } +static void +gst_flups_demux_flush (GstFluPSDemux * demux) +{ + GST_DEBUG_OBJECT (demux, "flushing demuxer"); + gst_adapter_clear (demux->adapter); + gst_adapter_clear (demux->rev_adapter); + gst_pes_filter_drain (&demux->filter); + demux->adapter_offset = G_MAXUINT64; + demux->current_scr = G_MAXUINT64; + demux->bytes_since_scr = 0; +} + +static void +gst_flups_demux_close_segment (GstFluPSDemux * demux) +{ + if (G_UNLIKELY (!demux->is_segment_open)) { + GST_DEBUG_OBJECT (demux, "no running segment to close"); + return; + } +#if POST_10_10 + GST_INFO_OBJECT (demux, "closing running segment %" GST_SEGMENT_FORMAT, + &demux->src_segment); +#endif + + /* Close the current segment for a linear playback */ + if (demux->src_segment.rate >= 0) { + /* for forward playback, we played from start to last_stop */ + gst_flups_demux_send_event (demux, gst_event_new_new_segment (TRUE, + demux->src_segment.rate, demux->src_segment.format, + demux->src_segment.start, demux->src_segment.last_stop, + demux->src_segment.time)); + } else { + gint64 stop; + + if ((stop = demux->src_segment.stop) == -1) + stop = demux->src_segment.duration; + + /* for reverse playback, we played from stop to last_stop. */ + gst_flups_demux_send_event (demux, gst_event_new_new_segment (TRUE, + demux->src_segment.rate, demux->src_segment.format, + demux->src_segment.last_stop, stop, demux->src_segment.last_stop)); + } + + demux->is_segment_open = FALSE; +} + static gboolean gst_flups_demux_sink_event (GstPad * pad, GstEvent * event) { @@ -696,14 +776,8 @@ gst_flups_demux_sink_event (GstPad * pad, GstEvent * event) break; case GST_EVENT_FLUSH_STOP: gst_flups_demux_send_event (demux, event); - gst_segment_init (&demux->sink_segment, GST_FORMAT_UNDEFINED); - gst_adapter_clear (demux->adapter); - gst_adapter_clear (demux->rev_adapter); - demux->adapter_offset = G_MAXUINT64; - gst_pes_filter_drain (&demux->filter); - demux->current_scr = G_MAXUINT64; - demux->bytes_since_scr = 0; + gst_flups_demux_flush (demux); break; case GST_EVENT_NEWSEGMENT: { @@ -712,6 +786,9 @@ gst_flups_demux_sink_event (GstPad * pad, GstEvent * event) GstFormat format; gint64 start, stop, time; + /* Close current segment */ + gst_flups_demux_close_segment (demux); + #ifdef HAVE_NEWSEG_FULL { gdouble arate; @@ -786,71 +863,221 @@ gst_flups_demux_sink_event (GstPad * pad, GstEvent * event) } static gboolean -gst_flups_demux_src_event (GstPad * pad, GstEvent * event) +gst_flups_demux_handle_seek_push (GstFluPSDemux * demux, GstEvent * event) { gboolean res = FALSE; - GstFluPSDemux *demux; + gdouble rate; + GstFormat format; + GstSeekFlags flags; + GstSeekType start_type, stop_type; + gint64 start, stop; + gint64 bstart, bstop; + GstEvent *bevent; + + gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start, + &stop_type, &stop); + + GST_DEBUG_OBJECT (demux, "seek event, rate: %f start: %" GST_TIME_FORMAT + " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start), + GST_TIME_ARGS (stop)); + + if (format == GST_FORMAT_BYTES) { + GST_DEBUG_OBJECT (demux, "seek not supported on format %d", format); + goto not_supported; + } - demux = GST_FLUPS_DEMUX (gst_pad_get_parent (pad)); + GST_DEBUG_OBJECT (demux, "seek - trying directly upstream first"); - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_SEEK: - { - gdouble rate; - GstFormat format; - GstSeekFlags flags; - GstSeekType start_type, stop_type; - gint64 start, stop; - gint64 bstart, bstop; - GstEvent *bevent; + /* first try original format seek */ + (void) gst_event_ref (event); + if ((res = gst_pad_push_event (demux->sinkpad, event))) + goto done; + + if (format != GST_FORMAT_TIME) { + /* From here down, we only support time based seeks */ + GST_DEBUG_OBJECT (demux, "seek not supported on format %d", format); + goto not_supported; + } - gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start, - &stop_type, &stop); + /* We need to convert to byte based seek and we need a scr_rate for that. */ + if (demux->scr_rate_n == G_MAXUINT64 || demux->scr_rate_d == G_MAXUINT64) { + GST_DEBUG_OBJECT (demux, "seek not possible, no scr_rate"); + goto not_supported; + } - GST_DEBUG_OBJECT (demux, "seek event, rate: %f start: %" GST_TIME_FORMAT - " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start), - GST_TIME_ARGS (stop)); + GST_DEBUG_OBJECT (demux, "try with scr_rate interpolation"); - if (format == GST_FORMAT_BYTES) { - GST_DEBUG_OBJECT (demux, "seek not supported on format %d", format); - goto not_supported; - } + bstart = GSTTIME_TO_BYTES (start); + bstop = GSTTIME_TO_BYTES (stop); - GST_DEBUG_OBJECT (demux, "seek - trying directly upstream first"); + GST_DEBUG_OBJECT (demux, "in bytes bstart %" G_GINT64_FORMAT " bstop %" + G_GINT64_FORMAT, bstart, bstop); + bevent = gst_event_new_seek (rate, GST_FORMAT_BYTES, flags, start_type, + bstart, stop_type, bstop); - /* first try original format seek */ - (void) gst_event_ref (event); - if ((res = gst_pad_push_event (demux->sinkpad, event))) - goto done; + res = gst_pad_push_event (demux->sinkpad, bevent); - if (format != GST_FORMAT_TIME) { - /* From here down, we only support time based seeks */ - GST_DEBUG_OBJECT (demux, "seek not supported on format %d", format); - goto not_supported; - } +done: + gst_event_unref (event); + return res; - /* We need to convert to byte based seek and we need a scr_rate for that. */ - if (demux->scr_rate_n == G_MAXUINT64 || demux->scr_rate_d == G_MAXUINT64) { - GST_DEBUG_OBJECT (demux, "seek not possible, no scr_rate"); - goto not_supported; - } +not_supported: + { + gst_event_unref (event); - GST_DEBUG_OBJECT (demux, "try with scr_rate interpolation"); + return FALSE; + } +} - bstart = GSTTIME_TO_BYTES (start); - bstop = GSTTIME_TO_BYTES (stop); +static gboolean +gst_flups_demux_handle_seek_pull (GstFluPSDemux * demux, GstEvent * event) +{ + GstFormat format; + GstSeekFlags flags; + GstSeekType start_type, stop_type; + gint64 start, stop; + gdouble rate; + gboolean update, flush, keyframe; + GstSegment seeksegment; + GstClockTime first_pts = MPEGTIME_TO_GSTTIME (demux->first_pts); + + gst_event_parse_seek (event, &rate, &format, &flags, + &start_type, &start, &stop_type, &stop); + + if (format != GST_FORMAT_TIME) + goto wrong_format; + + /* We need to convert to byte based seek and we need a scr_rate for that. */ + if (demux->scr_rate_n == G_MAXUINT64 || demux->scr_rate_d == G_MAXUINT64) + goto no_scr_rate; + + flush = flags & GST_SEEK_FLAG_FLUSH; + keyframe = flags & GST_SEEK_FLAG_KEY_UNIT; + + if (flush) { + /* Flush start up and downstream to make sure data flow and loops are + idle */ + gst_flups_demux_send_event (demux, gst_event_new_flush_start ()); + gst_pad_push_event (demux->sinkpad, gst_event_new_flush_start ()); + } else { + /* Pause the pulling task */ + gst_pad_pause_task (demux->sinkpad); + } - GST_DEBUG_OBJECT (demux, "in bytes bstart %" G_GINT64_FORMAT " bstop %" - G_GINT64_FORMAT, bstart, bstop); - bevent = gst_event_new_seek (rate, GST_FORMAT_BYTES, flags, start_type, - bstart, stop_type, bstop); + /* Take the stream lock */ + GST_PAD_STREAM_LOCK (demux->sinkpad); - res = gst_pad_push_event (demux->sinkpad, bevent); + if (flush) { + /* Stop flushing upstream we need to pull */ + gst_pad_push_event (demux->sinkpad, gst_event_new_flush_stop ()); + } - done: - gst_event_unref (event); + /* Work on a copy until we are sure the seek succeeded. */ + memcpy (&seeksegment, &demux->src_segment, sizeof (GstSegment)); + +#if POST_10_10 + GST_DEBUG_OBJECT (demux, "segment before configure %" GST_SEGMENT_FORMAT, + &demux->src_segment); +#endif + + /* Apply the seek to our segment */ + gst_segment_set_seek (&seeksegment, rate, format, flags, + start_type, start, stop_type, stop, &update); + + if (flush || seeksegment.last_stop != demux->src_segment.last_stop) { + /* Do the actual seeking */ +#if POST_10_10 + GST_INFO_OBJECT (demux, "sink segment configured %" GST_SEGMENT_FORMAT, + &demux->sink_segment); +#endif + gst_segment_set_last_stop (&demux->sink_segment, GST_FORMAT_BYTES, + MIN (GSTTIME_TO_BYTES (seeksegment.last_stop), + demux->sink_segment.stop)); +#if POST_10_10 + GST_INFO_OBJECT (demux, "sink segment configured %" GST_SEGMENT_FORMAT, + &demux->sink_segment); +#endif + } + + /* check the limits */ + if (seeksegment.start < first_pts) + seeksegment.start = first_pts; + + if (seeksegment.last_stop < first_pts) + seeksegment.last_stop = first_pts; + + /* update the rate in our src segment */ + demux->sink_segment.rate = rate; + +#if POST_10_10 + GST_DEBUG_OBJECT (demux, "seek segment configured %" GST_SEGMENT_FORMAT, + &seeksegment); +#endif + + if (flush) { + /* Stop flushing, the sinks are at time 0 now */ + gst_flups_demux_send_event (demux, gst_event_new_flush_stop ()); + } else { + gst_flups_demux_close_segment (demux); + } + + if (flush || seeksegment.last_stop != demux->src_segment.last_stop) { + gst_flups_demux_flush (demux); + } + + /* Ok seek succeeded, take the newly configured segment */ + memcpy (&demux->src_segment, &seeksegment, sizeof (GstSegment)); + + /* Notify about the start of a new segment */ + if (demux->src_segment.flags & GST_SEEK_FLAG_SEGMENT) { + gst_element_post_message (GST_ELEMENT (demux), + gst_message_new_segment_start (GST_OBJECT (demux), + demux->src_segment.format, demux->src_segment.last_stop)); + } + + /* Tell all the stream a new segment is needed */ + gst_flups_demux_mark_discont (demux, TRUE, TRUE); + + gst_pad_start_task (demux->sinkpad, + (GstTaskFunction) gst_flups_demux_loop, demux->sinkpad); + + GST_PAD_STREAM_UNLOCK (demux->sinkpad); + + gst_event_unref (event); + return TRUE; + + /* ERRORS */ +wrong_format: + { + GST_WARNING_OBJECT (demux, "we only support seeking in TIME or BYTES " + "formats"); + gst_event_unref (event); + return FALSE; + } +no_scr_rate: + { + GST_WARNING_OBJECT (demux, "seek not possible, no scr_rate"); + gst_event_unref (event); + return FALSE; + } +} + +static gboolean +gst_flups_demux_src_event (GstPad * pad, GstEvent * event) +{ + gboolean res = FALSE; + GstFluPSDemux *demux; + + demux = GST_FLUPS_DEMUX (gst_pad_get_parent (pad)); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEEK: + if (demux->random_access) { + res = gst_flups_demux_handle_seek_pull (demux, event); + } else { + res = gst_flups_demux_handle_seek_push (demux, event); + } break; - } default: res = gst_pad_push_event (demux->sinkpad, event); break; @@ -859,14 +1086,6 @@ gst_flups_demux_src_event (GstPad * pad, GstEvent * event) gst_object_unref (demux); return res; - -not_supported: - { - gst_object_unref (demux); - gst_event_unref (event); - - return FALSE; - } } static gboolean @@ -884,7 +1103,6 @@ gst_flups_demux_src_query (GstPad * pad, GstQuery * query) case GST_QUERY_POSITION: { GstFormat format; - gint64 position; gst_query_parse_position (query, &format, NULL); @@ -893,17 +1111,11 @@ gst_flups_demux_src_query (GstPad * pad, GstQuery * query) format); goto not_supported; } - if (demux->current_scr == G_MAXUINT64 || demux->first_scr == G_MAXUINT64) { - GST_DEBUG_OBJECT (demux, "position not possible, no current_scr"); - goto not_supported; - } - - position = MPEGTIME_TO_GSTTIME (demux->current_scr - demux->first_scr); GST_LOG_OBJECT (demux, "Position at GStreamer Time:%" GST_TIME_FORMAT, - GST_TIME_ARGS (position)); + GST_TIME_ARGS (demux->src_segment.last_stop)); - gst_query_set_position (query, format, position); + gst_query_set_position (query, format, demux->src_segment.last_stop); res = TRUE; break; } @@ -915,6 +1127,14 @@ gst_flups_demux_src_query (GstPad * pad, GstQuery * query) gst_query_parse_duration (query, &format, NULL); + if (G_LIKELY (format == GST_FORMAT_TIME && + GST_CLOCK_TIME_IS_VALID (demux->src_segment.duration))) { + gst_query_set_duration (query, GST_FORMAT_TIME, + demux->src_segment.duration); + res = TRUE; + break; + } + if ((peer = gst_pad_get_peer (demux->sinkpad)) == NULL) { GST_DEBUG_OBJECT (demux, "duration not possible, no peer"); goto not_supported; @@ -1694,13 +1914,534 @@ need_data: } } -static gboolean +static inline gboolean gst_flups_demux_is_pes_sync (guint32 sync) { return ((sync & 0xfc) == 0xbc) || ((sync & 0xe0) == 0xc0) || ((sync & 0xf0) == 0xe0); } +static inline gboolean +gst_flups_demux_scan_ts (GstFluPSDemux * demux, const guint8 * data, + SCAN_MODE mode, guint64 * rts) +{ + gboolean ret = FALSE; + guint32 scr1, scr2; + guint64 scr; + guint64 pts, dts; + guint32 code; + + /* read the 4 bytes for the sync code */ + code = GST_READ_UINT32_BE (data); + if (G_LIKELY (code != ID_PS_PACK_START_CODE)) + goto beach; + + /* skip start code */ + data += 4; + + scr1 = GUINT32_FROM_BE (*(guint32 *) data); + scr2 = GUINT32_FROM_BE (*(guint32 *) (data + 4)); + + /* start parsing the stream */ + if ((*data & 0xc0) == 0x40) { + guint32 scr_ext; + guint32 next32; + guint8 stuffing_bytes; + + /* :2=01 ! scr:3 ! marker:1==1 ! scr:15 ! marker:1==1 ! scr:15 */ + + /* check markers */ + if ((scr1 & 0xc4000400) != 0x44000400) + goto beach; + + scr = ((guint64) scr1 & 0x38000000) << 3; + scr |= ((guint64) scr1 & 0x03fff800) << 4; + scr |= ((guint64) scr1 & 0x000003ff) << 5; + scr |= ((guint64) scr2 & 0xf8000000) >> 27; + + /* marker:1==1 ! scr_ext:9 ! marker:1==1 */ + if ((scr2 & 0x04010000) != 0x04010000) + goto beach; + + scr_ext = (scr2 & 0x03fe0000) >> 17; + + if (scr_ext) { + scr = (scr * 300 + scr_ext % 300) / 300; + } + /* SCR has been converted into units of 90Khz ticks to make it comparable + to DTS/PTS, that also implies 1 tick rounding error */ + data += 6; + /* PMR:22 ! :2==11 ! reserved:5 ! stuffing_len:3 */ + next32 = (GUINT32_FROM_BE ((*(guint32 *) data))); + if ((next32 & 0x00000300) != 0x00000300) + goto beach; + + stuffing_bytes = (next32 & 0x07); + data += 4; + while (stuffing_bytes--) { + if (*data++ != 0xff) + goto beach; + } + } else { + /* check markers */ + if ((scr1 & 0xf1000100) != 0x21000100) + goto beach; + + if ((scr2 & 0x01800001) != 0x01800001) + goto beach; + + /* :4=0010 ! scr:3 ! marker:1==1 ! scr:15 ! marker:1==1 ! scr:15 ! marker:1==1 */ + scr = ((guint64) scr1 & 0x0e000000) << 5; + scr |= ((guint64) scr1 & 0x00fffe00) << 6; + scr |= ((guint64) scr1 & 0x000000ff) << 7; + scr |= ((guint64) scr2 & 0xfe000000) >> 25; + data += 8; + } + + if (mode == SCAN_SCR) { + *rts = scr; + ret = TRUE; + } + + /* read the 4 bytes for the PES sync code */ + code = GST_READ_UINT32_BE (data); + if (!gst_flups_demux_is_pes_sync (code)) + goto beach; + + switch (code) { + case ID_PS_PROGRAM_STREAM_MAP: + case ID_PRIVATE_STREAM_2: + case ID_ECM_STREAM: + case ID_EMM_STREAM: + case ID_PROGRAM_STREAM_DIRECTORY: + case ID_DSMCC_STREAM: + case ID_ITU_TREC_H222_TYPE_E_STREAM: + case ID_PADDING_STREAM: + goto beach; + default: + break; + } + + /* skip sync code and size */ + data += 6; + + pts = dts = -1; + + /* stuffing bits, first two bits are '10' for mpeg2 pes so this code is + * not triggered. */ + while (TRUE) { + if (*data != 0xff) + break; + data++; + } + + /* STD buffer size, never for mpeg2 */ + if ((*data & 0xc0) == 0x40) + data += 3; + + /* PTS but no DTS, never for mpeg2 */ + if ((*data & 0xf0) == 0x20) { + READ_TS (data, pts, beach); + } + /* PTS and DTS, never for mpeg2 */ + else if ((*data & 0xf0) == 0x30) { + READ_TS (data, pts, beach); + READ_TS (data, dts, beach); + } else if ((*data & 0xc0) == 0x80) { + /* mpeg2 case */ + guchar flags; + + /* 2: '10' + * 2: PES_scrambling_control + * 1: PES_priority + * 1: data_alignment_indicator + * 1: copyright + * 1: original_or_copy + */ + flags = *data++; + + if ((flags & 0xc0) != 0x80) + goto beach; + + /* 2: PTS_DTS_flags + * 1: ESCR_flag + * 1: ES_rate_flag + * 1: DSM_trick_mode_flag + * 1: additional_copy_info_flag + * 1: PES_CRC_flag + * 1: PES_extension_flag + */ + flags = *data++; + + /* 8: PES_header_data_length */ + data++; + + /* only DTS: this is invalid */ + if ((flags & 0xc0) == 0x40) + goto beach; + + /* check for PTS */ + if ((flags & 0x80)) { + READ_TS (data, pts, beach); + } + /* check for DTS */ + if ((flags & 0x40)) { + READ_TS (data, dts, beach); + } + } + + if (mode == SCAN_DTS && dts != -1) { + *rts = dts; + ret = TRUE; + } + + if (mode == SCAN_PTS && pts != -1) { + *rts = pts; + ret = TRUE; + } +beach: + return ret; +} + +static inline void +gst_flups_demux_scan_forward_ts (GstFluPSDemux * demux, guint64 * pos, + SCAN_MODE mode, guint64 * rts) +{ + GstFlowReturn ret = GST_FLOW_OK; + GstBuffer *buffer = NULL; + guint64 offset = *pos; + gboolean found = FALSE; + guint64 ts = 0; + + /* read some data */ + ret = gst_pad_pull_range (demux->sinkpad, offset, BLOCK_SZ, &buffer); + offset += BLOCK_SZ - SCAN_SZ + 1; + do { + const guint8 *data = GST_BUFFER_DATA (buffer); + do { + found = gst_flups_demux_scan_ts (demux, data++, mode, &ts); + } while (!found && data < (GST_BUFFER_DATA (buffer) + BLOCK_SZ - SCAN_SZ)); + + /* done with the buffer, unref it */ + gst_buffer_unref (buffer); + + if (found) { + *rts = ts; + *pos = offset - 1; + break; + } + + ret = gst_pad_pull_range (demux->sinkpad, offset, BLOCK_SZ, &buffer); + offset += BLOCK_SZ; + } while (offset < (demux->sink_segment.stop * 0.10)); +} + +static inline void +gst_flups_demux_scan_backward_ts (GstFluPSDemux * demux, guint64 * pos, + SCAN_MODE mode, guint64 * rts) +{ + GstFlowReturn ret = GST_FLOW_OK; + GstBuffer *buffer = NULL; + guint64 offset = *pos; + gboolean found = FALSE; + guint64 ts = 0; + + /* read some data */ + ret = gst_pad_pull_range (demux->sinkpad, offset, BLOCK_SZ, &buffer); + offset -= (BLOCK_SZ - SCAN_SZ + 1); + do { + const guint8 *data = GST_BUFFER_DATA (buffer) + BLOCK_SZ - SCAN_SZ; + do { + found = gst_flups_demux_scan_ts (demux, data--, mode, &ts); + } while (!found && data > GST_BUFFER_DATA (buffer)); + + /* done with the buffer, unref it */ + gst_buffer_unref (buffer); + + if (found) { + *rts = ts; + *pos = offset + 1; + break; + } + + ret = gst_pad_pull_range (demux->sinkpad, offset, BLOCK_SZ, &buffer); + offset -= BLOCK_SZ; + } while (offset > (demux->sink_segment.stop * 0.90)); +} + +static inline gboolean +gst_flups_sink_get_duration (GstFluPSDemux * demux) +{ + gboolean res = FALSE; + GstPad *peer; + GstFormat format = GST_FORMAT_BYTES; + gint64 length = 0; + guint64 offset; + + /* init the sink segment */ + gst_segment_init (&demux->sink_segment, format); + + /* get peer to figure out length */ + if ((peer = gst_pad_get_peer (demux->sinkpad)) == NULL) + goto beach; + + res = gst_pad_query_duration (peer, &format, &length); + gst_object_unref (peer); + + if (!res || length <= 0) + goto beach; + + GST_DEBUG_OBJECT (demux, "file length %" G_GINT64_FORMAT, length); + + /* update the sink segment */ + demux->sink_segment.stop = length; + gst_segment_set_duration (&demux->sink_segment, format, length); + gst_segment_set_last_stop (&demux->sink_segment, format, 0); + + /* Scan for notorious SCR and PTS to calculate the duration */ + /* scan for first SCR in the stream */ + offset = demux->sink_segment.start; + gst_flups_demux_scan_forward_ts (demux, &offset, SCAN_SCR, &demux->first_scr); + demux->base_time = MPEGTIME_TO_GSTTIME (demux->first_scr); + GST_DEBUG_OBJECT (demux, "First SCR: %" G_GINT64_FORMAT " %" GST_TIME_FORMAT + " in packet starting at %" G_GUINT64_FORMAT, + demux->first_scr, GST_TIME_ARGS (demux->base_time), offset); + /* scan for last SCR in the stream */ + offset = demux->sink_segment.stop - BLOCK_SZ; + gst_flups_demux_scan_backward_ts (demux, &offset, SCAN_SCR, &demux->last_scr); + GST_DEBUG_OBJECT (demux, "Last SCR: %" G_GINT64_FORMAT " %" GST_TIME_FORMAT + " in packet starting at %" G_GUINT64_FORMAT, + demux->last_scr, GST_TIME_ARGS (MPEGTIME_TO_GSTTIME (demux->last_scr)), + offset); + /* scan for first PTS in the stream */ + offset = demux->sink_segment.start; + gst_flups_demux_scan_forward_ts (demux, &offset, SCAN_PTS, &demux->first_pts); + GST_DEBUG_OBJECT (demux, "First PTS: %" G_GINT64_FORMAT " %" GST_TIME_FORMAT + " in packet starting at %" G_GUINT64_FORMAT, + demux->first_pts, GST_TIME_ARGS (MPEGTIME_TO_GSTTIME (demux->first_pts)), + offset); + /* scan for last PTS in the stream */ + offset = demux->sink_segment.stop - BLOCK_SZ; + gst_flups_demux_scan_backward_ts (demux, &offset, SCAN_PTS, &demux->last_pts); + GST_DEBUG_OBJECT (demux, "Last PTS: %" G_GINT64_FORMAT " %" GST_TIME_FORMAT + " in packet starting at %" G_GUINT64_FORMAT, + demux->last_pts, GST_TIME_ARGS (MPEGTIME_TO_GSTTIME (demux->last_pts)), + offset); + + if (G_LIKELY (demux->first_pts != G_MAXUINT64 && + demux->last_pts != G_MAXUINT64)) { + /* update the src segment */ + demux->src_segment.start = MPEGTIME_TO_GSTTIME (demux->first_pts); + demux->src_segment.stop = -1; + gst_segment_set_duration (&demux->src_segment, GST_FORMAT_TIME, + MPEGTIME_TO_GSTTIME (demux->last_pts - demux->first_pts)); + gst_segment_set_last_stop (&demux->src_segment, GST_FORMAT_TIME, + demux->src_segment.start); + } +#if POST_10_10 + GST_INFO_OBJECT (demux, "sink segment configured %" GST_SEGMENT_FORMAT, + &demux->sink_segment); + GST_INFO_OBJECT (demux, "src segment configured %" GST_SEGMENT_FORMAT, + &demux->src_segment); +#endif + + res = TRUE; + +beach: + return res; +} + +static inline GstFlowReturn +gst_flups_demux_pull_block (GstPad * pad, GstFluPSDemux * demux, + guint64 offset, guint size) +{ + GstFlowReturn ret = GST_FLOW_OK; + GstBuffer *buffer; + ret = gst_pad_pull_range (pad, offset, size, &buffer); + if (G_UNLIKELY (ret != GST_FLOW_OK)) { + GST_DEBUG_OBJECT (demux, "pull range at %" G_GUINT64_FORMAT + " size %u failed", offset, size); + goto beach; + } else + GST_LOG_OBJECT (demux, "pull range at %" G_GUINT64_FORMAT + " size %u done", offset, size); + + if (demux->sink_segment.rate < 0) { + GST_LOG_OBJECT (demux, "setting discont flag on backward rate"); + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + } + ret = gst_flups_demux_chain (pad, buffer); + +beach: + return ret; +} + +static void +gst_flups_demux_loop (GstPad * pad) +{ + GstFluPSDemux *demux; + GstFlowReturn ret = GST_FLOW_OK; + guint offset = 0; + + demux = GST_FLUPS_DEMUX (gst_pad_get_parent (pad)); + + if (G_UNLIKELY (demux->sink_segment.format == GST_FORMAT_UNDEFINED)) + gst_flups_sink_get_duration (demux); + + offset = demux->sink_segment.last_stop; + if (demux->sink_segment.rate >= 0) { + guint size = BLOCK_SZ; + if (G_LIKELY (demux->sink_segment.stop != -1)) { + size = MIN (size, demux->sink_segment.stop - offset); + } + /* pull in data */ + ret = gst_flups_demux_pull_block (pad, demux, offset, size); + + /* pause if something went wrong */ + if (G_UNLIKELY (ret != GST_FLOW_OK)) + goto pause; + + /* update our position */ + offset += size; + gst_segment_set_last_stop (&demux->sink_segment, GST_FORMAT_BYTES, offset); + + /* check EOS condition */ + if ((demux->src_segment.flags & GST_SEEK_FLAG_SEGMENT) && + ((demux->sink_segment.last_stop >= demux->sink_segment.stop) || + (demux->src_segment.stop != -1 && + demux->src_segment.last_stop >= demux->src_segment.stop))) { + ret = GST_FLOW_UNEXPECTED; + goto pause; + } + } else { /* Reverse playback */ + guint size = MIN (offset, BLOCK_SZ); + + /* pull in data */ + ret = gst_flups_demux_pull_block (pad, demux, offset - size, size); + + /* pause if something went wrong */ + if (G_UNLIKELY (ret != GST_FLOW_OK)) + goto pause; + + /* update our position */ + offset -= size; + gst_segment_set_last_stop (&demux->sink_segment, GST_FORMAT_BYTES, offset); + + /* check EOS condition */ + if (demux->sink_segment.last_stop <= demux->sink_segment.start || + demux->src_segment.last_stop <= demux->src_segment.start) { + ret = GST_FLOW_UNEXPECTED; + goto pause; + } + } + + gst_object_unref (demux); + + return; + +pause: + { + const gchar *reason = gst_flow_get_name (ret); + + GST_LOG_OBJECT (demux, "pausing task, reason %s", reason); + gst_pad_pause_task (pad); + + if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) { + if (ret == GST_FLOW_UNEXPECTED) { + /* perform EOS logic */ + gst_element_no_more_pads (GST_ELEMENT_CAST (demux)); + if (demux->src_segment.flags & GST_SEEK_FLAG_SEGMENT) { + gint64 stop; + + /* for segment playback we need to post when (in stream time) + * we stopped, this is either stop (when set) or the duration. */ + if ((stop = demux->src_segment.stop) == -1) + stop = demux->src_segment.duration; + + if (demux->sink_segment.rate >= 0) { + GST_LOG_OBJECT (demux, "Sending segment done, at end of segment"); + gst_element_post_message (GST_ELEMENT_CAST (demux), + gst_message_new_segment_done (GST_OBJECT_CAST (demux), + GST_FORMAT_TIME, stop)); + } else { /* Reverse playback */ + GST_LOG_OBJECT (demux, "Sending segment done, at beginning of " + "segment"); + gst_element_post_message (GST_ELEMENT_CAST (demux), + gst_message_new_segment_done (GST_OBJECT_CAST (demux), + GST_FORMAT_TIME, demux->src_segment.start)); + } + } else { + /* normal playback, send EOS to all linked pads */ + gst_element_no_more_pads (GST_ELEMENT (demux)); + GST_LOG_OBJECT (demux, "Sending EOS, at end of stream"); + if (!gst_flups_demux_send_event (demux, gst_event_new_eos ())) { + GST_WARNING_OBJECT (demux, "failed pushing EOS on streams"); + GST_ELEMENT_ERROR (demux, STREAM, FAILED, + ("Internal data stream error."), ("Can't push EOS downstream")); + } + } + } else { + GST_ELEMENT_ERROR (demux, STREAM, FAILED, + ("Internal data stream error."), + ("stream stopped, reason %s", reason)); + gst_flups_demux_send_event (demux, gst_event_new_eos ()); + } + } + + gst_object_unref (demux); + return; + } +} + +/* If we can pull that's prefered */ +static gboolean +gst_flups_demux_sink_activate (GstPad * sinkpad) +{ + if (gst_pad_check_pull_range (sinkpad)) { + return gst_pad_activate_pull (sinkpad, TRUE); + } else { + return gst_pad_activate_push (sinkpad, TRUE); + } +} + +/* This function gets called when we activate ourselves in push mode. */ +static gboolean +gst_flups_demux_sink_activate_push (GstPad * sinkpad, gboolean active) +{ + GstFluPSDemux *demux; + + demux = GST_FLUPS_DEMUX (gst_pad_get_parent (sinkpad)); + + demux->random_access = FALSE; + + gst_object_unref (demux); + + return TRUE; +} + +/* this function gets called when we activate ourselves in pull mode. + * We can perform random access to the resource and we start a task + * to start reading */ +static gboolean +gst_flups_demux_sink_activate_pull (GstPad * sinkpad, gboolean active) +{ + GstFluPSDemux *demux; + + demux = GST_FLUPS_DEMUX (gst_pad_get_parent (sinkpad)); + + if (active) { + GST_DEBUG ("pull mode activated"); + demux->random_access = TRUE; + gst_object_unref (demux); + return gst_pad_start_task (sinkpad, (GstTaskFunction) gst_flups_demux_loop, + sinkpad); + } else { + demux->random_access = FALSE; + gst_object_unref (demux); + return gst_pad_stop_task (sinkpad); + } +} + static GstFlowReturn gst_flups_demux_chain (GstPad * pad, GstBuffer * buffer) { @@ -1855,10 +2596,14 @@ gst_flups_demux_change_state (GstElement * element, GstStateChange transition) (GstPESFilterResync) gst_flups_demux_resync_cb, demux); demux->filter.gather_pes = TRUE; demux->first_scr = G_MAXUINT64; + demux->last_scr = G_MAXUINT64; demux->current_scr = G_MAXUINT64; demux->base_time = G_MAXUINT64; demux->scr_rate_n = G_MAXUINT64; demux->scr_rate_d = G_MAXUINT64; + demux->first_pts = G_MAXUINT64; + demux->last_pts = G_MAXUINT64; + demux->is_segment_open = FALSE; break; case GST_STATE_CHANGE_READY_TO_PAUSED: demux->current_scr = G_MAXUINT64; @@ -1866,11 +2611,14 @@ gst_flups_demux_change_state (GstElement * element, GstStateChange transition) demux->next_pts = G_MAXUINT64; demux->next_dts = G_MAXUINT64; demux->first_scr = G_MAXUINT64; + demux->last_scr = G_MAXUINT64; demux->base_time = G_MAXUINT64; demux->scr_rate_n = G_MAXUINT64; demux->scr_rate_d = G_MAXUINT64; demux->need_no_more_pads = TRUE; - + demux->first_pts = G_MAXUINT64; + demux->last_pts = G_MAXUINT64; + demux->is_segment_open = FALSE; gst_flups_demux_reset_psm (demux); gst_segment_init (&demux->sink_segment, GST_FORMAT_UNDEFINED); gst_segment_init (&demux->src_segment, GST_FORMAT_TIME); -- cgit v1.2.1