summaryrefslogtreecommitdiffstats
path: root/src/server
diff options
context:
space:
mode:
Diffstat (limited to 'src/server')
-rw-r--r--src/server/CompiledGraph.cpp2
-rw-r--r--src/server/Engine.cpp11
-rw-r--r--src/server/Engine.hpp9
-rw-r--r--src/server/GraphImpl.cpp3
-rw-r--r--src/server/RunContext.cpp16
-rw-r--r--src/server/RunContext.hpp11
-rw-r--r--src/server/Task.cpp67
-rw-r--r--src/server/Task.hpp17
8 files changed, 63 insertions, 73 deletions
diff --git a/src/server/CompiledGraph.cpp b/src/server/CompiledGraph.cpp
index 12867d2c..b99300f9 100644
--- a/src/server/CompiledGraph.cpp
+++ b/src/server/CompiledGraph.cpp
@@ -123,7 +123,7 @@ compile_block(BlockImpl* n, ParTask& task, BlockSet& k)
// Prepends n to current task: (... n ...)
task.emplace_front(SingleTask{n});
for (auto p : n->providers()) {
- check_feedback(n, p);
+ // check_feedback(n, p);
if (num_unvisited_dependants(p) == 0) {
k.insert(p);
}
diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp
index fba7c7a3..b05fa75c 100644
--- a/src/server/Engine.cpp
+++ b/src/server/Engine.cpp
@@ -254,7 +254,7 @@ Engine::signal_tasks_available()
_tasks_available.notify_all();
}
-Task*
+Job
Engine::steal_task(unsigned start_thread)
{
for (unsigned i = 0; i < _run_contexts.size(); ++i) {
@@ -262,13 +262,14 @@ Engine::steal_task(unsigned start_thread)
RunContext* const ctx = _run_contexts[id];
ParTask* par = ctx->task();
if (par) {
- Task* t = steal(*par, *ctx);
- if (t) {
- return t;
+ Job job = steal(*par, *ctx);
+ if (job.task) {
+ return job;
}
}
}
- return nullptr;
+
+ return Job{nullptr, nullptr};
}
SPtr<Store>
diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp
index 1f8cbce1..e968fdd2 100644
--- a/src/server/Engine.hpp
+++ b/src/server/Engine.hpp
@@ -29,6 +29,7 @@
#include "ingen/types.hpp"
#include "Event.hpp"
+#include "Job.hpp"
#include "Load.hpp"
namespace Raul {
@@ -61,12 +62,6 @@ class SocketListener;
class UndoStack;
class Worker;
-struct SingleTask;
-struct SeqTask;
-struct ParTask;
-
-using Task = boost::variant<SingleTask, SeqTask, ParTask>;
-
/**
The engine which executes the process graph.
@@ -165,7 +160,7 @@ public:
bool pending_notifications();
bool wait_for_tasks();
void signal_tasks_available();
- Task* steal_task(unsigned start_thread);
+ Job steal_task(unsigned start_thread);
SPtr<Store> store() const;
diff --git a/src/server/GraphImpl.cpp b/src/server/GraphImpl.cpp
index ef3ef4d4..f2a0c634 100644
--- a/src/server/GraphImpl.cpp
+++ b/src/server/GraphImpl.cpp
@@ -232,7 +232,8 @@ void
GraphImpl::run(RunContext& context)
{
if (_compiled_graph) {
- Server::run(_compiled_graph->master(), context);
+ Job job{&_compiled_graph->master(), nullptr};
+ Server::run(job, context);
}
}
diff --git a/src/server/RunContext.cpp b/src/server/RunContext.cpp
index b446addf..14bbaa30 100644
--- a/src/server/RunContext.cpp
+++ b/src/server/RunContext.cpp
@@ -143,12 +143,17 @@ RunContext::emit_notifications(FrameTime end)
void
RunContext::claim_task(ParTask* task)
{
- if ((_task = task)) {
- _engine.signal_tasks_available();
+ if (task) {
+ if (!_task) {
+ _task = task;
+ _engine.signal_tasks_available();
+ }
+ } else {
+ _task = nullptr;
}
}
-Task*
+Job
RunContext::steal_task() const
{
return _engine.steal_task(_id + 1);
@@ -185,8 +190,9 @@ void
RunContext::run()
{
while (_engine.wait_for_tasks()) {
- for (Task* t; (t = _engine.steal_task(0));) {
- Server::run(*t, *this);
+ for (Job job; (job = _engine.steal_task(0)).task;) {
+ // fprintf(stderr, "%d run\n", id());
+ Server::run(job, *this);
// t->run(*this);
}
}
diff --git a/src/server/RunContext.hpp b/src/server/RunContext.hpp
index 6c8246ca..7db53b1b 100644
--- a/src/server/RunContext.hpp
+++ b/src/server/RunContext.hpp
@@ -25,6 +25,7 @@
#include "lv2/lv2plug.in/ns/ext/urid/urid.h"
#include "raul/RingBuffer.hpp"
+#include "Job.hpp"
#include "types.hpp"
namespace Ingen {
@@ -33,12 +34,6 @@ namespace Server {
class Engine;
class PortImpl;
-struct SingleTask;
-struct SeqTask;
-struct ParTask;
-
-using Task = boost::variant<SingleTask, SeqTask, ParTask>;
-
/** Graph execution context.
*
* This is used to pass whatever information a Node might need to process; such
@@ -125,7 +120,7 @@ public:
void claim_task(ParTask* task);
/** Steal a task from some other context if possible. */
- Task* steal_task() const;
+ Job steal_task() const;
void set_priority(int priority);
void set_rate(SampleCount rate) { _rate = rate; }
@@ -144,7 +139,7 @@ public:
inline bool realtime() const { return _realtime; }
protected:
- const RunContext& operator=(const RunContext& copy) = delete;
+ RunContext& operator=(const RunContext&) = delete;
void run();
diff --git a/src/server/Task.cpp b/src/server/Task.cpp
index 497f7895..48a82085 100644
--- a/src/server/Task.cpp
+++ b/src/server/Task.cpp
@@ -19,31 +19,25 @@
#include <boost/variant/get.hpp>
+#include <xmmintrin.h>
+
namespace Ingen {
namespace Server {
-static Task*
-get_task(ParTask& task, RunContext& context)
+static Job
+get_job(ParTask& task, RunContext& context)
{
// Attempt to "steal" a task from ourselves
- Task* t = steal(task, context);
- if (t) {
- return t;
+ Job job = steal(task, context);
+ if (job.task) {
+ return job;
}
- while (true) {
- // Push done end index as forward as possible
- while (task.done_end < task.children.size() && task.children[task.done_end].done()) {
- ++task.done_end;
- }
-
- if (task.done_end >= task.children.size()) {
- return nullptr; // All child tasks are finished
- }
-
+ while (task.done < task.children.size()) {
// All child tasks claimed, but some are unfinished, steal a task
- if ((t = context.steal_task())) {
- return t;
+ job = context.steal_task();
+ if (job.task) {
+ return job;
}
/* All child tasks are claimed, and we failed to steal any tasks. Spin
@@ -51,7 +45,10 @@ get_task(ParTask& task, RunContext& context)
signal in non-main threads, and maybe even in the main thread
depending on your real-time safe philosophy... more experimentation
here is needed. */
+ _mm_pause();
}
+
+ return Job{nullptr, nullptr};
}
struct Runner
@@ -64,50 +61,50 @@ struct Runner
void operator()(SeqTask& task) {
for (auto& child : task.children) {
- run(child, context);
+ Job job{&child, nullptr};
+ run(job, context);
}
}
void operator()(ParTask& task) {
- // Initialize (not) done state of sub-tasks
- // for (const auto& child : task.children) {
- // child.done = false;
- // }
-
// Grab the first sub-task
- task.next = 0;
- task.done_end = 0;
- Task* t = steal(task, context);
+ task.next = 1;
+ task.done = 0;
+ Job job{&task.children[0], &task};
// Allow other threads to steal sub-tasks
context.claim_task(&task);
// Run available tasks until this task is finished
- for (; t; t = get_task(task, context)) {
- run(*t, context);
+ for (; job.task; job = get_job(task, context)) {
+ run(job, context);
}
context.claim_task(nullptr);
}
- RunContext context;
+ RunContext& context;
+ ParTask* parent;
};
void
-run(Task& task, RunContext& context)
+run(Job& job, RunContext& context)
{
- Runner runner{context};
- boost::apply_visitor(runner, task);
+ Runner runner{context, job.parent};
+ boost::apply_visitor(runner, *job.task);
+ if (job.parent) {
+ ++job.parent->done;
+ }
}
-Task*
+Job
steal(ParTask& task, RunContext& context)
{
const unsigned i = task.next++;
if (i < task.children.size()) {
- return &task.children[i];
+ return Job{&task.children[i], &task};
}
- return nullptr;
+ return Job{nullptr, nullptr};
}
static bool empty(Task& task)
diff --git a/src/server/Task.hpp b/src/server/Task.hpp
index 79ada9f0..a425c466 100644
--- a/src/server/Task.hpp
+++ b/src/server/Task.hpp
@@ -26,6 +26,8 @@
#include <boost/variant/variant.hpp>
+#include "Job.hpp"
+
namespace Ingen {
namespace Server {
@@ -66,8 +68,7 @@ struct ParTask : MultiTask {
ParTask() = default;
ParTask(ParTask&& task)
- : done_end(task.done_end)
- , next(task.next.load())
+ : next(task.next.load())
, done(task.done.load())
{
children = std::move(task.children);
@@ -75,29 +76,23 @@ struct ParTask : MultiTask {
ParTask& operator=(ParTask&& task) {
children = std::move(task.children);
- done_end = task.done_end;
next = task.next.load();
done = task.done.load();
return *this;
}
- unsigned done_end{0}; ///< Index of rightmost done sub-task
std::atomic<unsigned> next{0}; ///< Index of next sub-task
- std::atomic<bool> done{false}; ///< Completion phase
-};
-
-struct Job {
- Task task;
+ std::atomic<unsigned> done{0}; ///< Count of finished sub-tasks
};
/** Run task in the given context. */
-void run(Task& task, RunContext& context);
+void run(Job& job, RunContext& context);
/** Simplify and optimize task. */
Task simplify(Task&& task);
/** Steal a child task from this task. */
-Task* steal(ParTask& task, RunContext& context);
+Job steal(ParTask& task, RunContext& context);
/** Pretty print task to the given stream (recursively). */
void dump(Task& task, std::function<void (const std::string&)> sink, unsigned indent, bool first);