diff options
author | Wim Taymans <wim.taymans@gmail.com> | 2007-04-03 09:13:17 +0000 |
---|---|---|
committer | Wim Taymans <wim.taymans@gmail.com> | 2007-04-03 09:13:17 +0000 |
commit | 93b433bd166f99a7e86797077f8100607d5ab943 (patch) | |
tree | e05165289f02e21afb07def4979b2420681dc95b /gst/rtpmanager | |
parent | 14c0bebf4b587eb747649987eb09aeab3e31dbe8 (diff) | |
download | gst-plugins-bad-93b433bd166f99a7e86797077f8100607d5ab943.tar.gz gst-plugins-bad-93b433bd166f99a7e86797077f8100607d5ab943.tar.bz2 gst-plugins-bad-93b433bd166f99a7e86797077f8100607d5ab943.zip |
Add RTP session management elements. Still in progress.
Original commit message from CVS:
* configure.ac:
* gst/rtpmanager/Makefile.am:
* gst/rtpmanager/async_jitter_queue.c: (async_jitter_queue_new),
(signal_waiting_threads), (async_jitter_queue_ref),
(async_jitter_queue_ref_unlocked),
(async_jitter_queue_set_low_threshold),
(async_jitter_queue_set_high_threshold),
(async_jitter_queue_set_max_queue_length),
(async_jitter_queue_get_g_queue), (calculate_ts_diff),
(async_jitter_queue_length_ts_units_unlocked),
(async_jitter_queue_unref_and_unlock), (async_jitter_queue_unref),
(async_jitter_queue_lock), (async_jitter_queue_unlock),
(async_jitter_queue_push), (async_jitter_queue_push_unlocked),
(async_jitter_queue_push_sorted),
(async_jitter_queue_push_sorted_unlocked),
(async_jitter_queue_insert_after_unlocked),
(async_jitter_queue_pop_intern_unlocked), (async_jitter_queue_pop),
(async_jitter_queue_pop_unlocked), (async_jitter_queue_length),
(async_jitter_queue_length_unlocked),
(async_jitter_queue_set_flushing_unlocked),
(async_jitter_queue_unset_flushing_unlocked),
(async_jitter_queue_set_blocking_unlocked):
* gst/rtpmanager/async_jitter_queue.h:
* gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_base_init),
(gst_rtp_bin_class_init), (gst_rtp_bin_init),
(gst_rtp_bin_finalize), (gst_rtp_bin_set_property),
(gst_rtp_bin_get_property), (gst_rtp_bin_change_state),
(gst_rtp_bin_request_new_pad), (gst_rtp_bin_release_pad):
* gst/rtpmanager/gstrtpbin.h:
* gst/rtpmanager/gstrtpclient.c: (new_pad), (create_stream),
(free_stream), (find_stream_by_ssrc), (gst_rtp_client_base_init),
(gst_rtp_client_class_init), (gst_rtp_client_init),
(gst_rtp_client_finalize), (gst_rtp_client_set_property),
(gst_rtp_client_get_property), (gst_rtp_client_change_state),
(gst_rtp_client_request_new_pad), (gst_rtp_client_release_pad):
* gst/rtpmanager/gstrtpclient.h:
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_base_init),
(gst_rtp_jitter_buffer_class_init), (gst_rtp_jitter_buffer_init),
(gst_rtp_jitter_buffer_dispose), (gst_rtp_jitter_buffer_getcaps),
(gst_jitter_buffer_sink_setcaps), (free_func),
(gst_rtp_jitter_buffer_flush_start),
(gst_rtp_jitter_buffer_flush_stop),
(gst_rtp_jitter_buffer_src_activate_push),
(gst_rtp_jitter_buffer_change_state), (priv_compare_rtp_seq_lt),
(compare_rtp_buffers_seq_num), (gst_rtp_jitter_buffer_sink_event),
(gst_rtp_jitter_buffer_chain), (gst_rtp_jitter_buffer_loop),
(gst_rtp_jitter_buffer_query),
(gst_rtp_jitter_buffer_set_property),
(gst_rtp_jitter_buffer_get_property):
* gst/rtpmanager/gstrtpjitterbuffer.h:
* gst/rtpmanager/gstrtpmanager.c: (plugin_init):
* gst/rtpmanager/gstrtpptdemux.c: (gst_rtp_pt_demux_base_init),
(gst_rtp_pt_demux_class_init), (gst_rtp_pt_demux_init),
(gst_rtp_pt_demux_finalize), (gst_rtp_pt_demux_chain),
(gst_rtp_pt_demux_getcaps), (find_pad_for_pt),
(gst_rtp_pt_demux_setup), (gst_rtp_pt_demux_release),
(gst_rtp_pt_demux_change_state):
* gst/rtpmanager/gstrtpptdemux.h:
* gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_base_init),
(gst_rtp_session_class_init), (gst_rtp_session_init),
(gst_rtp_session_finalize), (gst_rtp_session_set_property),
(gst_rtp_session_get_property), (gst_rtp_session_change_state),
(gst_rtp_session_chain_recv_rtp),
(gst_rtp_session_chain_recv_rtcp),
(gst_rtp_session_chain_send_rtp), (create_recv_rtp_sink),
(create_recv_rtcp_sink), (create_send_rtp_sink), (create_rtcp_src),
(gst_rtp_session_request_new_pad), (gst_rtp_session_release_pad):
* gst/rtpmanager/gstrtpsession.h:
Add RTP session management elements. Still in progress.
Diffstat (limited to 'gst/rtpmanager')
-rw-r--r-- | gst/rtpmanager/Makefile.am | 23 | ||||
-rw-r--r-- | gst/rtpmanager/async_jitter_queue.c | 679 | ||||
-rw-r--r-- | gst/rtpmanager/async_jitter_queue.h | 130 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpbin.c | 279 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpbin.h | 56 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpclient.c | 482 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpclient.h | 56 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpjitterbuffer.c | 1085 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpjitterbuffer.h | 74 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpmanager.c | 55 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpptdemux.c | 350 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpptdemux.h | 57 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpsession.c | 453 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpsession.h | 62 |
14 files changed, 3841 insertions, 0 deletions
diff --git a/gst/rtpmanager/Makefile.am b/gst/rtpmanager/Makefile.am new file mode 100644 index 00000000..561bf52d --- /dev/null +++ b/gst/rtpmanager/Makefile.am @@ -0,0 +1,23 @@ + +plugin_LTLIBRARIES = libgstrtpmanager.la + +libgstrtpmanager_la_SOURCES = gstrtpmanager.c \ + gstrtpbin.c \ + gstrtpclient.c \ + async_jitter_queue.c \ + gstrtpjitterbuffer.c \ + gstrtpptdemux.c \ + gstrtpsession.c + +noinst_HEADERS = gstrtpbin.h \ + gstrtpclient.h \ + async_jitter_queue.h \ + gstrtpjitterbuffer.h \ + gstrtpsession.h + +libgstrtpmanager_la_CFLAGS = $(GST_CFLAGS) $(GST_PLUGINS_BASE_CFLAGS) $(ERROR_CFLAGS) +libgstrtpmanager_la_LIBADD = $(GST_LIBS_LIBS) +libgstrtpmanager_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) -lgstrtp-@GST_MAJORMINOR@ + +EXTRA_DIST = + diff --git a/gst/rtpmanager/async_jitter_queue.c b/gst/rtpmanager/async_jitter_queue.c new file mode 100644 index 00000000..22a8ed0e --- /dev/null +++ b/gst/rtpmanager/async_jitter_queue.c @@ -0,0 +1,679 @@ +/* + * Async Jitter Queue based on g_async_queue + * This code is GST RTP smart and deals with timestamps + * + * Farsight Voice+Video library + * Copyright 2007 Collabora Ltd, + * Copyright 2007 Nokia Corporation + * @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>. + * + * This is an async queue that has a buffering mecanism based on the set low + * and high threshold. When the lower threshold is reached, the queue will + * fill itself up until the higher threshold is reached before allowing any + * pops to occur. This allows a jitterbuffer of at least min threshold items + * to be available. + */ + +/* GLIB - Library of useful routines for C programming + * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald + * + * GAsyncQueue: asynchronous queue implementation, based on Gqueue. + * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +/* + * MT safe + */ + +#include "config.h" + +#include "async_jitter_queue.h" + +#include <gst/gst.h> +#include <gst/rtp/gstrtpbuffer.h> + +#define DEFAULT_LOW_THRESHOLD 0.1 +#define DEFAULT_HIGH_THRESHOLD 0.9 + +struct _AsyncJitterQueue +{ + GMutex *mutex; + GCond *cond; + GQueue *queue; + guint waiting_threads; + gint32 ref_count; + gfloat low_threshold; + gfloat high_threshold; + guint32 max_queue_length; + gboolean buffering; + gboolean pop_flushing; + gboolean pop_blocking; + guint pops_remaining; + guint32 tail_buffer_duration; +}; + +/** + * async_jitter_queue_new: + * + * Creates a new asynchronous queue with the initial reference count of 1. + * + * Return value: the new #AsyncJitterQueue. + **/ +AsyncJitterQueue * +async_jitter_queue_new (void) +{ + AsyncJitterQueue *retval = g_new (AsyncJitterQueue, 1); + + retval->mutex = g_mutex_new (); + retval->cond = g_cond_new (); + retval->queue = g_queue_new (); + retval->waiting_threads = 0; + retval->ref_count = 1; + retval->low_threshold = DEFAULT_LOW_THRESHOLD; + retval->high_threshold = DEFAULT_HIGH_THRESHOLD; + retval->buffering = TRUE; /* we need to buffer initially */ + retval->pop_flushing = TRUE; + retval->pop_blocking = TRUE; + retval->pops_remaining = 0; + retval->tail_buffer_duration = 0; + return retval; +} + +/* checks buffering state and wakes up waiting pops */ +void +signal_waiting_threads (AsyncJitterQueue * queue) +{ + if (async_jitter_queue_length_ts_units_unlocked (queue) >= + queue->high_threshold * queue->max_queue_length) { + queue->buffering = FALSE; + } + + if (queue->waiting_threads > 0) { + if (!queue->buffering) { + g_cond_signal (queue->cond); + } + } +} + +/** + * async_jitter_queue_ref: + * @queue: a #AsyncJitterQueue. + * + * Increases the reference count of the asynchronous @queue by 1. You + * do not need to hold the lock to call this function. + * + * Returns: the @queue that was passed in (since 2.6) + **/ +AsyncJitterQueue * +async_jitter_queue_ref (AsyncJitterQueue * queue) +{ + g_return_val_if_fail (queue, NULL); + g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL); + + g_atomic_int_inc (&queue->ref_count); + + return queue; +} + +/** + * async_jitter_queue_ref_unlocked: + * @queue: a #AsyncJitterQueue. + * + * Increases the reference count of the asynchronous @queue by 1. + **/ +void +async_jitter_queue_ref_unlocked (AsyncJitterQueue * queue) +{ + g_return_if_fail (queue); + g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); + + g_atomic_int_inc (&queue->ref_count); +} + +/** + * async_jitter_queue_set_low_threshold: + * @queue: a #AsyncJitterQueue. + * @threshold: the lower threshold (fraction of max size) + * + * Sets the low threshold on the queue. This threshold indicates the minimum + * number of items allowed in the queue before we refill it up to the set + * maximum threshold. + **/ +void +async_jitter_queue_set_low_threshold (AsyncJitterQueue * queue, + gfloat threshold) +{ + g_return_if_fail (queue); + g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); + + queue->low_threshold = threshold; +} + +/** + * async_jitter_queue_set_max_threshold: + * @queue: a #AsyncJitterQueue. + * @threshold: the higher threshold (fraction of max size) + * + * Sets the high threshold on the queue. This threshold indicates the amount of + * items to fill in the queue before releasing any blocking pop calls. This + * blocking mecanism is only triggered when we reach the low threshold and must + * refill the queue. + **/ +void +async_jitter_queue_set_high_threshold (AsyncJitterQueue * queue, + gfloat threshold) +{ + g_return_if_fail (queue); + g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); + + queue->high_threshold = threshold; +} + +/* set the maximum queue length in RTP timestamp units */ +void +async_jitter_queue_set_max_queue_length (AsyncJitterQueue * queue, + guint32 max_length) +{ + g_return_if_fail (queue); + g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); + + queue->max_queue_length = max_length; +} + +GQueue * +async_jitter_queue_get_g_queue (AsyncJitterQueue * queue) +{ + g_return_val_if_fail (queue, NULL); + + return queue->queue; +} + +static guint32 +calculate_ts_diff (guint32 high_ts, guint32 low_ts) +{ + /* it needs to work if ts wraps */ + if (high_ts >= low_ts) { + return high_ts - low_ts; + } else { + return high_ts + G_MAXUINT32 + 1 - low_ts; + } +} + +/* this function returns the length of the queue in timestamp units. It will + * also add the duration of the last buffer in the queue */ +/* FIXME This function wrongly assumes that there are no missing packets inside + * the buffer, in reality it needs to check for gaps and subsctract those from + * the total */ +guint32 +async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue * queue) +{ + guint32 tail_ts; + guint32 head_ts; + guint32 ret; + GstBuffer *head; + GstBuffer *tail; + + g_return_val_if_fail (queue, 0); + + if (queue->queue->length < 2) { + return 0; + } + + tail = g_queue_peek_tail (queue->queue); + head = g_queue_peek_head (queue->queue); + + if (!GST_IS_BUFFER (tail) || !GST_IS_BUFFER (head)) + return 0; + + tail_ts = gst_rtp_buffer_get_timestamp (tail); + head_ts = gst_rtp_buffer_get_timestamp (head); + + ret = calculate_ts_diff (head_ts, tail_ts); + + /* let's add the duration of the tail buffer */ + ret += queue->tail_buffer_duration; + + return ret; +} + +/** + * async_jitter_queue_unref_and_unlock: + * @queue: a #AsyncJitterQueue. + * + * Decreases the reference count of the asynchronous @queue by 1 and + * releases the lock. This function must be called while holding the + * @queue's lock. If the reference count went to 0, the @queue will be + * destroyed and the memory allocated will be freed. + **/ +void +async_jitter_queue_unref_and_unlock (AsyncJitterQueue * queue) +{ + g_return_if_fail (queue); + g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); + + g_mutex_unlock (queue->mutex); + async_jitter_queue_unref (queue); +} + +/** + * async_jitter_queue_unref: + * @queue: a #AsyncJitterQueue. + * + * Decreases the reference count of the asynchronous @queue by 1. If + * the reference count went to 0, the @queue will be destroyed and the + * memory allocated will be freed. So you are not allowed to use the + * @queue afterwards, as it might have disappeared. You do not need to + * hold the lock to call this function. + **/ +void +async_jitter_queue_unref (AsyncJitterQueue * queue) +{ + g_return_if_fail (queue); + g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); + + if (g_atomic_int_dec_and_test (&queue->ref_count)) { + g_return_if_fail (queue->waiting_threads == 0); + g_mutex_free (queue->mutex); + if (queue->cond) + g_cond_free (queue->cond); + g_queue_free (queue->queue); + g_free (queue); + } +} + +/** + * async_jitter_queue_lock: + * @queue: a #AsyncJitterQueue. + * + * Acquires the @queue's lock. After that you can only call the + * <function>async_jitter_queue_*_unlocked()</function> function variants on that + * @queue. Otherwise it will deadlock. + **/ +void +async_jitter_queue_lock (AsyncJitterQueue * queue) +{ + g_return_if_fail (queue); + g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); + + g_mutex_lock (queue->mutex); +} + +/** + * async_jitter_queue_unlock: + * @queue: a #AsyncJitterQueue. + * + * Releases the queue's lock. + **/ +void +async_jitter_queue_unlock (AsyncJitterQueue * queue) +{ + g_return_if_fail (queue); + g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); + + g_mutex_unlock (queue->mutex); +} + +/** + * async_jitter_queue_push: + * @queue: a #AsyncJitterQueue. + * @data: @data to push into the @queue. + * + * Pushes the @data into the @queue. @data must not be %NULL. + **/ +void +async_jitter_queue_push (AsyncJitterQueue * queue, gpointer data) +{ + g_return_if_fail (queue); + g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); + g_return_if_fail (data); + + g_mutex_lock (queue->mutex); + async_jitter_queue_push_unlocked (queue, data); + g_mutex_unlock (queue->mutex); +} + +/** + * async_jitter_queue_push_unlocked: + * @queue: a #AsyncJitterQueue. + * @data: @data to push into the @queue. + * + * Pushes the @data into the @queue. @data must not be %NULL. This + * function must be called while holding the @queue's lock. + **/ +void +async_jitter_queue_push_unlocked (AsyncJitterQueue * queue, gpointer data) +{ + g_return_if_fail (queue); + g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); + g_return_if_fail (data); + + g_queue_push_head (queue->queue, data); + + signal_waiting_threads (queue); +} + +/** + * async_jitter_queue_push_sorted: + * @queue: a #AsyncJitterQueue + * @data: the @data to push into the @queue + * @func: the #GCompareDataFunc is used to sort @queue. This function + * is passed two elements of the @queue. The function should return + * 0 if they are equal, a negative value if the first element + * should be higher in the @queue or a positive value if the first + * element should be lower in the @queue than the second element. + * @user_data: user data passed to @func. + * + * Inserts @data into @queue using @func to determine the new + * position. + * + * This function requires that the @queue is sorted before pushing on + * new elements. + * + * This function will lock @queue before it sorts the queue and unlock + * it when it is finished. + * + * For an example of @func see async_jitter_queue_sort(). + * + * Since: 2.10 + **/ +gboolean +async_jitter_queue_push_sorted (AsyncJitterQueue * queue, + gpointer data, GCompareDataFunc func, gpointer user_data) +{ + g_return_val_if_fail (queue != NULL, FALSE); + gboolean ret; + + g_mutex_lock (queue->mutex); + ret = async_jitter_queue_push_sorted_unlocked (queue, data, func, user_data); + g_mutex_unlock (queue->mutex); + + return ret; +} + +/** + * async_jitter_queue_push_sorted_unlocked: + * @queue: a #AsyncJitterQueue + * @data: the @data to push into the @queue + * @func: the #GCompareDataFunc is used to sort @queue. This function + * is passed two elements of the @queue. The function should return + * 0 if they are equal, a negative value if the first element + * should be higher in the @queue or a positive value if the first + * element should be lower in the @queue than the second element. + * @user_data: user data passed to @func. + * + * Inserts @data into @queue using @func to determine the new + * position. + * + * This function requires that the @queue is sorted before pushing on + * new elements. + * + * If @GCompareDataFunc returns 0, this function does not insert @data and + * return FALSE. + * + * This function is called while holding the @queue's lock. + * + * For an example of @func see async_jitter_queue_sort(). + * + * Since: 2.10 + **/ +gboolean +async_jitter_queue_push_sorted_unlocked (AsyncJitterQueue * queue, + gpointer data, GCompareDataFunc func, gpointer user_data) +{ + GList *list; + gint func_ret = TRUE; + + g_return_val_if_fail (queue != NULL, FALSE); + + list = queue->queue->head; + while (list && (func_ret = func (list->data, data, user_data)) < 0) + list = list->next; + + if (func_ret == 0) { + return FALSE; + } + if (list) { + g_queue_insert_before (queue->queue, list, data); + } else { + g_queue_push_tail (queue->queue, data); + } + + signal_waiting_threads (queue); + return TRUE; +} + +void +async_jitter_queue_insert_after_unlocked (AsyncJitterQueue * queue, + GList * sibling, gpointer data) +{ + g_return_if_fail (queue != NULL); + + g_queue_insert_before (queue->queue, sibling, data); + + signal_waiting_threads (queue); +} + +static gpointer +async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue) +{ + gpointer retval; + GstBuffer *tail_buffer = NULL; + + if (queue->pop_flushing) + return NULL; + + while (queue->pop_blocking) { + queue->waiting_threads++; + g_cond_wait (queue->cond, queue->mutex); + queue->waiting_threads--; + if (queue->pop_flushing) + return NULL; + } + + if (async_jitter_queue_length_ts_units_unlocked (queue) <= + queue->low_threshold * queue->max_queue_length + && queue->pops_remaining == 0) { + if (!queue->buffering) { + queue->buffering = TRUE; + queue->pops_remaining = queue->queue->length; + } else { + while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) { + queue->waiting_threads++; + g_cond_wait (queue->cond, queue->mutex); + queue->waiting_threads--; + if (queue->pop_flushing) + return NULL; + } + } + } + + retval = g_queue_pop_tail (queue->queue); + if (queue->pops_remaining) + queue->pops_remaining--; + + tail_buffer = g_queue_peek_tail (queue->queue); + if (tail_buffer) { + if (!GST_IS_BUFFER (tail_buffer) || !GST_IS_BUFFER (retval)) { + queue->tail_buffer_duration = 0; + } else if (gst_rtp_buffer_get_seq (tail_buffer) + - gst_rtp_buffer_get_seq (retval) == 1) { + queue->tail_buffer_duration = + calculate_ts_diff (gst_rtp_buffer_get_timestamp (tail_buffer), + gst_rtp_buffer_get_timestamp (retval)); + } else { + /* There is a sequence number gap -> we can't calculate the duration + * let's just set it to 0 */ + queue->tail_buffer_duration = 0; + } + } + + g_assert (retval); + + return retval; +} + +/** + * async_jitter_queue_pop: + * @queue: a #AsyncJitterQueue. + * + * Pops data from the @queue. This function blocks until data become + * available. If pop is disabled, tis function return NULL. + * + * Return value: data from the queue. + **/ +gpointer +async_jitter_queue_pop (AsyncJitterQueue * queue) +{ + gpointer retval; + + g_return_val_if_fail (queue, NULL); + g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL); + + g_mutex_lock (queue->mutex); + retval = async_jitter_queue_pop_intern_unlocked (queue); + g_mutex_unlock (queue->mutex); + + return retval; +} + +/** + * async_jitter_queue_pop_unlocked: + * @queue: a #AsyncJitterQueue. + * + * Pops data from the @queue. This function blocks until data become + * available. This function must be called while holding the @queue's + * lock. + * + * Return value: data from the queue. + **/ +gpointer +async_jitter_queue_pop_unlocked (AsyncJitterQueue * queue) +{ + g_return_val_if_fail (queue, NULL); + g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL); + + return async_jitter_queue_pop_intern_unlocked (queue); +} + +/** + * async_jitter_queue_length: + * @queue: a #AsyncJitterQueue. + * + * Returns the length of the queue + * Return value: the length of the @queue. + **/ +gint +async_jitter_queue_length (AsyncJitterQueue * queue) +{ + gint retval; + + g_return_val_if_fail (queue, 0); + g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0); + + g_mutex_lock (queue->mutex); + retval = queue->queue->length; + g_mutex_unlock (queue->mutex); + + return retval; +} + +/** + * async_jitter_queue_length_unlocked: + * @queue: a #AsyncJitterQueue. + * + * Returns the length of the queue. + * + * Return value: the length of the @queue. + **/ +gint +async_jitter_queue_length_unlocked (AsyncJitterQueue * queue) +{ + g_return_val_if_fail (queue, 0); + g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0); + + return queue->queue->length; +} + +/** + * async_jitter_queue_set_flushing_unlocked: + * @queue: a #AsyncJitterQueue. + * @free_func: a function to call to free the elements + * @user_data: user data passed to @free_func + * + * This function is used to set/unset flushing. If flushing is set any + * waiting/blocked pops will be unblocked. Any subsequent calls to pop will + * return NULL. Flushing is set by default. + */ +void +async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue * queue, + GFunc free_func, gpointer user_data) +{ + g_return_if_fail (queue); + g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); + + queue->pop_flushing = TRUE; + /* let's unblock any remaining pops */ + if (queue->waiting_threads > 0) + g_cond_broadcast (queue->cond); + /* free data from queue */ + g_queue_foreach (queue->queue, free_func, user_data); +} + +/** + * async_jitter_queue_unset_flushing_unlocked: + * @queue: a #AsyncJitterQueue. + * @free_func: a function to call to free the elements + * @user_data: user data passed to @free_func + * + * This function is used to set/unset flushing. If flushing is set any + * waiting/blocked pops will be unblocked. Any subsequent calls to pop will + * return NULL. Flushing is set by default. + */ +void +async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue * queue) +{ + g_return_if_fail (queue); + g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); + + queue->pop_flushing = FALSE; + /* let's unblock any remaining pops */ + if (queue->waiting_threads > 0) + g_cond_broadcast (queue->cond); +} + +/** + * async_jitter_queue_set_blocking_unlocked: + * @queue: a #AsyncJitterQueue. + * @enabled: a boolean to enable/disable blocking + * + * This function is used to enable/disable blocking. If blocking is enabled any + * pops will be blocked until the queue is unblocked. The queue is blocked by + * default. + */ +void +async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue * queue, + gboolean blocking) +{ + g_return_if_fail (queue); + g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); + + queue->pop_blocking = blocking; + /* let's unblock any remaining pops */ + if (queue->waiting_threads > 0) + g_cond_broadcast (queue->cond); +} diff --git a/gst/rtpmanager/async_jitter_queue.h b/gst/rtpmanager/async_jitter_queue.h new file mode 100644 index 00000000..bea76d6d --- /dev/null +++ b/gst/rtpmanager/async_jitter_queue.h @@ -0,0 +1,130 @@ +/* Async Jitter Queue based on g_async_queue + * + * Farsight Voice+Video library + * Copyright 2007 Collabora Ltd, + * Copyright 2007 Nokia Corporation + * @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>. + */ + +/* GLIB - Library of useful routines for C programming + * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +/* + * Modified by the GLib Team and others 1997-2000. See the AUTHORS + * file for a list of people on the GLib Team. See the ChangeLog + * files for a list of changes. These files are distributed with + * GLib at ftp://ftp.gtk.org/pub/gtk/. + */ + +#ifndef __ASYNCJITTERQUEUE_H__ +#define __ASYNCJITTERQUEUE_H__ + +#include <glib.h> +#include <glib/gthread.h> + +G_BEGIN_DECLS + +typedef struct _AsyncJitterQueue AsyncJitterQueue; + +/* Asyncronous Queues, can be used to communicate between threads + */ + +/* Get a new AsyncJitterQueue with the ref_count 1 */ +AsyncJitterQueue* async_jitter_queue_new (void); + +/* Lock and unlock a AsyncJitterQueue. All functions lock the queue for + * themselves, but in certain cirumstances you want to hold the lock longer, + * thus you lock the queue, call the *_unlocked functions and unlock it again. + */ +void async_jitter_queue_lock (AsyncJitterQueue *queue); +void async_jitter_queue_unlock (AsyncJitterQueue *queue); + +/* Ref and unref the AsyncJitterQueue. */ +AsyncJitterQueue* async_jitter_queue_ref (AsyncJitterQueue *queue); +void async_jitter_queue_unref (AsyncJitterQueue *queue); +#ifndef G_DISABLE_DEPRECATED +/* You don't have to hold the lock for calling *_ref and *_unref anymore. */ +void async_jitter_queue_ref_unlocked (AsyncJitterQueue *queue); +void async_jitter_queue_unref_and_unlock (AsyncJitterQueue *queue); +#endif /* !G_DISABLE_DEPRECATED */ + +void async_jitter_queue_set_low_threshold (AsyncJitterQueue *queue, + gfloat threshold); +void async_jitter_queue_set_high_threshold (AsyncJitterQueue *queue, + gfloat threshold); + +void async_jitter_queue_set_max_queue_length (AsyncJitterQueue *queue, + guint32 max_length); + +/* Push data into the async queue. Must not be NULL. */ +void async_jitter_queue_push (AsyncJitterQueue *queue, + gpointer data); +void async_jitter_queue_push_unlocked (AsyncJitterQueue *queue, + gpointer data); +gboolean async_jitter_queue_push_sorted (AsyncJitterQueue *queue, + gpointer data, + GCompareDataFunc func, + gpointer user_data); + +void async_jitter_queue_insert_after_unlocked(AsyncJitterQueue *queue, + GList *sibling, + gpointer data); + +gboolean async_jitter_queue_push_sorted_unlocked(AsyncJitterQueue *queue, + gpointer data, + GCompareDataFunc func, + gpointer user_data); + +/* Pop data from the async queue. When no data is there, the thread is blocked + * until data arrives. */ +gpointer async_jitter_queue_pop (AsyncJitterQueue *queue); +gpointer async_jitter_queue_pop_unlocked (AsyncJitterQueue *queue); + +/* Try to pop data. NULL is returned in case of empty queue. */ +gpointer async_jitter_queue_try_pop (AsyncJitterQueue *queue); +gpointer async_jitter_queue_try_pop_unlocked (AsyncJitterQueue *queue); + +/* Wait for data until at maximum until end_time is reached. NULL is returned + * in case of empty queue. */ +gpointer async_jitter_queue_timed_pop (AsyncJitterQueue *queue, + GTimeVal *end_time); +gpointer async_jitter_queue_timed_pop_unlocked (AsyncJitterQueue *queue, + GTimeVal *end_time); + +/* Return the length of the queue. Negative values mean that threads + * are waiting, positve values mean that there are entries in the + * queue. Actually this function returns the length of the queue minus + * the number of waiting threads, async_jitter_queue_length == 0 could also + * mean 'n' entries in the queue and 'n' thread waiting. Such can + * happen due to locking of the queue or due to scheduling. */ +gint async_jitter_queue_length (AsyncJitterQueue *queue); +gint async_jitter_queue_length_unlocked (AsyncJitterQueue *queue); + +void async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue* queue, + GFunc free_func, gpointer user_data); +void async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue* queue); +void async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue* queue, + gboolean blocking); +guint32 +async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue *queue); + +G_END_DECLS + +#endif /* __ASYNCJITTERQUEUE_H__ */ + diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c new file mode 100644 index 00000000..629f098d --- /dev/null +++ b/gst/rtpmanager/gstrtpbin.c @@ -0,0 +1,279 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +/** + * SECTION:element-rtpbin + * @short_description: handle media from one RTP bin + * @see_also: rtpjitterbuffer, rtpclient, rtpsession + * + * <refsect2> + * <para> + * </para> + * <title>Example pipelines</title> + * <para> + * <programlisting> + * gst-launch -v filesrc location=sine.ogg ! oggdemux ! vorbisdec ! audioconvert ! alsasink + * </programlisting> + * </para> + * </refsect2> + * + * Last reviewed on 2007-04-02 (0.10.6) + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include <string.h> + +#include "gstrtpbin.h" + +/* elementfactory information */ +static const GstElementDetails rtpbin_details = GST_ELEMENT_DETAILS ("RTP Bin", + "Filter/Editor/Video", + "Implement an RTP bin", + "Wim Taymans <wim@fluendo.com>"); + +/* sink pads */ +static GstStaticPadTemplate rtpbin_recv_rtp_sink_template = +GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%d", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS ("application/x-rtp") + ); + +static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template = +GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%d", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS ("application/x-rtcp") + ); + +static GstStaticPadTemplate rtpbin_send_rtp_sink_template = +GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%d", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS ("application/x-rtp") + ); + +/* src pads */ +static GstStaticPadTemplate rtpbin_recv_rtp_src_template = +GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%d_%d_%d", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS ("application/x-rtp") + ); + +static GstStaticPadTemplate rtpbin_send_rtcp_src_template = +GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%d", + GST_PAD_SRC, + GST_PAD_REQUEST, + GST_STATIC_CAPS ("application/x-rtcp") + ); + +static GstStaticPadTemplate rtpbin_send_rtp_src_template = +GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS ("application/x-rtp") + ); + +#define GST_RTP_BIN_GET_PRIVATE(obj) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRTPBinPrivate)) + +struct _GstRTPBinPrivate +{ +}; + +/* signals and args */ +enum +{ + /* FILL ME */ + LAST_SIGNAL +}; + +enum +{ + PROP_0 +}; + +/* GObject vmethods */ +static void gst_rtp_bin_finalize (GObject * object); +static void gst_rtp_bin_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_rtp_bin_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +/* GstElement vmethods */ +static GstStateChangeReturn gst_rtp_bin_change_state (GstElement * element, + GstStateChange transition); +static GstPad *gst_rtp_bin_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * name); +static void gst_rtp_bin_release_pad (GstElement * element, GstPad * pad); + +/*static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 }; */ + +GST_BOILERPLATE (GstRTPBin, gst_rtp_bin, GstBin, GST_TYPE_BIN); + +static void +gst_rtp_bin_base_init (gpointer klass) +{ + GstElementClass *element_class = GST_ELEMENT_CLASS (klass); + + /* sink pads */ + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtpbin_recv_rtp_sink_template)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtpbin_recv_rtcp_sink_template)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtpbin_send_rtp_sink_template)); + + /* src pads */ + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtpbin_recv_rtp_src_template)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtpbin_send_rtcp_src_template)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtpbin_send_rtp_src_template)); + + gst_element_class_set_details (element_class, &rtpbin_details); +} + +static void +gst_rtp_bin_class_init (GstRTPBinClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + + g_type_class_add_private (klass, sizeof (GstRTPBinPrivate)); + + gobject_class->finalize = gst_rtp_bin_finalize; + gobject_class->set_property = gst_rtp_bin_set_property; + gobject_class->get_property = gst_rtp_bin_get_property; + + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state); + gstelement_class->request_new_pad = + GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad); + gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad); +} + +static void +gst_rtp_bin_init (GstRTPBin * rtpbin, GstRTPBinClass * klass) +{ + rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin); +} + +static void +gst_rtp_bin_finalize (GObject * object) +{ + GstRTPBin *rtpbin; + + rtpbin = GST_RTP_BIN (object); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_rtp_bin_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstRTPBin *rtpbin; + + rtpbin = GST_RTP_BIN (object); + + switch (prop_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rtp_bin_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstRTPBin *rtpbin; + + rtpbin = GST_RTP_BIN (object); + + switch (prop_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstStateChangeReturn +gst_rtp_bin_change_state (GstElement * element, GstStateChange transition) +{ + GstStateChangeReturn res; + GstRTPBin *rtpbin; + + rtpbin = GST_RTP_BIN (element); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + break; + default: + break; + } + + res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_NULL: + break; + default: + break; + } + return res; +} + +/* + */ +static GstPad * +gst_rtp_bin_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * name) +{ + GstRTPBin *rtpbin; + GstElementClass *klass; + + g_return_val_if_fail (templ != NULL, NULL); + g_return_val_if_fail (GST_IS_RTP_BIN (element), NULL); + + rtpbin = GST_RTP_BIN (element); + klass = GST_ELEMENT_GET_CLASS (element); + + return NULL; +} + +static void +gst_rtp_bin_release_pad (GstElement * element, GstPad * pad) +{ +} diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h new file mode 100644 index 00000000..5b3d7954 --- /dev/null +++ b/gst/rtpmanager/gstrtpbin.h @@ -0,0 +1,56 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __GST_RTP_BIN_H__ +#define __GST_RTP_BIN_H__ + +#include <gst/gst.h> + +#define GST_TYPE_RTP_BIN \ + (gst_rtp_bin_get_type()) +#define GST_RTP_BIN(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTP_BIN,GstRTPBin)) +#define GST_RTP_BIN_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTP_BIN,GstRTPBinClass)) +#define GST_IS_RTP_BIN(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_BIN)) +#define GST_IS_RTP_BIN_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_BIN)) + +typedef struct _GstRTPBin GstRTPBin; +typedef struct _GstRTPBinClass GstRTPBinClass; +typedef struct _GstRTPBinPrivate GstRTPBinPrivate; + +struct _GstRTPBin { + GstBin element; + + /* a list of streams from a client */ + GList *streams; + + /*< private >*/ + GstRTPBinPrivate *priv; +}; + +struct _GstRTPBinClass { + GstBinClass parent_class; +}; + +GType gst_rtp_bin_get_type (void); + +#endif /* __GST_RTP_BIN_H__ */ diff --git a/gst/rtpmanager/gstrtpclient.c b/gst/rtpmanager/gstrtpclient.c new file mode 100644 index 00000000..2984d3f9 --- /dev/null +++ b/gst/rtpmanager/gstrtpclient.c @@ -0,0 +1,482 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +/** + * SECTION:element-rtpclient + * @short_description: handle media from one RTP client + * @see_also: rtpjitterbuffer, rtpbin, rtpsession + * + * <refsect2> + * <para> + * This element handles RTP data from one client. It accepts multiple RTP streams that + * should be synchronized together. + * </para> + * <title>Example pipelines</title> + * <para> + * <programlisting> + * gst-launch -v filesrc location=sine.ogg ! oggdemux ! vorbisdec ! audioconvert ! alsasink + * </programlisting> + * </para> + * </refsect2> + * + * Last reviewed on 2007-04-02 (0.10.6) + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include <string.h> + +#include "gstrtpclient.h" + +/* elementfactory information */ +static const GstElementDetails rtpclient_details = +GST_ELEMENT_DETAILS ("RTP Client", + "Filter/Editor/Video", + "Implement an RTP client", + "Wim Taymans <wim@fluendo.com>"); + +/* sink pads */ +static GstStaticPadTemplate rtpclient_rtp_sink_template = +GST_STATIC_PAD_TEMPLATE ("rtp_sink_%d", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS ("application/x-rtp") + ); + +static GstStaticPadTemplate rtpclient_sync_sink_template = +GST_STATIC_PAD_TEMPLATE ("sync_sink_%d", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS ("application/x-rtcp") + ); + +/* src pads */ +static GstStaticPadTemplate rtpclient_rtp_src_template = +GST_STATIC_PAD_TEMPLATE ("rtp_src_%d_%d", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS ("application/x-rtp") + ); + +#define GST_RTP_CLIENT_GET_PRIVATE(obj) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_CLIENT, GstRTPClientPrivate)) + +struct _GstRTPClientPrivate +{ +}; + +/* all the info needed to handle the stream with SSRC */ +typedef struct +{ + GstRTPClient *client; + + /* the SSRC of this stream */ + guint32 ssrc; + + /* RTP and RTCP in */ + GstPad *rtp_sink; + GstPad *sync_sink; + + /* the jitterbuffer */ + GstElement *jitterbuffer; + /* the payload demuxer */ + GstElement *ptdemux; + /* the new-pad signal */ + gulong new_pad_sig; +} GstRTPClientStream; + +/* the PT demuxer found a new payload type */ +static void +new_pad (GstElement * element, GstPad * pad, GstRTPClientStream * stream) +{ +} + +/* create a new stream for SSRC. + * + * We create a jitterbuffer and an payload demuxer for the SSRC. The sinkpad of + * the jitterbuffer is ghosted to the bin. We connect a pad-added signal to + * rtpptdemux so that we can ghost the payload pads outside. + * + * +-----------------+ +---------------+ + * | rtpjitterbuffer | | rtpptdemux | + * +- sink src - sink | + * / +-----------------+ +---------------+ + * + */ +static GstRTPClientStream * +create_stream (GstRTPClient * rtpclient, guint32 ssrc) +{ + GstRTPClientStream *stream; + gchar *name; + GstPad *srcpad, *sinkpad; + GstPadLinkReturn res; + + stream = g_new0 (GstRTPClientStream, 1); + stream->ssrc = ssrc; + stream->client = rtpclient; + + stream->jitterbuffer = gst_element_factory_make ("rtpjitterbuffer", NULL); + if (!stream->jitterbuffer) + goto no_jitterbuffer; + + stream->ptdemux = gst_element_factory_make ("rtpptdemux", NULL); + if (!stream->ptdemux) + goto no_ptdemux; + + /* add elements to bin */ + gst_bin_add (GST_BIN_CAST (rtpclient), stream->jitterbuffer); + gst_bin_add (GST_BIN_CAST (rtpclient), stream->ptdemux); + + /* link jitterbuffer and PT demuxer */ + srcpad = gst_element_get_pad (stream->jitterbuffer, "src"); + sinkpad = gst_element_get_pad (stream->ptdemux, "sink"); + res = gst_pad_link (srcpad, sinkpad); + gst_object_unref (srcpad); + gst_object_unref (sinkpad); + + if (res != GST_PAD_LINK_OK) + goto could_not_link; + + /* add stream to list */ + rtpclient->streams = g_list_prepend (rtpclient->streams, stream); + + /* ghost sinkpad */ + name = g_strdup_printf ("rtp_sink_%d", ssrc); + sinkpad = gst_element_get_pad (stream->jitterbuffer, "sink"); + stream->rtp_sink = gst_ghost_pad_new (name, sinkpad); + gst_object_unref (sinkpad); + g_free (name); + gst_element_add_pad (GST_ELEMENT_CAST (rtpclient), stream->rtp_sink); + + /* add signal to ptdemuxer */ + stream->new_pad_sig = + g_signal_connect (G_OBJECT (stream->ptdemux), "pad-added", + G_CALLBACK (new_pad), stream); + + return stream; + + /* ERRORS */ +no_jitterbuffer: + { + g_free (stream); + g_warning ("could not create rtpjitterbuffer element"); + return NULL; + } +no_ptdemux: + { + gst_object_unref (stream->jitterbuffer); + g_free (stream); + g_warning ("could not create rtpptdemux element"); + return NULL; + } +could_not_link: + { + gst_bin_remove (GST_BIN_CAST (rtpclient), stream->jitterbuffer); + gst_bin_remove (GST_BIN_CAST (rtpclient), stream->ptdemux); + g_free (stream); + g_warning ("could not link jitterbuffer and rtpptdemux element"); + return NULL; + } +} + +#if 0 +static void +free_stream (GstRTPClientStream * stream) +{ + gst_object_unref (stream->jitterbuffer); + g_free (stream); +} +#endif + +/* find the stream for the given SSRC, return NULL if the stream did not exist + */ +static GstRTPClientStream * +find_stream_by_ssrc (GstRTPClient * client, guint32 ssrc) +{ + GstRTPClientStream *stream; + GList *walk; + + for (walk = client->streams; walk; walk = g_list_next (walk)) { + stream = (GstRTPClientStream *) walk->data; + if (stream->ssrc == ssrc) + return stream; + } + return NULL; +} + +/* signals and args */ +enum +{ + /* FILL ME */ + LAST_SIGNAL +}; + +enum +{ + PROP_0 +}; + +/* GObject vmethods */ +static void gst_rtp_client_finalize (GObject * object); +static void gst_rtp_client_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_rtp_client_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +/* GstElement vmethods */ +static GstStateChangeReturn gst_rtp_client_change_state (GstElement * element, + GstStateChange transition); +static GstPad *gst_rtp_client_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * name); +static void gst_rtp_client_release_pad (GstElement * element, GstPad * pad); + +/*static guint gst_rtp_client_signals[LAST_SIGNAL] = { 0 }; */ + +GST_BOILERPLATE (GstRTPClient, gst_rtp_client, GstBin, GST_TYPE_BIN); + +static void +gst_rtp_client_base_init (gpointer klass) +{ + GstElementClass *element_class = GST_ELEMENT_CLASS (klass); + + /* sink pads */ + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtpclient_rtp_sink_template)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtpclient_sync_sink_template)); + + /* src pads */ + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtpclient_rtp_src_template)); + + gst_element_class_set_details (element_class, &rtpclient_details); +} + +static void +gst_rtp_client_class_init (GstRTPClientClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + + g_type_class_add_private (klass, sizeof (GstRTPClientPrivate)); + + gobject_class->finalize = gst_rtp_client_finalize; + gobject_class->set_property = gst_rtp_client_set_property; + gobject_class->get_property = gst_rtp_client_get_property; + + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_rtp_client_change_state); + gstelement_class->request_new_pad = + GST_DEBUG_FUNCPTR (gst_rtp_client_request_new_pad); + gstelement_class->release_pad = + GST_DEBUG_FUNCPTR (gst_rtp_client_release_pad); +} + +static void +gst_rtp_client_init (GstRTPClient * rtpclient, GstRTPClientClass * klass) +{ + rtpclient->priv = GST_RTP_CLIENT_GET_PRIVATE (rtpclient); +} + +static void +gst_rtp_client_finalize (GObject * object) +{ + GstRTPClient *rtpclient; + + rtpclient = GST_RTP_CLIENT (object); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_rtp_client_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstRTPClient *rtpclient; + + rtpclient = GST_RTP_CLIENT (object); + + switch (prop_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rtp_client_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstRTPClient *rtpclient; + + rtpclient = GST_RTP_CLIENT (object); + + switch (prop_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstStateChangeReturn +gst_rtp_client_change_state (GstElement * element, GstStateChange transition) +{ + GstStateChangeReturn res; + GstRTPClient *rtpclient; + + rtpclient = GST_RTP_CLIENT (element); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + break; + default: + break; + } + + res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_NULL: + break; + default: + break; + } + return res; +} + +/* We have 2 request pads (rtp_sink_%d and sync_sink_%d), the %d is assumed to + * be the SSRC of the stream. + * + * We require that the rtp pad is requested first for a particular SSRC, then + * (optionaly) the sync pad can be requested. If no sync pad is requested, no + * sync information can be exchanged for this stream. + */ +static GstPad * +gst_rtp_client_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * name) +{ + GstRTPClient *rtpclient; + GstElementClass *klass; + GstPadTemplate *rtp_sink_templ, *sync_sink_templ; + guint32 ssrc; + GstRTPClientStream *stream; + GstPad *result; + + g_return_val_if_fail (templ != NULL, NULL); + g_return_val_if_fail (GST_IS_RTP_CLIENT (element), NULL); + + if (templ->direction != GST_PAD_SINK) + goto wrong_direction; + + rtpclient = GST_RTP_CLIENT (element); + klass = GST_ELEMENT_GET_CLASS (element); + + /* figure out the template */ + rtp_sink_templ = gst_element_class_get_pad_template (klass, "rtp_sink_%d"); + sync_sink_templ = gst_element_class_get_pad_template (klass, "sync_sink_%d"); + + if (templ != rtp_sink_templ && templ != sync_sink_templ) + goto wrong_template; + + if (templ == rtp_sink_templ) { + /* create new rtp sink pad. If a stream with the pad number already exists + * we have an error, else we create the sinkpad, add a jitterbuffer and + * ptdemuxer. */ + if (name == NULL || strlen (name) < 9) + goto no_name; + + ssrc = atoi (&name[9]); + + /* see if a stream with that name exists, if so we have an error. */ + stream = find_stream_by_ssrc (rtpclient, ssrc); + if (stream != NULL) + goto stream_exists; + + /* ok, create new stream */ + stream = create_stream (rtpclient, ssrc); + if (stream == NULL) + goto stream_not_found; + + result = stream->rtp_sink; + } else { + /* create new rtp sink pad. We can only do this if the RTP pad was + * requested before, meaning the session with the padnumber must exist. */ + if (name == NULL || strlen (name) < 10) + goto no_name; + + ssrc = atoi (&name[10]); + + /* find stream */ + stream = find_stream_by_ssrc (rtpclient, ssrc); + if (stream == NULL) + goto stream_not_found; + + stream->sync_sink = + gst_pad_new_from_static_template (&rtpclient_sync_sink_template, name); + gst_element_add_pad (GST_ELEMENT_CAST (rtpclient), stream->sync_sink); + + result = stream->sync_sink; + } + + return result; + + /* ERRORS */ +wrong_direction: + { + g_warning ("rtpclient: request pad that is not a SINK pad"); + return NULL; + } +wrong_template: + { + g_warning ("rtpclient: this is not our template"); + return NULL; + } +no_name: + { + g_warning ("rtpclient: no padname was specified"); + return NULL; + } +stream_exists: + { + g_warning ("rtpclient: stream with SSRC %d already registered", ssrc); + return NULL; + } +stream_not_found: + { + g_warning ("rtpclient: stream with SSRC %d not yet registered", ssrc); + return NULL; + } +} + +static void +gst_rtp_client_release_pad (GstElement * element, GstPad * pad) +{ +} diff --git a/gst/rtpmanager/gstrtpclient.h b/gst/rtpmanager/gstrtpclient.h new file mode 100644 index 00000000..de837fd7 --- /dev/null +++ b/gst/rtpmanager/gstrtpclient.h @@ -0,0 +1,56 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __GST_RTP_CLIENT_H__ +#define __GST_RTP_CLIENT_H__ + +#include <gst/gst.h> + +#define GST_TYPE_RTP_CLIENT \ + (gst_rtp_client_get_type()) +#define GST_RTP_CLIENT(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTP_CLIENT,GstRTPClient)) +#define GST_RTP_CLIENT_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTP_CLIENT,GstRTPClientClass)) +#define GST_IS_RTP_CLIENT(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_CLIENT)) +#define GST_IS_RTP_CLIENT_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_CLIENT)) + +typedef struct _GstRTPClient GstRTPClient; +typedef struct _GstRTPClientClass GstRTPClientClass; +typedef struct _GstRTPClientPrivate GstRTPClientPrivate; + +struct _GstRTPClient { + GstBin parent_bin; + + /* a list of streams from a client */ + GList *streams; + + /*< private >*/ + GstRTPClientPrivate *priv; +}; + +struct _GstRTPClientClass { + GstBinClass parent_class; +}; + +GType gst_rtp_client_get_type (void); + +#endif /* __GST_RTP_CLIENT_H__ */ diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c new file mode 100644 index 00000000..58b48a34 --- /dev/null +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -0,0 +1,1085 @@ +/* + * Farsight Voice+Video library + * + * Copyright 2007 Collabora Ltd, + * Copyright 2007 Nokia Corporation + * @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>. + * Copyright 2007 Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + * + */ + +/** + * SECTION:element-rtpjitterbuffer + * @short_description: buffer, reorder and remove duplicate RTP packets to + * compensate for network oddities. + * + * <refsect2> + * <para> + * This element reorders and removes duplicate RTP packets as they are received + * from a network source. It will also wait for missing packets up to a + * configurable time limit using the ::latency property. Packets arriving too + * late are considered as lost packets. + * </para> + * <para> + * This element acts as a live element and so adds ::latency to the pipeline. + * </para> + * <title>Example pipelines</title> + * <para> + * <programlisting> + * gst-launch rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink + * </programlisting> + * Connect to a streaming server and decode the MPEG video. The jitterbuffer is + * inserted into the pipeline to smooth out network jitter and to reorder the + * out-of-order RTP packets. + * </para> + * </refsect2> + * + * Last reviewed on 2007-03-27 (0.10.13) + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <string.h> +#include <gst/rtp/gstrtpbuffer.h> +#include "gstrtpjitterbuffer.h" + +#include "async_jitter_queue.h" + +GST_DEBUG_CATEGORY (rtpjitterbuffer_debug); +#define GST_CAT_DEFAULT (rtpjitterbuffer_debug) + +/* low and high threshold tell the queue when to start and stop buffering */ +#define LOW_THRESHOLD 0.2 +#define HIGH_THRESHOLD 0.8 + +/* elementfactory information */ +static const GstElementDetails gst_rtp_jitter_buffer_details = +GST_ELEMENT_DETAILS ("RTP packet jitter-buffer", + "Filter/Network", + "A buffer that deals with network jitter and other transmission faults", + "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, " + "Wim Taymans <wim@fluendo.com>"); + +/* RTPJitterBuffer signals and args */ +enum +{ + /* FILL ME */ + LAST_SIGNAL +}; + +#define DEFAULT_LATENCY_MS 200 +#define DEFAULT_DROP_ON_LATENCY FALSE + +enum +{ + ARG_0, + ARG_LATENCY, + ARG_DROP_ON_LATENCY +}; + +struct _GstRTPJitterBufferPrivate +{ + GstPad *sinkpad, *srcpad; + + AsyncJitterQueue *queue; + + /* properties */ + guint latency_ms; + gboolean drop_on_latency; + + /* the last seqnum we pushed out */ + guint32 last_popped_seqnum; + /* the next expected seqnum */ + guint32 next_seqnum; + + /* clock rate and rtp timestamp offset */ + gint32 clock_rate; + guint64 clock_base; + + /* when we are shutting down */ + GstFlowReturn srcresult; + + /* for sync */ + GstSegment segment; + GstClockID clock_id; + guint32 waiting_seqnum; + + /* some accounting */ + guint64 num_late; + guint64 num_duplicates; +}; + +#define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \ + GstRTPJitterBufferPrivate)) + +static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template = +GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp, " + "clock-rate = (int) [ 1, 2147483647 ]" + /* "payload = (int) , " + * "encoding-name = (string) " + */ ) + ); + +static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template = +GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp" + /* "payload = (int) , " + * "clock-rate = (int) , " + * "encoding-name = (string) " + */ ) + ); + +GST_BOILERPLATE (GstRTPJitterBuffer, gst_rtp_jitter_buffer, GstElement, + GST_TYPE_ELEMENT); + +/* object overrides */ +static void gst_rtp_jitter_buffer_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec); +static void gst_rtp_jitter_buffer_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec); +static void gst_rtp_jitter_buffer_dispose (GObject * object); + +/* element overrides */ +static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement + * element, GstStateChange transition); + +/* pad overrides */ +static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad); + +/* sinkpad overrides */ +static gboolean gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps); +static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad, + GstEvent * event); +static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad, + GstBuffer * buffer); + +/* srcpad overrides */ +static gboolean +gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active); +static void gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer); +static gboolean gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query); + +static void +gst_rtp_jitter_buffer_base_init (gpointer klass) +{ + GstElementClass *element_class = GST_ELEMENT_CLASS (klass); + + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template)); + gst_element_class_set_details (element_class, &gst_rtp_jitter_buffer_details); +} + +static void +gst_rtp_jitter_buffer_class_init (GstRTPJitterBufferClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + + g_type_class_add_private (klass, sizeof (GstRTPJitterBufferPrivate)); + + gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_dispose); + + gobject_class->set_property = gst_rtp_jitter_buffer_set_property; + gobject_class->get_property = gst_rtp_jitter_buffer_get_property; + + g_object_class_install_property (gobject_class, ARG_LATENCY, + g_param_spec_uint ("latency", "Buffer latency in ms", + "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS, + G_PARAM_READWRITE)); + + g_object_class_install_property (gobject_class, ARG_DROP_ON_LATENCY, + g_param_spec_boolean ("drop_on_latency", + "Drop buffers when maximum latency is reached", + "Tells the jitterbuffer to never exceed the given latency in size", + DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE)); + + gstelement_class->change_state = gst_rtp_jitter_buffer_change_state; + + GST_DEBUG_CATEGORY_INIT + (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer"); +} + +static void +gst_rtp_jitter_buffer_init (GstRTPJitterBuffer * jitterbuffer, + GstRTPJitterBufferClass * klass) +{ + GstRTPJitterBufferPrivate *priv; + + priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer); + jitterbuffer->priv = priv; + + priv->latency_ms = DEFAULT_LATENCY_MS; + priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY; + + priv->queue = async_jitter_queue_new (); + async_jitter_queue_set_low_threshold (priv->queue, LOW_THRESHOLD); + async_jitter_queue_set_high_threshold (priv->queue, HIGH_THRESHOLD); + + priv->waiting_seqnum = -1; + + priv->srcpad = + gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template, + "src"); + + gst_pad_set_activatepush_function (priv->srcpad, + GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_push)); + gst_pad_set_query_function (priv->srcpad, + GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_query)); + gst_pad_set_getcaps_function (priv->srcpad, + GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps)); + + priv->sinkpad = + gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template, + "sink"); + + gst_pad_set_chain_function (priv->sinkpad, + GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain)); + gst_pad_set_event_function (priv->sinkpad, + GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event)); + gst_pad_set_setcaps_function (priv->sinkpad, + GST_DEBUG_FUNCPTR (gst_jitter_buffer_sink_setcaps)); + gst_pad_set_getcaps_function (priv->sinkpad, + GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps)); + + gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad); + gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad); +} + +static void +gst_rtp_jitter_buffer_dispose (GObject * object) +{ + GstRTPJitterBuffer *jitterbuffer; + + jitterbuffer = GST_RTP_JITTER_BUFFER (object); + if (jitterbuffer->priv->queue) { + async_jitter_queue_unref (jitterbuffer->priv->queue); + jitterbuffer->priv->queue = NULL; + } + + G_OBJECT_CLASS (parent_class)->dispose (object); +} + +static GstCaps * +gst_rtp_jitter_buffer_getcaps (GstPad * pad) +{ + GstRTPJitterBuffer *jitterbuffer; + GstRTPJitterBufferPrivate *priv; + GstPad *other; + GstCaps *caps; + const GstCaps *templ; + + jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); + priv = jitterbuffer->priv; + + other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad); + + caps = gst_pad_peer_get_caps (other); + + templ = gst_pad_get_pad_template_caps (pad); + if (caps == NULL) { + GST_DEBUG_OBJECT (jitterbuffer, "copy template"); + caps = gst_caps_copy (templ); + } else { + GstCaps *intersect; + + GST_DEBUG_OBJECT (jitterbuffer, "intersect with template"); + + intersect = gst_caps_intersect (caps, templ); + gst_caps_unref (caps); + + caps = intersect; + } + gst_object_unref (jitterbuffer); + + return caps; +} + +static gboolean +gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps) +{ + GstRTPJitterBuffer *jitterbuffer; + GstRTPJitterBufferPrivate *priv; + GstStructure *caps_struct; + const GValue *value; + + jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); + priv = jitterbuffer->priv; + + /* first parse the caps */ + caps_struct = gst_caps_get_structure (caps, 0); + + /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to + * measure the amount of data in the buffer */ + if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate)) + goto error; + + if (priv->clock_rate <= 0) + goto wrong_rate; + + /* gah, clock-base is uint. If we don't have a base, we will use the first + * buffer timestamp as the base time. This will screw up sync but it's better + * than nothing. */ + value = gst_structure_get_value (caps_struct, "clock-base"); + if (value && G_VALUE_HOLDS_UINT (value)) + priv->clock_base = g_value_get_uint (value); + else + priv->clock_base = -1; + + /* first expected seqnum */ + value = gst_structure_get_value (caps_struct, "seqnum-base"); + if (value && G_VALUE_HOLDS_UINT (value)) + priv->next_seqnum = g_value_get_uint (value); + else + priv->next_seqnum = -1; + + async_jitter_queue_set_max_queue_length (priv->queue, + priv->latency_ms * priv->clock_rate / 1000); + + /* set same caps on srcpad */ + gst_pad_set_caps (priv->srcpad, caps); + + gst_object_unref (jitterbuffer); + + return TRUE; + + /* ERRORS */ +error: + { + GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!"); + gst_object_unref (jitterbuffer); + return FALSE; + } +wrong_rate: + { + GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate); + gst_object_unref (jitterbuffer); + return FALSE; + } +} + +static void +free_func (gpointer data, GstRTPJitterBuffer * user_data) +{ + if (GST_IS_BUFFER (data)) + gst_buffer_unref (GST_BUFFER_CAST (data)); + else + gst_event_unref (GST_EVENT_CAST (data)); +} + +static void +gst_rtp_jitter_buffer_flush_start (GstRTPJitterBuffer * jitterbuffer) +{ + GstRTPJitterBufferPrivate *priv; + + priv = jitterbuffer->priv; + + async_jitter_queue_lock (priv->queue); + /* mark ourselves as flushing */ + priv->srcresult = GST_FLOW_WRONG_STATE; + GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue"); + /* this unblocks any waiting pops on the src pad task */ + async_jitter_queue_set_flushing_unlocked (jitterbuffer->priv->queue, + (GFunc) free_func, jitterbuffer); + /* unlock clock, we just unschedule, the entry will be released by the + * locking streaming thread. */ + if (priv->clock_id) + gst_clock_id_unschedule (priv->clock_id); + + async_jitter_queue_unlock (priv->queue); +} + +static void +gst_rtp_jitter_buffer_flush_stop (GstRTPJitterBuffer * jitterbuffer) +{ + GstRTPJitterBufferPrivate *priv; + + priv = jitterbuffer->priv; + + async_jitter_queue_lock (priv->queue); + GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue"); + /* Mark as non flushing */ + priv->srcresult = GST_FLOW_OK; + gst_segment_init (&priv->segment, GST_FORMAT_TIME); + priv->last_popped_seqnum = -1; + priv->next_seqnum = -1; + /* allow pops from the src pad task */ + async_jitter_queue_unset_flushing_unlocked (jitterbuffer->priv->queue); + async_jitter_queue_unlock (priv->queue); +} + +static gboolean +gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active) +{ + gboolean result = TRUE; + GstRTPJitterBuffer *jitterbuffer = NULL; + + jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); + + if (active) { + /* allow data processing */ + gst_rtp_jitter_buffer_flush_stop (jitterbuffer); + + /* start pushing out buffers */ + GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad"); + gst_pad_start_task (jitterbuffer->priv->srcpad, + (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer); + } else { + /* make sure all data processing stops ASAP */ + gst_rtp_jitter_buffer_flush_start (jitterbuffer); + + /* NOTE this will hardlock if the state change is called from the src pad + * task thread because we will _join() the thread. */ + GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad"); + result = gst_pad_stop_task (pad); + } + + gst_object_unref (jitterbuffer); + + return result; +} + +static GstStateChangeReturn +gst_rtp_jitter_buffer_change_state (GstElement * element, + GstStateChange transition) +{ + GstRTPJitterBuffer *jitterbuffer; + GstRTPJitterBufferPrivate *priv; + GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS; + + jitterbuffer = GST_RTP_JITTER_BUFFER (element); + priv = jitterbuffer->priv; + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + async_jitter_queue_lock (priv->queue); + /* reset negotiated values */ + priv->clock_rate = -1; + priv->clock_base = -1; + /* block until we go to PLAYING */ + async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue, + TRUE); + async_jitter_queue_unlock (priv->queue); + break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + async_jitter_queue_lock (priv->queue); + /* unblock to allow streaming in PLAYING */ + async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue, + FALSE); + async_jitter_queue_unlock (priv->queue); + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_PAUSED: + /* we are a live element because we sync to the clock, which we can only + * do in the PLAYING state */ + if (ret != GST_STATE_CHANGE_FAILURE) + ret = GST_STATE_CHANGE_NO_PREROLL; + break; + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + async_jitter_queue_lock (priv->queue); + /* block to stop streaming when PAUSED */ + async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue, + TRUE); + async_jitter_queue_unlock (priv->queue); + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_NULL: + break; + default: + break; + } + + return ret; +} + +/** + * Performs comparison 'b - a' with check for overflows. + */ +static inline gint +priv_compare_rtp_seq_lt (guint16 a, guint16 b) +{ + /* check if diff more than half of the 16bit range */ + if (abs (b - a) > (1 << 15)) { + /* one of a/b has wrapped */ + return a - b; + } else { + return b - a; + } +} + +/** + * gets the seqnum from the buffers and compare them + */ +static gint +compare_rtp_buffers_seq_num (GstBuffer * a, GstBuffer * b) +{ + gint ret; + + if (GST_IS_BUFFER (a) && GST_IS_BUFFER (b)) { + /* two buffers */ + ret = priv_compare_rtp_seq_lt + (gst_rtp_buffer_get_seq (GST_BUFFER_CAST (a)), + gst_rtp_buffer_get_seq (GST_BUFFER_CAST (b))); + } else { + /* one of them is an event, the event always goes before the other element + * so we return -1. */ + if (GST_IS_EVENT (a)) + ret = -1; + else + ret = 1; + } + return ret; +} + +static gboolean +gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event) +{ + gboolean ret = TRUE; + GstRTPJitterBuffer *jitterbuffer; + GstRTPJitterBufferPrivate *priv; + + jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); + priv = jitterbuffer->priv; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_NEWSEGMENT: + { + GstFormat format; + gdouble rate, arate; + gint64 start, stop, time; + gboolean update; + + gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format, + &start, &stop, &time); + + /* we need time for now */ + if (format != GST_FORMAT_TIME) + goto newseg_wrong_format; + + GST_DEBUG_OBJECT (jitterbuffer, + "newsegment: update %d, rate %g, arate %g, start %" GST_TIME_FORMAT + ", stop %" GST_TIME_FORMAT ", time %" GST_TIME_FORMAT, + update, rate, arate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop), + GST_TIME_ARGS (time)); + + /* now configure the values, we need these to time the release of the + * buffers on the srcpad. */ + gst_segment_set_newsegment_full (&priv->segment, update, + rate, arate, format, start, stop, time); + + /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */ + ret = gst_pad_push_event (priv->srcpad, event); + break; + } + case GST_EVENT_FLUSH_START: + gst_rtp_jitter_buffer_flush_start (jitterbuffer); + break; + case GST_EVENT_FLUSH_STOP: + gst_rtp_jitter_buffer_flush_stop (jitterbuffer); + break; + case GST_EVENT_EOS: + { + /* push EOS in queue. We always push it at the head */ + async_jitter_queue_lock (priv->queue); + /* check for flushing, we need to discard the event and return FALSE when + * we are flushing */ + ret = priv->srcresult == GST_FLOW_OK; + if (ret) + async_jitter_queue_push_unlocked (priv->queue, event); + else + gst_event_unref (event); + async_jitter_queue_unlock (priv->queue); + break; + } + default: + ret = gst_pad_push_event (priv->srcpad, event); + break; + } + +done: + gst_object_unref (jitterbuffer); + + return ret; + + /* ERRORS */ +newseg_wrong_format: + { + GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment"); + ret = FALSE; + goto done; + } +} + +static GstFlowReturn +gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) +{ + GstRTPJitterBuffer *jitterbuffer; + GstRTPJitterBufferPrivate *priv; + guint16 seqnum; + GstFlowReturn ret; + + + g_return_val_if_fail (gst_rtp_buffer_validate (buffer), GST_FLOW_ERROR); + + jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); + priv = jitterbuffer->priv; + + if (priv->clock_rate == -1) + goto not_negotiated; + + seqnum = gst_rtp_buffer_get_seq (buffer); + GST_DEBUG_OBJECT (jitterbuffer, "Received packet #%d", seqnum); + + async_jitter_queue_lock (priv->queue); + ret = priv->srcresult; + if (ret != GST_FLOW_OK) + goto out_flushing; + + /* let's check if this buffer is too late, we cannot accept packets with + * bigger seqnum than the one we already pushed. */ + if (priv->last_popped_seqnum != -1) { + if (priv_compare_rtp_seq_lt (priv->last_popped_seqnum, seqnum) < 0) + goto too_late; + } + + /* let's drop oldest packet if the queue is already full and drop-on-latency + * is set. */ + if (priv->drop_on_latency) { + if (async_jitter_queue_length_ts_units_unlocked (priv->queue) >= + priv->latency_ms * priv->clock_rate / 1000) { + GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d", + seqnum); + GstBuffer *old_buf; + + old_buf = async_jitter_queue_pop_unlocked (priv->queue); + gst_buffer_unref (old_buf); + } + } + + /* now insert the packet into the queue in sorted order. This function returns + * FALSE if a packet with the same seqnum was already in the queue, meaning we + * have a duplicate. */ + if (!async_jitter_queue_push_sorted_unlocked (priv->queue, buffer, + (GCompareDataFunc) compare_rtp_buffers_seq_num, NULL)) + goto duplicate; + + /* let's unschedule and unblock any waiting buffers. We only want to do this + * if there is a currently waiting newer (> seqnum) buffer */ + if (priv->clock_id) { + if (priv->waiting_seqnum > seqnum) { + gst_clock_id_unschedule (priv->clock_id); + GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting buffer"); + } + } + + GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d on queue %d", + seqnum, async_jitter_queue_length_unlocked (priv->queue)); + +finished: + async_jitter_queue_unlock (priv->queue); + + gst_object_unref (jitterbuffer); + + return ret; + + /* ERRORS */ +not_negotiated: + { + GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!"); + gst_buffer_unref (buffer); + gst_object_unref (jitterbuffer); + return GST_FLOW_NOT_NEGOTIATED; + } +out_flushing: + { + GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret)); + gst_buffer_unref (buffer); + gst_object_unref (jitterbuffer); + goto finished; + } +too_late: + { + GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already" + " popped, dropping", seqnum, priv->last_popped_seqnum); + priv->num_late++; + gst_buffer_unref (buffer); + goto finished; + } +duplicate: + { + GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping", + seqnum); + priv->num_duplicates++; + gst_buffer_unref (buffer); + goto finished; + } +} + +/** + * This funcion will push out buffers on the source pad. + * + * For each pushed buffer, the seqnum is recorded, if the next buffer B has a + * different seqnum (missing packets before B), this function will wait for the + * missing packet to arrive up to the rtp timestamp of buffer B. + */ +static void +gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer) +{ + GstRTPJitterBufferPrivate *priv; + gpointer elem; + GstBuffer *outbuf; + GstFlowReturn result; + guint16 seqnum; + guint32 rtp_time; + GstClockTime timestamp; + gint64 running_time; + + priv = jitterbuffer->priv; + + async_jitter_queue_lock (priv->queue); +again: + elem = async_jitter_queue_pop_unlocked (priv->queue); + if (!elem) + goto no_elem; + + /* special code for events */ + if (G_UNLIKELY (GST_IS_EVENT (elem))) { + GstEvent *event = GST_EVENT_CAST (elem); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + GST_DEBUG_OBJECT (jitterbuffer, "Popped EOS from queue"); + /* we don't expect more data now, makes upstream perform EOS actions */ + priv->srcresult = GST_FLOW_UNEXPECTED; + break; + default: + GST_DEBUG_OBJECT (jitterbuffer, "Popped event %s from queue", + GST_EVENT_TYPE_NAME (event)); + break; + } + async_jitter_queue_unlock (priv->queue); + + /* push event */ + gst_pad_push_event (priv->srcpad, event); + return; + } + + /* pop a buffer, we will get NULL if the queue was shut down */ + outbuf = GST_BUFFER_CAST (elem); + + seqnum = gst_rtp_buffer_get_seq (outbuf); + + GST_DEBUG_OBJECT (jitterbuffer, "Popped buffer #%d from queue %d", + gst_rtp_buffer_get_seq (outbuf), + async_jitter_queue_length_unlocked (priv->queue)); + + /* If we don't know what the next seqnum should be (== -1) we have to wait + * because it might be possible that we are not receiving this buffer in-order, + * a buffer with a lower seqnum could arrive later and we want to push that + * earlier buffer before this buffer then. + * If we know the expected seqnum, we can compare it to the current seqnum to + * determine if we have missing a packet. If we have a missing packet (which + * must be before this packet) we can wait for it until the deadline for this + * packet expires. */ + if (priv->next_seqnum == -1 || priv->next_seqnum != seqnum) { + GstClockID id; + GstClockTimeDiff jitter; + GstClockReturn ret; + GstClock *clock; + + if (priv->next_seqnum != -1) { + /* we expected next_seqnum but received something else, that's a gap */ + GST_DEBUG_OBJECT (jitterbuffer, + "Sequence number GAP detected -> %d instead of %d", priv->next_seqnum, + seqnum); + } else { + /* we don't know what the next_seqnum should be, wait for the last + * possible moment to push this buffer, maybe we get an earlier seqnum + * while we wait */ + GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum); + } + + /* get the max deadline to wait for the missing packets, this is the time + * of the currently popped packet */ + rtp_time = gst_rtp_buffer_get_timestamp (outbuf); + + GST_DEBUG_OBJECT (jitterbuffer, "rtp_time %u, base %u", rtp_time, + priv->clock_base); + + /* if no clock_base was given, take first ts as base */ + if (priv->clock_base == -1) + priv->clock_base = rtp_time; + + /* take rtp timestamp offset into account, this can wrap around */ + rtp_time -= priv->clock_base; + + /* bring timestamp to gst time */ + timestamp = + gst_util_uint64_scale_int (GST_SECOND, rtp_time, priv->clock_rate); + + GST_DEBUG_OBJECT (jitterbuffer, "timestamp %" GST_TIME_FORMAT, + GST_TIME_ARGS (timestamp)); + + /* bring to running time */ + running_time = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, + timestamp); + + /* correct for sync against the gstreamer clock, add latency */ + GST_OBJECT_LOCK (jitterbuffer); + clock = GST_ELEMENT_CLOCK (jitterbuffer); + if (!clock) { + GST_OBJECT_UNLOCK (jitterbuffer); + /* let's just push if there is no clock */ + goto push_buffer; + } + + /* add latency */ + running_time += (priv->latency_ms * GST_MSECOND); + + GST_DEBUG_OBJECT (jitterbuffer, "sync to running_time %" GST_TIME_FORMAT, + GST_TIME_ARGS (running_time)); + + /* prepare for sync against clock */ + running_time += GST_ELEMENT_CAST (jitterbuffer)->base_time; + + /* create an entry for the clock */ + id = priv->clock_id = gst_clock_new_single_shot_id (clock, running_time); + priv->waiting_seqnum = seqnum; + GST_OBJECT_UNLOCK (jitterbuffer); + + /* release the lock so that the other end can push stuff or unlock */ + async_jitter_queue_unlock (priv->queue); + + ret = gst_clock_id_wait (id, &jitter); + + async_jitter_queue_lock (priv->queue); + /* and free the entry */ + gst_clock_id_unref (id); + priv->clock_id = NULL; + priv->waiting_seqnum = -1; + + /* at this point, the clock could have been unlocked by a timeout, a new + * tail element was added to the queue or because we are shutting down. Check + * for shutdown first. */ + if (priv->srcresult != GST_FLOW_OK) + goto flushing; + + /* if we got unscheduled and we are not flushing, it's because a new tail + * element became available in the queue. Grab it and try to push or sync. */ + if (ret == GST_CLOCK_UNSCHEDULED) { + GST_DEBUG_OBJECT (jitterbuffer, + "Wait got unscheduled, will retry to push with new buffer"); + /* reinserting popped buffer into queue */ + if (!async_jitter_queue_push_sorted_unlocked (priv->queue, outbuf, + (GCompareDataFunc) compare_rtp_buffers_seq_num, NULL)) { + GST_DEBUG_OBJECT (jitterbuffer, + "Duplicate packet #%d detected, dropping", seqnum); + priv->num_duplicates++; + gst_buffer_unref (outbuf); + } + goto again; + } + } +push_buffer: + /* check if we are pushing something unexpected */ + if (priv->next_seqnum != -1 && priv->next_seqnum != seqnum) { + gint dropped; + + /* calc number of missing packets, careful for wraparounds */ + dropped = priv_compare_rtp_seq_lt (priv->next_seqnum, seqnum); + + GST_DEBUG_OBJECT (jitterbuffer, + "Pushing DISCONT after dropping %d (%d to %d)", dropped, + priv->next_seqnum, seqnum); + + /* update stats */ + priv->num_late += dropped; + + /* set DISCONT flag */ + outbuf = gst_buffer_make_metadata_writable (outbuf); + GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); + } + /* now we are ready to push the buffer. Save the seqnum and release the lock + * so the other end can push stuff in the queue again. */ + priv->last_popped_seqnum = seqnum; + priv->next_seqnum = (seqnum + 1) & 0xffff; + async_jitter_queue_unlock (priv->queue); + + /* push buffer */ + GST_DEBUG_OBJECT (jitterbuffer, "Pushing buffer %d", seqnum); + result = gst_pad_push (priv->srcpad, outbuf); + if (result != GST_FLOW_OK) + goto pause; + + return; + + /* ERRORS */ +no_elem: + { + /* store result, we are flushing now */ + GST_DEBUG_OBJECT (jitterbuffer, "Pop returned NULL, we're flushing"); + priv->srcresult = GST_FLOW_WRONG_STATE; + gst_pad_pause_task (priv->srcpad); + async_jitter_queue_unlock (priv->queue); + return; + } +flushing: + { + GST_DEBUG_OBJECT (jitterbuffer, "we are flushing"); + gst_buffer_unref (outbuf); + async_jitter_queue_unlock (priv->queue); + return; + } +pause: + { + const gchar *reason = gst_flow_get_name (result); + + GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason); + + async_jitter_queue_lock (priv->queue); + /* store result */ + priv->srcresult = result; + /* we don't post errors or anything because upstream will do that for us + * when we pass the return value upstream. */ + gst_pad_pause_task (priv->srcpad); + async_jitter_queue_unlock (priv->queue); + return; + } +} + +static gboolean +gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query) +{ + GstRTPJitterBuffer *jitterbuffer; + GstRTPJitterBufferPrivate *priv; + gboolean res = FALSE; + + jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); + priv = jitterbuffer->priv; + + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_LATENCY: + { + /* We need to send the query upstream and add the returned latency to our + * own */ + GstClockTime min_latency, max_latency; + gboolean us_live; + GstPad *peer; + + if ((peer = gst_pad_get_peer (priv->sinkpad))) { + if ((res = gst_pad_query (peer, query))) { + gst_query_parse_latency (query, &us_live, &min_latency, &max_latency); + + min_latency += priv->latency_ms * GST_MSECOND; + max_latency += priv->latency_ms * GST_MSECOND; + + GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %" + GST_TIME_FORMAT " max %" GST_TIME_FORMAT, + GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency)); + + gst_query_set_latency (query, TRUE, min_latency, max_latency); + } + gst_object_unref (peer); + } + break; + } + default: + break; + } + return res; +} + +static void +gst_rtp_jitter_buffer_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec) +{ + GstRTPJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (object); + + switch (prop_id) { + case ARG_LATENCY: + { + guint new_latency, old_latency; + + /* FIXME, not threadsafe */ + new_latency = g_value_get_uint (value); + old_latency = jitterbuffer->priv->latency_ms; + + jitterbuffer->priv->latency_ms = new_latency; + if (jitterbuffer->priv->clock_rate != -1) { + async_jitter_queue_set_max_queue_length (jitterbuffer->priv->queue, + gst_util_uint64_scale_int (new_latency, + jitterbuffer->priv->clock_rate, 1000)); + } + /* post message if latency changed, this will infor the parent pipeline + * that a latency reconfiguration is possible. */ + if (new_latency != old_latency) { + gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), + gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer))); + } + break; + } + case ARG_DROP_ON_LATENCY: + { + jitterbuffer->priv->drop_on_latency = g_value_get_boolean (value); + break; + } + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rtp_jitter_buffer_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ + GstRTPJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (object); + + switch (prop_id) { + case ARG_LATENCY: + g_value_set_uint (value, jitterbuffer->priv->latency_ms); + break; + case ARG_DROP_ON_LATENCY: + g_value_set_boolean (value, jitterbuffer->priv->drop_on_latency); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} diff --git a/gst/rtpmanager/gstrtpjitterbuffer.h b/gst/rtpmanager/gstrtpjitterbuffer.h new file mode 100644 index 00000000..a8671440 --- /dev/null +++ b/gst/rtpmanager/gstrtpjitterbuffer.h @@ -0,0 +1,74 @@ +/* + * Farsight Voice+Video library + * + * Copyright 2007 Collabora Ltd, + * Copyright 2007 Nokia Corporation + * @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>. + * Copyright 2007 Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + * + */ + +#ifndef __GST_RTP_JITTER_BUFFER_H__ +#define __GST_RTP_JITTER_BUFFER_H__ + +#include <gst/gst.h> +#include <gst/rtp/gstrtpbuffer.h> + +G_BEGIN_DECLS + +/* #define's don't like whitespacey bits */ +#define GST_TYPE_RTP_JITTER_BUFFER \ + (gst_rtp_jitter_buffer_get_type()) +#define GST_RTP_JITTER_BUFFER(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj), \ + GST_TYPE_RTP_JITTER_BUFFER,GstRTPJitterBuffer)) +#define GST_RTP_JITTER_BUFFER_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass), \ + GST_TYPE_RTP_JITTER_BUFFER,GstRTPJitterBufferClass)) +#define GST_IS_RTP_JITTER_BUFFER(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_JITTER_BUFFER)) +#define GST_IS_RTP_JITTER_BUFFER_CLASS(obj) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_JITTER_BUFFER)) + +typedef struct _GstRTPJitterBuffer GstRTPJitterBuffer; +typedef struct _GstRTPJitterBufferClass GstRTPJitterBufferClass; +typedef struct _GstRTPJitterBufferPrivate GstRTPJitterBufferPrivate; + +struct _GstRTPJitterBuffer +{ + GstElement parent; + + GstRTPJitterBufferPrivate *priv; + + /*< private > */ + gpointer _gst_reserved[GST_PADDING]; +}; + +struct _GstRTPJitterBufferClass +{ + GstElementClass parent_class; + + /*< private > */ + gpointer _gst_reserved[GST_PADDING]; +}; + +GType gst_rtp_jitter_buffer_get_type (void); + +G_END_DECLS + +#endif /* __GST_RTP_JITTER_BUFFER_H__ */ diff --git a/gst/rtpmanager/gstrtpmanager.c b/gst/rtpmanager/gstrtpmanager.c new file mode 100644 index 00000000..a059cad8 --- /dev/null +++ b/gst/rtpmanager/gstrtpmanager.c @@ -0,0 +1,55 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstrtpclient.h" +#include "gstrtpjitterbuffer.h" +#include "gstrtpptdemux.h" +#include "gstrtpsession.h" + +static gboolean +plugin_init (GstPlugin * plugin) +{ + if (!gst_element_register (plugin, "rtpclient", GST_RANK_NONE, + GST_TYPE_RTP_CLIENT)) + return FALSE; + + if (!gst_element_register (plugin, "rtpjitterbuffer", GST_RANK_NONE, + GST_TYPE_RTP_JITTER_BUFFER)) + return FALSE; + + if (!gst_element_register (plugin, "rtpptdemux", GST_RANK_NONE, + GST_TYPE_RTP_PT_DEMUX)) + return FALSE; + + if (!gst_element_register (plugin, "rtpsession", GST_RANK_NONE, + GST_TYPE_RTP_SESSION)) + return FALSE; + + return TRUE; +} + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + "rtpmanager", + "RTP session management plugin library", + plugin_init, VERSION, "LGPL", GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN) diff --git a/gst/rtpmanager/gstrtpptdemux.c b/gst/rtpmanager/gstrtpptdemux.c new file mode 100644 index 00000000..5950f619 --- /dev/null +++ b/gst/rtpmanager/gstrtpptdemux.c @@ -0,0 +1,350 @@ +/* + * RTP Demux element + * + * Copyright (C) 2005 Nokia Corporation. + * @author Kai Vehmanen <kai.vehmanen@nokia.com> + * + * Loosely based on GStreamer gstdecodebin + * Copyright (C) <2004> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +/* + * Contributors: + * Andre Moreira Magalhaes <andre.magalhaes@indt.org.br> + */ + +/* + * Status: + * - works with the test_rtpdemux.c tool + * + * Check: + * - is emitting a signal enough, or should we + * use GstEvent to notify downstream elements + * of the new packet... no? + * + * Notes: + * - emits event both for new PTs, and whenever + * a PT is changed + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <string.h> +#include <gst/gst.h> +#include "gstrtpptdemux.h" +#include <gst/rtp/gstrtpbuffer.h> + +/* generic templates */ +static GstStaticPadTemplate rtp_pt_demux_sink_template = +GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp, " + "payload = (int) [ 0, 255 ], " "clock-rate = (int) [ 0, 2147483647 ]") + ); + +static GstStaticPadTemplate rtp_pt_demux_src_template = +GST_STATIC_PAD_TEMPLATE ("src%d", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS_ANY); + +GST_DEBUG_CATEGORY_STATIC (gst_rtp_pt_demux_debug); +#define GST_CAT_DEFAULT gst_rtp_pt_demux_debug + +/** + * Item for storing GstPad<->pt pairs. + */ +struct _GstRTPPtDemuxPad +{ + GstPad *pad; /**< pointer to the actual pad */ + gint pt; /**< RTP payload-type attached to pad */ +}; + +/* signals */ +enum +{ + SIGNAL_NEW_PAYLOAD_TYPE, + SIGNAL_PAYLOAD_TYPE_CHANGE, + LAST_SIGNAL +}; + +GST_BOILERPLATE (GstRTPPtDemux, gst_rtp_pt_demux, GstElement, GST_TYPE_ELEMENT); + +static void gst_rtp_pt_demux_finalize (GObject * object); + +static void gst_rtp_pt_demux_release (GstElement * element); +static gboolean gst_rtp_pt_demux_setup (GstElement * element); + +static GstFlowReturn gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf); +static GstCaps *gst_rtp_pt_demux_getcaps (GstPad * pad); +static GstStateChangeReturn gst_rtp_pt_demux_change_state (GstElement * element, + GstStateChange transition); + +static GstPad *find_pad_for_pt (GstRTPPtDemux * rtpdemux, guint8 pt); + +static guint gst_rtp_pt_demux_signals[LAST_SIGNAL] = { 0 }; + +static GstElementDetails gst_rtp_pt_demux_details = { + "RTP Demux", + /* XXX: what's the correct hierarchy? */ + "Codec/Demux/Network", + "Parses codec streams transmitted in the same RTP session", + "Kai Vehmanen <kai.vehmanen@nokia.com>" +}; + +static void +gst_rtp_pt_demux_base_init (gpointer g_class) +{ + GstElementClass *gstelement_klass = GST_ELEMENT_CLASS (g_class); + + gst_element_class_add_pad_template (gstelement_klass, + gst_static_pad_template_get (&rtp_pt_demux_sink_template)); + gst_element_class_add_pad_template (gstelement_klass, + gst_static_pad_template_get (&rtp_pt_demux_src_template)); + + gst_element_class_set_details (gstelement_klass, &gst_rtp_pt_demux_details); +} + +static void +gst_rtp_pt_demux_class_init (GstRTPPtDemuxClass * klass) +{ + GObjectClass *gobject_klass; + GstElementClass *gstelement_klass; + + gobject_klass = (GObjectClass *) klass; + gstelement_klass = (GstElementClass *) klass; + + gst_rtp_pt_demux_signals[SIGNAL_NEW_PAYLOAD_TYPE] = + g_signal_new ("new-payload-type", + G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GstRTPPtDemuxClass, new_payload_type), + NULL, NULL, + g_cclosure_marshal_VOID__UINT_POINTER, + G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_PAD); + gst_rtp_pt_demux_signals[SIGNAL_PAYLOAD_TYPE_CHANGE] = + g_signal_new ("payload-type-change", + G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GstRTPPtDemuxClass, payload_type_change), + NULL, NULL, gst_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); + + gobject_klass->finalize = GST_DEBUG_FUNCPTR (gst_rtp_pt_demux_finalize); + + gstelement_klass->change_state = + GST_DEBUG_FUNCPTR (gst_rtp_pt_demux_change_state); + + GST_DEBUG_CATEGORY_INIT (gst_rtp_pt_demux_debug, + "rtpptdemux", 0, "RTP codec demuxer"); + +} + +static void +gst_rtp_pt_demux_init (GstRTPPtDemux * ptdemux, GstRTPPtDemuxClass * g_class) +{ + GstElementClass *klass = GST_ELEMENT_GET_CLASS (ptdemux); + + ptdemux->sink = + gst_pad_new_from_template (gst_element_class_get_pad_template (klass, + "sink"), "sink"); + g_assert (ptdemux->sink != NULL); + + gst_pad_set_chain_function (ptdemux->sink, gst_rtp_pt_demux_chain); + + gst_element_add_pad (GST_ELEMENT (ptdemux), ptdemux->sink); +} + +static void +gst_rtp_pt_demux_finalize (GObject * object) +{ + gst_rtp_pt_demux_release (GST_ELEMENT (object)); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static GstFlowReturn +gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf) +{ + GstFlowReturn ret = GST_FLOW_OK; + GstRTPPtDemux *rtpdemux; + GstElement *element = GST_ELEMENT (GST_OBJECT_PARENT (pad)); + guint8 pt; + GstPad *srcpad; + + rtpdemux = GST_RTP_PT_DEMUX (GST_OBJECT_PARENT (pad)); + + g_return_val_if_fail (gst_rtp_buffer_validate (buf), GST_FLOW_ERROR); + + pt = gst_rtp_buffer_get_payload_type (buf); + + srcpad = find_pad_for_pt (rtpdemux, pt); + if (srcpad == NULL) { + /* new PT, create a src pad */ + GstElementClass *klass; + GstPadTemplate *templ; + gchar *padname; + GstCaps *caps; + GstRTPPtDemuxPad *rtpdemuxpad; + + klass = GST_ELEMENT_GET_CLASS (rtpdemux); + templ = gst_element_class_get_pad_template (klass, "src%d"); + padname = g_strdup_printf ("src%d", pt); + srcpad = gst_pad_new_from_template (templ, padname); + g_free (padname); + + caps = gst_pad_get_caps (srcpad); + caps = gst_caps_make_writable (caps); + gst_caps_append_structure (caps, + gst_structure_new ("payload", "payload", G_TYPE_INT, pt, NULL)); + gst_pad_set_caps (srcpad, caps); + + /* XXX: set _link () function */ + gst_pad_set_getcaps_function (srcpad, gst_rtp_pt_demux_getcaps); + gst_pad_set_active (srcpad, TRUE); + gst_element_add_pad (element, srcpad); + + if (srcpad) { + GST_DEBUG ("Adding pt=%d to the list.", pt); + rtpdemuxpad = g_new0 (GstRTPPtDemuxPad, 1); + rtpdemuxpad->pt = pt; + rtpdemuxpad->pad = srcpad; + rtpdemux->srcpads = g_slist_append (rtpdemux->srcpads, rtpdemuxpad); + + GST_DEBUG ("emitting new-payload_type for pt %d", pt); + g_signal_emit (G_OBJECT (rtpdemux), + gst_rtp_pt_demux_signals[SIGNAL_NEW_PAYLOAD_TYPE], 0, pt, srcpad); + } + } + + if (pt != rtpdemux->last_pt) { + gint emit_pt = pt; + + /* our own signal with an extra flag that this is the only pad */ + rtpdemux->last_pt = pt; + GST_DEBUG ("emitting payload-type-changed for pt %d", emit_pt); + g_signal_emit (G_OBJECT (rtpdemux), + gst_rtp_pt_demux_signals[SIGNAL_PAYLOAD_TYPE_CHANGE], 0, emit_pt); + } + + /* push to srcpad */ + if (srcpad) + gst_pad_push (srcpad, GST_BUFFER (buf)); + + return ret; +} + +static GstCaps * +gst_rtp_pt_demux_getcaps (GstPad * pad) +{ + GstCaps *caps; + + GST_OBJECT_LOCK (pad); + if ((caps = GST_PAD_CAPS (pad))) + caps = gst_caps_ref (caps); + GST_OBJECT_UNLOCK (pad); + + return caps; +} + +static GstPad * +find_pad_for_pt (GstRTPPtDemux * rtpdemux, guint8 pt) +{ + GstPad *respad = NULL; + GSList *item = rtpdemux->srcpads; + + for (; item; item = g_slist_next (item)) { + GstRTPPtDemuxPad *pad = item->data; + + if (pad->pt == pt) { + respad = pad->pad; + break; + } + } + + return respad; +} + +/** + * Reserves resources for the object. + */ +static gboolean +gst_rtp_pt_demux_setup (GstElement * element) +{ + GstRTPPtDemux *ptdemux = GST_RTP_PT_DEMUX (element); + gboolean res = TRUE; + + if (ptdemux) { + ptdemux->srcpads = NULL; + ptdemux->last_pt = 0xFFFF; + } + + return res; +} + +/** + * Free resources for the object. + */ +static void +gst_rtp_pt_demux_release (GstElement * element) +{ + GstRTPPtDemux *ptdemux = GST_RTP_PT_DEMUX (element); + + if (ptdemux) { + /* note: GstElement's dispose() will handle the pads */ + g_slist_free (ptdemux->srcpads); + ptdemux->srcpads = NULL; + } +} + +static GstStateChangeReturn +gst_rtp_pt_demux_change_state (GstElement * element, GstStateChange transition) +{ + GstStateChangeReturn ret; + GstRTPPtDemux *ptdemux; + + ptdemux = GST_RTP_PT_DEMUX (element); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + if (gst_rtp_pt_demux_setup (element) != TRUE) + ret = GST_STATE_CHANGE_FAILURE; + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + case GST_STATE_CHANGE_PAUSED_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_NULL: + gst_rtp_pt_demux_release (element); + break; + default: + break; + } + + return ret; +} diff --git a/gst/rtpmanager/gstrtpptdemux.h b/gst/rtpmanager/gstrtpptdemux.h new file mode 100644 index 00000000..93be3959 --- /dev/null +++ b/gst/rtpmanager/gstrtpptdemux.h @@ -0,0 +1,57 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __GST_RTP_PT_DEMUX_H__ +#define __GST_RTP_PT_DEMUX_H__ + +#include <gst/gst.h> + +#define GST_TYPE_RTP_PT_DEMUX (gst_rtp_pt_demux_get_type()) +#define GST_RTP_PT_DEMUX(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTP_PT_DEMUX,GstRTPPtDemux)) +#define GST_RTP_PT_DEMUX_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTP_PT_DEMUX,GstRTPPtDemuxClass)) +#define GST_IS_RTP_PT_DEMUX(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_PT_DEMUX)) +#define GST_IS_RTP_PT_DEMUX_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_PT_DEMUX)) + +typedef struct _GstRTPPtDemux GstRTPPtDemux; +typedef struct _GstRTPPtDemuxClass GstRTPPtDemuxClass; +typedef struct _GstRTPPtDemuxPad GstRTPPtDemuxPad; + +struct _GstRTPPtDemux +{ + GstElement parent; /**< parent class */ + + GstPad *sink; /**< the sink pad */ + guint16 last_pt; /**< pt of the last packet 0xFFFF if none */ + GSList *srcpads; /**< a linked list of GstRTPPtDemuxPad objects */ +}; + +struct _GstRTPPtDemuxClass +{ + GstElementClass parent_class; + + /* signal emmited when a new PT is found from the incoming stream */ + void (*new_payload_type) (GstElement * element, gint pt, GstPad * pad); + + /* signal emitted when the payload type changes */ + void (*payload_type_change) (GstElement * element, gint pt); +}; + +GType gst_rtp_pt_demux_get_type (void); + +#endif /* __GST_RTP_PT_DEMUX_H__ */ diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c new file mode 100644 index 00000000..47df756f --- /dev/null +++ b/gst/rtpmanager/gstrtpsession.c @@ -0,0 +1,453 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +/** + * SECTION:element-rtpsession + * @short_description: an RTP session manager + * @see_also: rtpjitterbuffer, rtpbin + * + * <refsect2> + * <para> + * </para> + * <title>Example pipelines</title> + * <para> + * <programlisting> + * gst-launch -v filesrc location=sine.ogg ! oggdemux ! vorbisdec ! audioconvert ! alsasink + * </programlisting> + * </para> + * </refsect2> + * + * Last reviewed on 2007-04-02 (0.10.6) + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include "gstrtpsession.h" + +/* elementfactory information */ +static const GstElementDetails rtpsession_details = +GST_ELEMENT_DETAILS ("RTP Session", + "Filter/Editor/Video", + "Implement an RTP session", + "Wim Taymans <wim@fluendo.com>"); + +/* sink pads */ +static GstStaticPadTemplate rtpsession_recv_rtp_sink_template = +GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS ("application/x-rtp") + ); + +static GstStaticPadTemplate rtpsession_recv_rtcp_sink_template = +GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS ("application/x-rtcp") + ); + +static GstStaticPadTemplate rtpsession_send_rtp_sink_template = +GST_STATIC_PAD_TEMPLATE ("send_rtp_sink", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS ("application/x-rtp") + ); + +/* src pads */ +static GstStaticPadTemplate rtpsession_recv_rtp_src_template = +GST_STATIC_PAD_TEMPLATE ("recv_rtp_src", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS ("application/x-rtp") + ); + +static GstStaticPadTemplate rtpsession_sync_src_template = +GST_STATIC_PAD_TEMPLATE ("sync_src", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS ("application/x-rtcp") + ); + +static GstStaticPadTemplate rtpsession_send_rtp_src_template = +GST_STATIC_PAD_TEMPLATE ("send_rtp_src", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS ("application/x-rtp") + ); + +static GstStaticPadTemplate rtpsession_rtcp_src_template = +GST_STATIC_PAD_TEMPLATE ("rtcp_src", + GST_PAD_SRC, + GST_PAD_REQUEST, + GST_STATIC_CAPS ("application/x-rtcp") + ); + +/* signals and args */ +enum +{ + /* FILL ME */ + LAST_SIGNAL +}; + +enum +{ + PROP_0 +}; + +/* GObject vmethods */ +static void gst_rtp_session_finalize (GObject * object); +static void gst_rtp_session_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_rtp_session_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +/* GstElement vmethods */ +static GstStateChangeReturn gst_rtp_session_change_state (GstElement * element, + GstStateChange transition); +static GstPad *gst_rtp_session_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * name); +static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad); + +/*static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 }; */ + +GST_BOILERPLATE (GstRTPSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT); + +static void +gst_rtp_session_base_init (gpointer klass) +{ + GstElementClass *element_class = GST_ELEMENT_CLASS (klass); + + /* sink pads */ + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtpsession_recv_rtp_sink_template)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtpsession_recv_rtcp_sink_template)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtpsession_send_rtp_sink_template)); + + /* src pads */ + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtpsession_recv_rtp_src_template)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtpsession_sync_src_template)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtpsession_send_rtp_src_template)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtpsession_rtcp_src_template)); + + gst_element_class_set_details (element_class, &rtpsession_details); +} + +static void +gst_rtp_session_class_init (GstRTPSessionClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + + gobject_class->finalize = gst_rtp_session_finalize; + gobject_class->set_property = gst_rtp_session_set_property; + gobject_class->get_property = gst_rtp_session_get_property; + + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_rtp_session_change_state); + gstelement_class->request_new_pad = + GST_DEBUG_FUNCPTR (gst_rtp_session_request_new_pad); + gstelement_class->release_pad = + GST_DEBUG_FUNCPTR (gst_rtp_session_release_pad); +} + +static void +gst_rtp_session_init (GstRTPSession * rtpsession, GstRTPSessionClass * klass) +{ +} + +static void +gst_rtp_session_finalize (GObject * object) +{ + GstRTPSession *rtpsession; + + rtpsession = GST_RTP_SESSION (object); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_rtp_session_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstRTPSession *rtpsession; + + rtpsession = GST_RTP_SESSION (object); + + switch (prop_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rtp_session_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstRTPSession *rtpsession; + + rtpsession = GST_RTP_SESSION (object); + + switch (prop_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstStateChangeReturn +gst_rtp_session_change_state (GstElement * element, GstStateChange transition) +{ + GstStateChangeReturn res; + GstRTPSession *rtpsession; + + rtpsession = GST_RTP_SESSION (element); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + break; + default: + break; + } + + res = parent_class->change_state (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_NULL: + break; + default: + break; + } + return res; +} + +/* receive a packet from a sender, send it to the RTP session manager and + * forward the packet on the rtp_src pad + */ +static GstFlowReturn +gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer) +{ + GstRTPSession *rtpsession; + GstFlowReturn ret; + + rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + + /* FIXME, do something */ + ret = gst_pad_push (rtpsession->recv_rtp_src, buffer); + + gst_object_unref (rtpsession); + + return ret; +} + +/* Receive an RTCP packet from a sender, send it to the RTP session manager and + * forward the SR packets to the sync_src pad. + */ +static GstFlowReturn +gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer) +{ + GstRTPSession *rtpsession; + GstFlowReturn ret; + + rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + + /* FIXME, do something */ + ret = gst_pad_push (rtpsession->sync_src, buffer); + + gst_object_unref (rtpsession); + + return ret; +} + +/* Recieve an RTP packet to be send to the receivers, send to RTP session + * manager and forward to send_rtp_src. + */ +static GstFlowReturn +gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) +{ + GstRTPSession *rtpsession; + GstFlowReturn ret; + + rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + + /* FIXME, do something */ + ret = gst_pad_push (rtpsession->send_rtp_src, buffer); + + gst_object_unref (rtpsession); + + return ret; +} + + +/* Create sinkpad to receive RTP packets from senders. This will also create a + * srcpad for the RTP packets. + */ +static GstPad * +create_recv_rtp_sink (GstRTPSession * rtpsession) +{ + rtpsession->recv_rtp_sink = + gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template, + NULL); + gst_pad_set_chain_function (rtpsession->recv_rtp_sink, + gst_rtp_session_chain_recv_rtp); + gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), + rtpsession->recv_rtp_sink); + + rtpsession->recv_rtp_src = + gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template, + NULL); + gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src); + + return rtpsession->recv_rtp_sink; +} + +/* Create a sinkpad to receive RTCP messages from senders, this will also create a + * sync_src pad for the SR packets. + */ +static GstPad * +create_recv_rtcp_sink (GstRTPSession * rtpsession) +{ + rtpsession->recv_rtcp_sink = + gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template, + NULL); + gst_pad_set_chain_function (rtpsession->recv_rtcp_sink, + gst_rtp_session_chain_recv_rtcp); + gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), + rtpsession->recv_rtcp_sink); + + rtpsession->sync_src = + gst_pad_new_from_static_template (&rtpsession_sync_src_template, NULL); + gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src); + + return rtpsession->recv_rtcp_sink; +} + +/* Create a sinkpad to receive RTP packets for receivers. This will also create a + * send_rtp_src pad. + */ +static GstPad * +create_send_rtp_sink (GstRTPSession * rtpsession) +{ + rtpsession->send_rtp_sink = + gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template, + NULL); + gst_pad_set_chain_function (rtpsession->send_rtp_sink, + gst_rtp_session_chain_send_rtp); + gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), + rtpsession->recv_rtcp_sink); + + rtpsession->send_rtp_src = + gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template, + NULL); + gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src); + + return rtpsession->send_rtp_sink; +} + +/* Create a srcpad with the RTCP packets to send out. + * This pad will be driven by the RTP session manager when it wants to send out + * RTCP packets. + */ +static GstPad * +create_rtcp_src (GstRTPSession * rtpsession) +{ + rtpsession->rtcp_src = + gst_pad_new_from_static_template (&rtpsession_rtcp_src_template, NULL); + gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->rtcp_src); + + return rtpsession->rtcp_src; +} + +static GstPad * +gst_rtp_session_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * name) +{ + GstRTPSession *rtpsession; + GstElementClass *klass; + GstPad *result; + + g_return_val_if_fail (templ != NULL, NULL); + g_return_val_if_fail (GST_IS_RTP_SESSION (element), NULL); + + rtpsession = GST_RTP_SESSION (element); + klass = GST_ELEMENT_GET_CLASS (element); + + /* figure out the template */ + if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) { + if (rtpsession->recv_rtp_sink != NULL) + goto exists; + + result = create_recv_rtp_sink (rtpsession); + } else if (templ == gst_element_class_get_pad_template (klass, + "recv_rtcp_sink")) { + if (rtpsession->recv_rtcp_sink != NULL) + goto exists; + + result = create_recv_rtcp_sink (rtpsession); + } else if (templ == gst_element_class_get_pad_template (klass, + "send_rtp_sink")) { + if (rtpsession->send_rtp_sink != NULL) + goto exists; + + result = create_send_rtp_sink (rtpsession); + } else if (templ == gst_element_class_get_pad_template (klass, "rtcp_src")) { + if (rtpsession->rtcp_src != NULL) + goto exists; + + result = create_rtcp_src (rtpsession); + } else + goto wrong_template; + + return result; + + /* ERRORS */ +wrong_template: + { + g_warning ("rtpsession: this is not our template"); + return NULL; + } +exists: + { + g_warning ("rtpsession: pad already requested"); + return NULL; + } +} + +static void +gst_rtp_session_release_pad (GstElement * element, GstPad * pad) +{ +} diff --git a/gst/rtpmanager/gstrtpsession.h b/gst/rtpmanager/gstrtpsession.h new file mode 100644 index 00000000..8b343064 --- /dev/null +++ b/gst/rtpmanager/gstrtpsession.h @@ -0,0 +1,62 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __GST_RTP_SESSION_H__ +#define __GST_RTP_SESSION_H__ + +#include <gst/gst.h> + +#define GST_TYPE_RTP_SESSION \ + (gst_rtp_session_get_type()) +#define GST_RTP_SESSION(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTP_SESSION,GstRTPSession)) +#define GST_RTP_SESSION_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTP_SESSION,GstRTPSessionClass)) +#define GST_IS_RTP_SESSION(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_SESSION)) +#define GST_IS_RTP_SESSION_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_SESSION)) + +typedef struct _GstRTPSession GstRTPSession; +typedef struct _GstRTPSessionClass GstRTPSessionClass; +typedef struct _GstRTPSessionPrivate GstRTPSessionPrivate; + +struct _GstRTPSession { + GstElement element; + + /*< private >*/ + GstPad *recv_rtp_sink; + GstPad *recv_rtcp_sink; + GstPad *send_rtp_sink; + + GstPad *recv_rtp_src; + GstPad *sync_src; + GstPad *send_rtp_src; + GstPad *rtcp_src; + + GstRTPSessionPrivate *priv; +}; + +struct _GstRTPSessionClass { + GstElementClass parent_class; +}; + +GType gst_rtp_session_get_type (void); + +#endif /* __GST_RTP_SESSION_H__ */ |