/*
This file is part of Ingen.
Copyright 2015-2017 David Robillard
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
Software Foundation, either version 3 of the License, or any later version.
Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU Affero General Public License for details.
You should have received a copy of the GNU Affero General Public License
along with Ingen. If not, see .
*/
#include "BlockImpl.hpp"
#include "Task.hpp"
#include
#include
namespace Ingen {
namespace Server {
static Job
get_job(ParTask& task, RunContext& context)
{
// Attempt to "steal" a task from ourselves
Job job = steal(task, context);
if (job.task) {
return job;
}
while (task.done < task.children.size()) {
// All child tasks claimed, but some are unfinished, steal a task
job = context.steal_task();
if (job.task) {
return job;
}
/* All child tasks are claimed, and we failed to steal any tasks. Spin
to prevent blocking, though it would probably be wiser to wait for a
signal in non-main threads, and maybe even in the main thread
depending on your real-time safe philosophy... more experimentation
here is needed. */
_mm_pause();
}
return Job{nullptr, nullptr};
}
struct Runner
{
using result_type = void;
void operator()(SingleTask& task) {
task.block->process(context);
}
void operator()(SeqTask& task) {
for (auto& child : task.children) {
Job job{&child, nullptr};
run(job, context);
}
}
void operator()(ParTask& task) {
// Grab the first sub-task
task.next = 1;
task.done = 0;
Job job{&task.children[0], &task};
// Allow other threads to steal sub-tasks
context.claim_task(&task);
// Run available tasks until this task is finished
for (; job.task; job = get_job(task, context)) {
run(job, context);
}
context.claim_task(nullptr);
}
RunContext& context;
ParTask* parent;
};
void
run(Job& job, RunContext& context)
{
Runner runner{context, job.parent};
boost::apply_visitor(runner, *job.task);
if (job.parent) {
++job.parent->done;
}
}
Job
steal(ParTask& task, RunContext& context)
{
const unsigned i = task.next++;
if (i < task.children.size()) {
return Job{&task.children[i], &task};
}
return Job{nullptr, nullptr};
}
static bool empty(Task& task)
{
if (SeqTask* seq = boost::get(&task)) {
return seq->children.empty();
} else if (ParTask* par = boost::get(&task)) {
return par->children.empty();
}
return false;
}
template
static Task
simplify(T&& task)
{
typename std::remove_reference::type ret;
for (auto&& c : task.children) {
auto child = simplify(std::move(c));
if (!empty(child)) {
if (T* merge_child = boost::get(&child)) {
// Child has same type, merge into parent
for (auto&& grandchild : merge_child->children) {
ret.children.emplace_back(std::move(grandchild));
}
} else {
// Append child task
ret.children.emplace_back(std::move(child));
}
}
}
return ret.children.size() == 1 ? std::move(ret.children.front())
: Task(std::move(ret));
}
Task
simplify(Task&& task)
{
if (SeqTask* seq = boost::get(&task)) {
return simplify(std::move(*seq));
} else if (ParTask* par = boost::get(&task)) {
return simplify(std::move(*par));
}
return std::move(task);
}
void
dump(Task& task, std::function sink, unsigned indent, bool first)
{
if (!first) {
sink("\n");
for (unsigned i = 0; i < indent; ++i) {
sink(" ");
}
}
if (SingleTask* single = boost::get(&task)) {
sink(single->block->path());
} else if (SeqTask* seq = boost::get(&task)) {
sink("(seq ");
for (size_t i = 0; i < seq->children.size(); ++i) {
dump(seq->children[i], sink, indent + 5, i == 0);
}
sink(")");
} else if (ParTask* par = boost::get(&task)) {
sink("(par ");
for (size_t i = 0; i < par->children.size(); ++i) {
dump(par->children[i], sink, indent + 5, i == 0);
}
sink(")");
}
}
} // namespace Server
} // namespace Ingen