diff options
author | David Robillard <d@drobilla.net> | 2007-08-09 05:16:00 +0000 |
---|---|---|
committer | David Robillard <d@drobilla.net> | 2007-08-09 05:16:00 +0000 |
commit | 9b7a2af07fd1f5df3e517021d676805eb20bc74f (patch) | |
tree | 3a75d0096fea9f013f7da24cd9c41dcea89fb4de /src/libs/engine/Patch.cpp | |
parent | fccb1edce5dda41bdfef12340a5d5b95b86a1389 (diff) | |
download | ingen-9b7a2af07fd1f5df3e517021d676805eb20bc74f.tar.gz ingen-9b7a2af07fd1f5df3e517021d676805eb20bc74f.tar.bz2 ingen-9b7a2af07fd1f5df3e517021d676805eb20bc74f.zip |
Realtime safe parallel graph execution, e.g. run with ingen -e -p 3 for 3 concurrent audio threads.
git-svn-id: http://svn.drobilla.net/lad/ingen@689 a436a847-0d15-0410-975c-d299462d15a1
Diffstat (limited to 'src/libs/engine/Patch.cpp')
-rw-r--r-- | src/libs/engine/Patch.cpp | 145 |
1 files changed, 95 insertions, 50 deletions
diff --git a/src/libs/engine/Patch.cpp b/src/libs/engine/Patch.cpp index c9c0bee6..ad1459e8 100644 --- a/src/libs/engine/Patch.cpp +++ b/src/libs/engine/Patch.cpp @@ -25,16 +25,19 @@ #include "Port.hpp" #include "Connection.hpp" #include "DuplexPort.hpp" +#include "Engine.hpp" +#include "ProcessSlave.hpp" -using std::cerr; using std::cout; using std::endl; +using namespace std; namespace Ingen { -Patch::Patch(const string& path, uint32_t poly, Patch* parent, SampleRate srate, size_t buffer_size, uint32_t internal_poly) +Patch::Patch(Engine& engine, const string& path, uint32_t poly, Patch* parent, SampleRate srate, size_t buffer_size, uint32_t internal_poly) : NodeBase(new Plugin(Plugin::Patch, "ingen:patch"), path, poly, parent, srate, buffer_size), + _engine(engine), _internal_poly(internal_poly), - _process_order(NULL), + _compiled_patch(NULL), _process(false) { assert(internal_poly >= 1); @@ -62,7 +65,7 @@ Patch::~Patch() delete _nodes.erase(i); } - delete _process_order; + delete _compiled_patch; } @@ -107,30 +110,94 @@ Patch::disable() /** Run the patch for the specified number of frames. * - * Calls all Nodes in the order _process_order specifies. + * Calls all Nodes in (roughly, if parallel) the order _compiled_patch specifies. */ void Patch::process(SampleCount nframes, FrameTime start, FrameTime end) { - if (_process_order == NULL || !_process) + if (_compiled_patch == NULL || _compiled_patch->size() == 0 || !_process) return; - - // FIXME: This is far too slow, too much iteration/conditionals every cycle + CompiledPatch* const cp = _compiled_patch; + + /* Prepare input ports */ + // This breaks MIDI input, somehow (?) //for (Raul::List<Port*>::iterator i = _input_ports.begin(); i != _input_ports.end(); ++i) // (*i)->pre_process(nframes, start, end); for (Raul::List<Port*>::iterator i = _output_ports.begin(); i != _output_ports.end(); ++i) (*i)->pre_process(nframes, start, end); - // Run all nodes (consume input ports) - for (size_t i=0; i < _process_order->size(); ++i) { - // Could be a gap due to a node removal event (see RemoveNodeEvent.cpp) - // Yes, this is ugly - if (_process_order->at(i)) - _process_order->at(i)->process(nframes, start, end); + + /* Start p-1 slaves */ + + size_t n_slaves = _engine.process_slaves().size(); + + if (n_slaves >= cp->size()) + n_slaves = cp->size()-1; + + if (n_slaves > 0) { + for (size_t i=0; i < cp->size(); ++i) + (*cp)[i].node()->reset_input_ready(); + + for (size_t i=0; i < n_slaves; ++i) + _engine.process_slaves()[i]->whip(cp, i+1, nframes, start, end); } + + /* Process ourself until everything is done + * This is analogous to ProcessSlave::_whipped(), but this is the master + * (i.e. what the main Jack process thread calls). Where ProcessSlave + * waits on input, this just skips the node and tries the next, to avoid + * waiting in the Jack thread which pisses Jack off. + */ + + size_t index = 0; + //size_t run_count = 0; + size_t num_finished = 0; // Number of consecutive finished nodes hit + + while (num_finished < cp->size()) { + + CompiledNode& n = (*cp)[index]; + + if (n.node()->process_lock()) { + if (n.node()->n_inputs_ready() == n.n_providers()) { + //cout << "************ Main running " << n.node()->path() << " at index " << index << endl; + n.node()->process(nframes, start, end); + + //cerr << n.node()->path() << " @ " << &n << " dependants: " << n.dependants().size() << endl; + + /* Signal dependants their input is ready */ + for (size_t i=0; i < n.dependants().size(); ++i) + n.dependants()[i]->signal_input_ready(); + + //++run_count; + ++num_finished; + } else { + n.node()->process_unlock(); + num_finished = 0; + } + } else { + if (n.node()->n_inputs_ready() == n.n_providers()) + ++num_finished; + else + num_finished = 0; + } + + index = (index + 1) % cp->size(); + } + + /* Tell slaves we're done in case we beat them, and pray they're + * really done by the start of next cycle. + * FIXME: This probably breaks (race) at extremely small nframes. + */ + for (size_t i=0; i < n_slaves; ++i) + _engine.process_slaves()[i]->finish(); + + //cout << "Main Thread ran \t" << run_count << " nodes this cycle." << endl; + + /* Write output ports */ + for (Raul::List<Port*>::iterator i = _input_ports.begin(); i != _input_ports.end(); ++i) (*i)->post_process(nframes, start, end); for (Raul::List<Port*>::iterator i = _output_ports.begin(); i != _output_ports.end(); ++i) @@ -292,63 +359,41 @@ Patch::build_ports_array() const * * Not realtime safe. */ -Raul::Array<Node*>* -Patch::build_process_order() const +CompiledPatch* +Patch::compile() const { assert(ThreadManager::current_thread_id() == THREAD_PRE_PROCESS); //cerr << "*********** Building process order for " << path() << endl; - Raul::Array<Node*>* const process_order = new Raul::Array<Node*>(_nodes.size(), NULL); + CompiledPatch* const compiled_patch = new CompiledPatch();//_nodes.size()); // FIXME: tweak algorithm so it just ends up like this and save the cost of iteration? for (Raul::List<Node*>::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i) (*i)->traversed(false); - // Traverse backwards starting at outputs - //for (Raul::List<Port*>::const_iterator p = _output_ports.begin(); p != _output_ports.end(); ++p) { - - /*const Port* const port = (*p); - for (Raul::List<Connection*>::const_iterator c = port->connections().begin(); - c != port->connections().end(); ++c) { - const Connection* const connection = (*c); - assert(connection->dst_port() == port); - assert(connection->src_port()); - assert(connection->src_port()->parent_node()); - build_process_order_recursive(connection->src_port()->parent_node(), process_order); - }*/ - //} - for (Raul::List<Node*>::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i) { Node* const node = (*i); // Either a sink or connected to our output ports: if ( ( ! node->traversed()) && node->dependants()->size() == 0) - build_process_order_recursive(node, process_order); + compile_recursive(node, compiled_patch); } - - // Add any (disjoint) nodes that weren't hit by the traversal - // FIXME: this shouldn't be necessary - /*for (Raul::List<Node*>::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i) { - Node* const node = (*i); - if ( ! node->traversed()) { - process_order->push_back(*i); - node->traversed(true); - cerr << "********** APPENDED DISJOINT NODE " << node->path() << endl; - } - }*/ - - /* - cerr << "----------------------------------------\n"; + + /*cerr << "----------------------------------------\n"; for (size_t i=0; i < process_order->size(); ++i) { assert(process_order->at(i)); cerr << process_order->at(i)->path() << endl; } - cerr << "----------------------------------------\n"; - */ + cerr << "----------------------------------------\n";*/ + + assert(compiled_patch->size() == _nodes.size()); - assert(process_order->size() == _nodes.size()); +#ifndef NDEBUG + for (size_t i=0; i < compiled_patch->size(); ++i) + assert(compiled_patch->at(i).node()); +#endif - return process_order; + return compiled_patch; } |