/* GStreamer * Copyright (C) 2007 David Schleef * (C) 2008 Wim Taymans * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 59 Temple Place - Suite 330, * Boston, MA 02111-1307, USA. */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include #include #include #include "gstapp-marshal.h" #include "gstappsrc.h" GST_DEBUG_CATEGORY (app_src_debug); #define GST_CAT_DEFAULT app_src_debug static const GstElementDetails app_src_details = GST_ELEMENT_DETAILS ("AppSrc", "Generic/Src", "Allow the application to feed buffers to a pipeline", "David Schleef , Wim Taymans dispose = gst_app_src_dispose; gobject_class->finalize = gst_app_src_finalize; gobject_class->set_property = gst_app_src_set_property; gobject_class->get_property = gst_app_src_get_property; g_object_class_install_property (gobject_class, PROP_CAPS, g_param_spec_boxed ("caps", "Caps", "The allowed caps for the src pad", GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_SIZE, g_param_spec_int64 ("size", "Size", "The size of the data stream (-1 if unknown)", -1, G_MAXINT64, DEFAULT_PROP_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_SEEKABLE, g_param_spec_boolean ("seekable", "Seekable", "If the source is seekable", DEFAULT_PROP_SEEKABLE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); 
g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS, g_param_spec_uint ("max-buffers", "Max Buffers", "The maximum number of buffers to queue internally (0 = unlimited)", 0, G_MAXUINT, DEFAULT_PROP_MAX_BUFFERS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GstAppSrc::need-data: * @appsrc: the appsrc element that emited the signal * * Signal that the source needs more data. In the callback you should call * push-buffer or end-of-stream. */ gst_app_src_signals[SIGNAL_NEED_DATA] = g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstAppSrcClass, need_data), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE); /** * GstAppSrc::enough-data: * @appsrc: the appsrc element that emited the signal * * Signal that the source has enough data. It is recommended that the * application stops calling push-buffer until the need-data signal is * emited again to avoid excessive buffer queueing. */ gst_app_src_signals[SIGNAL_NEED_DATA] = g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstAppSrcClass, enough_data), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE); /** * GstAppSrc::seek-data: * @appsrc: the appsrc element that emited the signal * @offset: the offset to seek to * * Seek to the given offset. The next push-buffer should produce buffers from * the new @offset. 
*/ gst_app_src_signals[SIGNAL_SEEK_DATA] = g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstAppSrcClass, seek_data), NULL, NULL, gst_app_marshal_VOID__UINT64, G_TYPE_NONE, 1, G_TYPE_UINT64); gst_app_src_signals[SIGNAL_PUSH_BUFFER] = g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass, push_buffer), NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, GST_TYPE_BUFFER); gst_app_src_signals[SIGNAL_END_OF_STREAM] = g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass, end_of_stream), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE); pushsrc_class->create = gst_app_src_create; basesrc_class->start = gst_app_src_start; basesrc_class->stop = gst_app_src_stop; basesrc_class->unlock = gst_app_src_unlock; basesrc_class->unlock_stop = gst_app_src_unlock_stop; } static void gst_app_src_init (GstAppSrc * appsrc, GstAppSrcClass * klass) { appsrc->mutex = g_mutex_new (); appsrc->cond = g_cond_new (); appsrc->queue = g_queue_new (); appsrc->size = DEFAULT_PROP_SIZE; appsrc->seekable = DEFAULT_PROP_SEEKABLE; appsrc->max_buffers = DEFAULT_PROP_MAX_BUFFERS; } static void gst_app_src_dispose (GObject * obj) { GstAppSrc *appsrc = GST_APP_SRC (obj); if (appsrc->caps) { gst_caps_unref (appsrc->caps); appsrc->caps = NULL; } G_OBJECT_CLASS (parent_class)->dispose (obj); } static void gst_app_src_finalize (GObject * obj) { GstAppSrc *appsrc = GST_APP_SRC (obj); g_mutex_free (appsrc->mutex); g_cond_free (appsrc->cond); g_queue_free (appsrc->queue); G_OBJECT_CLASS (parent_class)->finalize (obj); } static void gst_app_src_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { GstAppSrc *appsrc = GST_APP_SRC (object); switch (prop_id) { case PROP_CAPS: gst_app_src_set_caps (appsrc, gst_value_get_caps (value)); break; case PROP_SIZE: 
gst_app_src_set_size (appsrc, g_value_get_int64 (value)); break; case PROP_SEEKABLE: gst_app_src_set_seekable (appsrc, g_value_get_boolean (value)); break; case PROP_MAX_BUFFERS: gst_app_src_set_max_buffers (appsrc, g_value_get_uint (value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } static void gst_app_src_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { GstAppSrc *appsrc = GST_APP_SRC (object); switch (prop_id) { case PROP_CAPS: { GstCaps *caps; caps = gst_app_src_get_caps (appsrc); gst_value_set_caps (value, caps); if (caps) gst_caps_unref (caps); break; } case PROP_SIZE: g_value_set_int64 (value, gst_app_src_get_size (appsrc)); break; case PROP_SEEKABLE: g_value_set_boolean (value, gst_app_src_get_seekable (appsrc)); break; case PROP_MAX_BUFFERS: g_value_set_uint (value, gst_app_src_get_max_buffers (appsrc)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } static gboolean gst_app_src_unlock (GstBaseSrc * psrc) { GstAppSrc *appsrc = GST_APP_SRC (psrc); g_mutex_lock (appsrc->mutex); GST_DEBUG_OBJECT (appsrc, "unlock start"); appsrc->flushing = TRUE; g_cond_signal (appsrc->cond); g_mutex_unlock (appsrc->mutex); return TRUE; } static gboolean gst_app_src_unlock_stop (GstBaseSrc * psrc) { GstAppSrc *appsrc = GST_APP_SRC (psrc); g_mutex_lock (appsrc->mutex); GST_DEBUG_OBJECT (appsrc, "unlock stop"); appsrc->flushing = FALSE; g_cond_signal (appsrc->cond); g_mutex_unlock (appsrc->mutex); return TRUE; } static gboolean gst_app_src_start (GstBaseSrc * psrc) { GstAppSrc *appsrc = GST_APP_SRC (psrc); g_mutex_lock (appsrc->mutex); GST_DEBUG_OBJECT (appsrc, "starting"); appsrc->started = TRUE; g_mutex_unlock (appsrc->mutex); return TRUE; } static gboolean gst_app_src_stop (GstBaseSrc * psrc) { GstAppSrc *appsrc = GST_APP_SRC (psrc); g_mutex_lock (appsrc->mutex); GST_DEBUG_OBJECT (appsrc, "stopping"); appsrc->is_eos = FALSE; appsrc->flushing = TRUE; 
appsrc->started = FALSE; g_mutex_unlock (appsrc->mutex); return TRUE; } static GstFlowReturn gst_app_src_create (GstPushSrc * psrc, GstBuffer ** buf) { GstAppSrc *appsrc = GST_APP_SRC (psrc); GstFlowReturn ret; g_mutex_lock (appsrc->mutex); while (TRUE) { /* check flushing first */ if (appsrc->flushing) goto flushing; /* return data as long as we have some */ if (!g_queue_is_empty (appsrc->queue)) { *buf = g_queue_pop_head (appsrc->queue); gst_buffer_set_caps (*buf, appsrc->caps); GST_DEBUG_OBJECT (appsrc, "we have buffer %p", *buf); ret = GST_FLOW_OK; break; } /* check EOS */ if (appsrc->is_eos) goto eos; /* nothing to return, wait a while for new data or flushing */ g_cond_wait (appsrc->cond, appsrc->mutex); } g_mutex_unlock (appsrc->mutex); return ret; /* ERRORS */ flushing: { GST_DEBUG_OBJECT (appsrc, "we are flushing"); g_mutex_unlock (appsrc->mutex); return GST_FLOW_WRONG_STATE; } eos: { GST_DEBUG_OBJECT (appsrc, "we are EOS"); g_mutex_unlock (appsrc->mutex); return GST_FLOW_UNEXPECTED; } } /* external API */ /** * gst_app_src_set_caps: * @appsrc: a #GstAppSrc * @caps: caps to set * * Set the capabilities on the appsrc element. This function takes * a copy of the caps structure. After calling this method, the source will * only produce caps that match @caps. @caps must be fixed and the caps on the * buffers must match the caps or left NULL. */ void gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps) { GstCaps *old; g_return_if_fail (GST_IS_APP_SRC (appsrc)); GST_OBJECT_LOCK (appsrc); GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps); if ((old = appsrc->caps) != caps) { if (caps) appsrc->caps = gst_caps_copy (caps); else appsrc->caps = NULL; if (old) gst_caps_unref (old); } GST_OBJECT_UNLOCK (appsrc); } /** * gst_app_src_get_caps: * @appsrc: a #GstAppSrc * * Get the configured caps on @appsrc. * * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage. 
*/ GstCaps * gst_app_src_get_caps (GstAppSrc * appsrc) { GstCaps *caps; g_return_val_if_fail (appsrc != NULL, NULL); g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL); GST_OBJECT_LOCK (appsrc); if ((caps = appsrc->caps)) gst_caps_ref (caps); GST_DEBUG_OBJECT (appsrc, "getting caps of %" GST_PTR_FORMAT, caps); GST_OBJECT_UNLOCK (appsrc); return caps; } /** * gst_app_src_set_size: * @appsrc: a #GstAppSrc * @size: the size to set * * Set the size of the stream in bytes. A value of -1 means that the size is * not known. */ void gst_app_src_set_size (GstAppSrc * appsrc, gint64 size) { g_return_if_fail (appsrc != NULL); g_return_if_fail (GST_IS_APP_SRC (appsrc)); GST_OBJECT_LOCK (appsrc); GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size); appsrc->size = size; GST_OBJECT_UNLOCK (appsrc); } /** * gst_app_src_get_size: * @appsrc: a #GstAppSrc * * Get the size of the stream in bytes. A value of -1 means that the size is * not known. * * Returns: the size of the stream previously set with gst_app_src_set_size(); */ gint64 gst_app_src_get_size (GstAppSrc * appsrc) { gint64 size; g_return_val_if_fail (appsrc != NULL, -1); g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1); GST_OBJECT_LOCK (appsrc); size = appsrc->size; GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size); GST_OBJECT_UNLOCK (appsrc); return size; } /** * gst_app_src_set_seekable: * @appsrc: a #GstAppSrc * @seekable: the new state * * Set whether the data is seekable. When this flag is set to %TRUE, the * "seek" signal must be connected to. */ void gst_app_src_set_seekable (GstAppSrc * appsrc, gboolean seekable) { g_return_if_fail (appsrc != NULL); g_return_if_fail (GST_IS_APP_SRC (appsrc)); GST_OBJECT_LOCK (appsrc); GST_DEBUG_OBJECT (appsrc, "setting seekable of %d", seekable); appsrc->seekable = seekable; GST_OBJECT_UNLOCK (appsrc); } /** * gst_app_src_get_seekable: * @appsrc: a #GstAppSrc * * Get whether the stream is seekable. 
Control the seeking behaviour of the * stream with gst_app_src_set_seekable(). * * Returns: %TRUE if the stream is seekable. */ gboolean gst_app_src_get_seekable (GstAppSrc * appsrc) { gboolean seekable; g_return_val_if_fail (appsrc != NULL, FALSE); g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE); GST_OBJECT_LOCK (appsrc); seekable = appsrc->seekable; GST_DEBUG_OBJECT (appsrc, "getting seekable of %d", seekable); GST_OBJECT_UNLOCK (appsrc); return seekable; } /** * gst_app_src_set_max_buffers: * @appsrc: a #GstAppSrc * @max: the maximum number of buffers to queue * * Set the maximum amount of buffers that can be queued in @appsrc. * After the maximum amount of buffers are queued, @appsrc will emit the * "enough-data" signal. */ void gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint max) { g_return_if_fail (GST_IS_APP_SRC (appsrc)); g_mutex_lock (appsrc->mutex); if (max != appsrc->max_buffers) { GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %u", max); appsrc->max_buffers = max; /* signal the change */ g_cond_signal (appsrc->cond); } g_mutex_unlock (appsrc->mutex); } /** * gst_app_src_get_max_buffers: * @appsrc: a #GstAppSrc * * Get the maximum amount of buffers that can be queued in @appsrc. * * Returns: The maximum amount of buffers that can be queued. */ guint gst_app_src_get_max_buffers (GstAppSrc * appsrc) { guint result; g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0); g_mutex_lock (appsrc->mutex); result = appsrc->max_buffers; GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %u", result); g_mutex_unlock (appsrc->mutex); return result; } /** * gst_app_src_push_buffer: * @appsrc: * @buffer: * * Adds a buffer to the queue of buffers that the appsrc element will * push to its source pad. This function takes ownership of the buffer. 
*/
void
gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
{
  g_return_if_fail (appsrc);
  g_return_if_fail (GST_IS_APP_SRC (appsrc));
  g_return_if_fail (GST_IS_BUFFER (buffer));

  g_mutex_lock (appsrc->mutex);

  GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
  /* append at the tail; gst_app_src_create() consumes from the head */
  g_queue_push_tail (appsrc->queue, buffer);
  /* wake up the streaming thread if it is waiting for data */
  g_cond_signal (appsrc->cond);

  g_mutex_unlock (appsrc->mutex);
}

/**
 * gst_app_src_end_of_stream:
 * @appsrc: a #GstAppSrc
 *
 * Indicates to the appsrc element that the last buffer queued in the
 * element is the last buffer of the stream.
 */
void
gst_app_src_end_of_stream (GstAppSrc * appsrc)
{
  g_return_if_fail (appsrc);
  g_return_if_fail (GST_IS_APP_SRC (appsrc));

  g_mutex_lock (appsrc->mutex);

  GST_DEBUG_OBJECT (appsrc, "sending EOS");
  appsrc->is_eos = TRUE;
  /* wake up the streaming thread so it can notice the EOS flag */
  g_cond_signal (appsrc->cond);

  g_mutex_unlock (appsrc->mutex);
}