summaryrefslogtreecommitdiffstats
path: root/src/libs/engine/QueuedEventSource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/libs/engine/QueuedEventSource.cpp')
-rw-r--r--src/libs/engine/QueuedEventSource.cpp47
1 files changed, 29 insertions, 18 deletions
diff --git a/src/libs/engine/QueuedEventSource.cpp b/src/libs/engine/QueuedEventSource.cpp
index 4b63b6c4..69ab805a 100644
--- a/src/libs/engine/QueuedEventSource.cpp
+++ b/src/libs/engine/QueuedEventSource.cpp
@@ -29,12 +29,13 @@ namespace Ingen {
QueuedEventSource::QueuedEventSource(size_t queued_size, size_t stamped_size)
-: _front(0),
- _back(0),
- _prepared_back(0),
- _size(queued_size+1),
- _blocking_semaphore(0),
- _stamped_queue(stamped_size)
+ : _front(0)
+ , _back(0)
+ , _prepared_back(0)
+ , _size(queued_size+1)
+ , _blocking_semaphore(0)
+ , _full_semaphore(0)
+ , _stamped_queue(stamped_size)
{
_events = (QueuedEvent**)calloc(_size, sizeof(QueuedEvent*));
@@ -62,14 +63,20 @@ QueuedEventSource::push_queued(QueuedEvent* const ev)
{
assert(!ev->is_prepared());
- if (_events[_back] != NULL) {
- cerr << "[QueuedEventSource] Error: Queue is full! Event is lost, please report!" << endl;
- delete ev;
- } else {
- _events[_back] = ev;
- _back = (_back + 1) % _size;
+ unsigned back = _back.get();
+ bool full = (((_front.get() - back + _size) % _size) == 1);
+ while (full) {
whip();
+ cerr << "WARNING: Event queue full. Waiting..." << endl;
+ _full_semaphore.wait();
+ back = _back.get();
+ full = (((_front.get() - back + _size) % _size) == 1);
}
+
+ assert(_events[back] == NULL);
+ _events[back] = ev;
+ _back = (back + 1) % _size;
+ whip();
}
@@ -107,6 +114,9 @@ QueuedEventSource::process(PostProcessor& dest, ProcessContext& context)
++num_events_processed;
}
+ if (_full_semaphore.has_waiter() && num_events_processed > 0)
+ _full_semaphore.post();
+
/*if (num_events_processed > 0)
dest.whip();*/
//else
@@ -130,12 +140,13 @@ QueuedEventSource::pop_earliest_queued_before(const SampleCount time)
{
assert(ThreadManager::current_thread_id() == THREAD_PROCESS);
- QueuedEvent* const front_event = _events[_front];
+ const unsigned front = _front.get();
+ QueuedEvent* const front_event = _events[front];
// Pop
if (front_event && front_event->is_prepared() && front_event->time() < time) {
- _events[_front] = NULL;
- _front = (_front + 1) % _size;
+ _events[front] = NULL;
+ _front = (front + 1) % _size;
return front_event;
} else {
return NULL;
@@ -150,8 +161,8 @@ QueuedEventSource::pop_earliest_queued_before(const SampleCount time)
void
QueuedEventSource::_whipped()
{
- QueuedEvent* const ev = _events[_prepared_back];
- //assert(ev);
+ const unsigned prepared_back = _prepared_back.get();
+ QueuedEvent* const ev = _events[prepared_back];
if (!ev)
return;
@@ -159,7 +170,7 @@ QueuedEventSource::_whipped()
ev->pre_process();
assert(ev->is_prepared());
- _prepared_back = (_prepared_back+1) % _size;
+ _prepared_back = (prepared_back + 1) % _size;
// If event was blocking, wait for event to being run through the
// process thread before preparing the next event