From 19557fc2e6b8eb5b5160540e575d51cc681d2a3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 18 Jan 2008 16:56:19 +0000 Subject: gst/rawparse/gstrawparse.*: Implement pull mode. Original commit message from CVS: * gst/rawparse/gstrawparse.c: (gst_raw_parse_class_init), (gst_raw_parse_init), (gst_raw_parse_reset), (gst_raw_parse_set_src_caps), (gst_raw_parse_push_buffer), (gst_raw_parse_chain), (gst_raw_parse_loop), (gst_raw_parse_sink_activate), (gst_raw_parse_sink_activatepull), (gst_raw_parse_change_state), (gst_raw_parse_sink_event), (gst_raw_parse_handle_seek_push), (gst_raw_parse_handle_seek_pull), (gst_raw_parse_src_event), (gst_raw_parse_src_query): * gst/rawparse/gstrawparse.h: Implement pull mode. --- gst/rawparse/gstrawparse.c | 469 +++++++++++++++++++++++++++++++++++++-------- gst/rawparse/gstrawparse.h | 6 +- 2 files changed, 397 insertions(+), 78 deletions(-) (limited to 'gst/rawparse') diff --git a/gst/rawparse/gstrawparse.c b/gst/rawparse/gstrawparse.c index fa584d1a..840d056a 100644 --- a/gst/rawparse/gstrawparse.c +++ b/gst/rawparse/gstrawparse.c @@ -35,6 +35,12 @@ static void gst_raw_parse_dispose (GObject * object); +static gboolean gst_raw_parse_sink_activate (GstPad * sinkpad); +static gboolean gst_raw_parse_sink_activatepull (GstPad * sinkpad, + gboolean active); +static void gst_raw_parse_loop (GstElement * element); +static GstStateChangeReturn gst_raw_parse_change_state (GstElement * element, + GstStateChange transition); static GstFlowReturn gst_raw_parse_chain (GstPad * pad, GstBuffer * buffer); static gboolean gst_raw_parse_sink_event (GstPad * pad, GstEvent * event); static gboolean gst_raw_parse_src_event (GstPad * pad, GstEvent * event); @@ -44,6 +50,8 @@ static gboolean gst_raw_parse_convert (GstRawParse * rp, GstFormat src_format, gint64 src_value, GstFormat dest_format, gint64 * dest_value); +static void gst_raw_parse_reset (GstRawParse * rp); + static GstStaticPadTemplate gst_raw_parse_sink_pad_template = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, @@ -71,8 +79,12 @@ static void gst_raw_parse_class_init (GstRawParseClass * klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); gobject_class->dispose = gst_raw_parse_dispose; + + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_raw_parse_change_state); } static void @@ -86,8 +98,14 @@ gst_raw_parse_init (GstRawParse * rp, GstRawParseClass * g_class) "sink"); gst_element_add_pad (GST_ELEMENT (rp), rp->sinkpad); - gst_pad_set_chain_function (rp->sinkpad, gst_raw_parse_chain); - gst_pad_set_event_function (rp->sinkpad, gst_raw_parse_sink_event); + gst_pad_set_chain_function (rp->sinkpad, + GST_DEBUG_FUNCPTR (gst_raw_parse_chain)); + gst_pad_set_event_function (rp->sinkpad, + GST_DEBUG_FUNCPTR (gst_raw_parse_sink_event)); + gst_pad_set_activate_function (rp->sinkpad, + GST_DEBUG_FUNCPTR (gst_raw_parse_sink_activate)); + gst_pad_set_activatepull_function (rp->sinkpad, + GST_DEBUG_FUNCPTR (gst_raw_parse_sink_activatepull)); src_pad_template = gst_element_class_get_pad_template (element_class, "src"); @@ -100,16 +118,21 @@ gst_raw_parse_init (GstRawParse * rp, GstRawParseClass * g_class) gst_element_add_pad (GST_ELEMENT (rp), rp->srcpad); - gst_pad_set_event_function (rp->srcpad, gst_raw_parse_src_event); + gst_pad_set_event_function (rp->srcpad, + GST_DEBUG_FUNCPTR (gst_raw_parse_src_event)); - gst_pad_set_query_type_function (rp->srcpad, gst_raw_parse_src_query_type); - gst_pad_set_query_function (rp->srcpad, gst_raw_parse_src_query); + gst_pad_set_query_type_function (rp->srcpad, + GST_DEBUG_FUNCPTR (gst_raw_parse_src_query_type)); + gst_pad_set_query_function (rp->srcpad, + GST_DEBUG_FUNCPTR (gst_raw_parse_src_query)); rp->adapter = gst_adapter_new (); rp->fps_n = 1; rp->fps_d = 0; rp->framesize = 1; + + gst_raw_parse_reset (rp); } static void @@ -153,19 +176,81 @@ static void gst_raw_parse_reset (GstRawParse * rp) { rp->n_frames = 0; + rp->offset = 0; rp->discont = TRUE; + rp->upstream_length = 0; + gst_segment_init (&rp->segment, GST_FORMAT_TIME); + rp->need_newsegment = TRUE; gst_adapter_clear (rp->adapter); } +static gboolean +gst_raw_parse_set_src_caps (GstRawParse * rp) +{ + GstRawParseClass *rp_class = GST_RAW_PARSE_GET_CLASS (rp); + GstCaps *caps; + + if (rp->negotiated) + return TRUE; + + if (rp_class->get_caps) { + caps = rp_class->get_caps (rp); + } else { + GST_WARNING + ("Subclass doesn't implement get_caps() method, using ANY caps"); + caps = gst_caps_new_any (); + } + + rp->negotiated = gst_pad_set_caps (rp->srcpad, caps); + + return rp->negotiated; +} + +static GstFlowReturn +gst_raw_parse_push_buffer (GstRawParse * rp, GstBuffer * buffer) +{ + GstFlowReturn ret; + gint nframes; + + nframes = GST_BUFFER_SIZE (buffer) / rp->framesize; + + if (rp->fps_n) { + GST_BUFFER_TIMESTAMP (buffer) = rp->segment.start + + gst_util_uint64_scale (rp->n_frames, GST_SECOND * rp->fps_d, rp->fps_n); + GST_BUFFER_DURATION (buffer) = + gst_util_uint64_scale (nframes * GST_SECOND, rp->fps_d, rp->fps_n); + } else { + GST_BUFFER_TIMESTAMP (buffer) = rp->segment.start; + GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE; + } + gst_buffer_set_caps (buffer, GST_PAD_CAPS (rp->srcpad)); + if (rp->discont) { + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + rp->discont = FALSE; + } + + rp->offset += GST_BUFFER_SIZE (buffer); + rp->n_frames += nframes; + + rp->segment.last_stop = GST_BUFFER_TIMESTAMP (buffer); + + GST_LOG_OBJECT (rp, "Pushing buffer with time %" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer))); + + ret = gst_pad_push (rp->srcpad, buffer); + + return ret; +} + static GstFlowReturn gst_raw_parse_chain (GstPad * pad, GstBuffer * buffer) { GstRawParse *rp = GST_RAW_PARSE (gst_pad_get_parent (pad)); GstFlowReturn ret = GST_FLOW_OK; GstRawParseClass *rp_class = GST_RAW_PARSE_GET_CLASS (rp); - guint buffersize, nframes; + guint buffersize; if (G_UNLIKELY (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT))) { GST_DEBUG_OBJECT (rp, "received DISCONT buffer"); @@ -173,60 +258,188 @@ gst_raw_parse_chain (GstPad * pad, GstBuffer * buffer) rp->discont = TRUE; } - if (!rp->negotiated) { - GstCaps *caps; - - if (rp_class->get_caps) { - caps = rp_class->get_caps (rp); - } else { - GST_WARNING - ("Subclass doesn't implement get_caps() method, using ANY caps"); - caps = gst_caps_new_any (); - } - - rp->negotiated = gst_pad_set_caps (rp->srcpad, caps); - } - - g_return_val_if_fail (rp->negotiated, GST_FLOW_ERROR); + g_return_val_if_fail (gst_raw_parse_set_src_caps (rp), GST_FLOW_ERROR); gst_adapter_push (rp->adapter, buffer); if (rp_class->multiple_frames_per_buffer) { buffersize = gst_adapter_available (rp->adapter); buffersize -= buffersize % rp->framesize; - nframes = buffersize / rp->framesize; } else { buffersize = rp->framesize; - nframes = 1; } while (gst_adapter_available (rp->adapter) >= buffersize) { buffer = gst_adapter_take_buffer (rp->adapter, buffersize); - if (rp->fps_n) { - GST_BUFFER_TIMESTAMP (buffer) = rp->segment.start + - gst_util_uint64_scale (rp->n_frames, GST_SECOND * rp->fps_d, - rp->fps_n); - GST_BUFFER_DURATION (buffer) = - gst_util_uint64_scale (nframes * GST_SECOND, rp->fps_d, rp->fps_n); - } else { - GST_BUFFER_TIMESTAMP (buffer) = rp->segment.start; - GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE; + ret = gst_raw_parse_push_buffer (rp, buffer); + + if (ret != GST_FLOW_OK) + break; + } + + gst_object_unref (rp); + return ret; +} + +static void +gst_raw_parse_loop (GstElement * element) +{ + GstRawParse *rp = GST_RAW_PARSE (element); + GstRawParseClass *rp_class = GST_RAW_PARSE_GET_CLASS (rp); + GstFlowReturn ret; + GstBuffer *buffer; + gint size; + + if (!gst_raw_parse_set_src_caps (rp)) { + ret = GST_FLOW_ERROR; + goto pause; + } + + if (rp_class->multiple_frames_per_buffer) + size = 1024 * rp->framesize; + else + size = rp->framesize; + + if (rp->offset + size > rp->upstream_length) { + GstFormat fmt = GST_FORMAT_BYTES; + + if (!gst_pad_query_peer_duration (rp->sinkpad, &fmt, &rp->upstream_length) + || rp->upstream_length < rp->offset + rp->framesize) { + ret = GST_FLOW_UNEXPECTED; + goto pause; } - gst_buffer_set_caps (buffer, GST_PAD_CAPS (rp->srcpad)); - if (rp->discont) { - GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); - rp->discont = FALSE; + + if (rp->offset + size > rp->upstream_length) { + size = rp->upstream_length - rp->offset; + size -= size % rp->framesize; } + } - rp->n_frames += nframes; + ret = gst_pad_pull_range (rp->sinkpad, rp->offset, size, &buffer); - ret = gst_pad_push (rp->srcpad, buffer); - if (ret != GST_FLOW_OK) + if (GST_FLOW_IS_FATAL (ret)) { + GST_DEBUG_OBJECT (rp, "pull_range (%" G_GINT64_FORMAT ", %u) " + "failed, flow: %s", rp->offset, size, gst_flow_get_name (ret)); + buffer = NULL; + goto pause; + } + + if (GST_BUFFER_SIZE (buffer) < size) { + GST_DEBUG_OBJECT (rp, "Short read at offset %" G_GINT64_FORMAT + ", got only %u of %u bytes", rp->offset, GST_BUFFER_SIZE (buffer), + size); + gst_buffer_unref (buffer); + buffer = NULL; + ret = GST_FLOW_UNEXPECTED; + goto pause; + } + + if (rp->need_newsegment) { + GST_DEBUG_OBJECT (rp, "sending newsegment from %" GST_TIME_FORMAT + " to %" GST_TIME_FORMAT, GST_TIME_ARGS (rp->segment.start), + GST_TIME_ARGS (rp->segment.stop)); + + if (gst_pad_push_event (rp->srcpad, gst_event_new_new_segment (FALSE, + rp->segment.rate, GST_FORMAT_TIME, rp->segment.start, + rp->segment.stop, rp->segment.last_stop))); + rp->need_newsegment = FALSE; + } + + ret = gst_raw_parse_push_buffer (rp, buffer); + if (GST_FLOW_IS_FATAL (ret)) + goto pause; + + return; + +pause: + { + const gchar *reason = gst_flow_get_name (ret); + + GST_LOG_OBJECT (rp, "pausing task, reason %s", reason); + gst_pad_pause_task (rp->sinkpad); + + if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) { + if (ret == GST_FLOW_UNEXPECTED && rp->srcpad) { + if (rp->segment.flags & GST_SEEK_FLAG_SEGMENT) { + GstClockTime stop; + + GST_LOG_OBJECT (rp, "Sending segment done"); + + if ((stop = rp->segment.stop) == -1) + stop = rp->segment.duration; + + gst_element_post_message (GST_ELEMENT_CAST (rp), + gst_message_new_segment_done (GST_OBJECT_CAST (rp), + rp->segment.format, stop)); + } else { + GST_LOG_OBJECT (rp, "Sending EOS, at end of stream"); + gst_pad_push_event (rp->srcpad, gst_event_new_eos ()); + } + } else { + GST_ELEMENT_ERROR (rp, STREAM, FAILED, + ("Internal data stream error."), + ("stream stopped, reason %s", reason)); + if (rp->srcpad) + gst_pad_push_event (rp->srcpad, gst_event_new_eos ()); + } + } + return; + } +} + +static gboolean +gst_raw_parse_sink_activate (GstPad * sinkpad) +{ + if (gst_pad_check_pull_range (sinkpad)) { + GST_RAW_PARSE (GST_PAD_PARENT (sinkpad))->mode = GST_ACTIVATE_PULL; + return gst_pad_activate_pull (sinkpad, TRUE); + } else { + GST_RAW_PARSE (GST_PAD_PARENT (sinkpad))->mode = GST_ACTIVATE_PUSH; + return gst_pad_activate_push (sinkpad, TRUE); + } +} + +static gboolean +gst_raw_parse_sink_activatepull (GstPad * sinkpad, gboolean active) +{ + gboolean result; + + if (active) { + result = gst_pad_start_task (sinkpad, + (GstTaskFunction) gst_raw_parse_loop, GST_PAD_PARENT (sinkpad)); + } else { + result = gst_pad_stop_task (sinkpad); + } + + return result; +} + +static GstStateChangeReturn +gst_raw_parse_change_state (GstElement * element, GstStateChange transition) +{ + GstRawParse *rp = GST_RAW_PARSE (element); + GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS; + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_PAUSED: + gst_segment_init (&rp->segment, GST_FORMAT_TIME); + rp->segment.last_stop = 0; + default: + break; + } + + if (GST_ELEMENT_CLASS (parent_class)->change_state) + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_raw_parse_reset (rp); + break; + default: break; } - gst_object_unref (rp); return ret; } @@ -338,7 +551,9 @@ gst_raw_parse_sink_event (GstPad * pad, GstEvent * event) gboolean ret; switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: case GST_EVENT_FLUSH_STOP: + /* Only happens in push mode */ gst_raw_parse_reset (rp); ret = gst_pad_push_event (rp->srcpad, event); break; @@ -349,6 +564,8 @@ gst_raw_parse_sink_event (GstPad * pad, GstEvent * event) gboolean update; GstFormat format; + /* Only happens in push mode */ + gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format, &start, &stop, &time); @@ -380,6 +597,7 @@ gst_raw_parse_sink_event (GstPad * pad, GstEvent * event) if (ret) { rp->n_frames = 0; + rp->offset = 0; rp->discont = TRUE; gst_adapter_clear (rp->adapter); } @@ -395,58 +613,156 @@ gst_raw_parse_sink_event (GstPad * pad, GstEvent * event) return ret; } + static gboolean -gst_raw_parse_src_event (GstPad * pad, GstEvent * event) +gst_raw_parse_handle_seek_push (GstRawParse * rp, GstEvent * event) { - GstRawParse *rp = GST_RAW_PARSE (gst_pad_get_parent (pad)); - gboolean ret; + GstFormat format; + gdouble rate; + GstSeekFlags flags; + GstSeekType start_type, stop_type; + gint64 start, stop; + gboolean ret = FALSE; + + gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start, + &stop_type, &stop); + + /* First try if upstream handles the seek */ + ret = gst_pad_push_event (rp->sinkpad, event); + if (ret) + return ret; - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_SEEK:{ - GstFormat format; - gdouble rate; - GstSeekFlags flags; - GstSeekType start_type, stop_type; - gint64 start, stop; - gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start, - &stop_type, &stop); + gst_event_unref (event); + + /* Otherwise convert to bytes and push upstream */ + if (format == GST_FORMAT_TIME || format == GST_FORMAT_DEFAULT) { + ret = gst_raw_parse_convert (rp, format, start, GST_FORMAT_BYTES, &start); + ret &= gst_raw_parse_convert (rp, format, stop, GST_FORMAT_BYTES, &stop); + + if (ret) { + /* Seek on a frame boundary */ + start -= start % rp->framesize; + if (stop != -1) + stop += rp->framesize - stop % rp->framesize; + + event = + gst_event_new_seek (rate, GST_FORMAT_BYTES, flags, start_type, + start, stop_type, stop); - /* First try if upstream handles the seek */ ret = gst_pad_push_event (rp->sinkpad, event); - if (ret) - goto done; + } + } + return ret; +} - /* Otherwise convert to bytes and push upstream */ - if (format == GST_FORMAT_TIME || format == GST_FORMAT_DEFAULT) { - gst_event_unref (event); +static gboolean +gst_raw_parse_handle_seek_pull (GstRawParse * rp, GstEvent * event) +{ + GstFormat format; + gdouble rate; + GstSeekFlags flags; + GstSeekType start_type, stop_type; + gint64 start, stop; + gint64 start_byte, stop_byte; + gint64 last_stop; + gboolean ret = FALSE; + gboolean flush; + GstFormat fmt = GST_FORMAT_BYTES; + GstSegment segment; - ret = - gst_raw_parse_convert (rp, format, start, GST_FORMAT_BYTES, &start); - ret &= - gst_raw_parse_convert (rp, format, stop, GST_FORMAT_BYTES, &stop); + gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start, + &stop_type, &stop); - if (ret) { - /* Seek on a frame boundary */ - start -= start % rp->framesize; - if (stop != -1) - stop += rp->framesize - stop % rp->framesize; + gst_event_unref (event); - event = - gst_event_new_seek (rate, GST_FORMAT_BYTES, flags, start_type, - start, stop_type, stop); + if (format != GST_FORMAT_TIME && format != GST_FORMAT_DEFAULT) { + GST_DEBUG ("seeking is only supported in TIME or DEFAULT format"); + return FALSE; + } - ret = gst_pad_push_event (rp->sinkpad, event); - } - } - break; + GST_OBJECT_LOCK (rp); + + + if (stop == -1 && !gst_pad_query_peer_duration (rp->sinkpad, &fmt, &stop)) + stop = -1; + + ret = + gst_raw_parse_convert (rp, format, start, GST_FORMAT_BYTES, &start_byte); + ret &= gst_raw_parse_convert (rp, format, stop, GST_FORMAT_BYTES, &stop_byte); + + if (ret) { + /* Seek on a frame boundary */ + start_byte -= start_byte % rp->framesize; + if (stop_byte != -1) + stop_byte += rp->framesize - stop_byte % rp->framesize; + + flush = ((flags & GST_SEEK_FLAG_FLUSH) != 0); + + segment = rp->segment; + + gst_segment_set_seek (&segment, rate, GST_FORMAT_TIME, flags, start_type, + start, stop_type, stop, NULL); + + gst_pad_push_event (rp->sinkpad, gst_event_new_flush_start ()); + if (flush) + gst_pad_push_event (rp->srcpad, gst_event_new_flush_start ()); + else + gst_pad_pause_task (rp->sinkpad); + + GST_PAD_STREAM_LOCK (rp->sinkpad); + + last_stop = rp->segment.last_stop; + + gst_pad_push_event (rp->sinkpad, gst_event_new_flush_stop ()); + if (flush) + gst_pad_push_event (rp->srcpad, gst_event_new_flush_stop ()); + + GST_DEBUG_OBJECT (rp, "Performing seek to %" GST_TIME_FORMAT ", byte %" + G_GINT64_FORMAT, GST_TIME_ARGS (segment.start), start_byte); + + rp->offset = start_byte; + rp->segment = segment; + rp->segment.last_stop = start; + rp->need_newsegment = TRUE; + rp->discont = (last_stop != start) ? TRUE : FALSE; + + if (rp->segment.flags & GST_SEEK_FLAG_SEGMENT) { + gst_element_post_message (GST_ELEMENT_CAST (rp), + gst_message_new_segment_start (GST_OBJECT_CAST (rp), + rp->segment.format, rp->segment.last_stop)); } + + GST_PAD_STREAM_UNLOCK (rp->sinkpad); + } else { + GST_DEBUG_OBJECT (rp, "Seek failed: couldn't convert to byte positions"); + } + + GST_OBJECT_UNLOCK (rp); + + gst_pad_start_task (rp->sinkpad, (GstTaskFunction) gst_raw_parse_loop, rp); + + return ret; +} + +static gboolean +gst_raw_parse_src_event (GstPad * pad, GstEvent * event) +{ + GstRawParse *rp = GST_RAW_PARSE (gst_pad_get_parent (pad)); + gboolean ret; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEEK: + if (rp->mode == GST_ACTIVATE_PUSH) + ret = gst_raw_parse_handle_seek_push (rp, event); + else + ret = gst_raw_parse_handle_seek_pull (rp, event); + break; default: ret = gst_pad_event_default (rp->srcpad, event); break; } -done: gst_object_unref (rp); return ret; @@ -483,8 +799,7 @@ gst_raw_parse_src_query (GstPad * pad, GstQuery * query) gst_query_parse_position (query, &format, NULL); - time = gst_util_uint64_scale (rp->n_frames, - GST_SECOND * rp->fps_d, rp->fps_n); + time = rp->segment.last_stop; ret = gst_raw_parse_convert (rp, GST_FORMAT_TIME, time, format, &value); gst_query_set_position (query, format, value); diff --git a/gst/rawparse/gstrawparse.h b/gst/rawparse/gstrawparse.h index dd486c34..ce63a1a1 100644 --- a/gst/rawparse/gstrawparse.h +++ b/gst/rawparse/gstrawparse.h @@ -52,6 +52,7 @@ struct _GstRawParse GstPad *sinkpad; GstPad *srcpad; + GstActivateMode mode; GstAdapter *adapter; int framesize; @@ -60,11 +61,14 @@ struct _GstRawParse gboolean discont; guint64 n_frames; + gint64 offset; GstSegment segment; + gint64 upstream_length; + gboolean negotiated; - gboolean have_new_segment; + gboolean need_newsegment; }; struct _GstRawParseClass -- cgit v1.2.1