summaryrefslogtreecommitdiffstats
path: root/src/engine/MessageContext.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/engine/MessageContext.cpp')
-rw-r--r--src/engine/MessageContext.cpp50
1 files changed, 36 insertions, 14 deletions
diff --git a/src/engine/MessageContext.cpp b/src/engine/MessageContext.cpp
index 5a180a10..6e08eb8d 100644
--- a/src/engine/MessageContext.cpp
+++ b/src/engine/MessageContext.cpp
@@ -32,16 +32,17 @@ namespace Ingen {
void
-MessageContext::run(NodeImpl* node)
+MessageContext::run(NodeImpl* node, FrameTime time)
{
if (ThreadManager::current_thread_id() == THREAD_PRE_PROCESS) {
assert(node);
Glib::Mutex::Lock lock(_mutex);
- _request = node;
+ _non_rt_request = Request(time, node);
_sem.post();
_cond.wait(_mutex);
} else if (ThreadManager::current_thread_id() == THREAD_PROCESS) {
- _requests.write(sizeof(NodeImpl*), &node);
+ Request r(time, node);
+ _requests.write(sizeof(Request), &r);
// signal() will be called at the end of this process cycle
} else if (ThreadManager::current_thread_id() == THREAD_MESSAGE) {
cout << "Message context recursion at " << node->path() << endl;
@@ -54,33 +55,55 @@ MessageContext::run(NodeImpl* node)
void
MessageContext::_run()
{
- NodeImpl* node = NULL;
+ Request req;
while (true) {
_sem.wait();
- // Run a node requested by the pre-process thread
+ // Enqueue a request from the pre-process thread
{
Glib::Mutex::Lock lock(_mutex);
- node = _request.get();
- if (node) {
+ const Request req = _non_rt_request;
+ if (req.node) {
+ _queue.insert(req);
+ _end_time = std::max(_end_time, req.time);
_cond.broadcast(); // Notify caller we got the message
- run_node(node);
}
}
- // Run nodes requested by the audio thread
+ // Enqueue (and thereby sort) requests from audio thread
while (has_requests()) {
- _requests.full_read(sizeof(NodeImpl*), &node);
- run_node(node);
+ _requests.full_read(sizeof(Request), &req);
+ if (req.node) {
+ _queue.insert(req);
+ } else {
+ _end_time = req.time;
+ break;
+ }
+ }
+
+ // Run events in time increasing order
+ // Note that executing nodes may insert further events into the queue
+ while (!_queue.empty()) {
+ const Request req = *_queue.begin();
+
+ // Break if all events during this cycle have been consumed
+ // (the queue may contain generated events with later times)
+ if (req.time > _end_time) {
+ break;
+ }
+
+ _queue.erase(_queue.begin());
+ execute(req);
}
}
}
void
-MessageContext::run_node(NodeImpl* node)
+MessageContext::execute(const Request& req)
{
+ NodeImpl* node = req.node;
node->message_run(*this);
void* valid_ports = node->valid_ports();
@@ -94,8 +117,7 @@ MessageContext::run_node(NodeImpl* node)
for (PatchImpl::Connections::iterator c = wires.begin(); c != wires.end(); ++c) {
ConnectionImpl* ci = dynamic_cast<ConnectionImpl*>(c->get());
if (ci->src_port() == p) {
- ci->dst_port()->pre_process(*_engine.message_context());
- run_node(ci->dst_port()->parent_node());
+ _queue.insert(Request(req.time, ci->dst_port()->parent_node()));
}
}
}