/* This file is part of Ingen. * Copyright (C) 2008-2009 Dave Robillard * * Ingen is free software; you can redistribute it and/or modify it under the * terms of the GNU General Public License as published by the Free Software * Foundation; either version 2 of the License, or (at your option) any later * version. * * Ingen is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS * FOR A PARTICULAR PURPOSE. See the GNU General Public License for details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include "QueuedEventSource.hpp" #include "QueuedEvent.hpp" #include "PostProcessor.hpp" #include "ThreadManager.hpp" #include "ProcessContext.hpp" using namespace std; namespace Ingen { QueuedEventSource::QueuedEventSource(size_t queue_size) : _blocking_semaphore(0) { Thread::set_context(THREAD_PRE_PROCESS); assert(context() == THREAD_PRE_PROCESS); set_name("QueuedEventSource"); } QueuedEventSource::~QueuedEventSource() { Thread::stop(); } /** Push an unprepared event onto the queue. */ void QueuedEventSource::push_queued(QueuedEvent* const ev) { assert(!ev->is_prepared()); Raul::List::Node* node = new Raul::List::Node(ev); _events.push_back(node); if (_prepared_back.get() == NULL) _prepared_back = node; whip(); } /** Process all events for a cycle. * * Executed events will be pushed to @a dest. */ void QueuedEventSource::process(PostProcessor& dest, ProcessContext& context) { assert(ThreadManager::current_thread_id() == THREAD_PROCESS); if (_events.empty()) return; /* Limit the maximum number of queued events to process per cycle. This * makes the process callback (more) realtime-safe by preventing being * choked by events coming in faster than they can be processed. * FIXME: test this and figure out a good value */ const size_t MAX_QUEUED_EVENTS = context.nframes() / 32; size_t num_events_processed = 0; QueuedEvent* ev = (QueuedEvent*)_events.front(); Raul::List::Node* new_head = _events.head(); while (ev && ev->is_prepared() && ev->time() < context.end()) { ev->execute(context); new_head = new_head->next(); if (++num_events_processed > MAX_QUEUED_EVENTS) break; ev = (new_head ? (QueuedEvent*)new_head->elem() : NULL); } if (num_events_processed > 0) { Raul::List front; _events.chop_front(front, num_events_processed, new_head); dest.append(&front); } } // Private // /** Pre-process a single event */ void QueuedEventSource::_whipped() { Raul::List::Node* pb = _prepared_back.get(); if (!pb) return; QueuedEvent* const ev = (QueuedEvent*)pb->elem(); assert(ev); if (!ev) return; assert(!ev->is_prepared()); ev->pre_process(); assert(ev->is_prepared()); assert(_prepared_back.get() == pb); _prepared_back = pb->next(); // If event was blocking, wait for event to being run through the // process thread before preparing the next event if (ev->is_blocking()) _blocking_semaphore.wait(); } } // namespace Ingen