summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Robillard <d@drobilla.net>2009-11-22 05:13:47 +0000
committerDavid Robillard <d@drobilla.net>2009-11-22 05:13:47 +0000
commit1745a63b874c296253a27ca7ad95d3a7b17822f7 (patch)
tree63bb78f0c34372f70634ea6122d83ed07019dc88
parente479da3c26d41e977cf55b8e2355db45991be09f (diff)
downloadingen-1745a63b874c296253a27ca7ad95d3a7b17822f7.tar.gz
ingen-1745a63b874c296253a27ca7ad95d3a7b17822f7.tar.bz2
ingen-1745a63b874c296253a27ca7ad95d3a7b17822f7.zip
Execute cross-context events in correct increasing time order.
Propagate value changes / message sends breadth first instead of deptch first. git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@2282 a436a847-0d15-0410-975c-d299462d15a1
-rw-r--r--src/engine/ConnectionImpl.cpp6
-rw-r--r--src/engine/EventBuffer.hpp4
-rw-r--r--src/engine/JackAudioDriver.cpp2
-rw-r--r--src/engine/LV2EventBuffer.cpp13
-rw-r--r--src/engine/LV2EventBuffer.hpp1
-rw-r--r--src/engine/MessageContext.cpp50
-rw-r--r--src/engine/MessageContext.hpp50
-rw-r--r--src/engine/ProcessContext.hpp4
-rw-r--r--src/engine/events/SetPortValue.cpp4
9 files changed, 104 insertions, 30 deletions
diff --git a/src/engine/ConnectionImpl.cpp b/src/engine/ConnectionImpl.cpp
index 27786e00..1973397d 100644
--- a/src/engine/ConnectionImpl.cpp
+++ b/src/engine/ConnectionImpl.cpp
@@ -146,7 +146,8 @@ ConnectionImpl::queue(Context& context)
}
while (src_buf->is_valid()) {
- LV2_Object* obj = src_buf->get_object();
+ LV2_Event* ev = src_buf->get_event();
+ LV2_Object* obj = LV2_OBJECT_FROM_EVENT(ev);
/*cout << _src_port->path() << " -> " << _dst_port->path()
<< " QUEUE OBJECT TYPE " << obj->type << ":";
for (size_t i = 0; i < obj->size; ++i)
@@ -156,7 +157,8 @@ ConnectionImpl::queue(Context& context)
_queue->write(sizeof(LV2_Object) + obj->size, obj);
src_buf->increment();
- context.engine().message_context()->run(_dst_port->parent_node());
+ context.engine().message_context()->run(_dst_port->parent_node(),
+ context.start() + ev->frames);
}
}
diff --git a/src/engine/EventBuffer.hpp b/src/engine/EventBuffer.hpp
index d75c91ba..e82585d6 100644
--- a/src/engine/EventBuffer.hpp
+++ b/src/engine/EventBuffer.hpp
@@ -64,6 +64,10 @@ public:
return _buf->get_object();
}
+ LV2_Event* get_event() const {
+ return _buf->get_event();
+ }
+
inline bool append(uint32_t frames,
uint32_t subframes,
uint16_t type,
diff --git a/src/engine/JackAudioDriver.cpp b/src/engine/JackAudioDriver.cpp
index edb1ba9a..bb330435 100644
--- a/src/engine/JackAudioDriver.cpp
+++ b/src/engine/JackAudioDriver.cpp
@@ -374,7 +374,7 @@ JackAudioDriver::_process_cb(jack_nframes_t nframes)
// Signal message context to run if necessary
if (_engine.message_context()->has_requests())
- _engine.message_context()->signal();
+ _engine.message_context()->signal(_process_context);
if (_engine.midi_driver())
_engine.midi_driver()->post_process(_process_context);
diff --git a/src/engine/LV2EventBuffer.cpp b/src/engine/LV2EventBuffer.cpp
index 20df73c4..cb157368 100644
--- a/src/engine/LV2EventBuffer.cpp
+++ b/src/engine/LV2EventBuffer.cpp
@@ -134,6 +134,19 @@ LV2EventBuffer::get_object() const
}
+/** Get the event currently pointed to, or NULL if invalid.
+ */
+LV2_Event*
+LV2EventBuffer::get_event() const
+{
+ if (lv2_event_is_valid(&_iter)) {
+ uint8_t* data;
+ return lv2_event_get(&_iter, &data);
+ }
+ return NULL;
+}
+
+
/** Append an event to the buffer.
*
* \a timestamp must be >= the latest event in the buffer.
diff --git a/src/engine/LV2EventBuffer.hpp b/src/engine/LV2EventBuffer.hpp
index 61cdba4c..26548e71 100644
--- a/src/engine/LV2EventBuffer.hpp
+++ b/src/engine/LV2EventBuffer.hpp
@@ -60,6 +60,7 @@ public:
uint8_t** data) const;
LV2_Object* get_object() const;
+ LV2_Event* get_event() const;
bool append(uint32_t frames,
uint32_t subframes,
diff --git a/src/engine/MessageContext.cpp b/src/engine/MessageContext.cpp
index 5a180a10..6e08eb8d 100644
--- a/src/engine/MessageContext.cpp
+++ b/src/engine/MessageContext.cpp
@@ -32,16 +32,17 @@ namespace Ingen {
void
-MessageContext::run(NodeImpl* node)
+MessageContext::run(NodeImpl* node, FrameTime time)
{
if (ThreadManager::current_thread_id() == THREAD_PRE_PROCESS) {
assert(node);
Glib::Mutex::Lock lock(_mutex);
- _request = node;
+ _non_rt_request = Request(time, node);
_sem.post();
_cond.wait(_mutex);
} else if (ThreadManager::current_thread_id() == THREAD_PROCESS) {
- _requests.write(sizeof(NodeImpl*), &node);
+ Request r(time, node);
+ _requests.write(sizeof(Request), &r);
// signal() will be called at the end of this process cycle
} else if (ThreadManager::current_thread_id() == THREAD_MESSAGE) {
cout << "Message context recursion at " << node->path() << endl;
@@ -54,33 +55,55 @@ MessageContext::run(NodeImpl* node)
void
MessageContext::_run()
{
- NodeImpl* node = NULL;
+ Request req;
while (true) {
_sem.wait();
- // Run a node requested by the pre-process thread
+ // Enqueue a request from the pre-process thread
{
Glib::Mutex::Lock lock(_mutex);
- node = _request.get();
- if (node) {
+ const Request req = _non_rt_request;
+ if (req.node) {
+ _queue.insert(req);
+ _end_time = std::max(_end_time, req.time);
_cond.broadcast(); // Notify caller we got the message
- run_node(node);
}
}
- // Run nodes requested by the audio thread
+ // Enqueue (and thereby sort) requests from audio thread
while (has_requests()) {
- _requests.full_read(sizeof(NodeImpl*), &node);
- run_node(node);
+ _requests.full_read(sizeof(Request), &req);
+ if (req.node) {
+ _queue.insert(req);
+ } else {
+ _end_time = req.time;
+ break;
+ }
+ }
+
+ // Run events in time increasing order
+ // Note that executing nodes may insert further events into the queue
+ while (!_queue.empty()) {
+ const Request req = *_queue.begin();
+
+ // Break if all events during this cycle have been consumed
+ // (the queue may contain generated events with later times)
+ if (req.time > _end_time) {
+ break;
+ }
+
+ _queue.erase(_queue.begin());
+ execute(req);
}
}
}
void
-MessageContext::run_node(NodeImpl* node)
+MessageContext::execute(const Request& req)
{
+ NodeImpl* node = req.node;
node->message_run(*this);
void* valid_ports = node->valid_ports();
@@ -94,8 +117,7 @@ MessageContext::run_node(NodeImpl* node)
for (PatchImpl::Connections::iterator c = wires.begin(); c != wires.end(); ++c) {
ConnectionImpl* ci = dynamic_cast<ConnectionImpl*>(c->get());
if (ci->src_port() == p) {
- ci->dst_port()->pre_process(*_engine.message_context());
- run_node(ci->dst_port()->parent_node());
+ _queue.insert(Request(req.time, ci->dst_port()->parent_node()));
}
}
}
diff --git a/src/engine/MessageContext.hpp b/src/engine/MessageContext.hpp
index c871ad1c..4d8c0695 100644
--- a/src/engine/MessageContext.hpp
+++ b/src/engine/MessageContext.hpp
@@ -18,12 +18,14 @@
#ifndef MESSAGECONTEXT_H
#define MESSAGECONTEXT_H
+#include <set>
#include <glibmm/thread.h>
#include "raul/Thread.hpp"
#include "raul/Semaphore.hpp"
#include "raul/AtomicPtr.hpp"
#include "object.lv2/object.h"
#include "Context.hpp"
+#include "ProcessContext.hpp"
#include "ThreadManager.hpp"
#include "tuning.hpp"
@@ -48,19 +50,37 @@ public:
, Raul::Thread("message-context")
, _sem(0)
, _requests(message_context_queue_size)
+ , _end_time(0)
{
Thread::set_context(THREAD_MESSAGE);
}
/** Request a run starting at node.
- *
* Safe to call from either process thread or pre-process thread.
*/
- void run(NodeImpl* node);
+ void run(NodeImpl* node, FrameTime time);
- inline void signal() { _sem.post(); }
+protected:
+ struct Request {
+ Request(FrameTime t=0, NodeImpl* n=0) : time(t), node(n) {}
+ FrameTime time;
+ NodeImpl* node;
+ };
+
+public:
+ /** Signal the end of a cycle that has produced messages.
+ * AUDIO THREAD ONLY.
+ */
+ inline void signal(ProcessContext& context) {
+ assert(ThreadManager::current_thread_id() == THREAD_PROCESS);
+ const Request cycle_end_request(context.end(), NULL);
+ _requests.write(sizeof(Request), &cycle_end_request);
+ _sem.post();
+ }
+
+ /** Return true iff requests are pending. Safe from any thread. */
inline bool has_requests() const {
- return _requests.read_space() >= sizeof(NodeImpl*) || _request.get();
+ return _requests.read_space() >= sizeof(Request);
}
protected:
@@ -68,13 +88,23 @@ protected:
void _run();
/** Actually execute and propagate from node */
- void run_node(NodeImpl* node);
+ void execute(const Request& req);
+
+ Raul::Semaphore _sem;
+ Raul::RingBuffer<Request> _requests;
+ Glib::Mutex _mutex;
+ Glib::Cond _cond;
+ Request _non_rt_request;
+
+ struct RequestEarlier {
+ bool operator()(const Request& r1, const Request& r2) {
+ return r1.time < r2.time;
+ }
+ };
- Raul::Semaphore _sem;
- Raul::RingBuffer<NodeImpl*> _requests;
- Glib::Mutex _mutex;
- Glib::Cond _cond;
- Raul::AtomicPtr<NodeImpl> _request;
+ typedef std::set<Request, RequestEarlier> Queue;
+ Queue _queue;
+ FrameTime _end_time;
};
diff --git a/src/engine/ProcessContext.hpp b/src/engine/ProcessContext.hpp
index 743b72c4..4b3888d0 100644
--- a/src/engine/ProcessContext.hpp
+++ b/src/engine/ProcessContext.hpp
@@ -52,8 +52,8 @@ public:
_end = end;
}
- inline SampleCount nframes()const { return _nframes; }
- inline FrameTime end() const { return _end; }
+ inline SampleCount nframes() const { return _nframes; }
+ inline FrameTime end() const { return _end; }
private:
SampleCount _nframes; ///< Length of this cycle in frames
diff --git a/src/engine/events/SetPortValue.cpp b/src/engine/events/SetPortValue.cpp
index 2a33b798..3d235cd1 100644
--- a/src/engine/events/SetPortValue.cpp
+++ b/src/engine/events/SetPortValue.cpp
@@ -22,6 +22,7 @@
#include "shared/LV2Object.hpp"
#include "module/World.hpp"
#include "AudioBuffer.hpp"
+#include "AudioDriver.hpp"
#include "ClientBroadcaster.hpp"
#include "Engine.hpp"
#include "EngineStore.hpp"
@@ -118,7 +119,8 @@ SetPortValue::pre_process()
if (_port && _port->context() == Context::MESSAGE) {
apply(*_engine.message_context());
_port->parent_node()->set_port_valid(_port->index());
- _engine.message_context()->run(_port->parent_node());
+ _engine.message_context()->run(_port->parent_node(),
+ _engine.audio_driver()->frame_time() + _engine.audio_driver()->buffer_size());
}
QueuedEvent::pre_process();