summaryrefslogtreecommitdiffstats
path: root/gst
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@gmail.com>2007-04-03 09:13:17 +0000
committerWim Taymans <wim.taymans@gmail.com>2007-04-03 09:13:17 +0000
commit93b433bd166f99a7e86797077f8100607d5ab943 (patch)
treee05165289f02e21afb07def4979b2420681dc95b /gst
parent14c0bebf4b587eb747649987eb09aeab3e31dbe8 (diff)
downloadgst-plugins-bad-93b433bd166f99a7e86797077f8100607d5ab943.tar.gz
gst-plugins-bad-93b433bd166f99a7e86797077f8100607d5ab943.tar.bz2
gst-plugins-bad-93b433bd166f99a7e86797077f8100607d5ab943.zip
Add RTP session management elements. Still in progress.
Original commit message from CVS: * configure.ac: * gst/rtpmanager/Makefile.am: * gst/rtpmanager/async_jitter_queue.c: (async_jitter_queue_new), (signal_waiting_threads), (async_jitter_queue_ref), (async_jitter_queue_ref_unlocked), (async_jitter_queue_set_low_threshold), (async_jitter_queue_set_high_threshold), (async_jitter_queue_set_max_queue_length), (async_jitter_queue_get_g_queue), (calculate_ts_diff), (async_jitter_queue_length_ts_units_unlocked), (async_jitter_queue_unref_and_unlock), (async_jitter_queue_unref), (async_jitter_queue_lock), (async_jitter_queue_unlock), (async_jitter_queue_push), (async_jitter_queue_push_unlocked), (async_jitter_queue_push_sorted), (async_jitter_queue_push_sorted_unlocked), (async_jitter_queue_insert_after_unlocked), (async_jitter_queue_pop_intern_unlocked), (async_jitter_queue_pop), (async_jitter_queue_pop_unlocked), (async_jitter_queue_length), (async_jitter_queue_length_unlocked), (async_jitter_queue_set_flushing_unlocked), (async_jitter_queue_unset_flushing_unlocked), (async_jitter_queue_set_blocking_unlocked): * gst/rtpmanager/async_jitter_queue.h: * gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_base_init), (gst_rtp_bin_class_init), (gst_rtp_bin_init), (gst_rtp_bin_finalize), (gst_rtp_bin_set_property), (gst_rtp_bin_get_property), (gst_rtp_bin_change_state), (gst_rtp_bin_request_new_pad), (gst_rtp_bin_release_pad): * gst/rtpmanager/gstrtpbin.h: * gst/rtpmanager/gstrtpclient.c: (new_pad), (create_stream), (free_stream), (find_stream_by_ssrc), (gst_rtp_client_base_init), (gst_rtp_client_class_init), (gst_rtp_client_init), (gst_rtp_client_finalize), (gst_rtp_client_set_property), (gst_rtp_client_get_property), (gst_rtp_client_change_state), (gst_rtp_client_request_new_pad), (gst_rtp_client_release_pad): * gst/rtpmanager/gstrtpclient.h: * gst/rtpmanager/gstrtpjitterbuffer.c: (gst_rtp_jitter_buffer_base_init), (gst_rtp_jitter_buffer_class_init), (gst_rtp_jitter_buffer_init), (gst_rtp_jitter_buffer_dispose), (gst_rtp_jitter_buffer_getcaps), (gst_jitter_buffer_sink_setcaps), (free_func), (gst_rtp_jitter_buffer_flush_start), (gst_rtp_jitter_buffer_flush_stop), (gst_rtp_jitter_buffer_src_activate_push), (gst_rtp_jitter_buffer_change_state), (priv_compare_rtp_seq_lt), (compare_rtp_buffers_seq_num), (gst_rtp_jitter_buffer_sink_event), (gst_rtp_jitter_buffer_chain), (gst_rtp_jitter_buffer_loop), (gst_rtp_jitter_buffer_query), (gst_rtp_jitter_buffer_set_property), (gst_rtp_jitter_buffer_get_property): * gst/rtpmanager/gstrtpjitterbuffer.h: * gst/rtpmanager/gstrtpmanager.c: (plugin_init): * gst/rtpmanager/gstrtpptdemux.c: (gst_rtp_pt_demux_base_init), (gst_rtp_pt_demux_class_init), (gst_rtp_pt_demux_init), (gst_rtp_pt_demux_finalize), (gst_rtp_pt_demux_chain), (gst_rtp_pt_demux_getcaps), (find_pad_for_pt), (gst_rtp_pt_demux_setup), (gst_rtp_pt_demux_release), (gst_rtp_pt_demux_change_state): * gst/rtpmanager/gstrtpptdemux.h: * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_base_init), (gst_rtp_session_class_init), (gst_rtp_session_init), (gst_rtp_session_finalize), (gst_rtp_session_set_property), (gst_rtp_session_get_property), (gst_rtp_session_change_state), (gst_rtp_session_chain_recv_rtp), (gst_rtp_session_chain_recv_rtcp), (gst_rtp_session_chain_send_rtp), (create_recv_rtp_sink), (create_recv_rtcp_sink), (create_send_rtp_sink), (create_rtcp_src), (gst_rtp_session_request_new_pad), (gst_rtp_session_release_pad): * gst/rtpmanager/gstrtpsession.h: Add RTP session management elements. Still in progress.
Diffstat (limited to 'gst')
-rw-r--r--gst/rtpmanager/Makefile.am23
-rw-r--r--gst/rtpmanager/async_jitter_queue.c679
-rw-r--r--gst/rtpmanager/async_jitter_queue.h130
-rw-r--r--gst/rtpmanager/gstrtpbin.c279
-rw-r--r--gst/rtpmanager/gstrtpbin.h56
-rw-r--r--gst/rtpmanager/gstrtpclient.c482
-rw-r--r--gst/rtpmanager/gstrtpclient.h56
-rw-r--r--gst/rtpmanager/gstrtpjitterbuffer.c1085
-rw-r--r--gst/rtpmanager/gstrtpjitterbuffer.h74
-rw-r--r--gst/rtpmanager/gstrtpmanager.c55
-rw-r--r--gst/rtpmanager/gstrtpptdemux.c350
-rw-r--r--gst/rtpmanager/gstrtpptdemux.h57
-rw-r--r--gst/rtpmanager/gstrtpsession.c453
-rw-r--r--gst/rtpmanager/gstrtpsession.h62
14 files changed, 3841 insertions, 0 deletions
diff --git a/gst/rtpmanager/Makefile.am b/gst/rtpmanager/Makefile.am
new file mode 100644
index 00000000..561bf52d
--- /dev/null
+++ b/gst/rtpmanager/Makefile.am
@@ -0,0 +1,23 @@
+
+plugin_LTLIBRARIES = libgstrtpmanager.la
+
+libgstrtpmanager_la_SOURCES = gstrtpmanager.c \
+ gstrtpbin.c \
+ gstrtpclient.c \
+ async_jitter_queue.c \
+ gstrtpjitterbuffer.c \
+ gstrtpptdemux.c \
+ gstrtpsession.c
+
+noinst_HEADERS = gstrtpbin.h \
+ gstrtpclient.h \
+ async_jitter_queue.h \
+ gstrtpjitterbuffer.h \
+ gstrtpsession.h
+
+libgstrtpmanager_la_CFLAGS = $(GST_CFLAGS) $(GST_PLUGINS_BASE_CFLAGS) $(ERROR_CFLAGS)
+libgstrtpmanager_la_LIBADD = $(GST_LIBS_LIBS)
+libgstrtpmanager_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) -lgstrtp-@GST_MAJORMINOR@
+
+EXTRA_DIST =
+
diff --git a/gst/rtpmanager/async_jitter_queue.c b/gst/rtpmanager/async_jitter_queue.c
new file mode 100644
index 00000000..22a8ed0e
--- /dev/null
+++ b/gst/rtpmanager/async_jitter_queue.c
@@ -0,0 +1,679 @@
+/*
+ * Async Jitter Queue based on g_async_queue
+ * This code is GST RTP smart and deals with timestamps
+ *
+ * Farsight Voice+Video library
+ * Copyright 2007 Collabora Ltd,
+ * Copyright 2007 Nokia Corporation
+ * @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>.
+ *
+ * This is an async queue that has a buffering mecanism based on the set low
+ * and high threshold. When the lower threshold is reached, the queue will
+ * fill itself up until the higher threshold is reached before allowing any
+ * pops to occur. This allows a jitterbuffer of at least min threshold items
+ * to be available.
+ */
+
+/* GLIB - Library of useful routines for C programming
+ * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald
+ *
+ * GAsyncQueue: asynchronous queue implementation, based on Gqueue.
+ * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/*
+ * MT safe
+ */
+
+#include "config.h"
+
+#include "async_jitter_queue.h"
+
+#include <gst/gst.h>
+#include <gst/rtp/gstrtpbuffer.h>
+
+#define DEFAULT_LOW_THRESHOLD 0.1
+#define DEFAULT_HIGH_THRESHOLD 0.9
+
+struct _AsyncJitterQueue
+{
+ GMutex *mutex;
+ GCond *cond;
+ GQueue *queue;
+ guint waiting_threads;
+ gint32 ref_count;
+ gfloat low_threshold;
+ gfloat high_threshold;
+ guint32 max_queue_length;
+ gboolean buffering;
+ gboolean pop_flushing;
+ gboolean pop_blocking;
+ guint pops_remaining;
+ guint32 tail_buffer_duration;
+};
+
+/**
+ * async_jitter_queue_new:
+ *
+ * Creates a new asynchronous queue with the initial reference count of 1.
+ *
+ * Return value: the new #AsyncJitterQueue.
+ **/
+AsyncJitterQueue *
+async_jitter_queue_new (void)
+{
+ AsyncJitterQueue *retval = g_new (AsyncJitterQueue, 1);
+
+ retval->mutex = g_mutex_new ();
+ retval->cond = g_cond_new ();
+ retval->queue = g_queue_new ();
+ retval->waiting_threads = 0;
+ retval->ref_count = 1;
+ retval->low_threshold = DEFAULT_LOW_THRESHOLD;
+ retval->high_threshold = DEFAULT_HIGH_THRESHOLD;
+ retval->buffering = TRUE; /* we need to buffer initially */
+ retval->pop_flushing = TRUE;
+ retval->pop_blocking = TRUE;
+ retval->pops_remaining = 0;
+ retval->tail_buffer_duration = 0;
+ return retval;
+}
+
+/* checks buffering state and wakes up waiting pops */
+void
+signal_waiting_threads (AsyncJitterQueue * queue)
+{
+ if (async_jitter_queue_length_ts_units_unlocked (queue) >=
+ queue->high_threshold * queue->max_queue_length) {
+ queue->buffering = FALSE;
+ }
+
+ if (queue->waiting_threads > 0) {
+ if (!queue->buffering) {
+ g_cond_signal (queue->cond);
+ }
+ }
+}
+
+/**
+ * async_jitter_queue_ref:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Increases the reference count of the asynchronous @queue by 1. You
+ * do not need to hold the lock to call this function.
+ *
+ * Returns: the @queue that was passed in (since 2.6)
+ **/
+AsyncJitterQueue *
+async_jitter_queue_ref (AsyncJitterQueue * queue)
+{
+ g_return_val_if_fail (queue, NULL);
+ g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
+
+ g_atomic_int_inc (&queue->ref_count);
+
+ return queue;
+}
+
+/**
+ * async_jitter_queue_ref_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Increases the reference count of the asynchronous @queue by 1.
+ **/
+void
+async_jitter_queue_ref_unlocked (AsyncJitterQueue * queue)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ g_atomic_int_inc (&queue->ref_count);
+}
+
+/**
+ * async_jitter_queue_set_low_threshold:
+ * @queue: a #AsyncJitterQueue.
+ * @threshold: the lower threshold (fraction of max size)
+ *
+ * Sets the low threshold on the queue. This threshold indicates the minimum
+ * number of items allowed in the queue before we refill it up to the set
+ * maximum threshold.
+ **/
+void
+async_jitter_queue_set_low_threshold (AsyncJitterQueue * queue,
+ gfloat threshold)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ queue->low_threshold = threshold;
+}
+
+/**
+ * async_jitter_queue_set_max_threshold:
+ * @queue: a #AsyncJitterQueue.
+ * @threshold: the higher threshold (fraction of max size)
+ *
+ * Sets the high threshold on the queue. This threshold indicates the amount of
+ * items to fill in the queue before releasing any blocking pop calls. This
+ * blocking mecanism is only triggered when we reach the low threshold and must
+ * refill the queue.
+ **/
+void
+async_jitter_queue_set_high_threshold (AsyncJitterQueue * queue,
+ gfloat threshold)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ queue->high_threshold = threshold;
+}
+
+/* set the maximum queue length in RTP timestamp units */
+void
+async_jitter_queue_set_max_queue_length (AsyncJitterQueue * queue,
+ guint32 max_length)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ queue->max_queue_length = max_length;
+}
+
+GQueue *
+async_jitter_queue_get_g_queue (AsyncJitterQueue * queue)
+{
+ g_return_val_if_fail (queue, NULL);
+
+ return queue->queue;
+}
+
+static guint32
+calculate_ts_diff (guint32 high_ts, guint32 low_ts)
+{
+ /* it needs to work if ts wraps */
+ if (high_ts >= low_ts) {
+ return high_ts - low_ts;
+ } else {
+ return high_ts + G_MAXUINT32 + 1 - low_ts;
+ }
+}
+
+/* this function returns the length of the queue in timestamp units. It will
+ * also add the duration of the last buffer in the queue */
+/* FIXME This function wrongly assumes that there are no missing packets inside
+ * the buffer, in reality it needs to check for gaps and subsctract those from
+ * the total */
+guint32
+async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue * queue)
+{
+ guint32 tail_ts;
+ guint32 head_ts;
+ guint32 ret;
+ GstBuffer *head;
+ GstBuffer *tail;
+
+ g_return_val_if_fail (queue, 0);
+
+ if (queue->queue->length < 2) {
+ return 0;
+ }
+
+ tail = g_queue_peek_tail (queue->queue);
+ head = g_queue_peek_head (queue->queue);
+
+ if (!GST_IS_BUFFER (tail) || !GST_IS_BUFFER (head))
+ return 0;
+
+ tail_ts = gst_rtp_buffer_get_timestamp (tail);
+ head_ts = gst_rtp_buffer_get_timestamp (head);
+
+ ret = calculate_ts_diff (head_ts, tail_ts);
+
+ /* let's add the duration of the tail buffer */
+ ret += queue->tail_buffer_duration;
+
+ return ret;
+}
+
+/**
+ * async_jitter_queue_unref_and_unlock:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Decreases the reference count of the asynchronous @queue by 1 and
+ * releases the lock. This function must be called while holding the
+ * @queue's lock. If the reference count went to 0, the @queue will be
+ * destroyed and the memory allocated will be freed.
+ **/
+void
+async_jitter_queue_unref_and_unlock (AsyncJitterQueue * queue)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ g_mutex_unlock (queue->mutex);
+ async_jitter_queue_unref (queue);
+}
+
+/**
+ * async_jitter_queue_unref:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Decreases the reference count of the asynchronous @queue by 1. If
+ * the reference count went to 0, the @queue will be destroyed and the
+ * memory allocated will be freed. So you are not allowed to use the
+ * @queue afterwards, as it might have disappeared. You do not need to
+ * hold the lock to call this function.
+ **/
+void
+async_jitter_queue_unref (AsyncJitterQueue * queue)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ if (g_atomic_int_dec_and_test (&queue->ref_count)) {
+ g_return_if_fail (queue->waiting_threads == 0);
+ g_mutex_free (queue->mutex);
+ if (queue->cond)
+ g_cond_free (queue->cond);
+ g_queue_free (queue->queue);
+ g_free (queue);
+ }
+}
+
+/**
+ * async_jitter_queue_lock:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Acquires the @queue's lock. After that you can only call the
+ * <function>async_jitter_queue_*_unlocked()</function> function variants on that
+ * @queue. Otherwise it will deadlock.
+ **/
+void
+async_jitter_queue_lock (AsyncJitterQueue * queue)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ g_mutex_lock (queue->mutex);
+}
+
+/**
+ * async_jitter_queue_unlock:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Releases the queue's lock.
+ **/
+void
+async_jitter_queue_unlock (AsyncJitterQueue * queue)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ g_mutex_unlock (queue->mutex);
+}
+
+/**
+ * async_jitter_queue_push:
+ * @queue: a #AsyncJitterQueue.
+ * @data: @data to push into the @queue.
+ *
+ * Pushes the @data into the @queue. @data must not be %NULL.
+ **/
+void
+async_jitter_queue_push (AsyncJitterQueue * queue, gpointer data)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+ g_return_if_fail (data);
+
+ g_mutex_lock (queue->mutex);
+ async_jitter_queue_push_unlocked (queue, data);
+ g_mutex_unlock (queue->mutex);
+}
+
+/**
+ * async_jitter_queue_push_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ * @data: @data to push into the @queue.
+ *
+ * Pushes the @data into the @queue. @data must not be %NULL. This
+ * function must be called while holding the @queue's lock.
+ **/
+void
+async_jitter_queue_push_unlocked (AsyncJitterQueue * queue, gpointer data)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+ g_return_if_fail (data);
+
+ g_queue_push_head (queue->queue, data);
+
+ signal_waiting_threads (queue);
+}
+
+/**
+ * async_jitter_queue_push_sorted:
+ * @queue: a #AsyncJitterQueue
+ * @data: the @data to push into the @queue
+ * @func: the #GCompareDataFunc is used to sort @queue. This function
+ * is passed two elements of the @queue. The function should return
+ * 0 if they are equal, a negative value if the first element
+ * should be higher in the @queue or a positive value if the first
+ * element should be lower in the @queue than the second element.
+ * @user_data: user data passed to @func.
+ *
+ * Inserts @data into @queue using @func to determine the new
+ * position.
+ *
+ * This function requires that the @queue is sorted before pushing on
+ * new elements.
+ *
+ * This function will lock @queue before it sorts the queue and unlock
+ * it when it is finished.
+ *
+ * For an example of @func see async_jitter_queue_sort().
+ *
+ * Since: 2.10
+ **/
+gboolean
+async_jitter_queue_push_sorted (AsyncJitterQueue * queue,
+ gpointer data, GCompareDataFunc func, gpointer user_data)
+{
+ g_return_val_if_fail (queue != NULL, FALSE);
+ gboolean ret;
+
+ g_mutex_lock (queue->mutex);
+ ret = async_jitter_queue_push_sorted_unlocked (queue, data, func, user_data);
+ g_mutex_unlock (queue->mutex);
+
+ return ret;
+}
+
+/**
+ * async_jitter_queue_push_sorted_unlocked:
+ * @queue: a #AsyncJitterQueue
+ * @data: the @data to push into the @queue
+ * @func: the #GCompareDataFunc is used to sort @queue. This function
+ * is passed two elements of the @queue. The function should return
+ * 0 if they are equal, a negative value if the first element
+ * should be higher in the @queue or a positive value if the first
+ * element should be lower in the @queue than the second element.
+ * @user_data: user data passed to @func.
+ *
+ * Inserts @data into @queue using @func to determine the new
+ * position.
+ *
+ * This function requires that the @queue is sorted before pushing on
+ * new elements.
+ *
+ * If @GCompareDataFunc returns 0, this function does not insert @data and
+ * return FALSE.
+ *
+ * This function is called while holding the @queue's lock.
+ *
+ * For an example of @func see async_jitter_queue_sort().
+ *
+ * Since: 2.10
+ **/
+gboolean
+async_jitter_queue_push_sorted_unlocked (AsyncJitterQueue * queue,
+ gpointer data, GCompareDataFunc func, gpointer user_data)
+{
+ GList *list;
+ gint func_ret = TRUE;
+
+ g_return_val_if_fail (queue != NULL, FALSE);
+
+ list = queue->queue->head;
+ while (list && (func_ret = func (list->data, data, user_data)) < 0)
+ list = list->next;
+
+ if (func_ret == 0) {
+ return FALSE;
+ }
+ if (list) {
+ g_queue_insert_before (queue->queue, list, data);
+ } else {
+ g_queue_push_tail (queue->queue, data);
+ }
+
+ signal_waiting_threads (queue);
+ return TRUE;
+}
+
+void
+async_jitter_queue_insert_after_unlocked (AsyncJitterQueue * queue,
+ GList * sibling, gpointer data)
+{
+ g_return_if_fail (queue != NULL);
+
+ g_queue_insert_before (queue->queue, sibling, data);
+
+ signal_waiting_threads (queue);
+}
+
+static gpointer
+async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue)
+{
+ gpointer retval;
+ GstBuffer *tail_buffer = NULL;
+
+ if (queue->pop_flushing)
+ return NULL;
+
+ while (queue->pop_blocking) {
+ queue->waiting_threads++;
+ g_cond_wait (queue->cond, queue->mutex);
+ queue->waiting_threads--;
+ if (queue->pop_flushing)
+ return NULL;
+ }
+
+ if (async_jitter_queue_length_ts_units_unlocked (queue) <=
+ queue->low_threshold * queue->max_queue_length
+ && queue->pops_remaining == 0) {
+ if (!queue->buffering) {
+ queue->buffering = TRUE;
+ queue->pops_remaining = queue->queue->length;
+ } else {
+ while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) {
+ queue->waiting_threads++;
+ g_cond_wait (queue->cond, queue->mutex);
+ queue->waiting_threads--;
+ if (queue->pop_flushing)
+ return NULL;
+ }
+ }
+ }
+
+ retval = g_queue_pop_tail (queue->queue);
+ if (queue->pops_remaining)
+ queue->pops_remaining--;
+
+ tail_buffer = g_queue_peek_tail (queue->queue);
+ if (tail_buffer) {
+ if (!GST_IS_BUFFER (tail_buffer) || !GST_IS_BUFFER (retval)) {
+ queue->tail_buffer_duration = 0;
+ } else if (gst_rtp_buffer_get_seq (tail_buffer)
+ - gst_rtp_buffer_get_seq (retval) == 1) {
+ queue->tail_buffer_duration =
+ calculate_ts_diff (gst_rtp_buffer_get_timestamp (tail_buffer),
+ gst_rtp_buffer_get_timestamp (retval));
+ } else {
+ /* There is a sequence number gap -> we can't calculate the duration
+ * let's just set it to 0 */
+ queue->tail_buffer_duration = 0;
+ }
+ }
+
+ g_assert (retval);
+
+ return retval;
+}
+
+/**
+ * async_jitter_queue_pop:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Pops data from the @queue. This function blocks until data become
+ * available. If pop is disabled, tis function return NULL.
+ *
+ * Return value: data from the queue.
+ **/
+gpointer
+async_jitter_queue_pop (AsyncJitterQueue * queue)
+{
+ gpointer retval;
+
+ g_return_val_if_fail (queue, NULL);
+ g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
+
+ g_mutex_lock (queue->mutex);
+ retval = async_jitter_queue_pop_intern_unlocked (queue);
+ g_mutex_unlock (queue->mutex);
+
+ return retval;
+}
+
+/**
+ * async_jitter_queue_pop_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Pops data from the @queue. This function blocks until data become
+ * available. This function must be called while holding the @queue's
+ * lock.
+ *
+ * Return value: data from the queue.
+ **/
+gpointer
+async_jitter_queue_pop_unlocked (AsyncJitterQueue * queue)
+{
+ g_return_val_if_fail (queue, NULL);
+ g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
+
+ return async_jitter_queue_pop_intern_unlocked (queue);
+}
+
+/**
+ * async_jitter_queue_length:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Returns the length of the queue
+ * Return value: the length of the @queue.
+ **/
+gint
+async_jitter_queue_length (AsyncJitterQueue * queue)
+{
+ gint retval;
+
+ g_return_val_if_fail (queue, 0);
+ g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
+
+ g_mutex_lock (queue->mutex);
+ retval = queue->queue->length;
+ g_mutex_unlock (queue->mutex);
+
+ return retval;
+}
+
+/**
+ * async_jitter_queue_length_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ *
+ * Returns the length of the queue.
+ *
+ * Return value: the length of the @queue.
+ **/
+gint
+async_jitter_queue_length_unlocked (AsyncJitterQueue * queue)
+{
+ g_return_val_if_fail (queue, 0);
+ g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
+
+ return queue->queue->length;
+}
+
+/**
+ * async_jitter_queue_set_flushing_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ * @free_func: a function to call to free the elements
+ * @user_data: user data passed to @free_func
+ *
+ * This function is used to set/unset flushing. If flushing is set any
+ * waiting/blocked pops will be unblocked. Any subsequent calls to pop will
+ * return NULL. Flushing is set by default.
+ */
+void
+async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue * queue,
+ GFunc free_func, gpointer user_data)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ queue->pop_flushing = TRUE;
+ /* let's unblock any remaining pops */
+ if (queue->waiting_threads > 0)
+ g_cond_broadcast (queue->cond);
+ /* free data from queue */
+ g_queue_foreach (queue->queue, free_func, user_data);
+}
+
+/**
+ * async_jitter_queue_unset_flushing_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ * @free_func: a function to call to free the elements
+ * @user_data: user data passed to @free_func
+ *
+ * This function is used to set/unset flushing. If flushing is set any
+ * waiting/blocked pops will be unblocked. Any subsequent calls to pop will
+ * return NULL. Flushing is set by default.
+ */
+void
+async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue * queue)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ queue->pop_flushing = FALSE;
+ /* let's unblock any remaining pops */
+ if (queue->waiting_threads > 0)
+ g_cond_broadcast (queue->cond);
+}
+
+/**
+ * async_jitter_queue_set_blocking_unlocked:
+ * @queue: a #AsyncJitterQueue.
+ * @enabled: a boolean to enable/disable blocking
+ *
+ * This function is used to enable/disable blocking. If blocking is enabled any
+ * pops will be blocked until the queue is unblocked. The queue is blocked by
+ * default.
+ */
+void
+async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue * queue,
+ gboolean blocking)
+{
+ g_return_if_fail (queue);
+ g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
+
+ queue->pop_blocking = blocking;
+ /* let's unblock any remaining pops */
+ if (queue->waiting_threads > 0)
+ g_cond_broadcast (queue->cond);
+}
diff --git a/gst/rtpmanager/async_jitter_queue.h b/gst/rtpmanager/async_jitter_queue.h
new file mode 100644
index 00000000..bea76d6d
--- /dev/null
+++ b/gst/rtpmanager/async_jitter_queue.h
@@ -0,0 +1,130 @@
+/* Async Jitter Queue based on g_async_queue
+ *
+ * Farsight Voice+Video library
+ * Copyright 2007 Collabora Ltd,
+ * Copyright 2007 Nokia Corporation
+ * @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>.
+ */
+
+/* GLIB - Library of useful routines for C programming
+ * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/*
+ * Modified by the GLib Team and others 1997-2000. See the AUTHORS
+ * file for a list of people on the GLib Team. See the ChangeLog
+ * files for a list of changes. These files are distributed with
+ * GLib at ftp://ftp.gtk.org/pub/gtk/.
+ */
+
+#ifndef __ASYNCJITTERQUEUE_H__
+#define __ASYNCJITTERQUEUE_H__
+
+#include <glib.h>
+#include <glib/gthread.h>
+
+G_BEGIN_DECLS
+
+typedef struct _AsyncJitterQueue AsyncJitterQueue;
+
+/* Asyncronous Queues, can be used to communicate between threads
+ */
+
+/* Get a new AsyncJitterQueue with the ref_count 1 */
+AsyncJitterQueue* async_jitter_queue_new (void);
+
+/* Lock and unlock a AsyncJitterQueue. All functions lock the queue for
+ * themselves, but in certain cirumstances you want to hold the lock longer,
+ * thus you lock the queue, call the *_unlocked functions and unlock it again.
+ */
+void async_jitter_queue_lock (AsyncJitterQueue *queue);
+void async_jitter_queue_unlock (AsyncJitterQueue *queue);
+
+/* Ref and unref the AsyncJitterQueue. */
+AsyncJitterQueue* async_jitter_queue_ref (AsyncJitterQueue *queue);
+void async_jitter_queue_unref (AsyncJitterQueue *queue);
+#ifndef G_DISABLE_DEPRECATED
+/* You don't have to hold the lock for calling *_ref and *_unref anymore. */
+void async_jitter_queue_ref_unlocked (AsyncJitterQueue *queue);
+void async_jitter_queue_unref_and_unlock (AsyncJitterQueue *queue);
+#endif /* !G_DISABLE_DEPRECATED */
+
+void async_jitter_queue_set_low_threshold (AsyncJitterQueue *queue,
+ gfloat threshold);
+void async_jitter_queue_set_high_threshold (AsyncJitterQueue *queue,
+ gfloat threshold);
+
+void async_jitter_queue_set_max_queue_length (AsyncJitterQueue *queue,
+ guint32 max_length);
+
+/* Push data into the async queue. Must not be NULL. */
+void async_jitter_queue_push (AsyncJitterQueue *queue,
+ gpointer data);
+void async_jitter_queue_push_unlocked (AsyncJitterQueue *queue,
+ gpointer data);
+gboolean async_jitter_queue_push_sorted (AsyncJitterQueue *queue,
+ gpointer data,
+ GCompareDataFunc func,
+ gpointer user_data);
+
+void async_jitter_queue_insert_after_unlocked(AsyncJitterQueue *queue,
+ GList *sibling,
+ gpointer data);
+
+gboolean async_jitter_queue_push_sorted_unlocked(AsyncJitterQueue *queue,
+ gpointer data,
+ GCompareDataFunc func,
+ gpointer user_data);
+
+/* Pop data from the async queue. When no data is there, the thread is blocked
+ * until data arrives. */
+gpointer async_jitter_queue_pop (AsyncJitterQueue *queue);
+gpointer async_jitter_queue_pop_unlocked (AsyncJitterQueue *queue);
+
+/* Try to pop data. NULL is returned in case of empty queue. */
+gpointer async_jitter_queue_try_pop (AsyncJitterQueue *queue);
+gpointer async_jitter_queue_try_pop_unlocked (AsyncJitterQueue *queue);
+
+/* Wait for data until at maximum until end_time is reached. NULL is returned
+ * in case of empty queue. */
+gpointer async_jitter_queue_timed_pop (AsyncJitterQueue *queue,
+ GTimeVal *end_time);
+gpointer async_jitter_queue_timed_pop_unlocked (AsyncJitterQueue *queue,
+ GTimeVal *end_time);
+
+/* Return the length of the queue. Negative values mean that threads
+ * are waiting, positve values mean that there are entries in the
+ * queue. Actually this function returns the length of the queue minus
+ * the number of waiting threads, async_jitter_queue_length == 0 could also
+ * mean 'n' entries in the queue and 'n' thread waiting. Such can
+ * happen due to locking of the queue or due to scheduling. */
+gint async_jitter_queue_length (AsyncJitterQueue *queue);
+gint async_jitter_queue_length_unlocked (AsyncJitterQueue *queue);
+
+void async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue* queue,
+ GFunc free_func, gpointer user_data);
+void async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue* queue);
+void async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue* queue,
+ gboolean blocking);
+guint32
+async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue *queue);
+
+G_END_DECLS
+
+#endif /* __ASYNCJITTERQUEUE_H__ */
+
diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c
new file mode 100644
index 00000000..629f098d
--- /dev/null
+++ b/gst/rtpmanager/gstrtpbin.c
@@ -0,0 +1,279 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/**
+ * SECTION:element-rtpbin
+ * @short_description: handle media from one RTP bin
+ * @see_also: rtpjitterbuffer, rtpclient, rtpsession
+ *
+ * <refsect2>
+ * <para>
+ * </para>
+ * <title>Example pipelines</title>
+ * <para>
+ * <programlisting>
+ * gst-launch -v filesrc location=sine.ogg ! oggdemux ! vorbisdec ! audioconvert ! alsasink
+ * </programlisting>
+ * </para>
+ * </refsect2>
+ *
+ * Last reviewed on 2007-04-02 (0.10.6)
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include <string.h>
+
+#include "gstrtpbin.h"
+
+/* elementfactory information */
+static const GstElementDetails rtpbin_details = GST_ELEMENT_DETAILS ("RTP Bin",
+ "Filter/Editor/Video",
+ "Implement an RTP bin",
+ "Wim Taymans <wim@fluendo.com>");
+
+/* sink pads */
+static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
+GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%d",
+ GST_PAD_SINK,
+ GST_PAD_REQUEST,
+ GST_STATIC_CAPS ("application/x-rtp")
+ );
+
+static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
+GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%d",
+ GST_PAD_SINK,
+ GST_PAD_REQUEST,
+ GST_STATIC_CAPS ("application/x-rtcp")
+ );
+
+static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
+GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%d",
+ GST_PAD_SINK,
+ GST_PAD_REQUEST,
+ GST_STATIC_CAPS ("application/x-rtp")
+ );
+
+/* src pads */
+static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
+GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%d_%d_%d",
+ GST_PAD_SRC,
+ GST_PAD_SOMETIMES,
+ GST_STATIC_CAPS ("application/x-rtp")
+ );
+
+static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
+GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%d",
+ GST_PAD_SRC,
+ GST_PAD_REQUEST,
+ GST_STATIC_CAPS ("application/x-rtcp")
+ );
+
+static GstStaticPadTemplate rtpbin_send_rtp_src_template =
+GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d",
+ GST_PAD_SRC,
+ GST_PAD_SOMETIMES,
+ GST_STATIC_CAPS ("application/x-rtp")
+ );
+
+#define GST_RTP_BIN_GET_PRIVATE(obj) \
+ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRTPBinPrivate))
+
+struct _GstRTPBinPrivate
+{
+};
+
+/* signals and args */
+enum
+{
+ /* FILL ME */
+ LAST_SIGNAL
+};
+
+enum
+{
+ PROP_0
+};
+
+/* GObject vmethods */
+static void gst_rtp_bin_finalize (GObject * object);
+static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_rtp_bin_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+
+/* GstElement vmethods */
+static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element,
+ GstStateChange transition);
+static GstPad *gst_rtp_bin_request_new_pad (GstElement * element,
+ GstPadTemplate * templ, const gchar * name);
+static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad);
+
+/*static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 }; */
+
+GST_BOILERPLATE (GstRTPBin, gst_rtp_bin, GstBin, GST_TYPE_BIN);
+
+static void
+gst_rtp_bin_base_init (gpointer klass)
+{
+ GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
+
+ /* sink pads */
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&rtpbin_recv_rtp_sink_template));
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&rtpbin_recv_rtcp_sink_template));
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&rtpbin_send_rtp_sink_template));
+
+ /* src pads */
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&rtpbin_recv_rtp_src_template));
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&rtpbin_send_rtcp_src_template));
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&rtpbin_send_rtp_src_template));
+
+ gst_element_class_set_details (element_class, &rtpbin_details);
+}
+
+static void
+gst_rtp_bin_class_init (GstRTPBinClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstElementClass *gstelement_class;
+
+ gobject_class = (GObjectClass *) klass;
+ gstelement_class = (GstElementClass *) klass;
+
+ g_type_class_add_private (klass, sizeof (GstRTPBinPrivate));
+
+ gobject_class->finalize = gst_rtp_bin_finalize;
+ gobject_class->set_property = gst_rtp_bin_set_property;
+ gobject_class->get_property = gst_rtp_bin_get_property;
+
+ gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
+ gstelement_class->request_new_pad =
+ GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
+ gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
+}
+
+static void
+gst_rtp_bin_init (GstRTPBin * rtpbin, GstRTPBinClass * klass)
+{
+ rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
+}
+
+static void
+gst_rtp_bin_finalize (GObject * object)
+{
+ GstRTPBin *rtpbin;
+
+ rtpbin = GST_RTP_BIN (object);
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+gst_rtp_bin_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstRTPBin *rtpbin;
+
+ rtpbin = GST_RTP_BIN (object);
+
+ switch (prop_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rtp_bin_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstRTPBin *rtpbin;
+
+ rtpbin = GST_RTP_BIN (object);
+
+ switch (prop_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static GstStateChangeReturn
+gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
+{
+ GstStateChangeReturn res;
+ GstRTPBin *rtpbin;
+
+ rtpbin = GST_RTP_BIN (element);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_NULL_TO_READY:
+ break;
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ break;
+ case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+ break;
+ default:
+ break;
+ }
+
+ res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+ break;
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ break;
+ case GST_STATE_CHANGE_READY_TO_NULL:
+ break;
+ default:
+ break;
+ }
+ return res;
+}
+
+/*
+ */
+static GstPad *
+gst_rtp_bin_request_new_pad (GstElement * element,
+ GstPadTemplate * templ, const gchar * name)
+{
+ GstRTPBin *rtpbin;
+ GstElementClass *klass;
+
+ g_return_val_if_fail (templ != NULL, NULL);
+ g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL);
+
+ rtpbin = GST_RTP_BIN (element);
+ klass = GST_ELEMENT_GET_CLASS (element);
+
+ return NULL;
+}
+
+static void
+gst_rtp_bin_release_pad (GstElement * element, GstPad * pad)
+{
+}
diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h
new file mode 100644
index 00000000..5b3d7954
--- /dev/null
+++ b/gst/rtpmanager/gstrtpbin.h
@@ -0,0 +1,56 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __GST_RTP_BIN_H__
+#define __GST_RTP_BIN_H__
+
+#include <gst/gst.h>
+
+#define GST_TYPE_RTP_BIN \
+ (gst_rtp_bin_get_type())
+#define GST_RTP_BIN(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTP_BIN,GstRTPBin))
+#define GST_RTP_BIN_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTP_BIN,GstRTPBinClass))
+#define GST_IS_RTP_BIN(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_BIN))
+#define GST_IS_RTP_BIN_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_BIN))
+
+typedef struct _GstRTPBin GstRTPBin;
+typedef struct _GstRTPBinClass GstRTPBinClass;
+typedef struct _GstRTPBinPrivate GstRTPBinPrivate;
+
+struct _GstRTPBin {
+ GstBin element;
+
+ /* a list of streams from a client */
+ GList *streams;
+
+ /*< private >*/
+ GstRTPBinPrivate *priv;
+};
+
+struct _GstRTPBinClass {
+ GstBinClass parent_class;
+};
+
+GType gst_rtp_bin_get_type (void);
+
+#endif /* __GST_RTP_BIN_H__ */
diff --git a/gst/rtpmanager/gstrtpclient.c b/gst/rtpmanager/gstrtpclient.c
new file mode 100644
index 00000000..2984d3f9
--- /dev/null
+++ b/gst/rtpmanager/gstrtpclient.c
@@ -0,0 +1,482 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/**
+ * SECTION:element-rtpclient
+ * @short_description: handle media from one RTP client
+ * @see_also: rtpjitterbuffer, rtpbin, rtpsession
+ *
+ * <refsect2>
+ * <para>
+ * This element handles RTP data from one client. It accepts multiple RTP streams that
+ * should be synchronized together.
+ * </para>
+ * <title>Example pipelines</title>
+ * <para>
+ * <programlisting>
+ * gst-launch -v filesrc location=sine.ogg ! oggdemux ! vorbisdec ! audioconvert ! alsasink
+ * </programlisting>
+ * </para>
+ * </refsect2>
+ *
+ * Last reviewed on 2007-04-02 (0.10.6)
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include <string.h>
+
+#include "gstrtpclient.h"
+
+/* elementfactory information */
+static const GstElementDetails rtpclient_details =
+GST_ELEMENT_DETAILS ("RTP Client",
+ "Filter/Editor/Video",
+ "Implement an RTP client",
+ "Wim Taymans <wim@fluendo.com>");
+
+/* sink pads */
+static GstStaticPadTemplate rtpclient_rtp_sink_template =
+GST_STATIC_PAD_TEMPLATE ("rtp_sink_%d",
+ GST_PAD_SINK,
+ GST_PAD_REQUEST,
+ GST_STATIC_CAPS ("application/x-rtp")
+ );
+
+static GstStaticPadTemplate rtpclient_sync_sink_template =
+GST_STATIC_PAD_TEMPLATE ("sync_sink_%d",
+ GST_PAD_SINK,
+ GST_PAD_REQUEST,
+ GST_STATIC_CAPS ("application/x-rtcp")
+ );
+
+/* src pads */
+static GstStaticPadTemplate rtpclient_rtp_src_template =
+GST_STATIC_PAD_TEMPLATE ("rtp_src_%d_%d",
+ GST_PAD_SRC,
+ GST_PAD_SOMETIMES,
+ GST_STATIC_CAPS ("application/x-rtp")
+ );
+
+#define GST_RTP_CLIENT_GET_PRIVATE(obj) \
+ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_CLIENT, GstRTPClientPrivate))
+
+struct _GstRTPClientPrivate
+{
+};
+
+/* all the info needed to handle the stream with SSRC */
+typedef struct
+{
+ GstRTPClient *client;
+
+ /* the SSRC of this stream */
+ guint32 ssrc;
+
+ /* RTP and RTCP in */
+ GstPad *rtp_sink;
+ GstPad *sync_sink;
+
+ /* the jitterbuffer */
+ GstElement *jitterbuffer;
+ /* the payload demuxer */
+ GstElement *ptdemux;
+ /* the new-pad signal */
+ gulong new_pad_sig;
+} GstRTPClientStream;
+
+/* the PT demuxer found a new payload type */
+static void
+new_pad (GstElement * element, GstPad * pad, GstRTPClientStream * stream)
+{
+}
+
+/* create a new stream for SSRC.
+ *
+ * We create a jitterbuffer and an payload demuxer for the SSRC. The sinkpad of
+ * the jitterbuffer is ghosted to the bin. We connect a pad-added signal to
+ * rtpptdemux so that we can ghost the payload pads outside.
+ *
+ * +-----------------+ +---------------+
+ * | rtpjitterbuffer | | rtpptdemux |
+ * +- sink src - sink |
+ * / +-----------------+ +---------------+
+ *
+ */
+static GstRTPClientStream *
+create_stream (GstRTPClient * rtpclient, guint32 ssrc)
+{
+ GstRTPClientStream *stream;
+ gchar *name;
+ GstPad *srcpad, *sinkpad;
+ GstPadLinkReturn res;
+
+ stream = g_new0 (GstRTPClientStream, 1);
+ stream->ssrc = ssrc;
+ stream->client = rtpclient;
+
+ stream->jitterbuffer = gst_element_factory_make ("rtpjitterbuffer", NULL);
+ if (!stream->jitterbuffer)
+ goto no_jitterbuffer;
+
+ stream->ptdemux = gst_element_factory_make ("rtpptdemux", NULL);
+ if (!stream->ptdemux)
+ goto no_ptdemux;
+
+ /* add elements to bin */
+ gst_bin_add (GST_BIN_CAST (rtpclient), stream->jitterbuffer);
+ gst_bin_add (GST_BIN_CAST (rtpclient), stream->ptdemux);
+
+ /* link jitterbuffer and PT demuxer */
+ srcpad = gst_element_get_pad (stream->jitterbuffer, "src");
+ sinkpad = gst_element_get_pad (stream->ptdemux, "sink");
+ res = gst_pad_link (srcpad, sinkpad);
+ gst_object_unref (srcpad);
+ gst_object_unref (sinkpad);
+
+ if (res != GST_PAD_LINK_OK)
+ goto could_not_link;
+
+ /* add stream to list */
+ rtpclient->streams = g_list_prepend (rtpclient->streams, stream);
+
+ /* ghost sinkpad */
+ name = g_strdup_printf ("rtp_sink_%d", ssrc);
+ sinkpad = gst_element_get_pad (stream->jitterbuffer, "sink");
+ stream->rtp_sink = gst_ghost_pad_new (name, sinkpad);
+ gst_object_unref (sinkpad);
+ g_free (name);
+ gst_element_add_pad (GST_ELEMENT_CAST (rtpclient), stream->rtp_sink);
+
+ /* add signal to ptdemuxer */
+ stream->new_pad_sig =
+ g_signal_connect (G_OBJECT (stream->ptdemux), "pad-added",
+ G_CALLBACK (new_pad), stream);
+
+ return stream;
+
+ /* ERRORS */
+no_jitterbuffer:
+ {
+ g_free (stream);
+ g_warning ("could not create rtpjitterbuffer element");
+ return NULL;
+ }
+no_ptdemux:
+ {
+ gst_object_unref (stream->jitterbuffer);
+ g_free (stream);
+ g_warning ("could not create rtpptdemux element");
+ return NULL;
+ }
+could_not_link:
+ {
+ gst_bin_remove (GST_BIN_CAST (rtpclient), stream->jitterbuffer);
+ gst_bin_remove (GST_BIN_CAST (rtpclient), stream->ptdemux);
+ g_free (stream);
+ g_warning ("could not link jitterbuffer and rtpptdemux element");
+ return NULL;
+ }
+}
+
+#if 0
+static void
+free_stream (GstRTPClientStream * stream)
+{
+ gst_object_unref (stream->jitterbuffer);
+ g_free (stream);
+}
+#endif
+
+/* find the stream for the given SSRC, return NULL if the stream did not exist
+ */
+static GstRTPClientStream *
+find_stream_by_ssrc (GstRTPClient * client, guint32 ssrc)
+{
+ GstRTPClientStream *stream;
+ GList *walk;
+
+ for (walk = client->streams; walk; walk = g_list_next (walk)) {
+ stream = (GstRTPClientStream *) walk->data;
+ if (stream->ssrc == ssrc)
+ return stream;
+ }
+ return NULL;
+}
+
+/* signals and args */
+enum
+{
+ /* FILL ME */
+ LAST_SIGNAL
+};
+
+enum
+{
+ PROP_0
+};
+
+/* GObject vmethods */
+static void gst_rtp_client_finalize (GObject * object);
+static void gst_rtp_client_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_rtp_client_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+
+/* GstElement vmethods */
+static GstStateChangeReturn gst_rtp_client_change_state (GstElement * element,
+ GstStateChange transition);
+static GstPad *gst_rtp_client_request_new_pad (GstElement * element,
+ GstPadTemplate * templ, const gchar * name);
+static void gst_rtp_client_release_pad (GstElement * element, GstPad * pad);
+
+/*static guint gst_rtp_client_signals[LAST_SIGNAL] = { 0 }; */
+
+GST_BOILERPLATE (GstRTPClient, gst_rtp_client, GstBin, GST_TYPE_BIN);
+
+static void
+gst_rtp_client_base_init (gpointer klass)
+{
+ GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
+
+ /* sink pads */
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&rtpclient_rtp_sink_template));
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&rtpclient_sync_sink_template));
+
+ /* src pads */
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&rtpclient_rtp_src_template));
+
+ gst_element_class_set_details (element_class, &rtpclient_details);
+}
+
+static void
+gst_rtp_client_class_init (GstRTPClientClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstElementClass *gstelement_class;
+
+ gobject_class = (GObjectClass *) klass;
+ gstelement_class = (GstElementClass *) klass;
+
+ g_type_class_add_private (klass, sizeof (GstRTPClientPrivate));
+
+ gobject_class->finalize = gst_rtp_client_finalize;
+ gobject_class->set_property = gst_rtp_client_set_property;
+ gobject_class->get_property = gst_rtp_client_get_property;
+
+ gstelement_class->change_state =
+ GST_DEBUG_FUNCPTR (gst_rtp_client_change_state);
+ gstelement_class->request_new_pad =
+ GST_DEBUG_FUNCPTR (gst_rtp_client_request_new_pad);
+ gstelement_class->release_pad =
+ GST_DEBUG_FUNCPTR (gst_rtp_client_release_pad);
+}
+
+static void
+gst_rtp_client_init (GstRTPClient * rtpclient, GstRTPClientClass * klass)
+{
+ rtpclient->priv = GST_RTP_CLIENT_GET_PRIVATE (rtpclient);
+}
+
+static void
+gst_rtp_client_finalize (GObject * object)
+{
+ GstRTPClient *rtpclient;
+
+ rtpclient = GST_RTP_CLIENT (object);
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+gst_rtp_client_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstRTPClient *rtpclient;
+
+ rtpclient = GST_RTP_CLIENT (object);
+
+ switch (prop_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rtp_client_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstRTPClient *rtpclient;
+
+ rtpclient = GST_RTP_CLIENT (object);
+
+ switch (prop_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static GstStateChangeReturn
+gst_rtp_client_change_state (GstElement * element, GstStateChange transition)
+{
+ GstStateChangeReturn res;
+ GstRTPClient *rtpclient;
+
+ rtpclient = GST_RTP_CLIENT (element);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_NULL_TO_READY:
+ break;
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ break;
+ case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+ break;
+ default:
+ break;
+ }
+
+ res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+ break;
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ break;
+ case GST_STATE_CHANGE_READY_TO_NULL:
+ break;
+ default:
+ break;
+ }
+ return res;
+}
+
+/* We have 2 request pads (rtp_sink_%d and sync_sink_%d), the %d is assumed to
+ * be the SSRC of the stream.
+ *
+ * We require that the rtp pad is requested first for a particular SSRC, then
+ * (optionaly) the sync pad can be requested. If no sync pad is requested, no
+ * sync information can be exchanged for this stream.
+ */
+static GstPad *
+gst_rtp_client_request_new_pad (GstElement * element,
+ GstPadTemplate * templ, const gchar * name)
+{
+ GstRTPClient *rtpclient;
+ GstElementClass *klass;
+ GstPadTemplate *rtp_sink_templ, *sync_sink_templ;
+ guint32 ssrc;
+ GstRTPClientStream *stream;
+ GstPad *result;
+
+ g_return_val_if_fail (templ != NULL, NULL);
+ g_return_val_if_fail (GST_IS_RTP_CLIENT (element), NULL);
+
+ if (templ->direction != GST_PAD_SINK)
+ goto wrong_direction;
+
+ rtpclient = GST_RTP_CLIENT (element);
+ klass = GST_ELEMENT_GET_CLASS (element);
+
+ /* figure out the template */
+ rtp_sink_templ = gst_element_class_get_pad_template (klass, "rtp_sink_%d");
+ sync_sink_templ = gst_element_class_get_pad_template (klass, "sync_sink_%d");
+
+ if (templ != rtp_sink_templ && templ != sync_sink_templ)
+ goto wrong_template;
+
+ if (templ == rtp_sink_templ) {
+ /* create new rtp sink pad. If a stream with the pad number already exists
+ * we have an error, else we create the sinkpad, add a jitterbuffer and
+ * ptdemuxer. */
+ if (name == NULL || strlen (name) < 9)
+ goto no_name;
+
+ ssrc = atoi (&name[9]);
+
+ /* see if a stream with that name exists, if so we have an error. */
+ stream = find_stream_by_ssrc (rtpclient, ssrc);
+ if (stream != NULL)
+ goto stream_exists;
+
+ /* ok, create new stream */
+ stream = create_stream (rtpclient, ssrc);
+ if (stream == NULL)
+ goto stream_not_found;
+
+ result = stream->rtp_sink;
+ } else {
+ /* create new rtp sink pad. We can only do this if the RTP pad was
+ * requested before, meaning the session with the padnumber must exist. */
+ if (name == NULL || strlen (name) < 10)
+ goto no_name;
+
+ ssrc = atoi (&name[10]);
+
+ /* find stream */
+ stream = find_stream_by_ssrc (rtpclient, ssrc);
+ if (stream == NULL)
+ goto stream_not_found;
+
+ stream->sync_sink =
+ gst_pad_new_from_static_template (&rtpclient_sync_sink_template, name);
+ gst_element_add_pad (GST_ELEMENT_CAST (rtpclient), stream->sync_sink);
+
+ result = stream->sync_sink;
+ }
+
+ return result;
+
+ /* ERRORS */
+wrong_direction:
+ {
+ g_warning ("rtpclient: request pad that is not a SINK pad");
+ return NULL;
+ }
+wrong_template:
+ {
+ g_warning ("rtpclient: this is not our template");
+ return NULL;
+ }
+no_name:
+ {
+ g_warning ("rtpclient: no padname was specified");
+ return NULL;
+ }
+stream_exists:
+ {
+ g_warning ("rtpclient: stream with SSRC %d already registered", ssrc);
+ return NULL;
+ }
+stream_not_found:
+ {
+ g_warning ("rtpclient: stream with SSRC %d not yet registered", ssrc);
+ return NULL;
+ }
+}
+
+static void
+gst_rtp_client_release_pad (GstElement * element, GstPad * pad)
+{
+}
diff --git a/gst/rtpmanager/gstrtpclient.h b/gst/rtpmanager/gstrtpclient.h
new file mode 100644
index 00000000..de837fd7
--- /dev/null
+++ b/gst/rtpmanager/gstrtpclient.h
@@ -0,0 +1,56 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __GST_RTP_CLIENT_H__
+#define __GST_RTP_CLIENT_H__
+
+#include <gst/gst.h>
+
+#define GST_TYPE_RTP_CLIENT \
+ (gst_rtp_client_get_type())
+#define GST_RTP_CLIENT(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTP_CLIENT,GstRTPClient))
+#define GST_RTP_CLIENT_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTP_CLIENT,GstRTPClientClass))
+#define GST_IS_RTP_CLIENT(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_CLIENT))
+#define GST_IS_RTP_CLIENT_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_CLIENT))
+
+typedef struct _GstRTPClient GstRTPClient;
+typedef struct _GstRTPClientClass GstRTPClientClass;
+typedef struct _GstRTPClientPrivate GstRTPClientPrivate;
+
+struct _GstRTPClient {
+ GstBin parent_bin;
+
+ /* a list of streams from a client */
+ GList *streams;
+
+ /*< private >*/
+ GstRTPClientPrivate *priv;
+};
+
+struct _GstRTPClientClass {
+ GstBinClass parent_class;
+};
+
+GType gst_rtp_client_get_type (void);
+
+#endif /* __GST_RTP_CLIENT_H__ */
diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c
new file mode 100644
index 00000000..58b48a34
--- /dev/null
+++ b/gst/rtpmanager/gstrtpjitterbuffer.c
@@ -0,0 +1,1085 @@
+/*
+ * Farsight Voice+Video library
+ *
+ * Copyright 2007 Collabora Ltd,
+ * Copyright 2007 Nokia Corporation
+ * @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
+ * Copyright 2007 Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ */
+
+/**
+ * SECTION:element-rtpjitterbuffer
+ * @short_description: buffer, reorder and remove duplicate RTP packets to
+ * compensate for network oddities.
+ *
+ * <refsect2>
+ * <para>
+ * This element reorders and removes duplicate RTP packets as they are received
+ * from a network source. It will also wait for missing packets up to a
+ * configurable time limit using the ::latency property. Packets arriving too
+ * late are considered as lost packets.
+ * </para>
+ * <para>
+ * This element acts as a live element and so adds ::latency to the pipeline.
+ * </para>
+ * <title>Example pipelines</title>
+ * <para>
+ * <programlisting>
+ * gst-launch rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
+ * </programlisting>
+ * Connect to a streaming server and decode the MPEG video. The jitterbuffer is
+ * inserted into the pipeline to smooth out network jitter and to reorder the
+ * out-of-order RTP packets.
+ * </para>
+ * </refsect2>
+ *
+ * Last reviewed on 2007-03-27 (0.10.13)
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <string.h>
+#include <gst/rtp/gstrtpbuffer.h>
+#include "gstrtpjitterbuffer.h"
+
+#include "async_jitter_queue.h"
+
+GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
+#define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
+
+/* low and high threshold tell the queue when to start and stop buffering */
+#define LOW_THRESHOLD 0.2
+#define HIGH_THRESHOLD 0.8
+
+/* elementfactory information */
+static const GstElementDetails gst_rtp_jitter_buffer_details =
+GST_ELEMENT_DETAILS ("RTP packet jitter-buffer",
+ "Filter/Network",
+ "A buffer that deals with network jitter and other transmission faults",
+ "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
+ "Wim Taymans <wim@fluendo.com>");
+
+/* RTPJitterBuffer signals and args */
+enum
+{
+ /* FILL ME */
+ LAST_SIGNAL
+};
+
+#define DEFAULT_LATENCY_MS 200
+#define DEFAULT_DROP_ON_LATENCY FALSE
+
+enum
+{
+ ARG_0,
+ ARG_LATENCY,
+ ARG_DROP_ON_LATENCY
+};
+
+struct _GstRTPJitterBufferPrivate
+{
+ GstPad *sinkpad, *srcpad;
+
+ AsyncJitterQueue *queue;
+
+ /* properties */
+ guint latency_ms;
+ gboolean drop_on_latency;
+
+ /* the last seqnum we pushed out */
+ guint32 last_popped_seqnum;
+ /* the next expected seqnum */
+ guint32 next_seqnum;
+
+ /* clock rate and rtp timestamp offset */
+ gint32 clock_rate;
+ guint64 clock_base;
+
+ /* when we are shutting down */
+ GstFlowReturn srcresult;
+
+ /* for sync */
+ GstSegment segment;
+ GstClockID clock_id;
+ guint32 waiting_seqnum;
+
+ /* some accounting */
+ guint64 num_late;
+ guint64 num_duplicates;
+};
+
+#define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
+ (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
+ GstRTPJitterBufferPrivate))
+
+static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
+GST_STATIC_PAD_TEMPLATE ("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS ("application/x-rtp, "
+ "clock-rate = (int) [ 1, 2147483647 ]"
+ /* "payload = (int) , "
+ * "encoding-name = (string) "
+ */ )
+ );
+
+static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
+GST_STATIC_PAD_TEMPLATE ("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS ("application/x-rtp"
+ /* "payload = (int) , "
+ * "clock-rate = (int) , "
+ * "encoding-name = (string) "
+ */ )
+ );
+
+GST_BOILERPLATE (GstRTPJitterBuffer, gst_rtp_jitter_buffer, GstElement,
+ GST_TYPE_ELEMENT);
+
+/* object overrides */
+static void gst_rtp_jitter_buffer_set_property (GObject * object,
+ guint prop_id, const GValue * value, GParamSpec * pspec);
+static void gst_rtp_jitter_buffer_get_property (GObject * object,
+ guint prop_id, GValue * value, GParamSpec * pspec);
+static void gst_rtp_jitter_buffer_dispose (GObject * object);
+
+/* element overrides */
+static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
+ * element, GstStateChange transition);
+
+/* pad overrides */
+static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad);
+
+/* sinkpad overrides */
+static gboolean gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps);
+static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
+ GstEvent * event);
+static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
+ GstBuffer * buffer);
+
+/* srcpad overrides */
+static gboolean
+gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active);
+static void gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer);
+static gboolean gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query);
+
+static void
+gst_rtp_jitter_buffer_base_init (gpointer klass)
+{
+ GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
+
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
+ gst_element_class_set_details (element_class, &gst_rtp_jitter_buffer_details);
+}
+
+static void
+gst_rtp_jitter_buffer_class_init (GstRTPJitterBufferClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstElementClass *gstelement_class;
+
+ gobject_class = (GObjectClass *) klass;
+ gstelement_class = (GstElementClass *) klass;
+
+ g_type_class_add_private (klass, sizeof (GstRTPJitterBufferPrivate));
+
+ gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_dispose);
+
+ gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
+ gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
+
+ g_object_class_install_property (gobject_class, ARG_LATENCY,
+ g_param_spec_uint ("latency", "Buffer latency in ms",
+ "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
+ G_PARAM_READWRITE));
+
+ g_object_class_install_property (gobject_class, ARG_DROP_ON_LATENCY,
+ g_param_spec_boolean ("drop_on_latency",
+ "Drop buffers when maximum latency is reached",
+ "Tells the jitterbuffer to never exceed the given latency in size",
+ DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE));
+
+ gstelement_class->change_state = gst_rtp_jitter_buffer_change_state;
+
+ GST_DEBUG_CATEGORY_INIT
+ (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
+}
+
+static void
+gst_rtp_jitter_buffer_init (GstRTPJitterBuffer * jitterbuffer,
+ GstRTPJitterBufferClass * klass)
+{
+ GstRTPJitterBufferPrivate *priv;
+
+ priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
+ jitterbuffer->priv = priv;
+
+ priv->latency_ms = DEFAULT_LATENCY_MS;
+ priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
+
+ priv->queue = async_jitter_queue_new ();
+ async_jitter_queue_set_low_threshold (priv->queue, LOW_THRESHOLD);
+ async_jitter_queue_set_high_threshold (priv->queue, HIGH_THRESHOLD);
+
+ priv->waiting_seqnum = -1;
+
+ priv->srcpad =
+ gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
+ "src");
+
+ gst_pad_set_activatepush_function (priv->srcpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_push));
+ gst_pad_set_query_function (priv->srcpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_query));
+ gst_pad_set_getcaps_function (priv->srcpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
+
+ priv->sinkpad =
+ gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
+ "sink");
+
+ gst_pad_set_chain_function (priv->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
+ gst_pad_set_event_function (priv->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
+ gst_pad_set_setcaps_function (priv->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_jitter_buffer_sink_setcaps));
+ gst_pad_set_getcaps_function (priv->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
+
+ gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
+ gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
+}
+
+static void
+gst_rtp_jitter_buffer_dispose (GObject * object)
+{
+ GstRTPJitterBuffer *jitterbuffer;
+
+ jitterbuffer = GST_RTP_JITTER_BUFFER (object);
+ if (jitterbuffer->priv->queue) {
+ async_jitter_queue_unref (jitterbuffer->priv->queue);
+ jitterbuffer->priv->queue = NULL;
+ }
+
+ G_OBJECT_CLASS (parent_class)->dispose (object);
+}
+
+static GstCaps *
+gst_rtp_jitter_buffer_getcaps (GstPad * pad)
+{
+ GstRTPJitterBuffer *jitterbuffer;
+ GstRTPJitterBufferPrivate *priv;
+ GstPad *other;
+ GstCaps *caps;
+ const GstCaps *templ;
+
+ jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+ priv = jitterbuffer->priv;
+
+ other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
+
+ caps = gst_pad_peer_get_caps (other);
+
+ templ = gst_pad_get_pad_template_caps (pad);
+ if (caps == NULL) {
+ GST_DEBUG_OBJECT (jitterbuffer, "copy template");
+ caps = gst_caps_copy (templ);
+ } else {
+ GstCaps *intersect;
+
+ GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
+
+ intersect = gst_caps_intersect (caps, templ);
+ gst_caps_unref (caps);
+
+ caps = intersect;
+ }
+ gst_object_unref (jitterbuffer);
+
+ return caps;
+}
+
+static gboolean
+gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps)
+{
+ GstRTPJitterBuffer *jitterbuffer;
+ GstRTPJitterBufferPrivate *priv;
+ GstStructure *caps_struct;
+ const GValue *value;
+
+ jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+ priv = jitterbuffer->priv;
+
+ /* first parse the caps */
+ caps_struct = gst_caps_get_structure (caps, 0);
+
+ /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
+ * measure the amount of data in the buffer */
+ if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
+ goto error;
+
+ if (priv->clock_rate <= 0)
+ goto wrong_rate;
+
+ /* gah, clock-base is uint. If we don't have a base, we will use the first
+ * buffer timestamp as the base time. This will screw up sync but it's better
+ * than nothing. */
+ value = gst_structure_get_value (caps_struct, "clock-base");
+ if (value && G_VALUE_HOLDS_UINT (value))
+ priv->clock_base = g_value_get_uint (value);
+ else
+ priv->clock_base = -1;
+
+ /* first expected seqnum */
+ value = gst_structure_get_value (caps_struct, "seqnum-base");
+ if (value && G_VALUE_HOLDS_UINT (value))
+ priv->next_seqnum = g_value_get_uint (value);
+ else
+ priv->next_seqnum = -1;
+
+ async_jitter_queue_set_max_queue_length (priv->queue,
+ priv->latency_ms * priv->clock_rate / 1000);
+
+ /* set same caps on srcpad */
+ gst_pad_set_caps (priv->srcpad, caps);
+
+ gst_object_unref (jitterbuffer);
+
+ return TRUE;
+
+ /* ERRORS */
+error:
+ {
+ GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
+ gst_object_unref (jitterbuffer);
+ return FALSE;
+ }
+wrong_rate:
+ {
+ GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
+ gst_object_unref (jitterbuffer);
+ return FALSE;
+ }
+}
+
+static void
+free_func (gpointer data, GstRTPJitterBuffer * user_data)
+{
+ if (GST_IS_BUFFER (data))
+ gst_buffer_unref (GST_BUFFER_CAST (data));
+ else
+ gst_event_unref (GST_EVENT_CAST (data));
+}
+
+static void
+gst_rtp_jitter_buffer_flush_start (GstRTPJitterBuffer * jitterbuffer)
+{
+ GstRTPJitterBufferPrivate *priv;
+
+ priv = jitterbuffer->priv;
+
+ async_jitter_queue_lock (priv->queue);
+ /* mark ourselves as flushing */
+ priv->srcresult = GST_FLOW_WRONG_STATE;
+ GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
+ /* this unblocks any waiting pops on the src pad task */
+ async_jitter_queue_set_flushing_unlocked (jitterbuffer->priv->queue,
+ (GFunc) free_func, jitterbuffer);
+ /* unlock clock, we just unschedule, the entry will be released by the
+ * locking streaming thread. */
+ if (priv->clock_id)
+ gst_clock_id_unschedule (priv->clock_id);
+
+ async_jitter_queue_unlock (priv->queue);
+}
+
+static void
+gst_rtp_jitter_buffer_flush_stop (GstRTPJitterBuffer * jitterbuffer)
+{
+ GstRTPJitterBufferPrivate *priv;
+
+ priv = jitterbuffer->priv;
+
+ async_jitter_queue_lock (priv->queue);
+ GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
+ /* Mark as non flushing */
+ priv->srcresult = GST_FLOW_OK;
+ gst_segment_init (&priv->segment, GST_FORMAT_TIME);
+ priv->last_popped_seqnum = -1;
+ priv->next_seqnum = -1;
+ /* allow pops from the src pad task */
+ async_jitter_queue_unset_flushing_unlocked (jitterbuffer->priv->queue);
+ async_jitter_queue_unlock (priv->queue);
+}
+
+static gboolean
+gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active)
+{
+ gboolean result = TRUE;
+ GstRTPJitterBuffer *jitterbuffer = NULL;
+
+ jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+
+ if (active) {
+ /* allow data processing */
+ gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
+
+ /* start pushing out buffers */
+ GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
+ gst_pad_start_task (jitterbuffer->priv->srcpad,
+ (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer);
+ } else {
+ /* make sure all data processing stops ASAP */
+ gst_rtp_jitter_buffer_flush_start (jitterbuffer);
+
+ /* NOTE this will hardlock if the state change is called from the src pad
+ * task thread because we will _join() the thread. */
+ GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
+ result = gst_pad_stop_task (pad);
+ }
+
+ gst_object_unref (jitterbuffer);
+
+ return result;
+}
+
+static GstStateChangeReturn
+gst_rtp_jitter_buffer_change_state (GstElement * element,
+ GstStateChange transition)
+{
+ GstRTPJitterBuffer *jitterbuffer;
+ GstRTPJitterBufferPrivate *priv;
+ GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
+
+ jitterbuffer = GST_RTP_JITTER_BUFFER (element);
+ priv = jitterbuffer->priv;
+
+ switch (transition) {
+ case GST_STATE_CHANGE_NULL_TO_READY:
+ break;
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ async_jitter_queue_lock (priv->queue);
+ /* reset negotiated values */
+ priv->clock_rate = -1;
+ priv->clock_base = -1;
+ /* block until we go to PLAYING */
+ async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
+ TRUE);
+ async_jitter_queue_unlock (priv->queue);
+ break;
+ case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+ async_jitter_queue_lock (priv->queue);
+ /* unblock to allow streaming in PLAYING */
+ async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
+ FALSE);
+ async_jitter_queue_unlock (priv->queue);
+ break;
+ default:
+ break;
+ }
+
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ /* we are a live element because we sync to the clock, which we can only
+ * do in the PLAYING state */
+ if (ret != GST_STATE_CHANGE_FAILURE)
+ ret = GST_STATE_CHANGE_NO_PREROLL;
+ break;
+ case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+ async_jitter_queue_lock (priv->queue);
+ /* block to stop streaming when PAUSED */
+ async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
+ TRUE);
+ async_jitter_queue_unlock (priv->queue);
+ break;
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ break;
+ case GST_STATE_CHANGE_READY_TO_NULL:
+ break;
+ default:
+ break;
+ }
+
+ return ret;
+}
+
+/**
+ * Performs comparison 'b - a' with check for overflows.
+ */
+static inline gint
+priv_compare_rtp_seq_lt (guint16 a, guint16 b)
+{
+ /* check if diff more than half of the 16bit range */
+ if (abs (b - a) > (1 << 15)) {
+ /* one of a/b has wrapped */
+ return a - b;
+ } else {
+ return b - a;
+ }
+}
+
+/**
+ * gets the seqnum from the buffers and compare them
+ */
+static gint
+compare_rtp_buffers_seq_num (GstBuffer * a, GstBuffer * b)
+{
+ gint ret;
+
+ if (GST_IS_BUFFER (a) && GST_IS_BUFFER (b)) {
+ /* two buffers */
+ ret = priv_compare_rtp_seq_lt
+ (gst_rtp_buffer_get_seq (GST_BUFFER_CAST (a)),
+ gst_rtp_buffer_get_seq (GST_BUFFER_CAST (b)));
+ } else {
+ /* one of them is an event, the event always goes before the other element
+ * so we return -1. */
+ if (GST_IS_EVENT (a))
+ ret = -1;
+ else
+ ret = 1;
+ }
+ return ret;
+}
+
+static gboolean
+gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event)
+{
+ gboolean ret = TRUE;
+ GstRTPJitterBuffer *jitterbuffer;
+ GstRTPJitterBufferPrivate *priv;
+
+ jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+ priv = jitterbuffer->priv;
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_NEWSEGMENT:
+ {
+ GstFormat format;
+ gdouble rate, arate;
+ gint64 start, stop, time;
+ gboolean update;
+
+ gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
+ &start, &stop, &time);
+
+ /* we need time for now */
+ if (format != GST_FORMAT_TIME)
+ goto newseg_wrong_format;
+
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "newsegment: update %d, rate %g, arate %g, start %" GST_TIME_FORMAT
+ ", stop %" GST_TIME_FORMAT ", time %" GST_TIME_FORMAT,
+ update, rate, arate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
+ GST_TIME_ARGS (time));
+
+ /* now configure the values, we need these to time the release of the
+ * buffers on the srcpad. */
+ gst_segment_set_newsegment_full (&priv->segment, update,
+ rate, arate, format, start, stop, time);
+
+ /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
+ ret = gst_pad_push_event (priv->srcpad, event);
+ break;
+ }
+ case GST_EVENT_FLUSH_START:
+ gst_rtp_jitter_buffer_flush_start (jitterbuffer);
+ break;
+ case GST_EVENT_FLUSH_STOP:
+ gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
+ break;
+ case GST_EVENT_EOS:
+ {
+ /* push EOS in queue. We always push it at the head */
+ async_jitter_queue_lock (priv->queue);
+ /* check for flushing, we need to discard the event and return FALSE when
+ * we are flushing */
+ ret = priv->srcresult == GST_FLOW_OK;
+ if (ret)
+ async_jitter_queue_push_unlocked (priv->queue, event);
+ else
+ gst_event_unref (event);
+ async_jitter_queue_unlock (priv->queue);
+ break;
+ }
+ default:
+ ret = gst_pad_push_event (priv->srcpad, event);
+ break;
+ }
+
+done:
+ gst_object_unref (jitterbuffer);
+
+ return ret;
+
+ /* ERRORS */
+newseg_wrong_format:
+ {
+ GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
+ ret = FALSE;
+ goto done;
+ }
+}
+
+static GstFlowReturn
+gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
+{
+ GstRTPJitterBuffer *jitterbuffer;
+ GstRTPJitterBufferPrivate *priv;
+ guint16 seqnum;
+ GstFlowReturn ret;
+
+
+ g_return_val_if_fail (gst_rtp_buffer_validate (buffer), GST_FLOW_ERROR);
+
+ jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+ priv = jitterbuffer->priv;
+
+ if (priv->clock_rate == -1)
+ goto not_negotiated;
+
+ seqnum = gst_rtp_buffer_get_seq (buffer);
+ GST_DEBUG_OBJECT (jitterbuffer, "Received packet #%d", seqnum);
+
+ async_jitter_queue_lock (priv->queue);
+ ret = priv->srcresult;
+ if (ret != GST_FLOW_OK)
+ goto out_flushing;
+
+ /* let's check if this buffer is too late, we cannot accept packets with
+ * bigger seqnum than the one we already pushed. */
+ if (priv->last_popped_seqnum != -1) {
+ if (priv_compare_rtp_seq_lt (priv->last_popped_seqnum, seqnum) < 0)
+ goto too_late;
+ }
+
+ /* let's drop oldest packet if the queue is already full and drop-on-latency
+ * is set. */
+ if (priv->drop_on_latency) {
+ if (async_jitter_queue_length_ts_units_unlocked (priv->queue) >=
+ priv->latency_ms * priv->clock_rate / 1000) {
+ GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
+ seqnum);
+ GstBuffer *old_buf;
+
+ old_buf = async_jitter_queue_pop_unlocked (priv->queue);
+ gst_buffer_unref (old_buf);
+ }
+ }
+
+ /* now insert the packet into the queue in sorted order. This function returns
+ * FALSE if a packet with the same seqnum was already in the queue, meaning we
+ * have a duplicate. */
+ if (!async_jitter_queue_push_sorted_unlocked (priv->queue, buffer,
+ (GCompareDataFunc) compare_rtp_buffers_seq_num, NULL))
+ goto duplicate;
+
+ /* let's unschedule and unblock any waiting buffers. We only want to do this
+ * if there is a currently waiting newer (> seqnum) buffer */
+ if (priv->clock_id) {
+ if (priv->waiting_seqnum > seqnum) {
+ gst_clock_id_unschedule (priv->clock_id);
+ GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting buffer");
+ }
+ }
+
+ GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d on queue %d",
+ seqnum, async_jitter_queue_length_unlocked (priv->queue));
+
+finished:
+ async_jitter_queue_unlock (priv->queue);
+
+ gst_object_unref (jitterbuffer);
+
+ return ret;
+
+ /* ERRORS */
+not_negotiated:
+ {
+ GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
+ gst_buffer_unref (buffer);
+ gst_object_unref (jitterbuffer);
+ return GST_FLOW_NOT_NEGOTIATED;
+ }
+out_flushing:
+ {
+ GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
+ gst_buffer_unref (buffer);
+ gst_object_unref (jitterbuffer);
+ goto finished;
+ }
+too_late:
+ {
+ GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
+ " popped, dropping", seqnum, priv->last_popped_seqnum);
+ priv->num_late++;
+ gst_buffer_unref (buffer);
+ goto finished;
+ }
+duplicate:
+ {
+ GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
+ seqnum);
+ priv->num_duplicates++;
+ gst_buffer_unref (buffer);
+ goto finished;
+ }
+}
+
+/**
+ * This funcion will push out buffers on the source pad.
+ *
+ * For each pushed buffer, the seqnum is recorded, if the next buffer B has a
+ * different seqnum (missing packets before B), this function will wait for the
+ * missing packet to arrive up to the rtp timestamp of buffer B.
+ */
+static void
+gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer)
+{
+ GstRTPJitterBufferPrivate *priv;
+ gpointer elem;
+ GstBuffer *outbuf;
+ GstFlowReturn result;
+ guint16 seqnum;
+ guint32 rtp_time;
+ GstClockTime timestamp;
+ gint64 running_time;
+
+ priv = jitterbuffer->priv;
+
+ async_jitter_queue_lock (priv->queue);
+again:
+ elem = async_jitter_queue_pop_unlocked (priv->queue);
+ if (!elem)
+ goto no_elem;
+
+ /* special code for events */
+ if (G_UNLIKELY (GST_IS_EVENT (elem))) {
+ GstEvent *event = GST_EVENT_CAST (elem);
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_EOS:
+ GST_DEBUG_OBJECT (jitterbuffer, "Popped EOS from queue");
+ /* we don't expect more data now, makes upstream perform EOS actions */
+ priv->srcresult = GST_FLOW_UNEXPECTED;
+ break;
+ default:
+ GST_DEBUG_OBJECT (jitterbuffer, "Popped event %s from queue",
+ GST_EVENT_TYPE_NAME (event));
+ break;
+ }
+ async_jitter_queue_unlock (priv->queue);
+
+ /* push event */
+ gst_pad_push_event (priv->srcpad, event);
+ return;
+ }
+
+ /* pop a buffer, we will get NULL if the queue was shut down */
+ outbuf = GST_BUFFER_CAST (elem);
+
+ seqnum = gst_rtp_buffer_get_seq (outbuf);
+
+ GST_DEBUG_OBJECT (jitterbuffer, "Popped buffer #%d from queue %d",
+ gst_rtp_buffer_get_seq (outbuf),
+ async_jitter_queue_length_unlocked (priv->queue));
+
+ /* If we don't know what the next seqnum should be (== -1) we have to wait
+ * because it might be possible that we are not receiving this buffer in-order,
+ * a buffer with a lower seqnum could arrive later and we want to push that
+ * earlier buffer before this buffer then.
+ * If we know the expected seqnum, we can compare it to the current seqnum to
+ * determine if we have missing a packet. If we have a missing packet (which
+ * must be before this packet) we can wait for it until the deadline for this
+ * packet expires. */
+ if (priv->next_seqnum == -1 || priv->next_seqnum != seqnum) {
+ GstClockID id;
+ GstClockTimeDiff jitter;
+ GstClockReturn ret;
+ GstClock *clock;
+
+ if (priv->next_seqnum != -1) {
+ /* we expected next_seqnum but received something else, that's a gap */
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Sequence number GAP detected -> %d instead of %d", priv->next_seqnum,
+ seqnum);
+ } else {
+ /* we don't know what the next_seqnum should be, wait for the last
+ * possible moment to push this buffer, maybe we get an earlier seqnum
+ * while we wait */
+ GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum);
+ }
+
+ /* get the max deadline to wait for the missing packets, this is the time
+ * of the currently popped packet */
+ rtp_time = gst_rtp_buffer_get_timestamp (outbuf);
+
+ GST_DEBUG_OBJECT (jitterbuffer, "rtp_time %u, base %u", rtp_time,
+ priv->clock_base);
+
+ /* if no clock_base was given, take first ts as base */
+ if (priv->clock_base == -1)
+ priv->clock_base = rtp_time;
+
+ /* take rtp timestamp offset into account, this can wrap around */
+ rtp_time -= priv->clock_base;
+
+ /* bring timestamp to gst time */
+ timestamp =
+ gst_util_uint64_scale_int (GST_SECOND, rtp_time, priv->clock_rate);
+
+ GST_DEBUG_OBJECT (jitterbuffer, "timestamp %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (timestamp));
+
+ /* bring to running time */
+ running_time = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
+ timestamp);
+
+ /* correct for sync against the gstreamer clock, add latency */
+ GST_OBJECT_LOCK (jitterbuffer);
+ clock = GST_ELEMENT_CLOCK (jitterbuffer);
+ if (!clock) {
+ GST_OBJECT_UNLOCK (jitterbuffer);
+ /* let's just push if there is no clock */
+ goto push_buffer;
+ }
+
+ /* add latency */
+ running_time += (priv->latency_ms * GST_MSECOND);
+
+ GST_DEBUG_OBJECT (jitterbuffer, "sync to running_time %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (running_time));
+
+ /* prepare for sync against clock */
+ running_time += GST_ELEMENT_CAST (jitterbuffer)->base_time;
+
+ /* create an entry for the clock */
+ id = priv->clock_id = gst_clock_new_single_shot_id (clock, running_time);
+ priv->waiting_seqnum = seqnum;
+ GST_OBJECT_UNLOCK (jitterbuffer);
+
+ /* release the lock so that the other end can push stuff or unlock */
+ async_jitter_queue_unlock (priv->queue);
+
+ ret = gst_clock_id_wait (id, &jitter);
+
+ async_jitter_queue_lock (priv->queue);
+ /* and free the entry */
+ gst_clock_id_unref (id);
+ priv->clock_id = NULL;
+ priv->waiting_seqnum = -1;
+
+ /* at this point, the clock could have been unlocked by a timeout, a new
+ * tail element was added to the queue or because we are shutting down. Check
+ * for shutdown first. */
+ if (priv->srcresult != GST_FLOW_OK)
+ goto flushing;
+
+ /* if we got unscheduled and we are not flushing, it's because a new tail
+ * element became available in the queue. Grab it and try to push or sync. */
+ if (ret == GST_CLOCK_UNSCHEDULED) {
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Wait got unscheduled, will retry to push with new buffer");
+ /* reinserting popped buffer into queue */
+ if (!async_jitter_queue_push_sorted_unlocked (priv->queue, outbuf,
+ (GCompareDataFunc) compare_rtp_buffers_seq_num, NULL)) {
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Duplicate packet #%d detected, dropping", seqnum);
+ priv->num_duplicates++;
+ gst_buffer_unref (outbuf);
+ }
+ goto again;
+ }
+ }
+push_buffer:
+ /* check if we are pushing something unexpected */
+ if (priv->next_seqnum != -1 && priv->next_seqnum != seqnum) {
+ gint dropped;
+
+ /* calc number of missing packets, careful for wraparounds */
+ dropped = priv_compare_rtp_seq_lt (priv->next_seqnum, seqnum);
+
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Pushing DISCONT after dropping %d (%d to %d)", dropped,
+ priv->next_seqnum, seqnum);
+
+ /* update stats */
+ priv->num_late += dropped;
+
+ /* set DISCONT flag */
+ outbuf = gst_buffer_make_metadata_writable (outbuf);
+ GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
+ }
+ /* now we are ready to push the buffer. Save the seqnum and release the lock
+ * so the other end can push stuff in the queue again. */
+ priv->last_popped_seqnum = seqnum;
+ priv->next_seqnum = (seqnum + 1) & 0xffff;
+ async_jitter_queue_unlock (priv->queue);
+
+ /* push buffer */
+ GST_DEBUG_OBJECT (jitterbuffer, "Pushing buffer %d", seqnum);
+ result = gst_pad_push (priv->srcpad, outbuf);
+ if (result != GST_FLOW_OK)
+ goto pause;
+
+ return;
+
+ /* ERRORS */
+no_elem:
+ {
+ /* store result, we are flushing now */
+ GST_DEBUG_OBJECT (jitterbuffer, "Pop returned NULL, we're flushing");
+ priv->srcresult = GST_FLOW_WRONG_STATE;
+ gst_pad_pause_task (priv->srcpad);
+ async_jitter_queue_unlock (priv->queue);
+ return;
+ }
+flushing:
+ {
+ GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
+ gst_buffer_unref (outbuf);
+ async_jitter_queue_unlock (priv->queue);
+ return;
+ }
+pause:
+ {
+ const gchar *reason = gst_flow_get_name (result);
+
+ GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
+
+ async_jitter_queue_lock (priv->queue);
+ /* store result */
+ priv->srcresult = result;
+ /* we don't post errors or anything because upstream will do that for us
+ * when we pass the return value upstream. */
+ gst_pad_pause_task (priv->srcpad);
+ async_jitter_queue_unlock (priv->queue);
+ return;
+ }
+}
+
+static gboolean
+gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query)
+{
+ GstRTPJitterBuffer *jitterbuffer;
+ GstRTPJitterBufferPrivate *priv;
+ gboolean res = FALSE;
+
+ jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+ priv = jitterbuffer->priv;
+
+ switch (GST_QUERY_TYPE (query)) {
+ case GST_QUERY_LATENCY:
+ {
+ /* We need to send the query upstream and add the returned latency to our
+ * own */
+ GstClockTime min_latency, max_latency;
+ gboolean us_live;
+ GstPad *peer;
+
+ if ((peer = gst_pad_get_peer (priv->sinkpad))) {
+ if ((res = gst_pad_query (peer, query))) {
+ gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
+
+ min_latency += priv->latency_ms * GST_MSECOND;
+ max_latency += priv->latency_ms * GST_MSECOND;
+
+ GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
+ GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
+
+ gst_query_set_latency (query, TRUE, min_latency, max_latency);
+ }
+ gst_object_unref (peer);
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ return res;
+}
+
+static void
+gst_rtp_jitter_buffer_set_property (GObject * object,
+ guint prop_id, const GValue * value, GParamSpec * pspec)
+{
+ GstRTPJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (object);
+
+ switch (prop_id) {
+ case ARG_LATENCY:
+ {
+ guint new_latency, old_latency;
+
+ /* FIXME, not threadsafe */
+ new_latency = g_value_get_uint (value);
+ old_latency = jitterbuffer->priv->latency_ms;
+
+ jitterbuffer->priv->latency_ms = new_latency;
+ if (jitterbuffer->priv->clock_rate != -1) {
+ async_jitter_queue_set_max_queue_length (jitterbuffer->priv->queue,
+ gst_util_uint64_scale_int (new_latency,
+ jitterbuffer->priv->clock_rate, 1000));
+ }
+ /* post message if latency changed, this will infor the parent pipeline
+ * that a latency reconfiguration is possible. */
+ if (new_latency != old_latency) {
+ gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
+ gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
+ }
+ break;
+ }
+ case ARG_DROP_ON_LATENCY:
+ {
+ jitterbuffer->priv->drop_on_latency = g_value_get_boolean (value);
+ break;
+ }
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rtp_jitter_buffer_get_property (GObject * object,
+ guint prop_id, GValue * value, GParamSpec * pspec)
+{
+ GstRTPJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (object);
+
+ switch (prop_id) {
+ case ARG_LATENCY:
+ g_value_set_uint (value, jitterbuffer->priv->latency_ms);
+ break;
+ case ARG_DROP_ON_LATENCY:
+ g_value_set_boolean (value, jitterbuffer->priv->drop_on_latency);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
diff --git a/gst/rtpmanager/gstrtpjitterbuffer.h b/gst/rtpmanager/gstrtpjitterbuffer.h
new file mode 100644
index 00000000..a8671440
--- /dev/null
+++ b/gst/rtpmanager/gstrtpjitterbuffer.h
@@ -0,0 +1,74 @@
+/*
+ * Farsight Voice+Video library
+ *
+ * Copyright 2007 Collabora Ltd,
+ * Copyright 2007 Nokia Corporation
+ * @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
+ * Copyright 2007 Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ */
+
+#ifndef __GST_RTP_JITTER_BUFFER_H__
+#define __GST_RTP_JITTER_BUFFER_H__
+
+#include <gst/gst.h>
+#include <gst/rtp/gstrtpbuffer.h>
+
+G_BEGIN_DECLS
+
+/* #define's don't like whitespacey bits */
+#define GST_TYPE_RTP_JITTER_BUFFER \
+ (gst_rtp_jitter_buffer_get_type())
+#define GST_RTP_JITTER_BUFFER(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj), \
+ GST_TYPE_RTP_JITTER_BUFFER,GstRTPJitterBuffer))
+#define GST_RTP_JITTER_BUFFER_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass), \
+ GST_TYPE_RTP_JITTER_BUFFER,GstRTPJitterBufferClass))
+#define GST_IS_RTP_JITTER_BUFFER(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_JITTER_BUFFER))
+#define GST_IS_RTP_JITTER_BUFFER_CLASS(obj) \
+ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_JITTER_BUFFER))
+
+typedef struct _GstRTPJitterBuffer GstRTPJitterBuffer;
+typedef struct _GstRTPJitterBufferClass GstRTPJitterBufferClass;
+typedef struct _GstRTPJitterBufferPrivate GstRTPJitterBufferPrivate;
+
+struct _GstRTPJitterBuffer
+{
+ GstElement parent;
+
+ GstRTPJitterBufferPrivate *priv;
+
+ /*< private > */
+ gpointer _gst_reserved[GST_PADDING];
+};
+
+struct _GstRTPJitterBufferClass
+{
+ GstElementClass parent_class;
+
+ /*< private > */
+ gpointer _gst_reserved[GST_PADDING];
+};
+
+GType gst_rtp_jitter_buffer_get_type (void);
+
+G_END_DECLS
+
+#endif /* __GST_RTP_JITTER_BUFFER_H__ */
diff --git a/gst/rtpmanager/gstrtpmanager.c b/gst/rtpmanager/gstrtpmanager.c
new file mode 100644
index 00000000..a059cad8
--- /dev/null
+++ b/gst/rtpmanager/gstrtpmanager.c
@@ -0,0 +1,55 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstrtpclient.h"
+#include "gstrtpjitterbuffer.h"
+#include "gstrtpptdemux.h"
+#include "gstrtpsession.h"
+
+static gboolean
+plugin_init (GstPlugin * plugin)
+{
+ if (!gst_element_register (plugin, "rtpclient", GST_RANK_NONE,
+ GST_TYPE_RTP_CLIENT))
+ return FALSE;
+
+ if (!gst_element_register (plugin, "rtpjitterbuffer", GST_RANK_NONE,
+ GST_TYPE_RTP_JITTER_BUFFER))
+ return FALSE;
+
+ if (!gst_element_register (plugin, "rtpptdemux", GST_RANK_NONE,
+ GST_TYPE_RTP_PT_DEMUX))
+ return FALSE;
+
+ if (!gst_element_register (plugin, "rtpsession", GST_RANK_NONE,
+ GST_TYPE_RTP_SESSION))
+ return FALSE;
+
+ return TRUE;
+}
+
+GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
+ GST_VERSION_MINOR,
+ "rtpmanager",
+ "RTP session management plugin library",
+ plugin_init, VERSION, "LGPL", GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)
diff --git a/gst/rtpmanager/gstrtpptdemux.c b/gst/rtpmanager/gstrtpptdemux.c
new file mode 100644
index 00000000..5950f619
--- /dev/null
+++ b/gst/rtpmanager/gstrtpptdemux.c
@@ -0,0 +1,350 @@
+/*
+ * RTP Demux element
+ *
+ * Copyright (C) 2005 Nokia Corporation.
+ * @author Kai Vehmanen <kai.vehmanen@nokia.com>
+ *
+ * Loosely based on GStreamer gstdecodebin
+ * Copyright (C) <2004> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/*
+ * Contributors:
+ * Andre Moreira Magalhaes <andre.magalhaes@indt.org.br>
+ */
+
+/*
+ * Status:
+ * - works with the test_rtpdemux.c tool
+ *
+ * Check:
+ * - is emitting a signal enough, or should we
+ * use GstEvent to notify downstream elements
+ * of the new packet... no?
+ *
+ * Notes:
+ * - emits event both for new PTs, and whenever
+ * a PT is changed
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <string.h>
+#include <gst/gst.h>
+#include "gstrtpptdemux.h"
+#include <gst/rtp/gstrtpbuffer.h>
+
+/* generic templates */
+static GstStaticPadTemplate rtp_pt_demux_sink_template =
+GST_STATIC_PAD_TEMPLATE ("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS ("application/x-rtp, "
+ "payload = (int) [ 0, 255 ], " "clock-rate = (int) [ 0, 2147483647 ]")
+ );
+
+static GstStaticPadTemplate rtp_pt_demux_src_template =
+GST_STATIC_PAD_TEMPLATE ("src%d",
+ GST_PAD_SRC,
+ GST_PAD_SOMETIMES,
+ GST_STATIC_CAPS_ANY);
+
+GST_DEBUG_CATEGORY_STATIC (gst_rtp_pt_demux_debug);
+#define GST_CAT_DEFAULT gst_rtp_pt_demux_debug
+
+/**
+ * Item for storing GstPad<->pt pairs.
+ */
+struct _GstRTPPtDemuxPad
+{
+ GstPad *pad; /**< pointer to the actual pad */
+ gint pt; /**< RTP payload-type attached to pad */
+};
+
+/* signals */
+enum
+{
+ SIGNAL_NEW_PAYLOAD_TYPE,
+ SIGNAL_PAYLOAD_TYPE_CHANGE,
+ LAST_SIGNAL
+};
+
+GST_BOILERPLATE (GstRTPPtDemux, gst_rtp_pt_demux, GstElement, GST_TYPE_ELEMENT);
+
+static void gst_rtp_pt_demux_finalize (GObject * object);
+
+static void gst_rtp_pt_demux_release (GstElement * element);
+static gboolean gst_rtp_pt_demux_setup (GstElement * element);
+
+static GstFlowReturn gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf);
+static GstCaps *gst_rtp_pt_demux_getcaps (GstPad * pad);
+static GstStateChangeReturn gst_rtp_pt_demux_change_state (GstElement * element,
+ GstStateChange transition);
+
+static GstPad *find_pad_for_pt (GstRTPPtDemux * rtpdemux, guint8 pt);
+
+static guint gst_rtp_pt_demux_signals[LAST_SIGNAL] = { 0 };
+
+static GstElementDetails gst_rtp_pt_demux_details = {
+ "RTP Demux",
+ /* XXX: what's the correct hierarchy? */
+ "Codec/Demux/Network",
+ "Parses codec streams transmitted in the same RTP session",
+ "Kai Vehmanen <kai.vehmanen@nokia.com>"
+};
+
+static void
+gst_rtp_pt_demux_base_init (gpointer g_class)
+{
+ GstElementClass *gstelement_klass = GST_ELEMENT_CLASS (g_class);
+
+ gst_element_class_add_pad_template (gstelement_klass,
+ gst_static_pad_template_get (&rtp_pt_demux_sink_template));
+ gst_element_class_add_pad_template (gstelement_klass,
+ gst_static_pad_template_get (&rtp_pt_demux_src_template));
+
+ gst_element_class_set_details (gstelement_klass, &gst_rtp_pt_demux_details);
+}
+
+static void
+gst_rtp_pt_demux_class_init (GstRTPPtDemuxClass * klass)
+{
+ GObjectClass *gobject_klass;
+ GstElementClass *gstelement_klass;
+
+ gobject_klass = (GObjectClass *) klass;
+ gstelement_klass = (GstElementClass *) klass;
+
+ gst_rtp_pt_demux_signals[SIGNAL_NEW_PAYLOAD_TYPE] =
+ g_signal_new ("new-payload-type",
+ G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST,
+ G_STRUCT_OFFSET (GstRTPPtDemuxClass, new_payload_type),
+ NULL, NULL,
+ g_cclosure_marshal_VOID__UINT_POINTER,
+ G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_PAD);
+ gst_rtp_pt_demux_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] =
+ g_signal_new ("payload-type-change",
+ G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST,
+ G_STRUCT_OFFSET (GstRTPPtDemuxClass, payload_type_change),
+ NULL, NULL, gst_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
+
+ gobject_klass->finalize = GST_DEBUG_FUNCPTR (gst_rtp_pt_demux_finalize);
+
+ gstelement_klass->change_state =
+ GST_DEBUG_FUNCPTR (gst_rtp_pt_demux_change_state);
+
+ GST_DEBUG_CATEGORY_INIT (gst_rtp_pt_demux_debug,
+ "rtpptdemux", 0, "RTP codec demuxer");
+
+}
+
+static void
+gst_rtp_pt_demux_init (GstRTPPtDemux * ptdemux, GstRTPPtDemuxClass * g_class)
+{
+ GstElementClass *klass = GST_ELEMENT_GET_CLASS (ptdemux);
+
+ ptdemux->sink =
+ gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
+ "sink"), "sink");
+ g_assert (ptdemux->sink != NULL);
+
+ gst_pad_set_chain_function (ptdemux->sink, gst_rtp_pt_demux_chain);
+
+ gst_element_add_pad (GST_ELEMENT (ptdemux), ptdemux->sink);
+}
+
+static void
+gst_rtp_pt_demux_finalize (GObject * object)
+{
+ gst_rtp_pt_demux_release (GST_ELEMENT (object));
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static GstFlowReturn
+gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf)
+{
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstRTPPtDemux *rtpdemux;
+ GstElement *element = GST_ELEMENT (GST_OBJECT_PARENT (pad));
+ guint8 pt;
+ GstPad *srcpad;
+
+ rtpdemux = GST_RTP_PT_DEMUX (GST_OBJECT_PARENT (pad));
+
+ g_return_val_if_fail (gst_rtp_buffer_validate (buf), GST_FLOW_ERROR);
+
+ pt = gst_rtp_buffer_get_payload_type (buf);
+
+ srcpad = find_pad_for_pt (rtpdemux, pt);
+ if (srcpad == NULL) {
+ /* new PT, create a src pad */
+ GstElementClass *klass;
+ GstPadTemplate *templ;
+ gchar *padname;
+ GstCaps *caps;
+ GstRTPPtDemuxPad *rtpdemuxpad;
+
+ klass = GST_ELEMENT_GET_CLASS (rtpdemux);
+ templ = gst_element_class_get_pad_template (klass, "src%d");
+ padname = g_strdup_printf ("src%d", pt);
+ srcpad = gst_pad_new_from_template (templ, padname);
+ g_free (padname);
+
+ caps = gst_pad_get_caps (srcpad);
+ caps = gst_caps_make_writable (caps);
+ gst_caps_append_structure (caps,
+ gst_structure_new ("payload", "payload", G_TYPE_INT, pt, NULL));
+ gst_pad_set_caps (srcpad, caps);
+
+ /* XXX: set _link () function */
+ gst_pad_set_getcaps_function (srcpad, gst_rtp_pt_demux_getcaps);
+ gst_pad_set_active (srcpad, TRUE);
+ gst_element_add_pad (element, srcpad);
+
+ if (srcpad) {
+ GST_DEBUG ("Adding pt=%d to the list.", pt);
+ rtpdemuxpad = g_new0 (GstRTPPtDemuxPad, 1);
+ rtpdemuxpad->pt = pt;
+ rtpdemuxpad->pad = srcpad;
+ rtpdemux->srcpads = g_slist_append (rtpdemux->srcpads, rtpdemuxpad);
+
+ GST_DEBUG ("emitting new-payload_type for pt %d", pt);
+ g_signal_emit (G_OBJECT (rtpdemux),
+ gst_rtp_pt_demux_signals[SIGNAL_NEW_PAYLOAD_TYPE], 0, pt, srcpad);
+ }
+ }
+
+ if (pt != rtpdemux->last_pt) {
+ gint emit_pt = pt;
+
+ /* our own signal with an extra flag that this is the only pad */
+ rtpdemux->last_pt = pt;
+ GST_DEBUG ("emitting payload-type-changed for pt %d", emit_pt);
+ g_signal_emit (G_OBJECT (rtpdemux),
+ gst_rtp_pt_demux_signals[SIGNAL_PAYLOAD_TYPE_CHANGE], 0, emit_pt);
+ }
+
+ /* push to srcpad */
+ if (srcpad)
+ gst_pad_push (srcpad, GST_BUFFER (buf));
+
+ return ret;
+}
+
+static GstCaps *
+gst_rtp_pt_demux_getcaps (GstPad * pad)
+{
+ GstCaps *caps;
+
+ GST_OBJECT_LOCK (pad);
+ if ((caps = GST_PAD_CAPS (pad)))
+ caps = gst_caps_ref (caps);
+ GST_OBJECT_UNLOCK (pad);
+
+ return caps;
+}
+
+static GstPad *
+find_pad_for_pt (GstRTPPtDemux * rtpdemux, guint8 pt)
+{
+ GstPad *respad = NULL;
+ GSList *item = rtpdemux->srcpads;
+
+ for (; item; item = g_slist_next (item)) {
+ GstRTPPtDemuxPad *pad = item->data;
+
+ if (pad->pt == pt) {
+ respad = pad->pad;
+ break;
+ }
+ }
+
+ return respad;
+}
+
+/**
+ * Reserves resources for the object.
+ */
+static gboolean
+gst_rtp_pt_demux_setup (GstElement * element)
+{
+ GstRTPPtDemux *ptdemux = GST_RTP_PT_DEMUX (element);
+ gboolean res = TRUE;
+
+ if (ptdemux) {
+ ptdemux->srcpads = NULL;
+ ptdemux->last_pt = 0xFFFF;
+ }
+
+ return res;
+}
+
+/**
+ * Free resources for the object.
+ */
+static void
+gst_rtp_pt_demux_release (GstElement * element)
+{
+ GstRTPPtDemux *ptdemux = GST_RTP_PT_DEMUX (element);
+
+ if (ptdemux) {
+ /* note: GstElement's dispose() will handle the pads */
+ g_slist_free (ptdemux->srcpads);
+ ptdemux->srcpads = NULL;
+ }
+}
+
+static GstStateChangeReturn
+gst_rtp_pt_demux_change_state (GstElement * element, GstStateChange transition)
+{
+ GstStateChangeReturn ret;
+ GstRTPPtDemux *ptdemux;
+
+ ptdemux = GST_RTP_PT_DEMUX (element);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_NULL_TO_READY:
+ if (gst_rtp_pt_demux_setup (element) != TRUE)
+ ret = GST_STATE_CHANGE_FAILURE;
+ break;
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+ default:
+ break;
+ }
+
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ break;
+ case GST_STATE_CHANGE_READY_TO_NULL:
+ gst_rtp_pt_demux_release (element);
+ break;
+ default:
+ break;
+ }
+
+ return ret;
+}
diff --git a/gst/rtpmanager/gstrtpptdemux.h b/gst/rtpmanager/gstrtpptdemux.h
new file mode 100644
index 00000000..93be3959
--- /dev/null
+++ b/gst/rtpmanager/gstrtpptdemux.h
@@ -0,0 +1,57 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __GST_RTP_PT_DEMUX_H__
+#define __GST_RTP_PT_DEMUX_H__
+
+#include <gst/gst.h>
+
+#define GST_TYPE_RTP_PT_DEMUX (gst_rtp_pt_demux_get_type())
+#define GST_RTP_PT_DEMUX(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTP_PT_DEMUX,GstRTPPtDemux))
+#define GST_RTP_PT_DEMUX_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTP_PT_DEMUX,GstRTPPtDemuxClass))
+#define GST_IS_RTP_PT_DEMUX(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_PT_DEMUX))
+#define GST_IS_RTP_PT_DEMUX_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_PT_DEMUX))
+
+typedef struct _GstRTPPtDemux GstRTPPtDemux;
+typedef struct _GstRTPPtDemuxClass GstRTPPtDemuxClass;
+typedef struct _GstRTPPtDemuxPad GstRTPPtDemuxPad;
+
+struct _GstRTPPtDemux
+{
+ GstElement parent; /**< parent class */
+
+ GstPad *sink; /**< the sink pad */
+ guint16 last_pt; /**< pt of the last packet 0xFFFF if none */
+ GSList *srcpads; /**< a linked list of GstRTPPtDemuxPad objects */
+};
+
+struct _GstRTPPtDemuxClass
+{
+ GstElementClass parent_class;
+
+ /* signal emmited when a new PT is found from the incoming stream */
+ void (*new_payload_type) (GstElement * element, gint pt, GstPad * pad);
+
+ /* signal emitted when the payload type changes */
+ void (*payload_type_change) (GstElement * element, gint pt);
+};
+
+GType gst_rtp_pt_demux_get_type (void);
+
+#endif /* __GST_RTP_PT_DEMUX_H__ */
diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c
new file mode 100644
index 00000000..47df756f
--- /dev/null
+++ b/gst/rtpmanager/gstrtpsession.c
@@ -0,0 +1,453 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/**
+ * SECTION:element-rtpsession
+ * @short_description: an RTP session manager
+ * @see_also: rtpjitterbuffer, rtpbin
+ *
+ * <refsect2>
+ * <para>
+ * </para>
+ * <title>Example pipelines</title>
+ * <para>
+ * <programlisting>
+ * gst-launch -v filesrc location=sine.ogg ! oggdemux ! vorbisdec ! audioconvert ! alsasink
+ * </programlisting>
+ * </para>
+ * </refsect2>
+ *
+ * Last reviewed on 2007-04-02 (0.10.6)
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include "gstrtpsession.h"
+
+/* elementfactory information */
+static const GstElementDetails rtpsession_details =
+GST_ELEMENT_DETAILS ("RTP Session",
+ "Filter/Editor/Video",
+ "Implement an RTP session",
+ "Wim Taymans <wim@fluendo.com>");
+
+/* sink pads */
+static GstStaticPadTemplate rtpsession_recv_rtp_sink_template =
+GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink",
+ GST_PAD_SINK,
+ GST_PAD_REQUEST,
+ GST_STATIC_CAPS ("application/x-rtp")
+ );
+
+static GstStaticPadTemplate rtpsession_recv_rtcp_sink_template =
+GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink",
+ GST_PAD_SINK,
+ GST_PAD_REQUEST,
+ GST_STATIC_CAPS ("application/x-rtcp")
+ );
+
+static GstStaticPadTemplate rtpsession_send_rtp_sink_template =
+GST_STATIC_PAD_TEMPLATE ("send_rtp_sink",
+ GST_PAD_SINK,
+ GST_PAD_REQUEST,
+ GST_STATIC_CAPS ("application/x-rtp")
+ );
+
+/* src pads */
+static GstStaticPadTemplate rtpsession_recv_rtp_src_template =
+GST_STATIC_PAD_TEMPLATE ("recv_rtp_src",
+ GST_PAD_SRC,
+ GST_PAD_SOMETIMES,
+ GST_STATIC_CAPS ("application/x-rtp")
+ );
+
+static GstStaticPadTemplate rtpsession_sync_src_template =
+GST_STATIC_PAD_TEMPLATE ("sync_src",
+ GST_PAD_SRC,
+ GST_PAD_SOMETIMES,
+ GST_STATIC_CAPS ("application/x-rtcp")
+ );
+
+static GstStaticPadTemplate rtpsession_send_rtp_src_template =
+GST_STATIC_PAD_TEMPLATE ("send_rtp_src",
+ GST_PAD_SRC,
+ GST_PAD_SOMETIMES,
+ GST_STATIC_CAPS ("application/x-rtp")
+ );
+
+static GstStaticPadTemplate rtpsession_rtcp_src_template =
+GST_STATIC_PAD_TEMPLATE ("rtcp_src",
+ GST_PAD_SRC,
+ GST_PAD_REQUEST,
+ GST_STATIC_CAPS ("application/x-rtcp")
+ );
+
+/* signals and args */
+enum
+{
+ /* FILL ME */
+ LAST_SIGNAL
+};
+
+enum
+{
+ PROP_0
+};
+
+/* GObject vmethods */
+static void gst_rtp_session_finalize (GObject * object);
+static void gst_rtp_session_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_rtp_session_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+
+/* GstElement vmethods */
+static GstStateChangeReturn gst_rtp_session_change_state (GstElement * element,
+ GstStateChange transition);
+static GstPad *gst_rtp_session_request_new_pad (GstElement * element,
+ GstPadTemplate * templ, const gchar * name);
+static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad);
+
+/*static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 }; */
+
+GST_BOILERPLATE (GstRTPSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT);
+
+static void
+gst_rtp_session_base_init (gpointer klass)
+{
+ GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
+
+ /* sink pads */
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&rtpsession_recv_rtp_sink_template));
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&rtpsession_recv_rtcp_sink_template));
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&rtpsession_send_rtp_sink_template));
+
+ /* src pads */
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&rtpsession_recv_rtp_src_template));
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&rtpsession_sync_src_template));
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&rtpsession_send_rtp_src_template));
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&rtpsession_rtcp_src_template));
+
+ gst_element_class_set_details (element_class, &rtpsession_details);
+}
+
+static void
+gst_rtp_session_class_init (GstRTPSessionClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstElementClass *gstelement_class;
+
+ gobject_class = (GObjectClass *) klass;
+ gstelement_class = (GstElementClass *) klass;
+
+ gobject_class->finalize = gst_rtp_session_finalize;
+ gobject_class->set_property = gst_rtp_session_set_property;
+ gobject_class->get_property = gst_rtp_session_get_property;
+
+ gstelement_class->change_state =
+ GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
+ gstelement_class->request_new_pad =
+ GST_DEBUG_FUNCPTR (gst_rtp_session_request_new_pad);
+ gstelement_class->release_pad =
+ GST_DEBUG_FUNCPTR (gst_rtp_session_release_pad);
+}
+
+static void
+gst_rtp_session_init (GstRTPSession * rtpsession, GstRTPSessionClass * klass)
+{
+}
+
+static void
+gst_rtp_session_finalize (GObject * object)
+{
+ GstRTPSession *rtpsession;
+
+ rtpsession = GST_RTP_SESSION (object);
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+static void
+gst_rtp_session_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstRTPSession *rtpsession;
+
+ rtpsession = GST_RTP_SESSION (object);
+
+ switch (prop_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rtp_session_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstRTPSession *rtpsession;
+
+ rtpsession = GST_RTP_SESSION (object);
+
+ switch (prop_id) {
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static GstStateChangeReturn
+gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
+{
+ GstStateChangeReturn res;
+ GstRTPSession *rtpsession;
+
+ rtpsession = GST_RTP_SESSION (element);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_NULL_TO_READY:
+ break;
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ break;
+ case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+ break;
+ default:
+ break;
+ }
+
+ res = parent_class->change_state (element, transition);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+ break;
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ break;
+ case GST_STATE_CHANGE_READY_TO_NULL:
+ break;
+ default:
+ break;
+ }
+ return res;
+}
+
+/* receive a packet from a sender, send it to the RTP session manager and
+ * forward the packet on the rtp_src pad
+ */
+static GstFlowReturn
+gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
+{
+ GstRTPSession *rtpsession;
+ GstFlowReturn ret;
+
+ rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+
+ /* FIXME, do something */
+ ret = gst_pad_push (rtpsession->recv_rtp_src, buffer);
+
+ gst_object_unref (rtpsession);
+
+ return ret;
+}
+
+/* Receive an RTCP packet from a sender, send it to the RTP session manager and
+ * forward the SR packets to the sync_src pad.
+ */
+static GstFlowReturn
+gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer)
+{
+ GstRTPSession *rtpsession;
+ GstFlowReturn ret;
+
+ rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+
+ /* FIXME, do something */
+ ret = gst_pad_push (rtpsession->sync_src, buffer);
+
+ gst_object_unref (rtpsession);
+
+ return ret;
+}
+
+/* Recieve an RTP packet to be send to the receivers, send to RTP session
+ * manager and forward to send_rtp_src.
+ */
+static GstFlowReturn
+gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
+{
+ GstRTPSession *rtpsession;
+ GstFlowReturn ret;
+
+ rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+
+ /* FIXME, do something */
+ ret = gst_pad_push (rtpsession->send_rtp_src, buffer);
+
+ gst_object_unref (rtpsession);
+
+ return ret;
+}
+
+
+/* Create sinkpad to receive RTP packets from senders. This will also create a
+ * srcpad for the RTP packets.
+ */
+static GstPad *
+create_recv_rtp_sink (GstRTPSession * rtpsession)
+{
+ rtpsession->recv_rtp_sink =
+ gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template,
+ NULL);
+ gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
+ gst_rtp_session_chain_recv_rtp);
+ gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
+ rtpsession->recv_rtp_sink);
+
+ rtpsession->recv_rtp_src =
+ gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template,
+ NULL);
+ gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src);
+
+ return rtpsession->recv_rtp_sink;
+}
+
+/* Create a sinkpad to receive RTCP messages from senders, this will also create a
+ * sync_src pad for the SR packets.
+ */
+static GstPad *
+create_recv_rtcp_sink (GstRTPSession * rtpsession)
+{
+ rtpsession->recv_rtcp_sink =
+ gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template,
+ NULL);
+ gst_pad_set_chain_function (rtpsession->recv_rtcp_sink,
+ gst_rtp_session_chain_recv_rtcp);
+ gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
+ rtpsession->recv_rtcp_sink);
+
+ rtpsession->sync_src =
+ gst_pad_new_from_static_template (&rtpsession_sync_src_template, NULL);
+ gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
+
+ return rtpsession->recv_rtcp_sink;
+}
+
+/* Create a sinkpad to receive RTP packets for receivers. This will also create a
+ * send_rtp_src pad.
+ */
+static GstPad *
+create_send_rtp_sink (GstRTPSession * rtpsession)
+{
+ rtpsession->send_rtp_sink =
+ gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template,
+ NULL);
+ gst_pad_set_chain_function (rtpsession->send_rtp_sink,
+ gst_rtp_session_chain_send_rtp);
+ gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
+ rtpsession->recv_rtcp_sink);
+
+ rtpsession->send_rtp_src =
+ gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
+ NULL);
+ gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
+
+ return rtpsession->send_rtp_sink;
+}
+
+/* Create a srcpad with the RTCP packets to send out.
+ * This pad will be driven by the RTP session manager when it wants to send out
+ * RTCP packets.
+ */
+static GstPad *
+create_rtcp_src (GstRTPSession * rtpsession)
+{
+ rtpsession->rtcp_src =
+ gst_pad_new_from_static_template (&rtpsession_rtcp_src_template, NULL);
+ gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->rtcp_src);
+
+ return rtpsession->rtcp_src;
+}
+
+static GstPad *
+gst_rtp_session_request_new_pad (GstElement * element,
+ GstPadTemplate * templ, const gchar * name)
+{
+ GstRTPSession *rtpsession;
+ GstElementClass *klass;
+ GstPad *result;
+
+ g_return_val_if_fail (templ != NULL, NULL);
+ g_return_val_if_fail (GST_IS_RTP_SESSION (element), NULL);
+
+ rtpsession = GST_RTP_SESSION (element);
+ klass = GST_ELEMENT_GET_CLASS (element);
+
+ /* figure out the template */
+ if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) {
+ if (rtpsession->recv_rtp_sink != NULL)
+ goto exists;
+
+ result = create_recv_rtp_sink (rtpsession);
+ } else if (templ == gst_element_class_get_pad_template (klass,
+ "recv_rtcp_sink")) {
+ if (rtpsession->recv_rtcp_sink != NULL)
+ goto exists;
+
+ result = create_recv_rtcp_sink (rtpsession);
+ } else if (templ == gst_element_class_get_pad_template (klass,
+ "send_rtp_sink")) {
+ if (rtpsession->send_rtp_sink != NULL)
+ goto exists;
+
+ result = create_send_rtp_sink (rtpsession);
+ } else if (templ == gst_element_class_get_pad_template (klass, "rtcp_src")) {
+ if (rtpsession->rtcp_src != NULL)
+ goto exists;
+
+ result = create_rtcp_src (rtpsession);
+ } else
+ goto wrong_template;
+
+ return result;
+
+ /* ERRORS */
+wrong_template:
+ {
+ g_warning ("rtpsession: this is not our template");
+ return NULL;
+ }
+exists:
+ {
+ g_warning ("rtpsession: pad already requested");
+ return NULL;
+ }
+}
+
+static void
+gst_rtp_session_release_pad (GstElement * element, GstPad * pad)
+{
+}
diff --git a/gst/rtpmanager/gstrtpsession.h b/gst/rtpmanager/gstrtpsession.h
new file mode 100644
index 00000000..8b343064
--- /dev/null
+++ b/gst/rtpmanager/gstrtpsession.h
@@ -0,0 +1,62 @@
+/* GStreamer
+ * Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __GST_RTP_SESSION_H__
+#define __GST_RTP_SESSION_H__
+
+#include <gst/gst.h>
+
+#define GST_TYPE_RTP_SESSION \
+ (gst_rtp_session_get_type())
+#define GST_RTP_SESSION(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTP_SESSION,GstRTPSession))
+#define GST_RTP_SESSION_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTP_SESSION,GstRTPSessionClass))
+#define GST_IS_RTP_SESSION(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_SESSION))
+#define GST_IS_RTP_SESSION_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_SESSION))
+
+typedef struct _GstRTPSession GstRTPSession;
+typedef struct _GstRTPSessionClass GstRTPSessionClass;
+typedef struct _GstRTPSessionPrivate GstRTPSessionPrivate;
+
+struct _GstRTPSession {
+ GstElement element;
+
+ /*< private >*/
+ GstPad *recv_rtp_sink;
+ GstPad *recv_rtcp_sink;
+ GstPad *send_rtp_sink;
+
+ GstPad *recv_rtp_src;
+ GstPad *sync_src;
+ GstPad *send_rtp_src;
+ GstPad *rtcp_src;
+
+ GstRTPSessionPrivate *priv;
+};
+
+struct _GstRTPSessionClass {
+ GstElementClass parent_class;
+};
+
+GType gst_rtp_session_get_type (void);
+
+#endif /* __GST_RTP_SESSION_H__ */