summaryrefslogtreecommitdiffstats
path: root/gst-libs
diff options
context:
space:
mode:
Diffstat (limited to 'gst-libs')
-rw-r--r--gst-libs/gst/app/gstapp-marshal.list5
-rw-r--r--gst-libs/gst/app/gstappsrc.c468
-rw-r--r--gst-libs/gst/app/gstappsrc.h59
3 files changed, 417 insertions, 115 deletions
diff --git a/gst-libs/gst/app/gstapp-marshal.list b/gst-libs/gst/app/gstapp-marshal.list
index 648f363e..a5d8bdc8 100644
--- a/gst-libs/gst/app/gstapp-marshal.list
+++ b/gst-libs/gst/app/gstapp-marshal.list
@@ -1 +1,4 @@
-VOID:UINT64
+VOID:UINT
+BOOLEAN:UINT64
+ENUM:OBJECT
+ENUM:VOID
diff --git a/gst-libs/gst/app/gstappsrc.c b/gst-libs/gst/app/gstappsrc.c
index c1d01db7..1c6a7fad 100644
--- a/gst-libs/gst/app/gstappsrc.c
+++ b/gst-libs/gst/app/gstappsrc.c
@@ -23,7 +23,7 @@
#endif
#include <gst/gst.h>
-#include <gst/base/gstpushsrc.h>
+#include <gst/base/gstbasesrc.h>
#include <string.h>
@@ -53,17 +53,19 @@ enum
LAST_SIGNAL
};
-#define DEFAULT_PROP_MAX_BUFFERS 0
#define DEFAULT_PROP_SIZE -1
-#define DEFAULT_PROP_SEEKABLE FALSE
+#define DEFAULT_PROP_STREAM_TYPE GST_APP_STREAM_TYPE_STREAM
+#define DEFAULT_PROP_MAX_BYTES 200000
+#define DEFAULT_PROP_FORMAT GST_FORMAT_BYTES
enum
{
PROP_0,
PROP_CAPS,
PROP_SIZE,
- PROP_SEEKABLE,
- PROP_MAX_BUFFERS,
+ PROP_STREAM_TYPE,
+ PROP_MAX_BYTES,
+ PROP_FORMAT,
PROP_LAST
};
@@ -74,6 +76,28 @@ GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
+
+#define GST_TYPE_APP_STREAM_TYPE (stream_type_get_type ())
+static GType
+stream_type_get_type (void)
+{
+ static GType stream_type_type = 0;
+ static const GEnumValue stream_type[] = {
+ {GST_APP_STREAM_TYPE_STREAM, "Stream", "stream"},
+ {GST_APP_STREAM_TYPE_SEEKABLE, "Seekable", "seekable"},
+ {GST_APP_STREAM_TYPE_RANDOM_ACCESS, "Random Access", "random-access"},
+ {0, NULL, NULL},
+ };
+
+ if (!stream_type_type) {
+ stream_type_type = g_enum_register_static ("GstAppStreamType", stream_type);
+ }
+ return stream_type_type;
+}
+
+static void gst_app_src_uri_handler_init (gpointer g_iface,
+ gpointer iface_data);
+
static void gst_app_src_dispose (GObject * object);
static void gst_app_src_finalize (GObject * object);
@@ -82,15 +106,32 @@ static void gst_app_src_set_property (GObject * object, guint prop_id,
static void gst_app_src_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
-static GstFlowReturn gst_app_src_create (GstPushSrc * psrc, GstBuffer ** buf);
-static gboolean gst_app_src_start (GstBaseSrc * psrc);
-static gboolean gst_app_src_stop (GstBaseSrc * psrc);
-static gboolean gst_app_src_unlock (GstBaseSrc * psrc);
-static gboolean gst_app_src_unlock_stop (GstBaseSrc * psrc);
+static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc,
+ guint64 offset, guint size, GstBuffer ** buf);
+static gboolean gst_app_src_start (GstBaseSrc * bsrc);
+static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
+static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
+static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
+static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
+static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
+static gboolean gst_app_src_check_get_range (GstBaseSrc * src);
static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
-GST_BOILERPLATE (GstAppSrc, gst_app_src, GstPushSrc, GST_TYPE_PUSH_SRC);
+static void
+_do_init (GType filesrc_type)
+{
+ static const GInterfaceInfo urihandler_info = {
+ gst_app_src_uri_handler_init,
+ NULL,
+ NULL
+ };
+ g_type_add_interface_static (filesrc_type, GST_TYPE_URI_HANDLER,
+ &urihandler_info);
+}
+
+GST_BOILERPLATE_FULL (GstAppSrc, gst_app_src, GstBaseSrc, GST_TYPE_BASE_SRC,
+ _do_init);
static void
gst_app_src_base_init (gpointer g_class)
@@ -109,7 +150,6 @@ static void
gst_app_src_class_init (GstAppSrcClass * klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
- GstPushSrcClass *pushsrc_class = (GstPushSrcClass *) klass;
GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
gobject_class->dispose = gst_app_src_dispose;
@@ -123,34 +163,46 @@ gst_app_src_class_init (GstAppSrcClass * klass)
"The allowed caps for the src pad", GST_TYPE_CAPS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_FORMAT,
+ g_param_spec_enum ("format", "Format",
+ "The format of the segment events and seek", GST_TYPE_FORMAT,
+ DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
g_object_class_install_property (gobject_class, PROP_SIZE,
g_param_spec_int64 ("size", "Size",
"The size of the data stream (-1 if unknown)",
-1, G_MAXINT64, DEFAULT_PROP_SIZE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_SEEKABLE,
- g_param_spec_boolean ("seekable", "Seekable",
- "If the source is seekable", DEFAULT_PROP_SEEKABLE,
- G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
+ g_param_spec_enum ("stream-type", "Stream Type",
+ "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
+ DEFAULT_PROP_STREAM_TYPE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS,
- g_param_spec_uint ("max-buffers", "Max Buffers",
- "The maximum number of buffers to queue internally (0 = unlimited)",
- 0, G_MAXUINT, DEFAULT_PROP_MAX_BUFFERS,
+ g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
+ g_param_spec_uint64 ("max-bytes", "Max bytes",
+ "The maximum number of bytes to queue internally (0 = unlimited)",
+ 0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::need-data:
* @appsrc: the appsrc element that emited the signal
+ * @length: the amount of bytes needed.
+ *
+ * Signal that the source needs more data. In the callback or from another
+ * thread you should call push-buffer or end-of-stream.
*
- * Signal that the source needs more data. In the callback you should call
- * push-buffer or end-of-stream.
+ * @length is just a hint and when it is set to -1, any number of bytes can be
+ * pushed into @appsrc.
+ *
+ * You can call push-buffer multiple times until the enough-data signal is
+ * fired.
*/
gst_app_src_signals[SIGNAL_NEED_DATA] =
g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstAppSrcClass, need_data),
- NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
+ NULL, NULL, gst_app_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
/**
* GstAppSrc::enough-data:
@@ -160,10 +212,11 @@ gst_app_src_class_init (GstAppSrcClass * klass)
* application stops calling push-buffer until the need-data signal is
* emited again to avoid excessive buffer queueing.
*/
- gst_app_src_signals[SIGNAL_NEED_DATA] =
+ gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
+
/**
* GstAppSrc::seek-data:
* @appsrc: the appsrc element that emited the signal
@@ -171,29 +224,53 @@ gst_app_src_class_init (GstAppSrcClass * klass)
*
* Seek to the given offset. The next push-buffer should produce buffers from
* the new @offset.
+ * This callback is only called for seekable stream types.
+ *
+ * Returns: %TRUE if the seek succeeded.
*/
gst_app_src_signals[SIGNAL_SEEK_DATA] =
g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
- NULL, NULL, gst_app_marshal_VOID__UINT64, G_TYPE_NONE, 1, G_TYPE_UINT64);
-
+ NULL, NULL, gst_app_marshal_BOOLEAN__UINT64, G_TYPE_BOOLEAN, 1,
+ G_TYPE_UINT64);
+
+ /**
+ * GstAppSrc::push-buffer:
+ * @appsrc: the appsrc
+ * @buffer: a buffer to push
+ *
+ * Adds a buffer to the queue of buffers that the appsrc element will
+ * push to its source pad. This function will take ownership of @buffer.
+ */
gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
- push_buffer), NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
- G_TYPE_NONE, 1, GST_TYPE_BUFFER);
-
+ push_buffer), NULL, NULL, gst_app_marshal_ENUM__OBJECT,
+ GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
+
+ /**
+ * GstAppSrc::end-of-stream:
+ * @appsrc: the appsrc
+ *
+ * Notify @appsrc that no more buffer are available.
+ */
gst_app_src_signals[SIGNAL_END_OF_STREAM] =
g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
- end_of_stream), NULL, NULL, g_cclosure_marshal_VOID__VOID,
- G_TYPE_NONE, 0, G_TYPE_NONE);
+ end_of_stream), NULL, NULL, gst_app_marshal_ENUM__VOID,
+ GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
- pushsrc_class->create = gst_app_src_create;
+ basesrc_class->create = gst_app_src_create;
basesrc_class->start = gst_app_src_start;
basesrc_class->stop = gst_app_src_stop;
basesrc_class->unlock = gst_app_src_unlock;
basesrc_class->unlock_stop = gst_app_src_unlock_stop;
+ basesrc_class->do_seek = gst_app_src_do_seek;
+ basesrc_class->is_seekable = gst_app_src_is_seekable;
+ basesrc_class->check_get_range = gst_app_src_check_get_range;
+
+ klass->push_buffer = gst_app_src_push_buffer;
+ klass->end_of_stream = gst_app_src_end_of_stream;
}
static void
@@ -204,8 +281,18 @@ gst_app_src_init (GstAppSrc * appsrc, GstAppSrcClass * klass)
appsrc->queue = g_queue_new ();
appsrc->size = DEFAULT_PROP_SIZE;
- appsrc->seekable = DEFAULT_PROP_SEEKABLE;
- appsrc->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
+ appsrc->stream_type = DEFAULT_PROP_STREAM_TYPE;
+ appsrc->max_bytes = DEFAULT_PROP_MAX_BYTES;
+ appsrc->format = DEFAULT_PROP_FORMAT;
+}
+
+static void
+gst_app_src_flush_queued (GstAppSrc * src)
+{
+ GstBuffer *buf;
+
+ while ((buf = g_queue_pop_head (src->queue)))
+ gst_buffer_unref (buf);
}
static void
@@ -217,6 +304,7 @@ gst_app_src_dispose (GObject * obj)
gst_caps_unref (appsrc->caps);
appsrc->caps = NULL;
}
+ gst_app_src_flush_queued (appsrc);
G_OBJECT_CLASS (parent_class)->dispose (obj);
}
@@ -246,11 +334,14 @@ gst_app_src_set_property (GObject * object, guint prop_id,
case PROP_SIZE:
gst_app_src_set_size (appsrc, g_value_get_int64 (value));
break;
- case PROP_SEEKABLE:
- gst_app_src_set_seekable (appsrc, g_value_get_boolean (value));
+ case PROP_STREAM_TYPE:
+ gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
+ break;
+ case PROP_MAX_BYTES:
+ gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
break;
- case PROP_MAX_BUFFERS:
- gst_app_src_set_max_buffers (appsrc, g_value_get_uint (value));
+ case PROP_FORMAT:
+ appsrc->format = g_value_get_enum (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -269,6 +360,7 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
{
GstCaps *caps;
+ /* we're missing a _take_caps() function to transfer ownership */
caps = gst_app_src_get_caps (appsrc);
gst_value_set_caps (value, caps);
if (caps)
@@ -278,11 +370,14 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_SIZE:
g_value_set_int64 (value, gst_app_src_get_size (appsrc));
break;
- case PROP_SEEKABLE:
- g_value_set_boolean (value, gst_app_src_get_seekable (appsrc));
+ case PROP_STREAM_TYPE:
+ g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
break;
- case PROP_MAX_BUFFERS:
- g_value_set_uint (value, gst_app_src_get_max_buffers (appsrc));
+ case PROP_MAX_BYTES:
+ g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
+ break;
+ case PROP_FORMAT:
+ g_value_set_enum (value, appsrc->format);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -291,9 +386,9 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
}
static gboolean
-gst_app_src_unlock (GstBaseSrc * psrc)
+gst_app_src_unlock (GstBaseSrc * bsrc)
{
- GstAppSrc *appsrc = GST_APP_SRC (psrc);
+ GstAppSrc *appsrc = GST_APP_SRC (bsrc);
g_mutex_lock (appsrc->mutex);
GST_DEBUG_OBJECT (appsrc, "unlock start");
@@ -305,9 +400,9 @@ gst_app_src_unlock (GstBaseSrc * psrc)
}
static gboolean
-gst_app_src_unlock_stop (GstBaseSrc * psrc)
+gst_app_src_unlock_stop (GstBaseSrc * bsrc)
{
- GstAppSrc *appsrc = GST_APP_SRC (psrc);
+ GstAppSrc *appsrc = GST_APP_SRC (bsrc);
g_mutex_lock (appsrc->mutex);
GST_DEBUG_OBJECT (appsrc, "unlock stop");
@@ -319,37 +414,105 @@ gst_app_src_unlock_stop (GstBaseSrc * psrc)
}
static gboolean
-gst_app_src_start (GstBaseSrc * psrc)
+gst_app_src_start (GstBaseSrc * bsrc)
{
- GstAppSrc *appsrc = GST_APP_SRC (psrc);
+ GstAppSrc *appsrc = GST_APP_SRC (bsrc);
g_mutex_lock (appsrc->mutex);
GST_DEBUG_OBJECT (appsrc, "starting");
appsrc->started = TRUE;
g_mutex_unlock (appsrc->mutex);
+ gst_base_src_set_format (bsrc, appsrc->format);
+
return TRUE;
}
static gboolean
-gst_app_src_stop (GstBaseSrc * psrc)
+gst_app_src_stop (GstBaseSrc * bsrc)
{
- GstAppSrc *appsrc = GST_APP_SRC (psrc);
+ GstAppSrc *appsrc = GST_APP_SRC (bsrc);
g_mutex_lock (appsrc->mutex);
GST_DEBUG_OBJECT (appsrc, "stopping");
appsrc->is_eos = FALSE;
appsrc->flushing = TRUE;
appsrc->started = FALSE;
+ gst_app_src_flush_queued (appsrc);
g_mutex_unlock (appsrc->mutex);
return TRUE;
}
+static gboolean
+gst_app_src_is_seekable (GstBaseSrc * src)
+{
+ GstAppSrc *appsrc = GST_APP_SRC (src);
+ gboolean res = FALSE;
+
+ switch (appsrc->stream_type) {
+ case GST_APP_STREAM_TYPE_STREAM:
+ break;
+ case GST_APP_STREAM_TYPE_SEEKABLE:
+ case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
+ res = TRUE;
+ break;
+ }
+ return res;
+}
+
+static gboolean
+gst_app_src_check_get_range (GstBaseSrc * src)
+{
+ GstAppSrc *appsrc = GST_APP_SRC (src);
+ gboolean res = FALSE;
+
+ switch (appsrc->stream_type) {
+ case GST_APP_STREAM_TYPE_STREAM:
+ case GST_APP_STREAM_TYPE_SEEKABLE:
+ break;
+ case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
+ res = TRUE;
+ break;
+ }
+ return res;
+}
+
+/* will be called in push mode */
+static gboolean
+gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
+{
+ GstAppSrc *appsrc = GST_APP_SRC (src);
+ gint64 desired_position;
+ gboolean res = FALSE;
+
+ desired_position = segment->last_stop;
+
+ GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
+ desired_position, gst_format_get_name (segment->format));
+
+ /* no need to try to seek in streaming mode */
+ if (appsrc->stream_type == GST_APP_STREAM_TYPE_STREAM)
+ return TRUE;
+
+ g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
+ desired_position, &res);
+
+ if (res) {
+ GST_DEBUG_OBJECT (appsrc, "flushing queue");
+ gst_app_src_flush_queued (appsrc);
+ } else {
+ GST_WARNING_OBJECT (appsrc, "seek failed");
+ }
+
+ return res;
+}
+
static GstFlowReturn
-gst_app_src_create (GstPushSrc * psrc, GstBuffer ** buf)
+gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
+ GstBuffer ** buf)
{
- GstAppSrc *appsrc = GST_APP_SRC (psrc);
+ GstAppSrc *appsrc = GST_APP_SRC (bsrc);
GstFlowReturn ret;
g_mutex_lock (appsrc->mutex);
@@ -360,13 +523,31 @@ gst_app_src_create (GstPushSrc * psrc, GstBuffer ** buf)
/* return data as long as we have some */
if (!g_queue_is_empty (appsrc->queue)) {
+ again:
*buf = g_queue_pop_head (appsrc->queue);
+ appsrc->queued_bytes -= GST_BUFFER_SIZE (*buf);
+
gst_buffer_set_caps (*buf, appsrc->caps);
GST_DEBUG_OBJECT (appsrc, "we have buffer %p", *buf);
ret = GST_FLOW_OK;
break;
+ } else {
+ /* we have no data, we need some. We fire the signal with the size hint. */
+ g_mutex_unlock (appsrc->mutex);
+
+ g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
+ NULL);
+
+ g_mutex_lock (appsrc->mutex);
+ /* we can be flushing now because we released the lock */
+ if (appsrc->flushing)
+ goto flushing;
+
+ /* if we have a buffer now, retry to return it */
+ if (!g_queue_is_empty (appsrc->queue))
+ goto again;
}
/* check EOS */
@@ -395,7 +576,6 @@ eos:
}
}
-
/* external API */
/**
@@ -499,68 +679,70 @@ gst_app_src_get_size (GstAppSrc * appsrc)
}
/**
- * gst_app_src_set_seekable:
+ * gst_app_src_set_stream_type:
* @appsrc: a #GstAppSrc
- * @seekable: the new state
+ * @type: the new state
+ *
+ * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
+ * be connected to.
*
- * Set whether the data is seekable. When this flag is set to %TRUE, the
- * "seek" signal must be connected to.
+ * A stream_type stream
*/
void
-gst_app_src_set_seekable (GstAppSrc * appsrc, gboolean seekable)
+gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
{
g_return_if_fail (appsrc != NULL);
g_return_if_fail (GST_IS_APP_SRC (appsrc));
GST_OBJECT_LOCK (appsrc);
- GST_DEBUG_OBJECT (appsrc, "setting seekable of %d", seekable);
- appsrc->seekable = seekable;
+ GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
+ appsrc->stream_type = type;
GST_OBJECT_UNLOCK (appsrc);
}
/**
- * gst_app_src_get_seekable:
+ * gst_app_src_get_stream_type:
* @appsrc: a #GstAppSrc
*
- * Get whether the stream is seekable. Control the seeking behaviour of the
- * stream with gst_app_src_set_seekable().
+ * Get the stream type. Control the stream type of @appsrc
+ * with gst_app_src_set_stream_type().
*
- * Returns: %TRUE if the stream is seekable.
+ * Returns: the stream type.
*/
-gboolean
-gst_app_src_get_seekable (GstAppSrc * appsrc)
+GstAppStreamType
+gst_app_src_get_stream_type (GstAppSrc * appsrc)
{
- gboolean seekable;
+ gboolean stream_type;
g_return_val_if_fail (appsrc != NULL, FALSE);
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
GST_OBJECT_LOCK (appsrc);
- seekable = appsrc->seekable;
- GST_DEBUG_OBJECT (appsrc, "getting seekable of %d", seekable);
+ stream_type = appsrc->stream_type;
+ GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
GST_OBJECT_UNLOCK (appsrc);
- return seekable;
+ return stream_type;
}
/**
- * gst_app_src_set_max_buffers:
+ * gst_app_src_set_max_bytes:
* @appsrc: a #GstAppSrc
- * @max: the maximum number of buffers to queue
+ * @max: the maximum number of bytes to queue
*
- * Set the maximum amount of buffers that can be queued in @appsrc.
- * After the maximum amount of buffers are queued, @appsrc will emit the
+ * Set the maximum amount of bytes that can be queued in @appsrc.
+ * After the maximum amount of bytes are queued, @appsrc will emit the
* "enough-data" signal.
*/
void
-gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint max)
+gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
{
g_return_if_fail (GST_IS_APP_SRC (appsrc));
g_mutex_lock (appsrc->mutex);
- if (max != appsrc->max_buffers) {
- GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %u", max);
- appsrc->max_buffers = max;
+ if (max != appsrc->max_bytes) {
+ GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %u", max);
+ appsrc->max_bytes = max;
/* signal the change */
g_cond_signal (appsrc->cond);
}
@@ -568,23 +750,23 @@ gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint max)
}
/**
- * gst_app_src_get_max_buffers:
+ * gst_app_src_get_max_bytes:
* @appsrc: a #GstAppSrc
*
- * Get the maximum amount of buffers that can be queued in @appsrc.
+ * Get the maximum amount of bytes that can be queued in @appsrc.
*
- * Returns: The maximum amount of buffers that can be queued.
+ * Returns: The maximum amount of bytes that can be queued.
*/
-guint
-gst_app_src_get_max_buffers (GstAppSrc * appsrc)
+guint64
+gst_app_src_get_max_bytes (GstAppSrc * appsrc)
{
guint result;
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
g_mutex_lock (appsrc->mutex);
- result = appsrc->max_buffers;
- GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %u", result);
+ result = appsrc->max_bytes;
+ GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %u", result);
g_mutex_unlock (appsrc->mutex);
return result;
@@ -592,42 +774,138 @@ gst_app_src_get_max_buffers (GstAppSrc * appsrc)
/**
* gst_app_src_push_buffer:
- * @appsrc:
- * @buffer:
+ * @appsrc: a #GstAppSrc
+ * @buffer: a #GstBuffer to push
*
* Adds a buffer to the queue of buffers that the appsrc element will
* push to its source pad. This function takes ownership of the buffer.
+ *
+ * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
+ * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
+ * #GST_FLOW_UNEXPECTED when EOS occured.
*/
-void
+GstFlowReturn
gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
{
- g_return_if_fail (appsrc);
- g_return_if_fail (GST_IS_APP_SRC (appsrc));
- g_return_if_fail (GST_IS_BUFFER (buffer));
+ g_return_val_if_fail (appsrc, GST_FLOW_ERROR);
+ g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
+ g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
g_mutex_lock (appsrc->mutex);
+ /* can't accept buffers when we are flushing or EOS */
+ if (appsrc->flushing)
+ goto flushing;
+
+ if (appsrc->is_eos)
+ goto eos;
+
GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
g_queue_push_tail (appsrc->queue, buffer);
+
+ appsrc->queued_bytes += GST_BUFFER_SIZE (buffer);
+ if (appsrc->queued_bytes >= appsrc->max_bytes) {
+ GST_DEBUG_OBJECT (appsrc, "queue filled (%u >= %u), signal enough-data",
+ appsrc->queued_bytes, appsrc->max_bytes);
+ g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0, NULL);
+ }
g_cond_signal (appsrc->cond);
g_mutex_unlock (appsrc->mutex);
+
+ return GST_FLOW_OK;
+
+ /* ERRORS */
+flushing:
+ {
+ GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
+ gst_buffer_unref (buffer);
+ return GST_FLOW_WRONG_STATE;
+ }
+eos:
+ {
+ GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
+ gst_buffer_unref (buffer);
+ return GST_FLOW_UNEXPECTED;
+ }
}
/**
* gst_app_src_end_of_stream:
- * @appsrc:
+ * @appsrc: a #GstAppSrc
*
* Indicates to the appsrc element that the last buffer queued in the
* element is the last buffer of the stream.
+ *
+ * Returns: #GST_FLOW_OK when the EOS was successfuly queued.
+ * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
*/
-void
+GstFlowReturn
gst_app_src_end_of_stream (GstAppSrc * appsrc)
{
- g_return_if_fail (appsrc);
- g_return_if_fail (GST_IS_APP_SRC (appsrc));
+ g_return_val_if_fail (appsrc, GST_FLOW_ERROR);
+ g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
g_mutex_lock (appsrc->mutex);
+ /* can't accept buffers when we are flushing. We can accept them when we are
+ * EOS although it will not do anything. */
+ if (appsrc->flushing)
+ goto flushing;
+
GST_DEBUG_OBJECT (appsrc, "sending EOS");
appsrc->is_eos = TRUE;
g_cond_signal (appsrc->cond);
g_mutex_unlock (appsrc->mutex);
+
+ return GST_FLOW_OK;
+
+ /* ERRORS */
+flushing:
+ {
+ GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
+ return GST_FLOW_WRONG_STATE;
+ }
+}
+
+/*** GSTURIHANDLER INTERFACE *************************************************/
+
+static GstURIType
+gst_app_src_uri_get_type (void)
+{
+ return GST_URI_SRC;
+}
+
+static gchar **
+gst_app_src_uri_get_protocols (void)
+{
+ static gchar *protocols[] = { "appsrc", NULL };
+
+ return protocols;
+}
+static const gchar *
+gst_app_src_uri_get_uri (GstURIHandler * handler)
+{
+ return "appsrc";
+}
+
+static gboolean
+gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
+{
+ gchar *protocol;
+ gboolean ret;
+
+ protocol = gst_uri_get_protocol (uri);
+ ret = !strcmp (protocol, "appsrc");
+ g_free (protocol);
+
+ return ret;
+}
+
+static void
+gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
+{
+ GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
+
+ iface->get_type = gst_app_src_uri_get_type;
+ iface->get_protocols = gst_app_src_uri_get_protocols;
+ iface->get_uri = gst_app_src_uri_get_uri;
+ iface->set_uri = gst_app_src_uri_set_uri;
}
diff --git a/gst-libs/gst/app/gstappsrc.h b/gst-libs/gst/app/gstappsrc.h
index 4ca5cbc9..1ce73b4c 100644
--- a/gst-libs/gst/app/gstappsrc.h
+++ b/gst-libs/gst/app/gstappsrc.h
@@ -39,9 +39,27 @@ G_BEGIN_DECLS
typedef struct _GstAppSrc GstAppSrc;
typedef struct _GstAppSrcClass GstAppSrcClass;
+/**
+ * GstAppStreamType:
+ * @GST_APP_STREAM_TYPE_STREAM: No seeking is supported in the stream, such as a
+ * live stream.
+ * @GST_APP_STREAM_TYPE_SEEKABLE: The stream is seekable but seeking might not
+ * be very fast, such as data from a webserver.
+ * @GST_APP_STREAM_TYPE_RANDOM_ACCESS: The stream is seekable and seeking is fast,
+ * such as in a local file.
+ *
+ * The stream type.
+ */
+typedef enum
+{
+ GST_APP_STREAM_TYPE_STREAM,
+ GST_APP_STREAM_TYPE_SEEKABLE,
+ GST_APP_STREAM_TYPE_RANDOM_ACCESS
+} GstAppStreamType;
+
struct _GstAppSrc
{
- GstPushSrc pushsrc;
+ GstBaseSrc basesrc;
/*< private >*/
GCond *cond;
@@ -50,46 +68,49 @@ struct _GstAppSrc
GstCaps *caps;
gint64 size;
- gboolean seekable;
- guint max_buffers;
+ GstAppStreamType stream_type;
+ guint64 max_bytes;
+ GstFormat format;
gboolean flushing;
gboolean started;
gboolean is_eos;
+ guint64 queued_bytes;
+ GstAppStreamType current_type;
};
struct _GstAppSrcClass
{
- GstPushSrcClass pushsrc_class;
+ GstBaseSrcClass basesrc_class;
/* signals */
- void (*need_data) (GstAppSrc *src);
- void (*enough_data) (GstAppSrc *src);
- gboolean (*seek_data) (GstAppSrc *src, guint64 offset);
+ void (*need_data) (GstAppSrc *src, guint length);
+ void (*enough_data) (GstAppSrc *src);
+ gboolean (*seek_data) (GstAppSrc *src, guint64 offset);
/* actions */
- void (*push_buffer) (GstAppSrc *src, GstBuffer *buffer);
- void (*end_of_stream) (GstAppSrc *src);
+ GstFlowReturn (*push_buffer) (GstAppSrc *src, GstBuffer *buffer);
+ GstFlowReturn (*end_of_stream) (GstAppSrc *src);
};
GType gst_app_src_get_type(void);
GST_DEBUG_CATEGORY_EXTERN (app_src_debug);
-void gst_app_src_set_caps (GstAppSrc *appsrc, const GstCaps *caps);
-GstCaps* gst_app_src_get_caps (GstAppSrc *appsrc);
+void gst_app_src_set_caps (GstAppSrc *appsrc, const GstCaps *caps);
+GstCaps* gst_app_src_get_caps (GstAppSrc *appsrc);
-void gst_app_src_set_size (GstAppSrc *appsrc, gint64 size);
-gint64 gst_app_src_get_size (GstAppSrc *appsrc);
+void gst_app_src_set_size (GstAppSrc *appsrc, gint64 size);
+gint64 gst_app_src_get_size (GstAppSrc *appsrc);
-void gst_app_src_set_seekable (GstAppSrc *appsrc, gboolean seekable);
-gboolean gst_app_src_get_seekable (GstAppSrc *appsrc);
+void gst_app_src_set_stream_type (GstAppSrc *appsrc, GstAppStreamType type);
+GstAppStreamType gst_app_src_get_stream_type (GstAppSrc *appsrc);
-void gst_app_src_set_max_buffers (GstAppSrc *appsrc, guint max);
-guint gst_app_src_get_max_buffers (GstAppSrc *appsrc);
+void gst_app_src_set_max_bytes (GstAppSrc *appsrc, guint64 max);
+guint64 gst_app_src_get_max_bytes (GstAppSrc *appsrc);
-void gst_app_src_push_buffer (GstAppSrc *appsrc, GstBuffer *buffer);
-void gst_app_src_end_of_stream (GstAppSrc *appsrc);
+GstFlowReturn gst_app_src_push_buffer (GstAppSrc *appsrc, GstBuffer *buffer);
+GstFlowReturn gst_app_src_end_of_stream (GstAppSrc *appsrc);
G_END_DECLS