diff options
Diffstat (limited to 'gst-libs')
-rw-r--r-- | gst-libs/gst/app/gstapp-marshal.list | 5 | ||||
-rw-r--r-- | gst-libs/gst/app/gstappsrc.c | 468 | ||||
-rw-r--r-- | gst-libs/gst/app/gstappsrc.h | 59 |
3 files changed, 417 insertions, 115 deletions
diff --git a/gst-libs/gst/app/gstapp-marshal.list b/gst-libs/gst/app/gstapp-marshal.list index 648f363e..a5d8bdc8 100644 --- a/gst-libs/gst/app/gstapp-marshal.list +++ b/gst-libs/gst/app/gstapp-marshal.list @@ -1 +1,4 @@ -VOID:UINT64 +VOID:UINT +BOOLEAN:UINT64 +ENUM:OBJECT +ENUM:VOID diff --git a/gst-libs/gst/app/gstappsrc.c b/gst-libs/gst/app/gstappsrc.c index c1d01db7..1c6a7fad 100644 --- a/gst-libs/gst/app/gstappsrc.c +++ b/gst-libs/gst/app/gstappsrc.c @@ -23,7 +23,7 @@ #endif #include <gst/gst.h> -#include <gst/base/gstpushsrc.h> +#include <gst/base/gstbasesrc.h> #include <string.h> @@ -53,17 +53,19 @@ enum LAST_SIGNAL }; -#define DEFAULT_PROP_MAX_BUFFERS 0 #define DEFAULT_PROP_SIZE -1 -#define DEFAULT_PROP_SEEKABLE FALSE +#define DEFAULT_PROP_STREAM_TYPE GST_APP_STREAM_TYPE_STREAM +#define DEFAULT_PROP_MAX_BYTES 200000 +#define DEFAULT_PROP_FORMAT GST_FORMAT_BYTES enum { PROP_0, PROP_CAPS, PROP_SIZE, - PROP_SEEKABLE, - PROP_MAX_BUFFERS, + PROP_STREAM_TYPE, + PROP_MAX_BYTES, + PROP_FORMAT, PROP_LAST }; @@ -74,6 +76,28 @@ GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); + +#define GST_TYPE_APP_STREAM_TYPE (stream_type_get_type ()) +static GType +stream_type_get_type (void) +{ + static GType stream_type_type = 0; + static const GEnumValue stream_type[] = { + {GST_APP_STREAM_TYPE_STREAM, "Stream", "stream"}, + {GST_APP_STREAM_TYPE_SEEKABLE, "Seekable", "seekable"}, + {GST_APP_STREAM_TYPE_RANDOM_ACCESS, "Random Access", "random-access"}, + {0, NULL, NULL}, + }; + + if (!stream_type_type) { + stream_type_type = g_enum_register_static ("GstAppStreamType", stream_type); + } + return stream_type_type; +} + +static void gst_app_src_uri_handler_init (gpointer g_iface, + gpointer iface_data); + static void gst_app_src_dispose (GObject * object); static void gst_app_src_finalize (GObject * object); @@ -82,15 +106,32 @@ static void gst_app_src_set_property (GObject * object, guint prop_id, static void gst_app_src_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static GstFlowReturn gst_app_src_create (GstPushSrc * psrc, GstBuffer ** buf); -static gboolean gst_app_src_start (GstBaseSrc * psrc); -static gboolean gst_app_src_stop (GstBaseSrc * psrc); -static gboolean gst_app_src_unlock (GstBaseSrc * psrc); -static gboolean gst_app_src_unlock_stop (GstBaseSrc * psrc); +static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc, + guint64 offset, guint size, GstBuffer ** buf); +static gboolean gst_app_src_start (GstBaseSrc * bsrc); +static gboolean gst_app_src_stop (GstBaseSrc * bsrc); +static gboolean gst_app_src_unlock (GstBaseSrc * bsrc); +static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc); +static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment); +static gboolean gst_app_src_is_seekable (GstBaseSrc * src); +static gboolean gst_app_src_check_get_range (GstBaseSrc * src); static guint gst_app_src_signals[LAST_SIGNAL] = { 0 }; -GST_BOILERPLATE (GstAppSrc, gst_app_src, GstPushSrc, GST_TYPE_PUSH_SRC); +static void +_do_init (GType filesrc_type) +{ + static const GInterfaceInfo urihandler_info = { + gst_app_src_uri_handler_init, + NULL, + NULL + }; + g_type_add_interface_static (filesrc_type, GST_TYPE_URI_HANDLER, + &urihandler_info); +} + +GST_BOILERPLATE_FULL (GstAppSrc, gst_app_src, GstBaseSrc, GST_TYPE_BASE_SRC, + _do_init); static void gst_app_src_base_init (gpointer g_class) @@ -109,7 +150,6 @@ static void gst_app_src_class_init (GstAppSrcClass * klass) { GObjectClass *gobject_class = (GObjectClass *) klass; - GstPushSrcClass *pushsrc_class = (GstPushSrcClass *) klass; GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass; gobject_class->dispose = gst_app_src_dispose; @@ -123,34 +163,46 @@ gst_app_src_class_init (GstAppSrcClass * klass) "The allowed caps for the src pad", GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_FORMAT, + g_param_spec_enum ("format", "Format", + "The format of the segment events and seek", GST_TYPE_FORMAT, + DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_SIZE, g_param_spec_int64 ("size", "Size", "The size of the data stream (-1 if unknown)", -1, G_MAXINT64, DEFAULT_PROP_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_SEEKABLE, - g_param_spec_boolean ("seekable", "Seekable", - "If the source is seekable", DEFAULT_PROP_SEEKABLE, - G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_STREAM_TYPE, + g_param_spec_enum ("stream-type", "Stream Type", + "the type of the stream", GST_TYPE_APP_STREAM_TYPE, + DEFAULT_PROP_STREAM_TYPE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS, - g_param_spec_uint ("max-buffers", "Max Buffers", - "The maximum number of buffers to queue internally (0 = unlimited)", - 0, G_MAXUINT, DEFAULT_PROP_MAX_BUFFERS, + g_object_class_install_property (gobject_class, PROP_MAX_BYTES, + g_param_spec_uint64 ("max-bytes", "Max bytes", + "The maximum number of bytes to queue internally (0 = unlimited)", + 0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GstAppSrc::need-data: * @appsrc: the appsrc element that emited the signal + * @length: the amount of bytes needed. + * + * Signal that the source needs more data. In the callback or from another + * thread you should call push-buffer or end-of-stream. * - * Signal that the source needs more data. In the callback you should call - * push-buffer or end-of-stream. + * @length is just a hint and when it is set to -1, any number of bytes can be + * pushed into @appsrc. + * + * You can call push-buffer multiple times until the enough-data signal is + * fired. */ gst_app_src_signals[SIGNAL_NEED_DATA] = g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstAppSrcClass, need_data), - NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE); + NULL, NULL, gst_app_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT); /** * GstAppSrc::enough-data: @@ -160,10 +212,11 @@ gst_app_src_class_init (GstAppSrcClass * klass) * application stops calling push-buffer until the need-data signal is * emited again to avoid excessive buffer queueing. */ - gst_app_src_signals[SIGNAL_NEED_DATA] = + gst_app_src_signals[SIGNAL_ENOUGH_DATA] = g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstAppSrcClass, enough_data), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE); + /** * GstAppSrc::seek-data: * @appsrc: the appsrc element that emited the signal @@ -171,29 +224,53 @@ gst_app_src_class_init (GstAppSrcClass * klass) * * Seek to the given offset. The next push-buffer should produce buffers from * the new @offset. + * This callback is only called for seekable stream types. + * + * Returns: %TRUE if the seek succeeded. */ gst_app_src_signals[SIGNAL_SEEK_DATA] = g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstAppSrcClass, seek_data), - NULL, NULL, gst_app_marshal_VOID__UINT64, G_TYPE_NONE, 1, G_TYPE_UINT64); - + NULL, NULL, gst_app_marshal_BOOLEAN__UINT64, G_TYPE_BOOLEAN, 1, + G_TYPE_UINT64); + + /** + * GstAppSrc::push-buffer: + * @appsrc: the appsrc + * @buffer: a buffer to push + * + * Adds a buffer to the queue of buffers that the appsrc element will + * push to its source pad. This function will take ownership of @buffer. + */ gst_app_src_signals[SIGNAL_PUSH_BUFFER] = g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass, - push_buffer), NULL, NULL, g_cclosure_marshal_VOID__OBJECT, - G_TYPE_NONE, 1, GST_TYPE_BUFFER); - + push_buffer), NULL, NULL, gst_app_marshal_ENUM__OBJECT, + GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER); + + /** + * GstAppSrc::end-of-stream: + * @appsrc: the appsrc + * + * Notify @appsrc that no more buffer are available. + */ gst_app_src_signals[SIGNAL_END_OF_STREAM] = g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass, - end_of_stream), NULL, NULL, g_cclosure_marshal_VOID__VOID, - G_TYPE_NONE, 0, G_TYPE_NONE); + end_of_stream), NULL, NULL, gst_app_marshal_ENUM__VOID, + GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE); - pushsrc_class->create = gst_app_src_create; + basesrc_class->create = gst_app_src_create; basesrc_class->start = gst_app_src_start; basesrc_class->stop = gst_app_src_stop; basesrc_class->unlock = gst_app_src_unlock; basesrc_class->unlock_stop = gst_app_src_unlock_stop; + basesrc_class->do_seek = gst_app_src_do_seek; + basesrc_class->is_seekable = gst_app_src_is_seekable; + basesrc_class->check_get_range = gst_app_src_check_get_range; + + klass->push_buffer = gst_app_src_push_buffer; + klass->end_of_stream = gst_app_src_end_of_stream; } static void @@ -204,8 +281,18 @@ gst_app_src_init (GstAppSrc * appsrc, GstAppSrcClass * klass) appsrc->queue = g_queue_new (); appsrc->size = DEFAULT_PROP_SIZE; - appsrc->seekable = DEFAULT_PROP_SEEKABLE; - appsrc->max_buffers = DEFAULT_PROP_MAX_BUFFERS; + appsrc->stream_type = DEFAULT_PROP_STREAM_TYPE; + appsrc->max_bytes = DEFAULT_PROP_MAX_BYTES; + appsrc->format = DEFAULT_PROP_FORMAT; +} + +static void +gst_app_src_flush_queued (GstAppSrc * src) +{ + GstBuffer *buf; + + while ((buf = g_queue_pop_head (src->queue))) + gst_buffer_unref (buf); } static void @@ -217,6 +304,7 @@ gst_app_src_dispose (GObject * obj) gst_caps_unref (appsrc->caps); appsrc->caps = NULL; } + gst_app_src_flush_queued (appsrc); G_OBJECT_CLASS (parent_class)->dispose (obj); } @@ -246,11 +334,14 @@ gst_app_src_set_property (GObject * object, guint prop_id, case PROP_SIZE: gst_app_src_set_size (appsrc, g_value_get_int64 (value)); break; - case PROP_SEEKABLE: - gst_app_src_set_seekable (appsrc, g_value_get_boolean (value)); + case PROP_STREAM_TYPE: + gst_app_src_set_stream_type (appsrc, g_value_get_enum (value)); + break; + case PROP_MAX_BYTES: + gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value)); break; - case PROP_MAX_BUFFERS: - gst_app_src_set_max_buffers (appsrc, g_value_get_uint (value)); + case PROP_FORMAT: + appsrc->format = g_value_get_enum (value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -269,6 +360,7 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value, { GstCaps *caps; + /* we're missing a _take_caps() function to transfer ownership */ caps = gst_app_src_get_caps (appsrc); gst_value_set_caps (value, caps); if (caps) @@ -278,11 +370,14 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value, case PROP_SIZE: g_value_set_int64 (value, gst_app_src_get_size (appsrc)); break; - case PROP_SEEKABLE: - g_value_set_boolean (value, gst_app_src_get_seekable (appsrc)); + case PROP_STREAM_TYPE: + g_value_set_enum (value, gst_app_src_get_stream_type (appsrc)); break; - case PROP_MAX_BUFFERS: - g_value_set_uint (value, gst_app_src_get_max_buffers (appsrc)); + case PROP_MAX_BYTES: + g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc)); + break; + case PROP_FORMAT: + g_value_set_enum (value, appsrc->format); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -291,9 +386,9 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value, } static gboolean -gst_app_src_unlock (GstBaseSrc * psrc) +gst_app_src_unlock (GstBaseSrc * bsrc) { - GstAppSrc *appsrc = GST_APP_SRC (psrc); + GstAppSrc *appsrc = GST_APP_SRC (bsrc); g_mutex_lock (appsrc->mutex); GST_DEBUG_OBJECT (appsrc, "unlock start"); @@ -305,9 +400,9 @@ gst_app_src_unlock (GstBaseSrc * psrc) } static gboolean -gst_app_src_unlock_stop (GstBaseSrc * psrc) +gst_app_src_unlock_stop (GstBaseSrc * bsrc) { - GstAppSrc *appsrc = GST_APP_SRC (psrc); + GstAppSrc *appsrc = GST_APP_SRC (bsrc); g_mutex_lock (appsrc->mutex); GST_DEBUG_OBJECT (appsrc, "unlock stop"); @@ -319,37 +414,105 @@ gst_app_src_unlock_stop (GstBaseSrc * psrc) } static gboolean -gst_app_src_start (GstBaseSrc * psrc) +gst_app_src_start (GstBaseSrc * bsrc) { - GstAppSrc *appsrc = GST_APP_SRC (psrc); + GstAppSrc *appsrc = GST_APP_SRC (bsrc); g_mutex_lock (appsrc->mutex); GST_DEBUG_OBJECT (appsrc, "starting"); appsrc->started = TRUE; g_mutex_unlock (appsrc->mutex); + gst_base_src_set_format (bsrc, appsrc->format); + return TRUE; } static gboolean -gst_app_src_stop (GstBaseSrc * psrc) +gst_app_src_stop (GstBaseSrc * bsrc) { - GstAppSrc *appsrc = GST_APP_SRC (psrc); + GstAppSrc *appsrc = GST_APP_SRC (bsrc); g_mutex_lock (appsrc->mutex); GST_DEBUG_OBJECT (appsrc, "stopping"); appsrc->is_eos = FALSE; appsrc->flushing = TRUE; appsrc->started = FALSE; + gst_app_src_flush_queued (appsrc); g_mutex_unlock (appsrc->mutex); return TRUE; } +static gboolean +gst_app_src_is_seekable (GstBaseSrc * src) +{ + GstAppSrc *appsrc = GST_APP_SRC (src); + gboolean res = FALSE; + + switch (appsrc->stream_type) { + case GST_APP_STREAM_TYPE_STREAM: + break; + case GST_APP_STREAM_TYPE_SEEKABLE: + case GST_APP_STREAM_TYPE_RANDOM_ACCESS: + res = TRUE; + break; + } + return res; +} + +static gboolean +gst_app_src_check_get_range (GstBaseSrc * src) +{ + GstAppSrc *appsrc = GST_APP_SRC (src); + gboolean res = FALSE; + + switch (appsrc->stream_type) { + case GST_APP_STREAM_TYPE_STREAM: + case GST_APP_STREAM_TYPE_SEEKABLE: + break; + case GST_APP_STREAM_TYPE_RANDOM_ACCESS: + res = TRUE; + break; + } + return res; +} + +/* will be called in push mode */ +static gboolean +gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment) +{ + GstAppSrc *appsrc = GST_APP_SRC (src); + gint64 desired_position; + gboolean res = FALSE; + + desired_position = segment->last_stop; + + GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s", + desired_position, gst_format_get_name (segment->format)); + + /* no need to try to seek in streaming mode */ + if (appsrc->stream_type == GST_APP_STREAM_TYPE_STREAM) + return TRUE; + + g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0, + desired_position, &res); + + if (res) { + GST_DEBUG_OBJECT (appsrc, "flushing queue"); + gst_app_src_flush_queued (appsrc); + } else { + GST_WARNING_OBJECT (appsrc, "seek failed"); + } + + return res; +} + static GstFlowReturn -gst_app_src_create (GstPushSrc * psrc, GstBuffer ** buf) +gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, + GstBuffer ** buf) { - GstAppSrc *appsrc = GST_APP_SRC (psrc); + GstAppSrc *appsrc = GST_APP_SRC (bsrc); GstFlowReturn ret; g_mutex_lock (appsrc->mutex); @@ -360,13 +523,31 @@ gst_app_src_create (GstPushSrc * psrc, GstBuffer ** buf) /* return data as long as we have some */ if (!g_queue_is_empty (appsrc->queue)) { + again: *buf = g_queue_pop_head (appsrc->queue); + appsrc->queued_bytes -= GST_BUFFER_SIZE (*buf); + gst_buffer_set_caps (*buf, appsrc->caps); GST_DEBUG_OBJECT (appsrc, "we have buffer %p", *buf); ret = GST_FLOW_OK; break; + } else { + /* we have no data, we need some. We fire the signal with the size hint. */ + g_mutex_unlock (appsrc->mutex); + + g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size, + NULL); + + g_mutex_lock (appsrc->mutex); + /* we can be flushing now because we released the lock */ + if (appsrc->flushing) + goto flushing; + + /* if we have a buffer now, retry to return it */ + if (!g_queue_is_empty (appsrc->queue)) + goto again; } /* check EOS */ @@ -395,7 +576,6 @@ eos: } } - /* external API */ /** @@ -499,68 +679,70 @@ gst_app_src_get_size (GstAppSrc * appsrc) } /** - * gst_app_src_set_seekable: + * gst_app_src_set_stream_type: * @appsrc: a #GstAppSrc - * @seekable: the new state + * @type: the new state + * + * Set the stream type on @appsrc. For seekable streams, the "seek" signal must + * be connected to. * - * Set whether the data is seekable. When this flag is set to %TRUE, the - * "seek" signal must be connected to. + * A stream_type stream */ void -gst_app_src_set_seekable (GstAppSrc * appsrc, gboolean seekable) +gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type) { g_return_if_fail (appsrc != NULL); g_return_if_fail (GST_IS_APP_SRC (appsrc)); GST_OBJECT_LOCK (appsrc); - GST_DEBUG_OBJECT (appsrc, "setting seekable of %d", seekable); - appsrc->seekable = seekable; + GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type); + appsrc->stream_type = type; GST_OBJECT_UNLOCK (appsrc); } /** - * gst_app_src_get_seekable: + * gst_app_src_get_stream_type: * @appsrc: a #GstAppSrc * - * Get whether the stream is seekable. Control the seeking behaviour of the - * stream with gst_app_src_set_seekable(). + * Get the stream type. Control the stream type of @appsrc + * with gst_app_src_set_stream_type(). * - * Returns: %TRUE if the stream is seekable. + * Returns: the stream type. */ -gboolean -gst_app_src_get_seekable (GstAppSrc * appsrc) +GstAppStreamType +gst_app_src_get_stream_type (GstAppSrc * appsrc) { - gboolean seekable; + gboolean stream_type; g_return_val_if_fail (appsrc != NULL, FALSE); g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE); GST_OBJECT_LOCK (appsrc); - seekable = appsrc->seekable; - GST_DEBUG_OBJECT (appsrc, "getting seekable of %d", seekable); + stream_type = appsrc->stream_type; + GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type); GST_OBJECT_UNLOCK (appsrc); - return seekable; + return stream_type; } /** - * gst_app_src_set_max_buffers: + * gst_app_src_set_max_bytes: * @appsrc: a #GstAppSrc - * @max: the maximum number of buffers to queue + * @max: the maximum number of bytes to queue * - * Set the maximum amount of buffers that can be queued in @appsrc. - * After the maximum amount of buffers are queued, @appsrc will emit the + * Set the maximum amount of bytes that can be queued in @appsrc. + * After the maximum amount of bytes are queued, @appsrc will emit the * "enough-data" signal. */ void -gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint max) +gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max) { g_return_if_fail (GST_IS_APP_SRC (appsrc)); g_mutex_lock (appsrc->mutex); - if (max != appsrc->max_buffers) { - GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %u", max); - appsrc->max_buffers = max; + if (max != appsrc->max_bytes) { + GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %u", max); + appsrc->max_bytes = max; /* signal the change */ g_cond_signal (appsrc->cond); } @@ -568,23 +750,23 @@ gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint max) } /** - * gst_app_src_get_max_buffers: + * gst_app_src_get_max_bytes: * @appsrc: a #GstAppSrc * - * Get the maximum amount of buffers that can be queued in @appsrc. + * Get the maximum amount of bytes that can be queued in @appsrc. * - * Returns: The maximum amount of buffers that can be queued. + * Returns: The maximum amount of bytes that can be queued. */ -guint -gst_app_src_get_max_buffers (GstAppSrc * appsrc) +guint64 +gst_app_src_get_max_bytes (GstAppSrc * appsrc) { guint result; g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0); g_mutex_lock (appsrc->mutex); - result = appsrc->max_buffers; - GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %u", result); + result = appsrc->max_bytes; + GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %u", result); g_mutex_unlock (appsrc->mutex); return result; @@ -592,42 +774,138 @@ gst_app_src_get_max_buffers (GstAppSrc * appsrc) /** * gst_app_src_push_buffer: - * @appsrc: - * @buffer: + * @appsrc: a #GstAppSrc + * @buffer: a #GstBuffer to push * * Adds a buffer to the queue of buffers that the appsrc element will * push to its source pad. This function takes ownership of the buffer. + * + * Returns: #GST_FLOW_OK when the buffer was successfuly queued. + * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING. + * #GST_FLOW_UNEXPECTED when EOS occured. */ -void +GstFlowReturn gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer) { - g_return_if_fail (appsrc); - g_return_if_fail (GST_IS_APP_SRC (appsrc)); - g_return_if_fail (GST_IS_BUFFER (buffer)); + g_return_val_if_fail (appsrc, GST_FLOW_ERROR); + g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR); + g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); g_mutex_lock (appsrc->mutex); + /* can't accept buffers when we are flushing or EOS */ + if (appsrc->flushing) + goto flushing; + + if (appsrc->is_eos) + goto eos; + GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer); g_queue_push_tail (appsrc->queue, buffer); + + appsrc->queued_bytes += GST_BUFFER_SIZE (buffer); + if (appsrc->queued_bytes >= appsrc->max_bytes) { + GST_DEBUG_OBJECT (appsrc, "queue filled (%u >= %u), signal enough-data", + appsrc->queued_bytes, appsrc->max_bytes); + g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0, NULL); + } g_cond_signal (appsrc->cond); g_mutex_unlock (appsrc->mutex); + + return GST_FLOW_OK; + + /* ERRORS */ +flushing: + { + GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer); + gst_buffer_unref (buffer); + return GST_FLOW_WRONG_STATE; + } +eos: + { + GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer); + gst_buffer_unref (buffer); + return GST_FLOW_UNEXPECTED; + } } /** * gst_app_src_end_of_stream: - * @appsrc: + * @appsrc: a #GstAppSrc * * Indicates to the appsrc element that the last buffer queued in the * element is the last buffer of the stream. + * + * Returns: #GST_FLOW_OK when the EOS was successfuly queued. + * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING. */ -void +GstFlowReturn gst_app_src_end_of_stream (GstAppSrc * appsrc) { - g_return_if_fail (appsrc); - g_return_if_fail (GST_IS_APP_SRC (appsrc)); + g_return_val_if_fail (appsrc, GST_FLOW_ERROR); + g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR); g_mutex_lock (appsrc->mutex); + /* can't accept buffers when we are flushing. We can accept them when we are + * EOS although it will not do anything. */ + if (appsrc->flushing) + goto flushing; + GST_DEBUG_OBJECT (appsrc, "sending EOS"); appsrc->is_eos = TRUE; g_cond_signal (appsrc->cond); g_mutex_unlock (appsrc->mutex); + + return GST_FLOW_OK; + + /* ERRORS */ +flushing: + { + GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing"); + return GST_FLOW_WRONG_STATE; + } +} + +/*** GSTURIHANDLER INTERFACE *************************************************/ + +static GstURIType +gst_app_src_uri_get_type (void) +{ + return GST_URI_SRC; +} + +static gchar ** +gst_app_src_uri_get_protocols (void) +{ + static gchar *protocols[] = { "appsrc", NULL }; + + return protocols; +} +static const gchar * +gst_app_src_uri_get_uri (GstURIHandler * handler) +{ + return "appsrc"; +} + +static gboolean +gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri) +{ + gchar *protocol; + gboolean ret; + + protocol = gst_uri_get_protocol (uri); + ret = !strcmp (protocol, "appsrc"); + g_free (protocol); + + return ret; +} + +static void +gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data) +{ + GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface; + + iface->get_type = gst_app_src_uri_get_type; + iface->get_protocols = gst_app_src_uri_get_protocols; + iface->get_uri = gst_app_src_uri_get_uri; + iface->set_uri = gst_app_src_uri_set_uri; } diff --git a/gst-libs/gst/app/gstappsrc.h b/gst-libs/gst/app/gstappsrc.h index 4ca5cbc9..1ce73b4c 100644 --- a/gst-libs/gst/app/gstappsrc.h +++ b/gst-libs/gst/app/gstappsrc.h @@ -39,9 +39,27 @@ G_BEGIN_DECLS typedef struct _GstAppSrc GstAppSrc; typedef struct _GstAppSrcClass GstAppSrcClass; +/** + * GstAppStreamType: + * @GST_APP_STREAM_TYPE_STREAM: No seeking is supported in the stream, such as a + * live stream. + * @GST_APP_STREAM_TYPE_SEEKABLE: The stream is seekable but seeking might not + * be very fast, such as data from a webserver. + * @GST_APP_STREAM_TYPE_RANDOM_ACCESS: The stream is seekable and seeking is fast, + * such as in a local file. + * + * The stream type. + */ +typedef enum +{ + GST_APP_STREAM_TYPE_STREAM, + GST_APP_STREAM_TYPE_SEEKABLE, + GST_APP_STREAM_TYPE_RANDOM_ACCESS +} GstAppStreamType; + struct _GstAppSrc { - GstPushSrc pushsrc; + GstBaseSrc basesrc; /*< private >*/ GCond *cond; @@ -50,46 +68,49 @@ struct _GstAppSrc GstCaps *caps; gint64 size; - gboolean seekable; - guint max_buffers; + GstAppStreamType stream_type; + guint64 max_bytes; + GstFormat format; gboolean flushing; gboolean started; gboolean is_eos; + guint64 queued_bytes; + GstAppStreamType current_type; }; struct _GstAppSrcClass { - GstPushSrcClass pushsrc_class; + GstBaseSrcClass basesrc_class; /* signals */ - void (*need_data) (GstAppSrc *src); - void (*enough_data) (GstAppSrc *src); - gboolean (*seek_data) (GstAppSrc *src, guint64 offset); + void (*need_data) (GstAppSrc *src, guint length); + void (*enough_data) (GstAppSrc *src); + gboolean (*seek_data) (GstAppSrc *src, guint64 offset); /* actions */ - void (*push_buffer) (GstAppSrc *src, GstBuffer *buffer); - void (*end_of_stream) (GstAppSrc *src); + GstFlowReturn (*push_buffer) (GstAppSrc *src, GstBuffer *buffer); + GstFlowReturn (*end_of_stream) (GstAppSrc *src); }; GType gst_app_src_get_type(void); GST_DEBUG_CATEGORY_EXTERN (app_src_debug); -void gst_app_src_set_caps (GstAppSrc *appsrc, const GstCaps *caps); -GstCaps* gst_app_src_get_caps (GstAppSrc *appsrc); +void gst_app_src_set_caps (GstAppSrc *appsrc, const GstCaps *caps); +GstCaps* gst_app_src_get_caps (GstAppSrc *appsrc); -void gst_app_src_set_size (GstAppSrc *appsrc, gint64 size); -gint64 gst_app_src_get_size (GstAppSrc *appsrc); +void gst_app_src_set_size (GstAppSrc *appsrc, gint64 size); +gint64 gst_app_src_get_size (GstAppSrc *appsrc); -void gst_app_src_set_seekable (GstAppSrc *appsrc, gboolean seekable); -gboolean gst_app_src_get_seekable (GstAppSrc *appsrc); +void gst_app_src_set_stream_type (GstAppSrc *appsrc, GstAppStreamType type); +GstAppStreamType gst_app_src_get_stream_type (GstAppSrc *appsrc); -void gst_app_src_set_max_buffers (GstAppSrc *appsrc, guint max); -guint gst_app_src_get_max_buffers (GstAppSrc *appsrc); +void gst_app_src_set_max_bytes (GstAppSrc *appsrc, guint64 max); +guint64 gst_app_src_get_max_bytes (GstAppSrc *appsrc); -void gst_app_src_push_buffer (GstAppSrc *appsrc, GstBuffer *buffer); -void gst_app_src_end_of_stream (GstAppSrc *appsrc); +GstFlowReturn gst_app_src_push_buffer (GstAppSrc *appsrc, GstBuffer *buffer); +GstFlowReturn gst_app_src_end_of_stream (GstAppSrc *appsrc); G_END_DECLS |