/*
  This file is part of Ingen.
  Copyright 2015-2017 David Robillard

  Ingen is free software: you can redistribute it and/or modify it under the
  terms of the GNU Affero General Public License as published by the Free
  Software Foundation, either version 3 of the License, or any later version.

  Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  A PARTICULAR PURPOSE.  See the GNU Affero General Public License for details.

  You should have received a copy of the GNU Affero General Public License
  along with Ingen.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "BlockImpl.hpp"
#include "Task.hpp"

// NOTE(review): the names of the system headers were lost from this file.
// The set below is derived from what the code in this file actually uses
// (boost::apply_visitor, boost::get, _mm_pause, std::function, std::string,
// std::remove_reference, std::move, size_t) -- confirm against the build.
#include <boost/variant/apply_visitor.hpp>
#include <boost/variant/get.hpp>

#include <xmmintrin.h>

#include <cstddef>
#include <functional>
#include <string>
#include <type_traits>
#include <utility>

namespace Ingen {
namespace Server {

/** Find the next job to run while executing the parallel task `task`.
 *
 * First tries to take an unclaimed child of `task` itself.  If every child is
 * already claimed but not all of them are done, tries to steal a job via
 * `context.steal_task()`, spinning until either a job is found or all children
 * of `task` are done.
 *
 * @return A job to run, or a null job (`task == nullptr`) once every child of
 * `task` has finished.
 */
static Job
get_job(ParTask& task, RunContext& context)
{
	// Attempt to "steal" a task from ourselves
	Job job = steal(task, context);
	if (job.task) {
		return job;
	}

	while (task.done < task.children.size()) {
		// All child tasks claimed, but some are unfinished, steal a task
		job = context.steal_task();
		if (job.task) {
			return job;
		}

		/* All child tasks are claimed, and we failed to steal any tasks.  Spin
		   to prevent blocking, though it would probably be wiser to wait for a
		   signal in non-main threads, and maybe even in the main thread
		   depending on your real-time safe philosophy... more experimentation
		   here is needed. */
		_mm_pause();
	}

	return Job{nullptr, nullptr};
}

/** Visitor that executes a Task, dispatching on the concrete task type.
 *
 * Applied with boost::apply_visitor() from run().
 */
struct Runner
{
	using result_type = void;

	/** Run a single block by processing it in the current context. */
	void operator()(SingleTask& task) {
		task.block->process(context);
	}

	/** Run every child in order, one after the other. */
	void operator()(SeqTask& task) {
		for (auto& child : task.children) {
			Job job{&child, nullptr};
			run(job, context);
		}
	}

	/** Run the children of a parallel task, letting other threads help.
	 *
	 * This thread takes the first child itself, publishes the task through
	 * `context.claim_task()`, and then keeps running whatever get_job() hands
	 * back until it returns a null job.
	 */
	void operator()(ParTask& task) {
		if (task.children.empty()) {
			// Nothing to run; also avoids indexing children[0] below
			return;
		}

		// Grab the first sub-task
		task.next = 1;
		task.done = 0;
		Job job{&task.children[0], &task};

		// Allow other threads to steal sub-tasks
		context.claim_task(&task);

		// Run available tasks until this task is finished
		for (; job.task; job = get_job(task, context)) {
			run(job, context);
		}

		context.claim_task(nullptr);
	}

	RunContext& context;  ///< Context passed to every block and child job
	ParTask*    parent;   ///< Parent of the job being run (not read here)
};

/** Run `job` in `context`.
 *
 * Executes the job's task and then, if the job has a parent, increments the
 * parent's `done` counter.
 */
void
run(Job& job, RunContext& context)
{
	Runner runner{context, job.parent};
	boost::apply_visitor(runner, *job.task);
	if (job.parent) {
		++job.parent->done;
	}
}

/** Claim the next unclaimed child of `task`.
 *
 * Always advances `task.next`, even when no child is left.
 *
 * @return A job for the claimed child with `task` as its parent, or a null job
 * if the claimed index is past the last child.
 */
