From 1745a63b874c296253a27ca7ad95d3a7b17822f7 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Sun, 22 Nov 2009 05:13:47 +0000 Subject: Execute cross-context events in correct increasing time order. Propagate value changes / message sends breadth first instead of deptch first. git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@2282 a436a847-0d15-0410-975c-d299462d15a1 --- src/engine/ConnectionImpl.cpp | 6 +++-- src/engine/EventBuffer.hpp | 4 +++ src/engine/JackAudioDriver.cpp | 2 +- src/engine/LV2EventBuffer.cpp | 13 ++++++++++ src/engine/LV2EventBuffer.hpp | 1 + src/engine/MessageContext.cpp | 50 +++++++++++++++++++++++++++----------- src/engine/MessageContext.hpp | 50 ++++++++++++++++++++++++++++++-------- src/engine/ProcessContext.hpp | 4 +-- src/engine/events/SetPortValue.cpp | 4 ++- 9 files changed, 104 insertions(+), 30 deletions(-) diff --git a/src/engine/ConnectionImpl.cpp b/src/engine/ConnectionImpl.cpp index 27786e00..1973397d 100644 --- a/src/engine/ConnectionImpl.cpp +++ b/src/engine/ConnectionImpl.cpp @@ -146,7 +146,8 @@ ConnectionImpl::queue(Context& context) } while (src_buf->is_valid()) { - LV2_Object* obj = src_buf->get_object(); + LV2_Event* ev = src_buf->get_event(); + LV2_Object* obj = LV2_OBJECT_FROM_EVENT(ev); /*cout << _src_port->path() << " -> " << _dst_port->path() << " QUEUE OBJECT TYPE " << obj->type << ":"; for (size_t i = 0; i < obj->size; ++i) @@ -156,7 +157,8 @@ ConnectionImpl::queue(Context& context) _queue->write(sizeof(LV2_Object) + obj->size, obj); src_buf->increment(); - context.engine().message_context()->run(_dst_port->parent_node()); + context.engine().message_context()->run(_dst_port->parent_node(), + context.start() + ev->frames); } } diff --git a/src/engine/EventBuffer.hpp b/src/engine/EventBuffer.hpp index d75c91ba..e82585d6 100644 --- a/src/engine/EventBuffer.hpp +++ b/src/engine/EventBuffer.hpp @@ -64,6 +64,10 @@ public: return _buf->get_object(); } + LV2_Event* get_event() const { + return _buf->get_event(); + } + inline bool append(uint32_t frames, uint32_t subframes, uint16_t type, diff --git a/src/engine/JackAudioDriver.cpp b/src/engine/JackAudioDriver.cpp index edb1ba9a..bb330435 100644 --- a/src/engine/JackAudioDriver.cpp +++ b/src/engine/JackAudioDriver.cpp @@ -374,7 +374,7 @@ JackAudioDriver::_process_cb(jack_nframes_t nframes) // Signal message context to run if necessary if (_engine.message_context()->has_requests()) - _engine.message_context()->signal(); + _engine.message_context()->signal(_process_context); if (_engine.midi_driver()) _engine.midi_driver()->post_process(_process_context); diff --git a/src/engine/LV2EventBuffer.cpp b/src/engine/LV2EventBuffer.cpp index 20df73c4..cb157368 100644 --- a/src/engine/LV2EventBuffer.cpp +++ b/src/engine/LV2EventBuffer.cpp @@ -134,6 +134,19 @@ LV2EventBuffer::get_object() const } +/** Get the event currently pointed to, or NULL if invalid. + */ +LV2_Event* +LV2EventBuffer::get_event() const +{ + if (lv2_event_is_valid(&_iter)) { + uint8_t* data; + return lv2_event_get(&_iter, &data); + } + return NULL; +} + + /** Append an event to the buffer. * * \a timestamp must be >= the latest event in the buffer. diff --git a/src/engine/LV2EventBuffer.hpp b/src/engine/LV2EventBuffer.hpp index 61cdba4c..26548e71 100644 --- a/src/engine/LV2EventBuffer.hpp +++ b/src/engine/LV2EventBuffer.hpp @@ -60,6 +60,7 @@ public: uint8_t** data) const; LV2_Object* get_object() const; + LV2_Event* get_event() const; bool append(uint32_t frames, uint32_t subframes, diff --git a/src/engine/MessageContext.cpp b/src/engine/MessageContext.cpp index 5a180a10..6e08eb8d 100644 --- a/src/engine/MessageContext.cpp +++ b/src/engine/MessageContext.cpp @@ -32,16 +32,17 @@ namespace Ingen { void -MessageContext::run(NodeImpl* node) +MessageContext::run(NodeImpl* node, FrameTime time) { if (ThreadManager::current_thread_id() == THREAD_PRE_PROCESS) { assert(node); Glib::Mutex::Lock lock(_mutex); - _request = node; + _non_rt_request = Request(time, node); _sem.post(); _cond.wait(_mutex); } else if (ThreadManager::current_thread_id() == THREAD_PROCESS) { - _requests.write(sizeof(NodeImpl*), &node); + Request r(time, node); + _requests.write(sizeof(Request), &r); // signal() will be called at the end of this process cycle } else if (ThreadManager::current_thread_id() == THREAD_MESSAGE) { cout << "Message context recursion at " << node->path() << endl; @@ -54,33 +55,55 @@ MessageContext::run(NodeImpl* node) void MessageContext::_run() { - NodeImpl* node = NULL; + Request req; while (true) { _sem.wait(); - // Run a node requested by the pre-process thread + // Enqueue a request from the pre-process thread { Glib::Mutex::Lock lock(_mutex); - node = _request.get(); - if (node) { + const Request req = _non_rt_request; + if (req.node) { + _queue.insert(req); + _end_time = std::max(_end_time, req.time); _cond.broadcast(); // Notify caller we got the message - run_node(node); } } - // Run nodes requested by the audio thread + // Enqueue (and thereby sort) requests from audio thread while (has_requests()) { - _requests.full_read(sizeof(NodeImpl*), &node); - run_node(node); + _requests.full_read(sizeof(Request), &req); + if (req.node) { + _queue.insert(req); + } else { + _end_time = req.time; + break; + } + } + + // Run events in time increasing order + // Note that executing nodes may insert further events into the queue + while (!_queue.empty()) { + const Request req = *_queue.begin(); + + // Break if all events during this cycle have been consumed + // (the queue may contain generated events with later times) + if (req.time > _end_time) { + break; + } + + _queue.erase(_queue.begin()); + execute(req); } } } void -MessageContext::run_node(NodeImpl* node) +MessageContext::execute(const Request& req) { + NodeImpl* node = req.node; node->message_run(*this); void* valid_ports = node->valid_ports(); @@ -94,8 +117,7 @@ MessageContext::run_node(NodeImpl* node) for (PatchImpl::Connections::iterator c = wires.begin(); c != wires.end(); ++c) { ConnectionImpl* ci = dynamic_cast(c->get()); if (ci->src_port() == p) { - ci->dst_port()->pre_process(*_engine.message_context()); - run_node(ci->dst_port()->parent_node()); + _queue.insert(Request(req.time, ci->dst_port()->parent_node())); } } } diff --git a/src/engine/MessageContext.hpp b/src/engine/MessageContext.hpp index c871ad1c..4d8c0695 100644 --- a/src/engine/MessageContext.hpp +++ b/src/engine/MessageContext.hpp @@ -18,12 +18,14 @@ #ifndef MESSAGECONTEXT_H #define MESSAGECONTEXT_H +#include #include #include "raul/Thread.hpp" #include "raul/Semaphore.hpp" #include "raul/AtomicPtr.hpp" #include "object.lv2/object.h" #include "Context.hpp" +#include "ProcessContext.hpp" #include "ThreadManager.hpp" #include "tuning.hpp" @@ -48,19 +50,37 @@ public: , Raul::Thread("message-context") , _sem(0) , _requests(message_context_queue_size) + , _end_time(0) { Thread::set_context(THREAD_MESSAGE); } /** Request a run starting at node. - * * Safe to call from either process thread or pre-process thread. */ - void run(NodeImpl* node); + void run(NodeImpl* node, FrameTime time); - inline void signal() { _sem.post(); } +protected: + struct Request { + Request(FrameTime t=0, NodeImpl* n=0) : time(t), node(n) {} + FrameTime time; + NodeImpl* node; + }; + +public: + /** Signal the end of a cycle that has produced messages. + * AUDIO THREAD ONLY. + */ + inline void signal(ProcessContext& context) { + assert(ThreadManager::current_thread_id() == THREAD_PROCESS); + const Request cycle_end_request(context.end(), NULL); + _requests.write(sizeof(Request), &cycle_end_request); + _sem.post(); + } + + /** Return true iff requests are pending. Safe from any thread. */ inline bool has_requests() const { - return _requests.read_space() >= sizeof(NodeImpl*) || _request.get(); + return _requests.read_space() >= sizeof(Request); } protected: @@ -68,13 +88,23 @@ protected: void _run(); /** Actually execute and propagate from node */ - void run_node(NodeImpl* node); + void execute(const Request& req); + + Raul::Semaphore _sem; + Raul::RingBuffer _requests; + Glib::Mutex _mutex; + Glib::Cond _cond; + Request _non_rt_request; + + struct RequestEarlier { + bool operator()(const Request& r1, const Request& r2) { + return r1.time < r2.time; + } + }; - Raul::Semaphore _sem; - Raul::RingBuffer _requests; - Glib::Mutex _mutex; - Glib::Cond _cond; - Raul::AtomicPtr _request; + typedef std::set Queue; + Queue _queue; + FrameTime _end_time; }; diff --git a/src/engine/ProcessContext.hpp b/src/engine/ProcessContext.hpp index 743b72c4..4b3888d0 100644 --- a/src/engine/ProcessContext.hpp +++ b/src/engine/ProcessContext.hpp @@ -52,8 +52,8 @@ public: _end = end; } - inline SampleCount nframes()const { return _nframes; } - inline FrameTime end() const { return _end; } + inline SampleCount nframes() const { return _nframes; } + inline FrameTime end() const { return _end; } private: SampleCount _nframes; ///< Length of this cycle in frames diff --git a/src/engine/events/SetPortValue.cpp b/src/engine/events/SetPortValue.cpp index 2a33b798..3d235cd1 100644 --- a/src/engine/events/SetPortValue.cpp +++ b/src/engine/events/SetPortValue.cpp @@ -22,6 +22,7 @@ #include "shared/LV2Object.hpp" #include "module/World.hpp" #include "AudioBuffer.hpp" +#include "AudioDriver.hpp" #include "ClientBroadcaster.hpp" #include "Engine.hpp" #include "EngineStore.hpp" @@ -118,7 +119,8 @@ SetPortValue::pre_process() if (_port && _port->context() == Context::MESSAGE) { apply(*_engine.message_context()); _port->parent_node()->set_port_valid(_port->index()); - _engine.message_context()->run(_port->parent_node()); + _engine.message_context()->run(_port->parent_node(), + _engine.audio_driver()->frame_time() + _engine.audio_driver()->buffer_size()); } QueuedEvent::pre_process(); -- cgit v1.2.1