summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/server/CompiledGraph.cpp27
-rw-r--r--src/server/Engine.cpp4
-rw-r--r--src/server/Engine.hpp7
-rw-r--r--src/server/GraphImpl.cpp2
-rw-r--r--src/server/RunContext.cpp5
-rw-r--r--src/server/RunContext.hpp15
-rw-r--r--src/server/Task.cpp133
-rw-r--r--src/server/Task.hpp52
8 files changed, 163 insertions, 82 deletions
diff --git a/src/server/CompiledGraph.cpp b/src/server/CompiledGraph.cpp
index 32302174..fbc7ab26 100644
--- a/src/server/CompiledGraph.cpp
+++ b/src/server/CompiledGraph.cpp
@@ -17,6 +17,8 @@
#include <algorithm>
#include <unordered_set>
+#include <boost/variant/get.hpp>
+
#include "ingen/ColorContext.hpp"
#include "ingen/Configuration.hpp"
#include "ingen/Log.hpp"
@@ -111,7 +113,7 @@ check_feedback(const BlockImpl* root, BlockImpl* node)
}
static void
-compile_block(BlockImpl* n, Task& task, BlockSet& k)
+compile_block(BlockImpl* n, ParTask& task, BlockSet& k)
{
switch (n->get_mark()) {
case BlockImpl::Mark::UNVISITED: {
@@ -119,7 +121,7 @@ compile_block(BlockImpl* n, Task& task, BlockSet& k)
if (has_provider_with_fan_out(n)) {
// Provider shared by other dependants, enqueue providers for later
// Prepends n to current task: (... n ...)
- task.emplace_front(n);
+ task.emplace_front(SingleTask{n});
for (auto p : n->providers()) {
check_feedback(n, p);
if (num_unvisited_dependants(p) == 0) {
@@ -129,13 +131,15 @@ compile_block(BlockImpl* n, Task& task, BlockSet& k)
} else {
// All providers have only this dependant
// Runs node after parallel providers: (... (seq (par p...) n) ...)
- task.emplace_front(Task::Mode::SEQUENTIAL);
- task.front().emplace_front(n);
+ SeqTask seq;
+ seq.emplace_front(SingleTask{n});
- task.front().emplace_front(Task::Mode::PARALLEL);
+ ParTask par;
for (auto p : n->providers()) {
- compile_block(p, task.front().front(), k);
+ compile_block(p, par, k);
}
+ seq.emplace_front(std::move(par));
+ task.emplace_front(std::move(seq));
}
break;
}
@@ -149,7 +153,7 @@ compile_block(BlockImpl* n, Task& task, BlockSet& k)
}
CompiledGraph::CompiledGraph(GraphImpl& graph)
- : _master(std::unique_ptr<Task>(new Task(Task::Mode::SEQUENTIAL)))
+ : _master(std::unique_ptr<Task>(new Task(SeqTask())))
{
ThreadManager::assert_thread(THREAD_PRE_PROCESS);
@@ -169,16 +173,17 @@ CompiledGraph::CompiledGraph(GraphImpl& graph)
while (!blocks.empty()) {
BlockSet predecessors;
- _master->emplace_front(Task::Mode::PARALLEL);
+ ParTask par;
for (auto b : blocks) {
assert(num_unvisited_dependants(b) == 0);
- compile_block(b, _master->front(), predecessors);
+ compile_block(b, par, predecessors);
}
+ boost::get<SeqTask>(_master.get())->emplace_front(std::move(par));
blocks = std::move(predecessors);
}
- _master = Task::simplify(std::move(_master));
+ _master = simplify(std::move(_master));
}
MPtr<CompiledGraph> compile(Raul::Maid& maid, GraphImpl& graph)
@@ -190,7 +195,7 @@ MPtr<CompiledGraph> compile(Raul::Maid& maid, GraphImpl& graph)
Log& log = graph.engine().log();
auto sink = [&log](const std::string& s) { log.trace(s); };
sink(graph.path() + " =>\n ");
- result->master().dump(sink, 2, true);
+ //result->master().dump(sink, 2, true);
sink("\n");
}
diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp
index a7476845..fba7c7a3 100644
--- a/src/server/Engine.cpp
+++ b/src/server/Engine.cpp
@@ -260,9 +260,9 @@ Engine::steal_task(unsigned start_thread)
for (unsigned i = 0; i < _run_contexts.size(); ++i) {
const unsigned id = (start_thread + i) % _run_contexts.size();
RunContext* const ctx = _run_contexts[id];
- Task* par = ctx->task();
+ ParTask* par = ctx->task();
if (par) {
- Task* t = par->steal(*ctx);
+ Task* t = steal(*par, *ctx);
if (t) {
return t;
}
diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp
index f5ba1feb..1f8cbce1 100644
--- a/src/server/Engine.hpp
+++ b/src/server/Engine.hpp
@@ -58,10 +58,15 @@ class PostProcessor;
class PreProcessor;
class RunContext;
class SocketListener;
-class Task;
class UndoStack;
class Worker;
+struct SingleTask;
+struct SeqTask;
+struct ParTask;
+
+using Task = boost::variant<SingleTask, SeqTask, ParTask>;
+
/**
The engine which executes the process graph.
diff --git a/src/server/GraphImpl.cpp b/src/server/GraphImpl.cpp
index dc928179..ef3ef4d4 100644
--- a/src/server/GraphImpl.cpp
+++ b/src/server/GraphImpl.cpp
@@ -232,7 +232,7 @@ void
GraphImpl::run(RunContext& context)
{
if (_compiled_graph) {
- _compiled_graph->master().run(context);
+ Server::run(_compiled_graph->master(), context);
}
}
diff --git a/src/server/RunContext.cpp b/src/server/RunContext.cpp
index 3ab9d15c..b446addf 100644
--- a/src/server/RunContext.cpp
+++ b/src/server/RunContext.cpp
@@ -141,7 +141,7 @@ RunContext::emit_notifications(FrameTime end)
}
void
-RunContext::claim_task(Task* task)
+RunContext::claim_task(ParTask* task)
{
if ((_task = task)) {
_engine.signal_tasks_available();
@@ -186,7 +186,8 @@ RunContext::run()
{
while (_engine.wait_for_tasks()) {
for (Task* t; (t = _engine.steal_task(0));) {
- t->run(*this);
+ Server::run(*t, *this);
+ // t->run(*this);
}
}
}
diff --git a/src/server/RunContext.hpp b/src/server/RunContext.hpp
index bb64a250..6c8246ca 100644
--- a/src/server/RunContext.hpp
+++ b/src/server/RunContext.hpp
@@ -20,6 +20,8 @@
#include <cstdint>
#include <thread>
+#include <boost/variant/variant.hpp>
+
#include "lv2/lv2plug.in/ns/ext/urid/urid.h"
#include "raul/RingBuffer.hpp"
@@ -30,7 +32,12 @@ namespace Server {
class Engine;
class PortImpl;
-class Task;
+
+struct SingleTask;
+struct SeqTask;
+struct ParTask;
+
+using Task = boost::variant<SingleTask, SeqTask, ParTask>;
/** Graph execution context.
*
@@ -115,7 +122,7 @@ public:
}
/** Claim a parallel task, and signal others that work is available. */
- void claim_task(Task* task);
+ void claim_task(ParTask* task);
/** Steal a task from some other context if possible. */
Task* steal_task() const;
@@ -126,7 +133,7 @@ public:
void join();
inline Engine& engine() const { return _engine; }
- inline Task* task() const { return _task; }
+ inline ParTask* task() const { return _task; }
inline unsigned id() const { return _id; }
inline FrameTime start() const { return _start; }
inline FrameTime time() const { return _start + _offset; }
@@ -143,7 +150,7 @@ protected:
Engine& _engine; ///< Engine we're running in
Raul::RingBuffer* _event_sink; ///< Port updates from process context
- Task* _task; ///< Currently executing task
+ ParTask* _task; ///< Currently executing task
std::thread* _thread; ///< Thread (NULL for main run context)
unsigned _id; ///< Context ID
diff --git a/src/server/Task.cpp b/src/server/Task.cpp
index d2cb2683..0978e15d 100644
--- a/src/server/Task.cpp
+++ b/src/server/Task.cpp
@@ -17,76 +17,27 @@
#include "BlockImpl.hpp"
#include "Task.hpp"
+#include <boost/variant/get.hpp>
+
namespace Ingen {
namespace Server {
-void
-Task::run(RunContext& context)
-{
- switch (_mode) {
- case Mode::SINGLE:
- // fprintf(stderr, "%u run %s\n", context.id(), _block->path().c_str());
- _block->process(context);
- break;
- case Mode::SEQUENTIAL:
- for (const auto& task : _children) {
- task->run(context);
- }
- break;
- case Mode::PARALLEL:
- // Initialize (not) done state of sub-tasks
- for (const auto& task : _children) {
- task->set_done(false);
- }
-
- // Grab the first sub-task
- _next = 0;
- _done_end = 0;
- Task* t = steal(context);
-
- // Allow other threads to steal sub-tasks
- context.claim_task(this);
-
- // Run available tasks until this task is finished
- for (; t; t = get_task(context)) {
- t->run(context);
- }
- context.claim_task(nullptr);
- break;
- }
-
- set_done(true);
-}
-
-Task*
-Task::steal(RunContext& context)
-{
- if (_mode == Mode::PARALLEL) {
- const unsigned i = _next++;
- if (i < _children.size()) {
- return _children[i].get();
- }
- }
-
- return nullptr;
-}
-
-Task*
-Task::get_task(RunContext& context)
+static Task*
+get_task(ParTask& task, RunContext& context)
{
// Attempt to "steal" a task from ourselves
- Task* t = steal(context);
+ Task* t = steal(task, context);
if (t) {
return t;
}
while (true) {
// Push done end index as forward as possible
- while (_done_end < _children.size() && _children[_done_end]->done()) {
- ++_done_end;
- }
+ // while (task.done_end < task.children.size() && task.children[task.done_end]->done()) {
+ // ++task.done_end;
+ // }
- if (_done_end >= _children.size()) {
+ if (task.done_end >= task.children.size()) {
return nullptr; // All child tasks are finished
}
@@ -103,10 +54,67 @@ Task::get_task(RunContext& context)
}
}
+struct Runner
+{
+ using result_type = void;
+
+ void operator()(SingleTask& task) {
+ task.block->process(context);
+ }
+
+ void operator()(SeqTask& task) {
+ for (const auto& child : task.children) {
+ run(*child, context);
+ }
+ }
+
+ void operator()(ParTask& task) {
+ // Initialize (not) done state of sub-tasks
+ // for (const auto& child : task.children) {
+ // child.done = false;
+ // }
+
+ // Grab the first sub-task
+ task.next = 0;
+ task.done_end = 0;
+ Task* t = steal(task, context);
+
+ // Allow other threads to steal sub-tasks
+ context.claim_task(&task);
+
+ // Run available tasks until this task is finished
+ for (; t; t = get_task(task, context)) {
+ run(*t, context);
+ }
+ context.claim_task(nullptr);
+ }
+
+ RunContext context;
+};
+
+void
+run(Task& task, RunContext& context)
+{
+ Runner runner{context};
+ boost::apply_visitor(runner, task);
+}
+
+Task*
+steal(ParTask& task, RunContext& context)
+{
+ const unsigned i = task.next++;
+ if (i < task.children.size()) {
+ return task.children[i].get();
+ }
+
+ return nullptr;
+}
+
std::unique_ptr<Task>
-Task::simplify(std::unique_ptr<Task>&& task)
+simplify(std::unique_ptr<Task>&& task)
{
- if (task->mode() == Mode::SINGLE) {
+#if 0
+ if (boost::get<SingleTask>(task.get())) {
return std::move(task);
}
@@ -129,10 +137,12 @@ Task::simplify(std::unique_ptr<Task>&& task)
if (ret->_children.size() == 1) {
return std::move(ret->_children.front());
}
-
return ret;
+#endif
+ return nullptr;
}
+#if 0
void
Task::dump(std::function<void (const std::string&)> sink, unsigned indent, bool first) const
{
@@ -153,6 +163,7 @@ Task::dump(std::function<void (const std::string&)> sink, unsigned indent, bool
sink(")");
}
}
+#endif
} // namespace Server
} // namespace Ingen
diff --git a/src/server/Task.hpp b/src/server/Task.hpp
index 99fe347d..25f76260 100644
--- a/src/server/Task.hpp
+++ b/src/server/Task.hpp
@@ -24,12 +24,55 @@
#include <memory>
#include <ostream>
+#include <boost/variant/variant.hpp>
+
namespace Ingen {
namespace Server {
class BlockImpl;
class RunContext;
+struct SingleTask;
+struct MultiTask;
+struct SeqTask;
+struct ParTask;
+
+using Task = boost::variant<SingleTask, SeqTask, ParTask>;
+
+using TaskChildren = std::deque<std::unique_ptr<Task>>;
+
+struct SingleTask {
+ BlockImpl* block; ///< Block to run
+};
+
+struct MultiTask {
+ /** Prepend a child to this task. */
+ template<typename... Args>
+ void emplace_front(Args... args) {
+ children.emplace_front(new Task(std::forward<Args>(args)...));
+ }
+
+ TaskChildren children; ///< Child tasks
+};
+
+struct SeqTask : MultiTask {
+};
+
+struct ParTask : MultiTask {
+ ParTask() = default;
+
+ ParTask(ParTask&& task)
+ : done_end(task.done_end)
+ , next(task.next.load())
+ , done(task.done.load())
+ {}
+
+ unsigned done_end{0}; ///< Index of rightmost done sub-task
+ std::atomic<unsigned> next{0}; ///< Index of next sub-task
+ std::atomic<bool> done{false}; ///< Completion phase
+};
+
+ #if 0
class Task {
public:
enum class Mode {
@@ -112,6 +155,15 @@ private:
std::atomic<bool> _done; ///< Completion phase
};
+#endif
+
+void run(Task& task, RunContext& context);
+
+std::unique_ptr<Task> simplify(std::unique_ptr<Task>&& task);
+
+/** Steal a child task from this task. */
+Task* steal(ParTask& task, RunContext& context);
+
} // namespace Server
} // namespace Ingen