diff options
author | David Robillard <d@drobilla.net> | 2012-07-31 00:47:40 +0000 |
---|---|---|
committer | David Robillard <d@drobilla.net> | 2012-07-31 00:47:40 +0000 |
commit | 66ac730782803a47ae4977d2db2407126005f4bd (patch) | |
tree | db31b21dff549897aa5cc730d028913959aa930d | |
parent | 88f9a3bf23fc629385978633a8764d74788e1fdc (diff) | |
download | ingen-66ac730782803a47ae4977d2db2407126005f4bd.tar.gz ingen-66ac730782803a47ae4977d2db2407126005f4bd.tar.bz2 ingen-66ac730782803a47ae4977d2db2407126005f4bd.zip |
Remove rotten parallelism stuff.
git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@4581 a436a847-0d15-0410-975c-d299462d15a1
-rw-r--r-- | src/Configuration.cpp | 1 | ||||
-rw-r--r-- | src/server/Context.cpp | 4 | ||||
-rw-r--r-- | src/server/Context.hpp | 2 | ||||
-rw-r--r-- | src/server/JackDriver.cpp | 5 | ||||
-rw-r--r-- | src/server/MessageContext.cpp | 2 | ||||
-rw-r--r-- | src/server/NodeImpl.cpp | 39 | ||||
-rw-r--r-- | src/server/NodeImpl.hpp | 33 | ||||
-rw-r--r-- | src/server/PatchImpl.cpp | 79 | ||||
-rw-r--r-- | src/server/PatchImpl.hpp | 3 | ||||
-rw-r--r-- | src/server/ProcessContext.cpp | 42 | ||||
-rw-r--r-- | src/server/ProcessContext.hpp | 23 | ||||
-rw-r--r-- | src/server/ProcessSlave.cpp | 75 | ||||
-rw-r--r-- | src/server/ProcessSlave.hpp | 95 | ||||
-rw-r--r-- | src/server/wscript | 2 |
14 files changed, 13 insertions, 392 deletions
diff --git a/src/Configuration.cpp b/src/Configuration.cpp index f3e57520..4db54b65 100644 --- a/src/Configuration.cpp +++ b/src/Configuration.cpp @@ -45,7 +45,6 @@ Configuration::Configuration() add("uuid", 'u', "JACK session UUID", STRING, Value()); add("load", 'l', "Load patch", STRING, Value()); add("packet-size", 'k', "Maximum UDP packet size", INT, Value(4096)); - add("parallelism", 'p', "Number of concurrent process threads", INT, Value(1)); add("path", 'L', "Target path for loaded patch", STRING, Value()); add("queue-size", 'q', "Event queue size", INT, Value(4096)); add("run", 'r', "Run script", STRING, Value()); diff --git a/src/server/Context.cpp b/src/server/Context.cpp index cd42b8d1..9aeb08db 100644 --- a/src/server/Context.cpp +++ b/src/server/Context.cpp @@ -43,10 +43,10 @@ struct Notification Raul::Atom::TypeID type; }; -Context::Context(Engine& engine, size_t event_sink_size, ID id) +Context::Context(Engine& engine, ID id) : _engine(engine) , _id(id) - , _event_sink(event_sink_size * sizeof(Notification)) + , _event_sink(engine.event_queue_size() * sizeof(Notification)) , _start(0) , _end(0) , _nframes(0) diff --git a/src/server/Context.hpp b/src/server/Context.hpp index 2311f7d2..c5502687 100644 --- a/src/server/Context.hpp +++ b/src/server/Context.hpp @@ -49,7 +49,7 @@ public: MESSAGE }; - Context(Engine& engine, size_t event_sink_size, ID id); + Context(Engine& engine, ID id); virtual ~Context() {} diff --git a/src/server/JackDriver.cpp b/src/server/JackDriver.cpp index db535a0f..78600b77 100644 --- a/src/server/JackDriver.cpp +++ b/src/server/JackDriver.cpp @@ -40,7 +40,6 @@ #include "MessageContext.hpp" #include "PatchImpl.hpp" #include "PortImpl.hpp" -#include "ProcessSlave.hpp" #include "ThreadManager.hpp" #include "util.hpp" @@ -277,10 +276,6 @@ JackDriver::activate() _is_activated = true; - _engine.process_context().activate( - world->conf().option("parallelism").get_int(), - is_realtime()); - if (jack_activate(_client)) { LOG(Raul::error)("Could not activate Jack client, aborting\n"); exit(EXIT_FAILURE); diff --git a/src/server/MessageContext.cpp b/src/server/MessageContext.cpp index e0c6b010..fd43596c 100644 --- a/src/server/MessageContext.cpp +++ b/src/server/MessageContext.cpp @@ -29,7 +29,7 @@ namespace Ingen { namespace Server { MessageContext::MessageContext(Engine& engine) - : Context(engine, engine.event_queue_size(), MESSAGE) + : Context(engine, MESSAGE) , Raul::Thread("MessageContext") , _sem(0) , _requests(engine.event_queue_size()) diff --git a/src/server/NodeImpl.cpp b/src/server/NodeImpl.cpp index 2b465b76..914d9906 100644 --- a/src/server/NodeImpl.cpp +++ b/src/server/NodeImpl.cpp @@ -43,9 +43,6 @@ NodeImpl::NodeImpl(PluginImpl* plugin, , _ports(NULL) , _context(Context::AUDIO) , _polyphony((polyphonic && parent) ? parent->internal_poly() : 1) - , _input_ready(1) - , _process_lock(0) - , _n_inputs_ready(0) , _polyphonic(polyphonic) , _activated(false) , _traversed(false) @@ -159,42 +156,6 @@ NodeImpl::set_buffer_size(Context& context, } } -void -NodeImpl::reset_input_ready() -{ - _n_inputs_ready = 0; - _process_lock = 0; - _input_ready.reset(0); -} - -bool -NodeImpl::process_lock() -{ - return _process_lock.compare_and_exchange(0, 1); -} - -void -NodeImpl::process_unlock() -{ - _process_lock = 0; -} - -void -NodeImpl::wait_for_input(ProcessContext& context, size_t num_providers) -{ - assert(_process_lock.get() == 1); - - while ((unsigned)_n_inputs_ready.get() < num_providers) - _input_ready.wait(); -} - -void -NodeImpl::signal_input_ready(ProcessContext& context) -{ - ++_n_inputs_ready; - _input_ready.post(); -} - /** Prepare to run a cycle (in the audio thread) */ void diff --git a/src/server/NodeImpl.hpp b/src/server/NodeImpl.hpp index f9509e49..2b0faf03 100644 --- a/src/server/NodeImpl.hpp +++ b/src/server/NodeImpl.hpp @@ -22,7 +22,6 @@ #include "raul/Array.hpp" #include "raul/AtomicInt.hpp" -#include "raul/Semaphore.hpp" #include "BufferRef.hpp" #include "Context.hpp" @@ -88,35 +87,6 @@ public: /** Return true iff this node is activated */ bool activated() { return _activated; } - /** Parallelism: Reset flags for start of a new cycle. */ - virtual void reset_input_ready(); - - /** Parallelism: Claim this node (to wait on its input). - * Only one thread will ever take this lock on a particular Node. - * \return true if lock was aquired, false otherwise - */ - virtual bool process_lock(); - - /** Parallelism: Unclaim this node (let someone else wait on its input). - * Only a thread which successfully called process_lock may call this. - */ - virtual void process_unlock(); - - /** Parallelism: Wait for signal that input is ready. - * Only a thread which successfully called process_lock may call this. - */ - virtual void wait_for_input(ProcessContext& context, size_t num_providers); - - /** Parallelism: Signal that input is ready. Realtime safe. - * Calling this will wake up the thread which blocked on wait_for_input - * if there is one, and otherwise cause it to return true the next call. - * \return true if lock was aquired and input is ready, false otherwise - */ - virtual void signal_input_ready(ProcessContext& context); - - /** Parallelism: Return the number of providers that have signalled. */ - virtual unsigned n_inputs_ready() const { return _n_inputs_ready.get(); } - /** Learn the next incoming MIDI event (for internals) */ virtual void learn() {} @@ -196,9 +166,6 @@ protected: Raul::Array<PortImpl*>* _ports; ///< Access in audio thread only Context::ID _context; ///< Context this node runs in uint32_t _polyphony; - Raul::Semaphore _input_ready; ///< Parallelism: input ready signal - Raul::AtomicInt _process_lock; ///< Parallelism: Waiting on inputs 'lock' - Raul::AtomicInt _n_inputs_ready; ///< Parallelism: # input ready signals this cycle std::list<NodeImpl*> _providers; ///< Nodes connected to this one's input ports std::list<NodeImpl*> _dependants; ///< Nodes this one's output ports are connected to bool _polyphonic; diff --git a/src/server/PatchImpl.cpp b/src/server/PatchImpl.cpp index 359122fa..ee5ea230 100644 --- a/src/server/PatchImpl.cpp +++ b/src/server/PatchImpl.cpp @@ -28,7 +28,6 @@ #include "PatchImpl.hpp" #include "PatchPlugin.hpp" #include "PortImpl.hpp" -#include "ProcessSlave.hpp" #include "ThreadManager.hpp" using namespace std; @@ -154,10 +153,8 @@ PatchImpl::process(ProcessContext& context) if (_compiled_patch && _compiled_patch->size() > 0) { // Run all nodes - if (context.slaves().size() > 0) { - process_parallel(context); - } else { - process_single(context); + for (size_t i = 0; i < _compiled_patch->size(); ++i) { + (*_compiled_patch)[i].node()->process(context); } // Queue any cross-context edges @@ -171,78 +168,6 @@ PatchImpl::process(ProcessContext& context) } void -PatchImpl::process_parallel(ProcessContext& context) -{ - size_t n_slaves = context.slaves().size(); - - CompiledPatch* const cp = _compiled_patch; - - /* Start p-1 slaves */ - - if (n_slaves >= cp->size()) - n_slaves = cp->size()-1; - - if (n_slaves > 0) { - for (size_t i = 0; i < cp->size(); ++i) - (*cp)[i].node()->reset_input_ready(); - - for (size_t i = 0; i < n_slaves; ++i) - context.slaves()[i]->whip(cp, i+1, context); - } - - /* Process ourself until everything is done - * This is analogous to ProcessSlave::_whipped(), but this is the master - * (i.e. what the main Jack process thread calls). Where ProcessSlave - * waits on input, this just skips the node and tries the next, to avoid - * waiting in the Jack thread which pisses Jack off. - */ - - size_t index = 0; - size_t num_finished = 0; // Number of consecutive finished nodes hit - - while (num_finished < cp->size()) { - CompiledNode& n = (*cp)[index]; - - if (n.node()->process_lock()) { - if (n.node()->n_inputs_ready() == n.n_providers()) { - n.node()->process(context); - - /* Signal dependants their input is ready */ - for (uint32_t i = 0; i < n.dependants().size(); ++i) - n.dependants()[i]->signal_input_ready(context); - - ++num_finished; - } else { - n.node()->process_unlock(); - num_finished = 0; - } - } else { - if (n.node()->n_inputs_ready() == n.n_providers()) - ++num_finished; - else - num_finished = 0; - } - - index = (index + 1) % cp->size(); - } - - /* Tell slaves we're done in case we beat them, and pray they're - * really done by the start of next cycle. - * FIXME: This probably breaks (race) at extremely small nframes where - * ingen is the majority of the DSP load. - */ - for (uint32_t i = 0; i < n_slaves; ++i) - context.slaves()[i]->finish(); -} - -void -PatchImpl::process_single(ProcessContext& context) -{ - for (size_t i = 0; i < _compiled_patch->size(); ++i) - (*_compiled_patch)[i].node()->process(context); -} - -void PatchImpl::set_buffer_size(Context& context, BufferFactory& bufs, LV2_URID type, diff --git a/src/server/PatchImpl.hpp b/src/server/PatchImpl.hpp index 1d8cef08..1204eeee 100644 --- a/src/server/PatchImpl.hpp +++ b/src/server/PatchImpl.hpp @@ -156,9 +156,6 @@ public: Engine& engine() { return _engine; } private: - void process_parallel(ProcessContext& context); - void process_single(ProcessContext& context); - Engine& _engine; uint32_t _poly_pre; ///< Pre-process thread only uint32_t _poly_process; ///< Process thread only diff --git a/src/server/ProcessContext.cpp b/src/server/ProcessContext.cpp deleted file mode 100644 index 66dbcb19..00000000 --- a/src/server/ProcessContext.cpp +++ /dev/null @@ -1,42 +0,0 @@ -/* - This file is part of Ingen. - Copyright 2007-2012 David Robillard <http://drobilla.net/> - - Ingen is free software: you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free - Software Foundation, either version 3 of the License, or any later version. - - Ingen is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. - - You should have received a copy of the GNU Affero General Public License - along with Ingen. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "Engine.hpp" -#include "ProcessContext.hpp" -#include "ProcessSlave.hpp" - -namespace Ingen { -namespace Server { - -ProcessContext::ProcessContext(Engine& engine) - : Context(engine, engine.event_queue_size(), AUDIO) -{} - -void -ProcessContext::activate(uint32_t parallelism, bool sched_rt) -{ - for (uint32_t i = 0; i < _slaves.size(); ++i) { - delete _slaves[i]; - } - _slaves.clear(); - _slaves.reserve(parallelism); - for (uint32_t i = 0; i < parallelism - 1; ++i) { - _slaves.push_back(new ProcessSlave(_engine, sched_rt)); - } -} - -} // namespace Server -} // namespace Ingen diff --git a/src/server/ProcessContext.hpp b/src/server/ProcessContext.hpp index 9e0115e6..75fdcb83 100644 --- a/src/server/ProcessContext.hpp +++ b/src/server/ProcessContext.hpp @@ -17,33 +17,24 @@ #ifndef INGEN_ENGINE_PROCESSCONTEXT_HPP #define INGEN_ENGINE_PROCESSCONTEXT_HPP -#include <vector> - #include "Context.hpp" namespace Ingen { namespace Server { -class ProcessSlave; - /** Context of a process() call (the audio context). + * + * Currently this class has no implementation to speak of, but the type is used + * as a parameter for audio context methods to make their context clear. + * * \ingroup engine */ class ProcessContext : public Context { public: - typedef std::vector<ProcessSlave*> Slaves; - - const Slaves& slaves() const { return _slaves; } - Slaves& slaves() { return _slaves; } - - void activate(uint32_t parallelism, bool sched_rt); - -private: - friend class Engine; - explicit ProcessContext(Engine& engine); - - Slaves _slaves; + ProcessContext(Engine& engine) + : Context(engine, AUDIO) + {} }; } // namespace Server diff --git a/src/server/ProcessSlave.cpp b/src/server/ProcessSlave.cpp deleted file mode 100644 index 83a8956f..00000000 --- a/src/server/ProcessSlave.cpp +++ /dev/null @@ -1,75 +0,0 @@ -/* - This file is part of Ingen. - Copyright 2007-2012 David Robillard <http://drobilla.net/> - - Ingen is free software: you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free - Software Foundation, either version 3 of the License, or any later version. - - Ingen is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. - - You should have received a copy of the GNU Affero General Public License - along with Ingen. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "CompiledPatch.hpp" -#include "NodeImpl.hpp" -#include "ProcessSlave.hpp" -#include "ThreadManager.hpp" - -using namespace std; - -namespace Ingen { -namespace Server { - -uint32_t ProcessSlave::_next_id = 0; - -void -ProcessSlave::_whipped() -{ - ThreadManager::set_flag(THREAD_PROCESS); - - assert(_compiled_patch); - CompiledPatch* const cp = _compiled_patch; - - /* Iterate over all nodes attempting to run immediately or block then run, - * until we've been through the entire array without getting a lock, - * and thus are finished this cycle. - */ - - size_t num_finished = 0; // Number of consecutive finished nodes hit - - while (_state == STATE_RUNNING) { - - CompiledNode& n = (*cp)[_index]; - - if (n.node()->process_lock()) { - - n.node()->wait_for_input(*_context, n.n_providers()); - - n.node()->process(*_context); - - /* Signal dependants their input is ready */ - for (size_t i=0; i < n.dependants().size(); ++i) - n.dependants()[i]->signal_input_ready(*_context); - - num_finished = 1; - } else { - ++num_finished; - } - - _index = (_index + 1) % cp->size(); - - if (num_finished >= cp->size()) - break; - } - - _index = 0; - _compiled_patch = NULL; - _state = STATE_FINISHED; -} - -} // namespace Server -} // namespace Ingen diff --git a/src/server/ProcessSlave.hpp b/src/server/ProcessSlave.hpp deleted file mode 100644 index 742e9141..00000000 --- a/src/server/ProcessSlave.hpp +++ /dev/null @@ -1,95 +0,0 @@ -/* - This file is part of Ingen. - Copyright 2007-2012 David Robillard <http://drobilla.net/> - - Ingen is free software: you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free - Software Foundation, either version 3 of the License, or any later version. - - Ingen is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. - - You should have received a copy of the GNU Affero General Public License - along with Ingen. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef INGEN_ENGINE_PROCESSSLAVE_HPP -#define INGEN_ENGINE_PROCESSSLAVE_HPP - -#include <stdint.h> - -#include "raul/AtomicInt.hpp" -#include "raul/Slave.hpp" -#include "raul/log.hpp" - -namespace Ingen { -namespace Server { - -class CompiledPatch; -class Engine; -class ProcessContext; - -class ProcessSlave : protected Raul::Slave { -public: - ProcessSlave(Engine& engine, bool realtime) - : Raul::Slave((Raul::fmt("Process Slave %1%") % _next_id).str()) - , _engine(engine) - , _context(NULL) - , _id(_next_id++) - , _index(0) - , _state(STATE_FINISHED) - , _compiled_patch(NULL) - { - start(); - - if (realtime) { - set_scheduling(true, 40); - } - } - - ~ProcessSlave() { - stop(); - } - - inline void whip(CompiledPatch* compiled_patch, - uint32_t start_index, - ProcessContext& context) - { - assert(_state == STATE_FINISHED); - _context = &context; - _index = start_index; - _state = STATE_RUNNING; - _compiled_patch = compiled_patch; - - Raul::Slave::whip(); - } - - inline void finish() { - while (_state.get() != STATE_FINISHED) - _state.compare_and_exchange(STATE_RUNNING, STATE_FINISH_SIGNALLED); - } - - inline uint32_t id() const { return _id; } - -private: - void _whipped(); - - static uint32_t _next_id; - - static const int STATE_RUNNING = 0; - static const int STATE_FINISH_SIGNALLED = 1; - static const int STATE_FINISHED = 2; - - Engine& _engine; - ProcessContext* _context; - uint32_t _id; - uint32_t _index; - Raul::AtomicInt _state; - CompiledPatch* _compiled_patch; -}; - -} // namespace Server -} // namespace Ingen - -#endif // INGEN_ENGINE_PROCESSSLAVE_HPP diff --git a/src/server/wscript b/src/server/wscript index 7f048828..023408de 100644 --- a/src/server/wscript +++ b/src/server/wscript @@ -28,8 +28,6 @@ def build(bld): PortImpl.cpp PostProcessor.cpp PreProcessor.cpp - ProcessContext.cpp - ProcessSlave.cpp Worker.cpp events/Connect.cpp events/CreateNode.cpp |