/*
This file is part of Ingen.
Copyright 2015-2024 David Robillard
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
Software Foundation, either version 3 of the License, or any later version.
Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU Affero General Public License for details.
You should have received a copy of the GNU Affero General Public License
along with Ingen. If not, see <https://www.gnu.org/licenses/>.
*/
#include "CompiledGraph.hpp"
#include "BlockImpl.hpp"
#include "Engine.hpp"
#include "GraphImpl.hpp"
#include "ThreadManager.hpp"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
namespace ingen::server {
/** Thrown when a graph contains ambiguous feedback with no delay nodes.
 *
 * `node` is the block at which the feedback was detected.  `root`, when set,
 * is the block from which the search that detected it was started.
 */
class FeedbackException : public std::exception
{
public:
  /// Report feedback detected at `n`, with no associated root
  explicit FeedbackException(const BlockImpl* n)
    : FeedbackException{n, nullptr}
  {}

  /// Report feedback detected at `n` while searching on behalf of `r`
  FeedbackException(const BlockImpl* n, const BlockImpl* r)
    : node{n}
    , root{r}
  {}

  const BlockImpl* node = nullptr; ///< Block where feedback was found
  const BlockImpl* root = nullptr; ///< Search root, or null if not given
};
/** Return true iff at least one provider of `n` has more than one dependant. */
static bool
has_provider_with_many_dependants(const BlockImpl* n)
{
  for (const auto* provider : n->providers()) {
    if (provider->dependants().size() > 1) {
      return true;
    }
  }

  return false;
}
/** Construct by compiling `graph` into a fresh sequential master task.
 *
 * Any FeedbackException raised during compilation propagates to the caller.
 */
CompiledGraph::CompiledGraph(GraphImpl* graph)
  : _master(std::make_unique<Task>(Task::Mode::SEQUENTIAL))
{
  // All of the real work happens here; the master task starts out empty
  compile_graph(graph);
}
std::unique_ptr
CompiledGraph::compile(GraphImpl& graph)
{
try {
return std::unique_ptr(new CompiledGraph(&graph));
} catch (const FeedbackException& e) {
Log& log = graph.engine().log();
if (e.node && e.root) {
log.error("Feedback compiling %1% from %2%\n",
e.node->path(), e.root->path());
} else {
log.error("Feedback compiling %1%\n", e.node->path());
}
return nullptr;
}
}
/** Return the number of dependants of `block` still marked UNVISITED. */
static size_t
num_unvisited_dependants(const BlockImpl* block)
{
  size_t n_unvisited = 0;

  for (const auto* dependant : block->dependants()) {
    if (dependant->get_mark() == BlockImpl::Mark::UNVISITED) {
      ++n_unvisited;
    }
  }

  return n_unvisited;
}
/** Return the depth limit contributed by `block` for one compilation phase.
 *
 * This is 2 if any provider of `block` has several dependants, otherwise 2
 * plus the smallest value computed recursively for its providers.
 */
static size_t
parallel_depth(const BlockImpl* block)
{
  if (has_provider_with_many_dependants(block)) {
    return 2;
  }

  size_t shallowest = std::numeric_limits<size_t>::max();
  for (const auto* provider : block->providers()) {
    const size_t provider_depth = parallel_depth(provider);
    if (provider_depth < shallowest) {
      shallowest = provider_depth;
    }
  }

  // Unsigned arithmetic: for a block with no providers, `shallowest` is still
  // the maximum value, so this sum wraps around and yields 1
  return 2 + shallowest;
}
/** Compile `graph` into the master task.
 *
 * Works backwards from blocks with no dependants, in rounds.  Each round
 * builds one parallel task holding a sequential task per block in the current
 * working set, and pushes it to the front of the master task, so earlier
 * rounds end up later in execution order.  Blocks gathered by compile_block()
 * during a round become the working set of the next round.
 *
 * May throw FeedbackException (via compile_block()) if feedback is detected.
 */
void
CompiledGraph::compile_graph(GraphImpl* graph)
{
  ThreadManager::assert_thread(THREAD_PRE_PROCESS);

  // Start with sink nodes (no outputs, or connected only to graph outputs)
  std::set<BlockImpl*> blocks;
  for (auto& b : graph->blocks()) {
    // Mark all blocks as unvisited initially
    b.set_mark(BlockImpl::Mark::UNVISITED);

    if (b.dependants().empty()) {
      // Block has no dependants, add to initial working set
      blocks.insert(&b);
    }
  }

  // Keep compiling working set until all nodes are visited
  while (!blocks.empty()) {
    std::set<BlockImpl*> predecessors;

    // Calculate maximum sequential depth to consume this phase
    size_t depth = std::numeric_limits<size_t>::max();
    for (const auto* i : blocks) {
      depth = std::min(depth, parallel_depth(i));
    }

    Task par(Task::Mode::PARALLEL);
    for (auto* b : blocks) {
      assert(num_unvisited_dependants(b) == 0);

      Task seq(Task::Mode::SEQUENTIAL);
      compile_block(b, seq, depth, predecessors);
      par.push_front(std::move(seq));
    }
    _master->push_front(std::move(par));

    // `predecessors` is destroyed at the end of this iteration, so hand its
    // nodes over to the working set instead of deep-copying the whole set
    blocks = std::move(predecessors);
  }

  _master = Task::simplify(std::move(_master));

  // NOTE(review): template argument of get<>() assumed to be int32_t, confirm
  if (graph->engine().world().conf().option("trace").get<int32_t>()) {
    // Colour context on stderr is held for the duration of the dump
    const ColorContext ctx{stderr, ColorContext::Color::YELLOW};
    dump(graph->path());
  }
}
/** Throw a FeedbackException iff `dependant` has `root` as a dependency. */
static void
check_feedback(const BlockImpl* root, BlockImpl* provider)
{
if (provider == root) {
throw FeedbackException(root);
}
for (auto* p : provider->providers()) {
const BlockImpl::Mark mark = p->get_mark();
switch (mark) {
case BlockImpl::Mark::UNVISITED:
p->set_mark(BlockImpl::Mark::VISITING);
check_feedback(root, p);
break;
case BlockImpl::Mark::VISITING:
throw FeedbackException(p, root);
case BlockImpl::Mark::VISITED:
break;
}
p->set_mark(mark);
}
}
void
CompiledGraph::compile_provider(const BlockImpl* root,
BlockImpl* block,
Task& task,
size_t max_depth,
std::set& k)
{
if (block->dependants().size() > 1) {
/* Provider has other dependants, so this is the tail of a sequential task.
Add provider to future working set and stop traversal. */
check_feedback(root, block);
if (num_unvisited_dependants(block) == 0) {
k.insert(block);
}
} else if (max_depth > 0) {
// Calling dependant has only this provider, add here
if (task.mode() == Task::Mode::PARALLEL) {
// Inside a parallel task, compile into a new sequential child
Task seq(Task::Mode::SEQUENTIAL);
compile_block(block, seq, max_depth, k);
task.push_front(std::move(seq));
} else {
// Prepend to given sequential task
compile_block(block, task, max_depth, k);
}
} else {
if (num_unvisited_dependants(block) == 0) {
k.insert(block);
}
}
}
void
CompiledGraph::compile_block(BlockImpl* n,
Task& task,
size_t max_depth,
std::set& k)
{
switch (n->get_mark()) {
case BlockImpl::Mark::UNVISITED:
n->set_mark(BlockImpl::Mark::VISITING);
// Execute this task after the providers to follow
task.push_front(Task(Task::Mode::SINGLE, n));
if (n->providers().size() < 2) {
// Single provider, prepend it to this sequential task
for (auto* p : n->providers()) {
compile_provider(n, p, task, max_depth - 1, k);
}
} else if (has_provider_with_many_dependants(n)) {
// Stop recursion and enqueue providers for the next round
for (auto* p : n->providers()) {
if (num_unvisited_dependants(p) == 0) {
k.insert(p);
}
}
} else {
// Multiple providers with only this node as dependant,
// make a new parallel task to execute them
Task par(Task::Mode::PARALLEL);
for (auto* p : n->providers()) {
compile_provider(n, p, par, max_depth - 1, k);
}
task.push_front(std::move(par));
}
n->set_mark(BlockImpl::Mark::VISITED);
break;
case BlockImpl::Mark::VISITING:
throw FeedbackException(n);
case BlockImpl::Mark::VISITED:
break;
}
}
/** Run the compiled graph by running the master task with `ctx`. */
void
CompiledGraph::run(RunContext& ctx)
{
_master->run(ctx);
}
/** Write a textual dump of the compiled graph to stderr.
 *
 * The output is "(compiled-graph " followed by `name`, then whatever the
 * master task's dump() writes, then ")" and a newline.
 */
void
CompiledGraph::dump(const std::string& name) const
{
  // Write exactly size() bytes, so the text is not cut at an embedded NUL
  const auto write_stderr = [](const std::string& text) {
    fwrite(text.data(), 1, text.size(), stderr);
  };

  write_stderr("(compiled-graph ");
  write_stderr(name);

  // NOTE(review): 2 and false are presumably indent/first flags, see Task::dump
  _master->dump(write_stderr, 2, false);

  write_stderr(")\n");
}
} // namespace ingen::server