From 2fbdcbaeb28e62fffb4c63c5118fdc5243b426bc Mon Sep 17 00:00:00 2001 From: David Robillard Date: Tue, 19 Aug 2008 21:42:33 +0000 Subject: Gracefully handle full event queue on the engine side (fix loading massive patches). git-svn-id: http://svn.drobilla.net/lad/ingen@1450 a436a847-0d15-0410-975c-d299462d15a1 --- src/libs/engine/LV2Plugin.cpp | 2 -- src/libs/engine/QueuedEventSource.cpp | 47 ++++++++++++++++++------------ src/libs/engine/QueuedEventSource.hpp | 13 ++++++--- src/libs/engine/events/CreateNodeEvent.cpp | 2 -- 4 files changed, 38 insertions(+), 26 deletions(-) (limited to 'src/libs') diff --git a/src/libs/engine/LV2Plugin.cpp b/src/libs/engine/LV2Plugin.cpp index 53d317b4..09d48a10 100644 --- a/src/libs/engine/LV2Plugin.cpp +++ b/src/libs/engine/LV2Plugin.cpp @@ -70,8 +70,6 @@ LV2Plugin::instantiate(const string& name, n = NULL; } - - return n; } diff --git a/src/libs/engine/QueuedEventSource.cpp b/src/libs/engine/QueuedEventSource.cpp index 4b63b6c4..69ab805a 100644 --- a/src/libs/engine/QueuedEventSource.cpp +++ b/src/libs/engine/QueuedEventSource.cpp @@ -29,12 +29,13 @@ namespace Ingen { QueuedEventSource::QueuedEventSource(size_t queued_size, size_t stamped_size) -: _front(0), - _back(0), - _prepared_back(0), - _size(queued_size+1), - _blocking_semaphore(0), - _stamped_queue(stamped_size) + : _front(0) + , _back(0) + , _prepared_back(0) + , _size(queued_size+1) + , _blocking_semaphore(0) + , _full_semaphore(0) + , _stamped_queue(stamped_size) { _events = (QueuedEvent**)calloc(_size, sizeof(QueuedEvent*)); @@ -62,14 +63,20 @@ QueuedEventSource::push_queued(QueuedEvent* const ev) { assert(!ev->is_prepared()); - if (_events[_back] != NULL) { - cerr << "[QueuedEventSource] Error: Queue is full! Event is lost, please report!" << endl; - delete ev; - } else { - _events[_back] = ev; - _back = (_back + 1) % _size; + unsigned back = _back.get(); + bool full = (((_front.get() - back + _size) % _size) == 1); + while (full) { whip(); + cerr << "WARNING: Event queue full. Waiting..." << endl; + _full_semaphore.wait(); + back = _back.get(); + full = (((_front.get() - back + _size) % _size) == 1); } + + assert(_events[back] == NULL); + _events[back] = ev; + _back = (back + 1) % _size; + whip(); } @@ -107,6 +114,9 @@ QueuedEventSource::process(PostProcessor& dest, ProcessContext& context) ++num_events_processed; } + if (_full_semaphore.has_waiter() && num_events_processed > 0) + _full_semaphore.post(); + /*if (num_events_processed > 0) dest.whip();*/ //else @@ -130,12 +140,13 @@ QueuedEventSource::pop_earliest_queued_before(const SampleCount time) { assert(ThreadManager::current_thread_id() == THREAD_PROCESS); - QueuedEvent* const front_event = _events[_front]; + const unsigned front = _front.get(); + QueuedEvent* const front_event = _events[front]; // Pop if (front_event && front_event->is_prepared() && front_event->time() < time) { - _events[_front] = NULL; - _front = (_front + 1) % _size; + _events[front] = NULL; + _front = (front + 1) % _size; return front_event; } else { return NULL; @@ -150,8 +161,8 @@ QueuedEventSource::pop_earliest_queued_before(const SampleCount time) void QueuedEventSource::_whipped() { - QueuedEvent* const ev = _events[_prepared_back]; - //assert(ev); + const unsigned prepared_back = _prepared_back.get(); + QueuedEvent* const ev = _events[prepared_back]; if (!ev) return; @@ -159,7 +170,7 @@ QueuedEventSource::_whipped() ev->pre_process(); assert(ev->is_prepared()); - _prepared_back = (_prepared_back+1) % _size; + _prepared_back = (prepared_back + 1) % _size; // If event was blocking, wait for event to being run through the // process thread before preparing the next event diff --git a/src/libs/engine/QueuedEventSource.hpp b/src/libs/engine/QueuedEventSource.hpp index 59e10b1e..6dea092d 100644 --- a/src/libs/engine/QueuedEventSource.hpp +++ b/src/libs/engine/QueuedEventSource.hpp @@ -22,11 +22,14 @@ #include #include "types.hpp" #include +#include #include #include #include "Event.hpp" #include "EventSource.hpp" +using Raul::AtomicInt; + namespace Ingen { class QueuedEvent; @@ -62,7 +65,7 @@ protected: Event* pop_earliest_queued_before(const SampleCount time); inline Event* pop_earliest_stamped_before(const SampleCount time); - inline bool unprepared_events() { return (_prepared_back != _back); } + inline bool unprepared_events() { return (_prepared_back.get() != _back.get()); } virtual void _whipped(); ///< Prepare 1 event @@ -72,13 +75,15 @@ private: //(FIXME: make this a separate class?) // 2-part queue for events that require pre-processing: - size_t _front; ///< Front of queue - size_t _back; ///< Back of entire queue (1 past index of back element) - size_t _prepared_back; ///< Back of prepared section (1 past index of back prepared element) + AtomicInt _front; ///< Front of queue + AtomicInt _back; ///< Back of entire queue (1 past index of back element) + AtomicInt _prepared_back; ///< Back of prepared section (1 past index of back prepared element) const size_t _size; QueuedEvent** _events; Raul::Semaphore _blocking_semaphore; + Raul::Semaphore _full_semaphore; + /** Queue for timestamped events (no pre-processing). */ Raul::SRSWQueue _stamped_queue; }; diff --git a/src/libs/engine/events/CreateNodeEvent.cpp b/src/libs/engine/events/CreateNodeEvent.cpp index 49cbb58e..a3681597 100644 --- a/src/libs/engine/events/CreateNodeEvent.cpp +++ b/src/libs/engine/events/CreateNodeEvent.cpp @@ -87,8 +87,6 @@ CreateNodeEvent::pre_process() if (_patch && plugin) { - Glib::Mutex::Lock lock(_engine.world()->rdf_world->mutex()); - _node = plugin->instantiate(_path.name(), _polyphonic, _patch, _engine.audio_driver()->sample_rate(), _engine.audio_driver()->buffer_size()); -- cgit v1.2.1