summaryrefslogtreecommitdiffstats
path: root/gst/rtpmanager/async_jitter_queue.c
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@gmail.com>2007-08-10 17:16:53 +0000
committerWim Taymans <wim.taymans@gmail.com>2007-08-10 17:16:53 +0000
commit9e50d836d4173f127adc56446fa74f1e103fa673 (patch)
treecdde2fad6fe1ea9945bdcc179438c94d6e425805 /gst/rtpmanager/async_jitter_queue.c
parenta9090746e58e9a994d7f07f61452602b31461e1e (diff)
downloadgst-plugins-bad-9e50d836d4173f127adc56446fa74f1e103fa673.tar.gz
gst-plugins-bad-9e50d836d4173f127adc56446fa74f1e103fa673.tar.bz2
gst-plugins-bad-9e50d836d4173f127adc56446fa74f1e103fa673.zip
gst/rtpmanager/: Remove complicated async queue and replace with more simple jitterbuffer code while also fixing some...
Original commit message from CVS: * gst/rtpmanager/Makefile.am: * gst/rtpmanager/async_jitter_queue.c: * gst/rtpmanager/async_jitter_queue.h: * gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_class_init), (rtp_jitter_buffer_init), (rtp_jitter_buffer_finalize), (rtp_jitter_buffer_new), (compare_seqnum), (rtp_jitter_buffer_insert), (rtp_jitter_buffer_pop), (rtp_jitter_buffer_flush), (rtp_jitter_buffer_num_packets), (rtp_jitter_buffer_get_ts_diff): * gst/rtpmanager/rtpjitterbuffer.h: Remove complicated async queue and replace with more simple jitterbuffer code while also fixing some bugs. * gst/rtpmanager/gstrtpbin-marshal.list: * gst/rtpmanager/gstrtpbin.c: (on_new_ssrc), (on_ssrc_collision), (on_ssrc_validated), (on_bye_ssrc), (on_bye_timeout), (on_timeout), (create_session), (gst_rtp_bin_class_init), (create_recv_rtp), (create_send_rtp): * gst/rtpmanager/gstrtpbin.h: * gst/rtpmanager/gstrtpjitterbuffer.c: (gst_rtp_jitter_buffer_init), (gst_rtp_jitter_buffer_dispose), (gst_jitter_buffer_sink_parse_caps), (gst_rtp_jitter_buffer_flush_start), (gst_rtp_jitter_buffer_flush_stop), (gst_rtp_jitter_buffer_change_state), (gst_rtp_jitter_buffer_sink_event), (gst_rtp_jitter_buffer_chain), (gst_rtp_jitter_buffer_loop), (gst_rtp_jitter_buffer_set_property): * gst/rtpmanager/gstrtpsession.c: (on_new_ssrc), (on_ssrc_collision), (on_ssrc_validated), (on_bye_ssrc), (on_bye_timeout), (on_timeout), (gst_rtp_session_class_init), (gst_rtp_session_init): * gst/rtpmanager/gstrtpsession.h: * gst/rtpmanager/rtpsession.c: (on_bye_ssrc), (session_cleanup): Use new jitterbuffer code. Expose some new signals in preparation for handling EOS.
Diffstat (limited to 'gst/rtpmanager/async_jitter_queue.c')
-rw-r--r--gst/rtpmanager/async_jitter_queue.c692
1 files changed, 0 insertions, 692 deletions
diff --git a/gst/rtpmanager/async_jitter_queue.c b/gst/rtpmanager/async_jitter_queue.c
deleted file mode 100644
index 73597b28..00000000
--- a/gst/rtpmanager/async_jitter_queue.c
+++ /dev/null
@@ -1,692 +0,0 @@
-/*
- * Async Jitter Queue based on g_async_queue
- * This code is GST RTP smart and deals with timestamps
- *
- * Farsight Voice+Video library
- * Copyright 2007 Collabora Ltd,
- * Copyright 2007 Nokia Corporation
- * @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>.
- *
- * This is an async queue that has a buffering mecanism based on the set low
- * and high threshold. When the lower threshold is reached, the queue will
- * fill itself up until the higher threshold is reached before allowing any
- * pops to occur. This allows a jitterbuffer of at least min threshold items
- * to be available.
- */
-
-/* GLIB - Library of useful routines for C programming
- * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald
- *
- * GAsyncQueue: asynchronous queue implementation, based on Gqueue.
- * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-/*
- * MT safe
- */
-
-#include "config.h"
-
-#include "async_jitter_queue.h"
-
-#include <gst/gst.h>
-#include <gst/rtp/gstrtpbuffer.h>
-
-#define DEFAULT_LOW_THRESHOLD 0.1
-#define DEFAULT_HIGH_THRESHOLD 0.9
-
-struct _AsyncJitterQueue
-{
- GMutex *mutex;
- GCond *cond;
- GQueue *queue;
- guint waiting_threads;
- gint32 ref_count;
- gfloat low_threshold;
- gfloat high_threshold;
- guint32 max_queue_length;
- gboolean buffering;
- gboolean pop_flushing;
- gboolean pop_blocking;
- guint pops_remaining;
- guint32 tail_buffer_duration;
-};
-
-/**
- * async_jitter_queue_new:
- *
- * Creates a new asynchronous queue with the initial reference count of 1.
- *
- * Return value: the new #AsyncJitterQueue.
- **/
-AsyncJitterQueue *
-async_jitter_queue_new (void)
-{
- AsyncJitterQueue *retval = g_new (AsyncJitterQueue, 1);
-
- retval->mutex = g_mutex_new ();
- retval->cond = g_cond_new ();
- retval->queue = g_queue_new ();
- retval->waiting_threads = 0;
- retval->ref_count = 1;
- retval->low_threshold = DEFAULT_LOW_THRESHOLD;
- retval->high_threshold = DEFAULT_HIGH_THRESHOLD;
- retval->buffering = TRUE; /* we need to buffer initially */
- retval->pop_flushing = TRUE;
- retval->pop_blocking = TRUE;
- retval->pops_remaining = 0;
- retval->tail_buffer_duration = 0;
- return retval;
-}
-
-/* checks buffering state and wakes up waiting pops */
-void
-signal_waiting_threads (AsyncJitterQueue * queue)
-{
- if (async_jitter_queue_length_ts_units_unlocked (queue) >=
- queue->high_threshold * queue->max_queue_length) {
- GST_DEBUG ("stop buffering");
- queue->buffering = FALSE;
- }
-
- if (queue->waiting_threads > 0) {
- if (!queue->buffering) {
- g_cond_signal (queue->cond);
- }
- }
-}
-
-/**
- * async_jitter_queue_ref:
- * @queue: a #AsyncJitterQueue.
- *
- * Increases the reference count of the asynchronous @queue by 1. You
- * do not need to hold the lock to call this function.
- *
- * Returns: the @queue that was passed in (since 2.6)
- **/
-AsyncJitterQueue *
-async_jitter_queue_ref (AsyncJitterQueue * queue)
-{
- g_return_val_if_fail (queue, NULL);
- g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
-
- g_atomic_int_inc (&queue->ref_count);
-
- return queue;
-}
-
-/**
- * async_jitter_queue_ref_unlocked:
- * @queue: a #AsyncJitterQueue.
- *
- * Increases the reference count of the asynchronous @queue by 1.
- **/
-void
-async_jitter_queue_ref_unlocked (AsyncJitterQueue * queue)
-{
- g_return_if_fail (queue);
- g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
- g_atomic_int_inc (&queue->ref_count);
-}
-
-/**
- * async_jitter_queue_set_low_threshold:
- * @queue: a #AsyncJitterQueue.
- * @threshold: the lower threshold (fraction of max size)
- *
- * Sets the low threshold on the queue. This threshold indicates the minimum
- * number of items allowed in the queue before we refill it up to the set
- * maximum threshold.
- **/
-void
-async_jitter_queue_set_low_threshold (AsyncJitterQueue * queue,
- gfloat threshold)
-{
- g_return_if_fail (queue);
- g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
- queue->low_threshold = threshold;
-}
-
-/**
- * async_jitter_queue_set_max_threshold:
- * @queue: a #AsyncJitterQueue.
- * @threshold: the higher threshold (fraction of max size)
- *
- * Sets the high threshold on the queue. This threshold indicates the amount of
- * items to fill in the queue before releasing any blocking pop calls. This
- * blocking mecanism is only triggered when we reach the low threshold and must
- * refill the queue.
- **/
-void
-async_jitter_queue_set_high_threshold (AsyncJitterQueue * queue,
- gfloat threshold)
-{
- g_return_if_fail (queue);
- g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
- queue->high_threshold = threshold;
-}
-
-/* set the maximum queue length in RTP timestamp units */
-void
-async_jitter_queue_set_max_queue_length (AsyncJitterQueue * queue,
- guint32 max_length)
-{
- g_return_if_fail (queue);
- g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
- queue->max_queue_length = max_length;
-}
-
-GQueue *
-async_jitter_queue_get_g_queue (AsyncJitterQueue * queue)
-{
- g_return_val_if_fail (queue, NULL);
-
- return queue->queue;
-}
-
-static guint32
-calculate_ts_diff (guint32 high_ts, guint32 low_ts)
-{
- /* it needs to work if ts wraps */
- if (high_ts >= low_ts) {
- return high_ts - low_ts;
- } else {
- return high_ts + G_MAXUINT32 + 1 - low_ts;
- }
-}
-
-/* this function returns the length of the queue in timestamp units. It will
- * also add the duration of the last buffer in the queue */
-/* FIXME This function wrongly assumes that there are no missing packets inside
- * the buffer, in reality it needs to check for gaps and subsctract those from
- * the total */
-guint32
-async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue * queue)
-{
- guint32 tail_ts;
- guint32 head_ts;
- guint32 ret;
- GstBuffer *head;
- GstBuffer *tail;
-
- g_return_val_if_fail (queue, 0);
-
- if (queue->queue->length < 2) {
- return 0;
- }
-
- tail = g_queue_peek_tail (queue->queue);
- head = g_queue_peek_head (queue->queue);
-
- if (!GST_IS_BUFFER (tail) || !GST_IS_BUFFER (head))
- return 0;
-
- tail_ts = gst_rtp_buffer_get_timestamp (tail);
- head_ts = gst_rtp_buffer_get_timestamp (head);
-
- ret = calculate_ts_diff (head_ts, tail_ts);
-
- /* let's add the duration of the tail buffer */
- ret += queue->tail_buffer_duration;
-
- return ret;
-}
-
-/**
- * async_jitter_queue_unref_and_unlock:
- * @queue: a #AsyncJitterQueue.
- *
- * Decreases the reference count of the asynchronous @queue by 1 and
- * releases the lock. This function must be called while holding the
- * @queue's lock. If the reference count went to 0, the @queue will be
- * destroyed and the memory allocated will be freed.
- **/
-void
-async_jitter_queue_unref_and_unlock (AsyncJitterQueue * queue)
-{
- g_return_if_fail (queue);
- g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
- g_mutex_unlock (queue->mutex);
- async_jitter_queue_unref (queue);
-}
-
-/**
- * async_jitter_queue_unref:
- * @queue: a #AsyncJitterQueue.
- *
- * Decreases the reference count of the asynchronous @queue by 1. If
- * the reference count went to 0, the @queue will be destroyed and the
- * memory allocated will be freed. So you are not allowed to use the
- * @queue afterwards, as it might have disappeared. You do not need to
- * hold the lock to call this function.
- **/
-void
-async_jitter_queue_unref (AsyncJitterQueue * queue)
-{
- g_return_if_fail (queue);
- g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
- if (g_atomic_int_dec_and_test (&queue->ref_count)) {
- g_return_if_fail (queue->waiting_threads == 0);
- g_mutex_free (queue->mutex);
- if (queue->cond)
- g_cond_free (queue->cond);
- g_queue_free (queue->queue);
- g_free (queue);
- }
-}
-
-/**
- * async_jitter_queue_lock:
- * @queue: a #AsyncJitterQueue.
- *
- * Acquires the @queue's lock. After that you can only call the
- * <function>async_jitter_queue_*_unlocked()</function> function variants on that
- * @queue. Otherwise it will deadlock.
- **/
-void
-async_jitter_queue_lock (AsyncJitterQueue * queue)
-{
- g_return_if_fail (queue);
- g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
- g_mutex_lock (queue->mutex);
-}
-
-/**
- * async_jitter_queue_unlock:
- * @queue: a #AsyncJitterQueue.
- *
- * Releases the queue's lock.
- **/
-void
-async_jitter_queue_unlock (AsyncJitterQueue * queue)
-{
- g_return_if_fail (queue);
- g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
- g_mutex_unlock (queue->mutex);
-}
-
-/**
- * async_jitter_queue_push:
- * @queue: a #AsyncJitterQueue.
- * @data: @data to push into the @queue.
- *
- * Pushes the @data into the @queue. @data must not be %NULL.
- **/
-void
-async_jitter_queue_push (AsyncJitterQueue * queue, gpointer data)
-{
- g_return_if_fail (queue);
- g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
- g_return_if_fail (data);
-
- g_mutex_lock (queue->mutex);
- async_jitter_queue_push_unlocked (queue, data);
- g_mutex_unlock (queue->mutex);
-}
-
-/**
- * async_jitter_queue_push_unlocked:
- * @queue: a #AsyncJitterQueue.
- * @data: @data to push into the @queue.
- *
- * Pushes the @data into the @queue. @data must not be %NULL. This
- * function must be called while holding the @queue's lock.
- **/
-void
-async_jitter_queue_push_unlocked (AsyncJitterQueue * queue, gpointer data)
-{
- g_return_if_fail (queue);
- g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
- g_return_if_fail (data);
-
- g_queue_push_head (queue->queue, data);
-
- signal_waiting_threads (queue);
-}
-
-/**
- * async_jitter_queue_push_sorted:
- * @queue: a #AsyncJitterQueue
- * @data: the @data to push into the @queue
- * @func: the #GCompareDataFunc is used to sort @queue. This function
- * is passed two elements of the @queue. The function should return
- * 0 if they are equal, a negative value if the first element
- * should be higher in the @queue or a positive value if the first
- * element should be lower in the @queue than the second element.
- * @user_data: user data passed to @func.
- *
- * Inserts @data into @queue using @func to determine the new
- * position.
- *
- * This function requires that the @queue is sorted before pushing on
- * new elements.
- *
- * This function will lock @queue before it sorts the queue and unlock
- * it when it is finished.
- *
- * For an example of @func see async_jitter_queue_sort().
- *
- * Since: 2.10
- **/
-gboolean
-async_jitter_queue_push_sorted (AsyncJitterQueue * queue,
- gpointer data, GCompareDataFunc func, gpointer user_data)
-{
- gboolean ret;
-
- g_return_val_if_fail (queue != NULL, FALSE);
-
- g_mutex_lock (queue->mutex);
- ret = async_jitter_queue_push_sorted_unlocked (queue, data, func, user_data);
- g_mutex_unlock (queue->mutex);
-
- return ret;
-}
-
-/**
- * async_jitter_queue_push_sorted_unlocked:
- * @queue: a #AsyncJitterQueue
- * @data: the @data to push into the @queue
- * @func: the #GCompareDataFunc is used to sort @queue. This function
- * is passed two elements of the @queue. The function should return
- * 0 if they are equal, a negative value if the first element
- * should be higher in the @queue or a positive value if the first
- * element should be lower in the @queue than the second element.
- * @user_data: user data passed to @func.
- *
- * Inserts @data into @queue using @func to determine the new
- * position.
- *
- * This function requires that the @queue is sorted before pushing on
- * new elements.
- *
- * If @GCompareDataFunc returns 0, this function does not insert @data and
- * return FALSE.
- *
- * This function is called while holding the @queue's lock.
- *
- * For an example of @func see async_jitter_queue_sort().
- *
- * Since: 2.10
- **/
-gboolean
-async_jitter_queue_push_sorted_unlocked (AsyncJitterQueue * queue,
- gpointer data, GCompareDataFunc func, gpointer user_data)
-{
- GList *list;
- gint func_ret = TRUE;
-
- g_return_val_if_fail (queue != NULL, FALSE);
-
- list = queue->queue->head;
- while (list && (func_ret = func (list->data, data, user_data)) < 0)
- list = list->next;
-
- if (func_ret == 0) {
- return FALSE;
- }
- if (list) {
- g_queue_insert_before (queue->queue, list, data);
- } else {
- g_queue_push_tail (queue->queue, data);
- }
-
- signal_waiting_threads (queue);
- return TRUE;
-}
-
-void
-async_jitter_queue_insert_after_unlocked (AsyncJitterQueue * queue,
- GList * sibling, gpointer data)
-{
- g_return_if_fail (queue != NULL);
-
- g_queue_insert_before (queue->queue, sibling, data);
-
- signal_waiting_threads (queue);
-}
-
-static gpointer
-async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue)
-{
- gpointer retval;
- GstBuffer *tail_buffer = NULL;
- guint tsunits;
-
- if (queue->pop_flushing)
- return NULL;
-
- while (queue->pop_blocking) {
- queue->waiting_threads++;
- g_cond_wait (queue->cond, queue->mutex);
- queue->waiting_threads--;
- if (queue->pop_flushing)
- return NULL;
- }
-
-
- tsunits = async_jitter_queue_length_ts_units_unlocked (queue);
-
- GST_DEBUG ("tsunits %u, pops: %u, limit %d", tsunits, queue->pops_remaining,
- (int) (queue->low_threshold * queue->max_queue_length));
-
- if (tsunits <= queue->low_threshold * queue->max_queue_length
- && queue->pops_remaining == 0) {
- if (!queue->buffering) {
- GST_DEBUG ("start buffering");
- queue->buffering = TRUE;
- queue->pops_remaining = queue->queue->length;
- }
-
- GST_DEBUG ("wait for data");
- while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) {
- queue->waiting_threads++;
- g_cond_wait (queue->cond, queue->mutex);
- queue->waiting_threads--;
- if (queue->pop_flushing)
- return NULL;
- }
- }
-
- retval = g_queue_pop_tail (queue->queue);
- if (queue->pops_remaining)
- queue->pops_remaining--;
-
- tail_buffer = g_queue_peek_tail (queue->queue);
- if (tail_buffer) {
- if (!GST_IS_BUFFER (tail_buffer) || !GST_IS_BUFFER (retval)) {
- queue->tail_buffer_duration = 0;
- } else if (gst_rtp_buffer_get_seq (tail_buffer)
- - gst_rtp_buffer_get_seq (retval) == 1) {
- queue->tail_buffer_duration =
- calculate_ts_diff (gst_rtp_buffer_get_timestamp (tail_buffer),
- gst_rtp_buffer_get_timestamp (retval));
- } else {
- /* There is a sequence number gap -> we can't calculate the duration
- * let's just set it to 0 */
- queue->tail_buffer_duration = 0;
- }
- }
-
- g_assert (retval);
-
- return retval;
-}
-
-/**
- * async_jitter_queue_pop:
- * @queue: a #AsyncJitterQueue.
- *
- * Pops data from the @queue. This function blocks until data become
- * available. If pop is disabled, tis function return NULL.
- *
- * Return value: data from the queue.
- **/
-gpointer
-async_jitter_queue_pop (AsyncJitterQueue * queue)
-{
- gpointer retval;
-
- g_return_val_if_fail (queue, NULL);
- g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
-
- g_mutex_lock (queue->mutex);
- retval = async_jitter_queue_pop_intern_unlocked (queue);
- g_mutex_unlock (queue->mutex);
-
- return retval;
-}
-
-/**
- * async_jitter_queue_pop_unlocked:
- * @queue: a #AsyncJitterQueue.
- *
- * Pops data from the @queue. This function blocks until data become
- * available. This function must be called while holding the @queue's
- * lock.
- *
- * Return value: data from the queue.
- **/
-gpointer
-async_jitter_queue_pop_unlocked (AsyncJitterQueue * queue)
-{
- g_return_val_if_fail (queue, NULL);
- g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
-
- return async_jitter_queue_pop_intern_unlocked (queue);
-}
-
-/**
- * async_jitter_queue_length:
- * @queue: a #AsyncJitterQueue.
- *
- * Returns the length of the queue
- * Return value: the length of the @queue.
- **/
-gint
-async_jitter_queue_length (AsyncJitterQueue * queue)
-{
- gint retval;
-
- g_return_val_if_fail (queue, 0);
- g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
-
- g_mutex_lock (queue->mutex);
- retval = queue->queue->length;
- g_mutex_unlock (queue->mutex);
-
- return retval;
-}
-
-/**
- * async_jitter_queue_length_unlocked:
- * @queue: a #AsyncJitterQueue.
- *
- * Returns the length of the queue.
- *
- * Return value: the length of the @queue.
- **/
-gint
-async_jitter_queue_length_unlocked (AsyncJitterQueue * queue)
-{
- g_return_val_if_fail (queue, 0);
- g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
-
- return queue->queue->length;
-}
-
-/**
- * async_jitter_queue_set_flushing_unlocked:
- * @queue: a #AsyncJitterQueue.
- * @free_func: a function to call to free the elements
- * @user_data: user data passed to @free_func
- *
- * This function is used to set/unset flushing. If flushing is set any
- * waiting/blocked pops will be unblocked. Any subsequent calls to pop will
- * return NULL. Flushing is set by default.
- */
-void
-async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue * queue,
- GFunc free_func, gpointer user_data)
-{
- gpointer elem;
-
- g_return_if_fail (queue);
- g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
- queue->pop_flushing = TRUE;
- /* let's unblock any remaining pops */
- if (queue->waiting_threads > 0)
- g_cond_broadcast (queue->cond);
- /* free data from queue */
- while ((elem = g_queue_pop_head (queue->queue)))
- free_func (elem, user_data);
-}
-
-/**
- * async_jitter_queue_unset_flushing_unlocked:
- * @queue: a #AsyncJitterQueue.
- * @free_func: a function to call to free the elements
- * @user_data: user data passed to @free_func
- *
- * This function is used to set/unset flushing. If flushing is set any
- * waiting/blocked pops will be unblocked. Any subsequent calls to pop will
- * return NULL. Flushing is set by default.
- */
-void
-async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue * queue)
-{
- g_return_if_fail (queue);
- g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
- queue->pop_flushing = FALSE;
- /* let's unblock any remaining pops */
- if (queue->waiting_threads > 0)
- g_cond_broadcast (queue->cond);
-}
-
-/**
- * async_jitter_queue_set_blocking_unlocked:
- * @queue: a #AsyncJitterQueue.
- * @enabled: a boolean to enable/disable blocking
- *
- * This function is used to enable/disable blocking. If blocking is enabled any
- * pops will be blocked until the queue is unblocked. The queue is blocked by
- * default.
- */
-void
-async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue * queue,
- gboolean blocking)
-{
- g_return_if_fail (queue);
- g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
-
- queue->pop_blocking = blocking;
- /* let's unblock any remaining pops */
- if (queue->waiting_threads > 0)
- g_cond_broadcast (queue->cond);
-}