From 2fbdcbaeb28e62fffb4c63c5118fdc5243b426bc Mon Sep 17 00:00:00 2001 From: David Robillard Date: Tue, 19 Aug 2008 21:42:33 +0000 Subject: Gracefully handle full event queue on the engine side (fix loading massive patches). git-svn-id: http://svn.drobilla.net/lad/ingen@1450 a436a847-0d15-0410-975c-d299462d15a1 --- src/libs/engine/QueuedEventSource.cpp | 47 +++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 18 deletions(-) (limited to 'src/libs/engine/QueuedEventSource.cpp') diff --git a/src/libs/engine/QueuedEventSource.cpp b/src/libs/engine/QueuedEventSource.cpp index 4b63b6c4..69ab805a 100644 --- a/src/libs/engine/QueuedEventSource.cpp +++ b/src/libs/engine/QueuedEventSource.cpp @@ -29,12 +29,13 @@ namespace Ingen { QueuedEventSource::QueuedEventSource(size_t queued_size, size_t stamped_size) -: _front(0), - _back(0), - _prepared_back(0), - _size(queued_size+1), - _blocking_semaphore(0), - _stamped_queue(stamped_size) + : _front(0) + , _back(0) + , _prepared_back(0) + , _size(queued_size+1) + , _blocking_semaphore(0) + , _full_semaphore(0) + , _stamped_queue(stamped_size) { _events = (QueuedEvent**)calloc(_size, sizeof(QueuedEvent*)); @@ -62,14 +63,20 @@ QueuedEventSource::push_queued(QueuedEvent* const ev) { assert(!ev->is_prepared()); - if (_events[_back] != NULL) { - cerr << "[QueuedEventSource] Error: Queue is full! Event is lost, please report!" << endl; - delete ev; - } else { - _events[_back] = ev; - _back = (_back + 1) % _size; + unsigned back = _back.get(); + bool full = (((_front.get() - back + _size) % _size) == 1); + while (full) { whip(); + cerr << "WARNING: Event queue full. Waiting..." << endl; + _full_semaphore.wait(); + back = _back.get(); + full = (((_front.get() - back + _size) % _size) == 1); } + + assert(_events[back] == NULL); + _events[back] = ev; + _back = (back + 1) % _size; + whip(); } @@ -107,6 +114,9 @@ QueuedEventSource::process(PostProcessor& dest, ProcessContext& context) ++num_events_processed; } + if (_full_semaphore.has_waiter() && num_events_processed > 0) + _full_semaphore.post(); + /*if (num_events_processed > 0) dest.whip();*/ //else @@ -130,12 +140,13 @@ QueuedEventSource::pop_earliest_queued_before(const SampleCount time) { assert(ThreadManager::current_thread_id() == THREAD_PROCESS); - QueuedEvent* const front_event = _events[_front]; + const unsigned front = _front.get(); + QueuedEvent* const front_event = _events[front]; // Pop if (front_event && front_event->is_prepared() && front_event->time() < time) { - _events[_front] = NULL; - _front = (_front + 1) % _size; + _events[front] = NULL; + _front = (front + 1) % _size; return front_event; } else { return NULL; @@ -150,8 +161,8 @@ QueuedEventSource::pop_earliest_queued_before(const SampleCount time) void QueuedEventSource::_whipped() { - QueuedEvent* const ev = _events[_prepared_back]; - //assert(ev); + const unsigned prepared_back = _prepared_back.get(); + QueuedEvent* const ev = _events[prepared_back]; if (!ev) return; @@ -159,7 +170,7 @@ QueuedEventSource::_whipped() ev->pre_process(); assert(ev->is_prepared()); - _prepared_back = (_prepared_back+1) % _size; + _prepared_back = (prepared_back + 1) % _size; // If event was blocking, wait for event to being run through the // process thread before preparing the next event -- cgit v1.2.1