diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/server/CompiledGraph.cpp | 2 | ||||
-rw-r--r-- | src/server/Engine.cpp | 11 | ||||
-rw-r--r-- | src/server/Engine.hpp | 9 | ||||
-rw-r--r-- | src/server/GraphImpl.cpp | 3 | ||||
-rw-r--r-- | src/server/RunContext.cpp | 16 | ||||
-rw-r--r-- | src/server/RunContext.hpp | 11 | ||||
-rw-r--r-- | src/server/Task.cpp | 67 | ||||
-rw-r--r-- | src/server/Task.hpp | 17 |
8 files changed, 63 insertions, 73 deletions
diff --git a/src/server/CompiledGraph.cpp b/src/server/CompiledGraph.cpp index 12867d2c..b99300f9 100644 --- a/src/server/CompiledGraph.cpp +++ b/src/server/CompiledGraph.cpp @@ -123,7 +123,7 @@ compile_block(BlockImpl* n, ParTask& task, BlockSet& k) // Prepends n to current task: (... n ...) task.emplace_front(SingleTask{n}); for (auto p : n->providers()) { - check_feedback(n, p); + // check_feedback(n, p); if (num_unvisited_dependants(p) == 0) { k.insert(p); } diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp index fba7c7a3..b05fa75c 100644 --- a/src/server/Engine.cpp +++ b/src/server/Engine.cpp @@ -254,7 +254,7 @@ Engine::signal_tasks_available() _tasks_available.notify_all(); } -Task* +Job Engine::steal_task(unsigned start_thread) { for (unsigned i = 0; i < _run_contexts.size(); ++i) { @@ -262,13 +262,14 @@ Engine::steal_task(unsigned start_thread) RunContext* const ctx = _run_contexts[id]; ParTask* par = ctx->task(); if (par) { - Task* t = steal(*par, *ctx); - if (t) { - return t; + Job job = steal(*par, *ctx); + if (job.task) { + return job; } } } - return nullptr; + + return Job{nullptr, nullptr}; } SPtr<Store> diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp index 1f8cbce1..e968fdd2 100644 --- a/src/server/Engine.hpp +++ b/src/server/Engine.hpp @@ -29,6 +29,7 @@ #include "ingen/types.hpp" #include "Event.hpp" +#include "Job.hpp" #include "Load.hpp" namespace Raul { @@ -61,12 +62,6 @@ class SocketListener; class UndoStack; class Worker; -struct SingleTask; -struct SeqTask; -struct ParTask; - -using Task = boost::variant<SingleTask, SeqTask, ParTask>; - /** The engine which executes the process graph. @@ -165,7 +160,7 @@ public: bool pending_notifications(); bool wait_for_tasks(); void signal_tasks_available(); - Task* steal_task(unsigned start_thread); + Job steal_task(unsigned start_thread); SPtr<Store> store() const; diff --git a/src/server/GraphImpl.cpp b/src/server/GraphImpl.cpp index ef3ef4d4..f2a0c634 100644 --- a/src/server/GraphImpl.cpp +++ b/src/server/GraphImpl.cpp @@ -232,7 +232,8 @@ void GraphImpl::run(RunContext& context) { if (_compiled_graph) { - Server::run(_compiled_graph->master(), context); + Job job{&_compiled_graph->master(), nullptr}; + Server::run(job, context); } } diff --git a/src/server/RunContext.cpp b/src/server/RunContext.cpp index b446addf..14bbaa30 100644 --- a/src/server/RunContext.cpp +++ b/src/server/RunContext.cpp @@ -143,12 +143,17 @@ RunContext::emit_notifications(FrameTime end) void RunContext::claim_task(ParTask* task) { - if ((_task = task)) { - _engine.signal_tasks_available(); + if (task) { + if (!_task) { + _task = task; + _engine.signal_tasks_available(); + } + } else { + _task = nullptr; } } -Task* +Job RunContext::steal_task() const { return _engine.steal_task(_id + 1); @@ -185,8 +190,9 @@ void RunContext::run() { while (_engine.wait_for_tasks()) { - for (Task* t; (t = _engine.steal_task(0));) { - Server::run(*t, *this); + for (Job job; (job = _engine.steal_task(0)).task;) { + // fprintf(stderr, "%d run\n", id()); + Server::run(job, *this); // t->run(*this); } } diff --git a/src/server/RunContext.hpp b/src/server/RunContext.hpp index 6c8246ca..7db53b1b 100644 --- a/src/server/RunContext.hpp +++ b/src/server/RunContext.hpp @@ -25,6 +25,7 @@ #include "lv2/lv2plug.in/ns/ext/urid/urid.h" #include "raul/RingBuffer.hpp" +#include "Job.hpp" #include "types.hpp" namespace Ingen { @@ -33,12 +34,6 @@ namespace Server { class Engine; class PortImpl; -struct SingleTask; -struct SeqTask; -struct ParTask; - -using Task = boost::variant<SingleTask, SeqTask, ParTask>; - /** Graph execution context. * * This is used to pass whatever information a Node might need to process; such @@ -125,7 +120,7 @@ public: void claim_task(ParTask* task); /** Steal a task from some other context if possible. */ - Task* steal_task() const; + Job steal_task() const; void set_priority(int priority); void set_rate(SampleCount rate) { _rate = rate; } @@ -144,7 +139,7 @@ public: inline bool realtime() const { return _realtime; } protected: - const RunContext& operator=(const RunContext& copy) = delete; + RunContext& operator=(const RunContext&) = delete; void run(); diff --git a/src/server/Task.cpp b/src/server/Task.cpp index 497f7895..48a82085 100644 --- a/src/server/Task.cpp +++ b/src/server/Task.cpp @@ -19,31 +19,25 @@ #include <boost/variant/get.hpp> +#include <xmmintrin.h> + namespace Ingen { namespace Server { -static Task* -get_task(ParTask& task, RunContext& context) +static Job +get_job(ParTask& task, RunContext& context) { // Attempt to "steal" a task from ourselves - Task* t = steal(task, context); - if (t) { - return t; + Job job = steal(task, context); + if (job.task) { + return job; } - while (true) { - // Push done end index as forward as possible - while (task.done_end < task.children.size() && task.children[task.done_end].done()) { - ++task.done_end; - } - - if (task.done_end >= task.children.size()) { - return nullptr; // All child tasks are finished - } - + while (task.done < task.children.size()) { // All child tasks claimed, but some are unfinished, steal a task - if ((t = context.steal_task())) { - return t; + job = context.steal_task(); + if (job.task) { + return job; } /* All child tasks are claimed, and we failed to steal any tasks. Spin @@ -51,7 +45,10 @@ get_task(ParTask& task, RunContext& context) signal in non-main threads, and maybe even in the main thread depending on your real-time safe philosophy... more experimentation here is needed. */ + _mm_pause(); } + + return Job{nullptr, nullptr}; } struct Runner @@ -64,50 +61,50 @@ struct Runner void operator()(SeqTask& task) { for (auto& child : task.children) { - run(child, context); + Job job{&child, nullptr}; + run(job, context); } } void operator()(ParTask& task) { - // Initialize (not) done state of sub-tasks - // for (const auto& child : task.children) { - // child.done = false; - // } - // Grab the first sub-task - task.next = 0; - task.done_end = 0; - Task* t = steal(task, context); + task.next = 1; + task.done = 0; + Job job{&task.children[0], &task}; // Allow other threads to steal sub-tasks context.claim_task(&task); // Run available tasks until this task is finished - for (; t; t = get_task(task, context)) { - run(*t, context); + for (; job.task; job = get_job(task, context)) { + run(job, context); } context.claim_task(nullptr); } - RunContext context; + RunContext& context; + ParTask* parent; }; void -run(Task& task, RunContext& context) +run(Job& job, RunContext& context) { - Runner runner{context}; - boost::apply_visitor(runner, task); + Runner runner{context, job.parent}; + boost::apply_visitor(runner, *job.task); + if (job.parent) { + ++job.parent->done; + } } -Task* +Job steal(ParTask& task, RunContext& context) { const unsigned i = task.next++; if (i < task.children.size()) { - return &task.children[i]; + return Job{&task.children[i], &task}; } - return nullptr; + return Job{nullptr, nullptr}; } static bool empty(Task& task) diff --git a/src/server/Task.hpp b/src/server/Task.hpp index 79ada9f0..a425c466 100644 --- a/src/server/Task.hpp +++ b/src/server/Task.hpp @@ -26,6 +26,8 @@ #include <boost/variant/variant.hpp> +#include "Job.hpp" + namespace Ingen { namespace Server { @@ -66,8 +68,7 @@ struct ParTask : MultiTask { ParTask() = default; ParTask(ParTask&& task) - : done_end(task.done_end) - , next(task.next.load()) + : next(task.next.load()) , done(task.done.load()) { children = std::move(task.children); @@ -75,29 +76,23 @@ struct ParTask : MultiTask { ParTask& operator=(ParTask&& task) { children = std::move(task.children); - done_end = task.done_end; next = task.next.load(); done = task.done.load(); return *this; } - unsigned done_end{0}; ///< Index of rightmost done sub-task std::atomic<unsigned> next{0}; ///< Index of next sub-task - std::atomic<bool> done{false}; ///< Completion phase -}; - -struct Job { - Task task; + std::atomic<unsigned> done{0}; ///< Count of finished sub-tasks }; /** Run task in the given context. */ -void run(Task& task, RunContext& context); +void run(Job& job, RunContext& context); /** Simplify and optimize task. */ Task simplify(Task&& task); /** Steal a child task from this task. */ -Task* steal(ParTask& task, RunContext& context); +Job steal(ParTask& task, RunContext& context); /** Pretty print task to the given stream (recursively). */ void dump(Task& task, std::function<void (const std::string&)> sink, unsigned indent, bool first); |