diff options
author | David Robillard <d@drobilla.net> | 2017-11-12 19:57:56 +0100 |
---|---|---|
committer | David Robillard <d@drobilla.net> | 2017-11-12 19:58:13 +0100 |
commit | f8cd75372d9c45a0a8616d89a18a5b9906ac9d54 (patch) | |
tree | eb9b06bc00c2d3b36f1f56cbd6fad7cd33df6ae0 /src/server | |
parent | a66df006ccd3426148491f0a4c21c9facb8104ea (diff) | |
download | ingen-f8cd75372d9c45a0a8616d89a18a5b9906ac9d54.tar.gz ingen-f8cd75372d9c45a0a8616d89a18a5b9906ac9d54.tar.bz2 ingen-f8cd75372d9c45a0a8616d89a18a5b9906ac9d54.zip |
WIP: Clean up task implementationtasks
Diffstat (limited to 'src/server')
-rw-r--r-- | src/server/CompiledGraph.cpp | 163 | ||||
-rw-r--r-- | src/server/CompiledGraph.hpp | 53 | ||||
-rw-r--r-- | src/server/GraphImpl.hpp | 1 | ||||
-rw-r--r-- | src/server/RunContext.cpp | 13 | ||||
-rw-r--r-- | src/server/RunContext.hpp | 17 | ||||
-rw-r--r-- | src/server/Runner.hpp | 44 | ||||
-rw-r--r-- | src/server/Task.cpp | 59 | ||||
-rw-r--r-- | src/server/Task.hpp | 85 | ||||
-rw-r--r-- | src/server/Work.hpp | 55 |
9 files changed, 266 insertions, 224 deletions
diff --git a/src/server/CompiledGraph.cpp b/src/server/CompiledGraph.cpp index 0d51f284..e4af74ba 100644 --- a/src/server/CompiledGraph.cpp +++ b/src/server/CompiledGraph.cpp @@ -15,6 +15,7 @@ */ #include <algorithm> +#include <set> #include "ingen/ColorContext.hpp" #include "ingen/Configuration.hpp" @@ -23,11 +24,14 @@ #include "CompiledGraph.hpp" #include "Engine.hpp" #include "GraphImpl.hpp" -#include "ThreadManager.hpp" namespace Ingen { namespace Server { +typedef std::set<BlockImpl*> BlockSet; +typedef Task BlockTask; +typedef Work<BlockImpl> IR; + /** Graph contains ambiguous feedback with no delay nodes. */ class FeedbackException : public std::exception { public: @@ -52,30 +56,6 @@ has_provider_with_many_dependants(BlockImpl* n) return false; } -CompiledGraph::CompiledGraph(GraphImpl* graph) - : _master(Task::Mode::SEQUENTIAL) -{ - compile_graph(graph); -} - -MPtr<CompiledGraph> -CompiledGraph::compile(Raul::Maid& maid, GraphImpl& graph) -{ - try { - return maid.make_managed<CompiledGraph>(&graph); - } catch (FeedbackException e) { - Log& log = graph.engine().log(); - if (e.node && e.root) { - log.error(fmt("Feedback compiling %1% from %2%\n") - % e.node->path() % e.root->path()); - } else { - log.error(fmt("Feedback compiling %1%\n") - % e.node->path()); - } - return MPtr<CompiledGraph>(); - } -} - static size_t num_unvisited_dependants(BlockImpl* block) { @@ -103,11 +83,15 @@ parallel_depth(BlockImpl* block) return 2 + min_provider_depth; } -void -CompiledGraph::compile_graph(GraphImpl* graph) -{ - ThreadManager::assert_thread(THREAD_PRE_PROCESS); +static void +compile_block(BlockImpl* n, + IR& task, + size_t max_depth, + std::set<BlockImpl*>& k); +static BlockTask +compile_graph(GraphImpl* graph) +{ // Start with sink nodes (no outputs, or connected only to graph outputs) std::set<BlockImpl*> blocks; for (auto& b : graph->blocks()) { @@ -120,6 +104,8 @@ CompiledGraph::compile_graph(GraphImpl* graph) } } + IR main = IR::sequential(); + // Keep compiling working set until all nodes are visited while (!blocks.empty()) { std::set<BlockImpl*> predecessors; @@ -130,26 +116,21 @@ CompiledGraph::compile_graph(GraphImpl* graph) depth = std::min(depth, parallel_depth(i)); } - Task par(Task::Mode::PARALLEL); + IR par = IR::parallel(); for (auto b : blocks) { assert(num_unvisited_dependants(b) == 0); - Task seq(Task::Mode::SEQUENTIAL); + IR seq = IR::sequential(); compile_block(b, seq, depth, predecessors); - par.push_front(std::move(seq)); + par.prepend(std::move(seq)); } - _master.push_front(std::move(par)); + main.prepend(std::move(par)); blocks = predecessors; } - _master = Task::simplify(std::move(_master)); - - if (graph->engine().world()->conf().option("trace").get<int32_t>()) { - ColorContext ctx(stderr, ColorContext::Color::YELLOW); - dump(graph->path()); - } + return BlockTask::compile(main); } -/** Throw a FeedbackException iff `dependant` has `root` as a dependency. */ +/** Throw a FeedbackException iff `provider` has `root` as a dependency. */ static void check_feedback(const BlockImpl* root, BlockImpl* provider) { @@ -173,12 +154,12 @@ check_feedback(const BlockImpl* root, BlockImpl* provider) } } -void -CompiledGraph::compile_provider(const BlockImpl* root, - BlockImpl* block, - Task& task, - size_t max_depth, - std::set<BlockImpl*>& k) +static void +compile_provider(const BlockImpl* root, + BlockImpl* block, + IR& task, + size_t max_depth, + std::set<BlockImpl*>& k) { if (block->dependants().size() > 1) { /* Provider has other dependants, so this is the tail of a sequential task. @@ -189,11 +170,11 @@ CompiledGraph::compile_provider(const BlockImpl* root, } } else if (max_depth > 0) { // Calling dependant has only this provider, add here - if (task.mode() == Task::Mode::PARALLEL) { + if (task.mode() == IR::Mode::PARALLEL) { // Inside a parallel task, compile into a new sequential child - Task seq(Task::Mode::SEQUENTIAL); + IR seq = IR::sequential(); compile_block(block, seq, max_depth, k); - task.push_front(std::move(seq)); + task.prepend(std::move(seq)); } else { // Prepend to given sequential task compile_block(block, task, max_depth, k); @@ -205,18 +186,18 @@ CompiledGraph::compile_provider(const BlockImpl* root, } } -void -CompiledGraph::compile_block(BlockImpl* n, - Task& task, - size_t max_depth, - std::set<BlockImpl*>& k) +static void +compile_block(BlockImpl* n, + IR& task, + size_t max_depth, + std::set<BlockImpl*>& k) { switch (n->get_mark()) { case BlockImpl::Mark::UNVISITED: n->set_mark(BlockImpl::Mark::VISITING); // Execute this task after the providers to follow - task.push_front(Task(Task::Mode::SINGLE, n)); + task.prepend(IR::unit(*n)); if (n->providers().size() < 2) { // Single provider, prepend it to this sequential task @@ -233,11 +214,11 @@ CompiledGraph::compile_block(BlockImpl* n, } else { // Multiple providers with only this node as dependant, // make a new parallel task to execute them - Task par(Task::Mode::PARALLEL); + IR par = IR::parallel(); for (auto p : n->providers()) { compile_provider(n, p, par, max_depth - 1, k); } - task.push_front(std::move(par)); + task.prepend(std::move(par)); } n->set_mark(BlockImpl::Mark::VISITED); break; @@ -250,23 +231,65 @@ CompiledGraph::compile_block(BlockImpl* n, } } -void -CompiledGraph::run(RunContext& context) +static void +dump_string(const std::string& str) { - _master.run(context); + fwrite(str.c_str(), 1, str.size(), stderr); } -void -CompiledGraph::dump(const std::string& name) const +static void +dump_task(const BlockTask& task, unsigned indent, bool first) +{ + if (!first) { + dump_string("\n"); + for (unsigned i = 0; i < indent; ++i) { + dump_string(" "); + } + } + + if (task.mode() == BlockTask::Mode::UNIT) { + dump_string(task.unit()->path()); + } else { + dump_string((task.mode() == BlockTask::Mode::SEQUENTIAL) + ? "(seq " + : "(par "); + for (size_t i = 0; i < task.children().size(); ++i) { + dump_task(task.children()[i], indent + 5, i == 0); + } + dump_string(")"); + } +} + +static void +dump(const BlockTask& task, const std::string& name) +{ + dump_string("(compiled-graph "); + dump_string(name); + dump_task(task, 2, false); + dump_string(")\n"); +} + +MPtr<CompiledGraph> +compile(Raul::Maid& maid, GraphImpl& graph) { - auto sink = [](const std::string& s) { - fwrite(s.c_str(), 1, s.size(), stderr); - }; - - sink("(compiled-graph "); - sink(name); - _master.dump(sink, 2, false); - sink(")\n"); + try { + BlockTask master(compile_graph(&graph)); + if (graph.engine().world()->conf().option("trace").get<int32_t>()) { + ColorContext ctx(stderr, ColorContext::Color::YELLOW); + dump(master, graph.path()); + } + return maid.make_managed<CompiledGraph>(std::move(master)); + } catch (FeedbackException e) { + Log& log = graph.engine().log(); + if (e.node && e.root) { + log.error(fmt("Feedback compiling %1% from %2%\n") + % e.node->path() % e.root->path()); + } else { + log.error(fmt("Feedback compiling %1%\n") + % e.node->path()); + } + return MPtr<CompiledGraph>(); + } } } // namespace Server diff --git a/src/server/CompiledGraph.hpp b/src/server/CompiledGraph.hpp index 833ae0c6..24a12351 100644 --- a/src/server/CompiledGraph.hpp +++ b/src/server/CompiledGraph.hpp @@ -17,66 +17,21 @@ #ifndef INGEN_ENGINE_COMPILEDGRAPH_HPP #define INGEN_ENGINE_COMPILEDGRAPH_HPP -#include <functional> -#include <set> -#include <vector> - #include "ingen/types.hpp" #include "raul/Maid.hpp" -#include "raul/Noncopyable.hpp" #include "Task.hpp" namespace Ingen { namespace Server { -class BlockImpl; class GraphImpl; -class RunContext; - -/** A graph ``compiled'' into a quickly executable form. - * - * This is a flat sequence of nodes ordered such that the process thread can - * execute the nodes in order and have nodes always executed before any of - * their dependencies. - */ -class CompiledGraph : public Raul::Maid::Disposable - , public Raul::Noncopyable -{ -public: - static MPtr<CompiledGraph> compile(Raul::Maid& maid, GraphImpl& graph); - - void run(RunContext& context); - -private: - friend class Raul::Maid; ///< Allow make_managed to construct - - CompiledGraph(GraphImpl* graph); - - typedef std::set<BlockImpl*> BlockSet; - - void dump(const std::string& name) const; - - void compile_graph(GraphImpl* graph); - - void compile_block(BlockImpl* block, - Task& task, - size_t max_depth, - BlockSet& k); - - void compile_provider(const BlockImpl* root, - BlockImpl* block, - Task& task, - size_t max_depth, - BlockSet& k); +class BlockImpl; - Task _master; -}; +/** A graph ``compiled'' into an efficiently executable form. */ +using CompiledGraph = Raul::Maid::Managed<Task>; -inline MPtr<CompiledGraph> compile(Raul::Maid& maid, GraphImpl& graph) -{ - return CompiledGraph::compile(maid, graph); -} +MPtr<CompiledGraph> compile(Raul::Maid& maid, GraphImpl& graph); } // namespace Server } // namespace Ingen diff --git a/src/server/GraphImpl.hpp b/src/server/GraphImpl.hpp index 6064624a..e75716b6 100644 --- a/src/server/GraphImpl.hpp +++ b/src/server/GraphImpl.hpp @@ -35,7 +35,6 @@ class Arc; namespace Server { class ArcImpl; -class CompiledGraph; class Engine; class RunContext; diff --git a/src/server/RunContext.cpp b/src/server/RunContext.cpp index b2e3f269..5d5c9e33 100644 --- a/src/server/RunContext.cpp +++ b/src/server/RunContext.cpp @@ -18,6 +18,7 @@ #include "ingen/Log.hpp" #include "ingen/URIMap.hpp" +#include "BlockImpl.hpp" #include "Broadcaster.hpp" #include "BufferFactory.hpp" #include "Engine.hpp" @@ -141,20 +142,26 @@ RunContext::emit_notifications(FrameTime end) } void -RunContext::claim_task(Task* task) +RunContext::claim_task(BlockTask* task) { if ((_task = task)) { _engine.signal_tasks_available(); } } -Task* +RunContext::BlockTask* RunContext::steal_task() const { return _engine.steal_task(_id + 1); } void +RunContext::run_unit(BlockImpl* unit) +{ + unit->process(*this); +} + +void RunContext::set_priority(int priority) { if (_thread) { @@ -185,7 +192,7 @@ void RunContext::run() { while (_engine.wait_for_tasks()) { - for (Task* t; (t = _engine.steal_task(0));) { + for (BlockTask* t; (t = _engine.steal_task(0));) { t->run(*this); } } diff --git a/src/server/RunContext.hpp b/src/server/RunContext.hpp index 87fb0fed..939371e7 100644 --- a/src/server/RunContext.hpp +++ b/src/server/RunContext.hpp @@ -23,11 +23,13 @@ #include "ingen/World.hpp" #include "raul/RingBuffer.hpp" +#include "Runner.hpp" #include "types.hpp" namespace Ingen { namespace Server { +class BlockImpl; class Engine; class PortImpl; class Task; @@ -44,9 +46,11 @@ class Task; * * \ingroup engine */ -class RunContext +class RunContext : public Runner { public: + using BlockTask = Task; + /** Create a new run context. * * @param engine The engine this context is running within. @@ -115,10 +119,13 @@ public: } /** Claim a parallel task, and signal others that work is available. */ - void claim_task(Task* task); + void claim_task(BlockTask* task) override; /** Steal a task from some other context if possible. */ - Task* steal_task() const; + BlockTask* steal_task() const override; + + /** Run `unit`. */ + void run_unit(BlockImpl* unit) override; void set_priority(int priority); void set_rate(SampleCount rate) { _rate = rate; } @@ -126,7 +133,7 @@ public: void join(); inline Engine& engine() const { return _engine; } - inline Task* task() const { return _task; } + inline BlockTask* task() const { return _task; } inline unsigned id() const { return _id; } inline FrameTime start() const { return _start; } inline FrameTime time() const { return _start + _offset; } @@ -143,7 +150,7 @@ protected: Engine& _engine; ///< Engine we're running in Raul::RingBuffer* _event_sink; ///< Port updates from process context - Task* _task; ///< Currently executing task + BlockTask* _task; ///< Currently executing task std::thread* _thread; ///< Thread (NULL for main run context) unsigned _id; ///< Context ID diff --git a/src/server/Runner.hpp b/src/server/Runner.hpp new file mode 100644 index 00000000..173984e3 --- /dev/null +++ b/src/server/Runner.hpp @@ -0,0 +1,44 @@ +/* + This file is part of Ingen. + Copyright 2007-2017 David Robillard <http://drobilla.net/> + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef INGEN_ENGINE_RUNNER_HPP +#define INGEN_ENGINE_RUNNER_HPP + +namespace Ingen { +namespace Server { + +class Task; +class BlockImpl; + +class Runner +{ +public: + virtual ~Runner() = default; + + /** Claim a parallel task, and signal others that work is available. */ + virtual void claim_task(Task* task) = 0; + + /** Steal a task from some other context if possible. */ + virtual Task* steal_task() const = 0; + + /** Run `unit`. */ + virtual void run_unit(BlockImpl* unit) = 0; +}; + +} // namespace Server +} // namespace Ingen + +#endif // INGEN_ENGINE_RUNNER_HPP diff --git a/src/server/Task.cpp b/src/server/Task.cpp index c76c19ae..0630ec51 100644 --- a/src/server/Task.cpp +++ b/src/server/Task.cpp @@ -21,12 +21,12 @@ namespace Ingen { namespace Server { void -Task::run(RunContext& context) +Task::run(Runner& context) { switch (_mode) { - case Mode::SINGLE: + case Mode::UNIT: // fprintf(stderr, "%u run %s\n", context.id(), _block->path().c_str()); - _block->process(context); + context.run_unit(_unit); break; case Mode::SEQUENTIAL: for (auto& task : _children) { @@ -59,7 +59,7 @@ Task::run(RunContext& context) } Task* -Task::steal(RunContext& context) +Task::steal(Runner& context) { if (_mode == Mode::PARALLEL) { const unsigned i = _next++; @@ -72,7 +72,7 @@ Task::steal(RunContext& context) } Task* -Task::get_task(RunContext& context) +Task::get_task(Runner& context) { // Attempt to "steal" a task from ourselves Task* t = steal(context); @@ -104,54 +104,29 @@ Task::get_task(RunContext& context) } Task -Task::simplify(Task task) +Task::compile(const IR& work) { - if (task.mode() == Mode::SINGLE) { - return task; + if (work.mode() == Mode::UNIT) { + return Task(Mode::UNIT, work.unit()); } - Task ret(task.mode()); - for (auto&& c : task._children) { - auto child = simplify(std::move(c)); + Task task(work.mode()); + for (auto c = work.children().rbegin(); c != work.children().rend(); ++c) { + auto child(compile(*c)); if (!child.empty()) { - if (child.mode() == task.mode()) { - // Merge child into parent + if (child.mode() == work.mode()) { // Merge child into parent for (auto&& grandchild : child._children) { - ret.append(std::move(grandchild)); + task.append(std::move(grandchild)); } } else { - // Add child task - ret.append(std::move(child)); + task.append(std::move(child)); } } } - if (ret._children.size() == 1) { - return std::move(ret._children.front()); - } - - return ret; -} - -void -Task::dump(std::function<void (const std::string&)> sink, unsigned indent, bool first) const -{ - if (!first) { - sink("\n"); - for (unsigned i = 0; i < indent; ++i) { - sink(" "); - } - } - - if (_mode == Mode::SINGLE) { - sink(_block->path()); - } else { - sink(((_mode == Mode::SEQUENTIAL) ? "(seq " : "(par ")); - for (size_t i = 0; i < _children.size(); ++i) { - _children[i].dump(sink, indent + 5, i == 0); - } - sink(")"); - } + return ((task._children.size() == 1) + ? std::move(task._children.front()) + : std::move(task)); } } // namespace Server diff --git a/src/server/Task.hpp b/src/server/Task.hpp index 203627a1..dc81eb2a 100644 --- a/src/server/Task.hpp +++ b/src/server/Task.hpp @@ -19,94 +19,71 @@ #include <atomic> #include <cassert> -#include <deque> -#include <functional> -#include <ostream> +#include <vector> + +#include "Work.hpp" namespace Ingen { namespace Server { +class Runner; class BlockImpl; -class RunContext; class Task { public: - enum class Mode { - SINGLE, ///< Single block to run - SEQUENTIAL, ///< Elements must be run sequentially in order - PARALLEL ///< Elements may be run in any order in parallel - }; - - Task(Mode mode, BlockImpl* block = nullptr) - : _block(block) - , _mode(mode) - , _done_end(0) - , _next(0) - , _done(false) - { - assert(!(mode == Mode::SINGLE && !block)); - } + using Unit = BlockImpl; + using IR = Work<Unit>; + using Mode = typename IR::Mode; + using Children = std::vector<Task>; Task(Task&& task) : _children(std::move(task._children)) - , _block(task._block) + , _unit(task._unit) , _mode(task._mode) , _done_end(task._done_end) , _next(task._next.load()) , _done(task._done.load()) {} - Task& operator=(Task&& task) - { - _children = std::move(task._children); - _block = task._block; - _mode = task._mode; - _done_end = task._done_end; - _next = task._next.load(); - _done = task._done.load(); - return *this; - } + Task(const Task&) = delete; + Task& operator=(const Task&) = delete; /** Run task in the given context. */ - void run(RunContext& context); - - /** Pretty print task to the given stream (recursively). */ - void dump(std::function<void (const std::string&)> sink, unsigned indent, bool first) const; + void run(Runner& context); /** Return true iff this is an empty task. */ - bool empty() const { return _mode != Mode::SINGLE && _children.empty(); } + bool empty() const { return _mode != Mode::UNIT && _children.empty(); } /** Simplify task expression. */ - static Task simplify(Task task); + static Task compile(const IR& source); /** Steal a child task from this task (succeeds for PARALLEL only). */ - Task* steal(RunContext& context); + Task* steal(Runner& context); - /** Prepend a child to this task. */ - void push_front(Task&& task) { - _children.emplace_front(std::move(task)); - } - - Mode mode() const { return _mode; } - BlockImpl* block() const { return _block; } - bool done() const { return _done; } + Mode mode() const { return _mode; } + Unit* unit() const { return _unit; } + const Children& children() const { return _children; } + bool done() const { return _done; } void set_done(bool done) { _done = done; } private: - typedef std::deque<Task> Children; - - Task(const Task&) = delete; - Task& operator=(const Task&) = delete; + Task(Mode mode, Unit* unit = nullptr) + : _unit(unit) + , _mode(mode) + , _done_end(0) + , _next(0) + , _done(false) + { + assert(!(mode == Mode::UNIT && !unit)); + } - Task* get_task(RunContext& context); + Task* get_task(Runner& context); - void append(Task t) { - _children.emplace_back(std::move(t)); - } + void append(Task t) { _children.emplace_back(std::move(t)); } Children _children; ///< Vector of child tasks - BlockImpl* _block; ///< Used for SINGLE only + Unit* _unit; ///< Used for UNIT only Mode _mode; ///< Execution mode unsigned _done_end; ///< Index of rightmost done sub-task std::atomic<unsigned> _next; ///< Index of next sub-task diff --git a/src/server/Work.hpp b/src/server/Work.hpp new file mode 100644 index 00000000..bfbc9ed0 --- /dev/null +++ b/src/server/Work.hpp @@ -0,0 +1,55 @@ +/* + This file is part of Ingen. + Copyright 2007-2017 David Robillard <http://drobilla.net/> + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef INGEN_ENGINE_WORK_HPP +#define INGEN_ENGINE_WORK_HPP + +#include <vector> + +namespace Ingen { +namespace Server { + +template<typename Unit> +class Work { +public: + enum class Mode { + UNIT, ///< Single unit to run + SEQUENTIAL, ///< Elements must be run sequentially in order + PARALLEL ///< Elements may be run in any order in parallel + }; + + static Work unit(Unit& unit) { return Work(Mode::UNIT, &unit); } + static Work sequential() { return Work(Mode::SEQUENTIAL); } + static Work parallel() { return Work(Mode::PARALLEL); } + + void prepend(Work&& work) { _children.emplace_back(std::move(work)); } + + Unit* unit() const { return _unit; } + Mode mode() const { return _mode; } + const std::vector<Work>& children() const { return _children; } + +private: + Work(Mode mode, Unit* unit = nullptr) : _unit(unit), _mode(mode) {} + + std::vector<Work> _children; ///< Children in reverse execution order + Unit* const _unit; ///< Used for UNIT mode only + const Mode _mode; ///< Execution mode +}; + +} // namespace Server +} // namespace Ingen + +#endif // INGEN_ENGINE_WORK_HPP |