From 841825844f9b2f4eae069d0aad064e4046e15471 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Mon, 14 May 2012 01:09:08 +0000 Subject: Real-time safe LV2 message handling. git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@4402 a436a847-0d15-0410-975c-d299462d15a1 --- ingen/EngineBase.hpp | 8 ++- src/server/Engine.cpp | 10 ++-- src/server/Engine.hpp | 4 +- src/server/PreProcessor.cpp | 6 +-- src/server/PreProcessor.hpp | 8 +-- src/server/ingen_lv2.cpp | 120 ++++++++++++++++++++++++++++++++++++-------- 6 files changed, 120 insertions(+), 36 deletions(-) diff --git a/ingen/EngineBase.hpp b/ingen/EngineBase.hpp index 6bcb7170..c9acb512 100644 --- a/ingen/EngineBase.hpp +++ b/ingen/EngineBase.hpp @@ -48,8 +48,14 @@ public: /** Process audio for @p sample_count frames. + + If the return value is non-zero, events have been processed and are + awaiting to be finalised (including responding and announcing any changes + to clients) via a call to main_iteration(). + + @return The number of events processed. */ - virtual void run(uint32_t sample_count) = 0; + virtual unsigned run(uint32_t sample_count) = 0; /** Indicate that a quit is desired. diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp index f1c714d3..a146743d 100644 --- a/src/server/Engine.cpp +++ b/src/server/Engine.cpp @@ -231,7 +231,7 @@ Engine::deactivate() ThreadManager::single_threaded = true; } -void +unsigned Engine::run(uint32_t sample_count) { // Apply control bindings to input @@ -242,7 +242,7 @@ Engine::run(uint32_t sample_count) // Process events that came in during the last cycle // (Aiming for jitter-free 1 block event latency, ideally) - process_events(); + unsigned n_processed_events = process_events(); // Run root patch if (_root_patch) { @@ -257,6 +257,8 @@ Engine::run(uint32_t sample_count) if (_message_context.has_requests()) { _message_context.signal(_process_context); } + + return n_processed_events; } bool @@ -272,10 +274,10 @@ Engine::enqueue_event(Event* ev) _pre_processor->event(ev); } -void +unsigned Engine::process_events() { - _pre_processor->process(_process_context, *_post_processor); + return _pre_processor->process(_process_context, *_post_processor); } void diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp index bd211d56..cf16a4b1 100644 --- a/src/server/Engine.hpp +++ b/src/server/Engine.hpp @@ -67,7 +67,7 @@ public: // EngineBase methods virtual bool activate(); virtual void deactivate(); - virtual void run(uint32_t sample_count); + virtual unsigned run(uint32_t sample_count); virtual void quit(); virtual bool main_iteration(); virtual void register_client(const Raul::URI& uri, @@ -83,7 +83,7 @@ public: void enqueue_event(Event* ev); /** Process events (process thread only). */ - void process_events(); + unsigned process_events(); bool is_process_context(const Context& context) const { return &context == &_process_context; diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp index c1eb1dd3..435ad77d 100644 --- a/src/server/PreProcessor.cpp +++ b/src/server/PreProcessor.cpp @@ -64,11 +64,11 @@ PreProcessor::event(Event* const ev) whip(); } -bool +unsigned PreProcessor::process(ProcessContext& context, PostProcessor& dest, bool limit) { if (!_head.get()) - return true; + return 0; /* Limit the maximum number of queued events to process per cycle. This makes the process callback (more) realtime-safe by preventing being @@ -101,7 +101,7 @@ PreProcessor::process(ProcessContext& context, PostProcessor& dest, bool limit) _tail = NULL; } - return true; + return num_events_processed; } /** Pre-process a single event */ diff --git a/src/server/PreProcessor.hpp b/src/server/PreProcessor.hpp index c5b781f4..bcfcb432 100644 --- a/src/server/PreProcessor.hpp +++ b/src/server/PreProcessor.hpp @@ -44,11 +44,11 @@ public: void event(Event* ev); /** Process events for a cycle. - * @return False iff this source is finished and should be removed. + * @return The number of events processed. */ - bool process(ProcessContext& context, - PostProcessor& dest, - bool limit = true); + unsigned process(ProcessContext& context, + PostProcessor& dest, + bool limit = true); protected: virtual void _whipped(); ///< Prepare 1 event diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp index 1f459c15..56e9120e 100644 --- a/src/server/ingen_lv2.cpp +++ b/src/server/ingen_lv2.cpp @@ -79,7 +79,10 @@ namespace Server { class LV2Driver; -void handle_message(LV2Driver* driver, const LV2_Atom* msg); +void enqueue_message( + ProcessContext& context, LV2Driver* driver, const LV2_Atom* msg); + +void signal_main(ProcessContext& context, LV2Driver* driver); class LV2Port : public EnginePort { @@ -98,10 +101,16 @@ public: AudioBuffer* patch_buf = (AudioBuffer*)_patch_port->buffer(0).get(); patch_buf->copy((Sample*)_buffer, 0, context.nframes() - 1); } else { - LV2_Atom_Sequence* seq = (LV2_Atom_Sequence*)_buffer; + LV2_Atom_Sequence* seq = (LV2_Atom_Sequence*)_buffer; + bool enqueued = false; LV2_ATOM_SEQUENCE_FOREACH(seq, ev) { - // FIXME: Not RT safe, need to send these through a ring - handle_message(_driver, &ev->body); + // TODO: Only enqueue appropriate atoms + enqueue_message(context, _driver, &ev->body); + enqueued = true; + } + + if (enqueued) { + signal_main(context, _driver); } } } @@ -130,6 +139,7 @@ private: public: LV2Driver(Engine& engine, SampleCount buffer_size, SampleCount sample_rate) : _engine(engine) + , _main_sem(0) , _reader(engine.world()->uri_map(), engine.world()->uris(), engine.world()->forge(), @@ -137,6 +147,7 @@ public: , _writer(engine.world()->uri_map(), engine.world()->uris(), *this) + , _from_ui(buffer_size * sizeof(float)) // FIXME: size , _to_ui(buffer_size * sizeof(float)) // FIXME: size , _root_patch(NULL) , _buffer_size(buffer_size) @@ -152,10 +163,9 @@ public: for (Ports::iterator i = _ports.begin(); i != _ports.end(); ++i) (*i)->pre_process(_engine.process_context()); - _engine.process_events(); - - if (_root_patch) - _root_patch->process(_engine.process_context()); + if (_engine.run(nframes) > 0) { + _main_sem.post(); + } flush_to_ui(); @@ -165,6 +175,11 @@ public: _frame_time += nframes; } + virtual void deactivate() { + _engine.quit(); + _main_sem.post(); + } + virtual void set_root_patch(PatchImpl* patch) { _root_patch = patch; } virtual PatchImpl* root_patch() { return _root_patch; } @@ -203,6 +218,20 @@ public: return NULL; } + /** Called in run thread for events received at control input port. */ + void enqueue_message(const LV2_Atom* atom) { + if (_from_ui.write(lv2_atom_total_size(atom), atom) == 0) { +#ifndef NDEBUG + Raul::error << "Control input buffer overflow" << std::endl; +#endif + } + } + + Raul::Semaphore& main_sem() { return _main_sem; } + + /** AtomSink::write implementation called by the PostProcessor in the main + * thread to write responses to the UI. + */ void write(const LV2_Atom* atom) { // Called from post-processor in main thread while (_to_ui.write(lv2_atom_total_size(atom), atom) == 0) { @@ -213,6 +242,30 @@ public: } } + void consume_from_ui() { + const uint32_t read_space = _from_ui.read_space(); + void* buf = NULL; + for (uint32_t read = 0; read < read_space;) { + LV2_Atom atom; + if (!_from_ui.read(sizeof(LV2_Atom), &atom)) { + Raul::error << "Error reading head from from-UI ring" << std::endl; + break; + } + + buf = realloc(buf, sizeof(LV2_Atom) + atom.size); + memcpy(buf, &atom, sizeof(LV2_Atom)); + + if (!_from_ui.read(atom.size, (char*)buf + sizeof(LV2_Atom))) { + Raul::error << "Error reading body from from-UI ring" << std::endl; + break; + } + + _reader.write((LV2_Atom*)buf); + read += sizeof(LV2_Atom) + atom.size; + } + free(buf); + } + void flush_to_ui() { assert(_ports.size() >= 2); @@ -264,17 +317,18 @@ public: virtual SampleCount sample_rate() const { return _sample_rate; } virtual SampleCount frame_time() const { return _frame_time;} - virtual bool is_realtime() const { return true; } - //virtual ProcessContext& context() { return _context; } - Shared::AtomReader& reader() { return _reader; } - Shared::AtomWriter& writer() { return _writer; } + virtual bool is_realtime() const { return true; } + Shared::AtomReader& reader() { return _reader; } + Shared::AtomWriter& writer() { return _writer; } Ports& ports() { return _ports; } private: Engine& _engine; + Raul::Semaphore _main_sem; Shared::AtomReader _reader; Shared::AtomWriter _writer; + Raul::RingBuffer _from_ui; Raul::RingBuffer _to_ui; PatchImpl* _root_patch; SampleCount _buffer_size; @@ -286,9 +340,15 @@ private: }; void -handle_message(LV2Driver* driver, const LV2_Atom* msg) +enqueue_message(ProcessContext& context, LV2Driver* driver, const LV2_Atom* msg) +{ + driver->enqueue_message(msg); +} + +void +signal_main(ProcessContext& context, LV2Driver* driver) { - driver->reader().write(msg); + driver->main_sem().post(); } } // namespace Server @@ -302,16 +362,30 @@ using namespace Ingen::Server; class MainThread : public Raul::Thread { public: - explicit MainThread(SharedPtr engine) : _engine(engine) {} + explicit MainThread(SharedPtr engine, + LV2Driver* driver) + : _engine(engine) + , _driver(driver) + {} private: virtual void _run() { - while (_engine->main_iteration()) { - Glib::usleep(125000); // 1/8 second + while (true) { + // Wait until there is work to be done + _driver->main_sem().wait(); + + // Convert pending messages to events and push to pre processor + _driver->consume_from_ui(); + + // Run post processor and maid to finalise events from last time + if (!_engine->main_iteration()) { + return; + } } } SharedPtr _engine; + LV2Driver* _driver; }; struct IngenPlugin { @@ -411,8 +485,6 @@ ingen_instantiate(const LV2_Descriptor* descriptor, SharedPtr engine(new Server::Engine(plugin->world)); plugin->world->set_engine(engine); - plugin->main = new MainThread(engine); - plugin->main->set_name("Main"); SharedPtr interface = SharedPtr(engine->interface(), NullDeleter); @@ -426,6 +498,9 @@ ingen_instantiate(const LV2_Descriptor* descriptor, LV2Driver* driver = new LV2Driver(*engine.get(), 4096, rate); engine->set_driver(SharedPtr(driver)); + plugin->main = new MainThread(engine, driver); + plugin->main->set_name("Main"); + SharedPtr client(&driver->writer(), NullDeleter); interface->set_respondee(client); engine->register_client("http://drobilla.net/ns/ingen#internal", client); @@ -448,8 +523,6 @@ ingen_instantiate(const LV2_Descriptor* descriptor, engine->post_processor()->process(); } - engine->deactivate(); - return (LV2_Handle)plugin; } @@ -484,9 +557,12 @@ ingen_run(LV2_Handle instance, uint32_t sample_count) { IngenPlugin* me = (IngenPlugin*)instance; Server::Engine* engine = (Server::Engine*)me->world->engine().get(); + LV2Driver* driver = (LV2Driver*)engine->driver(); + // FIXME: don't do this every call Raul::Thread::get().set_context(Ingen::Server::THREAD_PROCESS); - ((LV2Driver*)engine->driver())->run(sample_count); + + driver->run(sample_count); } static void -- cgit v1.2.1