Job
steal(ParTask& task, RunContext& context)
{
	const unsigned i = task.next++;
	if (i < task.children.size()) {
		return Job{&task.children[i], &task};
	}

	return Job{nullptr, nullptr};
}

/** Return true iff `task` is a sequential or parallel task with no children.
 *
 * Any other kind of task is never considered empty.
 */
static bool
empty(Task& task)
{
	if (SeqTask* seq = boost::get<SeqTask>(&task)) {
		return seq->children.empty();
	} else if (ParTask* par = boost::get<ParTask>(&task)) {
		return par->children.empty();
	}

	return false;
}

/** Simplify a compound task of type `T` (SeqTask or ParTask).
 *
 * Each child is simplified recursively.  Empty children are dropped, and a
 * child of the same compound type as `task` is flattened by moving its
 * children directly into the result.
 *
 * @return The only remaining child if exactly one is left, otherwise a new
 * task of type `T` holding the remaining children.
 */
template <typename T>
static Task
simplify(T&& task)
{
	typename std::remove_reference<T>::type ret;
	for (auto&& c : task.children) {
		auto child = simplify(std::move(c));
		if (!empty(child)) {
			if (T* merge_child = boost::get<T>(&child)) {
				// Child has same type, merge into parent
				for (auto&& grandchild : merge_child->children) {
					ret.children.emplace_back(std::move(grandchild));
				}
			} else {
				// Append child task
				ret.children.emplace_back(std::move(child));
			}
		}
	}

	return ret.children.size() == 1
		? std::move(ret.children.front())
		: Task(std::move(ret));
}

/** Simplify `task`, consuming it.
 *
 * Sequential and parallel tasks are simplified as described for the compound
 * overload above; any other task is returned unchanged.
 */
Task
simplify(Task&& task)
{
	if (SeqTask* seq = boost::get<SeqTask>(&task)) {
		return simplify(std::move(*seq));
	} else if (ParTask* par = boost::get<ParTask>(&task)) {
		return simplify(std::move(*par));
	}

	// `task` is a named rvalue reference, so an explicit move is required
	return std::move(task);
}

/** Write a textual representation of `task` to `sink`.
 *
 * A single task is written as its block's path.  Compound tasks are written
 * as "(seq ...)" or "(par ...)", with every child after the first placed on
 * its own line, indented to line up with the first child.
 *
 * NOTE(review): the template argument of `sink` was lost from this file; the
 * signature below is reconstructed from how `sink` is called here and must
 * match the declaration in Task.hpp.
 *
 * @param task Task to dump.
 * @param sink Called with each fragment of output text.
 * @param indent Number of spaces written after the newline when `first` is
 * false.
 * @param first If false, a newline and `indent` spaces are written before the
 * task itself.
 */
void
dump(Task&                                   task,
     std::function<void (const std::string&)> sink,
     unsigned                                indent,
     bool                                    first)
{
	if (!first) {
		sink("\n");
		for (unsigned i = 0; i < indent; ++i) {
			sink(" ");
		}
	}

	if (SingleTask* single = boost::get<SingleTask>(&task)) {
		sink(single->block->path());
	} else if (SeqTask* seq = boost::get<SeqTask>(&task)) {
		sink("(seq ");
		for (size_t i = 0; i < seq->children.size(); ++i) {
			// 5 == strlen("(seq "), so children line up under the first
			dump(seq->children[i], sink, indent + 5, i == 0);
		}
		sink(")");
	} else if (ParTask* par = boost::get<ParTask>(&task)) {
		sink("(par ");
		for (size_t i = 0; i < par->children.size(); ++i) {
			// 5 == strlen("(par "), so children line up under the first
			dump(par->children[i], sink, indent + 5, i == 0);
		}
		sink(")");
	}
}

} // namespace Server
} // namespace Ingen