From c9863ba79edbac7ae78ad0e374a2988c816ad31a Mon Sep 17 00:00:00 2001 From: David Robillard Date: Sat, 24 Mar 2018 14:07:08 +0100 Subject: WIP: Variant task --- src/server/CompiledGraph.cpp | 27 +++++---- src/server/Engine.cpp | 4 +- src/server/Engine.hpp | 7 ++- src/server/GraphImpl.cpp | 2 +- src/server/RunContext.cpp | 5 +- src/server/RunContext.hpp | 15 +++-- src/server/Task.cpp | 133 +++++++++++++++++++++++-------------------- src/server/Task.hpp | 52 +++++++++++++++++ 8 files changed, 163 insertions(+), 82 deletions(-) (limited to 'src/server') diff --git a/src/server/CompiledGraph.cpp b/src/server/CompiledGraph.cpp index 32302174..fbc7ab26 100644 --- a/src/server/CompiledGraph.cpp +++ b/src/server/CompiledGraph.cpp @@ -17,6 +17,8 @@ #include #include +#include + #include "ingen/ColorContext.hpp" #include "ingen/Configuration.hpp" #include "ingen/Log.hpp" @@ -111,7 +113,7 @@ check_feedback(const BlockImpl* root, BlockImpl* node) } static void -compile_block(BlockImpl* n, Task& task, BlockSet& k) +compile_block(BlockImpl* n, ParTask& task, BlockSet& k) { switch (n->get_mark()) { case BlockImpl::Mark::UNVISITED: { @@ -119,7 +121,7 @@ compile_block(BlockImpl* n, Task& task, BlockSet& k) if (has_provider_with_fan_out(n)) { // Provider shared by other dependants, enqueue providers for later // Prepends n to current task: (... n ...) - task.emplace_front(n); + task.emplace_front(SingleTask{n}); for (auto p : n->providers()) { check_feedback(n, p); if (num_unvisited_dependants(p) == 0) { @@ -129,13 +131,15 @@ compile_block(BlockImpl* n, Task& task, BlockSet& k) } else { // All providers have only this dependant // Runs node after parallel providers: (... (seq (par p...) n) ...) - task.emplace_front(Task::Mode::SEQUENTIAL); - task.front().emplace_front(n); + SeqTask seq; + seq.emplace_front(SingleTask{n}); - task.front().emplace_front(Task::Mode::PARALLEL); + ParTask par; for (auto p : n->providers()) { - compile_block(p, task.front().front(), k); + compile_block(p, par, k); } + seq.emplace_front(std::move(par)); + task.emplace_front(std::move(seq)); } break; } @@ -149,7 +153,7 @@ compile_block(BlockImpl* n, Task& task, BlockSet& k) } CompiledGraph::CompiledGraph(GraphImpl& graph) - : _master(std::unique_ptr(new Task(Task::Mode::SEQUENTIAL))) + : _master(std::unique_ptr(new Task(SeqTask()))) { ThreadManager::assert_thread(THREAD_PRE_PROCESS); @@ -169,16 +173,17 @@ CompiledGraph::CompiledGraph(GraphImpl& graph) while (!blocks.empty()) { BlockSet predecessors; - _master->emplace_front(Task::Mode::PARALLEL); + ParTask par; for (auto b : blocks) { assert(num_unvisited_dependants(b) == 0); - compile_block(b, _master->front(), predecessors); + compile_block(b, par, predecessors); } + boost::get(_master.get())->emplace_front(std::move(par)); blocks = std::move(predecessors); } - _master = Task::simplify(std::move(_master)); + _master = simplify(std::move(_master)); } MPtr compile(Raul::Maid& maid, GraphImpl& graph) @@ -190,7 +195,7 @@ MPtr compile(Raul::Maid& maid, GraphImpl& graph) Log& log = graph.engine().log(); auto sink = [&log](const std::string& s) { log.trace(s); }; sink(graph.path() + " =>\n "); - result->master().dump(sink, 2, true); + //result->master().dump(sink, 2, true); sink("\n"); } diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp index a7476845..fba7c7a3 100644 --- a/src/server/Engine.cpp +++ b/src/server/Engine.cpp @@ -260,9 +260,9 @@ Engine::steal_task(unsigned start_thread) for (unsigned i = 0; i < _run_contexts.size(); ++i) { const unsigned id = (start_thread + i) % _run_contexts.size(); RunContext* const ctx = _run_contexts[id]; - Task* par = ctx->task(); + ParTask* par = ctx->task(); if (par) { - Task* t = par->steal(*ctx); + Task* t = steal(*par, *ctx); if (t) { return t; } diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp index f5ba1feb..1f8cbce1 100644 --- a/src/server/Engine.hpp +++ b/src/server/Engine.hpp @@ -58,10 +58,15 @@ class PostProcessor; class PreProcessor; class RunContext; class SocketListener; -class Task; class UndoStack; class Worker; +struct SingleTask; +struct SeqTask; +struct ParTask; + +using Task = boost::variant; + /** The engine which executes the process graph. diff --git a/src/server/GraphImpl.cpp b/src/server/GraphImpl.cpp index dc928179..ef3ef4d4 100644 --- a/src/server/GraphImpl.cpp +++ b/src/server/GraphImpl.cpp @@ -232,7 +232,7 @@ void GraphImpl::run(RunContext& context) { if (_compiled_graph) { - _compiled_graph->master().run(context); + Server::run(_compiled_graph->master(), context); } } diff --git a/src/server/RunContext.cpp b/src/server/RunContext.cpp index 3ab9d15c..b446addf 100644 --- a/src/server/RunContext.cpp +++ b/src/server/RunContext.cpp @@ -141,7 +141,7 @@ RunContext::emit_notifications(FrameTime end) } void -RunContext::claim_task(Task* task) +RunContext::claim_task(ParTask* task) { if ((_task = task)) { _engine.signal_tasks_available(); @@ -186,7 +186,8 @@ RunContext::run() { while (_engine.wait_for_tasks()) { for (Task* t; (t = _engine.steal_task(0));) { - t->run(*this); + Server::run(*t, *this); + // t->run(*this); } } } diff --git a/src/server/RunContext.hpp b/src/server/RunContext.hpp index bb64a250..6c8246ca 100644 --- a/src/server/RunContext.hpp +++ b/src/server/RunContext.hpp @@ -20,6 +20,8 @@ #include #include +#include + #include "lv2/lv2plug.in/ns/ext/urid/urid.h" #include "raul/RingBuffer.hpp" @@ -30,7 +32,12 @@ namespace Server { class Engine; class PortImpl; -class Task; + +struct SingleTask; +struct SeqTask; +struct ParTask; + +using Task = boost::variant; /** Graph execution context. * @@ -115,7 +122,7 @@ public: } /** Claim a parallel task, and signal others that work is available. */ - void claim_task(Task* task); + void claim_task(ParTask* task); /** Steal a task from some other context if possible. */ Task* steal_task() const; @@ -126,7 +133,7 @@ public: void join(); inline Engine& engine() const { return _engine; } - inline Task* task() const { return _task; } + inline ParTask* task() const { return _task; } inline unsigned id() const { return _id; } inline FrameTime start() const { return _start; } inline FrameTime time() const { return _start + _offset; } @@ -143,7 +150,7 @@ protected: Engine& _engine; ///< Engine we're running in Raul::RingBuffer* _event_sink; ///< Port updates from process context - Task* _task; ///< Currently executing task + ParTask* _task; ///< Currently executing task std::thread* _thread; ///< Thread (NULL for main run context) unsigned _id; ///< Context ID diff --git a/src/server/Task.cpp b/src/server/Task.cpp index d2cb2683..0978e15d 100644 --- a/src/server/Task.cpp +++ b/src/server/Task.cpp @@ -17,76 +17,27 @@ #include "BlockImpl.hpp" #include "Task.hpp" +#include + namespace Ingen { namespace Server { -void -Task::run(RunContext& context) -{ - switch (_mode) { - case Mode::SINGLE: - // fprintf(stderr, "%u run %s\n", context.id(), _block->path().c_str()); - _block->process(context); - break; - case Mode::SEQUENTIAL: - for (const auto& task : _children) { - task->run(context); - } - break; - case Mode::PARALLEL: - // Initialize (not) done state of sub-tasks - for (const auto& task : _children) { - task->set_done(false); - } - - // Grab the first sub-task - _next = 0; - _done_end = 0; - Task* t = steal(context); - - // Allow other threads to steal sub-tasks - context.claim_task(this); - - // Run available tasks until this task is finished - for (; t; t = get_task(context)) { - t->run(context); - } - context.claim_task(nullptr); - break; - } - - set_done(true); -} - -Task* -Task::steal(RunContext& context) -{ - if (_mode == Mode::PARALLEL) { - const unsigned i = _next++; - if (i < _children.size()) { - return _children[i].get(); - } - } - - return nullptr; -} - -Task* -Task::get_task(RunContext& context) +static Task* +get_task(ParTask& task, RunContext& context) { // Attempt to "steal" a task from ourselves - Task* t = steal(context); + Task* t = steal(task, context); if (t) { return t; } while (true) { // Push done end index as forward as possible - while (_done_end < _children.size() && _children[_done_end]->done()) { - ++_done_end; - } + // while (task.done_end < task.children.size() && task.children[task.done_end]->done()) { + // ++task.done_end; + // } - if (_done_end >= _children.size()) { + if (task.done_end >= task.children.size()) { return nullptr; // All child tasks are finished } @@ -103,10 +54,67 @@ Task::get_task(RunContext& context) } } +struct Runner +{ + using result_type = void; + + void operator()(SingleTask& task) { + task.block->process(context); + } + + void operator()(SeqTask& task) { + for (const auto& child : task.children) { + run(*child, context); + } + } + + void operator()(ParTask& task) { + // Initialize (not) done state of sub-tasks + // for (const auto& child : task.children) { + // child.done = false; + // } + + // Grab the first sub-task + task.next = 0; + task.done_end = 0; + Task* t = steal(task, context); + + // Allow other threads to steal sub-tasks + context.claim_task(&task); + + // Run available tasks until this task is finished + for (; t; t = get_task(task, context)) { + run(*t, context); + } + context.claim_task(nullptr); + } + + RunContext context; +}; + +void +run(Task& task, RunContext& context) +{ + Runner runner{context}; + boost::apply_visitor(runner, task); +} + +Task* +steal(ParTask& task, RunContext& context) +{ + const unsigned i = task.next++; + if (i < task.children.size()) { + return task.children[i].get(); + } + + return nullptr; +} + std::unique_ptr -Task::simplify(std::unique_ptr&& task) +simplify(std::unique_ptr&& task) { - if (task->mode() == Mode::SINGLE) { +#if 0 + if (boost::get(task.get())) { return std::move(task); } @@ -129,10 +137,12 @@ Task::simplify(std::unique_ptr&& task) if (ret->_children.size() == 1) { return std::move(ret->_children.front()); } - return ret; +#endif + return nullptr; } +#if 0 void Task::dump(std::function sink, unsigned indent, bool first) const { @@ -153,6 +163,7 @@ Task::dump(std::function sink, unsigned indent, bool sink(")"); } } +#endif } // namespace Server } // namespace Ingen diff --git a/src/server/Task.hpp b/src/server/Task.hpp index 99fe347d..25f76260 100644 --- a/src/server/Task.hpp +++ b/src/server/Task.hpp @@ -24,12 +24,55 @@ #include #include +#include + namespace Ingen { namespace Server { class BlockImpl; class RunContext; +struct SingleTask; +struct MultiTask; +struct SeqTask; +struct ParTask; + +using Task = boost::variant; + +using TaskChildren = std::deque>; + +struct SingleTask { + BlockImpl* block; ///< Block to run +}; + +struct MultiTask { + /** Prepend a child to this task. */ + template + void emplace_front(Args... args) { + children.emplace_front(new Task(std::forward(args)...)); + } + + TaskChildren children; ///< Child tasks +}; + +struct SeqTask : MultiTask { +}; + +struct ParTask : MultiTask { + ParTask() = default; + + ParTask(ParTask&& task) + : done_end(task.done_end) + , next(task.next.load()) + , done(task.done.load()) + {} + + unsigned done_end{0}; ///< Index of rightmost done sub-task + std::atomic next{0}; ///< Index of next sub-task + std::atomic done{false}; ///< Completion phase +}; + + #if 0 class Task { public: enum class Mode { @@ -112,6 +155,15 @@ private: std::atomic _done; ///< Completion phase }; +#endif + +void run(Task& task, RunContext& context); + +std::unique_ptr simplify(std::unique_ptr&& task); + +/** Steal a child task from this task. */ +Task* steal(ParTask& task, RunContext& context); + } // namespace Server } // namespace Ingen -- cgit v1.2.1