summaryrefslogtreecommitdiffstats
path: root/src/libs/engine/Patch.cpp
diff options
context:
space:
mode:
authorDavid Robillard <d@drobilla.net>2007-08-09 05:16:00 +0000
committerDavid Robillard <d@drobilla.net>2007-08-09 05:16:00 +0000
commit9b7a2af07fd1f5df3e517021d676805eb20bc74f (patch)
tree3a75d0096fea9f013f7da24cd9c41dcea89fb4de /src/libs/engine/Patch.cpp
parentfccb1edce5dda41bdfef12340a5d5b95b86a1389 (diff)
downloadingen-9b7a2af07fd1f5df3e517021d676805eb20bc74f.tar.gz
ingen-9b7a2af07fd1f5df3e517021d676805eb20bc74f.tar.bz2
ingen-9b7a2af07fd1f5df3e517021d676805eb20bc74f.zip
Realtime safe parallel graph execution, e.g. run with ingen -e -p 3 for 3 concurrent audio threads.
git-svn-id: http://svn.drobilla.net/lad/ingen@689 a436a847-0d15-0410-975c-d299462d15a1
Diffstat (limited to 'src/libs/engine/Patch.cpp')
-rw-r--r--src/libs/engine/Patch.cpp145
1 files changed, 95 insertions, 50 deletions
diff --git a/src/libs/engine/Patch.cpp b/src/libs/engine/Patch.cpp
index c9c0bee6..ad1459e8 100644
--- a/src/libs/engine/Patch.cpp
+++ b/src/libs/engine/Patch.cpp
@@ -25,16 +25,19 @@
#include "Port.hpp"
#include "Connection.hpp"
#include "DuplexPort.hpp"
+#include "Engine.hpp"
+#include "ProcessSlave.hpp"
-using std::cerr; using std::cout; using std::endl;
+using namespace std;
namespace Ingen {
-Patch::Patch(const string& path, uint32_t poly, Patch* parent, SampleRate srate, size_t buffer_size, uint32_t internal_poly)
+Patch::Patch(Engine& engine, const string& path, uint32_t poly, Patch* parent, SampleRate srate, size_t buffer_size, uint32_t internal_poly)
: NodeBase(new Plugin(Plugin::Patch, "ingen:patch"), path, poly, parent, srate, buffer_size),
+ _engine(engine),
_internal_poly(internal_poly),
- _process_order(NULL),
+ _compiled_patch(NULL),
_process(false)
{
assert(internal_poly >= 1);
@@ -62,7 +65,7 @@ Patch::~Patch()
delete _nodes.erase(i);
}
- delete _process_order;
+ delete _compiled_patch;
}
@@ -107,30 +110,94 @@ Patch::disable()
/** Run the patch for the specified number of frames.
*
- * Calls all Nodes in the order _process_order specifies.
+ * Calls all Nodes in (roughly, if parallel) the order _compiled_patch specifies.
*/
void
Patch::process(SampleCount nframes, FrameTime start, FrameTime end)
{
- if (_process_order == NULL || !_process)
+ if (_compiled_patch == NULL || _compiled_patch->size() == 0 || !_process)
return;
-
- // FIXME: This is far too slow, too much iteration/conditionals every cycle
+ CompiledPatch* const cp = _compiled_patch;
+
+ /* Prepare input ports */
+
// This breaks MIDI input, somehow (?)
//for (Raul::List<Port*>::iterator i = _input_ports.begin(); i != _input_ports.end(); ++i)
// (*i)->pre_process(nframes, start, end);
for (Raul::List<Port*>::iterator i = _output_ports.begin(); i != _output_ports.end(); ++i)
(*i)->pre_process(nframes, start, end);
- // Run all nodes (consume input ports)
- for (size_t i=0; i < _process_order->size(); ++i) {
- // Could be a gap due to a node removal event (see RemoveNodeEvent.cpp)
- // Yes, this is ugly
- if (_process_order->at(i))
- _process_order->at(i)->process(nframes, start, end);
+
+ /* Start p-1 slaves */
+
+ size_t n_slaves = _engine.process_slaves().size();
+
+ if (n_slaves >= cp->size())
+ n_slaves = cp->size()-1;
+
+ if (n_slaves > 0) {
+ for (size_t i=0; i < cp->size(); ++i)
+ (*cp)[i].node()->reset_input_ready();
+
+ for (size_t i=0; i < n_slaves; ++i)
+ _engine.process_slaves()[i]->whip(cp, i+1, nframes, start, end);
}
+
+ /* Process ourself until everything is done
+ * This is analogous to ProcessSlave::_whipped(), but this is the master
+ * (i.e. what the main Jack process thread calls). Where ProcessSlave
+ * waits on input, this just skips the node and tries the next, to avoid
+ * waiting in the Jack thread which pisses Jack off.
+ */
+
+ size_t index = 0;
+ //size_t run_count = 0;
+ size_t num_finished = 0; // Number of consecutive finished nodes hit
+
+ while (num_finished < cp->size()) {
+
+ CompiledNode& n = (*cp)[index];
+
+ if (n.node()->process_lock()) {
+ if (n.node()->n_inputs_ready() == n.n_providers()) {
+ //cout << "************ Main running " << n.node()->path() << " at index " << index << endl;
+ n.node()->process(nframes, start, end);
+
+ //cerr << n.node()->path() << " @ " << &n << " dependants: " << n.dependants().size() << endl;
+
+ /* Signal dependants their input is ready */
+ for (size_t i=0; i < n.dependants().size(); ++i)
+ n.dependants()[i]->signal_input_ready();
+
+ //++run_count;
+ ++num_finished;
+ } else {
+ n.node()->process_unlock();
+ num_finished = 0;
+ }
+ } else {
+ if (n.node()->n_inputs_ready() == n.n_providers())
+ ++num_finished;
+ else
+ num_finished = 0;
+ }
+
+ index = (index + 1) % cp->size();
+ }
+
+ /* Tell slaves we're done in case we beat them, and pray they're
+ * really done by the start of next cycle.
+ * FIXME: This probably breaks (race) at extremely small nframes.
+ */
+ for (size_t i=0; i < n_slaves; ++i)
+ _engine.process_slaves()[i]->finish();
+
+ //cout << "Main Thread ran \t" << run_count << " nodes this cycle." << endl;
+
+ /* Write output ports */
+
for (Raul::List<Port*>::iterator i = _input_ports.begin(); i != _input_ports.end(); ++i)
(*i)->post_process(nframes, start, end);
for (Raul::List<Port*>::iterator i = _output_ports.begin(); i != _output_ports.end(); ++i)
@@ -292,63 +359,41 @@ Patch::build_ports_array() const
*
* Not realtime safe.
*/
-Raul::Array<Node*>*
-Patch::build_process_order() const
+CompiledPatch*
+Patch::compile() const
{
assert(ThreadManager::current_thread_id() == THREAD_PRE_PROCESS);
//cerr << "*********** Building process order for " << path() << endl;
- Raul::Array<Node*>* const process_order = new Raul::Array<Node*>(_nodes.size(), NULL);
+ CompiledPatch* const compiled_patch = new CompiledPatch();//_nodes.size());
// FIXME: tweak algorithm so it just ends up like this and save the cost of iteration?
for (Raul::List<Node*>::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i)
(*i)->traversed(false);
- // Traverse backwards starting at outputs
- //for (Raul::List<Port*>::const_iterator p = _output_ports.begin(); p != _output_ports.end(); ++p) {
-
- /*const Port* const port = (*p);
- for (Raul::List<Connection*>::const_iterator c = port->connections().begin();
- c != port->connections().end(); ++c) {
- const Connection* const connection = (*c);
- assert(connection->dst_port() == port);
- assert(connection->src_port());
- assert(connection->src_port()->parent_node());
- build_process_order_recursive(connection->src_port()->parent_node(), process_order);
- }*/
- //}
-
for (Raul::List<Node*>::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i) {
Node* const node = (*i);
// Either a sink or connected to our output ports:
if ( ( ! node->traversed()) && node->dependants()->size() == 0)
- build_process_order_recursive(node, process_order);
+ compile_recursive(node, compiled_patch);
}
-
- // Add any (disjoint) nodes that weren't hit by the traversal
- // FIXME: this shouldn't be necessary
- /*for (Raul::List<Node*>::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i) {
- Node* const node = (*i);
- if ( ! node->traversed()) {
- process_order->push_back(*i);
- node->traversed(true);
- cerr << "********** APPENDED DISJOINT NODE " << node->path() << endl;
- }
- }*/
-
- /*
- cerr << "----------------------------------------\n";
+
+ /*cerr << "----------------------------------------\n";
for (size_t i=0; i < process_order->size(); ++i) {
assert(process_order->at(i));
cerr << process_order->at(i)->path() << endl;
}
- cerr << "----------------------------------------\n";
- */
+ cerr << "----------------------------------------\n";*/
+
+ assert(compiled_patch->size() == _nodes.size());
- assert(process_order->size() == _nodes.size());
+#ifndef NDEBUG
+ for (size_t i=0; i < compiled_patch->size(); ++i)
+ assert(compiled_patch->at(i).node());
+#endif
- return process_order;
+ return compiled_patch;
}