diff options
Diffstat (limited to 'src/engine')
-rw-r--r-- | src/engine/HTTPEngineReceiver.cpp | 2 | ||||
-rw-r--r-- | src/engine/OSCEngineReceiver.cpp | 2 | ||||
-rw-r--r-- | src/engine/QueuedEngineInterface.cpp | 6 | ||||
-rw-r--r-- | src/engine/QueuedEngineInterface.hpp | 2 | ||||
-rw-r--r-- | src/engine/QueuedEventSource.cpp | 26 | ||||
-rw-r--r-- | src/engine/QueuedEventSource.hpp | 59 |
6 files changed, 22 insertions, 75 deletions
diff --git a/src/engine/HTTPEngineReceiver.cpp b/src/engine/HTTPEngineReceiver.cpp index d31e2ab4..cf5299ed 100644 --- a/src/engine/HTTPEngineReceiver.cpp +++ b/src/engine/HTTPEngineReceiver.cpp @@ -41,7 +41,7 @@ namespace Ingen { HTTPEngineReceiver::HTTPEngineReceiver(Engine& engine, uint16_t port) - : QueuedEngineInterface(engine, 2, 2) + : QueuedEngineInterface(engine, 64) // FIXME , _server(soup_server_new(SOUP_SERVER_PORT, port, NULL)) { _receive_thread = new ReceiveThread(*this); diff --git a/src/engine/OSCEngineReceiver.cpp b/src/engine/OSCEngineReceiver.cpp index 8ef819e8..b2131928 100644 --- a/src/engine/OSCEngineReceiver.cpp +++ b/src/engine/OSCEngineReceiver.cpp @@ -48,7 +48,7 @@ namespace Ingen { OSCEngineReceiver::OSCEngineReceiver(Engine& engine, size_t queue_size, uint16_t port) - : QueuedEngineInterface(engine, queue_size, queue_size) // FIXME + : QueuedEngineInterface(engine, queue_size) // FIXME , _server(NULL) { _receive_thread = new ReceiveThread(*this); diff --git a/src/engine/QueuedEngineInterface.cpp b/src/engine/QueuedEngineInterface.cpp index a399e955..c3bc56fe 100644 --- a/src/engine/QueuedEngineInterface.cpp +++ b/src/engine/QueuedEngineInterface.cpp @@ -26,8 +26,8 @@ namespace Ingen { -QueuedEngineInterface::QueuedEngineInterface(Engine& engine, size_t queued_size, size_t stamped_size) - : QueuedEventSource(queued_size, stamped_size) +QueuedEngineInterface::QueuedEngineInterface(Engine& engine, size_t queue_size) + : QueuedEventSource(queue_size) , _responder(new Responder(NULL, 0)) , _engine(engine) , _in_bundle(false) @@ -381,7 +381,7 @@ extern "C" { Ingen::QueuedEngineInterface* new_queued_interface(Ingen::Engine& engine) { - return new Ingen::QueuedEngineInterface(engine, Ingen::event_queue_size, Ingen::event_queue_size); + return new Ingen::QueuedEngineInterface(engine, Ingen::event_queue_size); } } // extern "C" diff --git a/src/engine/QueuedEngineInterface.hpp b/src/engine/QueuedEngineInterface.hpp index 6af848e7..481c5438 100644 --- a/src/engine/QueuedEngineInterface.hpp +++ b/src/engine/QueuedEngineInterface.hpp @@ -50,7 +50,7 @@ class Engine; class QueuedEngineInterface : public QueuedEventSource, public EngineInterface { public: - QueuedEngineInterface(Engine& engine, size_t queued_size, size_t stamped_size); + QueuedEngineInterface(Engine& engine, size_t queue_size); virtual ~QueuedEngineInterface() {} std::string uri() const { return "ingen:internal"; } diff --git a/src/engine/QueuedEventSource.cpp b/src/engine/QueuedEventSource.cpp index 7dd12049..e8c6deec 100644 --- a/src/engine/QueuedEventSource.cpp +++ b/src/engine/QueuedEventSource.cpp @@ -28,14 +28,13 @@ using namespace std; namespace Ingen { -QueuedEventSource::QueuedEventSource(size_t queued_size, size_t stamped_size) +QueuedEventSource::QueuedEventSource(size_t queue_size) : _front(0) , _back(0) , _prepared_back(0) - , _size(queued_size+1) + , _size(queue_size+1) , _blocking_semaphore(0) , _full_semaphore(0) - , _stamped_queue(stamped_size) { _events = (QueuedEvent**)calloc(_size, sizeof(QueuedEvent*)); @@ -99,8 +98,6 @@ QueuedEventSource::process(PostProcessor& dest, ProcessContext& context) unsigned int num_events_processed = 0; - /* FIXME: Merge these next two loops into one */ - while ((ev = pop_earliest_queued_before(context.end()))) { ev->execute(context); dest.push(ev); @@ -108,12 +105,6 @@ QueuedEventSource::process(PostProcessor& dest, ProcessContext& context) break; } - while ((ev = pop_earliest_stamped_before(context.end()))) { - ev->execute(context); - dest.push(ev); - ++num_events_processed; - } - if (_full_semaphore.has_waiter()) { const bool full = (((_front.get() - _back.get() + _size) % _size) == 1); if (!full) @@ -122,16 +113,11 @@ QueuedEventSource::process(PostProcessor& dest, ProcessContext& context) } -/** Pops the prepared event at the front of the prepare queue, if it exists. +/** Pop the prepared event at the front of the prepare queue, if it exists. * - * This method will only pop events that have been prepared, and are - * stamped before the time passed. In other words, it may return NULL - * even if there are events pending in the queue. The events returned are - * actually QueuedEvents, but after this they are "normal" events and the - * engine deals with them just like a realtime in-band event. The engine will - * not use the timestamps of the returned events in any way, since it is free - * to execute these non-time-stamped events whenever it wants (at whatever rate - * it wants). + * This method will only pop events that are prepared and have time stamp + * less than @a time. It may return NULL even if there are events pending in + * the queue, if they are unprepared or stamped in the future. */ Event* QueuedEventSource::pop_earliest_queued_before(const SampleCount time) diff --git a/src/engine/QueuedEventSource.hpp b/src/engine/QueuedEventSource.hpp index 1d846897..a06de39f 100644 --- a/src/engine/QueuedEventSource.hpp +++ b/src/engine/QueuedEventSource.hpp @@ -49,7 +49,7 @@ class PostProcessor; class QueuedEventSource : public EventSource, protected Raul::Slave { public: - QueuedEventSource(size_t queued_size, size_t stamped_size); + QueuedEventSource(size_t queue_size); ~QueuedEventSource(); void activate() { Slave::start(); } @@ -57,72 +57,33 @@ public: void process(PostProcessor& dest, ProcessContext& context); - void unblock(); + /** Signal that the blocking event is finished. + * When this is called preparing will resume. This MUST be called by + * blocking events in their post_process() method. */ + inline void unblock() { _blocking_semaphore.post(); } protected: - void push_queued(QueuedEvent* const ev); - inline void push_stamped(Event* const ev) { _stamped_queue.push(ev); } - Event* pop_earliest_queued_before(const SampleCount time); - inline Event* pop_earliest_stamped_before(const SampleCount time); + void push_queued(QueuedEvent* const ev); + Event* pop_earliest_queued_before(const SampleCount time); inline bool unprepared_events() { return (_prepared_back.get() != _back.get()); } virtual void _whipped(); ///< Prepare 1 event private: - // Note that it's crucially important which functions access which of these - // variables, to maintain threadsafeness. + // Note it's important which functions access which variables for thread safety - //(FIXME: make this a separate class?) // 2-part queue for events that require pre-processing: AtomicInt _front; ///< Front of queue - AtomicInt _back; ///< Back of entire queue (1 past index of back element) - AtomicInt _prepared_back; ///< Back of prepared section (1 past index of back prepared element) + AtomicInt _back; ///< Back of entire queue (index of back event + 1) + AtomicInt _prepared_back; ///< Back of prepared events (index of back prepared event + 1) const size_t _size; QueuedEvent** _events; Raul::Semaphore _blocking_semaphore; - Raul::Semaphore _full_semaphore; - - /** Queue for timestamped events (no pre-processing). */ - Raul::SRSWQueue<Event*> _stamped_queue; }; -/** Pops the realtime (timestamped, not preprocessed) event off the realtime queue. - * - * Engine will use the sample timestamps of returned events directly and execute the - * event with sample accuracy. Timestamps in the past will be bumped forward to - * the beginning of the cycle (offset 0), when eg. skipped cycles occur. - */ -inline Event* -QueuedEventSource::pop_earliest_stamped_before(const SampleCount time) -{ - Event* ret = NULL; - - if (!_stamped_queue.empty()) { - if (_stamped_queue.front()->time() < time) { - ret = _stamped_queue.front(); - _stamped_queue.pop(); - } - } - - return ret; -} - - -/** Signal that the blocking event is finished. - * - * When this is called preparing will resume. This MUST be called by - * blocking events in their post_process() method. - */ -inline void -QueuedEventSource::unblock() -{ - _blocking_semaphore.post(); -} - - } // namespace Ingen #endif // QUEUEDEVENTSOURCE_H |