From 9b7a2af07fd1f5df3e517021d676805eb20bc74f Mon Sep 17 00:00:00 2001 From: David Robillard Date: Thu, 9 Aug 2007 05:16:00 +0000 Subject: Realtime safe parallel graph execution, e.g. run with ingen -e -p 3 for 3 concurrent audio threads. git-svn-id: http://svn.drobilla.net/lad/ingen@689 a436a847-0d15-0410-975c-d299462d15a1 --- src/libs/engine/AudioDriver.hpp | 2 + src/libs/engine/Connection.cpp | 2 +- src/libs/engine/Engine.cpp | 52 +++++---- src/libs/engine/Engine.hpp | 13 ++- src/libs/engine/JackAudioDriver.hpp | 2 +- src/libs/engine/Makefile.am | 3 + src/libs/engine/Node.hpp | 31 ++++++ src/libs/engine/NodeBase.cpp | 55 +++++++++- src/libs/engine/NodeBase.hpp | 39 +++++-- src/libs/engine/NodeFactory.cpp | 2 +- src/libs/engine/Patch.cpp | 145 +++++++++++++++++--------- src/libs/engine/Patch.hpp | 38 ++++--- src/libs/engine/ProcessSlave.cpp | 83 +++++++++++++++ src/libs/engine/ProcessSlave.hpp | 97 +++++++++++++++++ src/libs/engine/events/AddNodeEvent.cpp | 12 +-- src/libs/engine/events/AddNodeEvent.hpp | 23 ++-- src/libs/engine/events/ClearPatchEvent.cpp | 6 +- src/libs/engine/events/ConnectionEvent.cpp | 11 +- src/libs/engine/events/ConnectionEvent.hpp | 3 +- src/libs/engine/events/CreatePatchEvent.cpp | 12 +-- src/libs/engine/events/CreatePatchEvent.hpp | 4 +- src/libs/engine/events/DestroyEvent.cpp | 33 +++--- src/libs/engine/events/DestroyEvent.hpp | 3 +- src/libs/engine/events/DisconnectionEvent.cpp | 12 +-- src/libs/engine/events/DisconnectionEvent.hpp | 3 +- src/libs/engine/events/EnablePatchEvent.cpp | 10 +- src/libs/engine/events/EnablePatchEvent.hpp | 7 +- src/libs/gui/ConnectWindow.cpp | 4 +- src/progs/ingen/cmdline.c | 52 +++++++-- src/progs/ingen/cmdline.ggo | 3 +- src/progs/ingen/cmdline.h | 6 +- src/progs/ingen/main.cpp | 7 +- 32 files changed, 590 insertions(+), 185 deletions(-) create mode 100644 src/libs/engine/ProcessSlave.cpp create mode 100644 src/libs/engine/ProcessSlave.hpp (limited to 'src') diff --git a/src/libs/engine/AudioDriver.hpp b/src/libs/engine/AudioDriver.hpp index bc7d1ee9..51373532 100644 --- a/src/libs/engine/AudioDriver.hpp +++ b/src/libs/engine/AudioDriver.hpp @@ -49,6 +49,8 @@ public: virtual SampleCount buffer_size() const = 0; virtual SampleCount sample_rate() const = 0; virtual SampleCount frame_time() const = 0; + + virtual bool is_realtime() const = 0; }; diff --git a/src/libs/engine/Connection.cpp b/src/libs/engine/Connection.cpp index 3a402948..637da891 100644 --- a/src/libs/engine/Connection.cpp +++ b/src/libs/engine/Connection.cpp @@ -50,7 +50,7 @@ Connection::Connection(Port* src_port, Port* dst_port) if (_must_mix) _local_buffer = BufferFactory::create(dst_port->type(), dst_port->buffer(0)->size()); - cerr << src_port->path() << " -> " << dst_port->path() << " must mix: " << _must_mix << endl; + //cerr << src_port->path() << " -> " << dst_port->path() << " must mix: " << _must_mix << endl; } diff --git a/src/libs/engine/Engine.cpp b/src/libs/engine/Engine.cpp index af4eca27..f904ce2a 100644 --- a/src/libs/engine/Engine.cpp +++ b/src/libs/engine/Engine.cpp @@ -38,6 +38,7 @@ #include "EnablePatchEvent.hpp" #include "OSCEngineReceiver.hpp" #include "PostProcessor.hpp" +#include "ProcessSlave.hpp" #ifdef HAVE_JACK_MIDI #include "JackMidiDriver.hpp" #endif @@ -50,21 +51,23 @@ namespace Ingen { Engine::Engine(Ingen::Shared::World* world) -: _world(world), - _midi_driver(NULL), - _osc_driver(NULL), - _maid(new Raul::Maid(maid_queue_size)), - _post_processor(new PostProcessor(/**_maid, */post_processor_queue_size)), - _broadcaster(new ClientBroadcaster()), - _object_store(new ObjectStore()), - _node_factory(new NodeFactory(world)), -/*#ifdef HAVE_LASH - _lash_driver(new LashDriver()), -#else */ -// _lash_driver(NULL), -//#endif - _quit_flag(false), - _activated(false) + : _world(world) + , _midi_driver(NULL) + , _osc_driver(NULL) + , _maid(new Raul::Maid(maid_queue_size)) + , _post_processor(new PostProcessor(/**_maid, */post_processor_queue_size)) + , _broadcaster(new ClientBroadcaster()) + , _object_store(new ObjectStore()) + , _node_factory(new NodeFactory(world)) +#if 0 +#ifdef HAVE_LASH + , _lash_driver(new LashDriver()) +#else + , _lash_driver(NULL) +#endif +#endif + , _quit_flag(false) + , _activated(false) { } @@ -72,6 +75,11 @@ Engine::Engine(Ingen::Shared::World* world) Engine::~Engine() { deactivate(); + + for (size_t i=0; i < _process_slaves.size(); ++i) + delete _process_slaves[i]; + + _process_slaves.clear(); for (ObjectStore::Objects::const_iterator i = _object_store->objects().begin(); i != _object_store->objects().end(); ++i) { @@ -198,7 +206,7 @@ Engine::set_event_source(SharedPtr source) bool -Engine::activate() +Engine::activate(size_t parallelism) { if (_activated) return false; @@ -216,17 +224,23 @@ Engine::activate() // Create root patch - Patch* root_patch = new Patch("", 1, NULL, + Patch* root_patch = new Patch(*this, "", 1, NULL, _audio_driver->sample_rate(), _audio_driver->buffer_size(), 1); root_patch->activate(); _object_store->add(root_patch); - root_patch->process_order(root_patch->build_process_order()); - root_patch->enable(); + root_patch->compiled_patch(root_patch->compile()); assert(_audio_driver->root_patch() == NULL); _audio_driver->set_root_patch(root_patch); _audio_driver->activate(); + + _process_slaves.clear(); + _process_slaves.reserve(parallelism); + for (size_t i=0; i < parallelism - 1; ++i) + _process_slaves.push_back(new ProcessSlave(_audio_driver->is_realtime())); + + root_patch->enable(); //_post_processor->start(); diff --git a/src/libs/engine/Engine.hpp b/src/libs/engine/Engine.hpp index 562a2fa3..d4f3c972 100644 --- a/src/libs/engine/Engine.hpp +++ b/src/libs/engine/Engine.hpp @@ -19,11 +19,11 @@ #define ENGINE_H #include CONFIG_H_PATH -#include "module/module.h" - #include +#include #include #include +#include "module/module.h" #include "DataType.hpp" template class Queue; @@ -46,6 +46,7 @@ class QueuedEvent; class QueuedEngineInterface; class LashDriver; class Driver; +class ProcessSlave; /** The main class for the Engine. @@ -74,9 +75,7 @@ public: virtual SharedPtr new_queued_interface(); - //virtual void set_event_source(SharedPtr); - - virtual bool activate(); + virtual bool activate(size_t parallelism); virtual void deactivate(); virtual bool activated() { return _activated; } @@ -96,8 +95,12 @@ public: Driver* driver(DataType type); Ingen::Shared::World* world() { return _world; } + + typedef std::vector ProcessSlaves; + inline const ProcessSlaves& process_slaves() const { return _process_slaves; } private: + ProcessSlaves _process_slaves; Ingen::Shared::World* _world; diff --git a/src/libs/engine/JackAudioDriver.hpp b/src/libs/engine/JackAudioDriver.hpp index 583d2c62..9b195b13 100644 --- a/src/libs/engine/JackAudioDriver.hpp +++ b/src/libs/engine/JackAudioDriver.hpp @@ -98,7 +98,7 @@ public: inline const jack_position_t* position() { return &_position; } inline const jack_transport_state_t transport_state() { return _transport_state; } - bool is_realtime() { return jack_is_realtime(_client); } + bool is_realtime() const { return jack_is_realtime(_client); } jack_client_t* jack_client() const { return _client; } SampleCount buffer_size() const { return _buffer_size; } diff --git a/src/libs/engine/Makefile.am b/src/libs/engine/Makefile.am index 7f87c479..8d082d50 100644 --- a/src/libs/engine/Makefile.am +++ b/src/libs/engine/Makefile.am @@ -30,6 +30,7 @@ libingen_engine_la_SOURCES = \ ClientBroadcaster.hpp \ Connection.cpp \ Connection.hpp \ + CompiledPatch.hpp \ DataType.hpp \ Driver.hpp \ DuplexPort.cpp \ @@ -83,6 +84,8 @@ libingen_engine_la_SOURCES = \ Port.hpp \ PostProcessor.cpp \ PostProcessor.hpp \ + ProcessSlave.hpp \ + ProcessSlave.cpp \ QueuedEngineInterface.cpp \ QueuedEngineInterface.hpp \ QueuedEvent.hpp \ diff --git a/src/libs/engine/Node.hpp b/src/libs/engine/Node.hpp index e6c5316b..feb9e9ef 100644 --- a/src/libs/engine/Node.hpp +++ b/src/libs/engine/Node.hpp @@ -62,6 +62,37 @@ public: virtual void deactivate() = 0; virtual bool activated() = 0; + /** Parallelism: Reset flags for start of a new cycle. + */ + virtual void reset_input_ready() = 0; + + /** Parallelism: Claim this node (to wait on its input). + * Only one thread will ever take this lock on a particular Node. + * \return true if lock was aquired, false otherwise + */ + virtual bool process_lock() = 0; + + /** Parallelism: Unclaim this node (let someone else wait on its input). + * Only a thread which successfully called process_lock may call this. + */ + virtual void process_unlock() = 0; + + /** Parallelism: Wait for signal that input is ready. + * Only a thread which successfully called process_lock may call this. + */ + virtual void wait_for_input(size_t num_providers) = 0; + + /** Parallelism: Signal that input is ready. Realtime safe. + * Calling this will wake up the thread which blocked on wait_for_input + * if there is one, and otherwise cause it to return true the next call. + * \return true if lock was aquired and input is ready, false otherwise + */ + virtual void signal_input_ready() = 0; + + /** Parallelism: Return the number of providers that have signalled. + */ + virtual unsigned n_inputs_ready() const = 0; + /** Run the node for @a nframes input/output. * * @a start and @a end are transport times: end is not redundant in the case diff --git a/src/libs/engine/NodeBase.cpp b/src/libs/engine/NodeBase.cpp index b7066a0c..b61783b3 100644 --- a/src/libs/engine/NodeBase.cpp +++ b/src/libs/engine/NodeBase.cpp @@ -41,6 +41,9 @@ NodeBase::NodeBase(const Plugin* plugin, const string& name, uint32_t poly, Patc _buffer_size(buffer_size), _activated(false), _traversed(false), + _input_ready(1), + _process_lock(0), + _n_inputs_ready(0), _ports(NULL), _providers(new Raul::List()), _dependants(new Raul::List()) @@ -88,6 +91,55 @@ NodeBase::set_buffer_size(size_t size) for (size_t i=0; i < _ports->size(); ++i) _ports->at(i)->set_buffer_size(size); } + + +void +NodeBase::reset_input_ready() +{ + //cout << path() << " RESET" << endl; + _n_inputs_ready = 0; + _process_lock = 0; + _input_ready.reset(0); +} + + +bool +NodeBase::process_lock() +{ + return _process_lock.compare_and_exchange(0, 1); +} + + +void +NodeBase::process_unlock() +{ + _process_lock = 0; +} + + +void +NodeBase::wait_for_input(size_t num_providers) +{ + assert(_process_lock.get() == 1); + + while ((unsigned)_n_inputs_ready.get() < num_providers) { + //cout << path() << " WAITING " << _n_inputs_ready.get() << endl; + _input_ready.wait(); + //cout << path() << " CAUGHT SIGNAL" << endl; + //++_n_inputs_ready; + } + + //cout << path() << " READY" << endl; +} + + +void +NodeBase::signal_input_ready() +{ + //cout << path() << " SIGNAL" << endl; + ++_n_inputs_ready; + _input_ready.post(); +} /** Prepare to run a cycle (in the audio thread) @@ -108,7 +160,8 @@ void NodeBase::post_process(SampleCount nframes, FrameTime start, FrameTime end) { assert(_activated); - // Prepare any output ports for reading (MIDI) + + /* Write output ports */ for (size_t i=0; i < _ports->size(); ++i) _ports->at(i)->post_process(nframes, start, end); } diff --git a/src/libs/engine/NodeBase.hpp b/src/libs/engine/NodeBase.hpp index 68a60068..668f615c 100644 --- a/src/libs/engine/NodeBase.hpp +++ b/src/libs/engine/NodeBase.hpp @@ -21,6 +21,8 @@ #include "types.hpp" #include #include +#include +#include #include "Node.hpp" using std::string; @@ -52,10 +54,16 @@ public: virtual void activate(); virtual void deactivate(); bool activated() { return _activated; } + + virtual void reset_input_ready(); + virtual bool process_lock(); + virtual void process_unlock(); + virtual void wait_for_input(size_t num_providers); + virtual unsigned n_inputs_ready() const { return _n_inputs_ready.get(); } - virtual void post_process(SampleCount nframes, FrameTime start, FrameTime end); - virtual void process(SampleCount nframes, FrameTime start, FrameTime end) = 0; virtual void pre_process(SampleCount nframes, FrameTime start, FrameTime end); + virtual void process(SampleCount nframes, FrameTime start, FrameTime end) = 0; + virtual void post_process(SampleCount nframes, FrameTime start, FrameTime end); virtual void set_port_buffer(uint32_t voice, uint32_t port_num, Buffer* buf) {} @@ -70,18 +78,24 @@ public: const Raul::Array& ports() const { return *_ports; } - virtual Raul::List* providers() { return _providers; } - virtual void providers(Raul::List* l) { _providers = l; } + /* These are NOT to be used in the audio thread! + * The providers and dependants in CompiledNode are for that + */ + + virtual Raul::List* providers() { return _providers; } + virtual void providers(Raul::List* l) { _providers = l; } - virtual Raul::List* dependants() { return _dependants; } - virtual void dependants(Raul::List* l) { _dependants = l; } + virtual Raul::List* dependants() { return _dependants; } + virtual void dependants(Raul::List* l) { _dependants = l; } virtual const Plugin* plugin() const { return _plugin; } /** A node's parent is always a patch, so static cast should be safe */ Patch* parent_patch() const { return (Patch*)_parent; } -protected: +protected: + virtual void signal_input_ready(); + const Plugin* _plugin; uint32_t _poly; @@ -89,10 +103,13 @@ protected: size_t _buffer_size; bool _activated; - bool _traversed; ///< Flag for process order algorithm - Raul::Array* _ports; ///< Access in audio thread only - Raul::List* _providers; ///< Nodes connected to this one's input ports - Raul::List* _dependants; ///< Nodes this one's output ports are connected to + bool _traversed; ///< Flag for process order algorithm + Raul::Semaphore _input_ready; ///< Parallelism: input ready signal + Raul::AtomicInt _process_lock; ///< Parallelism: Waiting on inputs 'lock' + Raul::AtomicInt _n_inputs_ready; ///< Parallelism: # input ready signals this cycle + Raul::Array* _ports; ///< Access in audio thread only + Raul::List* _providers; ///< Nodes connected to this one's input ports + Raul::List* _dependants; ///< Nodes this one's output ports are connected to }; diff --git a/src/libs/engine/NodeFactory.cpp b/src/libs/engine/NodeFactory.cpp index dfc8c194..c2432813 100644 --- a/src/libs/engine/NodeFactory.cpp +++ b/src/libs/engine/NodeFactory.cpp @@ -58,7 +58,7 @@ NodeFactory::NodeFactory(Ingen::Shared::World* world) // Add builtin plugin types to _internal_plugins list // FIXME: ewwww, definitely a better way to do this! - Patch* parent = new Patch("dummy", 1, NULL, 1, 1, 1); + Patch* parent = new Patch(*world->local_engine, "dummy", 1, NULL, 1, 1, 1); Node* n = NULL; n = new MidiNoteNode("foo", 1, parent, 1, 1); diff --git a/src/libs/engine/Patch.cpp b/src/libs/engine/Patch.cpp index c9c0bee6..ad1459e8 100644 --- a/src/libs/engine/Patch.cpp +++ b/src/libs/engine/Patch.cpp @@ -25,16 +25,19 @@ #include "Port.hpp" #include "Connection.hpp" #include "DuplexPort.hpp" +#include "Engine.hpp" +#include "ProcessSlave.hpp" -using std::cerr; using std::cout; using std::endl; +using namespace std; namespace Ingen { -Patch::Patch(const string& path, uint32_t poly, Patch* parent, SampleRate srate, size_t buffer_size, uint32_t internal_poly) +Patch::Patch(Engine& engine, const string& path, uint32_t poly, Patch* parent, SampleRate srate, size_t buffer_size, uint32_t internal_poly) : NodeBase(new Plugin(Plugin::Patch, "ingen:patch"), path, poly, parent, srate, buffer_size), + _engine(engine), _internal_poly(internal_poly), - _process_order(NULL), + _compiled_patch(NULL), _process(false) { assert(internal_poly >= 1); @@ -62,7 +65,7 @@ Patch::~Patch() delete _nodes.erase(i); } - delete _process_order; + delete _compiled_patch; } @@ -107,30 +110,94 @@ Patch::disable() /** Run the patch for the specified number of frames. * - * Calls all Nodes in the order _process_order specifies. + * Calls all Nodes in (roughly, if parallel) the order _compiled_patch specifies. */ void Patch::process(SampleCount nframes, FrameTime start, FrameTime end) { - if (_process_order == NULL || !_process) + if (_compiled_patch == NULL || _compiled_patch->size() == 0 || !_process) return; - - // FIXME: This is far too slow, too much iteration/conditionals every cycle + CompiledPatch* const cp = _compiled_patch; + + /* Prepare input ports */ + // This breaks MIDI input, somehow (?) //for (Raul::List::iterator i = _input_ports.begin(); i != _input_ports.end(); ++i) // (*i)->pre_process(nframes, start, end); for (Raul::List::iterator i = _output_ports.begin(); i != _output_ports.end(); ++i) (*i)->pre_process(nframes, start, end); - // Run all nodes (consume input ports) - for (size_t i=0; i < _process_order->size(); ++i) { - // Could be a gap due to a node removal event (see RemoveNodeEvent.cpp) - // Yes, this is ugly - if (_process_order->at(i)) - _process_order->at(i)->process(nframes, start, end); + + /* Start p-1 slaves */ + + size_t n_slaves = _engine.process_slaves().size(); + + if (n_slaves >= cp->size()) + n_slaves = cp->size()-1; + + if (n_slaves > 0) { + for (size_t i=0; i < cp->size(); ++i) + (*cp)[i].node()->reset_input_ready(); + + for (size_t i=0; i < n_slaves; ++i) + _engine.process_slaves()[i]->whip(cp, i+1, nframes, start, end); } + + /* Process ourself until everything is done + * This is analogous to ProcessSlave::_whipped(), but this is the master + * (i.e. what the main Jack process thread calls). Where ProcessSlave + * waits on input, this just skips the node and tries the next, to avoid + * waiting in the Jack thread which pisses Jack off. + */ + + size_t index = 0; + //size_t run_count = 0; + size_t num_finished = 0; // Number of consecutive finished nodes hit + + while (num_finished < cp->size()) { + + CompiledNode& n = (*cp)[index]; + + if (n.node()->process_lock()) { + if (n.node()->n_inputs_ready() == n.n_providers()) { + //cout << "************ Main running " << n.node()->path() << " at index " << index << endl; + n.node()->process(nframes, start, end); + + //cerr << n.node()->path() << " @ " << &n << " dependants: " << n.dependants().size() << endl; + + /* Signal dependants their input is ready */ + for (size_t i=0; i < n.dependants().size(); ++i) + n.dependants()[i]->signal_input_ready(); + + //++run_count; + ++num_finished; + } else { + n.node()->process_unlock(); + num_finished = 0; + } + } else { + if (n.node()->n_inputs_ready() == n.n_providers()) + ++num_finished; + else + num_finished = 0; + } + + index = (index + 1) % cp->size(); + } + + /* Tell slaves we're done in case we beat them, and pray they're + * really done by the start of next cycle. + * FIXME: This probably breaks (race) at extremely small nframes. + */ + for (size_t i=0; i < n_slaves; ++i) + _engine.process_slaves()[i]->finish(); + + //cout << "Main Thread ran \t" << run_count << " nodes this cycle." << endl; + + /* Write output ports */ + for (Raul::List::iterator i = _input_ports.begin(); i != _input_ports.end(); ++i) (*i)->post_process(nframes, start, end); for (Raul::List::iterator i = _output_ports.begin(); i != _output_ports.end(); ++i) @@ -292,63 +359,41 @@ Patch::build_ports_array() const * * Not realtime safe. */ -Raul::Array* -Patch::build_process_order() const +CompiledPatch* +Patch::compile() const { assert(ThreadManager::current_thread_id() == THREAD_PRE_PROCESS); //cerr << "*********** Building process order for " << path() << endl; - Raul::Array* const process_order = new Raul::Array(_nodes.size(), NULL); + CompiledPatch* const compiled_patch = new CompiledPatch();//_nodes.size()); // FIXME: tweak algorithm so it just ends up like this and save the cost of iteration? for (Raul::List::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i) (*i)->traversed(false); - // Traverse backwards starting at outputs - //for (Raul::List::const_iterator p = _output_ports.begin(); p != _output_ports.end(); ++p) { - - /*const Port* const port = (*p); - for (Raul::List::const_iterator c = port->connections().begin(); - c != port->connections().end(); ++c) { - const Connection* const connection = (*c); - assert(connection->dst_port() == port); - assert(connection->src_port()); - assert(connection->src_port()->parent_node()); - build_process_order_recursive(connection->src_port()->parent_node(), process_order); - }*/ - //} - for (Raul::List::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i) { Node* const node = (*i); // Either a sink or connected to our output ports: if ( ( ! node->traversed()) && node->dependants()->size() == 0) - build_process_order_recursive(node, process_order); + compile_recursive(node, compiled_patch); } - - // Add any (disjoint) nodes that weren't hit by the traversal - // FIXME: this shouldn't be necessary - /*for (Raul::List::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i) { - Node* const node = (*i); - if ( ! node->traversed()) { - process_order->push_back(*i); - node->traversed(true); - cerr << "********** APPENDED DISJOINT NODE " << node->path() << endl; - } - }*/ - - /* - cerr << "----------------------------------------\n"; + + /*cerr << "----------------------------------------\n"; for (size_t i=0; i < process_order->size(); ++i) { assert(process_order->at(i)); cerr << process_order->at(i)->path() << endl; } - cerr << "----------------------------------------\n"; - */ + cerr << "----------------------------------------\n";*/ + + assert(compiled_patch->size() == _nodes.size()); - assert(process_order->size() == _nodes.size()); +#ifndef NDEBUG + for (size_t i=0; i < compiled_patch->size(); ++i) + assert(compiled_patch->at(i).node()); +#endif - return process_order; + return compiled_patch; } diff --git a/src/libs/engine/Patch.hpp b/src/libs/engine/Patch.hpp index 0b911541..7780c60b 100644 --- a/src/libs/engine/Patch.hpp +++ b/src/libs/engine/Patch.hpp @@ -24,6 +24,7 @@ #include "NodeBase.hpp" #include "Plugin.hpp" #include "DataType.hpp" +#include "CompiledPatch.hpp" using std::string; @@ -32,6 +33,8 @@ template class Array; namespace Ingen { class Connection; +class Engine; +class CompiledPatch; /** A group of nodes in a graph, possibly polyphonic. @@ -45,7 +48,7 @@ class Connection; class Patch : public NodeBase { public: - Patch(const string& name, uint32_t poly, Patch* parent, SampleRate srate, size_t buffer_size, uint32_t local_poly); + Patch(Engine& engine, const string& name, uint32_t poly, Patch* parent, SampleRate srate, size_t buffer_size, uint32_t local_poly); virtual ~Patch(); void activate(); @@ -76,13 +79,13 @@ public: void add_connection(Raul::ListNode* c) { _connections.push_back(c); } Raul::ListNode* remove_connection(const Port* src_port, const Port* dst_port); - Raul::Array* process_order() { return _process_order; } - void process_order(Raul::Array* po) { _process_order = po; } + CompiledPatch* compiled_patch() { return _compiled_patch; } + void compiled_patch(CompiledPatch* cp) { _compiled_patch = cp; } Raul::Array* external_ports() { return _ports; } void external_ports(Raul::Array* pa) { _ports = pa; } - Raul::Array* build_process_order() const; + CompiledPatch* compile() const; Raul::Array* build_ports_array() const; /** Whether to run this patch's DSP bits in the audio thread */ @@ -93,32 +96,35 @@ public: uint32_t internal_poly() const { return _internal_poly; } private: - inline void build_process_order_recursive(Node* n, Raul::Array* order) const; + inline void compile_recursive(Node* n, CompiledPatch* output) const; + Engine& _engine; uint32_t _internal_poly; - Raul::Array* _process_order; ///< Accessed in audio thread only - Raul::List _connections; ///< Accessed in audio thread only - Raul::List _input_ports; ///< Accessed in preprocessing thread only - Raul::List _output_ports; ///< Accessed in preprocessing thread only - Raul::List _nodes; ///< Accessed in preprocessing thread only + CompiledPatch* _compiled_patch; ///< Accessed in audio thread only + Raul::List _connections; ///< Accessed in audio thread only + Raul::List _input_ports; ///< Accessed in preprocessing thread only + Raul::List _output_ports; ///< Accessed in preprocessing thread only + Raul::List _nodes; ///< Accessed in preprocessing thread only bool _process; }; -/** Private helper for build_process_order */ +/** Private helper for compile */ inline void -Patch::build_process_order_recursive(Node* n, Raul::Array* order) const +Patch::compile_recursive(Node* n, CompiledPatch* output) const { - if (n == NULL || n->traversed()) return; + if (n == NULL || n->traversed()) + return; + n->traversed(true); - assert(order != NULL); + assert(output != NULL); for (Raul::List::iterator i = n->providers()->begin(); i != n->providers()->end(); ++i) if ( ! (*i)->traversed() ) - build_process_order_recursive((*i), order); + compile_recursive((*i), output); - order->push_back(n); + output->push_back(CompiledNode(n, n->providers()->size(), n->dependants())); } diff --git a/src/libs/engine/ProcessSlave.cpp b/src/libs/engine/ProcessSlave.cpp new file mode 100644 index 00000000..fba09da4 --- /dev/null +++ b/src/libs/engine/ProcessSlave.cpp @@ -0,0 +1,83 @@ +/* This file is part of Ingen. + * Copyright (C) 2007 Dave Robillard + * + * Ingen is free software; you can redistribute it and/or modify it under the + * terms of the GNU General Public License as published by the Free Software + * Foundation; either version 2 of the License, or (at your option) any later + * version. + * + * Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include "ProcessSlave.hpp" +#include "Node.hpp" +#include "CompiledPatch.hpp" + +using namespace std; + +namespace Ingen { + + +size_t ProcessSlave::_next_id = 0; + + +void +ProcessSlave::_whipped() +{ + assert(_compiled_patch); + CompiledPatch* const cp = _compiled_patch; + + /* Iterate over all nodes attempting to run immediately or block then run, + * until we've been through the entire array without getting a lock, + * and thus are finished this cycle. + */ + + //size_t run_count = 0; + size_t num_finished = 0; // Number of consecutive finished nodes hit + + while (_state == STATE_RUNNING) { + + CompiledNode& n = (*cp)[_index]; + + if (n.node()->process_lock()) { + + n.node()->wait_for_input(n.n_providers()); + + //cout << "************ Thread " << _id << " running " + // << n.node()->path() << " at index " << _index << endl; + n.node()->process(_nframes, _start, _end); + + //cerr << n.node()->path() << " @ " << &n << " dependants: " << n.dependants().size() << endl; + + /* Signal dependants their input is ready */ + for (size_t i=0; i < n.dependants().size(); ++i) + n.dependants()[i]->signal_input_ready(); + + //++run_count; + num_finished = 1; + } else { + ++num_finished; + } + + _index = (_index + 1) % cp->size(); + + if (num_finished >= cp->size()) + break; + } + + _index = 0; + _compiled_patch = NULL; + _state = STATE_FINISHED; + + //cout << "Thread " << _id << " ran \t" << run_count << " nodes this cycle." << endl; +} + + +} // namespace Ingen diff --git a/src/libs/engine/ProcessSlave.hpp b/src/libs/engine/ProcessSlave.hpp new file mode 100644 index 00000000..a5d2d9cc --- /dev/null +++ b/src/libs/engine/ProcessSlave.hpp @@ -0,0 +1,97 @@ +/* This file is part of Ingen. + * Copyright (C) 2007 Dave Robillard + * + * Ingen is free software; you can redistribute it and/or modify it under the + * terms of the GNU General Public License as published by the Free Software + * Foundation; either version 2 of the License, or (at your option) any later + * version. + * + * Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef PROCESS_SLAVE_HPP +#define PROCESS_SLAVE_HPP + +#include CONFIG_H_PATH + +#include +#include +#include +#include +#include "types.hpp" + +namespace Ingen { + +class Node; +class CompiledPatch; + + +class ProcessSlave : protected Raul::Slave { +public: + ProcessSlave(bool realtime) + : _id(_next_id++), _state(STATE_FINISHED), _index(0), _compiled_patch(NULL) + { + std::stringstream ss; + ss << "Process Slave "; + ss << _id; + set_name(ss.str()); + + start(); + + if (realtime) + set_scheduling(SCHED_FIFO, 40); + } + + ~ProcessSlave() { + stop(); + } + + inline void whip(CompiledPatch* compiled_patch, size_t start_index, + SampleCount nframes, FrameTime start, FrameTime end) { + assert(_state == STATE_FINISHED); + _nframes = nframes; + _start = start; + _end = end; + _index = start_index; + _compiled_patch = compiled_patch; + _state = STATE_RUNNING; + Raul::Slave::whip(); + } + + inline void finish() { + while (_state.get() != STATE_FINISHED) + _state.compare_and_exchange(STATE_RUNNING, STATE_FINISH_SIGNALLED); + } + + size_t id() const { return _id; } + +private: + + void _whipped(); + + static size_t _next_id; + + static const int STATE_RUNNING = 0; + static const int STATE_FINISH_SIGNALLED = 1; + static const int STATE_FINISHED = 2; + + size_t _id; + Raul::AtomicInt _state; + SampleCount _nframes; + FrameTime _start; + FrameTime _end; + size_t _index; + CompiledPatch* _compiled_patch; +}; + + +} // namespace Ingen + +#endif // PROCESS_SLAVE_HPP + diff --git a/src/libs/engine/events/AddNodeEvent.cpp b/src/libs/engine/events/AddNodeEvent.cpp index e692a8ee..3b2b01cf 100644 --- a/src/libs/engine/events/AddNodeEvent.cpp +++ b/src/libs/engine/events/AddNodeEvent.cpp @@ -42,7 +42,7 @@ AddNodeEvent::AddNodeEvent(Engine& engine, SharedPtr responder, Sampl _poly(poly), _patch(NULL), _node(NULL), - _process_order(NULL), + _compiled_patch(NULL), _node_already_exists(false) { } @@ -62,7 +62,7 @@ AddNodeEvent::AddNodeEvent(Engine& engine, SharedPtr responder, Sampl _poly(poly), _patch(NULL), _node(NULL), - _process_order(NULL), + _compiled_patch(NULL), _node_already_exists(false) { } @@ -101,7 +101,7 @@ AddNodeEvent::pre_process() // FIXME: not really necessary to build process order since it's not connected, // just append to the list if (_patch->enabled()) - _process_order = _patch->build_process_order(); + _compiled_patch = _patch->compile(); } } QueuedEvent::pre_process(); @@ -114,9 +114,9 @@ AddNodeEvent::execute(SampleCount nframes, FrameTime start, FrameTime end) QueuedEvent::execute(nframes, start, end); if (_node != NULL) { - if (_patch->process_order() != NULL) - _engine.maid()->push(_patch->process_order()); - _patch->process_order(_process_order); + if (_patch->compiled_patch() != NULL) + _engine.maid()->push(_patch->compiled_patch()); + _patch->compiled_patch(_compiled_patch); } } diff --git a/src/libs/engine/events/AddNodeEvent.hpp b/src/libs/engine/events/AddNodeEvent.hpp index 28df137c..eaf48807 100644 --- a/src/libs/engine/events/AddNodeEvent.hpp +++ b/src/libs/engine/events/AddNodeEvent.hpp @@ -31,6 +31,7 @@ namespace Ingen { class Patch; class Node; class Plugin; +class CompiledPatch; /** An event to load a Node and insert it into a Patch. @@ -62,17 +63,17 @@ public: void post_process(); private: - string _patch_name; - Raul::Path _path; - string _plugin_uri; ///< If nonempty then type, library, label, are ignored - string _plugin_type; - string _plugin_lib; - string _plugin_label; - bool _poly; - Patch* _patch; - Node* _node; - Raul::Array* _process_order; ///< Patch's new process order - bool _node_already_exists; + string _patch_name; + Raul::Path _path; + string _plugin_uri; ///< If nonempty then type, library, label, are ignored + string _plugin_type; + string _plugin_lib; + string _plugin_label; + bool _poly; + Patch* _patch; + Node* _node; + CompiledPatch* _compiled_patch; ///< Patch's new process order + bool _node_already_exists; }; diff --git a/src/libs/engine/events/ClearPatchEvent.cpp b/src/libs/engine/events/ClearPatchEvent.cpp index 2c991cf7..5a914901 100644 --- a/src/libs/engine/events/ClearPatchEvent.cpp +++ b/src/libs/engine/events/ClearPatchEvent.cpp @@ -72,9 +72,9 @@ ClearPatchEvent::execute(SampleCount nframes, FrameTime start, FrameTime end) //for (Raul::List::const_iterator i = _patch->nodes().begin(); i != _patch->nodes().end(); ++i) // (*i)->remove_from_patch(); - if (_patch->process_order() != NULL) { - _engine.maid()->push(_patch->process_order()); - _patch->process_order(NULL); + if (_patch->compiled_patch() != NULL) { + _engine.maid()->push(_patch->compiled_patch()); + _patch->compiled_patch(NULL); } } } diff --git a/src/libs/engine/events/ConnectionEvent.cpp b/src/libs/engine/events/ConnectionEvent.cpp index a8c5dc13..69bb2645 100644 --- a/src/libs/engine/events/ConnectionEvent.cpp +++ b/src/libs/engine/events/ConnectionEvent.cpp @@ -41,7 +41,7 @@ ConnectionEvent::ConnectionEvent(Engine& engine, SharedPtr responder, _patch(NULL), _src_port(NULL), _dst_port(NULL), - _process_order(NULL), + _compiled_patch(NULL), _connection(NULL), _patch_listnode(NULL), _port_listnode(NULL), @@ -138,7 +138,7 @@ ConnectionEvent::pre_process() } if (_patch->enabled()) - _process_order = _patch->build_process_order(); + _compiled_patch = _patch->compile(); QueuedEvent::pre_process(); } @@ -153,9 +153,9 @@ ConnectionEvent::execute(SampleCount nframes, FrameTime start, FrameTime end) // These must be inserted here, since they're actually used by the audio thread _dst_input_port->add_connection(_port_listnode); _patch->add_connection(_patch_listnode); - if (_patch->process_order() != NULL) - _engine.maid()->push(_patch->process_order()); - _patch->process_order(_process_order); + if (_patch->compiled_patch() != NULL) + _engine.maid()->push(_patch->compiled_patch()); + _patch->compiled_patch(_compiled_patch); } } @@ -170,7 +170,6 @@ ConnectionEvent::post_process() // FIXME: better error messages string msg = "Unable to make connection "; msg.append(_src_port_path + " -> " + _dst_port_path); - cerr << "CONNECTION ERROR " << (unsigned)_error << endl; _responder->respond_error(msg); } } diff --git a/src/libs/engine/events/ConnectionEvent.hpp b/src/libs/engine/events/ConnectionEvent.hpp index 1efcee1b..202fef01 100644 --- a/src/libs/engine/events/ConnectionEvent.hpp +++ b/src/libs/engine/events/ConnectionEvent.hpp @@ -39,6 +39,7 @@ class Port; class Connection; class InputPort; class OutputPort; +class CompiledPatch; /** Make a Connection between two Ports. @@ -75,7 +76,7 @@ private: OutputPort* _src_output_port; InputPort* _dst_input_port; - Raul::Array* _process_order; ///< New process order for Patch + CompiledPatch* _compiled_patch; ///< New process order for Patch Connection* _connection; Raul::ListNode* _patch_listnode; diff --git a/src/libs/engine/events/CreatePatchEvent.cpp b/src/libs/engine/events/CreatePatchEvent.cpp index 6c0cd0e8..365b1a02 100644 --- a/src/libs/engine/events/CreatePatchEvent.cpp +++ b/src/libs/engine/events/CreatePatchEvent.cpp @@ -36,7 +36,7 @@ CreatePatchEvent::CreatePatchEvent(Engine& engine, SharedPtr responde _path(path), _patch(NULL), _parent(NULL), - _process_order(NULL), + _compiled_patch(NULL), _poly(poly), _error(NO_ERROR) { @@ -69,13 +69,13 @@ CreatePatchEvent::pre_process() if (_parent != NULL && _poly > 1 && _poly == static_cast(_parent->internal_poly())) poly = _poly; - _patch = new Patch(_path.name(), poly, _parent, _engine.audio_driver()->sample_rate(), _engine.audio_driver()->buffer_size(), _poly); + _patch = new Patch(_engine, _path.name(), poly, _parent, _engine.audio_driver()->sample_rate(), _engine.audio_driver()->buffer_size(), _poly); if (_parent != NULL) { _parent->add_node(new Raul::ListNode(_patch)); if (_parent->enabled()) - _process_order = _parent->build_process_order(); + _compiled_patch = _parent->compile(); } _patch->activate(); @@ -102,9 +102,9 @@ CreatePatchEvent::execute(SampleCount nframes, FrameTime start, FrameTime end) assert(_parent != NULL); assert(_path != "/"); - if (_parent->process_order() != NULL) - _engine.maid()->push(_parent->process_order()); - _parent->process_order(_process_order); + if (_parent->compiled_patch() != NULL) + _engine.maid()->push(_parent->compiled_patch()); + _parent->compiled_patch(_compiled_patch); } } } diff --git a/src/libs/engine/events/CreatePatchEvent.hpp b/src/libs/engine/events/CreatePatchEvent.hpp index 1e45b353..1eb33ed4 100644 --- a/src/libs/engine/events/CreatePatchEvent.hpp +++ b/src/libs/engine/events/CreatePatchEvent.hpp @@ -32,6 +32,7 @@ namespace Ingen { class Patch; class Node; class Plugin; +class CompiledPatch; /** Creates a new Patch. @@ -53,8 +54,7 @@ private: Raul::Path _path; Patch* _patch; Patch* _parent; - Raul::Array* _process_order; - TreeNode* _patch_treenode; + CompiledPatch* _compiled_patch; int _poly; ErrorType _error; }; diff --git a/src/libs/engine/events/DestroyEvent.cpp b/src/libs/engine/events/DestroyEvent.cpp index a30bb3ae..91823a79 100644 --- a/src/libs/engine/events/DestroyEvent.cpp +++ b/src/libs/engine/events/DestroyEvent.cpp @@ -47,7 +47,7 @@ DestroyEvent::DestroyEvent(Engine& engine, SharedPtr responder, Frame _patch_node_listnode(NULL), _patch_port_listnode(NULL), _ports_array(NULL), - _process_order(NULL), + _compiled_patch(NULL), _disconnect_node_event(NULL), _disconnect_port_event(NULL) { @@ -65,9 +65,8 @@ DestroyEvent::DestroyEvent(Engine& engine, SharedPtr responder, Frame _driver_port(NULL), _patch_node_listnode(NULL), _patch_port_listnode(NULL), - _store_treenode(NULL), _ports_array(NULL), - _process_order(NULL), + _compiled_patch(NULL), _disconnect_node_event(NULL), _disconnect_port_event(NULL) { @@ -114,17 +113,19 @@ DestroyEvent::pre_process() if (_node->parent_patch()->enabled()) { // FIXME: is this called multiple times? - _process_order = _node->parent_patch()->build_process_order(); + _compiled_patch = _node->parent_patch()->compile(); // Remove node to be removed from the process order so it isn't executed by // Patch::run and can safely be destroyed - //for (size_t i=0; i < _process_order->size(); ++i) - // if (_process_order->at(i) == _node) - // _process_order->at(i) = NULL; // ew, gap + //for (size_t i=0; i < _compiled_patch->size(); ++i) + // if (_compiled_patch->at(i) == _node) + // _compiled_patch->at(i) = NULL; // ew, gap #ifdef DEBUG // Be sure node is removed from process order, so it can be destroyed - for (size_t i=0; i < _process_order->size(); ++i) - assert(_process_order->at(i) != _node); + for (size_t i=0; i < _compiled_patch->size(); ++i) { + assert(_compiled_patch->at(i).node() != _node); + // FIXME: check providers/dependants too + } #endif } } @@ -141,7 +142,7 @@ DestroyEvent::pre_process() if (_port->parent_patch()->enabled()) { // FIXME: is this called multiple times? - _process_order = _port->parent_patch()->build_process_order(); + _compiled_patch = _port->parent_patch()->compile(); _ports_array = _port->parent_patch()->build_ports_array(); assert(_ports_array->size() == _port->parent_patch()->num_ports()); } @@ -164,9 +165,9 @@ DestroyEvent::execute(SampleCount nframes, FrameTime start, FrameTime end) if (_disconnect_node_event) _disconnect_node_event->execute(nframes, start, end); - if (_node->parent_patch()->process_order()) - _engine.maid()->push(_node->parent_patch()->process_order()); - _node->parent_patch()->process_order(_process_order); + if (_node->parent_patch()->compiled_patch()) + _engine.maid()->push(_node->parent_patch()->compiled_patch()); + _node->parent_patch()->compiled_patch(_compiled_patch); } else if (_patch_port_listnode) { assert(_port); @@ -174,10 +175,10 @@ DestroyEvent::execute(SampleCount nframes, FrameTime start, FrameTime end) if (_disconnect_port_event) _disconnect_port_event->execute(nframes, start, end); - if (_port->parent_patch()->process_order()) - _engine.maid()->push(_port->parent_patch()->process_order()); + if (_port->parent_patch()->compiled_patch()) + _engine.maid()->push(_port->parent_patch()->compiled_patch()); - _port->parent_patch()->process_order(_process_order); + _port->parent_patch()->compiled_patch(_compiled_patch); if (_port->parent_patch()->external_ports()) _engine.maid()->push(_port->parent_patch()->external_ports()); diff --git a/src/libs/engine/events/DestroyEvent.hpp b/src/libs/engine/events/DestroyEvent.hpp index a858ee0b..c401ff31 100644 --- a/src/libs/engine/events/DestroyEvent.hpp +++ b/src/libs/engine/events/DestroyEvent.hpp @@ -41,6 +41,7 @@ class DriverPort; class Plugin; class DisconnectNodeEvent; class DisconnectPortEvent; +class CompiledPatch; /** An event to remove and delete a Node. @@ -68,7 +69,7 @@ private: Raul::ListNode* _patch_node_listnode; Raul::ListNode* _patch_port_listnode; Raul::Array* _ports_array; ///< New (external) ports array for Patch - Raul::Array* _process_order; ///< Patch's new process order + CompiledPatch* _compiled_patch; ///< Patch's new process order DisconnectNodeEvent* _disconnect_node_event; DisconnectPortEvent* _disconnect_port_event; }; diff --git a/src/libs/engine/events/DisconnectionEvent.cpp b/src/libs/engine/events/DisconnectionEvent.cpp index 794399b3..67c3b5fb 100644 --- a/src/libs/engine/events/DisconnectionEvent.cpp +++ b/src/libs/engine/events/DisconnectionEvent.cpp @@ -44,7 +44,7 @@ DisconnectionEvent::DisconnectionEvent(Engine& engine, SharedPtr resp _src_port(NULL), _dst_port(NULL), _lookup(true), - _process_order(NULL), + _compiled_patch(NULL), _error(NO_ERROR) { } @@ -58,7 +58,7 @@ DisconnectionEvent::DisconnectionEvent(Engine& engine, SharedPtr resp _src_port(src_port), _dst_port(dst_port), _lookup(false), - _process_order(NULL), + _compiled_patch(NULL), _error(NO_ERROR) { // FIXME: These break for patch ports.. is that ok? @@ -145,7 +145,7 @@ DisconnectionEvent::pre_process() } if (_patch->enabled()) - _process_order = _patch->build_process_order(); + _compiled_patch = _patch->compile(); QueuedEvent::pre_process(); } @@ -172,9 +172,9 @@ DisconnectionEvent::execute(SampleCount nframes, FrameTime start, FrameTime end) _engine.maid()->push(patch_connection); _engine.maid()->push(port_connection->elem()); - if (_patch->process_order() != NULL) - _engine.maid()->push(_patch->process_order()); - _patch->process_order(_process_order); + if (_patch->compiled_patch() != NULL) + _engine.maid()->push(_patch->compiled_patch()); + _patch->compiled_patch(_compiled_patch); } else { _error = CONNECTION_NOT_FOUND; } diff --git a/src/libs/engine/events/DisconnectionEvent.hpp b/src/libs/engine/events/DisconnectionEvent.hpp index 5a30b7f1..7109c645 100644 --- a/src/libs/engine/events/DisconnectionEvent.hpp +++ b/src/libs/engine/events/DisconnectionEvent.hpp @@ -39,6 +39,7 @@ class Port; class Connection; class InputPort; class OutputPort; +class CompiledPatch; /** Make a Connection between two Ports. @@ -78,7 +79,7 @@ private: bool _lookup; - Raul::Array* _process_order; ///< New process order for Patch + CompiledPatch* _compiled_patch; ///< New process order for Patch ErrorType _error; }; diff --git a/src/libs/engine/events/EnablePatchEvent.cpp b/src/libs/engine/events/EnablePatchEvent.cpp index ed39ae04..46cc5cc6 100644 --- a/src/libs/engine/events/EnablePatchEvent.cpp +++ b/src/libs/engine/events/EnablePatchEvent.cpp @@ -30,7 +30,7 @@ EnablePatchEvent::EnablePatchEvent(Engine& engine, SharedPtr responde : QueuedEvent(engine, responder, timestamp), _patch_path(patch_path), _patch(NULL), - _process_order(NULL) + _compiled_patch(NULL) { } @@ -44,8 +44,8 @@ EnablePatchEvent::pre_process() /* Any event that requires a new process order will set the patch's * process order to NULL if it is executed when the patch is not * active. So, if the PO is NULL, calculate it here */ - if (_patch->process_order() == NULL) - _process_order = _patch->build_process_order(); + if (_patch->compiled_patch() == NULL) + _compiled_patch = _patch->compile(); } QueuedEvent::pre_process(); @@ -60,8 +60,8 @@ EnablePatchEvent::execute(SampleCount nframes, FrameTime start, FrameTime end) if (_patch != NULL) { _patch->enable(); - if (_patch->process_order() == NULL) - _patch->process_order(_process_order); + if (_patch->compiled_patch() == NULL) + _patch->compiled_patch(_compiled_patch); } } diff --git a/src/libs/engine/events/EnablePatchEvent.hpp b/src/libs/engine/events/EnablePatchEvent.hpp index d187594b..2bacd17d 100644 --- a/src/libs/engine/events/EnablePatchEvent.hpp +++ b/src/libs/engine/events/EnablePatchEvent.hpp @@ -29,6 +29,7 @@ namespace Ingen { class Patch; class Node; +class CompiledPatch; /** Enables a patch's DSP processing. @@ -45,9 +46,9 @@ public: void post_process(); private: - string _patch_path; - Patch* _patch; - Raul::Array* _process_order; // Patch's new process order + string _patch_path; + Patch* _patch; + CompiledPatch* _compiled_patch; // Patch's new process order }; diff --git a/src/libs/gui/ConnectWindow.cpp b/src/libs/gui/ConnectWindow.cpp index 1504176b..333d0454 100644 --- a/src/libs/gui/ConnectWindow.cpp +++ b/src/libs/gui/ConnectWindow.cpp @@ -112,7 +112,7 @@ ConnectWindow::start(SharedPtr engine, SharedPtractivate(); + engine->activate(1); // FIXME Glib::signal_timeout().connect( sigc::mem_fun(engine.get(), &Ingen::Engine::main_iteration), 1000); @@ -252,7 +252,7 @@ ConnectWindow::connect() App::instance().attach(engine_interface, client); - _engine->activate(); + _engine->activate(1); // FIXME Glib::signal_timeout().connect( sigc::mem_fun(_engine.get(), &Ingen::Engine::main_iteration), 1000); diff --git a/src/progs/ingen/cmdline.c b/src/progs/ingen/cmdline.c index 462c6a3e..a293d127 100644 --- a/src/progs/ingen/cmdline.c +++ b/src/progs/ingen/cmdline.c @@ -1,7 +1,7 @@ /* - File autogenerated by gengetopt version 2.20 + File autogenerated by gengetopt generated with the following command: - gengetopt + gengetopt -g The developers of gengetopt consider the fixed text that goes in all gengetopt output files to be in the public domain: @@ -36,8 +36,9 @@ const char *gengetopt_args_info_help[] = { " -g, --gui Launch the GTK graphical interface (default=on)", " -C, --client-port=INT Client OSC port", " -l, --load=STRING Load patch", - " -p, --path=STRING Target path for loaded patch", + " -L, --path=STRING Target path for loaded patch", " -r, --run=STRING Run script", + " -p, --parallelism=INT Number of concurrent process threads (default=`1')", 0 }; @@ -66,6 +67,7 @@ void clear_given (struct gengetopt_args_info *args_info) args_info->load_given = 0 ; args_info->path_given = 0 ; args_info->run_given = 0 ; + args_info->parallelism_given = 0 ; } static @@ -84,6 +86,8 @@ void clear_args (struct gengetopt_args_info *args_info) args_info->path_orig = NULL; args_info->run_arg = NULL; args_info->run_orig = NULL; + args_info->parallelism_arg = 1; + args_info->parallelism_orig = NULL; } @@ -100,6 +104,7 @@ void init_args_info(struct gengetopt_args_info *args_info) args_info->load_help = gengetopt_args_info_help[7] ; args_info->path_help = gengetopt_args_info_help[8] ; args_info->run_help = gengetopt_args_info_help[9] ; + args_info->parallelism_help = gengetopt_args_info_help[10] ; } @@ -189,6 +194,11 @@ cmdline_parser_release (struct gengetopt_args_info *args_info) free (args_info->run_orig); /* free previous argument */ args_info->run_orig = 0; } + if (args_info->parallelism_orig) + { + free (args_info->parallelism_orig); /* free previous argument */ + args_info->parallelism_orig = 0; + } clear_given (args_info); } @@ -261,6 +271,13 @@ cmdline_parser_file_save(const char *filename, struct gengetopt_args_info *args_ fprintf(outfile, "%s\n", "run"); } } + if (args_info->parallelism_given) { + if (args_info->parallelism_orig) { + fprintf(outfile, "%s=\"%s\"\n", "parallelism", args_info->parallelism_orig); + } else { + fprintf(outfile, "%s\n", "parallelism"); + } + } fclose (outfile); @@ -351,13 +368,14 @@ cmdline_parser_internal (int argc, char * const *argv, struct gengetopt_args_inf { "gui", 0, NULL, 'g' }, { "client-port", 1, NULL, 'C' }, { "load", 1, NULL, 'l' }, - { "path", 1, NULL, 'p' }, + { "path", 1, NULL, 'L' }, { "run", 1, NULL, 'r' }, + { "parallelism", 1, NULL, 'p' }, { NULL, 0, NULL, 0 } }; stop_char = 0; - c = getopt_long (argc, argv, "hVeE:c:gC:l:p:r:", long_options, &option_index); + c = getopt_long (argc, argv, "hVeE:c:gC:l:L:r:p:", long_options, &option_index); if (c == -1) break; /* Exit from `while (1)' loop. */ @@ -475,10 +493,10 @@ cmdline_parser_internal (int argc, char * const *argv, struct gengetopt_args_inf args_info->load_orig = gengetopt_strdup (optarg); break; - case 'p': /* Target path for loaded patch. */ + case 'L': /* Target path for loaded patch. */ if (local_args_info.path_given) { - fprintf (stderr, "%s: `--path' (`-p') option given more than once%s\n", argv[0], (additional_error ? additional_error : "")); + fprintf (stderr, "%s: `--path' (`-L') option given more than once%s\n", argv[0], (additional_error ? additional_error : "")); goto failure; } if (args_info->path_given && ! override) @@ -511,6 +529,26 @@ cmdline_parser_internal (int argc, char * const *argv, struct gengetopt_args_inf args_info->run_orig = gengetopt_strdup (optarg); break; + case 'p': /* Number of concurrent process threads. */ + if (local_args_info.parallelism_given) + { + fprintf (stderr, "%s: `--parallelism' (`-p') option given more than once%s\n", argv[0], (additional_error ? additional_error : "")); + goto failure; + } + if (args_info->parallelism_given && ! override) + continue; + local_args_info.parallelism_given = 1; + args_info->parallelism_given = 1; + args_info->parallelism_arg = strtol (optarg, &stop_char, 0); + if (!(stop_char && *stop_char == '\0')) { + fprintf(stderr, "%s: invalid numeric value: %s\n", argv[0], optarg); + goto failure; + } + if (args_info->parallelism_orig) + free (args_info->parallelism_orig); /* free previous string */ + args_info->parallelism_orig = gengetopt_strdup (optarg); + break; + case 0: /* Long option with no short option */ case '?': /* Invalid option. */ diff --git a/src/progs/ingen/cmdline.ggo b/src/progs/ingen/cmdline.ggo index f844fa2d..480732e5 100644 --- a/src/progs/ingen/cmdline.ggo +++ b/src/progs/ingen/cmdline.ggo @@ -15,6 +15,7 @@ option "connect" c "Connect to existing engine at OSC URI" string no default="os option "gui" g "Launch the GTK graphical interface" flag on option "client-port" C "Client OSC port" int no option "load" l "Load patch" string no -option "path" p "Target path for loaded patch" string no +option "path" L "Target path for loaded patch" string no option "run" r "Run script" string no +option "parallelism" p "Number of concurrent process threads" int no default="1" diff --git a/src/progs/ingen/cmdline.h b/src/progs/ingen/cmdline.h index d96f5adb..1c0bf681 100644 --- a/src/progs/ingen/cmdline.h +++ b/src/progs/ingen/cmdline.h @@ -1,6 +1,6 @@ /* cmdline.h */ -/* File autogenerated by gengetopt version 2.20 */ +/* File autogenerated by gengetopt */ #ifndef CMDLINE_H #define CMDLINE_H @@ -48,6 +48,9 @@ struct gengetopt_args_info char * run_arg; /* Run script. */ char * run_orig; /* Run script original value given at command line. */ const char *run_help; /* Run script help description. */ + int parallelism_arg; /* Number of concurrent process threads (default='1'). */ + char * parallelism_orig; /* Number of concurrent process threads original value given at command line. */ + const char *parallelism_help; /* Number of concurrent process threads help description. */ int help_given ; /* Whether help was given. */ int version_given ; /* Whether version was given. */ @@ -59,6 +62,7 @@ struct gengetopt_args_info int load_given ; /* Whether load was given. */ int path_given ; /* Whether path was given. */ int run_given ; /* Whether run was given. */ + int parallelism_given ; /* Whether parallelism was given. */ } ; diff --git a/src/progs/ingen/main.cpp b/src/progs/ingen/main.cpp index eb93407b..31453315 100644 --- a/src/progs/ingen/main.cpp +++ b/src/progs/ingen/main.cpp @@ -38,6 +38,7 @@ #include "bindings/ingen_bindings.hpp" #endif + using namespace std; using namespace Ingen; @@ -136,7 +137,7 @@ main(int argc, char** argv) engine->start_jack_driver(); - engine->activate(); + engine->activate(args.parallelism_arg); } world->engine = engine_interface.get(); @@ -229,12 +230,14 @@ main(int argc, char** argv) /* Listen to OSC and do our own main thing. */ } else if (engine && !ran_gui) { + size_t parallelism = args.parallelism_arg; + signal(SIGINT, catch_int); signal(SIGTERM, catch_int); engine->start_osc_driver(args.engine_port_arg); engine->start_jack_driver(); - engine->activate(); + engine->activate(parallelism); engine->main(); } -- cgit v1.2.1