/*
This file is part of Ingen.
Copyright 2015-2017 David Robillard
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
Software Foundation, either version 3 of the License, or any later version.
Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU Affero General Public License for details.
You should have received a copy of the GNU Affero General Public License
along with Ingen. If not, see <http://www.gnu.org/licenses/>.
*/
#include "BlockImpl.hpp"
#include "Engine.hpp"
#include "Task.hpp"

#include <functional>
#include <iterator>
#include <memory>
#include <string>
#include <utility>
namespace Ingen {
namespace Server {
/** Run this task in the given context.
 *
 * A SINGLE task processes its block, a SEQUENTIAL task runs each child in
 * order, and a PARALLEL task publishes itself so that its children can be
 * claimed and run until none remain.  In every case the task is marked done
 * before returning.
 */
void
Task::run(RunContext& context)
{
	switch (_mode) {
	case Mode::SINGLE: {
		// fprintf(stderr, "%u run %s\n", context.id(), _block->path().c_str());
		_block->process(context);
		break;
	}
	case Mode::SEQUENTIAL: {
		for (const auto& child : _children) {
			child->run(context);
		}
		break;
	}
	case Mode::PARALLEL: {
		// Mark every sub-task as not (yet) done
		for (const auto& child : _children) {
			child->set_done(false);
		}

		// Reset the claim and completion indices, then claim the first sub-task
		_next     = 0;
		_done_end = 0;
		Task* current = steal(context);

		// Publish this task so other threads may steal its sub-tasks
		context.set_task(this);
		context.engine().signal_tasks();

		// Keep running whatever get_task() hands back until it returns null
		while (current) {
			current->run(context);
			current = get_task(context);
		}

		context.set_task(nullptr);
		break;
	}
	}

	set_done(true);
}
/** Claim the next unclaimed child of a PARALLEL task.
 *
 * Returns the claimed child, or null if this task is not PARALLEL or the
 * claim index has already passed the last child.  The context argument is
 * not used by this implementation.
 */
Task*
Task::steal(RunContext& context)
{
	if (_mode != Mode::PARALLEL) {
		return nullptr;
	}

	// Take the current claim index and advance it for the next caller
	const unsigned index = _next++;
	return (index < _children.size()) ? _children[index].get() : nullptr;
}
/** Find the next task for the calling thread to run.
 *
 * Prefers an unclaimed child of this task.  Once all children are claimed,
 * loops until either every child reports done (returns null) or the engine
 * hands over a task stolen from elsewhere.
 */
Task*
Task::get_task(RunContext& context)
{
	// First try to claim one of our own children
	if (Task* const own = steal(context)) {
		return own;
	}

	for (;;) {
		// Advance the done index past every finished child at the front
		while (_done_end < _children.size() && _children[_done_end]->done()) {
			++_done_end;
		}

		if (_done_end >= _children.size()) {
			return nullptr;  // Every child task has finished
		}

		// Children are all claimed but some are unfinished, so look for
		// work elsewhere via the engine
		Task* const stolen = context.engine().steal_task(context.id() + 1);
		if (stolen) {
			return stolen;
		}

		/* Every child is claimed and nothing could be stolen.  Spin rather
		   than block, though it would probably be wiser to wait for a signal
		   in non-main threads, and maybe even in the main thread depending
		   on your real-time safe philosophy... more experimentation here is
		   needed. */
	}
}
std::unique_ptr
Task::simplify(std::unique_ptr&& task)
{
if (task->mode() == Mode::SINGLE) {
return std::move(task);
}
for (auto t = task->_children.begin(); t != task->_children.end();) {
*t = simplify(std::move(*t));
if ((*t)->empty()) {
// Empty task, erase
t = task->_children.erase(t);
} else if ((*t)->mode() == task->_mode) {
// Subtask with the same type, fold child into parent
std::unique_ptr child(std::move(*t));
t = task->_children.erase(t);
t = task->_children.insert(
t,
std::make_move_iterator(child->_children.begin()),
std::make_move_iterator(child->_children.end()));
} else {
++t;
}
}
if (task->_children.size() == 1) {
return std::move(task->_children.front());
}
return std::move(task);
}
void
Task::dump(std::function sink, unsigned indent, bool first) const
{
if (!first) {
sink("\n");
for (unsigned i = 0; i < indent; ++i) {
sink(" ");
}
}
if (_mode == Mode::SINGLE) {
sink(_block->path());
} else {
sink(((_mode == Mode::SEQUENTIAL) ? "(seq " : "(par "));
for (size_t i = 0; i < _children.size(); ++i) {
_children[i]->dump(sink, indent + 5, i == 0);
}
sink(")");
}
}
} // namespace Server
} // namespace Ingen