From 667e633e829760b5a1e9591227ec5437cac1995d Mon Sep 17 00:00:00 2001 From: David Robillard Date: Sat, 18 Feb 2017 19:29:15 +0100 Subject: Improve parallel analysis and execution algorithms --- src/server/Clock.hpp | 60 --------------- src/server/CompiledGraph.cpp | 180 ++++++++++++++++++++++++++----------------- src/server/CompiledGraph.hpp | 27 ++++--- src/server/Engine.hpp | 2 +- src/server/Task.cpp | 77 +++++++++--------- src/server/Task.hpp | 49 ++++++------ src/server/ingen_lv2.cpp | 6 +- 7 files changed, 197 insertions(+), 204 deletions(-) delete mode 100644 src/server/Clock.hpp (limited to 'src/server') diff --git a/src/server/Clock.hpp b/src/server/Clock.hpp deleted file mode 100644 index d9526d02..00000000 --- a/src/server/Clock.hpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - This file is part of Ingen. - Copyright 2016 David Robillard - - Ingen is free software: you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free - Software Foundation, either version 3 of the License, or any later version. - - Ingen is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. - - You should have received a copy of the GNU Affero General Public License - along with Ingen. If not, see . -*/ - -#ifndef INGEN_ENGINE_CLOCK_HPP -#define INGEN_ENGINE_CLOCK_HPP - -#ifdef __MACH__ -# include -# include -#endif - -namespace Ingen { -namespace Server { - -class Clock { -public: -#ifdef __MACH__ - - Clock() { mach_timebase_info(&_timebase); } - - uint64_t now_microseconds() const { - const uint64_t now = mach_absolute_time(); - return now * _timebase.numer / _timebase.denom / 1e3; - } - -private: - mach_timebase_info_data_t _timebase; - -#else - - uint64_t now_microseconds() const { - struct timespec time; -# if defined(CLOCK_MONOTONIC_RAW) - clock_gettime(CLOCK_MONOTONIC_RAW, &time); -# else - clock_gettime(CLOCK_MONOTONIC, &time); -# endif - return (uint64_t)time.tv_sec * 1e6 + (uint64_t)time.tv_nsec / 1e3; - } - -#endif -}; - -} // namespace Server -} // namespace Ingen - -#endif // INGEN_ENGINE_CLOCK_HPP diff --git a/src/server/CompiledGraph.cpp b/src/server/CompiledGraph.cpp index b808cb90..09e378b4 100644 --- a/src/server/CompiledGraph.cpp +++ b/src/server/CompiledGraph.cpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2015-2016 David Robillard + Copyright 2015-2017 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -40,10 +40,22 @@ public: const BlockImpl* root; }; +static bool +has_provider_with_many_dependants(BlockImpl* n) +{ + for (BlockImpl* p : n->providers()) { + if (p->dependants().size() > 1) { + return true; + } + } + + return false; +} + CompiledGraph::CompiledGraph(GraphImpl* graph) : _log(graph->engine().log()) , _path(graph->path()) - , _master(Task::Mode::SEQUENTIAL) + , _master(new Task(Task::Mode::SEQUENTIAL)) { compile_graph(graph); } @@ -66,18 +78,31 @@ CompiledGraph::compile(Raul::Maid& maid, GraphImpl& graph) } } -void -CompiledGraph::compile_set(const std::set& blocks, - Task& task, - std::set& k) +static size_t +num_unvisited_dependants(BlockImpl* block) { - // Keep compiling working set until all nodes are visited - for (BlockImpl* block : blocks) { - // Each block is the start of a new sequential task - Task seq(Task::Mode::SEQUENTIAL); - compile_block(block, seq, k); - task.push_back(seq); + size_t count = 0; + for (BlockImpl* b : block->dependants()) { + if (b->get_mark() == BlockImpl::Mark::UNVISITED) { + ++count; + } + } + return count; +} + +static size_t +parallel_depth(BlockImpl* block) +{ + if (has_provider_with_many_dependants(block)) { + return 2; + } + + size_t min_provider_depth = std::numeric_limits::max(); + for (auto p : block->providers()) { + min_provider_depth = std::min(min_provider_depth, parallel_depth(p)); } + + return 2 + min_provider_depth; } void @@ -91,41 +116,34 @@ CompiledGraph::compile_graph(GraphImpl* graph) // Mark all blocks as unvisited initially b.set_mark(BlockImpl::Mark::UNVISITED); - if (b.providers().empty()) { - // Block has no dependencies, add to initial working set + if (b.dependants().empty()) { + // Block has no dependants, add to initial working set blocks.insert(&b); } } - // Compile initial working set into master task - Task start(Task::Mode::PARALLEL); - std::set next; - compile_set(blocks, start, next); - _master.push_back(start); + // Keep compiling working set until all nodes are visited + while (!blocks.empty()) { + std::set predecessors; - // Keep compiling working set until all connected nodes are visited - while (!next.empty()) { - blocks.clear(); - // The working set is a parallel task... - Task par(Task::Mode::PARALLEL); - for (BlockImpl* block : next) { - // ... where each block is the start of a new sequential task - Task seq(Task::Mode::SEQUENTIAL); - compile_block(block, seq, blocks); - par.push_back(seq); + // Calculate maximum sequential depth to consume this phase + size_t depth = std::numeric_limits::max(); + for (auto i : blocks) { + depth = std::min(depth, parallel_depth(i)); } - _master.push_back(par); - next = blocks; - } - // Compile any nodes that weren't reached (disconnected cycles) - for (auto& b : graph->blocks()) { - if (b.get_mark() == BlockImpl::Mark::UNVISITED) { - compile_block(&b, _master, next); + Task par(Task::Mode::PARALLEL); + for (auto b : blocks) { + assert(num_unvisited_dependants(b) == 0); + Task seq(Task::Mode::SEQUENTIAL); + compile_block(b, seq, depth, predecessors); + par.push_front(std::move(seq)); } + _master->push_front(std::move(par)); + blocks = predecessors; } - _master.simplify(); + _master = Task::simplify(std::move(_master)); if (graph->engine().world()->conf().option("trace").get()) { dump([this](const std::string& msg) { @@ -137,75 +155,93 @@ CompiledGraph::compile_graph(GraphImpl* graph) /** Throw a FeedbackException iff `dependant` has `root` as a dependency. */ static void -check_feedback(const BlockImpl* root, BlockImpl* dependant) +check_feedback(const BlockImpl* root, BlockImpl* provider) { - if (dependant == root) { + if (provider == root) { throw FeedbackException(root); } - for (auto& d : dependant->dependants()) { - const BlockImpl::Mark mark = d->get_mark(); + for (auto p : provider->providers()) { + const BlockImpl::Mark mark = p->get_mark(); switch (mark) { case BlockImpl::Mark::UNVISITED: - d->set_mark(BlockImpl::Mark::VISITING); - check_feedback(root, d); + p->set_mark(BlockImpl::Mark::VISITING); + check_feedback(root, p); break; case BlockImpl::Mark::VISITING: - throw FeedbackException(d, root); + throw FeedbackException(p, root); case BlockImpl::Mark::VISITED: break; } - d->set_mark(mark); + p->set_mark(mark); } } void -CompiledGraph::compile_dependant(const BlockImpl* root, - BlockImpl* block, - Task& task, - std::set& k) +CompiledGraph::compile_provider(const BlockImpl* root, + BlockImpl* block, + Task& task, + size_t max_depth, + std::set& k) { - if (block->providers().size() > 1) { - /* Dependant has other providers, so this is the start of a sequential task. - Add dependant to future working set and stop traversal. */ + if (block->dependants().size() > 1) { + /* Provider has other dependants, so this is the tail of a sequential task. + Add provider to future working set and stop traversal. */ check_feedback(root, block); - k.insert(block); - } else { - // Dependant has only this provider, add here + if (num_unvisited_dependants(block) == 0) { + k.insert(block); + } + } else if (max_depth > 0) { + // Calling dependant has only this provider, add here if (task.mode() == Task::Mode::PARALLEL) { // Inside a parallel task, compile into a new sequential child Task seq(Task::Mode::SEQUENTIAL); - compile_block(block, seq, k); - task.push_back(seq); + compile_block(block, seq, max_depth, k); + task.push_front(std::move(seq)); } else { - // Append to given sequential task - compile_block(block, task, k); + // Prepend to given sequential task + compile_block(block, task, max_depth, k); + } + } else { + if (num_unvisited_dependants(block) == 0) { + k.insert(block); } } } void -CompiledGraph::compile_block(BlockImpl* n, Task& task, std::set& k) +CompiledGraph::compile_block(BlockImpl* n, + Task& task, + size_t max_depth, + std::set& k) { switch (n->get_mark()) { case BlockImpl::Mark::UNVISITED: n->set_mark(BlockImpl::Mark::VISITING); - // Execute this task before the dependants to follow - task.push_back(Task(Task::Mode::SINGLE, n)); + // Execute this task after the providers to follow + task.push_front(Task(Task::Mode::SINGLE, n)); - if (n->dependants().size() < 2) { - // Single dependant, append to this sequential task - for (auto& d : n->dependants()) { - compile_dependant(n, d, task, k); + if (n->providers().size() < 2) { + // Single provider, prepend it to this sequential task + for (auto p : n->providers()) { + compile_provider(n, p, task, max_depth - 1, k); + } + } else if (has_provider_with_many_dependants(n)) { + // Stop recursion and enqueue providers for the next round + for (auto p : n->providers()) { + if (num_unvisited_dependants(p) == 0) { + k.insert(p); + } } } else { - // Multiple dependants, create a new parallel task + // Multiple providers with only this node as dependant, + // make a new parallel task to execute them Task par(Task::Mode::PARALLEL); - for (auto& d : n->dependants()) { - compile_dependant(n, d, par, k); + for (auto p : n->providers()) { + compile_provider(n, p, par, max_depth - 1, k); } - task.push_back(par); + task.push_front(std::move(par)); } n->set_mark(BlockImpl::Mark::VISITED); break; @@ -221,7 +257,7 @@ CompiledGraph::compile_block(BlockImpl* n, Task& task, std::set& k) void CompiledGraph::run(RunContext& context) { - _master.run(context); + _master->run(context); } void @@ -229,7 +265,7 @@ CompiledGraph::dump(std::function sink) const { sink("(compiled-graph "); sink(_path); - _master.dump(sink, 2, false); + _master->dump(sink, 2, false); sink(")\n"); } diff --git a/src/server/CompiledGraph.hpp b/src/server/CompiledGraph.hpp index eeeb6111..0578c18a 100644 --- a/src/server/CompiledGraph.hpp +++ b/src/server/CompiledGraph.hpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2016 David Robillard + Copyright 2007-2017 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -61,16 +61,21 @@ private: typedef std::set BlockSet; void compile_graph(GraphImpl* graph); - void compile_set(const BlockSet& blocks, Task& task, BlockSet& k); - void compile_block(BlockImpl* block, Task& task, BlockSet& k); - void compile_dependant(const BlockImpl* root, - BlockImpl* block, - Task& task, - BlockSet& k); - - Log& _log; - const Raul::Path _path; - Task _master; + + void compile_block(BlockImpl* block, + Task& task, + size_t max_depth, + BlockSet& k); + + void compile_provider(const BlockImpl* root, + BlockImpl* block, + Task& task, + size_t max_depth, + BlockSet& k); + + Log& _log; + const Raul::Path _path; + std::unique_ptr _master; }; } // namespace Server diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp index ea5a1402..084e995d 100644 --- a/src/server/Engine.hpp +++ b/src/server/Engine.hpp @@ -22,13 +22,13 @@ #include #include +#include "ingen/Clock.hpp" #include "ingen/EngineBase.hpp" #include "ingen/Interface.hpp" #include "ingen/ingen.h" #include "ingen/types.hpp" #include "raul/Noncopyable.hpp" -#include "Clock.hpp" #include "Event.hpp" #include "RunContext.hpp" #include "EventWriter.hpp" diff --git a/src/server/Task.cpp b/src/server/Task.cpp index e19855d3..f15c9881 100644 --- a/src/server/Task.cpp +++ b/src/server/Task.cpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2015-2016 David Robillard + Copyright 2015-2017 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -30,14 +30,14 @@ Task::run(RunContext& context) _block->process(context); break; case Mode::SEQUENTIAL: - for (Task& task : *this) { - task.run(context); + for (const auto& task : _children) { + task->run(context); } break; case Mode::PARALLEL: // Initialize (not) done state of sub-tasks - for (Task& task : *this) { - task.set_done(false); + for (const auto& task : _children) { + task->set_done(false); } // Grab the first sub-task @@ -65,12 +65,12 @@ Task::steal(RunContext& context) { if (_mode == Mode::PARALLEL) { const unsigned i = _next++; - if (i < size()) { - return &(*this)[i]; + if (i < _children.size()) { + return _children[i].get(); } } - return NULL; + return nullptr; } Task* @@ -84,15 +84,16 @@ Task::get_task(RunContext& context) while (true) { // Push done end index as forward as possible - for (; _done_end < size() && (*this)[_done_end].done(); ++_done_end) {} + while (_done_end < _children.size() && _children[_done_end]->done()) { + ++_done_end; + } - if (_done_end >= size()) { - return NULL; // All child tasks are finished + if (_done_end >= _children.size()) { + return nullptr; // All child tasks are finished } // All child tasks claimed, but some are unfinished, steal a task - t = context.engine().steal_task(context.id() + 1); - if (t) { + if ((t = context.engine().steal_task(context.id() + 1))) { return t; } @@ -104,30 +105,36 @@ Task::get_task(RunContext& context) } } -void -Task::simplify() +std::unique_ptr +Task::simplify(std::unique_ptr&& task) { - if (_mode != Mode::SINGLE) { - for (std::vector::iterator t = begin(); t != end();) { - t->simplify(); - if (t->mode() != Mode::SINGLE && t->empty()) { - // Empty task, erase - t = erase(t); - } else if (t->mode() == _mode) { - // Subtask with the same type, fold child into parent - const Task child(*t); - t = erase(t); - t = insert(t, child.begin(), child.end()); - } else { - ++t; - } - } + if (task->mode() == Mode::SINGLE) { + return std::move(task); + } - if (size() == 1) { - const Task t(front()); - *this = t; + for (auto t = task->_children.begin(); t != task->_children.end();) { + *t = simplify(std::move(*t)); + if ((*t)->empty()) { + // Empty task, erase + t = task->_children.erase(t); + } else if ((*t)->mode() == task->_mode) { + // Subtask with the same type, fold child into parent + std::unique_ptr child(std::move(*t)); + t = task->_children.erase(t); + t = task->_children.insert( + t, + std::make_move_iterator(child->_children.begin()), + std::make_move_iterator(child->_children.end())); + } else { + ++t; } } + + if (task->_children.size() == 1) { + return std::move(task->_children.front()); + } + + return std::move(task); } void @@ -144,8 +151,8 @@ Task::dump(std::function sink, unsigned indent, bool sink(_block->path()); } else { sink(((_mode == Mode::SEQUENTIAL) ? "(seq " : "(par ")); - for (size_t i = 0; i < size(); ++i) { - (*this)[i].dump(sink, indent + 5, i == 0); + for (size_t i = 0; i < _children.size(); ++i) { + _children[i]->dump(sink, indent + 5, i == 0); } sink(")"); } diff --git a/src/server/Task.hpp b/src/server/Task.hpp index 2c1a0cc2..982a6206 100644 --- a/src/server/Task.hpp +++ b/src/server/Task.hpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2016 David Robillard + Copyright 2007-2017 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -18,8 +18,9 @@ #define INGEN_ENGINE_TASK_HPP #include +#include +#include #include -#include namespace Ingen { namespace Server { @@ -27,7 +28,7 @@ namespace Server { class BlockImpl; class RunContext; -class Task : public std::vector { +class Task { public: enum class Mode { SINGLE, ///< Single block to run @@ -35,7 +36,7 @@ public: PARALLEL ///< Elements may be run in any order in parallel }; - Task(Mode mode, BlockImpl* block=NULL) + Task(Mode mode, BlockImpl* block = nullptr) : _block(block) , _mode(mode) , _done_end(0) @@ -45,23 +46,13 @@ public: assert(!(mode == Mode::SINGLE && !block)); } - Task& operator=(const Task& copy) { - *static_cast*>(this) = copy; - _block = copy._block; - _mode = copy._mode; - _done_end = copy._done_end; - _next = copy._next.load(); - _done = copy._done.load(); - return *this; - } - - Task(const Task& copy) - : std::vector(copy) - , _block(copy._block) - , _mode(copy._mode) - , _done_end(copy._done_end) - , _next(copy._next.load()) - , _done(copy._done.load()) + Task(Task&& task) + : _children(std::move(task._children)) + , _block(task._block) + , _mode(task._mode) + , _done_end(task._done_end) + , _next(task._next.load()) + , _done(task._done.load()) {} /** Run task in the given context. */ @@ -70,12 +61,20 @@ public: /** Pretty print task to the given stream (recursively). */ void dump(std::function sink, unsigned indent, bool first) const; + /** Return true iff this is an empty task. */ + bool empty() const { return _mode != Mode::SINGLE && _children.empty(); } + /** Simplify task expression. */ - void simplify(); + static std::unique_ptr simplify(std::unique_ptr&& task); /** Steal a child task from this task (succeeds for PARALLEL only). */ Task* steal(RunContext& context); + /** Prepend a child to this task. */ + void push_front(Task&& task) { + _children.push_front(std::unique_ptr(new Task(std::move(task)))); + } + Mode mode() const { return _mode; } BlockImpl* block() const { return _block; } bool done() const { return _done; } @@ -83,8 +82,14 @@ public: void set_done(bool done) { _done = done; } private: + typedef std::deque> Children; + + Task(const Task&) = delete; + Task& operator=(const Task&) = delete; + Task* get_task(RunContext& context); + Children _children; ///< Vector of child tasks BlockImpl* _block; ///< Used for SINGLE only Mode _mode; ///< Execution mode unsigned _done_end; ///< Index of rightmost done sub-task diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp index 9119ae14..00b59ca7 100644 --- a/src/server/ingen_lv2.cpp +++ b/src/server/ingen_lv2.cpp @@ -507,9 +507,9 @@ ingen_instantiate(const LV2_Descriptor* descriptor, } IngenPlugin* plugin = new IngenPlugin(); - plugin->map = map; - plugin->world = new Ingen::World( - plugin->argc, plugin->argv, map, unmap, log); + plugin->map = map; + plugin->world = new Ingen::World(map, unmap, log); + plugin->world->load_configuration(plugin->argc, plugin->argv); LV2_URID bufsz_max = map->map(map->handle, LV2_BUF_SIZE__maxBlockLength); LV2_URID bufsz_seq = map->map(map->handle, LV2_BUF_SIZE__sequenceSize); -- cgit v1.2.1