summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Robillard <d@drobilla.net>2012-05-14 01:09:08 +0000
committerDavid Robillard <d@drobilla.net>2012-05-14 01:09:08 +0000
commit841825844f9b2f4eae069d0aad064e4046e15471 (patch)
tree8a31a4456bab8ca06f757f0524bf5adacbe2aaa1
parentffa63b0c1e769fae0af5e2bbbd822d1e8b206535 (diff)
downloadingen-841825844f9b2f4eae069d0aad064e4046e15471.tar.gz
ingen-841825844f9b2f4eae069d0aad064e4046e15471.tar.bz2
ingen-841825844f9b2f4eae069d0aad064e4046e15471.zip
Real-time safe LV2 message handling.
git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@4402 a436a847-0d15-0410-975c-d299462d15a1
-rw-r--r--ingen/EngineBase.hpp8
-rw-r--r--src/server/Engine.cpp10
-rw-r--r--src/server/Engine.hpp4
-rw-r--r--src/server/PreProcessor.cpp6
-rw-r--r--src/server/PreProcessor.hpp8
-rw-r--r--src/server/ingen_lv2.cpp120
6 files changed, 120 insertions, 36 deletions
diff --git a/ingen/EngineBase.hpp b/ingen/EngineBase.hpp
index 6bcb7170..c9acb512 100644
--- a/ingen/EngineBase.hpp
+++ b/ingen/EngineBase.hpp
@@ -48,8 +48,14 @@ public:
/**
Process audio for @p sample_count frames.
+
+ If the return value is non-zero, events have been processed and are
+ awaiting to be finalised (including responding and announcing any changes
+ to clients) via a call to main_iteration().
+
+ @return The number of events processed.
*/
- virtual void run(uint32_t sample_count) = 0;
+ virtual unsigned run(uint32_t sample_count) = 0;
/**
Indicate that a quit is desired.
diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp
index f1c714d3..a146743d 100644
--- a/src/server/Engine.cpp
+++ b/src/server/Engine.cpp
@@ -231,7 +231,7 @@ Engine::deactivate()
ThreadManager::single_threaded = true;
}
-void
+unsigned
Engine::run(uint32_t sample_count)
{
// Apply control bindings to input
@@ -242,7 +242,7 @@ Engine::run(uint32_t sample_count)
// Process events that came in during the last cycle
// (Aiming for jitter-free 1 block event latency, ideally)
- process_events();
+ unsigned n_processed_events = process_events();
// Run root patch
if (_root_patch) {
@@ -257,6 +257,8 @@ Engine::run(uint32_t sample_count)
if (_message_context.has_requests()) {
_message_context.signal(_process_context);
}
+
+ return n_processed_events;
}
bool
@@ -272,10 +274,10 @@ Engine::enqueue_event(Event* ev)
_pre_processor->event(ev);
}
-void
+unsigned
Engine::process_events()
{
- _pre_processor->process(_process_context, *_post_processor);
+ return _pre_processor->process(_process_context, *_post_processor);
}
void
diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp
index bd211d56..cf16a4b1 100644
--- a/src/server/Engine.hpp
+++ b/src/server/Engine.hpp
@@ -67,7 +67,7 @@ public:
// EngineBase methods
virtual bool activate();
virtual void deactivate();
- virtual void run(uint32_t sample_count);
+ virtual unsigned run(uint32_t sample_count);
virtual void quit();
virtual bool main_iteration();
virtual void register_client(const Raul::URI& uri,
@@ -83,7 +83,7 @@ public:
void enqueue_event(Event* ev);
/** Process events (process thread only). */
- void process_events();
+ unsigned process_events();
bool is_process_context(const Context& context) const {
return &context == &_process_context;
diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp
index c1eb1dd3..435ad77d 100644
--- a/src/server/PreProcessor.cpp
+++ b/src/server/PreProcessor.cpp
@@ -64,11 +64,11 @@ PreProcessor::event(Event* const ev)
whip();
}
-bool
+unsigned
PreProcessor::process(ProcessContext& context, PostProcessor& dest, bool limit)
{
if (!_head.get())
- return true;
+ return 0;
/* Limit the maximum number of queued events to process per cycle. This
makes the process callback (more) realtime-safe by preventing being
@@ -101,7 +101,7 @@ PreProcessor::process(ProcessContext& context, PostProcessor& dest, bool limit)
_tail = NULL;
}
- return true;
+ return num_events_processed;
}
/** Pre-process a single event */
diff --git a/src/server/PreProcessor.hpp b/src/server/PreProcessor.hpp
index c5b781f4..bcfcb432 100644
--- a/src/server/PreProcessor.hpp
+++ b/src/server/PreProcessor.hpp
@@ -44,11 +44,11 @@ public:
void event(Event* ev);
/** Process events for a cycle.
- * @return False iff this source is finished and should be removed.
+ * @return The number of events processed.
*/
- bool process(ProcessContext& context,
- PostProcessor& dest,
- bool limit = true);
+ unsigned process(ProcessContext& context,
+ PostProcessor& dest,
+ bool limit = true);
protected:
virtual void _whipped(); ///< Prepare 1 event
diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp
index 1f459c15..56e9120e 100644
--- a/src/server/ingen_lv2.cpp
+++ b/src/server/ingen_lv2.cpp
@@ -79,7 +79,10 @@ namespace Server {
class LV2Driver;
-void handle_message(LV2Driver* driver, const LV2_Atom* msg);
+void enqueue_message(
+ ProcessContext& context, LV2Driver* driver, const LV2_Atom* msg);
+
+void signal_main(ProcessContext& context, LV2Driver* driver);
class LV2Port : public EnginePort
{
@@ -98,10 +101,16 @@ public:
AudioBuffer* patch_buf = (AudioBuffer*)_patch_port->buffer(0).get();
patch_buf->copy((Sample*)_buffer, 0, context.nframes() - 1);
} else {
- LV2_Atom_Sequence* seq = (LV2_Atom_Sequence*)_buffer;
+ LV2_Atom_Sequence* seq = (LV2_Atom_Sequence*)_buffer;
+ bool enqueued = false;
LV2_ATOM_SEQUENCE_FOREACH(seq, ev) {
- // FIXME: Not RT safe, need to send these through a ring
- handle_message(_driver, &ev->body);
+ // TODO: Only enqueue appropriate atoms
+ enqueue_message(context, _driver, &ev->body);
+ enqueued = true;
+ }
+
+ if (enqueued) {
+ signal_main(context, _driver);
}
}
}
@@ -130,6 +139,7 @@ private:
public:
LV2Driver(Engine& engine, SampleCount buffer_size, SampleCount sample_rate)
: _engine(engine)
+ , _main_sem(0)
, _reader(engine.world()->uri_map(),
engine.world()->uris(),
engine.world()->forge(),
@@ -137,6 +147,7 @@ public:
, _writer(engine.world()->uri_map(),
engine.world()->uris(),
*this)
+ , _from_ui(buffer_size * sizeof(float)) // FIXME: size
, _to_ui(buffer_size * sizeof(float)) // FIXME: size
, _root_patch(NULL)
, _buffer_size(buffer_size)
@@ -152,10 +163,9 @@ public:
for (Ports::iterator i = _ports.begin(); i != _ports.end(); ++i)
(*i)->pre_process(_engine.process_context());
- _engine.process_events();
-
- if (_root_patch)
- _root_patch->process(_engine.process_context());
+ if (_engine.run(nframes) > 0) {
+ _main_sem.post();
+ }
flush_to_ui();
@@ -165,6 +175,11 @@ public:
_frame_time += nframes;
}
+ virtual void deactivate() {
+ _engine.quit();
+ _main_sem.post();
+ }
+
virtual void set_root_patch(PatchImpl* patch) { _root_patch = patch; }
virtual PatchImpl* root_patch() { return _root_patch; }
@@ -203,6 +218,20 @@ public:
return NULL;
}
+ /** Called in run thread for events received at control input port. */
+ void enqueue_message(const LV2_Atom* atom) {
+ if (_from_ui.write(lv2_atom_total_size(atom), atom) == 0) {
+#ifndef NDEBUG
+ Raul::error << "Control input buffer overflow" << std::endl;
+#endif
+ }
+ }
+
+ Raul::Semaphore& main_sem() { return _main_sem; }
+
+ /** AtomSink::write implementation called by the PostProcessor in the main
+ * thread to write responses to the UI.
+ */
void write(const LV2_Atom* atom) {
// Called from post-processor in main thread
while (_to_ui.write(lv2_atom_total_size(atom), atom) == 0) {
@@ -213,6 +242,30 @@ public:
}
}
+ void consume_from_ui() {
+ const uint32_t read_space = _from_ui.read_space();
+ void* buf = NULL;
+ for (uint32_t read = 0; read < read_space;) {
+ LV2_Atom atom;
+ if (!_from_ui.read(sizeof(LV2_Atom), &atom)) {
+ Raul::error << "Error reading head from from-UI ring" << std::endl;
+ break;
+ }
+
+ buf = realloc(buf, sizeof(LV2_Atom) + atom.size);
+ memcpy(buf, &atom, sizeof(LV2_Atom));
+
+ if (!_from_ui.read(atom.size, (char*)buf + sizeof(LV2_Atom))) {
+ Raul::error << "Error reading body from from-UI ring" << std::endl;
+ break;
+ }
+
+ _reader.write((LV2_Atom*)buf);
+ read += sizeof(LV2_Atom) + atom.size;
+ }
+ free(buf);
+ }
+
void flush_to_ui() {
assert(_ports.size() >= 2);
@@ -264,17 +317,18 @@ public:
virtual SampleCount sample_rate() const { return _sample_rate; }
virtual SampleCount frame_time() const { return _frame_time;}
- virtual bool is_realtime() const { return true; }
- //virtual ProcessContext& context() { return _context; }
- Shared::AtomReader& reader() { return _reader; }
- Shared::AtomWriter& writer() { return _writer; }
+ virtual bool is_realtime() const { return true; }
+ Shared::AtomReader& reader() { return _reader; }
+ Shared::AtomWriter& writer() { return _writer; }
Ports& ports() { return _ports; }
private:
Engine& _engine;
+ Raul::Semaphore _main_sem;
Shared::AtomReader _reader;
Shared::AtomWriter _writer;
+ Raul::RingBuffer _from_ui;
Raul::RingBuffer _to_ui;
PatchImpl* _root_patch;
SampleCount _buffer_size;
@@ -286,9 +340,15 @@ private:
};
void
-handle_message(LV2Driver* driver, const LV2_Atom* msg)
+enqueue_message(ProcessContext& context, LV2Driver* driver, const LV2_Atom* msg)
+{
+ driver->enqueue_message(msg);
+}
+
+void
+signal_main(ProcessContext& context, LV2Driver* driver)
{
- driver->reader().write(msg);
+ driver->main_sem().post();
}
} // namespace Server
@@ -302,16 +362,30 @@ using namespace Ingen::Server;
class MainThread : public Raul::Thread
{
public:
- explicit MainThread(SharedPtr<Engine> engine) : _engine(engine) {}
+ explicit MainThread(SharedPtr<Engine> engine,
+ LV2Driver* driver)
+ : _engine(engine)
+ , _driver(driver)
+ {}
private:
virtual void _run() {
- while (_engine->main_iteration()) {
- Glib::usleep(125000); // 1/8 second
+ while (true) {
+ // Wait until there is work to be done
+ _driver->main_sem().wait();
+
+ // Convert pending messages to events and push to pre processor
+ _driver->consume_from_ui();
+
+ // Run post processor and maid to finalise events from last time
+ if (!_engine->main_iteration()) {
+ return;
+ }
}
}
SharedPtr<Engine> _engine;
+ LV2Driver* _driver;
};
struct IngenPlugin {
@@ -411,8 +485,6 @@ ingen_instantiate(const LV2_Descriptor* descriptor,
SharedPtr<Server::Engine> engine(new Server::Engine(plugin->world));
plugin->world->set_engine(engine);
- plugin->main = new MainThread(engine);
- plugin->main->set_name("Main");
SharedPtr<EventWriter> interface =
SharedPtr<EventWriter>(engine->interface(), NullDeleter<EventWriter>);
@@ -426,6 +498,9 @@ ingen_instantiate(const LV2_Descriptor* descriptor,
LV2Driver* driver = new LV2Driver(*engine.get(), 4096, rate);
engine->set_driver(SharedPtr<Ingen::Server::Driver>(driver));
+ plugin->main = new MainThread(engine, driver);
+ plugin->main->set_name("Main");
+
SharedPtr<Interface> client(&driver->writer(), NullDeleter<Interface>);
interface->set_respondee(client);
engine->register_client("http://drobilla.net/ns/ingen#internal", client);
@@ -448,8 +523,6 @@ ingen_instantiate(const LV2_Descriptor* descriptor,
engine->post_processor()->process();
}
- engine->deactivate();
-
return (LV2_Handle)plugin;
}
@@ -484,9 +557,12 @@ ingen_run(LV2_Handle instance, uint32_t sample_count)
{
IngenPlugin* me = (IngenPlugin*)instance;
Server::Engine* engine = (Server::Engine*)me->world->engine().get();
+ LV2Driver* driver = (LV2Driver*)engine->driver();
+
// FIXME: don't do this every call
Raul::Thread::get().set_context(Ingen::Server::THREAD_PROCESS);
- ((LV2Driver*)engine->driver())->run(sample_count);
+
+ driver->run(sample_count);
}
static void