From 177f2b48b6516ba4dee70a709ce28105f1092de2 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Wed, 18 May 2011 17:45:39 +0000 Subject: Use an intrusive linked list for event queue rather than Raul::List. This avoids the need to allocate list nodes, improving performance (event throughput), and making EventSource::push_queued realtime safe. Remove unused queue_size parameter from EventSource and friends. git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@3282 a436a847-0d15-0410-975c-d299462d15a1 --- src/server/Engine.cpp | 2 +- src/server/Event.hpp | 18 +++++++--- src/server/EventSource.cpp | 69 +++++++++++++++++++----------------- src/server/EventSource.hpp | 14 ++++---- src/server/HTTPEngineReceiver.cpp | 2 +- src/server/OSCEngineReceiver.cpp | 4 +-- src/server/OSCEngineReceiver.hpp | 2 +- src/server/PostProcessor.cpp | 55 ++++++++++++++++++---------- src/server/PostProcessor.hpp | 28 ++++++++------- src/server/QueuedEngineInterface.cpp | 4 +-- src/server/QueuedEngineInterface.hpp | 2 +- src/server/ingen_engine.cpp | 3 +- src/server/ingen_lv2.cpp | 4 +-- src/server/ingen_osc.cpp | 1 - 14 files changed, 120 insertions(+), 88 deletions(-) (limited to 'src') diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp index 90518bd2..3aa05eba 100644 --- a/src/server/Engine.cpp +++ b/src/server/Engine.cpp @@ -62,7 +62,7 @@ Engine::Engine(Ingen::Shared::World* a_world) , _maid(new Raul::Maid(event_queue_size())) , _message_context(new MessageContext(*this)) , _node_factory(new NodeFactory(a_world)) - , _post_processor(new PostProcessor(*this, event_queue_size())) + , _post_processor(new PostProcessor(*this)) , _quit_flag(false) { if (a_world->store()) { diff --git a/src/server/Event.hpp b/src/server/Event.hpp index af39bfa4..e915631a 100644 --- a/src/server/Event.hpp +++ b/src/server/Event.hpp @@ -19,9 +19,12 @@ #define INGEN_ENGINE_EVENT_HPP #include + #include "raul/SharedPtr.hpp" #include "raul/Deletable.hpp" #include "raul/Path.hpp" +#include "raul/AtomicPtr.hpp" + #include "types.hpp" namespace Ingen { @@ -56,6 +59,10 @@ public: inline SampleCount time() const { return _time; } + /** Get the next event in the event process list. */ + Event* next() const { return _next.get(); } + void next(Event* ev) { _next = ev; } + int error() { return _error; } protected: @@ -67,11 +74,12 @@ protected: , _executed(false) {} - Engine& _engine; - SharedPtr _request; - FrameTime _time; - int _error; - bool _executed; + Engine& _engine; + SharedPtr _request; + Raul::AtomicPtr _next; + FrameTime _time; + int _error; + bool _executed; }; } // namespace Server diff --git a/src/server/EventSource.cpp b/src/server/EventSource.cpp index 273a4693..cdecfd4a 100644 --- a/src/server/EventSource.cpp +++ b/src/server/EventSource.cpp @@ -15,19 +15,18 @@ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ -#include #include "EventSource.hpp" -#include "QueuedEvent.hpp" #include "PostProcessor.hpp" -#include "ThreadManager.hpp" #include "ProcessContext.hpp" +#include "QueuedEvent.hpp" +#include "ThreadManager.hpp" using namespace std; namespace Ingen { namespace Server { -EventSource::EventSource(size_t queue_size) +EventSource::EventSource() : _blocking_semaphore(0) { Thread::set_context(THREAD_PRE_PROCESS); @@ -45,10 +44,22 @@ void EventSource::push_queued(QueuedEvent* const ev) { assert(!ev->is_prepared()); - Raul::List::Node* node = new Raul::List::Node(ev); - _events.push_back(node); - if (_prepared_back.get() == NULL) - _prepared_back = node; + assert(!ev->next()); + + QueuedEvent* const head = _head.get(); + QueuedEvent* const tail = _tail.get(); + + if (!head) { + _head = ev; + _tail = ev; + } else { + _tail = ev; + tail->next(ev); + } + + if (!_prepared_back.get()) { + _prepared_back = ev; + } whip(); } @@ -62,39 +73,35 @@ EventSource::process(PostProcessor& dest, ProcessContext& context, bool limit) { ThreadManager::assert_thread(THREAD_PROCESS); - if (_events.empty()) + if (!_head.get()) return; /* Limit the maximum number of queued events to process per cycle. This - * makes the process callback (more) realtime-safe by preventing being - * choked by events coming in faster than they can be processed. - * FIXME: test this and figure out a good value */ + makes the process callback (more) realtime-safe by preventing being + choked by events coming in faster than they can be processed. + FIXME: test this and figure out a good value + */ const size_t MAX_QUEUED_EVENTS = context.nframes() / 32; size_t num_events_processed = 0; - Raul::List::Node* head = _events.head(); - Raul::List::Node* tail = head; - - if (!head) - return; - - QueuedEvent* ev = (QueuedEvent*)head->elem(); + QueuedEvent* ev = _head.get(); + QueuedEvent* last = ev; while (ev && ev->is_prepared() && ev->time() < context.end()) { ev->execute(context); - tail = head; - head = head->next(); + last = ev; + ev = (QueuedEvent*)ev->next(); ++num_events_processed; - if (limit && num_events_processed > MAX_QUEUED_EVENTS) + if (limit && (num_events_processed > MAX_QUEUED_EVENTS)) break; - ev = (head ? (QueuedEvent*)head->elem() : NULL); } if (num_events_processed > 0) { - Raul::List front; - _events.chop_front(front, num_events_processed, tail); - dest.append(&front); + dest.append(_head.get(), last); + _head = (QueuedEvent*)last->next(); + if (!last->next()) + _tail = NULL; } } @@ -102,19 +109,15 @@ EventSource::process(PostProcessor& dest, ProcessContext& context, bool limit) void EventSource::_whipped() { - Raul::List::Node* pb = _prepared_back.get(); - if (!pb) + QueuedEvent* ev = _prepared_back.get(); + if (!ev) return; - QueuedEvent* const ev = (QueuedEvent*)pb->elem(); - assert(ev); - assert(!ev->is_prepared()); ev->pre_process(); assert(ev->is_prepared()); - assert(_prepared_back.get() == pb); - _prepared_back = pb->next(); + _prepared_back = (QueuedEvent*)ev->next(); // If event was blocking, wait for event to being run through the // process thread before preparing the next event diff --git a/src/server/EventSource.hpp b/src/server/EventSource.hpp index 19d87ed9..5d3c1e14 100644 --- a/src/server/EventSource.hpp +++ b/src/server/EventSource.hpp @@ -18,9 +18,9 @@ #ifndef INGEN_ENGINE_EVENTSOURCE_HPP #define INGEN_ENGINE_EVENTSOURCE_HPP +#include "raul/AtomicPtr.hpp" #include "raul/Semaphore.hpp" #include "raul/Slave.hpp" -#include "raul/List.hpp" namespace Ingen { namespace Server { @@ -39,12 +39,12 @@ class ProcessContext; class EventSource : protected Raul::Slave { public: - explicit EventSource(size_t queue_size); + explicit EventSource(); virtual ~EventSource(); void process(PostProcessor& dest, ProcessContext& context, bool limit=true); - bool empty() { return _events.empty(); } + bool empty() { return !_head.get(); } /** Signal that a blocking event is finished. * @@ -61,9 +61,11 @@ protected: virtual void _whipped(); ///< Prepare 1 event private: - Raul::List _events; - Raul::AtomicPtr::Node> _prepared_back; - Raul::Semaphore _blocking_semaphore; + Raul::AtomicPtr _head; + Raul::AtomicPtr _prepared_back; + Raul::AtomicPtr _tail; + + Raul::Semaphore _blocking_semaphore; }; } // namespace Server diff --git a/src/server/HTTPEngineReceiver.cpp b/src/server/HTTPEngineReceiver.cpp index b027a6b3..4284c51a 100644 --- a/src/server/HTTPEngineReceiver.cpp +++ b/src/server/HTTPEngineReceiver.cpp @@ -51,7 +51,7 @@ using namespace Serialisation; namespace Server { HTTPEngineReceiver::HTTPEngineReceiver(Engine& engine, uint16_t port) - : QueuedEngineInterface(engine, 64) // FIXME + : QueuedEngineInterface(engine) , _server(soup_server_new(SOUP_SERVER_PORT, port, NULL)) { _receive_thread = new ReceiveThread(*this); diff --git a/src/server/OSCEngineReceiver.cpp b/src/server/OSCEngineReceiver.cpp index 635856da..338fb985 100644 --- a/src/server/OSCEngineReceiver.cpp +++ b/src/server/OSCEngineReceiver.cpp @@ -57,8 +57,8 @@ namespace Server { * See the "Client OSC Namespace Documentation" for details.

*/ -OSCEngineReceiver::OSCEngineReceiver(Engine& engine, size_t queue_size, uint16_t port) - : QueuedEngineInterface(engine, queue_size) // FIXME +OSCEngineReceiver::OSCEngineReceiver(Engine& engine, uint16_t port) + : QueuedEngineInterface(engine) , _server(NULL) { _receive_thread = new ReceiveThread(*this); diff --git a/src/server/OSCEngineReceiver.hpp b/src/server/OSCEngineReceiver.hpp index 8a76fa01..c9585f44 100644 --- a/src/server/OSCEngineReceiver.hpp +++ b/src/server/OSCEngineReceiver.hpp @@ -57,7 +57,7 @@ inline static int name##_cb(LO_HANDLER_ARGS, void* myself)\ class OSCEngineReceiver : public QueuedEngineInterface { public: - OSCEngineReceiver(Engine& engine, size_t queue_size, uint16_t port); + OSCEngineReceiver(Engine& engine, uint16_t port); ~OSCEngineReceiver(); private: diff --git a/src/server/PostProcessor.cpp b/src/server/PostProcessor.cpp index 25252a16..3364322a 100644 --- a/src/server/PostProcessor.cpp +++ b/src/server/PostProcessor.cpp @@ -15,16 +15,16 @@ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ -#include -#include +#include + #include "raul/log.hpp" -#include "raul/SRSWQueue.hpp" -#include "events/SendPortValue.hpp" -#include "Event.hpp" -#include "PostProcessor.hpp" -#include "Engine.hpp" + #include "Driver.hpp" +#include "Engine.hpp" +#include "PostProcessor.hpp" #include "ProcessContext.hpp" +#include "QueuedEvent.hpp" +#include "events/SendPortValue.hpp" using namespace std; using namespace Raul; @@ -32,10 +32,9 @@ using namespace Raul; namespace Ingen { namespace Server { -PostProcessor::PostProcessor(Engine& engine, size_t queue_size) +PostProcessor::PostProcessor(Engine& engine) : _engine(engine) , _max_time(0) - , _events(queue_size) , _event_buffer_size(sizeof(Events::SendPortValue)) // FIXME: make generic , _event_buffer((uint8_t*)malloc(_event_buffer_size)) { @@ -46,6 +45,21 @@ PostProcessor::~PostProcessor() free(_event_buffer); } +void +PostProcessor::append(QueuedEvent* first, QueuedEvent* last) +{ + assert(first); + assert(last); + QueuedEvent* const head = _head.get(); + QueuedEvent* const tail = _tail.get(); + if (!head) { + _head = first; + } else { + tail->next(first); + } + _tail = last; +} + void PostProcessor::process() { @@ -75,16 +89,21 @@ PostProcessor::process() } /* Process normal events */ - Raul::List::Node* n = _events.head(); - while (n) { - if (n->elem()->time() > end_time) + QueuedEvent* ev = _head.get(); + while (ev) { + if (ev->time() > end_time) break; - Raul::List::Node* next = n->next(); - n->elem()->post_process(); - _events.erase(_events.begin()); - delete n->elem(); - delete n; - n = next; + + QueuedEvent* const next = (QueuedEvent*)ev->next(); + ev->post_process(); + if (next) { + _head = next; + } else { + _head = NULL; + _tail = NULL; + } + delete ev; + ev = next; } } diff --git a/src/server/PostProcessor.hpp b/src/server/PostProcessor.hpp index f06a182e..9cf8ea06 100644 --- a/src/server/PostProcessor.hpp +++ b/src/server/PostProcessor.hpp @@ -18,14 +18,15 @@ #ifndef INGEN_ENGINE_POSTPROCESSOR_HPP #define INGEN_ENGINE_POSTPROCESSOR_HPP -#include -#include "raul/SRSWQueue.hpp" -#include "raul/List.hpp" +#include "raul/AtomicInt.hpp" +#include "raul/AtomicPtr.hpp" + +#include "types.hpp" namespace Ingen { namespace Server { -class Event; +class QueuedEvent; class Engine; /** Processor for Events after leaving the audio thread. @@ -42,11 +43,13 @@ class Engine; class PostProcessor { public: - PostProcessor(Engine& engine, size_t queue_size); + PostProcessor(Engine& engine); ~PostProcessor(); - /** Push a list of events on to the process queue, realtime-safe, not thread-safe. */ - inline void append(Raul::List* l) { _events.append(*l); } + /** Push a list of events on to the process queue. + realtime-safe, not thread-safe. + */ + void append(QueuedEvent* first, QueuedEvent* last); /** Post-process and delete all pending events */ void process(); @@ -55,11 +58,12 @@ public: void set_end_time(FrameTime time) { _max_time = time; } private: - Engine& _engine; - Raul::AtomicInt _max_time; - Raul::List _events; - uint32_t _event_buffer_size; - uint8_t* _event_buffer; + Engine& _engine; + Raul::AtomicPtr _head; + Raul::AtomicPtr _tail; + Raul::AtomicInt _max_time; + uint32_t _event_buffer_size; + uint8_t* _event_buffer; }; } // namespace Server diff --git a/src/server/QueuedEngineInterface.cpp b/src/server/QueuedEngineInterface.cpp index 5ef9335f..958a5110 100644 --- a/src/server/QueuedEngineInterface.cpp +++ b/src/server/QueuedEngineInterface.cpp @@ -33,8 +33,8 @@ using namespace Raul; namespace Ingen { namespace Server { -QueuedEngineInterface::QueuedEngineInterface(Engine& engine, size_t queue_size) - : EventSource(queue_size) +QueuedEngineInterface::QueuedEngineInterface(Engine& engine) + : EventSource() , _request(new Request(this, NULL, 0)) , _engine(engine) , _in_bundle(false) diff --git a/src/server/QueuedEngineInterface.hpp b/src/server/QueuedEngineInterface.hpp index 3bd77013..65ad80f7 100644 --- a/src/server/QueuedEngineInterface.hpp +++ b/src/server/QueuedEngineInterface.hpp @@ -48,7 +48,7 @@ class QueuedEngineInterface : public EventSource, public ServerInterface { public: - QueuedEngineInterface(Engine& engine, size_t queue_size); + QueuedEngineInterface(Engine& engine); virtual ~QueuedEngineInterface(); Raul::URI uri() const { return "http://drobilla.net/ns/ingen#internal"; } diff --git a/src/server/ingen_engine.cpp b/src/server/ingen_engine.cpp index 568267cd..a0cda60c 100644 --- a/src/server/ingen_engine.cpp +++ b/src/server/ingen_engine.cpp @@ -29,8 +29,7 @@ struct IngenEngineModule : public Ingen::Shared::Module { SharedPtr engine(new Server::Engine(world)); world->set_local_engine(engine); SharedPtr interface( - new Server::QueuedEngineInterface(*engine.get(), - engine->event_queue_size())); + new Server::QueuedEngineInterface(*engine.get())); world->set_engine(interface); engine->add_event_source(interface); assert(world->local_engine() == engine); diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp index bd64cf34..aa3c1f56 100644 --- a/src/server/ingen_lv2.cpp +++ b/src/server/ingen_lv2.cpp @@ -268,9 +268,7 @@ ingen_instantiate(const LV2_Descriptor* descriptor, plugin->world->set_local_engine(engine); SharedPtr interface( - new Server::QueuedEngineInterface( - *engine.get(), - engine->event_queue_size())); + new Server::QueuedEngineInterface(*engine.get())); plugin->world->set_engine(interface); engine->add_event_source(interface); diff --git a/src/server/ingen_osc.cpp b/src/server/ingen_osc.cpp index bf0f09f8..f10481e5 100644 --- a/src/server/ingen_osc.cpp +++ b/src/server/ingen_osc.cpp @@ -29,7 +29,6 @@ struct IngenOSCModule : public Ingen::Shared::Module { SharedPtr interface( new Server::OSCEngineReceiver( *engine, - engine->event_queue_size(), world->conf()->option("engine-port").get_int32())); engine->add_event_source(interface); } -- cgit v1.2.1