diff options
author | Wim Taymans <wim.taymans@gmail.com> | 2007-08-10 17:16:53 +0000 |
---|---|---|
committer | Wim Taymans <wim.taymans@gmail.com> | 2007-08-10 17:16:53 +0000 |
commit | 9e50d836d4173f127adc56446fa74f1e103fa673 (patch) | |
tree | cdde2fad6fe1ea9945bdcc179438c94d6e425805 | |
parent | a9090746e58e9a994d7f07f61452602b31461e1e (diff) | |
download | gst-plugins-bad-9e50d836d4173f127adc56446fa74f1e103fa673.tar.gz gst-plugins-bad-9e50d836d4173f127adc56446fa74f1e103fa673.tar.bz2 gst-plugins-bad-9e50d836d4173f127adc56446fa74f1e103fa673.zip |
gst/rtpmanager/: Remove complicated async queue and replace with more simple jitterbuffer code while also fixing some...
Original commit message from CVS:
* gst/rtpmanager/Makefile.am:
* gst/rtpmanager/async_jitter_queue.c:
* gst/rtpmanager/async_jitter_queue.h:
* gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_class_init),
(rtp_jitter_buffer_init), (rtp_jitter_buffer_finalize),
(rtp_jitter_buffer_new), (compare_seqnum),
(rtp_jitter_buffer_insert), (rtp_jitter_buffer_pop),
(rtp_jitter_buffer_flush), (rtp_jitter_buffer_num_packets),
(rtp_jitter_buffer_get_ts_diff):
* gst/rtpmanager/rtpjitterbuffer.h:
Remove complicated async queue and replace with more simple jitterbuffer
code while also fixing some bugs.
* gst/rtpmanager/gstrtpbin-marshal.list:
* gst/rtpmanager/gstrtpbin.c: (on_new_ssrc), (on_ssrc_collision),
(on_ssrc_validated), (on_bye_ssrc), (on_bye_timeout), (on_timeout),
(create_session), (gst_rtp_bin_class_init), (create_recv_rtp),
(create_send_rtp):
* gst/rtpmanager/gstrtpbin.h:
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_init), (gst_rtp_jitter_buffer_dispose),
(gst_jitter_buffer_sink_parse_caps),
(gst_rtp_jitter_buffer_flush_start),
(gst_rtp_jitter_buffer_flush_stop),
(gst_rtp_jitter_buffer_change_state),
(gst_rtp_jitter_buffer_sink_event), (gst_rtp_jitter_buffer_chain),
(gst_rtp_jitter_buffer_loop), (gst_rtp_jitter_buffer_set_property):
* gst/rtpmanager/gstrtpsession.c: (on_new_ssrc),
(on_ssrc_collision), (on_ssrc_validated), (on_bye_ssrc),
(on_bye_timeout), (on_timeout), (gst_rtp_session_class_init),
(gst_rtp_session_init):
* gst/rtpmanager/gstrtpsession.h:
* gst/rtpmanager/rtpsession.c: (on_bye_ssrc), (session_cleanup):
Use new jitterbuffer code.
Expose some new signals in preparation for handling EOS.
-rw-r--r-- | ChangeLog | 38 | ||||
-rw-r--r-- | gst/rtpmanager/Makefile.am | 4 | ||||
-rw-r--r-- | gst/rtpmanager/async_jitter_queue.c | 692 | ||||
-rw-r--r-- | gst/rtpmanager/async_jitter_queue.h | 130 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpbin-marshal.list | 1 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpbin.c | 141 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpbin.h | 8 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpjitterbuffer.c | 253 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpsession.c | 132 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpsession.h | 8 | ||||
-rw-r--r-- | gst/rtpmanager/rtpjitterbuffer.c | 237 | ||||
-rw-r--r-- | gst/rtpmanager/rtpjitterbuffer.h | 67 | ||||
-rw-r--r-- | gst/rtpmanager/rtpsession.c | 2 |
13 files changed, 748 insertions, 965 deletions
@@ -1,3 +1,41 @@ +2007-08-10 Wim Taymans <wim.taymans@gmail.com> + + * gst/rtpmanager/Makefile.am: + * gst/rtpmanager/async_jitter_queue.c: + * gst/rtpmanager/async_jitter_queue.h: + * gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_class_init), + (rtp_jitter_buffer_init), (rtp_jitter_buffer_finalize), + (rtp_jitter_buffer_new), (compare_seqnum), + (rtp_jitter_buffer_insert), (rtp_jitter_buffer_pop), + (rtp_jitter_buffer_flush), (rtp_jitter_buffer_num_packets), + (rtp_jitter_buffer_get_ts_diff): + * gst/rtpmanager/rtpjitterbuffer.h: + Remove complicated async queue and replace with more simple jitterbuffer + code while also fixing some bugs. + + * gst/rtpmanager/gstrtpbin-marshal.list: + * gst/rtpmanager/gstrtpbin.c: (on_new_ssrc), (on_ssrc_collision), + (on_ssrc_validated), (on_bye_ssrc), (on_bye_timeout), (on_timeout), + (create_session), (gst_rtp_bin_class_init), (create_recv_rtp), + (create_send_rtp): + * gst/rtpmanager/gstrtpbin.h: + * gst/rtpmanager/gstrtpjitterbuffer.c: + (gst_rtp_jitter_buffer_init), (gst_rtp_jitter_buffer_dispose), + (gst_jitter_buffer_sink_parse_caps), + (gst_rtp_jitter_buffer_flush_start), + (gst_rtp_jitter_buffer_flush_stop), + (gst_rtp_jitter_buffer_change_state), + (gst_rtp_jitter_buffer_sink_event), (gst_rtp_jitter_buffer_chain), + (gst_rtp_jitter_buffer_loop), (gst_rtp_jitter_buffer_set_property): + * gst/rtpmanager/gstrtpsession.c: (on_new_ssrc), + (on_ssrc_collision), (on_ssrc_validated), (on_bye_ssrc), + (on_bye_timeout), (on_timeout), (gst_rtp_session_class_init), + (gst_rtp_session_init): + * gst/rtpmanager/gstrtpsession.h: + * gst/rtpmanager/rtpsession.c: (on_bye_ssrc), (session_cleanup): + Use new jitterbuffer code. + Expose some new signals in preparation for handling EOS. + 2007-08-10 Sebastian Dröge <slomo@circular-chaos.org> * gst/filter/gstbpwsinc.c: (bpwsinc_build_kernel): diff --git a/gst/rtpmanager/Makefile.am b/gst/rtpmanager/Makefile.am index 9e47cbdf..cad27a36 100644 --- a/gst/rtpmanager/Makefile.am +++ b/gst/rtpmanager/Makefile.am @@ -13,10 +13,10 @@ BUILT_SOURCES = $(built_sources) $(built_headers) libgstrtpmanager_la_SOURCES = gstrtpmanager.c \ gstrtpbin.c \ gstrtpclient.c \ - async_jitter_queue.c \ gstrtpjitterbuffer.c \ gstrtpptdemux.c \ gstrtpssrcdemux.c \ + rtpjitterbuffer.c \ rtpsession.c \ rtpsource.c \ rtpstats.c \ @@ -27,10 +27,10 @@ nodist_libgstrtpmanager_la_SOURCES = \ noinst_HEADERS = gstrtpbin.h \ gstrtpclient.h \ - async_jitter_queue.h \ gstrtpjitterbuffer.h \ gstrtpptdemux.h \ gstrtpssrcdemux.h \ + rtpjitterbuffer.h \ rtpsession.h \ rtpsource.h \ rtpstats.h \ diff --git a/gst/rtpmanager/async_jitter_queue.c b/gst/rtpmanager/async_jitter_queue.c deleted file mode 100644 index 73597b28..00000000 --- a/gst/rtpmanager/async_jitter_queue.c +++ /dev/null @@ -1,692 +0,0 @@ -/* - * Async Jitter Queue based on g_async_queue - * This code is GST RTP smart and deals with timestamps - * - * Farsight Voice+Video library - * Copyright 2007 Collabora Ltd, - * Copyright 2007 Nokia Corporation - * @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>. - * - * This is an async queue that has a buffering mecanism based on the set low - * and high threshold. When the lower threshold is reached, the queue will - * fill itself up until the higher threshold is reached before allowing any - * pops to occur. This allows a jitterbuffer of at least min threshold items - * to be available. - */ - -/* GLIB - Library of useful routines for C programming - * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald - * - * GAsyncQueue: asynchronous queue implementation, based on Gqueue. - * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -/* - * MT safe - */ - -#include "config.h" - -#include "async_jitter_queue.h" - -#include <gst/gst.h> -#include <gst/rtp/gstrtpbuffer.h> - -#define DEFAULT_LOW_THRESHOLD 0.1 -#define DEFAULT_HIGH_THRESHOLD 0.9 - -struct _AsyncJitterQueue -{ - GMutex *mutex; - GCond *cond; - GQueue *queue; - guint waiting_threads; - gint32 ref_count; - gfloat low_threshold; - gfloat high_threshold; - guint32 max_queue_length; - gboolean buffering; - gboolean pop_flushing; - gboolean pop_blocking; - guint pops_remaining; - guint32 tail_buffer_duration; -}; - -/** - * async_jitter_queue_new: - * - * Creates a new asynchronous queue with the initial reference count of 1. - * - * Return value: the new #AsyncJitterQueue. - **/ -AsyncJitterQueue * -async_jitter_queue_new (void) -{ - AsyncJitterQueue *retval = g_new (AsyncJitterQueue, 1); - - retval->mutex = g_mutex_new (); - retval->cond = g_cond_new (); - retval->queue = g_queue_new (); - retval->waiting_threads = 0; - retval->ref_count = 1; - retval->low_threshold = DEFAULT_LOW_THRESHOLD; - retval->high_threshold = DEFAULT_HIGH_THRESHOLD; - retval->buffering = TRUE; /* we need to buffer initially */ - retval->pop_flushing = TRUE; - retval->pop_blocking = TRUE; - retval->pops_remaining = 0; - retval->tail_buffer_duration = 0; - return retval; -} - -/* checks buffering state and wakes up waiting pops */ -void -signal_waiting_threads (AsyncJitterQueue * queue) -{ - if (async_jitter_queue_length_ts_units_unlocked (queue) >= - queue->high_threshold * queue->max_queue_length) { - GST_DEBUG ("stop buffering"); - queue->buffering = FALSE; - } - - if (queue->waiting_threads > 0) { - if (!queue->buffering) { - g_cond_signal (queue->cond); - } - } -} - -/** - * async_jitter_queue_ref: - * @queue: a #AsyncJitterQueue. - * - * Increases the reference count of the asynchronous @queue by 1. You - * do not need to hold the lock to call this function. - * - * Returns: the @queue that was passed in (since 2.6) - **/ -AsyncJitterQueue * -async_jitter_queue_ref (AsyncJitterQueue * queue) -{ - g_return_val_if_fail (queue, NULL); - g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL); - - g_atomic_int_inc (&queue->ref_count); - - return queue; -} - -/** - * async_jitter_queue_ref_unlocked: - * @queue: a #AsyncJitterQueue. - * - * Increases the reference count of the asynchronous @queue by 1. - **/ -void -async_jitter_queue_ref_unlocked (AsyncJitterQueue * queue) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - g_atomic_int_inc (&queue->ref_count); -} - -/** - * async_jitter_queue_set_low_threshold: - * @queue: a #AsyncJitterQueue. - * @threshold: the lower threshold (fraction of max size) - * - * Sets the low threshold on the queue. This threshold indicates the minimum - * number of items allowed in the queue before we refill it up to the set - * maximum threshold. - **/ -void -async_jitter_queue_set_low_threshold (AsyncJitterQueue * queue, - gfloat threshold) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - queue->low_threshold = threshold; -} - -/** - * async_jitter_queue_set_max_threshold: - * @queue: a #AsyncJitterQueue. - * @threshold: the higher threshold (fraction of max size) - * - * Sets the high threshold on the queue. This threshold indicates the amount of - * items to fill in the queue before releasing any blocking pop calls. This - * blocking mecanism is only triggered when we reach the low threshold and must - * refill the queue. - **/ -void -async_jitter_queue_set_high_threshold (AsyncJitterQueue * queue, - gfloat threshold) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - queue->high_threshold = threshold; -} - -/* set the maximum queue length in RTP timestamp units */ -void -async_jitter_queue_set_max_queue_length (AsyncJitterQueue * queue, - guint32 max_length) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - queue->max_queue_length = max_length; -} - -GQueue * -async_jitter_queue_get_g_queue (AsyncJitterQueue * queue) -{ - g_return_val_if_fail (queue, NULL); - - return queue->queue; -} - -static guint32 -calculate_ts_diff (guint32 high_ts, guint32 low_ts) -{ - /* it needs to work if ts wraps */ - if (high_ts >= low_ts) { - return high_ts - low_ts; - } else { - return high_ts + G_MAXUINT32 + 1 - low_ts; - } -} - -/* this function returns the length of the queue in timestamp units. It will - * also add the duration of the last buffer in the queue */ -/* FIXME This function wrongly assumes that there are no missing packets inside - * the buffer, in reality it needs to check for gaps and subsctract those from - * the total */ -guint32 -async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue * queue) -{ - guint32 tail_ts; - guint32 head_ts; - guint32 ret; - GstBuffer *head; - GstBuffer *tail; - - g_return_val_if_fail (queue, 0); - - if (queue->queue->length < 2) { - return 0; - } - - tail = g_queue_peek_tail (queue->queue); - head = g_queue_peek_head (queue->queue); - - if (!GST_IS_BUFFER (tail) || !GST_IS_BUFFER (head)) - return 0; - - tail_ts = gst_rtp_buffer_get_timestamp (tail); - head_ts = gst_rtp_buffer_get_timestamp (head); - - ret = calculate_ts_diff (head_ts, tail_ts); - - /* let's add the duration of the tail buffer */ - ret += queue->tail_buffer_duration; - - return ret; -} - -/** - * async_jitter_queue_unref_and_unlock: - * @queue: a #AsyncJitterQueue. - * - * Decreases the reference count of the asynchronous @queue by 1 and - * releases the lock. This function must be called while holding the - * @queue's lock. If the reference count went to 0, the @queue will be - * destroyed and the memory allocated will be freed. - **/ -void -async_jitter_queue_unref_and_unlock (AsyncJitterQueue * queue) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - g_mutex_unlock (queue->mutex); - async_jitter_queue_unref (queue); -} - -/** - * async_jitter_queue_unref: - * @queue: a #AsyncJitterQueue. - * - * Decreases the reference count of the asynchronous @queue by 1. If - * the reference count went to 0, the @queue will be destroyed and the - * memory allocated will be freed. So you are not allowed to use the - * @queue afterwards, as it might have disappeared. You do not need to - * hold the lock to call this function. - **/ -void -async_jitter_queue_unref (AsyncJitterQueue * queue) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - if (g_atomic_int_dec_and_test (&queue->ref_count)) { - g_return_if_fail (queue->waiting_threads == 0); - g_mutex_free (queue->mutex); - if (queue->cond) - g_cond_free (queue->cond); - g_queue_free (queue->queue); - g_free (queue); - } -} - -/** - * async_jitter_queue_lock: - * @queue: a #AsyncJitterQueue. - * - * Acquires the @queue's lock. After that you can only call the - * <function>async_jitter_queue_*_unlocked()</function> function variants on that - * @queue. Otherwise it will deadlock. - **/ -void -async_jitter_queue_lock (AsyncJitterQueue * queue) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - g_mutex_lock (queue->mutex); -} - -/** - * async_jitter_queue_unlock: - * @queue: a #AsyncJitterQueue. - * - * Releases the queue's lock. - **/ -void -async_jitter_queue_unlock (AsyncJitterQueue * queue) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - g_mutex_unlock (queue->mutex); -} - -/** - * async_jitter_queue_push: - * @queue: a #AsyncJitterQueue. - * @data: @data to push into the @queue. - * - * Pushes the @data into the @queue. @data must not be %NULL. - **/ -void -async_jitter_queue_push (AsyncJitterQueue * queue, gpointer data) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - g_return_if_fail (data); - - g_mutex_lock (queue->mutex); - async_jitter_queue_push_unlocked (queue, data); - g_mutex_unlock (queue->mutex); -} - -/** - * async_jitter_queue_push_unlocked: - * @queue: a #AsyncJitterQueue. - * @data: @data to push into the @queue. - * - * Pushes the @data into the @queue. @data must not be %NULL. This - * function must be called while holding the @queue's lock. - **/ -void -async_jitter_queue_push_unlocked (AsyncJitterQueue * queue, gpointer data) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - g_return_if_fail (data); - - g_queue_push_head (queue->queue, data); - - signal_waiting_threads (queue); -} - -/** - * async_jitter_queue_push_sorted: - * @queue: a #AsyncJitterQueue - * @data: the @data to push into the @queue - * @func: the #GCompareDataFunc is used to sort @queue. This function - * is passed two elements of the @queue. The function should return - * 0 if they are equal, a negative value if the first element - * should be higher in the @queue or a positive value if the first - * element should be lower in the @queue than the second element. - * @user_data: user data passed to @func. - * - * Inserts @data into @queue using @func to determine the new - * position. - * - * This function requires that the @queue is sorted before pushing on - * new elements. - * - * This function will lock @queue before it sorts the queue and unlock - * it when it is finished. - * - * For an example of @func see async_jitter_queue_sort(). - * - * Since: 2.10 - **/ -gboolean -async_jitter_queue_push_sorted (AsyncJitterQueue * queue, - gpointer data, GCompareDataFunc func, gpointer user_data) -{ - gboolean ret; - - g_return_val_if_fail (queue != NULL, FALSE); - - g_mutex_lock (queue->mutex); - ret = async_jitter_queue_push_sorted_unlocked (queue, data, func, user_data); - g_mutex_unlock (queue->mutex); - - return ret; -} - -/** - * async_jitter_queue_push_sorted_unlocked: - * @queue: a #AsyncJitterQueue - * @data: the @data to push into the @queue - * @func: the #GCompareDataFunc is used to sort @queue. This function - * is passed two elements of the @queue. The function should return - * 0 if they are equal, a negative value if the first element - * should be higher in the @queue or a positive value if the first - * element should be lower in the @queue than the second element. - * @user_data: user data passed to @func. - * - * Inserts @data into @queue using @func to determine the new - * position. - * - * This function requires that the @queue is sorted before pushing on - * new elements. - * - * If @GCompareDataFunc returns 0, this function does not insert @data and - * return FALSE. - * - * This function is called while holding the @queue's lock. - * - * For an example of @func see async_jitter_queue_sort(). - * - * Since: 2.10 - **/ -gboolean -async_jitter_queue_push_sorted_unlocked (AsyncJitterQueue * queue, - gpointer data, GCompareDataFunc func, gpointer user_data) -{ - GList *list; - gint func_ret = TRUE; - - g_return_val_if_fail (queue != NULL, FALSE); - - list = queue->queue->head; - while (list && (func_ret = func (list->data, data, user_data)) < 0) - list = list->next; - - if (func_ret == 0) { - return FALSE; - } - if (list) { - g_queue_insert_before (queue->queue, list, data); - } else { - g_queue_push_tail (queue->queue, data); - } - - signal_waiting_threads (queue); - return TRUE; -} - -void -async_jitter_queue_insert_after_unlocked (AsyncJitterQueue * queue, - GList * sibling, gpointer data) -{ - g_return_if_fail (queue != NULL); - - g_queue_insert_before (queue->queue, sibling, data); - - signal_waiting_threads (queue); -} - -static gpointer -async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue) -{ - gpointer retval; - GstBuffer *tail_buffer = NULL; - guint tsunits; - - if (queue->pop_flushing) - return NULL; - - while (queue->pop_blocking) { - queue->waiting_threads++; - g_cond_wait (queue->cond, queue->mutex); - queue->waiting_threads--; - if (queue->pop_flushing) - return NULL; - } - - - tsunits = async_jitter_queue_length_ts_units_unlocked (queue); - - GST_DEBUG ("tsunits %u, pops: %u, limit %d", tsunits, queue->pops_remaining, - (int) (queue->low_threshold * queue->max_queue_length)); - - if (tsunits <= queue->low_threshold * queue->max_queue_length - && queue->pops_remaining == 0) { - if (!queue->buffering) { - GST_DEBUG ("start buffering"); - queue->buffering = TRUE; - queue->pops_remaining = queue->queue->length; - } - - GST_DEBUG ("wait for data"); - while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) { - queue->waiting_threads++; - g_cond_wait (queue->cond, queue->mutex); - queue->waiting_threads--; - if (queue->pop_flushing) - return NULL; - } - } - - retval = g_queue_pop_tail (queue->queue); - if (queue->pops_remaining) - queue->pops_remaining--; - - tail_buffer = g_queue_peek_tail (queue->queue); - if (tail_buffer) { - if (!GST_IS_BUFFER (tail_buffer) || !GST_IS_BUFFER (retval)) { - queue->tail_buffer_duration = 0; - } else if (gst_rtp_buffer_get_seq (tail_buffer) - - gst_rtp_buffer_get_seq (retval) == 1) { - queue->tail_buffer_duration = - calculate_ts_diff (gst_rtp_buffer_get_timestamp (tail_buffer), - gst_rtp_buffer_get_timestamp (retval)); - } else { - /* There is a sequence number gap -> we can't calculate the duration - * let's just set it to 0 */ - queue->tail_buffer_duration = 0; - } - } - - g_assert (retval); - - return retval; -} - -/** - * async_jitter_queue_pop: - * @queue: a #AsyncJitterQueue. - * - * Pops data from the @queue. This function blocks until data become - * available. If pop is disabled, tis function return NULL. - * - * Return value: data from the queue. - **/ -gpointer -async_jitter_queue_pop (AsyncJitterQueue * queue) -{ - gpointer retval; - - g_return_val_if_fail (queue, NULL); - g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL); - - g_mutex_lock (queue->mutex); - retval = async_jitter_queue_pop_intern_unlocked (queue); - g_mutex_unlock (queue->mutex); - - return retval; -} - -/** - * async_jitter_queue_pop_unlocked: - * @queue: a #AsyncJitterQueue. - * - * Pops data from the @queue. This function blocks until data become - * available. This function must be called while holding the @queue's - * lock. - * - * Return value: data from the queue. - **/ -gpointer -async_jitter_queue_pop_unlocked (AsyncJitterQueue * queue) -{ - g_return_val_if_fail (queue, NULL); - g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL); - - return async_jitter_queue_pop_intern_unlocked (queue); -} - -/** - * async_jitter_queue_length: - * @queue: a #AsyncJitterQueue. - * - * Returns the length of the queue - * Return value: the length of the @queue. - **/ -gint -async_jitter_queue_length (AsyncJitterQueue * queue) -{ - gint retval; - - g_return_val_if_fail (queue, 0); - g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0); - - g_mutex_lock (queue->mutex); - retval = queue->queue->length; - g_mutex_unlock (queue->mutex); - - return retval; -} - -/** - * async_jitter_queue_length_unlocked: - * @queue: a #AsyncJitterQueue. - * - * Returns the length of the queue. - * - * Return value: the length of the @queue. - **/ -gint -async_jitter_queue_length_unlocked (AsyncJitterQueue * queue) -{ - g_return_val_if_fail (queue, 0); - g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0); - - return queue->queue->length; -} - -/** - * async_jitter_queue_set_flushing_unlocked: - * @queue: a #AsyncJitterQueue. - * @free_func: a function to call to free the elements - * @user_data: user data passed to @free_func - * - * This function is used to set/unset flushing. If flushing is set any - * waiting/blocked pops will be unblocked. Any subsequent calls to pop will - * return NULL. Flushing is set by default. - */ -void -async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue * queue, - GFunc free_func, gpointer user_data) -{ - gpointer elem; - - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - queue->pop_flushing = TRUE; - /* let's unblock any remaining pops */ - if (queue->waiting_threads > 0) - g_cond_broadcast (queue->cond); - /* free data from queue */ - while ((elem = g_queue_pop_head (queue->queue))) - free_func (elem, user_data); -} - -/** - * async_jitter_queue_unset_flushing_unlocked: - * @queue: a #AsyncJitterQueue. - * @free_func: a function to call to free the elements - * @user_data: user data passed to @free_func - * - * This function is used to set/unset flushing. If flushing is set any - * waiting/blocked pops will be unblocked. Any subsequent calls to pop will - * return NULL. Flushing is set by default. - */ -void -async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue * queue) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - queue->pop_flushing = FALSE; - /* let's unblock any remaining pops */ - if (queue->waiting_threads > 0) - g_cond_broadcast (queue->cond); -} - -/** - * async_jitter_queue_set_blocking_unlocked: - * @queue: a #AsyncJitterQueue. - * @enabled: a boolean to enable/disable blocking - * - * This function is used to enable/disable blocking. If blocking is enabled any - * pops will be blocked until the queue is unblocked. The queue is blocked by - * default. - */ -void -async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue * queue, - gboolean blocking) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - queue->pop_blocking = blocking; - /* let's unblock any remaining pops */ - if (queue->waiting_threads > 0) - g_cond_broadcast (queue->cond); -} diff --git a/gst/rtpmanager/async_jitter_queue.h b/gst/rtpmanager/async_jitter_queue.h deleted file mode 100644 index bea76d6d..00000000 --- a/gst/rtpmanager/async_jitter_queue.h +++ /dev/null @@ -1,130 +0,0 @@ -/* Async Jitter Queue based on g_async_queue - * - * Farsight Voice+Video library - * Copyright 2007 Collabora Ltd, - * Copyright 2007 Nokia Corporation - * @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>. - */ - -/* GLIB - Library of useful routines for C programming - * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -/* - * Modified by the GLib Team and others 1997-2000. See the AUTHORS - * file for a list of people on the GLib Team. See the ChangeLog - * files for a list of changes. These files are distributed with - * GLib at ftp://ftp.gtk.org/pub/gtk/. - */ - -#ifndef __ASYNCJITTERQUEUE_H__ -#define __ASYNCJITTERQUEUE_H__ - -#include <glib.h> -#include <glib/gthread.h> - -G_BEGIN_DECLS - -typedef struct _AsyncJitterQueue AsyncJitterQueue; - -/* Asyncronous Queues, can be used to communicate between threads - */ - -/* Get a new AsyncJitterQueue with the ref_count 1 */ -AsyncJitterQueue* async_jitter_queue_new (void); - -/* Lock and unlock a AsyncJitterQueue. All functions lock the queue for - * themselves, but in certain cirumstances you want to hold the lock longer, - * thus you lock the queue, call the *_unlocked functions and unlock it again. - */ -void async_jitter_queue_lock (AsyncJitterQueue *queue); -void async_jitter_queue_unlock (AsyncJitterQueue *queue); - -/* Ref and unref the AsyncJitterQueue. */ -AsyncJitterQueue* async_jitter_queue_ref (AsyncJitterQueue *queue); -void async_jitter_queue_unref (AsyncJitterQueue *queue); -#ifndef G_DISABLE_DEPRECATED -/* You don't have to hold the lock for calling *_ref and *_unref anymore. */ -void async_jitter_queue_ref_unlocked (AsyncJitterQueue *queue); -void async_jitter_queue_unref_and_unlock (AsyncJitterQueue *queue); -#endif /* !G_DISABLE_DEPRECATED */ - -void async_jitter_queue_set_low_threshold (AsyncJitterQueue *queue, - gfloat threshold); -void async_jitter_queue_set_high_threshold (AsyncJitterQueue *queue, - gfloat threshold); - -void async_jitter_queue_set_max_queue_length (AsyncJitterQueue *queue, - guint32 max_length); - -/* Push data into the async queue. Must not be NULL. */ -void async_jitter_queue_push (AsyncJitterQueue *queue, - gpointer data); -void async_jitter_queue_push_unlocked (AsyncJitterQueue *queue, - gpointer data); -gboolean async_jitter_queue_push_sorted (AsyncJitterQueue *queue, - gpointer data, - GCompareDataFunc func, - gpointer user_data); - -void async_jitter_queue_insert_after_unlocked(AsyncJitterQueue *queue, - GList *sibling, - gpointer data); - -gboolean async_jitter_queue_push_sorted_unlocked(AsyncJitterQueue *queue, - gpointer data, - GCompareDataFunc func, - gpointer user_data); - -/* Pop data from the async queue. When no data is there, the thread is blocked - * until data arrives. */ -gpointer async_jitter_queue_pop (AsyncJitterQueue *queue); -gpointer async_jitter_queue_pop_unlocked (AsyncJitterQueue *queue); - -/* Try to pop data. NULL is returned in case of empty queue. */ -gpointer async_jitter_queue_try_pop (AsyncJitterQueue *queue); -gpointer async_jitter_queue_try_pop_unlocked (AsyncJitterQueue *queue); - -/* Wait for data until at maximum until end_time is reached. NULL is returned - * in case of empty queue. */ -gpointer async_jitter_queue_timed_pop (AsyncJitterQueue *queue, - GTimeVal *end_time); -gpointer async_jitter_queue_timed_pop_unlocked (AsyncJitterQueue *queue, - GTimeVal *end_time); - -/* Return the length of the queue. Negative values mean that threads - * are waiting, positve values mean that there are entries in the - * queue. Actually this function returns the length of the queue minus - * the number of waiting threads, async_jitter_queue_length == 0 could also - * mean 'n' entries in the queue and 'n' thread waiting. Such can - * happen due to locking of the queue or due to scheduling. */ -gint async_jitter_queue_length (AsyncJitterQueue *queue); -gint async_jitter_queue_length_unlocked (AsyncJitterQueue *queue); - -void async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue* queue, - GFunc free_func, gpointer user_data); -void async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue* queue); -void async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue* queue, - gboolean blocking); -guint32 -async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue *queue); - -G_END_DECLS - -#endif /* __ASYNCJITTERQUEUE_H__ */ - diff --git a/gst/rtpmanager/gstrtpbin-marshal.list b/gst/rtpmanager/gstrtpbin-marshal.list index d0b9103b..ca760d82 100644 --- a/gst/rtpmanager/gstrtpbin-marshal.list +++ b/gst/rtpmanager/gstrtpbin-marshal.list @@ -2,3 +2,4 @@ UINT:UINT BOXED:UINT BOXED:UINT,UINT VOID:UINT,OBJECT +VOID:UINT,UINT diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index cc0afd00..e96291b5 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -158,6 +158,13 @@ enum { SIGNAL_REQUEST_PT_MAP, SIGNAL_CLEAR_PT_MAP, + + SIGNAL_ON_NEW_SSRC, + SIGNAL_ON_SSRC_COLLISION, + SIGNAL_ON_SSRC_VALIDATED, + SIGNAL_ON_BYE_SSRC, + SIGNAL_ON_BYE_TIMEOUT, + SIGNAL_ON_TIMEOUT, LAST_SIGNAL }; @@ -258,6 +265,48 @@ find_session_by_id (GstRTPBin * rtpbin, gint id) return NULL; } +static void +on_new_ssrc (GstElement * session, guint32 ssrc, GstRTPBinSession * sess) +{ + g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0, + sess->id, ssrc); +} + +static void +on_ssrc_collision (GstElement * session, guint32 ssrc, GstRTPBinSession * sess) +{ + g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0, + sess->id, ssrc); +} + +static void +on_ssrc_validated (GstElement * session, guint32 ssrc, GstRTPBinSession * sess) +{ + g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0, + sess->id, ssrc); +} + +static void +on_bye_ssrc (GstElement * session, guint32 ssrc, GstRTPBinSession * sess) +{ + g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0, + sess->id, ssrc); +} + +static void +on_bye_timeout (GstElement * session, guint32 ssrc, GstRTPBinSession * sess) +{ + g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0, + sess->id, ssrc); +} + +static void +on_timeout (GstElement * session, guint32 ssrc, GstRTPBinSession * sess) +{ + g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0, + sess->id, ssrc); +} + /* create a session with the given id. Must be called with RTP_BIN_LOCK */ static GstRTPBinSession * create_session (GstRTPBin * rtpbin, gint id) @@ -284,6 +333,18 @@ create_session (GstRTPBin * rtpbin, gint id) g_signal_connect (session, "request-pt-map", (GCallback) pt_map_requested, sess); + g_signal_connect (sess->session, "on-new-ssrc", + (GCallback) on_new_ssrc, sess); + g_signal_connect (sess->session, "on-ssrc-collision", + (GCallback) on_ssrc_collision, sess); + g_signal_connect (sess->session, "on-ssrc-validated", + (GCallback) on_ssrc_validated, sess); + g_signal_connect (sess->session, "on-bye-ssrc", + (GCallback) on_bye_ssrc, sess); + g_signal_connect (sess->session, "on-bye-timeout", + (GCallback) on_bye_timeout, sess); + g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess); + gst_bin_add (GST_BIN_CAST (rtpbin), session); gst_element_set_state (session, GST_STATE_PLAYING); gst_bin_add (GST_BIN_CAST (rtpbin), demux); @@ -557,6 +618,86 @@ gst_rtp_bin_class_init (GstRTPBinClass * klass) G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRTPBinClass, clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE); + /** + * GstRTPBin::on-new-ssrc: + * @rtpbin: the object which received the signal + * @session: the session + * @ssrc: the SSRC + * + * Notify of a new SSRC that entered @session. + */ + gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] = + g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_new_ssrc), + NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2, + G_TYPE_UINT, G_TYPE_UINT); + /** + * GstRTPBin::on-ssrc_collision: + * @rtpbin: the object which received the signal + * @session: the session + * @ssrc: the SSRC + * + * Notify when we have an SSRC collision + */ + gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] = + g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_ssrc_collision), + NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2, + G_TYPE_UINT, G_TYPE_UINT); + /** + * GstRTPBin::on-ssrc_validated: + * @rtpbin: the object which received the signal + * @session: the session + * @ssrc: the SSRC + * + * Notify of a new SSRC that became validated. + */ + gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] = + g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_ssrc_validated), + NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2, + G_TYPE_UINT, G_TYPE_UINT); + + /** + * GstRTPBin::on-bye-ssrc: + * @rtpbin: the object which received the signal + * @session: the session + * @ssrc: the SSRC + * + * Notify of an SSRC that became inactive because of a BYE packet. + */ + gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] = + g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_bye_ssrc), + NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2, + G_TYPE_UINT, G_TYPE_UINT); + /** + * GstRTPBin::on-bye-timeout: + * @rtpbin: the object which received the signal + * @session: the session + * @ssrc: the SSRC + * + * Notify of an SSRC that has timed out because of BYE + */ + gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] = + g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_bye_timeout), + NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2, + G_TYPE_UINT, G_TYPE_UINT); + /** + * GstRTPBin::on-timeout: + * @rtpbin: the object which received the signal + * @session: the session + * @ssrc: the SSRC + * + * Notify of an SSRC that has timed out + */ + gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] = + g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_timeout), + NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2, + G_TYPE_UINT, G_TYPE_UINT); + gstelement_class->provide_clock = GST_DEBUG_FUNCPTR (gst_rtp_bin_provide_clock); gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state); diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h index ffbdd62c..99cb2815 100644 --- a/gst/rtpmanager/gstrtpbin.h +++ b/gst/rtpmanager/gstrtpbin.h @@ -57,8 +57,14 @@ struct _GstRTPBinClass { /* get the caps for pt */ GstCaps* (*request_pt_map) (GstRTPBin *rtpbin, guint session, guint pt); - void (*clear_pt_map) (GstRTPBin *rtpbin); + + void (*on_new_ssrc) (GstRTPBin *rtpbin, guint session, guint32 ssrc); + void (*on_ssrc_collision) (GstRTPBin *rtpbin, guint session, guint32 ssrc); + void (*on_ssrc_validated) (GstRTPBin *rtpbin, guint session, guint32 ssrc); + void (*on_bye_ssrc) (GstRTPBin *rtpbin, guint session, guint32 ssrc); + void (*on_bye_timeout) (GstRTPBin *rtpbin, guint session, guint32 ssrc); + void (*on_timeout) (GstRTPBin *rtpbin, guint session, guint32 ssrc); }; GType gst_rtp_bin_get_type (void); diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 79d06788..fe85f87f 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -4,7 +4,7 @@ * Copyright 2007 Collabora Ltd, * Copyright 2007 Nokia Corporation * @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>. - * Copyright 2007 Wim Taymans <wim@fluendo.com> + * Copyright 2007 Wim Taymans <wim.taymans@gmail.com> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -72,7 +72,7 @@ #include "gstrtpbin-marshal.h" #include "gstrtpjitterbuffer.h" -#include "async_jitter_queue.h" +#include "rtpjitterbuffer.h" GST_DEBUG_CATEGORY (rtpjitterbuffer_debug); #define GST_CAT_DEFAULT (rtpjitterbuffer_debug) @@ -87,7 +87,7 @@ GST_ELEMENT_DETAILS ("RTP packet jitter-buffer", "Filter/Network/RTP", "A buffer that deals with network jitter and other transmission faults", "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, " - "Wim Taymans <wim@fluendo.com>"); + "Wim Taymans <wim.taymans@gmail.com>"); /* RTPJitterBuffer signals and args */ enum @@ -107,11 +107,32 @@ enum PROP_DROP_ON_LATENCY }; +#define JBUF_LOCK(priv) (g_mutex_lock ((priv)->jbuf_lock)) + +#define JBUF_LOCK_CHECK(priv,label) G_STMT_START { \ + JBUF_LOCK (priv); \ + if (priv->srcresult != GST_FLOW_OK) \ + goto label; \ +} G_STMT_END + +#define JBUF_UNLOCK(priv) (g_mutex_unlock ((priv)->jbuf_lock)) +#define JBUF_WAIT(priv) (g_cond_wait ((priv)->jbuf_cond, (priv)->jbuf_lock)) + +#define JBUF_WAIT_CHECK(priv,label) G_STMT_START { \ + JBUF_WAIT(priv); \ + if (priv->srcresult != GST_FLOW_OK) \ + goto label; \ +} G_STMT_END + +#define JBUF_SIGNAL(priv) (g_cond_signal ((priv)->jbuf_cond)) + struct _GstRTPJitterBufferPrivate { GstPad *sinkpad, *srcpad; - AsyncJitterQueue *queue; + RTPJitterBuffer *jbuf; + GMutex *jbuf_lock; + GCond *jbuf_cond; /* properties */ guint latency_ms; @@ -122,12 +143,16 @@ struct _GstRTPJitterBufferPrivate /* the next expected seqnum */ guint32 next_seqnum; + /* state */ + gboolean eos; + /* clock rate and rtp timestamp offset */ gint32 clock_rate; gint64 clock_base; /* when we are shutting down */ GstFlowReturn srcresult; + gboolean blocked; /* for sync */ GstSegment segment; @@ -292,9 +317,9 @@ gst_rtp_jitter_buffer_init (GstRTPJitterBuffer * jitterbuffer, priv->latency_ms = DEFAULT_LATENCY_MS; priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY; - priv->queue = async_jitter_queue_new (); - async_jitter_queue_set_low_threshold (priv->queue, LOW_THRESHOLD); - async_jitter_queue_set_high_threshold (priv->queue, HIGH_THRESHOLD); + priv->jbuf = rtp_jitter_buffer_new (); + priv->jbuf_lock = g_mutex_new (); + priv->jbuf_cond = g_cond_new (); priv->waiting_seqnum = -1; @@ -332,9 +357,9 @@ gst_rtp_jitter_buffer_dispose (GObject * object) GstRTPJitterBuffer *jitterbuffer; jitterbuffer = GST_RTP_JITTER_BUFFER (object); - if (jitterbuffer->priv->queue) { - async_jitter_queue_unref (jitterbuffer->priv->queue); - jitterbuffer->priv->queue = NULL; + if (jitterbuffer->priv->jbuf) { + g_object_unref (jitterbuffer->priv->jbuf); + jitterbuffer->priv->jbuf = NULL; } G_OBJECT_CLASS (parent_class)->dispose (object); @@ -430,9 +455,6 @@ gst_jitter_buffer_sink_parse_caps (GstRTPJitterBuffer * jitterbuffer, } else priv->next_seqnum = -1; - async_jitter_queue_set_max_queue_length (priv->queue, - priv->latency_ms * priv->clock_rate / 1000); - return TRUE; /* ERRORS */ @@ -470,34 +492,24 @@ gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps) } static void -free_func (gpointer data, GstRTPJitterBuffer * user_data) -{ - if (GST_IS_BUFFER (data)) - gst_buffer_unref (GST_BUFFER_CAST (data)); - else - gst_event_unref (GST_EVENT_CAST (data)); -} - -static void gst_rtp_jitter_buffer_flush_start (GstRTPJitterBuffer * jitterbuffer) { GstRTPJitterBufferPrivate *priv; priv = jitterbuffer->priv; - async_jitter_queue_lock (priv->queue); + JBUF_LOCK (priv); /* mark ourselves as flushing */ priv->srcresult = GST_FLOW_WRONG_STATE; GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue"); /* this unblocks any waiting pops on the src pad task */ - async_jitter_queue_set_flushing_unlocked (jitterbuffer->priv->queue, - (GFunc) free_func, jitterbuffer); + JBUF_SIGNAL (priv); + rtp_jitter_buffer_flush (priv->jbuf); /* unlock clock, we just unschedule, the entry will be released by the * locking streaming thread. */ if (priv->clock_id) gst_clock_id_unschedule (priv->clock_id); - - async_jitter_queue_unlock (priv->queue); + JBUF_UNLOCK (priv); } static void @@ -507,7 +519,7 @@ gst_rtp_jitter_buffer_flush_stop (GstRTPJitterBuffer * jitterbuffer) priv = jitterbuffer->priv; - async_jitter_queue_lock (priv->queue); + JBUF_LOCK (priv); GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue"); /* Mark as non flushing */ priv->srcresult = GST_FLOW_OK; @@ -515,9 +527,8 @@ gst_rtp_jitter_buffer_flush_stop (GstRTPJitterBuffer * jitterbuffer) priv->last_popped_seqnum = -1; priv->next_seqnum = -1; priv->clock_rate = -1; - /* allow pops from the src pad task */ - async_jitter_queue_unset_flushing_unlocked (jitterbuffer->priv->queue); - async_jitter_queue_unlock (priv->queue); + priv->eos = FALSE; + JBUF_UNLOCK (priv); } static gboolean @@ -566,21 +577,20 @@ gst_rtp_jitter_buffer_change_state (GstElement * element, case GST_STATE_CHANGE_NULL_TO_READY: break; case GST_STATE_CHANGE_READY_TO_PAUSED: - async_jitter_queue_lock (priv->queue); + JBUF_LOCK (priv); /* reset negotiated values */ priv->clock_rate = -1; priv->clock_base = -1; /* block until we go to PLAYING */ - async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue, - TRUE); - async_jitter_queue_unlock (priv->queue); + priv->blocked = TRUE; + JBUF_UNLOCK (priv); break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - async_jitter_queue_lock (priv->queue); + JBUF_LOCK (priv); /* unblock to allow streaming in PLAYING */ - async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue, - FALSE); - async_jitter_queue_unlock (priv->queue); + priv->blocked = FALSE; + JBUF_SIGNAL (priv); + JBUF_UNLOCK (priv); break; default: break; @@ -596,11 +606,10 @@ gst_rtp_jitter_buffer_change_state (GstElement * element, ret = GST_STATE_CHANGE_NO_PREROLL; break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - async_jitter_queue_lock (priv->queue); + JBUF_LOCK (priv); /* block to stop streaming when PAUSED */ - async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue, - TRUE); - async_jitter_queue_unlock (priv->queue); + priv->blocked = TRUE; + JBUF_UNLOCK (priv); if (ret != GST_STATE_CHANGE_FAILURE) ret = GST_STATE_CHANGE_NO_PREROLL; break; @@ -630,30 +639,6 @@ priv_compare_rtp_seq_lt (guint16 a, guint16 b) } } -/** - * gets the seqnum from the buffers and compare them - */ -static gint -compare_rtp_buffers_seq_num (GstBuffer * a, GstBuffer * b) -{ - gint ret; - - if (GST_IS_BUFFER (a) && GST_IS_BUFFER (b)) { - /* two buffers */ - ret = priv_compare_rtp_seq_lt - (gst_rtp_buffer_get_seq (GST_BUFFER_CAST (a)), - gst_rtp_buffer_get_seq (GST_BUFFER_CAST (b))); - } else { - /* one of them is an event, the event always goes before the other element - * so we return -1. */ - if (GST_IS_EVENT (a)) - ret = -1; - else - ret = 1; - } - return ret; -} - static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event) { @@ -707,16 +692,20 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event) case GST_EVENT_EOS: { /* push EOS in queue. We always push it at the head */ - async_jitter_queue_lock (priv->queue); - GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS"); + JBUF_LOCK (priv); /* check for flushing, we need to discard the event and return FALSE when * we are flushing */ ret = priv->srcresult == GST_FLOW_OK; - if (ret) - async_jitter_queue_push_unlocked (priv->queue, event); - else + if (ret) { + GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS"); + priv->eos = TRUE; + JBUF_SIGNAL (priv); + } else { + GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s", + gst_flow_get_name (priv->srcresult)); gst_event_unref (event); - async_jitter_queue_unlock (priv->queue); + } + JBUF_UNLOCK (priv); break; } default: @@ -780,7 +769,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) GstRTPJitterBuffer *jitterbuffer; GstRTPJitterBufferPrivate *priv; guint16 seqnum; - GstFlowReturn ret; + GstFlowReturn ret = GST_FLOW_OK; jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); @@ -803,10 +792,10 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) seqnum = gst_rtp_buffer_get_seq (buffer); GST_DEBUG_OBJECT (jitterbuffer, "Received packet #%d", seqnum); - async_jitter_queue_lock (priv->queue); - ret = priv->srcresult; - if (ret != GST_FLOW_OK) - goto out_flushing; + JBUF_LOCK_CHECK (priv, out_flushing); + /* don't accept more data on EOS */ + if (priv->eos) + goto have_eos; /* let's check if this buffer is too late, we cannot accept packets with * bigger seqnum than the one we already pushed. */ @@ -818,14 +807,18 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) /* let's drop oldest packet if the queue is already full and drop-on-latency * is set. */ if (priv->drop_on_latency) { - if (async_jitter_queue_length_ts_units_unlocked (priv->queue) >= - priv->latency_ms * priv->clock_rate / 1000) { + guint64 latency_ts; + + latency_ts = + gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000); + + if (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts) { GstBuffer *old_buf; GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d", seqnum); - old_buf = async_jitter_queue_pop_unlocked (priv->queue); + old_buf = rtp_jitter_buffer_pop (priv->jbuf); gst_buffer_unref (old_buf); } } @@ -833,10 +826,12 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) /* now insert the packet into the queue in sorted order. This function returns * FALSE if a packet with the same seqnum was already in the queue, meaning we * have a duplicate. */ - if (!async_jitter_queue_push_sorted_unlocked (priv->queue, buffer, - (GCompareDataFunc) compare_rtp_buffers_seq_num, NULL)) + if (!rtp_jitter_buffer_insert (priv->jbuf, buffer)) goto duplicate; + /* signal addition of new buffer */ + JBUF_SIGNAL (priv); + /* let's unschedule and unblock any waiting buffers. We only want to do this * if there is a currently waiting newer (> seqnum) buffer */ if (priv->clock_id) { @@ -846,11 +841,11 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) } } - GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d on queue %d", - seqnum, async_jitter_queue_length_unlocked (priv->queue)); + GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets", + seqnum, rtp_jitter_buffer_num_packets (priv->jbuf)); finished: - async_jitter_queue_unlock (priv->queue); + JBUF_UNLOCK (priv); gst_object_unref (jitterbuffer); @@ -875,10 +870,18 @@ not_negotiated: } out_flushing: { + ret = priv->srcresult; GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret)); gst_buffer_unref (buffer); goto finished; } +have_eos: + { + ret = GST_FLOW_UNEXPECTED; + GST_DEBUG_OBJECT (jitterbuffer, "we are EOS, refusing buffer"); + gst_buffer_unref (buffer); + goto finished; + } too_late: { GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already" @@ -908,8 +911,7 @@ static void gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer) { GstRTPJitterBufferPrivate *priv; - gpointer elem; - GstBuffer *outbuf; + GstBuffer *outbuf = NULL; GstFlowReturn result; guint16 seqnum; guint32 rtp_time; @@ -918,44 +920,24 @@ gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer) priv = jitterbuffer->priv; - async_jitter_queue_lock (priv->queue); + JBUF_LOCK_CHECK (priv, flushing); again: GST_DEBUG_OBJECT (jitterbuffer, "Popping item"); - /* pop a buffer, we will get NULL if the queue was shut down */ - elem = async_jitter_queue_pop_unlocked (priv->queue); - if (!elem) - goto no_elem; - - /* special code for events */ - if (G_UNLIKELY (GST_IS_EVENT (elem))) { - GstEvent *event = GST_EVENT_CAST (elem); - - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_EOS: - GST_DEBUG_OBJECT (jitterbuffer, "Popped EOS from queue"); - /* we don't expect more data now, makes upstream perform EOS actions */ - priv->srcresult = GST_FLOW_UNEXPECTED; - break; - default: - GST_DEBUG_OBJECT (jitterbuffer, "Popped event %s from queue", - GST_EVENT_TYPE_NAME (event)); - break; - } - async_jitter_queue_unlock (priv->queue); - - /* push event */ - gst_pad_push_event (priv->srcpad, event); - return; + /* wait if we are blocked or don't have a packet and eos */ + while (priv->blocked || !(rtp_jitter_buffer_num_packets (priv->jbuf) + || priv->eos)) { + JBUF_WAIT_CHECK (priv, flushing); } + if (priv->eos) + goto do_eos; - /* we know it's a buffer now */ - outbuf = GST_BUFFER_CAST (elem); + /* pop a buffer, we must have a buffer now */ + outbuf = rtp_jitter_buffer_pop (priv->jbuf); seqnum = gst_rtp_buffer_get_seq (outbuf); - GST_DEBUG_OBJECT (jitterbuffer, "Popped buffer #%d from queue %d", - gst_rtp_buffer_get_seq (outbuf), - async_jitter_queue_length_unlocked (priv->queue)); + GST_DEBUG_OBJECT (jitterbuffer, "Popped buffer #%d, now %d left", + seqnum, rtp_jitter_buffer_num_packets (priv->jbuf)); /* If we don't know what the next seqnum should be (== -1) we have to wait * because it might be possible that we are not receiving this buffer in-order, @@ -1032,11 +1014,11 @@ again: GST_OBJECT_UNLOCK (jitterbuffer); /* release the lock so that the other end can push stuff or unlock */ - async_jitter_queue_unlock (priv->queue); + JBUF_UNLOCK (priv); ret = gst_clock_id_wait (id, &jitter); - async_jitter_queue_lock (priv->queue); + JBUF_LOCK (priv); /* and free the entry */ gst_clock_id_unref (id); priv->clock_id = NULL; @@ -1054,8 +1036,7 @@ again: GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled, will retry to push with new buffer"); /* reinserting popped buffer into queue */ - if (!async_jitter_queue_push_sorted_unlocked (priv->queue, outbuf, - (GCompareDataFunc) compare_rtp_buffers_seq_num, NULL)) { + if (!rtp_jitter_buffer_insert (priv->jbuf, outbuf)) { GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping", seqnum); priv->num_duplicates++; @@ -1087,7 +1068,7 @@ push_buffer: * so the other end can push stuff in the queue again. */ priv->last_popped_seqnum = seqnum; priv->next_seqnum = (seqnum + 1) & 0xffff; - async_jitter_queue_unlock (priv->queue); + JBUF_UNLOCK (priv); /* push buffer */ GST_DEBUG_OBJECT (jitterbuffer, "Pushing buffer %d", seqnum); @@ -1098,20 +1079,22 @@ push_buffer: return; /* ERRORS */ -no_elem: +do_eos: { /* store result, we are flushing now */ - GST_DEBUG_OBJECT (jitterbuffer, "Pop returned NULL, we're flushing"); - priv->srcresult = GST_FLOW_WRONG_STATE; + GST_DEBUG_OBJECT (jitterbuffer, "We are EOS, pushing EOS downstream"); + priv->srcresult = GST_FLOW_UNEXPECTED; gst_pad_pause_task (priv->srcpad); - async_jitter_queue_unlock (priv->queue); + gst_pad_push_event (priv->srcpad, gst_event_new_eos ()); + JBUF_UNLOCK (priv); return; } flushing: { GST_DEBUG_OBJECT (jitterbuffer, "we are flushing"); - gst_buffer_unref (outbuf); - async_jitter_queue_unlock (priv->queue); + if (outbuf) + gst_buffer_unref (outbuf); + JBUF_UNLOCK (priv); return; } pause: @@ -1120,13 +1103,13 @@ pause: GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason); - async_jitter_queue_lock (priv->queue); + JBUF_LOCK (priv); /* store result */ priv->srcresult = result; /* we don't post errors or anything because upstream will do that for us * when we pass the return value upstream. */ gst_pad_pause_task (priv->srcpad); - async_jitter_queue_unlock (priv->queue); + JBUF_UNLOCK (priv); return; } } @@ -1194,11 +1177,7 @@ gst_rtp_jitter_buffer_set_property (GObject * object, old_latency = jitterbuffer->priv->latency_ms; jitterbuffer->priv->latency_ms = new_latency; - if (jitterbuffer->priv->clock_rate != -1) { - async_jitter_queue_set_max_queue_length (jitterbuffer->priv->queue, - gst_util_uint64_scale_int (new_latency, - jitterbuffer->priv->clock_rate, 1000)); - } + /* post message if latency changed, this will infor the parent pipeline * that a latency reconfiguration is possible. */ if (new_latency != old_latency) { diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index 3e33cf6a..bb47a29e 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -202,6 +202,13 @@ enum { SIGNAL_REQUEST_PT_MAP, SIGNAL_CLEAR_PT_MAP, + + SIGNAL_ON_NEW_SSRC, + SIGNAL_ON_SSRC_COLLISION, + SIGNAL_ON_SSRC_VALIDATED, + SIGNAL_ON_BYE_SSRC, + SIGNAL_ON_BYE_TIMEOUT, + SIGNAL_ON_TIMEOUT, LAST_SIGNAL }; @@ -266,6 +273,48 @@ static void gst_rtp_session_clear_pt_map (GstRTPSession * rtpsession); static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 }; +static void +on_new_ssrc (RTPSession * session, RTPSource * src, GstRTPSession * sess) +{ + g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, + src->ssrc); +} + +static void +on_ssrc_collision (RTPSession * session, RTPSource * src, GstRTPSession * sess) +{ + g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0, + src->ssrc); +} + +static void +on_ssrc_validated (RTPSession * session, RTPSource * src, GstRTPSession * sess) +{ + g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0, + src->ssrc); +} + +static void +on_bye_ssrc (RTPSession * session, RTPSource * src, GstRTPSession * sess) +{ + g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, + src->ssrc); +} + +static void +on_bye_timeout (RTPSession * session, RTPSource * src, GstRTPSession * sess) +{ + g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, + src->ssrc); +} + +static void +on_timeout (RTPSession * session, RTPSource * src, GstRTPSession * sess) +{ + g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, + src->ssrc); +} + GST_BOILERPLATE (GstRTPSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT); static void @@ -332,6 +381,76 @@ gst_rtp_session_class_init (GstRTPSessionClass * klass) G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRTPSessionClass, clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE); + /** + * GstRTPSession::on-new-ssrc: + * @sess: the object which received the signal + * @ssrc: the SSRC + * + * Notify of a new SSRC that entered @session. + */ + gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC] = + g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, on_new_ssrc), + NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT); + /** + * GstRTPSession::on-ssrc_collision: + * @sess: the object which received the signal + * @ssrc: the SSRC + * + * Notify when we have an SSRC collision + */ + gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] = + g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, + on_ssrc_collision), NULL, NULL, g_cclosure_marshal_VOID__UINT, + G_TYPE_NONE, 1, G_TYPE_UINT); + /** + * GstRTPSession::on-ssrc_validated: + * @sess: the object which received the signal + * @ssrc: the SSRC + * + * Notify of a new SSRC that became validated. + */ + gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] = + g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, + on_ssrc_validated), NULL, NULL, g_cclosure_marshal_VOID__UINT, + G_TYPE_NONE, 1, G_TYPE_UINT); + + /** + * GstRTPSession::on-bye-ssrc: + * @sess: the object which received the signal + * @ssrc: the SSRC + * + * Notify of an SSRC that became inactive because of a BYE packet. + */ + gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC] = + g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, on_bye_ssrc), + NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT); + /** + * GstRTPSession::on-bye-timeout: + * @sess: the object which received the signal + * @ssrc: the SSRC + * + * Notify of an SSRC that has timed out because of BYE + */ + gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] = + g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, on_bye_timeout), + NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT); + /** + * GstRTPSession::on-timeout: + * @sess: the object which received the signal + * @ssrc: the SSRC + * + * Notify of an SSRC that has timed out + */ + gst_rtp_session_signals[SIGNAL_ON_TIMEOUT] = + g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, on_timeout), + NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_session_change_state); gstelement_class->request_new_pad = @@ -353,6 +472,19 @@ gst_rtp_session_init (GstRTPSession * rtpsession, GstRTPSessionClass * klass) rtpsession->priv->session = rtp_session_new (); /* configure callbacks */ rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession); + /* configure signals */ + g_signal_connect (rtpsession->priv->session, "on-new-ssrc", + (GCallback) on_new_ssrc, rtpsession); + g_signal_connect (rtpsession->priv->session, "on-ssrc-collision", + (GCallback) on_ssrc_collision, rtpsession); + g_signal_connect (rtpsession->priv->session, "on-ssrc-validated", + (GCallback) on_ssrc_validated, rtpsession); + g_signal_connect (rtpsession->priv->session, "on-bye-ssrc", + (GCallback) on_bye_ssrc, rtpsession); + g_signal_connect (rtpsession->priv->session, "on-bye-timeout", + (GCallback) on_bye_timeout, rtpsession); + g_signal_connect (rtpsession->priv->session, "on-timeout", + (GCallback) on_timeout, rtpsession); } static void diff --git a/gst/rtpmanager/gstrtpsession.h b/gst/rtpmanager/gstrtpsession.h index c58f23e9..6c9fb774 100644 --- a/gst/rtpmanager/gstrtpsession.h +++ b/gst/rtpmanager/gstrtpsession.h @@ -59,8 +59,14 @@ struct _GstRTPSessionClass { /* signals */ GstCaps* (*request_pt_map) (GstRTPSession *sess, guint pt); - void (*clear_pt_map) (GstRTPSession *sess); + + void (*on_new_ssrc) (GstRTPSession *sess, guint32 ssrc); + void (*on_ssrc_collision) (GstRTPSession *sess, guint32 ssrc); + void (*on_ssrc_validated) (GstRTPSession *sess, guint32 ssrc); + void (*on_bye_ssrc) (GstRTPSession *sess, guint32 ssrc); + void (*on_bye_timeout) (GstRTPSession *sess, guint32 ssrc); + void (*on_timeout) (GstRTPSession *sess, guint32 ssrc); }; GType gst_rtp_session_get_type (void); diff --git a/gst/rtpmanager/rtpjitterbuffer.c b/gst/rtpmanager/rtpjitterbuffer.c new file mode 100644 index 00000000..f90811b4 --- /dev/null +++ b/gst/rtpmanager/rtpjitterbuffer.c @@ -0,0 +1,237 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ +#include <string.h> + +#include <gst/rtp/gstrtpbuffer.h> +#include <gst/rtp/gstrtcpbuffer.h> + +#include "rtpjitterbuffer.h" + +GST_DEBUG_CATEGORY_STATIC (rtp_jitter_buffer_debug); +#define GST_CAT_DEFAULT rtp_jitter_buffer_debug + +/* signals and args */ +enum +{ + LAST_SIGNAL +}; + +enum +{ + PROP_0 +}; + +/* GObject vmethods */ +static void rtp_jitter_buffer_finalize (GObject * object); + +/* static guint rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 }; */ + +G_DEFINE_TYPE (RTPJitterBuffer, rtp_jitter_buffer, G_TYPE_OBJECT); + +static void +rtp_jitter_buffer_class_init (RTPJitterBufferClass * klass) +{ + GObjectClass *gobject_class; + + gobject_class = (GObjectClass *) klass; + + gobject_class->finalize = rtp_jitter_buffer_finalize; + + GST_DEBUG_CATEGORY_INIT (rtp_jitter_buffer_debug, "rtpjitterbuffer", 0, + "RTP Jitter Buffer"); +} + +static void +rtp_jitter_buffer_init (RTPJitterBuffer * jbuf) +{ + jbuf->packets = g_queue_new (); +} + +static void +rtp_jitter_buffer_finalize (GObject * object) +{ + RTPJitterBuffer *jbuf; + + jbuf = RTP_JITTER_BUFFER_CAST (object); + + rtp_jitter_buffer_flush (jbuf); + g_queue_free (jbuf->packets); + + G_OBJECT_CLASS (rtp_jitter_buffer_parent_class)->finalize (object); +} + +/** + * rtp_jitter_buffer_new: + * + * Create an #RTPJitterBuffer. + * + * Returns: a new #RTPJitterBuffer. Use g_object_unref() after usage. + */ +RTPJitterBuffer * +rtp_jitter_buffer_new (void) +{ + RTPJitterBuffer *jbuf; + + jbuf = g_object_new (RTP_TYPE_JITTER_BUFFER, NULL); + + return jbuf; +} + +static gint +compare_seqnum (GstBuffer * a, GstBuffer * b, RTPJitterBuffer * jbuf) +{ + guint16 seq1, seq2; + + seq1 = gst_rtp_buffer_get_seq (a); + seq2 = gst_rtp_buffer_get_seq (b); + + /* check if diff more than half of the 16bit range */ + if (abs (seq2 - seq1) > (1 << 15)) { + /* one of a/b has wrapped */ + return seq1 - seq2; + } else { + return seq2 - seq1; + } +} + +/** + * rtp_jitter_buffer_insert: + * @jbuf: an #RTPJitterBuffer + * @buf: a buffer + * + * Inserts @buf into the packet queue of @jbuf. The sequence number of the + * packet will be used to sort the packets. This function takes ownerhip of + * @buf when the function returns %TRUE. + * + * Returns: %FALSE if a packet with the same number already existed. + */ +gboolean +rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf) +{ + GList *list; + gint func_ret = 1; + + g_return_val_if_fail (jbuf != NULL, FALSE); + g_return_val_if_fail (buf != NULL, FALSE); + + /* loop the list to skip strictly smaller seqnum buffers */ + list = jbuf->packets->head; + while (list + && (func_ret = + compare_seqnum (GST_BUFFER_CAST (list->data), buf, jbuf)) < 0) + list = list->next; + + /* we hit a packet with the same seqnum, return FALSE to notify a duplicate */ + if (func_ret == 0) + return FALSE; + + if (list) + g_queue_insert_before (jbuf->packets, list, buf); + else + g_queue_push_tail (jbuf->packets, buf); + + return TRUE; +} + +/** + * rtp_jitter_buffer_pop: + * @jbuf: an #RTPJitterBuffer + * + * Pops the oldest buffer from the packet queue of @jbuf. + * + * Returns: a #GstBuffer or %NULL when there was no packet in the queue. + */ +GstBuffer * +rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf) +{ + GstBuffer *buf; + + g_return_val_if_fail (jbuf != NULL, FALSE); + + buf = g_queue_pop_tail (jbuf->packets); + + return buf; +} + +/** + * rtp_jitter_buffer_flush: + * @jbuf: an #RTPJitterBuffer + * + * Flush all packets from the jitterbuffer. + */ +void +rtp_jitter_buffer_flush (RTPJitterBuffer * jbuf) +{ + GstBuffer *buffer; + + g_return_if_fail (jbuf != NULL); + + while ((buffer = g_queue_pop_head (jbuf->packets))) + gst_buffer_unref (buffer); +} + +/** + * rtp_jitter_buffer_num_packets: + * @jbuf: an #RTPJitterBuffer + * + * Get the number of packets currently in "jbuf. + * + * Returns: The number of packets in @jbuf. + */ +guint +rtp_jitter_buffer_num_packets (RTPJitterBuffer * jbuf) +{ + g_return_val_if_fail (jbuf != NULL, 0); + + return jbuf->packets->length; +} + +/** + * rtp_jitter_buffer_get_ts_diff: + * @jbuf: an #RTPJitterBuffer + * + * Get the difference between the timestamps of first and last packet in the + * jitterbuffer. + * + * Returns: The difference expressed in the timestamp units of the packets. + */ +guint32 +rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer * jbuf) +{ + guint32 high_ts, low_ts; + GstBuffer *high_buf, *low_buf; + + g_return_val_if_fail (jbuf != NULL, 0); + + high_buf = g_queue_peek_head (jbuf->packets); + low_buf = g_queue_peek_tail (jbuf->packets); + + if (!high_buf || !low_buf || high_buf == low_buf) + return 0; + + high_ts = gst_rtp_buffer_get_timestamp (high_buf); + low_ts = gst_rtp_buffer_get_timestamp (low_buf); + + /* it needs to work if ts wraps */ + if (high_ts >= low_ts) { + return high_ts - low_ts; + } else { + return high_ts + G_MAXUINT32 + 1 - low_ts; + } +} diff --git a/gst/rtpmanager/rtpjitterbuffer.h b/gst/rtpmanager/rtpjitterbuffer.h new file mode 100644 index 00000000..14b5b3f2 --- /dev/null +++ b/gst/rtpmanager/rtpjitterbuffer.h @@ -0,0 +1,67 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __RTP_JITTER_BUFFER_H__ +#define __RTP_JITTER_BUFFER_H__ + +#include <gst/gst.h> +#include <gst/rtp/gstrtcpbuffer.h> +#include <gst/netbuffer/gstnetbuffer.h> + +typedef struct _RTPJitterBuffer RTPJitterBuffer; +typedef struct _RTPJitterBufferClass RTPJitterBufferClass; + +#define RTP_TYPE_JITTER_BUFFER (rtp_jitter_buffer_get_type()) +#define RTP_JITTER_BUFFER(src) (G_TYPE_CHECK_INSTANCE_CAST((src),RTP_TYPE_JITTER_BUFFER,RTPJitterBuffer)) +#define RTP_JITTER_BUFFER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),RTP_TYPE_JITTER_BUFFER,RTPJitterBufferClass)) +#define RTP_IS_JITTER_BUFFER(src) (G_TYPE_CHECK_INSTANCE_TYPE((src),RTP_TYPE_JITTER_BUFFER)) +#define RTP_IS_JITTER_BUFFER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),RTP_TYPE_JITTER_BUFFER)) +#define RTP_JITTER_BUFFER_CAST(src) ((RTPJitterBuffer *)(src)) + +/** + * RTPJitterBuffer: + * + * A JitterBuffer in the #RTPSession + */ +struct _RTPJitterBuffer { + GObject object; + + GQueue *packets; +}; + +struct _RTPJitterBufferClass { + GObjectClass parent_class; +}; + +GType rtp_jitter_buffer_get_type (void); + +/* managing lifetime */ +RTPJitterBuffer* rtp_jitter_buffer_new (void); + +gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, GstBuffer *buf); +GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf); + +void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf); + +guint rtp_jitter_buffer_num_packets (RTPJitterBuffer *jbuf); +guint32 rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer *jbuf); + + + +#endif /* __RTP_JITTER_BUFFER_H__ */ diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 74907912..2b3bcb82 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -271,7 +271,6 @@ on_ssrc_validated (RTPSession * sess, RTPSource * source) static void on_bye_ssrc (RTPSession * sess, RTPSource * source) { - /* notify app that reconsideration should be performed */ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source); } @@ -1724,7 +1723,6 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data) on_bye_timeout (sess, source); else on_timeout (sess, source); - } return remove; } |