summaryrefslogtreecommitdiffstats
path: root/src/server
diff options
context:
space:
mode:
authorDavid Robillard <d@drobilla.net>2017-02-18 19:29:15 +0100
committerDavid Robillard <d@drobilla.net>2017-02-18 19:38:14 +0100
commit667e633e829760b5a1e9591227ec5437cac1995d (patch)
treeffbcdcdacb1187912027e0365c8b0e571b43e5bf /src/server
parent24b575cf28f9b007939b9fa14c991c51326c8348 (diff)
downloadingen-667e633e829760b5a1e9591227ec5437cac1995d.tar.gz
ingen-667e633e829760b5a1e9591227ec5437cac1995d.tar.bz2
ingen-667e633e829760b5a1e9591227ec5437cac1995d.zip
Improve parallel analysis and execution algorithms
Diffstat (limited to 'src/server')
-rw-r--r--src/server/Clock.hpp60
-rw-r--r--src/server/CompiledGraph.cpp180
-rw-r--r--src/server/CompiledGraph.hpp27
-rw-r--r--src/server/Engine.hpp2
-rw-r--r--src/server/Task.cpp77
-rw-r--r--src/server/Task.hpp49
-rw-r--r--src/server/ingen_lv2.cpp6
7 files changed, 197 insertions, 204 deletions
diff --git a/src/server/Clock.hpp b/src/server/Clock.hpp
deleted file mode 100644
index d9526d02..00000000
--- a/src/server/Clock.hpp
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- This file is part of Ingen.
- Copyright 2016 David Robillard <http://drobilla.net/>
-
- Ingen is free software: you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free
- Software Foundation, either version 3 of the License, or any later version.
-
- Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for details.
-
- You should have received a copy of the GNU Affero General Public License
- along with Ingen. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef INGEN_ENGINE_CLOCK_HPP
-#define INGEN_ENGINE_CLOCK_HPP
-
-#ifdef __MACH__
-# include <mach/mach.h>
-# include <mach/mach_time.h>
-#endif
-
-namespace Ingen {
-namespace Server {
-
-class Clock {
-public:
-#ifdef __MACH__
-
- Clock() { mach_timebase_info(&_timebase); }
-
- uint64_t now_microseconds() const {
- const uint64_t now = mach_absolute_time();
- return now * _timebase.numer / _timebase.denom / 1e3;
- }
-
-private:
- mach_timebase_info_data_t _timebase;
-
-#else
-
- uint64_t now_microseconds() const {
- struct timespec time;
-# if defined(CLOCK_MONOTONIC_RAW)
- clock_gettime(CLOCK_MONOTONIC_RAW, &time);
-# else
- clock_gettime(CLOCK_MONOTONIC, &time);
-# endif
- return (uint64_t)time.tv_sec * 1e6 + (uint64_t)time.tv_nsec / 1e3;
- }
-
-#endif
-};
-
-} // namespace Server
-} // namespace Ingen
-
-#endif // INGEN_ENGINE_CLOCK_HPP
diff --git a/src/server/CompiledGraph.cpp b/src/server/CompiledGraph.cpp
index b808cb90..09e378b4 100644
--- a/src/server/CompiledGraph.cpp
+++ b/src/server/CompiledGraph.cpp
@@ -1,6 +1,6 @@
/*
This file is part of Ingen.
- Copyright 2015-2016 David Robillard <http://drobilla.net/>
+ Copyright 2015-2017 David Robillard <http://drobilla.net/>
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
@@ -40,10 +40,22 @@ public:
const BlockImpl* root;
};
+static bool
+has_provider_with_many_dependants(BlockImpl* n)
+{
+ for (BlockImpl* p : n->providers()) {
+ if (p->dependants().size() > 1) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
CompiledGraph::CompiledGraph(GraphImpl* graph)
: _log(graph->engine().log())
, _path(graph->path())
- , _master(Task::Mode::SEQUENTIAL)
+ , _master(new Task(Task::Mode::SEQUENTIAL))
{
compile_graph(graph);
}
@@ -66,18 +78,31 @@ CompiledGraph::compile(Raul::Maid& maid, GraphImpl& graph)
}
}
-void
-CompiledGraph::compile_set(const std::set<BlockImpl*>& blocks,
- Task& task,
- std::set<BlockImpl*>& k)
+static size_t
+num_unvisited_dependants(BlockImpl* block)
{
- // Keep compiling working set until all nodes are visited
- for (BlockImpl* block : blocks) {
- // Each block is the start of a new sequential task
- Task seq(Task::Mode::SEQUENTIAL);
- compile_block(block, seq, k);
- task.push_back(seq);
+ size_t count = 0;
+ for (BlockImpl* b : block->dependants()) {
+ if (b->get_mark() == BlockImpl::Mark::UNVISITED) {
+ ++count;
+ }
+ }
+ return count;
+}
+
+static size_t
+parallel_depth(BlockImpl* block)
+{
+ if (has_provider_with_many_dependants(block)) {
+ return 2;
+ }
+
+ size_t min_provider_depth = std::numeric_limits<size_t>::max();
+ for (auto p : block->providers()) {
+ min_provider_depth = std::min(min_provider_depth, parallel_depth(p));
}
+
+ return 2 + min_provider_depth;
}
void
@@ -91,41 +116,34 @@ CompiledGraph::compile_graph(GraphImpl* graph)
// Mark all blocks as unvisited initially
b.set_mark(BlockImpl::Mark::UNVISITED);
- if (b.providers().empty()) {
- // Block has no dependencies, add to initial working set
+ if (b.dependants().empty()) {
+ // Block has no dependants, add to initial working set
blocks.insert(&b);
}
}
- // Compile initial working set into master task
- Task start(Task::Mode::PARALLEL);
- std::set<BlockImpl*> next;
- compile_set(blocks, start, next);
- _master.push_back(start);
+ // Keep compiling working set until all nodes are visited
+ while (!blocks.empty()) {
+ std::set<BlockImpl*> predecessors;
- // Keep compiling working set until all connected nodes are visited
- while (!next.empty()) {
- blocks.clear();
- // The working set is a parallel task...
- Task par(Task::Mode::PARALLEL);
- for (BlockImpl* block : next) {
- // ... where each block is the start of a new sequential task
- Task seq(Task::Mode::SEQUENTIAL);
- compile_block(block, seq, blocks);
- par.push_back(seq);
+ // Calculate maximum sequential depth to consume this phase
+ size_t depth = std::numeric_limits<size_t>::max();
+ for (auto i : blocks) {
+ depth = std::min(depth, parallel_depth(i));
}
- _master.push_back(par);
- next = blocks;
- }
- // Compile any nodes that weren't reached (disconnected cycles)
- for (auto& b : graph->blocks()) {
- if (b.get_mark() == BlockImpl::Mark::UNVISITED) {
- compile_block(&b, _master, next);
+ Task par(Task::Mode::PARALLEL);
+ for (auto b : blocks) {
+ assert(num_unvisited_dependants(b) == 0);
+ Task seq(Task::Mode::SEQUENTIAL);
+ compile_block(b, seq, depth, predecessors);
+ par.push_front(std::move(seq));
}
+ _master->push_front(std::move(par));
+ blocks = predecessors;
}
- _master.simplify();
+ _master = Task::simplify(std::move(_master));
if (graph->engine().world()->conf().option("trace").get<int32_t>()) {
dump([this](const std::string& msg) {
@@ -137,75 +155,93 @@ CompiledGraph::compile_graph(GraphImpl* graph)
/** Throw a FeedbackException iff `dependant` has `root` as a dependency. */
static void
-check_feedback(const BlockImpl* root, BlockImpl* dependant)
+check_feedback(const BlockImpl* root, BlockImpl* provider)
{
- if (dependant == root) {
+ if (provider == root) {
throw FeedbackException(root);
}
- for (auto& d : dependant->dependants()) {
- const BlockImpl::Mark mark = d->get_mark();
+ for (auto p : provider->providers()) {
+ const BlockImpl::Mark mark = p->get_mark();
switch (mark) {
case BlockImpl::Mark::UNVISITED:
- d->set_mark(BlockImpl::Mark::VISITING);
- check_feedback(root, d);
+ p->set_mark(BlockImpl::Mark::VISITING);
+ check_feedback(root, p);
break;
case BlockImpl::Mark::VISITING:
- throw FeedbackException(d, root);
+ throw FeedbackException(p, root);
case BlockImpl::Mark::VISITED:
break;
}
- d->set_mark(mark);
+ p->set_mark(mark);
}
}
void
-CompiledGraph::compile_dependant(const BlockImpl* root,
- BlockImpl* block,
- Task& task,
- std::set<BlockImpl*>& k)
+CompiledGraph::compile_provider(const BlockImpl* root,
+ BlockImpl* block,
+ Task& task,
+ size_t max_depth,
+ std::set<BlockImpl*>& k)
{
- if (block->providers().size() > 1) {
- /* Dependant has other providers, so this is the start of a sequential task.
- Add dependant to future working set and stop traversal. */
+ if (block->dependants().size() > 1) {
+ /* Provider has other dependants, so this is the tail of a sequential task.
+ Add provider to future working set and stop traversal. */
check_feedback(root, block);
- k.insert(block);
- } else {
- // Dependant has only this provider, add here
+ if (num_unvisited_dependants(block) == 0) {
+ k.insert(block);
+ }
+ } else if (max_depth > 0) {
+ // Calling dependant has only this provider, add here
if (task.mode() == Task::Mode::PARALLEL) {
// Inside a parallel task, compile into a new sequential child
Task seq(Task::Mode::SEQUENTIAL);
- compile_block(block, seq, k);
- task.push_back(seq);
+ compile_block(block, seq, max_depth, k);
+ task.push_front(std::move(seq));
} else {
- // Append to given sequential task
- compile_block(block, task, k);
+ // Prepend to given sequential task
+ compile_block(block, task, max_depth, k);
+ }
+ } else {
+ if (num_unvisited_dependants(block) == 0) {
+ k.insert(block);
}
}
}
void
-CompiledGraph::compile_block(BlockImpl* n, Task& task, std::set<BlockImpl*>& k)
+CompiledGraph::compile_block(BlockImpl* n,
+ Task& task,
+ size_t max_depth,
+ std::set<BlockImpl*>& k)
{
switch (n->get_mark()) {
case BlockImpl::Mark::UNVISITED:
n->set_mark(BlockImpl::Mark::VISITING);
- // Execute this task before the dependants to follow
- task.push_back(Task(Task::Mode::SINGLE, n));
+ // Execute this task after the providers to follow
+ task.push_front(Task(Task::Mode::SINGLE, n));
- if (n->dependants().size() < 2) {
- // Single dependant, append to this sequential task
- for (auto& d : n->dependants()) {
- compile_dependant(n, d, task, k);
+ if (n->providers().size() < 2) {
+ // Single provider, prepend it to this sequential task
+ for (auto p : n->providers()) {
+ compile_provider(n, p, task, max_depth - 1, k);
+ }
+ } else if (has_provider_with_many_dependants(n)) {
+ // Stop recursion and enqueue providers for the next round
+ for (auto p : n->providers()) {
+ if (num_unvisited_dependants(p) == 0) {
+ k.insert(p);
+ }
}
} else {
- // Multiple dependants, create a new parallel task
+ // Multiple providers with only this node as dependant,
+ // make a new parallel task to execute them
Task par(Task::Mode::PARALLEL);
- for (auto& d : n->dependants()) {
- compile_dependant(n, d, par, k);
+ for (auto p : n->providers()) {
+ compile_provider(n, p, par, max_depth - 1, k);
}
- task.push_back(par);
+ task.push_front(std::move(par));
}
n->set_mark(BlockImpl::Mark::VISITED);
break;
@@ -221,7 +257,7 @@ CompiledGraph::compile_block(BlockImpl* n, Task& task, std::set<BlockImpl*>& k)
void
CompiledGraph::run(RunContext& context)
{
- _master.run(context);
+ _master->run(context);
}
void
@@ -229,7 +265,7 @@ CompiledGraph::dump(std::function<void (const std::string&)> sink) const
{
sink("(compiled-graph ");
sink(_path);
- _master.dump(sink, 2, false);
+ _master->dump(sink, 2, false);
sink(")\n");
}
diff --git a/src/server/CompiledGraph.hpp b/src/server/CompiledGraph.hpp
index eeeb6111..0578c18a 100644
--- a/src/server/CompiledGraph.hpp
+++ b/src/server/CompiledGraph.hpp
@@ -1,6 +1,6 @@
/*
This file is part of Ingen.
- Copyright 2007-2016 David Robillard <http://drobilla.net/>
+ Copyright 2007-2017 David Robillard <http://drobilla.net/>
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
@@ -61,16 +61,21 @@ private:
typedef std::set<BlockImpl*> BlockSet;
void compile_graph(GraphImpl* graph);
- void compile_set(const BlockSet& blocks, Task& task, BlockSet& k);
- void compile_block(BlockImpl* block, Task& task, BlockSet& k);
- void compile_dependant(const BlockImpl* root,
- BlockImpl* block,
- Task& task,
- BlockSet& k);
-
- Log& _log;
- const Raul::Path _path;
- Task _master;
+
+ void compile_block(BlockImpl* block,
+ Task& task,
+ size_t max_depth,
+ BlockSet& k);
+
+ void compile_provider(const BlockImpl* root,
+ BlockImpl* block,
+ Task& task,
+ size_t max_depth,
+ BlockSet& k);
+
+ Log& _log;
+ const Raul::Path _path;
+ std::unique_ptr<Task> _master;
};
} // namespace Server
diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp
index ea5a1402..084e995d 100644
--- a/src/server/Engine.hpp
+++ b/src/server/Engine.hpp
@@ -22,13 +22,13 @@
#include <mutex>
#include <random>
+#include "ingen/Clock.hpp"
#include "ingen/EngineBase.hpp"
#include "ingen/Interface.hpp"
#include "ingen/ingen.h"
#include "ingen/types.hpp"
#include "raul/Noncopyable.hpp"
-#include "Clock.hpp"
#include "Event.hpp"
#include "RunContext.hpp"
#include "EventWriter.hpp"
diff --git a/src/server/Task.cpp b/src/server/Task.cpp
index e19855d3..f15c9881 100644
--- a/src/server/Task.cpp
+++ b/src/server/Task.cpp
@@ -1,6 +1,6 @@
/*
This file is part of Ingen.
- Copyright 2015-2016 David Robillard <http://drobilla.net/>
+ Copyright 2015-2017 David Robillard <http://drobilla.net/>
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
@@ -30,14 +30,14 @@ Task::run(RunContext& context)
_block->process(context);
break;
case Mode::SEQUENTIAL:
- for (Task& task : *this) {
- task.run(context);
+ for (const auto& task : _children) {
+ task->run(context);
}
break;
case Mode::PARALLEL:
// Initialize (not) done state of sub-tasks
- for (Task& task : *this) {
- task.set_done(false);
+ for (const auto& task : _children) {
+ task->set_done(false);
}
// Grab the first sub-task
@@ -65,12 +65,12 @@ Task::steal(RunContext& context)
{
if (_mode == Mode::PARALLEL) {
const unsigned i = _next++;
- if (i < size()) {
- return &(*this)[i];
+ if (i < _children.size()) {
+ return _children[i].get();
}
}
- return NULL;
+ return nullptr;
}
Task*
@@ -84,15 +84,16 @@ Task::get_task(RunContext& context)
while (true) {
// Push done end index as forward as possible
- for (; _done_end < size() && (*this)[_done_end].done(); ++_done_end) {}
+ while (_done_end < _children.size() && _children[_done_end]->done()) {
+ ++_done_end;
+ }
- if (_done_end >= size()) {
- return NULL; // All child tasks are finished
+ if (_done_end >= _children.size()) {
+ return nullptr; // All child tasks are finished
}
// All child tasks claimed, but some are unfinished, steal a task
- t = context.engine().steal_task(context.id() + 1);
- if (t) {
+ if ((t = context.engine().steal_task(context.id() + 1))) {
return t;
}
@@ -104,30 +105,36 @@ Task::get_task(RunContext& context)
}
}
-void
-Task::simplify()
+std::unique_ptr<Task>
+Task::simplify(std::unique_ptr<Task>&& task)
{
- if (_mode != Mode::SINGLE) {
- for (std::vector<Task>::iterator t = begin(); t != end();) {
- t->simplify();
- if (t->mode() != Mode::SINGLE && t->empty()) {
- // Empty task, erase
- t = erase(t);
- } else if (t->mode() == _mode) {
- // Subtask with the same type, fold child into parent
- const Task child(*t);
- t = erase(t);
- t = insert(t, child.begin(), child.end());
- } else {
- ++t;
- }
- }
+ if (task->mode() == Mode::SINGLE) {
+ return std::move(task);
+ }
- if (size() == 1) {
- const Task t(front());
- *this = t;
+ for (auto t = task->_children.begin(); t != task->_children.end();) {
+ *t = simplify(std::move(*t));
+ if ((*t)->empty()) {
+ // Empty task, erase
+ t = task->_children.erase(t);
+ } else if ((*t)->mode() == task->_mode) {
+ // Subtask with the same type, fold child into parent
+ std::unique_ptr<Task> child(std::move(*t));
+ t = task->_children.erase(t);
+ t = task->_children.insert(
+ t,
+ std::make_move_iterator(child->_children.begin()),
+ std::make_move_iterator(child->_children.end()));
+ } else {
+ ++t;
}
}
+
+ if (task->_children.size() == 1) {
+ return std::move(task->_children.front());
+ }
+
+ return std::move(task);
}
void
@@ -144,8 +151,8 @@ Task::dump(std::function<void (const std::string&)> sink, unsigned indent, bool
sink(_block->path());
} else {
sink(((_mode == Mode::SEQUENTIAL) ? "(seq " : "(par "));
- for (size_t i = 0; i < size(); ++i) {
- (*this)[i].dump(sink, indent + 5, i == 0);
+ for (size_t i = 0; i < _children.size(); ++i) {
+ _children[i]->dump(sink, indent + 5, i == 0);
}
sink(")");
}
diff --git a/src/server/Task.hpp b/src/server/Task.hpp
index 2c1a0cc2..982a6206 100644
--- a/src/server/Task.hpp
+++ b/src/server/Task.hpp
@@ -1,6 +1,6 @@
/*
This file is part of Ingen.
- Copyright 2007-2016 David Robillard <http://drobilla.net/>
+ Copyright 2007-2017 David Robillard <http://drobilla.net/>
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
@@ -18,8 +18,9 @@
#define INGEN_ENGINE_TASK_HPP
#include <cassert>
+#include <deque>
+#include <memory>
#include <ostream>
-#include <vector>
namespace Ingen {
namespace Server {
@@ -27,7 +28,7 @@ namespace Server {
class BlockImpl;
class RunContext;
-class Task : public std::vector<Task> {
+class Task {
public:
enum class Mode {
SINGLE, ///< Single block to run
@@ -35,7 +36,7 @@ public:
PARALLEL ///< Elements may be run in any order in parallel
};
- Task(Mode mode, BlockImpl* block=NULL)
+ Task(Mode mode, BlockImpl* block = nullptr)
: _block(block)
, _mode(mode)
, _done_end(0)
@@ -45,23 +46,13 @@ public:
assert(!(mode == Mode::SINGLE && !block));
}
- Task& operator=(const Task& copy) {
- *static_cast<std::vector<Task>*>(this) = copy;
- _block = copy._block;
- _mode = copy._mode;
- _done_end = copy._done_end;
- _next = copy._next.load();
- _done = copy._done.load();
- return *this;
- }
-
- Task(const Task& copy)
- : std::vector<Task>(copy)
- , _block(copy._block)
- , _mode(copy._mode)
- , _done_end(copy._done_end)
- , _next(copy._next.load())
- , _done(copy._done.load())
+ Task(Task&& task)
+ : _children(std::move(task._children))
+ , _block(task._block)
+ , _mode(task._mode)
+ , _done_end(task._done_end)
+ , _next(task._next.load())
+ , _done(task._done.load())
{}
/** Run task in the given context. */
@@ -70,12 +61,20 @@ public:
/** Pretty print task to the given stream (recursively). */
void dump(std::function<void (const std::string&)> sink, unsigned indent, bool first) const;
+ /** Return true iff this is an empty task. */
+ bool empty() const { return _mode != Mode::SINGLE && _children.empty(); }
+
/** Simplify task expression. */
- void simplify();
+ static std::unique_ptr<Task> simplify(std::unique_ptr<Task>&& task);
/** Steal a child task from this task (succeeds for PARALLEL only). */
Task* steal(RunContext& context);
+ /** Prepend a child to this task. */
+ void push_front(Task&& task) {
+ _children.push_front(std::unique_ptr<Task>(new Task(std::move(task))));
+ }
+
Mode mode() const { return _mode; }
BlockImpl* block() const { return _block; }
bool done() const { return _done; }
@@ -83,8 +82,14 @@ public:
void set_done(bool done) { _done = done; }
private:
+ typedef std::deque<std::unique_ptr<Task>> Children;
+
+ Task(const Task&) = delete;
+ Task& operator=(const Task&) = delete;
+
Task* get_task(RunContext& context);
+ Children _children; ///< Vector of child tasks
BlockImpl* _block; ///< Used for SINGLE only
Mode _mode; ///< Execution mode
unsigned _done_end; ///< Index of rightmost done sub-task
diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp
index 9119ae14..00b59ca7 100644
--- a/src/server/ingen_lv2.cpp
+++ b/src/server/ingen_lv2.cpp
@@ -507,9 +507,9 @@ ingen_instantiate(const LV2_Descriptor* descriptor,
}
IngenPlugin* plugin = new IngenPlugin();
- plugin->map = map;
- plugin->world = new Ingen::World(
- plugin->argc, plugin->argv, map, unmap, log);
+ plugin->map = map;
+ plugin->world = new Ingen::World(map, unmap, log);
+ plugin->world->load_configuration(plugin->argc, plugin->argv);
LV2_URID bufsz_max = map->map(map->handle, LV2_BUF_SIZE__maxBlockLength);
LV2_URID bufsz_seq = map->map(map->handle, LV2_BUF_SIZE__sequenceSize);