diff options
Diffstat (limited to 'gst/rtpmanager/async_jitter_queue.c')
-rw-r--r-- | gst/rtpmanager/async_jitter_queue.c | 692 |
1 files changed, 0 insertions, 692 deletions
diff --git a/gst/rtpmanager/async_jitter_queue.c b/gst/rtpmanager/async_jitter_queue.c deleted file mode 100644 index 73597b28..00000000 --- a/gst/rtpmanager/async_jitter_queue.c +++ /dev/null @@ -1,692 +0,0 @@ -/* - * Async Jitter Queue based on g_async_queue - * This code is GST RTP smart and deals with timestamps - * - * Farsight Voice+Video library - * Copyright 2007 Collabora Ltd, - * Copyright 2007 Nokia Corporation - * @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>. - * - * This is an async queue that has a buffering mecanism based on the set low - * and high threshold. When the lower threshold is reached, the queue will - * fill itself up until the higher threshold is reached before allowing any - * pops to occur. This allows a jitterbuffer of at least min threshold items - * to be available. - */ - -/* GLIB - Library of useful routines for C programming - * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald - * - * GAsyncQueue: asynchronous queue implementation, based on Gqueue. - * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -/* - * MT safe - */ - -#include "config.h" - -#include "async_jitter_queue.h" - -#include <gst/gst.h> -#include <gst/rtp/gstrtpbuffer.h> - -#define DEFAULT_LOW_THRESHOLD 0.1 -#define DEFAULT_HIGH_THRESHOLD 0.9 - -struct _AsyncJitterQueue -{ - GMutex *mutex; - GCond *cond; - GQueue *queue; - guint waiting_threads; - gint32 ref_count; - gfloat low_threshold; - gfloat high_threshold; - guint32 max_queue_length; - gboolean buffering; - gboolean pop_flushing; - gboolean pop_blocking; - guint pops_remaining; - guint32 tail_buffer_duration; -}; - -/** - * async_jitter_queue_new: - * - * Creates a new asynchronous queue with the initial reference count of 1. - * - * Return value: the new #AsyncJitterQueue. - **/ -AsyncJitterQueue * -async_jitter_queue_new (void) -{ - AsyncJitterQueue *retval = g_new (AsyncJitterQueue, 1); - - retval->mutex = g_mutex_new (); - retval->cond = g_cond_new (); - retval->queue = g_queue_new (); - retval->waiting_threads = 0; - retval->ref_count = 1; - retval->low_threshold = DEFAULT_LOW_THRESHOLD; - retval->high_threshold = DEFAULT_HIGH_THRESHOLD; - retval->buffering = TRUE; /* we need to buffer initially */ - retval->pop_flushing = TRUE; - retval->pop_blocking = TRUE; - retval->pops_remaining = 0; - retval->tail_buffer_duration = 0; - return retval; -} - -/* checks buffering state and wakes up waiting pops */ -void -signal_waiting_threads (AsyncJitterQueue * queue) -{ - if (async_jitter_queue_length_ts_units_unlocked (queue) >= - queue->high_threshold * queue->max_queue_length) { - GST_DEBUG ("stop buffering"); - queue->buffering = FALSE; - } - - if (queue->waiting_threads > 0) { - if (!queue->buffering) { - g_cond_signal (queue->cond); - } - } -} - -/** - * async_jitter_queue_ref: - * @queue: a #AsyncJitterQueue. - * - * Increases the reference count of the asynchronous @queue by 1. You - * do not need to hold the lock to call this function. - * - * Returns: the @queue that was passed in (since 2.6) - **/ -AsyncJitterQueue * -async_jitter_queue_ref (AsyncJitterQueue * queue) -{ - g_return_val_if_fail (queue, NULL); - g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL); - - g_atomic_int_inc (&queue->ref_count); - - return queue; -} - -/** - * async_jitter_queue_ref_unlocked: - * @queue: a #AsyncJitterQueue. - * - * Increases the reference count of the asynchronous @queue by 1. - **/ -void -async_jitter_queue_ref_unlocked (AsyncJitterQueue * queue) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - g_atomic_int_inc (&queue->ref_count); -} - -/** - * async_jitter_queue_set_low_threshold: - * @queue: a #AsyncJitterQueue. - * @threshold: the lower threshold (fraction of max size) - * - * Sets the low threshold on the queue. This threshold indicates the minimum - * number of items allowed in the queue before we refill it up to the set - * maximum threshold. - **/ -void -async_jitter_queue_set_low_threshold (AsyncJitterQueue * queue, - gfloat threshold) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - queue->low_threshold = threshold; -} - -/** - * async_jitter_queue_set_max_threshold: - * @queue: a #AsyncJitterQueue. - * @threshold: the higher threshold (fraction of max size) - * - * Sets the high threshold on the queue. This threshold indicates the amount of - * items to fill in the queue before releasing any blocking pop calls. This - * blocking mecanism is only triggered when we reach the low threshold and must - * refill the queue. - **/ -void -async_jitter_queue_set_high_threshold (AsyncJitterQueue * queue, - gfloat threshold) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - queue->high_threshold = threshold; -} - -/* set the maximum queue length in RTP timestamp units */ -void -async_jitter_queue_set_max_queue_length (AsyncJitterQueue * queue, - guint32 max_length) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - queue->max_queue_length = max_length; -} - -GQueue * -async_jitter_queue_get_g_queue (AsyncJitterQueue * queue) -{ - g_return_val_if_fail (queue, NULL); - - return queue->queue; -} - -static guint32 -calculate_ts_diff (guint32 high_ts, guint32 low_ts) -{ - /* it needs to work if ts wraps */ - if (high_ts >= low_ts) { - return high_ts - low_ts; - } else { - return high_ts + G_MAXUINT32 + 1 - low_ts; - } -} - -/* this function returns the length of the queue in timestamp units. It will - * also add the duration of the last buffer in the queue */ -/* FIXME This function wrongly assumes that there are no missing packets inside - * the buffer, in reality it needs to check for gaps and subsctract those from - * the total */ -guint32 -async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue * queue) -{ - guint32 tail_ts; - guint32 head_ts; - guint32 ret; - GstBuffer *head; - GstBuffer *tail; - - g_return_val_if_fail (queue, 0); - - if (queue->queue->length < 2) { - return 0; - } - - tail = g_queue_peek_tail (queue->queue); - head = g_queue_peek_head (queue->queue); - - if (!GST_IS_BUFFER (tail) || !GST_IS_BUFFER (head)) - return 0; - - tail_ts = gst_rtp_buffer_get_timestamp (tail); - head_ts = gst_rtp_buffer_get_timestamp (head); - - ret = calculate_ts_diff (head_ts, tail_ts); - - /* let's add the duration of the tail buffer */ - ret += queue->tail_buffer_duration; - - return ret; -} - -/** - * async_jitter_queue_unref_and_unlock: - * @queue: a #AsyncJitterQueue. - * - * Decreases the reference count of the asynchronous @queue by 1 and - * releases the lock. This function must be called while holding the - * @queue's lock. If the reference count went to 0, the @queue will be - * destroyed and the memory allocated will be freed. - **/ -void -async_jitter_queue_unref_and_unlock (AsyncJitterQueue * queue) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - g_mutex_unlock (queue->mutex); - async_jitter_queue_unref (queue); -} - -/** - * async_jitter_queue_unref: - * @queue: a #AsyncJitterQueue. - * - * Decreases the reference count of the asynchronous @queue by 1. If - * the reference count went to 0, the @queue will be destroyed and the - * memory allocated will be freed. So you are not allowed to use the - * @queue afterwards, as it might have disappeared. You do not need to - * hold the lock to call this function. - **/ -void -async_jitter_queue_unref (AsyncJitterQueue * queue) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - if (g_atomic_int_dec_and_test (&queue->ref_count)) { - g_return_if_fail (queue->waiting_threads == 0); - g_mutex_free (queue->mutex); - if (queue->cond) - g_cond_free (queue->cond); - g_queue_free (queue->queue); - g_free (queue); - } -} - -/** - * async_jitter_queue_lock: - * @queue: a #AsyncJitterQueue. - * - * Acquires the @queue's lock. After that you can only call the - * <function>async_jitter_queue_*_unlocked()</function> function variants on that - * @queue. Otherwise it will deadlock. - **/ -void -async_jitter_queue_lock (AsyncJitterQueue * queue) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - g_mutex_lock (queue->mutex); -} - -/** - * async_jitter_queue_unlock: - * @queue: a #AsyncJitterQueue. - * - * Releases the queue's lock. - **/ -void -async_jitter_queue_unlock (AsyncJitterQueue * queue) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - g_mutex_unlock (queue->mutex); -} - -/** - * async_jitter_queue_push: - * @queue: a #AsyncJitterQueue. - * @data: @data to push into the @queue. - * - * Pushes the @data into the @queue. @data must not be %NULL. - **/ -void -async_jitter_queue_push (AsyncJitterQueue * queue, gpointer data) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - g_return_if_fail (data); - - g_mutex_lock (queue->mutex); - async_jitter_queue_push_unlocked (queue, data); - g_mutex_unlock (queue->mutex); -} - -/** - * async_jitter_queue_push_unlocked: - * @queue: a #AsyncJitterQueue. - * @data: @data to push into the @queue. - * - * Pushes the @data into the @queue. @data must not be %NULL. This - * function must be called while holding the @queue's lock. - **/ -void -async_jitter_queue_push_unlocked (AsyncJitterQueue * queue, gpointer data) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - g_return_if_fail (data); - - g_queue_push_head (queue->queue, data); - - signal_waiting_threads (queue); -} - -/** - * async_jitter_queue_push_sorted: - * @queue: a #AsyncJitterQueue - * @data: the @data to push into the @queue - * @func: the #GCompareDataFunc is used to sort @queue. This function - * is passed two elements of the @queue. The function should return - * 0 if they are equal, a negative value if the first element - * should be higher in the @queue or a positive value if the first - * element should be lower in the @queue than the second element. - * @user_data: user data passed to @func. - * - * Inserts @data into @queue using @func to determine the new - * position. - * - * This function requires that the @queue is sorted before pushing on - * new elements. - * - * This function will lock @queue before it sorts the queue and unlock - * it when it is finished. - * - * For an example of @func see async_jitter_queue_sort(). - * - * Since: 2.10 - **/ -gboolean -async_jitter_queue_push_sorted (AsyncJitterQueue * queue, - gpointer data, GCompareDataFunc func, gpointer user_data) -{ - gboolean ret; - - g_return_val_if_fail (queue != NULL, FALSE); - - g_mutex_lock (queue->mutex); - ret = async_jitter_queue_push_sorted_unlocked (queue, data, func, user_data); - g_mutex_unlock (queue->mutex); - - return ret; -} - -/** - * async_jitter_queue_push_sorted_unlocked: - * @queue: a #AsyncJitterQueue - * @data: the @data to push into the @queue - * @func: the #GCompareDataFunc is used to sort @queue. This function - * is passed two elements of the @queue. The function should return - * 0 if they are equal, a negative value if the first element - * should be higher in the @queue or a positive value if the first - * element should be lower in the @queue than the second element. - * @user_data: user data passed to @func. - * - * Inserts @data into @queue using @func to determine the new - * position. - * - * This function requires that the @queue is sorted before pushing on - * new elements. - * - * If @GCompareDataFunc returns 0, this function does not insert @data and - * return FALSE. - * - * This function is called while holding the @queue's lock. - * - * For an example of @func see async_jitter_queue_sort(). - * - * Since: 2.10 - **/ -gboolean -async_jitter_queue_push_sorted_unlocked (AsyncJitterQueue * queue, - gpointer data, GCompareDataFunc func, gpointer user_data) -{ - GList *list; - gint func_ret = TRUE; - - g_return_val_if_fail (queue != NULL, FALSE); - - list = queue->queue->head; - while (list && (func_ret = func (list->data, data, user_data)) < 0) - list = list->next; - - if (func_ret == 0) { - return FALSE; - } - if (list) { - g_queue_insert_before (queue->queue, list, data); - } else { - g_queue_push_tail (queue->queue, data); - } - - signal_waiting_threads (queue); - return TRUE; -} - -void -async_jitter_queue_insert_after_unlocked (AsyncJitterQueue * queue, - GList * sibling, gpointer data) -{ - g_return_if_fail (queue != NULL); - - g_queue_insert_before (queue->queue, sibling, data); - - signal_waiting_threads (queue); -} - -static gpointer -async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue) -{ - gpointer retval; - GstBuffer *tail_buffer = NULL; - guint tsunits; - - if (queue->pop_flushing) - return NULL; - - while (queue->pop_blocking) { - queue->waiting_threads++; - g_cond_wait (queue->cond, queue->mutex); - queue->waiting_threads--; - if (queue->pop_flushing) - return NULL; - } - - - tsunits = async_jitter_queue_length_ts_units_unlocked (queue); - - GST_DEBUG ("tsunits %u, pops: %u, limit %d", tsunits, queue->pops_remaining, - (int) (queue->low_threshold * queue->max_queue_length)); - - if (tsunits <= queue->low_threshold * queue->max_queue_length - && queue->pops_remaining == 0) { - if (!queue->buffering) { - GST_DEBUG ("start buffering"); - queue->buffering = TRUE; - queue->pops_remaining = queue->queue->length; - } - - GST_DEBUG ("wait for data"); - while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) { - queue->waiting_threads++; - g_cond_wait (queue->cond, queue->mutex); - queue->waiting_threads--; - if (queue->pop_flushing) - return NULL; - } - } - - retval = g_queue_pop_tail (queue->queue); - if (queue->pops_remaining) - queue->pops_remaining--; - - tail_buffer = g_queue_peek_tail (queue->queue); - if (tail_buffer) { - if (!GST_IS_BUFFER (tail_buffer) || !GST_IS_BUFFER (retval)) { - queue->tail_buffer_duration = 0; - } else if (gst_rtp_buffer_get_seq (tail_buffer) - - gst_rtp_buffer_get_seq (retval) == 1) { - queue->tail_buffer_duration = - calculate_ts_diff (gst_rtp_buffer_get_timestamp (tail_buffer), - gst_rtp_buffer_get_timestamp (retval)); - } else { - /* There is a sequence number gap -> we can't calculate the duration - * let's just set it to 0 */ - queue->tail_buffer_duration = 0; - } - } - - g_assert (retval); - - return retval; -} - -/** - * async_jitter_queue_pop: - * @queue: a #AsyncJitterQueue. - * - * Pops data from the @queue. This function blocks until data become - * available. If pop is disabled, tis function return NULL. - * - * Return value: data from the queue. - **/ -gpointer -async_jitter_queue_pop (AsyncJitterQueue * queue) -{ - gpointer retval; - - g_return_val_if_fail (queue, NULL); - g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL); - - g_mutex_lock (queue->mutex); - retval = async_jitter_queue_pop_intern_unlocked (queue); - g_mutex_unlock (queue->mutex); - - return retval; -} - -/** - * async_jitter_queue_pop_unlocked: - * @queue: a #AsyncJitterQueue. - * - * Pops data from the @queue. This function blocks until data become - * available. This function must be called while holding the @queue's - * lock. - * - * Return value: data from the queue. - **/ -gpointer -async_jitter_queue_pop_unlocked (AsyncJitterQueue * queue) -{ - g_return_val_if_fail (queue, NULL); - g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL); - - return async_jitter_queue_pop_intern_unlocked (queue); -} - -/** - * async_jitter_queue_length: - * @queue: a #AsyncJitterQueue. - * - * Returns the length of the queue - * Return value: the length of the @queue. - **/ -gint -async_jitter_queue_length (AsyncJitterQueue * queue) -{ - gint retval; - - g_return_val_if_fail (queue, 0); - g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0); - - g_mutex_lock (queue->mutex); - retval = queue->queue->length; - g_mutex_unlock (queue->mutex); - - return retval; -} - -/** - * async_jitter_queue_length_unlocked: - * @queue: a #AsyncJitterQueue. - * - * Returns the length of the queue. - * - * Return value: the length of the @queue. - **/ -gint -async_jitter_queue_length_unlocked (AsyncJitterQueue * queue) -{ - g_return_val_if_fail (queue, 0); - g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0); - - return queue->queue->length; -} - -/** - * async_jitter_queue_set_flushing_unlocked: - * @queue: a #AsyncJitterQueue. - * @free_func: a function to call to free the elements - * @user_data: user data passed to @free_func - * - * This function is used to set/unset flushing. If flushing is set any - * waiting/blocked pops will be unblocked. Any subsequent calls to pop will - * return NULL. Flushing is set by default. - */ -void -async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue * queue, - GFunc free_func, gpointer user_data) -{ - gpointer elem; - - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - queue->pop_flushing = TRUE; - /* let's unblock any remaining pops */ - if (queue->waiting_threads > 0) - g_cond_broadcast (queue->cond); - /* free data from queue */ - while ((elem = g_queue_pop_head (queue->queue))) - free_func (elem, user_data); -} - -/** - * async_jitter_queue_unset_flushing_unlocked: - * @queue: a #AsyncJitterQueue. - * @free_func: a function to call to free the elements - * @user_data: user data passed to @free_func - * - * This function is used to set/unset flushing. If flushing is set any - * waiting/blocked pops will be unblocked. Any subsequent calls to pop will - * return NULL. Flushing is set by default. - */ -void -async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue * queue) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - queue->pop_flushing = FALSE; - /* let's unblock any remaining pops */ - if (queue->waiting_threads > 0) - g_cond_broadcast (queue->cond); -} - -/** - * async_jitter_queue_set_blocking_unlocked: - * @queue: a #AsyncJitterQueue. - * @enabled: a boolean to enable/disable blocking - * - * This function is used to enable/disable blocking. If blocking is enabled any - * pops will be blocked until the queue is unblocked. The queue is blocked by - * default. - */ -void -async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue * queue, - gboolean blocking) -{ - g_return_if_fail (queue); - g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0); - - queue->pop_blocking = blocking; - /* let's unblock any remaining pops */ - if (queue->waiting_threads > 0) - g_cond_broadcast (queue->cond); -} |