diff options
Diffstat (limited to 'src/engine/MessageContext.cpp')
-rw-r--r-- | src/engine/MessageContext.cpp | 50 |
1 files changed, 36 insertions, 14 deletions
diff --git a/src/engine/MessageContext.cpp b/src/engine/MessageContext.cpp index 5a180a10..6e08eb8d 100644 --- a/src/engine/MessageContext.cpp +++ b/src/engine/MessageContext.cpp @@ -32,16 +32,17 @@ namespace Ingen { void -MessageContext::run(NodeImpl* node) +MessageContext::run(NodeImpl* node, FrameTime time) { if (ThreadManager::current_thread_id() == THREAD_PRE_PROCESS) { assert(node); Glib::Mutex::Lock lock(_mutex); - _request = node; + _non_rt_request = Request(time, node); _sem.post(); _cond.wait(_mutex); } else if (ThreadManager::current_thread_id() == THREAD_PROCESS) { - _requests.write(sizeof(NodeImpl*), &node); + Request r(time, node); + _requests.write(sizeof(Request), &r); // signal() will be called at the end of this process cycle } else if (ThreadManager::current_thread_id() == THREAD_MESSAGE) { cout << "Message context recursion at " << node->path() << endl; @@ -54,33 +55,55 @@ MessageContext::run(NodeImpl* node) void MessageContext::_run() { - NodeImpl* node = NULL; + Request req; while (true) { _sem.wait(); - // Run a node requested by the pre-process thread + // Enqueue a request from the pre-process thread { Glib::Mutex::Lock lock(_mutex); - node = _request.get(); - if (node) { + const Request req = _non_rt_request; + if (req.node) { + _queue.insert(req); + _end_time = std::max(_end_time, req.time); _cond.broadcast(); // Notify caller we got the message - run_node(node); } } - // Run nodes requested by the audio thread + // Enqueue (and thereby sort) requests from audio thread while (has_requests()) { - _requests.full_read(sizeof(NodeImpl*), &node); - run_node(node); + _requests.full_read(sizeof(Request), &req); + if (req.node) { + _queue.insert(req); + } else { + _end_time = req.time; + break; + } + } + + // Run events in time increasing order + // Note that executing nodes may insert further events into the queue + while (!_queue.empty()) { + const Request req = *_queue.begin(); + + // Break if all events during this cycle have been consumed + // (the queue may contain generated events with later times) + if (req.time > _end_time) { + break; + } + + _queue.erase(_queue.begin()); + execute(req); } } } void -MessageContext::run_node(NodeImpl* node) +MessageContext::execute(const Request& req) { + NodeImpl* node = req.node; node->message_run(*this); void* valid_ports = node->valid_ports(); @@ -94,8 +117,7 @@ MessageContext::run_node(NodeImpl* node) for (PatchImpl::Connections::iterator c = wires.begin(); c != wires.end(); ++c) { ConnectionImpl* ci = dynamic_cast<ConnectionImpl*>(c->get()); if (ci->src_port() == p) { - ci->dst_port()->pre_process(*_engine.message_context()); - run_node(ci->dst_port()->parent_node()); + _queue.insert(Request(req.time, ci->dst_port()->parent_node())); } } } |