From 815d2620ace9b289bebd163cfe84727836432f11 Mon Sep 17 00:00:00 2001 From: Stefan Kost Date: Thu, 9 Oct 2008 09:21:44 +0000 Subject: ext/amrwb/gstamrwbparse.*: Add flush seek handler. Taken from recent armnbparse changes. Original commit message from CVS: * ext/amrwb/gstamrwbparse.c: * ext/amrwb/gstamrwbparse.h: Add flush seek handler. Taken from recent armnbparse changes. Sync the code more and use #defines for HEADER. --- ext/amrwb/gstamrwbparse.c | 314 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 276 insertions(+), 38 deletions(-) (limited to 'ext/amrwb/gstamrwbparse.c') diff --git a/ext/amrwb/gstamrwbparse.c b/ext/amrwb/gstamrwbparse.c index de8e1c6f..bcee1704 100644 --- a/ext/amrwb/gstamrwbparse.c +++ b/ext/amrwb/gstamrwbparse.c @@ -56,6 +56,9 @@ GST_DEBUG_CATEGORY_STATIC (gst_amrwbparse_debug); extern const UWord8 block_size[]; +#define AMRWB_HEADER_SIZE 9 +#define AMRWB_HEADER_STR "#!AMR-WB\n" + static void gst_amrwbparse_base_init (gpointer klass); static void gst_amrwbparse_class_init (GstAmrwbParseClass * klass); static void gst_amrwbparse_init (GstAmrwbParse * amrwbparse, @@ -64,11 +67,15 @@ static void gst_amrwbparse_init (GstAmrwbParse * amrwbparse, static const GstQueryType *gst_amrwbparse_querytypes (GstPad * pad); static gboolean gst_amrwbparse_query (GstPad * pad, GstQuery * query); +static gboolean gst_amrwbparse_sink_event (GstPad * pad, GstEvent * event); +static gboolean gst_amrwbparse_src_event (GstPad * pad, GstEvent * event); static GstFlowReturn gst_amrwbparse_chain (GstPad * pad, GstBuffer * buffer); static void gst_amrwbparse_loop (GstPad * pad); static gboolean gst_amrwbparse_sink_activate (GstPad * sinkpad); static gboolean gst_amrwbparse_sink_activate_pull (GstPad * sinkpad, gboolean active); +static gboolean gst_amrwbparse_sink_activate_push (GstPad * sinkpad, + gboolean active); static GstStateChangeReturn gst_amrwbparse_state_change (GstElement * element, GstStateChange transition); @@ -117,16 +124,20 @@ gst_amrwbparse_init (GstAmrwbParse * amrwbparse, GstAmrwbParseClass * klass) gst_pad_new_from_static_template (&sink_template, "sink"); gst_pad_set_chain_function (amrwbparse->sinkpad, GST_DEBUG_FUNCPTR (gst_amrwbparse_chain)); - + gst_pad_set_event_function (amrwbparse->sinkpad, + GST_DEBUG_FUNCPTR (gst_amrwbparse_sink_event)); gst_pad_set_activate_function (amrwbparse->sinkpad, gst_amrwbparse_sink_activate); gst_pad_set_activatepull_function (amrwbparse->sinkpad, gst_amrwbparse_sink_activate_pull); - + gst_pad_set_activatepush_function (amrwbparse->sinkpad, + gst_amrwbparse_sink_activate_push); gst_element_add_pad (GST_ELEMENT (amrwbparse), amrwbparse->sinkpad); /* create the src pad */ amrwbparse->srcpad = gst_pad_new_from_static_template (&src_template, "src"); + gst_pad_set_event_function (amrwbparse->srcpad, + GST_DEBUG_FUNCPTR (gst_amrwbparse_src_event)); gst_pad_set_query_function (amrwbparse->srcpad, GST_DEBUG_FUNCPTR (gst_amrwbparse_query)); gst_pad_set_query_type_function (amrwbparse->srcpad, @@ -158,6 +169,7 @@ gst_amrwbparse_querytypes (GstPad * pad) { static const GstQueryType list[] = { GST_QUERY_POSITION, + GST_QUERY_DURATION, 0 }; @@ -234,9 +246,193 @@ gst_amrwbparse_query (GstPad * pad, GstQuery * query) } +static gboolean +gst_amrwbparse_handle_pull_seek (GstAmrwbParse * amrwbparse, GstPad * pad, + GstEvent * event) +{ + GstFormat format; + gdouble rate; + GstSeekFlags flags; + GstSeekType cur_type, stop_type; + gint64 cur, stop; + gint64 byte_cur = -1, byte_stop = -1; + gboolean flush; + + gst_event_parse_seek (event, &rate, &format, &flags, &cur_type, &cur, + &stop_type, &stop); + + GST_DEBUG_OBJECT (amrwbparse, "Performing seek to %" GST_TIME_FORMAT, + GST_TIME_ARGS (cur)); + + /* For any format other than TIME, see if upstream handles + * it directly or fail. For TIME, try upstream, but do it ourselves if + * it fails upstream */ + if (format != GST_FORMAT_TIME) { + return gst_pad_push_event (amrwbparse->sinkpad, event); + } else { + if (gst_pad_push_event (amrwbparse->sinkpad, event)) + return TRUE; + } + + flush = flags & GST_SEEK_FLAG_FLUSH; + + /* send flush start */ + if (flush) + gst_pad_push_event (amrwbparse->sinkpad, gst_event_new_flush_start ()); + /* we only handle FLUSH seeks at the moment */ + else + return FALSE; + + /* grab streaming lock, this should eventually be possible, either + * because the task is paused or our streaming thread stopped + * because our peer is flushing. */ + GST_PAD_STREAM_LOCK (amrwbparse->sinkpad); + + /* Convert the TIME to the appropriate BYTE position at which to resume + * decoding. */ + cur = cur / (20 * GST_MSECOND) * (20 * GST_MSECOND); + if (cur != -1) + byte_cur = amrwbparse->block * (cur / 20 / GST_MSECOND) + AMRWB_HEADER_SIZE; + if (stop != -1) + byte_stop = + amrwbparse->block * (stop / 20 / GST_MSECOND) + AMRWB_HEADER_SIZE; + amrwbparse->offset = byte_cur; + amrwbparse->ts = cur; + + GST_DEBUG_OBJECT (amrwbparse, "Seeking to byte range %" G_GINT64_FORMAT + " to %" G_GINT64_FORMAT, byte_cur, cur); + + /* and prepare to continue streaming */ + /* send flush stop, peer will accept data and events again. We + * are not yet providing data as we still have the STREAM_LOCK. */ + gst_pad_push_event (amrwbparse->sinkpad, gst_event_new_flush_stop ()); + gst_pad_push_event (amrwbparse->srcpad, gst_event_new_new_segment (FALSE, + rate, format, cur, -1, cur)); + + /* and restart the task in case it got paused explicitely or by + * the FLUSH_START event we pushed out. */ + gst_pad_start_task (amrwbparse->sinkpad, + (GstTaskFunction) gst_amrwbparse_loop, amrwbparse->sinkpad); + + /* and release the lock again so we can continue streaming */ + GST_PAD_STREAM_UNLOCK (amrwbparse->sinkpad); + + return TRUE; +} + +static gboolean +gst_amrwbparse_handle_push_seek (GstAmrwbParse * amrwbparse, GstPad * pad, + GstEvent * event) +{ + GstFormat format; + gdouble rate; + GstSeekFlags flags; + GstSeekType cur_type, stop_type; + gint64 cur, stop; + gint64 byte_cur = -1, byte_stop = -1; + + gst_event_parse_seek (event, &rate, &format, &flags, &cur_type, &cur, + &stop_type, &stop); + + GST_DEBUG_OBJECT (amrwbparse, "Performing seek to %" GST_TIME_FORMAT, + GST_TIME_ARGS (cur)); + + /* For any format other than TIME, see if upstream handles + * it directly or fail. For TIME, try upstream, but do it ourselves if + * it fails upstream */ + if (format != GST_FORMAT_TIME) { + return gst_pad_push_event (amrwbparse->sinkpad, event); + } else { + if (gst_pad_push_event (amrwbparse->sinkpad, event)) + return TRUE; + } + + /* Convert the TIME to the appropriate BYTE position at which to resume + * decoding. */ + cur = cur / (20 * GST_MSECOND) * (20 * GST_MSECOND); + if (cur != -1) + byte_cur = amrwbparse->block * (cur / 20 / GST_MSECOND) + AMRWB_HEADER_SIZE; + if (stop != -1) + byte_stop = + amrwbparse->block * (stop / 20 / GST_MSECOND) + AMRWB_HEADER_SIZE; + amrwbparse->ts = cur; + + GST_DEBUG_OBJECT (amrwbparse, "Seeking to byte range %" G_GINT64_FORMAT + " to %" G_GINT64_FORMAT, byte_cur, byte_stop); + + /* Send BYTE based seek upstream */ + event = gst_event_new_seek (rate, GST_FORMAT_BYTES, flags, cur_type, + byte_cur, stop_type, byte_stop); + + return gst_pad_push_event (amrwbparse->sinkpad, event); +} + +static gboolean +gst_amrwbparse_src_event (GstPad * pad, GstEvent * event) +{ + GstAmrwbParse *amrwbparse = GST_AMRWBPARSE (gst_pad_get_parent (pad)); + gboolean res; + + GST_DEBUG_OBJECT (amrwbparse, "handling event %d", GST_EVENT_TYPE (event)); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEEK: + if (amrwbparse->seek_handler) + res = amrwbparse->seek_handler (amrwbparse, pad, event); + else + res = FALSE; + break; + default: + res = gst_pad_push_event (amrwbparse->sinkpad, event); + break; + } + gst_object_unref (amrwbparse); + + return res; +} + + /* * Data reading. */ +static gboolean +gst_amrwbparse_sink_event (GstPad * pad, GstEvent * event) +{ + GstAmrwbParse *amrwbparse; + gboolean res; + + amrwbparse = GST_AMRWBPARSE (gst_pad_get_parent (pad)); + + GST_LOG ("handling event %d", GST_EVENT_TYPE (event)); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_START: + res = gst_pad_push_event (amrwbparse->srcpad, event); + break; + case GST_EVENT_FLUSH_STOP: + gst_adapter_clear (amrwbparse->adapter); + gst_segment_init (&amrwbparse->segment, GST_FORMAT_TIME); + res = gst_pad_push_event (amrwbparse->srcpad, event); + break; + case GST_EVENT_EOS: + res = gst_pad_push_event (amrwbparse->srcpad, event); + break; + case GST_EVENT_NEWSEGMENT: + { + /* eat for now, we send a newsegment at start with infinite + * duration. */ + gst_event_unref (event); + res = TRUE; + break; + } + default: + res = gst_pad_push_event (amrwbparse->srcpad, event); + break; + } + gst_object_unref (amrwbparse); + + return res; +} /* streaming mode */ static GstFlowReturn @@ -247,9 +443,17 @@ gst_amrwbparse_chain (GstPad * pad, GstBuffer * buffer) gint mode; const guint8 *data; GstBuffer *out; + GstClockTime timestamp; amrwbparse = GST_AMRWBPARSE (gst_pad_get_parent (pad)); + timestamp = GST_BUFFER_TIMESTAMP (buffer); + if (GST_CLOCK_TIME_IS_VALID (timestamp)) { + GST_DEBUG_OBJECT (amrwbparse, "Lock on timestamp %" GST_TIME_FORMAT, + GST_TIME_ARGS (timestamp)); + amrwbparse->ts = timestamp; + } + gst_adapter_push (amrwbparse->adapter, buffer); /* init */ @@ -257,14 +461,14 @@ gst_amrwbparse_chain (GstPad * pad, GstBuffer * buffer) GstEvent *segev; GstCaps *caps; - if (gst_adapter_available (amrwbparse->adapter) < 9) + if (gst_adapter_available (amrwbparse->adapter) < AMRWB_HEADER_SIZE) goto done; - data = gst_adapter_peek (amrwbparse->adapter, 9); - if (memcmp (data, "#!AMR-WB\n", 9) != 0) + data = gst_adapter_peek (amrwbparse->adapter, AMRWB_HEADER_SIZE); + if (memcmp (data, AMRWB_HEADER_STR, AMRWB_HEADER_SIZE) != 0) goto done; - gst_adapter_flush (amrwbparse->adapter, 9); + gst_adapter_flush (amrwbparse->adapter, AMRWB_HEADER_SIZE); amrwbparse->need_header = FALSE; @@ -323,39 +527,39 @@ static gboolean gst_amrwbparse_pull_header (GstAmrwbParse * amrwbparse) { GstBuffer *buffer; - gboolean ret = TRUE; + GstFlowReturn ret; guint8 *data; gint size; - const guint8 magic_number_size = 9; /* sizeof("#!AMR-WB\n")-1 */ - if (GST_FLOW_OK != gst_pad_pull_range (amrwbparse->sinkpad, - amrwbparse->offset, magic_number_size, &buffer)) { - ret = FALSE; - goto done; - } + ret = gst_pad_pull_range (amrwbparse->sinkpad, G_GUINT64_CONSTANT (0), + AMRWB_HEADER_SIZE, &buffer); + if (ret != GST_FLOW_OK) + return FALSE; data = GST_BUFFER_DATA (buffer); size = GST_BUFFER_SIZE (buffer); - if (size < magic_number_size) { - /* not enough */ - ret = FALSE; - goto done; - } - - if (memcmp (data, "#!AMR-WB\n", magic_number_size)) { - /* no header */ - ret = FALSE; - goto done; - } + if (size < AMRWB_HEADER_SIZE) + goto not_enough; - amrwbparse->offset += magic_number_size; - -done: + if (memcmp (data, AMRWB_HEADER_STR, AMRWB_HEADER_SIZE)) + goto no_header; gst_buffer_unref (buffer); - return ret; + amrwbparse->offset = AMRWB_HEADER_SIZE; + return TRUE; + +not_enough: + { + gst_buffer_unref (buffer); + return FALSE; + } +no_header: + { + gst_buffer_unref (buffer); + return FALSE; + } } /* random access mode, could just read a fixed size buffer and push it to @@ -374,6 +578,7 @@ gst_amrwbparse_loop (GstPad * pad) /* init */ if (G_UNLIKELY (amrwbparse->need_header)) { + GstCaps *caps; if (!gst_amrwbparse_pull_header (amrwbparse)) { GST_ELEMENT_ERROR (amrwbparse, STREAM, WRONG_TYPE, (NULL), (NULL)); @@ -381,6 +586,11 @@ gst_amrwbparse_loop (GstPad * pad) goto need_pause; } + caps = gst_caps_new_simple ("audio/AMR-WB", + "rate", G_TYPE_INT, 16000, "channels", G_TYPE_INT, 1, NULL); + gst_pad_set_caps (amrwbparse->srcpad, caps); + gst_caps_unref (caps); + GST_DEBUG_OBJECT (amrwbparse, "Sending newsegment event"); gst_pad_push_event (amrwbparse->srcpad, gst_event_new_new_segment_full (FALSE, 1.0, 1.0, @@ -389,8 +599,8 @@ gst_amrwbparse_loop (GstPad * pad) amrwbparse->need_header = FALSE; } - ret = gst_pad_pull_range (amrwbparse->sinkpad, - amrwbparse->offset, 1, &buffer); + ret = + gst_pad_pull_range (amrwbparse->sinkpad, amrwbparse->offset, 1, &buffer); if (ret == GST_FLOW_UNEXPECTED) goto eos; @@ -463,36 +673,60 @@ eos: static gboolean gst_amrwbparse_sink_activate (GstPad * sinkpad) { + gboolean result = FALSE; GstAmrwbParse *amrwbparse; - amrwbparse = GST_AMRWBPARSE (GST_PAD_PARENT (sinkpad)); + amrwbparse = GST_AMRWBPARSE (gst_pad_get_parent (sinkpad)); + if (gst_pad_check_pull_range (sinkpad)) { - return gst_pad_activate_pull (sinkpad, TRUE); + GST_DEBUG ("Trying to activate in pull mode"); + amrwbparse->seekable = TRUE; + amrwbparse->ts = 0; + result = gst_pad_activate_pull (sinkpad, TRUE); } else { + GST_DEBUG ("Try to activate in push mode"); amrwbparse->seekable = FALSE; - return gst_pad_activate_push (sinkpad, TRUE); + result = gst_pad_activate_push (sinkpad, TRUE); } + + gst_object_unref (amrwbparse); + return result; } + +static gboolean +gst_amrwbparse_sink_activate_push (GstPad * sinkpad, gboolean active) +{ + GstAmrwbParse *amrwbparse = GST_AMRWBPARSE (gst_pad_get_parent (sinkpad)); + + if (active) { + amrwbparse->seek_handler = gst_amrwbparse_handle_push_seek; + } else { + amrwbparse->seek_handler = NULL; + } + + gst_object_unref (amrwbparse); + return TRUE; +} + static gboolean gst_amrwbparse_sink_activate_pull (GstPad * sinkpad, gboolean active) { gboolean result; GstAmrwbParse *amrwbparse; - amrwbparse = GST_AMRWBPARSE (GST_PAD_PARENT (sinkpad)); + amrwbparse = GST_AMRWBPARSE (gst_pad_get_parent (sinkpad)); if (active) { - amrwbparse->need_header = TRUE; - amrwbparse->seekable = TRUE; - amrwbparse->ts = 0; - /* if we have a scheduler we can start the task */ + amrwbparse->seek_handler = gst_amrwbparse_handle_pull_seek; result = gst_pad_start_task (sinkpad, (GstTaskFunction) gst_amrwbparse_loop, sinkpad); } else { + amrwbparse->seek_handler = NULL; result = gst_pad_stop_task (sinkpad); } + gst_object_unref (amrwbparse); return result; } @@ -509,6 +743,10 @@ gst_amrwbparse_state_change (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_NULL_TO_READY: break; case GST_STATE_CHANGE_READY_TO_PAUSED: + amrwbparse->need_header = TRUE; + amrwbparse->ts = -1; + amrwbparse->block = 0; + gst_segment_init (&amrwbparse->segment, GST_FORMAT_TIME); break; default: break; -- cgit v1.2.1