diff options
-rw-r--r-- | ChangeLog | 38 | ||||
m--------- | common | 0 | ||||
-rw-r--r-- | examples/app/.gitignore | 1 | ||||
-rw-r--r-- | examples/app/Makefile.am | 5 | ||||
-rw-r--r-- | examples/app/appsrc-stream.c | 248 | ||||
-rw-r--r-- | examples/app/appsrc_ex.c | 4 | ||||
-rw-r--r-- | gst-libs/gst/app/gstapp-marshal.list | 5 | ||||
-rw-r--r-- | gst-libs/gst/app/gstappsrc.c | 468 | ||||
-rw-r--r-- | gst-libs/gst/app/gstappsrc.h | 59 |
9 files changed, 711 insertions, 117 deletions
@@ -1,3 +1,41 @@ +2008-06-05 Wim Taymans <wim.taymans@collabora.co.uk> + + * examples/app/.cvsignore: + * examples/app/Makefile.am: + * examples/app/appsrc-stream.c: (read_data), (start_feed), + (stop_feed), (found_source), (bus_message), (main): + Added an example on how to use appsrc in playbin in streaming mode from + an mmapped file. + + * examples/app/appsrc_ex.c: (main): + Set pipeline to NULL to free queued buffers. + + * gst-libs/gst/app/gstapp-marshal.list: + * gst-libs/gst/app/gstappsrc.c: (stream_type_get_type), (_do_init), + (gst_app_src_class_init), (gst_app_src_init), + (gst_app_src_flush_queued), (gst_app_src_dispose), + (gst_app_src_set_property), (gst_app_src_get_property), + (gst_app_src_unlock), (gst_app_src_unlock_stop), + (gst_app_src_start), (gst_app_src_stop), (gst_app_src_is_seekable), + (gst_app_src_check_get_range), (gst_app_src_do_seek), + (gst_app_src_create), (gst_app_src_set_stream_type), + (gst_app_src_get_stream_type), (gst_app_src_set_max_bytes), + (gst_app_src_get_max_bytes), (gst_app_src_push_buffer), + (gst_app_src_end_of_stream), (gst_app_src_uri_get_type), + (gst_app_src_uri_get_protocols), (gst_app_src_uri_get_uri), + (gst_app_src_uri_set_uri), (gst_app_src_uri_handler_init): + * gst-libs/gst/app/gstappsrc.h: + Measure max queue size in bytes instead. + Add support for 3 modes of operation, streaming, seekable and + random-access, making basesrc handle the scheduling modes for each. + Add appsrc:// uri handler so that automatic plugging can be done from + playbin2 or uridecodebin, for example. + Added support for custom segment formats. + Add support for push and pull based operations from the application. + Expand the methods so that errors can be detected. + Flush the queued buffers on seeks and when shutting down. + Add signals to inform the app that a seek must happen. + 2008-06-05 Sebastian Dröge <slomo@circular-chaos.org> * gst/interleave/deinterleave.c: (gst_deinterleave_add_new_pads), diff --git a/common b/common -Subproject 68fb019d4044b9878aef4ca223fc13c19ffc7d0 +Subproject 46ec7dfc1c09ff550ed6b7a4e0d3f2b2ac7d3ee diff --git a/examples/app/.gitignore b/examples/app/.gitignore index f76db7ef..27d8cc2a 100644 --- a/examples/app/.gitignore +++ b/examples/app/.gitignore @@ -1 +1,2 @@ appsrc_ex +appsrc-stream diff --git a/examples/app/Makefile.am b/examples/app/Makefile.am index 0b950515..db1393dd 100644 --- a/examples/app/Makefile.am +++ b/examples/app/Makefile.am @@ -1,5 +1,5 @@ -noinst_PROGRAMS = appsrc_ex +noinst_PROGRAMS = appsrc_ex appsrc-stream appsrc_ex_SOURCES = appsrc_ex.c appsrc_ex_CFLAGS = $(GST_CFLAGS) $(GCONF_CFLAGS) @@ -7,3 +7,6 @@ appsrc_ex_LDFLAGS = \ $(GST_LIBS) \ $(top_builddir)/gst-libs/gst/app/libgstapp-@GST_MAJORMINOR@.la +appsrc_stream_SOURCES = appsrc-stream.c +appsrc_stream_CFLAGS = $(GST_CFLAGS) $(GCONF_CFLAGS) +appsrc_stream_LDFLAGS = $(GST_LIBS) diff --git a/examples/app/appsrc-stream.c b/examples/app/appsrc-stream.c new file mode 100644 index 00000000..781adc31 --- /dev/null +++ b/examples/app/appsrc-stream.c @@ -0,0 +1,248 @@ +/* GStreamer + * + * appsrc-stream.c: example for using appsrc in streaming mode. + * + * Copyright (C) 2008 Wim Taymans <wim.taymans@gmail.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gst/gst.h> + +#include <stdio.h> +#include <string.h> +#include <stdlib.h> + +GST_DEBUG_CATEGORY (appsrc_playbin_debug); +#define GST_CAT_DEFAULT appsrc_playbin_debug + +/** + * an example application of using appsrc in streaming push mode. We simply push + * buffers into appsrc. The size of the buffers we push can be any size we + * choose. + * + * This example is very close to how one would deal with a streaming webserver + * that does not support range requests or does not report the total file size. + * + * Some optimisations are done so that we don't push too much data. We connect + * to the need-data and enough-data signals to start/stop sending buffers. + * + * Appsrc in streaming mode (the default) does not support seeking so we don't + * have to handle any seek callbacks. + * + * Some formats are able to estimate the duration of the media file based on the + * file length (mp3, mpeg,..), others report an unknown length (ogg,..). + */ +typedef struct _App App; + +struct _App +{ + GstElement *playbin; + GstElement *appsrc; + + GMainLoop *loop; + guint sourceid; + + GMappedFile *file; + guint8 *data; + gsize length; + guint64 offset; +}; + +App s_app; + +#define CHUNK_SIZE 4096 + +/* This method is called by the idle GSource in the mainloop. We feed CHUNK_SIZE + * bytes into appsrc. + * The ide handler is added to the mainloop when appsrc requests us to start + * sending data (need-data signal) and is removed when appsrc has enough data + * (enough-data signal). + */ +static gboolean +read_data (App * app) +{ + GstBuffer *buffer; + guint len; + GstFlowReturn ret; + + buffer = gst_buffer_new (); + + if (app->offset >= app->length) { + /* we are EOS, send end-of-stream and remove the source */ + g_signal_emit_by_name (app->appsrc, "end-of-stream", &ret); + return FALSE; + } + + /* read the next chunk */ + len = CHUNK_SIZE; + if (app->offset + len > app->length) + len = app->length - app->offset; + + GST_BUFFER_DATA (buffer) = app->data + app->offset; + GST_BUFFER_SIZE (buffer) = len; + + GST_DEBUG ("feed buffer %p, offset %" G_GUINT64_FORMAT "-%u", buffer, + app->offset, len); + g_signal_emit_by_name (app->appsrc, "push-buffer", buffer, &ret); + if (ret != GST_FLOW_OK) { + /* some error, stop sending data */ + return FALSE; + } + + app->offset += len; + + return TRUE; +} + +/* This signal callback is called when appsrc needs data, we add an idle handler + * to the mainloop to start pushing data into the appsrc */ +static void +start_feed (GstElement * playbin, guint size, App * app) +{ + if (app->sourceid == 0) { + GST_DEBUG ("start feeding"); + app->sourceid = g_idle_add ((GSourceFunc) read_data, app); + } +} + +/* This callback is called when appsrc has enough data and we can stop sending. + * We remove the idle handler from the mainloop */ +static void +stop_feed (GstElement * playbin, App * app) +{ + if (app->sourceid != 0) { + GST_DEBUG ("stop feeding"); + g_source_remove (app->sourceid); + app->sourceid = 0; + } +} + +/* this callback is called when playbin2 has constructed a source object to read + * from. Since we provided the appsrc:// uri to playbin2, this will be the + * appsrc that we must handle. We set up some signals to start and stop pushing + * data into appsrc */ +static void +found_source (GObject * playbin, GParamSpec * pspec, App * app) +{ + /* get a handle to the appsrc */ + g_object_get (playbin, pspec->name, &app->appsrc, NULL); + + GST_DEBUG ("got appsrc %p", app->appsrc); + + /* we can set the length in appsrc. This allows some elements to estimate the + * total duration of the stream. It's a good idea to set the property when you + * can but it's not required. */ + g_object_set (app->appsrc, "size", app->length, NULL); + + /* configure the appsrc, we will push data into the appsrc from the + * mainloop. */ + g_signal_connect (app->appsrc, "need-data", G_CALLBACK (start_feed), app); + g_signal_connect (app->appsrc, "enough-data", G_CALLBACK (stop_feed), app); +} + +static gboolean +bus_message (GstBus * bus, GstMessage * message, App * app) +{ + GST_DEBUG ("got message %s", + gst_message_type_get_name (GST_MESSAGE_TYPE (message))); + + switch (GST_MESSAGE_TYPE (message)) { + case GST_MESSAGE_ERROR: + g_error ("received error"); + g_main_loop_quit (app->loop); + break; + case GST_MESSAGE_EOS: + g_main_loop_quit (app->loop); + break; + default: + break; + } + return TRUE; +} + +int +main (int argc, char *argv[]) +{ + App *app = &s_app; + GError *error = NULL; + GstBus *bus; + + gst_init (&argc, &argv); + + GST_DEBUG_CATEGORY_INIT (appsrc_playbin_debug, "appsrc-playbin", 0, + "appsrc playbin example"); + + if (argc < 2) { + g_print ("usage: %s <filename>\n", argv[0]); + return -1; + } + + /* try to open the file as an mmapped file */ + app->file = g_mapped_file_new (argv[1], FALSE, &error); + if (error) { + g_print ("failed to open file: %s\n", error->message); + g_error_free (error); + return -2; + } + /* get some vitals, this will be used to read data from the mmapped file and + * feed it to appsrc. */ + app->length = g_mapped_file_get_length (app->file); + app->data = (guint8 *) g_mapped_file_get_contents (app->file); + app->offset = 0; + + /* create a mainloop to get messages and to handle the idle handler that will + * feed data to appsrc. */ + app->loop = g_main_loop_new (NULL, TRUE); + + app->playbin = gst_element_factory_make ("playbin2", NULL); + g_assert (app->playbin); + + bus = gst_pipeline_get_bus (GST_PIPELINE (app->playbin)); + + /* add watch for messages */ + gst_bus_add_watch (bus, (GstBusFunc) bus_message, app); + + /* set to read from appsrc */ + g_object_set (app->playbin, "uri", "appsrc://", NULL); + + /* get notification when the source is created so that we get a handle to it + * and can configure it */ + g_signal_connect (app->playbin, "notify::source", (GCallback) found_source, + app); + + /* go to playing and wait in a mainloop. */ + gst_element_set_state (app->playbin, GST_STATE_PLAYING); + + /* this mainloop is stopped when we receive an error or EOS */ + g_main_loop_run (app->loop); + + GST_DEBUG ("stopping"); + + gst_element_set_state (app->playbin, GST_STATE_NULL); + + /* free the file */ + g_mapped_file_free (app->file); + + gst_object_unref (bus); + g_main_loop_unref (app->loop); + + return 0; +} diff --git a/examples/app/appsrc_ex.c b/examples/app/appsrc_ex.c index c6f8b29e..bc629fb0 100644 --- a/examples/app/appsrc_ex.c +++ b/examples/app/appsrc_ex.c @@ -62,8 +62,8 @@ main (int argc, char *argv[]) data = malloc (100); memset (data, i, 100); - printf ("%d: creating buffer for pointer %p\n", i, data); buf = gst_app_buffer_new (data, 100, dont_eat_my_chicken_wings, data); + printf ("%d: creating buffer for pointer %p, %p\n", i, data, buf); gst_app_src_push_buffer (GST_APP_SRC (app->src), buf); } @@ -73,8 +73,10 @@ main (int argc, char *argv[]) GstBuffer *buf; buf = gst_app_sink_pull_buffer (GST_APP_SINK (app->sink)); + printf ("retrieved buffer %p\n", buf); gst_buffer_unref (buf); } + gst_element_set_state (app->pipe, GST_STATE_NULL); return 0; } diff --git a/gst-libs/gst/app/gstapp-marshal.list b/gst-libs/gst/app/gstapp-marshal.list index 648f363e..a5d8bdc8 100644 --- a/gst-libs/gst/app/gstapp-marshal.list +++ b/gst-libs/gst/app/gstapp-marshal.list @@ -1 +1,4 @@ -VOID:UINT64 +VOID:UINT +BOOLEAN:UINT64 +ENUM:OBJECT +ENUM:VOID diff --git a/gst-libs/gst/app/gstappsrc.c b/gst-libs/gst/app/gstappsrc.c index c1d01db7..1c6a7fad 100644 --- a/gst-libs/gst/app/gstappsrc.c +++ b/gst-libs/gst/app/gstappsrc.c @@ -23,7 +23,7 @@ #endif #include <gst/gst.h> -#include <gst/base/gstpushsrc.h> +#include <gst/base/gstbasesrc.h> #include <string.h> @@ -53,17 +53,19 @@ enum LAST_SIGNAL }; -#define DEFAULT_PROP_MAX_BUFFERS 0 #define DEFAULT_PROP_SIZE -1 -#define DEFAULT_PROP_SEEKABLE FALSE +#define DEFAULT_PROP_STREAM_TYPE GST_APP_STREAM_TYPE_STREAM +#define DEFAULT_PROP_MAX_BYTES 200000 +#define DEFAULT_PROP_FORMAT GST_FORMAT_BYTES enum { PROP_0, PROP_CAPS, PROP_SIZE, - PROP_SEEKABLE, - PROP_MAX_BUFFERS, + PROP_STREAM_TYPE, + PROP_MAX_BYTES, + PROP_FORMAT, PROP_LAST }; @@ -74,6 +76,28 @@ GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); + +#define GST_TYPE_APP_STREAM_TYPE (stream_type_get_type ()) +static GType +stream_type_get_type (void) +{ + static GType stream_type_type = 0; + static const GEnumValue stream_type[] = { + {GST_APP_STREAM_TYPE_STREAM, "Stream", "stream"}, + {GST_APP_STREAM_TYPE_SEEKABLE, "Seekable", "seekable"}, + {GST_APP_STREAM_TYPE_RANDOM_ACCESS, "Random Access", "random-access"}, + {0, NULL, NULL}, + }; + + if (!stream_type_type) { + stream_type_type = g_enum_register_static ("GstAppStreamType", stream_type); + } + return stream_type_type; +} + +static void gst_app_src_uri_handler_init (gpointer g_iface, + gpointer iface_data); + static void gst_app_src_dispose (GObject * object); static void gst_app_src_finalize (GObject * object); @@ -82,15 +106,32 @@ static void gst_app_src_set_property (GObject * object, guint prop_id, static void gst_app_src_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static GstFlowReturn gst_app_src_create (GstPushSrc * psrc, GstBuffer ** buf); -static gboolean gst_app_src_start (GstBaseSrc * psrc); -static gboolean gst_app_src_stop (GstBaseSrc * psrc); -static gboolean gst_app_src_unlock (GstBaseSrc * psrc); -static gboolean gst_app_src_unlock_stop (GstBaseSrc * psrc); +static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc, + guint64 offset, guint size, GstBuffer ** buf); +static gboolean gst_app_src_start (GstBaseSrc * bsrc); +static gboolean gst_app_src_stop (GstBaseSrc * bsrc); +static gboolean gst_app_src_unlock (GstBaseSrc * bsrc); +static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc); +static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment); +static gboolean gst_app_src_is_seekable (GstBaseSrc * src); +static gboolean gst_app_src_check_get_range (GstBaseSrc * src); static guint gst_app_src_signals[LAST_SIGNAL] = { 0 }; -GST_BOILERPLATE (GstAppSrc, gst_app_src, GstPushSrc, GST_TYPE_PUSH_SRC); +static void +_do_init (GType filesrc_type) +{ + static const GInterfaceInfo urihandler_info = { + gst_app_src_uri_handler_init, + NULL, + NULL + }; + g_type_add_interface_static (filesrc_type, GST_TYPE_URI_HANDLER, + &urihandler_info); +} + +GST_BOILERPLATE_FULL (GstAppSrc, gst_app_src, GstBaseSrc, GST_TYPE_BASE_SRC, + _do_init); static void gst_app_src_base_init (gpointer g_class) @@ -109,7 +150,6 @@ static void gst_app_src_class_init (GstAppSrcClass * klass) { GObjectClass *gobject_class = (GObjectClass *) klass; - GstPushSrcClass *pushsrc_class = (GstPushSrcClass *) klass; GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass; gobject_class->dispose = gst_app_src_dispose; @@ -123,34 +163,46 @@ gst_app_src_class_init (GstAppSrcClass * klass) "The allowed caps for the src pad", GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_FORMAT, + g_param_spec_enum ("format", "Format", + "The format of the segment events and seek", GST_TYPE_FORMAT, + DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_SIZE, g_param_spec_int64 ("size", "Size", "The size of the data stream (-1 if unknown)", -1, G_MAXINT64, DEFAULT_PROP_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_SEEKABLE, - g_param_spec_boolean ("seekable", "Seekable", - "If the source is seekable", DEFAULT_PROP_SEEKABLE, - G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_STREAM_TYPE, + g_param_spec_enum ("stream-type", "Stream Type", + "the type of the stream", GST_TYPE_APP_STREAM_TYPE, + DEFAULT_PROP_STREAM_TYPE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS, - g_param_spec_uint ("max-buffers", "Max Buffers", - "The maximum number of buffers to queue internally (0 = unlimited)", - 0, G_MAXUINT, DEFAULT_PROP_MAX_BUFFERS, + g_object_class_install_property (gobject_class, PROP_MAX_BYTES, + g_param_spec_uint64 ("max-bytes", "Max bytes", + "The maximum number of bytes to queue internally (0 = unlimited)", + 0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GstAppSrc::need-data: * @appsrc: the appsrc element that emited the signal + * @length: the amount of bytes needed. + * + * Signal that the source needs more data. In the callback or from another + * thread you should call push-buffer or end-of-stream. * - * Signal that the source needs more data. In the callback you should call - * push-buffer or end-of-stream. + * @length is just a hint and when it is set to -1, any number of bytes can be + * pushed into @appsrc. + * + * You can call push-buffer multiple times until the enough-data signal is + * fired. */ gst_app_src_signals[SIGNAL_NEED_DATA] = g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstAppSrcClass, need_data), - NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE); + NULL, NULL, gst_app_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT); /** * GstAppSrc::enough-data: @@ -160,10 +212,11 @@ gst_app_src_class_init (GstAppSrcClass * klass) * application stops calling push-buffer until the need-data signal is * emited again to avoid excessive buffer queueing. */ - gst_app_src_signals[SIGNAL_NEED_DATA] = + gst_app_src_signals[SIGNAL_ENOUGH_DATA] = g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstAppSrcClass, enough_data), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE); + /** * GstAppSrc::seek-data: * @appsrc: the appsrc element that emited the signal @@ -171,29 +224,53 @@ gst_app_src_class_init (GstAppSrcClass * klass) * * Seek to the given offset. The next push-buffer should produce buffers from * the new @offset. + * This callback is only called for seekable stream types. + * + * Returns: %TRUE if the seek succeeded. */ gst_app_src_signals[SIGNAL_SEEK_DATA] = g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstAppSrcClass, seek_data), - NULL, NULL, gst_app_marshal_VOID__UINT64, G_TYPE_NONE, 1, G_TYPE_UINT64); - + NULL, NULL, gst_app_marshal_BOOLEAN__UINT64, G_TYPE_BOOLEAN, 1, + G_TYPE_UINT64); + + /** + * GstAppSrc::push-buffer: + * @appsrc: the appsrc + * @buffer: a buffer to push + * + * Adds a buffer to the queue of buffers that the appsrc element will + * push to its source pad. This function will take ownership of @buffer. + */ gst_app_src_signals[SIGNAL_PUSH_BUFFER] = g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass, - push_buffer), NULL, NULL, g_cclosure_marshal_VOID__OBJECT, - G_TYPE_NONE, 1, GST_TYPE_BUFFER); - + push_buffer), NULL, NULL, gst_app_marshal_ENUM__OBJECT, + GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER); + + /** + * GstAppSrc::end-of-stream: + * @appsrc: the appsrc + * + * Notify @appsrc that no more buffer are available. + */ gst_app_src_signals[SIGNAL_END_OF_STREAM] = g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass, - end_of_stream), NULL, NULL, g_cclosure_marshal_VOID__VOID, - G_TYPE_NONE, 0, G_TYPE_NONE); + end_of_stream), NULL, NULL, gst_app_marshal_ENUM__VOID, + GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE); - pushsrc_class->create = gst_app_src_create; + basesrc_class->create = gst_app_src_create; basesrc_class->start = gst_app_src_start; basesrc_class->stop = gst_app_src_stop; basesrc_class->unlock = gst_app_src_unlock; basesrc_class->unlock_stop = gst_app_src_unlock_stop; + basesrc_class->do_seek = gst_app_src_do_seek; + basesrc_class->is_seekable = gst_app_src_is_seekable; + basesrc_class->check_get_range = gst_app_src_check_get_range; + + klass->push_buffer = gst_app_src_push_buffer; + klass->end_of_stream = gst_app_src_end_of_stream; } static void @@ -204,8 +281,18 @@ gst_app_src_init (GstAppSrc * appsrc, GstAppSrcClass * klass) appsrc->queue = g_queue_new (); appsrc->size = DEFAULT_PROP_SIZE; - appsrc->seekable = DEFAULT_PROP_SEEKABLE; - appsrc->max_buffers = DEFAULT_PROP_MAX_BUFFERS; + appsrc->stream_type = DEFAULT_PROP_STREAM_TYPE; + appsrc->max_bytes = DEFAULT_PROP_MAX_BYTES; + appsrc->format = DEFAULT_PROP_FORMAT; +} + +static void +gst_app_src_flush_queued (GstAppSrc * src) +{ + GstBuffer *buf; + + while ((buf = g_queue_pop_head (src->queue))) + gst_buffer_unref (buf); } static void @@ -217,6 +304,7 @@ gst_app_src_dispose (GObject * obj) gst_caps_unref (appsrc->caps); appsrc->caps = NULL; } + gst_app_src_flush_queued (appsrc); G_OBJECT_CLASS (parent_class)->dispose (obj); } @@ -246,11 +334,14 @@ gst_app_src_set_property (GObject * object, guint prop_id, case PROP_SIZE: gst_app_src_set_size (appsrc, g_value_get_int64 (value)); break; - case PROP_SEEKABLE: - gst_app_src_set_seekable (appsrc, g_value_get_boolean (value)); + case PROP_STREAM_TYPE: + gst_app_src_set_stream_type (appsrc, g_value_get_enum (value)); + break; + case PROP_MAX_BYTES: + gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value)); break; - case PROP_MAX_BUFFERS: - gst_app_src_set_max_buffers (appsrc, g_value_get_uint (value)); + case PROP_FORMAT: + appsrc->format = g_value_get_enum (value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -269,6 +360,7 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value, { GstCaps *caps; + /* we're missing a _take_caps() function to transfer ownership */ caps = gst_app_src_get_caps (appsrc); gst_value_set_caps (value, caps); if (caps) @@ -278,11 +370,14 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value, case PROP_SIZE: g_value_set_int64 (value, gst_app_src_get_size (appsrc)); break; - case PROP_SEEKABLE: - g_value_set_boolean (value, gst_app_src_get_seekable (appsrc)); + case PROP_STREAM_TYPE: + g_value_set_enum (value, gst_app_src_get_stream_type (appsrc)); break; - case PROP_MAX_BUFFERS: - g_value_set_uint (value, gst_app_src_get_max_buffers (appsrc)); + case PROP_MAX_BYTES: + g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc)); + break; + case PROP_FORMAT: + g_value_set_enum (value, appsrc->format); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -291,9 +386,9 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value, } static gboolean -gst_app_src_unlock (GstBaseSrc * psrc) +gst_app_src_unlock (GstBaseSrc * bsrc) { - GstAppSrc *appsrc = GST_APP_SRC (psrc); + GstAppSrc *appsrc = GST_APP_SRC (bsrc); g_mutex_lock (appsrc->mutex); GST_DEBUG_OBJECT (appsrc, "unlock start"); @@ -305,9 +400,9 @@ gst_app_src_unlock (GstBaseSrc * psrc) } static gboolean -gst_app_src_unlock_stop (GstBaseSrc * psrc) +gst_app_src_unlock_stop (GstBaseSrc * bsrc) { - GstAppSrc *appsrc = GST_APP_SRC (psrc); + GstAppSrc *appsrc = GST_APP_SRC (bsrc); g_mutex_lock (appsrc->mutex); GST_DEBUG_OBJECT (appsrc, "unlock stop"); @@ -319,37 +414,105 @@ gst_app_src_unlock_stop (GstBaseSrc * psrc) } static gboolean -gst_app_src_start (GstBaseSrc * psrc) +gst_app_src_start (GstBaseSrc * bsrc) { - GstAppSrc *appsrc = GST_APP_SRC (psrc); + GstAppSrc *appsrc = GST_APP_SRC (bsrc); g_mutex_lock (appsrc->mutex); GST_DEBUG_OBJECT (appsrc, "starting"); appsrc->started = TRUE; g_mutex_unlock (appsrc->mutex); + gst_base_src_set_format (bsrc, appsrc->format); + return TRUE; } static gboolean -gst_app_src_stop (GstBaseSrc * psrc) +gst_app_src_stop (GstBaseSrc * bsrc) { - GstAppSrc *appsrc = GST_APP_SRC (psrc); + GstAppSrc *appsrc = GST_APP_SRC (bsrc); g_mutex_lock (appsrc->mutex); GST_DEBUG_OBJECT (appsrc, "stopping"); appsrc->is_eos = FALSE; appsrc->flushing = TRUE; appsrc->started = FALSE; + gst_app_src_flush_queued (appsrc); g_mutex_unlock (appsrc->mutex); return TRUE; } +static gboolean +gst_app_src_is_seekable (GstBaseSrc * src) +{ + GstAppSrc *appsrc = GST_APP_SRC (src); + gboolean res = FALSE; + + switch (appsrc->stream_type) { + case GST_APP_STREAM_TYPE_STREAM: + break; + case GST_APP_STREAM_TYPE_SEEKABLE: + case GST_APP_STREAM_TYPE_RANDOM_ACCESS: + res = TRUE; + break; + } + return res; +} + +static gboolean +gst_app_src_check_get_range (GstBaseSrc * src) +{ + GstAppSrc *appsrc = GST_APP_SRC (src); + gboolean res = FALSE; + + switch (appsrc->stream_type) { + case GST_APP_STREAM_TYPE_STREAM: + case GST_APP_STREAM_TYPE_SEEKABLE: + break; + case GST_APP_STREAM_TYPE_RANDOM_ACCESS: + res = TRUE; + break; + } + return res; +} + +/* will be called in push mode */ +static gboolean +gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment) +{ + GstAppSrc *appsrc = GST_APP_SRC (src); + gint64 desired_position; + gboolean res = FALSE; + + desired_position = segment->last_stop; + + GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s", + desired_position, gst_format_get_name (segment->format)); + + /* no need to try to seek in streaming mode */ + if (appsrc->stream_type == GST_APP_STREAM_TYPE_STREAM) + return TRUE; + + g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0, + desired_position, &res); + + if (res) { + GST_DEBUG_OBJECT (appsrc, "flushing queue"); + gst_app_src_flush_queued (appsrc); + } else { + GST_WARNING_OBJECT (appsrc, "seek failed"); + } + + return res; +} + static GstFlowReturn -gst_app_src_create (GstPushSrc * psrc, GstBuffer ** buf) +gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, + GstBuffer ** buf) { - GstAppSrc *appsrc = GST_APP_SRC (psrc); + GstAppSrc *appsrc = GST_APP_SRC (bsrc); GstFlowReturn ret; g_mutex_lock (appsrc->mutex); @@ -360,13 +523,31 @@ gst_app_src_create (GstPushSrc * psrc, GstBuffer ** buf) /* return data as long as we have some */ if (!g_queue_is_empty (appsrc->queue)) { + again: *buf = g_queue_pop_head (appsrc->queue); + appsrc->queued_bytes -= GST_BUFFER_SIZE (*buf); + gst_buffer_set_caps (*buf, appsrc->caps); GST_DEBUG_OBJECT (appsrc, "we have buffer %p", *buf); ret = GST_FLOW_OK; break; + } else { + /* we have no data, we need some. We fire the signal with the size hint. */ + g_mutex_unlock (appsrc->mutex); + + g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size, + NULL); + + g_mutex_lock (appsrc->mutex); + /* we can be flushing now because we released the lock */ + if (appsrc->flushing) + goto flushing; + + /* if we have a buffer now, retry to return it */ + if (!g_queue_is_empty (appsrc->queue)) + goto again; } /* check EOS */ @@ -395,7 +576,6 @@ eos: } } - /* external API */ /** @@ -499,68 +679,70 @@ gst_app_src_get_size (GstAppSrc * appsrc) } /** - * gst_app_src_set_seekable: + * gst_app_src_set_stream_type: * @appsrc: a #GstAppSrc - * @seekable: the new state + * @type: the new state + * + * Set the stream type on @appsrc. For seekable streams, the "seek" signal must + * be connected to. * - * Set whether the data is seekable. When this flag is set to %TRUE, the - * "seek" signal must be connected to. + * A stream_type stream */ void -gst_app_src_set_seekable (GstAppSrc * appsrc, gboolean seekable) +gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type) { g_return_if_fail (appsrc != NULL); g_return_if_fail (GST_IS_APP_SRC (appsrc)); GST_OBJECT_LOCK (appsrc); - GST_DEBUG_OBJECT (appsrc, "setting seekable of %d", seekable); - appsrc->seekable = seekable; + GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type); + appsrc->stream_type = type; GST_OBJECT_UNLOCK (appsrc); } /** - * gst_app_src_get_seekable: + * gst_app_src_get_stream_type: * @appsrc: a #GstAppSrc * - * Get whether the stream is seekable. Control the seeking behaviour of the - * stream with gst_app_src_set_seekable(). + * Get the stream type. Control the stream type of @appsrc + * with gst_app_src_set_stream_type(). * - * Returns: %TRUE if the stream is seekable. + * Returns: the stream type. */ -gboolean -gst_app_src_get_seekable (GstAppSrc * appsrc) +GstAppStreamType +gst_app_src_get_stream_type (GstAppSrc * appsrc) { - gboolean seekable; + gboolean stream_type; g_return_val_if_fail (appsrc != NULL, FALSE); g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE); GST_OBJECT_LOCK (appsrc); - seekable = appsrc->seekable; - GST_DEBUG_OBJECT (appsrc, "getting seekable of %d", seekable); + stream_type = appsrc->stream_type; + GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type); GST_OBJECT_UNLOCK (appsrc); - return seekable; + return stream_type; } /** - * gst_app_src_set_max_buffers: + * gst_app_src_set_max_bytes: * @appsrc: a #GstAppSrc - * @max: the maximum number of buffers to queue + * @max: the maximum number of bytes to queue * - * Set the maximum amount of buffers that can be queued in @appsrc. - * After the maximum amount of buffers are queued, @appsrc will emit the + * Set the maximum amount of bytes that can be queued in @appsrc. + * After the maximum amount of bytes are queued, @appsrc will emit the * "enough-data" signal. */ void -gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint max) +gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max) { g_return_if_fail (GST_IS_APP_SRC (appsrc)); g_mutex_lock (appsrc->mutex); - if (max != appsrc->max_buffers) { - GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %u", max); - appsrc->max_buffers = max; + if (max != appsrc->max_bytes) { + GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %u", max); + appsrc->max_bytes = max; /* signal the change */ g_cond_signal (appsrc->cond); } @@ -568,23 +750,23 @@ gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint max) } /** - * gst_app_src_get_max_buffers: + * gst_app_src_get_max_bytes: * @appsrc: a #GstAppSrc * - * Get the maximum amount of buffers that can be queued in @appsrc. + * Get the maximum amount of bytes that can be queued in @appsrc. * - * Returns: The maximum amount of buffers that can be queued. + * Returns: The maximum amount of bytes that can be queued. */ -guint -gst_app_src_get_max_buffers (GstAppSrc * appsrc) +guint64 +gst_app_src_get_max_bytes (GstAppSrc * appsrc) { guint result; g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0); g_mutex_lock (appsrc->mutex); - result = appsrc->max_buffers; - GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %u", result); + result = appsrc->max_bytes; + GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %u", result); g_mutex_unlock (appsrc->mutex); return result; @@ -592,42 +774,138 @@ gst_app_src_get_max_buffers (GstAppSrc * appsrc) /** * gst_app_src_push_buffer: - * @appsrc: - * @buffer: + * @appsrc: a #GstAppSrc + * @buffer: a #GstBuffer to push * * Adds a buffer to the queue of buffers that the appsrc element will * push to its source pad. This function takes ownership of the buffer. + * + * Returns: #GST_FLOW_OK when the buffer was successfuly queued. + * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING. + * #GST_FLOW_UNEXPECTED when EOS occured. */ -void +GstFlowReturn gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer) { - g_return_if_fail (appsrc); - g_return_if_fail (GST_IS_APP_SRC (appsrc)); - g_return_if_fail (GST_IS_BUFFER (buffer)); + g_return_val_if_fail (appsrc, GST_FLOW_ERROR); + g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR); + g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); g_mutex_lock (appsrc->mutex); + /* can't accept buffers when we are flushing or EOS */ + if (appsrc->flushing) + goto flushing; + + if (appsrc->is_eos) + goto eos; + GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer); g_queue_push_tail (appsrc->queue, buffer); + + appsrc->queued_bytes += GST_BUFFER_SIZE (buffer); + if (appsrc->queued_bytes >= appsrc->max_bytes) { + GST_DEBUG_OBJECT (appsrc, "queue filled (%u >= %u), signal enough-data", + appsrc->queued_bytes, appsrc->max_bytes); + g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0, NULL); + } g_cond_signal (appsrc->cond); g_mutex_unlock (appsrc->mutex); + + return GST_FLOW_OK; + + /* ERRORS */ +flushing: + { + GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer); + gst_buffer_unref (buffer); + return GST_FLOW_WRONG_STATE; + } +eos: + { + GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer); + gst_buffer_unref (buffer); + return GST_FLOW_UNEXPECTED; + } } /** * gst_app_src_end_of_stream: - * @appsrc: + * @appsrc: a #GstAppSrc * * Indicates to the appsrc element that the last buffer queued in the * element is the last buffer of the stream. + * + * Returns: #GST_FLOW_OK when the EOS was successfuly queued. + * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING. */ -void +GstFlowReturn gst_app_src_end_of_stream (GstAppSrc * appsrc) { - g_return_if_fail (appsrc); - g_return_if_fail (GST_IS_APP_SRC (appsrc)); + g_return_val_if_fail (appsrc, GST_FLOW_ERROR); + g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR); g_mutex_lock (appsrc->mutex); + /* can't accept buffers when we are flushing. We can accept them when we are + * EOS although it will not do anything. */ + if (appsrc->flushing) + goto flushing; + GST_DEBUG_OBJECT (appsrc, "sending EOS"); appsrc->is_eos = TRUE; g_cond_signal (appsrc->cond); g_mutex_unlock (appsrc->mutex); + + return GST_FLOW_OK; + + /* ERRORS */ +flushing: + { + GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing"); + return GST_FLOW_WRONG_STATE; + } +} + +/*** GSTURIHANDLER INTERFACE *************************************************/ + +static GstURIType +gst_app_src_uri_get_type (void) +{ + return GST_URI_SRC; +} + +static gchar ** +gst_app_src_uri_get_protocols (void) +{ + static gchar *protocols[] = { "appsrc", NULL }; + + return protocols; +} +static const gchar * +gst_app_src_uri_get_uri (GstURIHandler * handler) +{ + return "appsrc"; +} + +static gboolean +gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri) +{ + gchar *protocol; + gboolean ret; + + protocol = gst_uri_get_protocol (uri); + ret = !strcmp (protocol, "appsrc"); + g_free (protocol); + + return ret; +} + +static void +gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data) +{ + GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface; + + iface->get_type = gst_app_src_uri_get_type; + iface->get_protocols = gst_app_src_uri_get_protocols; + iface->get_uri = gst_app_src_uri_get_uri; + iface->set_uri = gst_app_src_uri_set_uri; } diff --git a/gst-libs/gst/app/gstappsrc.h b/gst-libs/gst/app/gstappsrc.h index 4ca5cbc9..1ce73b4c 100644 --- a/gst-libs/gst/app/gstappsrc.h +++ b/gst-libs/gst/app/gstappsrc.h @@ -39,9 +39,27 @@ G_BEGIN_DECLS typedef struct _GstAppSrc GstAppSrc; typedef struct _GstAppSrcClass GstAppSrcClass; +/** + * GstAppStreamType: + * @GST_APP_STREAM_TYPE_STREAM: No seeking is supported in the stream, such as a + * live stream. + * @GST_APP_STREAM_TYPE_SEEKABLE: The stream is seekable but seeking might not + * be very fast, such as data from a webserver. + * @GST_APP_STREAM_TYPE_RANDOM_ACCESS: The stream is seekable and seeking is fast, + * such as in a local file. + * + * The stream type. + */ +typedef enum +{ + GST_APP_STREAM_TYPE_STREAM, + GST_APP_STREAM_TYPE_SEEKABLE, + GST_APP_STREAM_TYPE_RANDOM_ACCESS +} GstAppStreamType; + struct _GstAppSrc { - GstPushSrc pushsrc; + GstBaseSrc basesrc; /*< private >*/ GCond *cond; @@ -50,46 +68,49 @@ struct _GstAppSrc GstCaps *caps; gint64 size; - gboolean seekable; - guint max_buffers; + GstAppStreamType stream_type; + guint64 max_bytes; + GstFormat format; gboolean flushing; gboolean started; gboolean is_eos; + guint64 queued_bytes; + GstAppStreamType current_type; }; struct _GstAppSrcClass { - GstPushSrcClass pushsrc_class; + GstBaseSrcClass basesrc_class; /* signals */ - void (*need_data) (GstAppSrc *src); - void (*enough_data) (GstAppSrc *src); - gboolean (*seek_data) (GstAppSrc *src, guint64 offset); + void (*need_data) (GstAppSrc *src, guint length); + void (*enough_data) (GstAppSrc *src); + gboolean (*seek_data) (GstAppSrc *src, guint64 offset); /* actions */ - void (*push_buffer) (GstAppSrc *src, GstBuffer *buffer); - void (*end_of_stream) (GstAppSrc *src); + GstFlowReturn (*push_buffer) (GstAppSrc *src, GstBuffer *buffer); + GstFlowReturn (*end_of_stream) (GstAppSrc *src); }; GType gst_app_src_get_type(void); GST_DEBUG_CATEGORY_EXTERN (app_src_debug); -void gst_app_src_set_caps (GstAppSrc *appsrc, const GstCaps *caps); -GstCaps* gst_app_src_get_caps (GstAppSrc *appsrc); +void gst_app_src_set_caps (GstAppSrc *appsrc, const GstCaps *caps); +GstCaps* gst_app_src_get_caps (GstAppSrc *appsrc); -void gst_app_src_set_size (GstAppSrc *appsrc, gint64 size); -gint64 gst_app_src_get_size (GstAppSrc *appsrc); +void gst_app_src_set_size (GstAppSrc *appsrc, gint64 size); +gint64 gst_app_src_get_size (GstAppSrc *appsrc); -void gst_app_src_set_seekable (GstAppSrc *appsrc, gboolean seekable); -gboolean gst_app_src_get_seekable (GstAppSrc *appsrc); +void gst_app_src_set_stream_type (GstAppSrc *appsrc, GstAppStreamType type); +GstAppStreamType gst_app_src_get_stream_type (GstAppSrc *appsrc); -void gst_app_src_set_max_buffers (GstAppSrc *appsrc, guint max); -guint gst_app_src_get_max_buffers (GstAppSrc *appsrc); +void gst_app_src_set_max_bytes (GstAppSrc *appsrc, guint64 max); +guint64 gst_app_src_get_max_bytes (GstAppSrc *appsrc); -void gst_app_src_push_buffer (GstAppSrc *appsrc, GstBuffer *buffer); -void gst_app_src_end_of_stream (GstAppSrc *appsrc); +GstFlowReturn gst_app_src_push_buffer (GstAppSrc *appsrc, GstBuffer *buffer); +GstFlowReturn gst_app_src_end_of_stream (GstAppSrc *appsrc); G_END_DECLS |