/* This file is part of Ingen. Copyright 2015-2017 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or any later version. Ingen is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. You should have received a copy of the GNU Affero General Public License along with Ingen. If not, see . */ #include "BlockImpl.hpp" #include "Task.hpp" #include namespace Ingen { namespace Server { static Task* get_task(ParTask& task, RunContext& context) { // Attempt to "steal" a task from ourselves Task* t = steal(task, context); if (t) { return t; } while (true) { // Push done end index as forward as possible while (task.done_end < task.children.size() && task.children[task.done_end].done()) { ++task.done_end; } if (task.done_end >= task.children.size()) { return nullptr; // All child tasks are finished } // All child tasks claimed, but some are unfinished, steal a task if ((t = context.steal_task())) { return t; } /* All child tasks are claimed, and we failed to steal any tasks. Spin to prevent blocking, though it would probably be wiser to wait for a signal in non-main threads, and maybe even in the main thread depending on your real-time safe philosophy... more experimentation here is needed. */ } } struct Runner { using result_type = void; void operator()(SingleTask& task) { task.block->process(context); } void operator()(SeqTask& task) { for (auto& child : task.children) { run(child, context); } } void operator()(ParTask& task) { // Initialize (not) done state of sub-tasks // for (const auto& child : task.children) { // child.done = false; // } // Grab the first sub-task task.next = 0; task.done_end = 0; Task* t = steal(task, context); // Allow other threads to steal sub-tasks context.claim_task(&task); // Run available tasks until this task is finished for (; t; t = get_task(task, context)) { run(*t, context); } context.claim_task(nullptr); } RunContext context; }; void run(Task& task, RunContext& context) { Runner runner{context}; boost::apply_visitor(runner, task); } Task* steal(ParTask& task, RunContext& context) { const unsigned i = task.next++; if (i < task.children.size()) { return &task.children[i]; } return nullptr; } static bool empty(Task& task) { if (SeqTask* seq = boost::get(&task)) { return seq->children.empty(); } else if (ParTask* par = boost::get(&task)) { return par->children.empty(); } return false; } template static Task simplify(T&& task) { typename std::remove_reference::type ret; for (auto&& c : task.children) { auto child = simplify(std::move(c)); if (!empty(child)) { if (T* merge_child = boost::get(&child)) { // Child has same type, merge into parent for (auto&& grandchild : merge_child->children) { ret.children.emplace_back(std::move(grandchild)); } } else { // Append child task ret.children.emplace_back(std::move(child)); } } } return ret.children.size() == 1 ? std::move(ret.children.front()) : Task(std::move(ret)); } Task simplify(Task&& task) { if (SeqTask* seq = boost::get(&task)) { return simplify(std::move(*seq)); } else if (ParTask* par = boost::get(&task)) { return simplify(std::move(*par)); } return std::move(task); } void dump(Task& task, std::function sink, unsigned indent, bool first) { if (!first) { sink("\n"); for (unsigned i = 0; i < indent; ++i) { sink(" "); } } if (SingleTask* single = boost::get(&task)) { sink(single->block->path()); } else if (SeqTask* seq = boost::get(&task)) { sink("(seq "); for (size_t i = 0; i < seq->children.size(); ++i) { dump(seq->children[i], sink, indent + 5, i == 0); } sink(")"); } else if (ParTask* par = boost::get(&task)) { sink("(par "); for (size_t i = 0; i < par->children.size(); ++i) { dump(par->children[i], sink, indent + 5, i == 0); } sink(")"); } } } // namespace Server } // namespace Ingen