summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ingen/Configuration.hpp1
-rw-r--r--ingen/EngineBase.hpp5
-rw-r--r--src/Configuration.cpp8
-rw-r--r--src/server/CompiledGraph.cpp98
-rw-r--r--src/server/CompiledGraph.hpp33
-rw-r--r--src/server/DirectDriver.hpp4
-rw-r--r--src/server/Driver.hpp5
-rw-r--r--src/server/Engine.cpp89
-rw-r--r--src/server/Engine.hpp19
-rw-r--r--src/server/JackDriver.cpp4
-rw-r--r--src/server/JackDriver.hpp5
-rw-r--r--src/server/PostProcessor.cpp10
-rw-r--r--src/server/RunContext.cpp38
-rw-r--r--src/server/RunContext.hpp40
-rw-r--r--src/server/Task.cpp155
-rw-r--r--src/server/Task.hpp98
-rw-r--r--src/server/ingen_lv2.cpp8
-rw-r--r--src/server/wscript1
-rw-r--r--tests/ingen_test.cpp8
19 files changed, 470 insertions, 159 deletions
diff --git a/ingen/Configuration.hpp b/ingen/Configuration.hpp
index 28b6a19d..6d4655d6 100644
--- a/ingen/Configuration.hpp
+++ b/ingen/Configuration.hpp
@@ -23,6 +23,7 @@
#include <list>
#include <map>
+#include <ostream>
#include <string>
#include "ingen/Atom.hpp"
diff --git a/ingen/EngineBase.hpp b/ingen/EngineBase.hpp
index 3a460b62..7ba4467a 100644
--- a/ingen/EngineBase.hpp
+++ b/ingen/EngineBase.hpp
@@ -68,6 +68,11 @@ public:
virtual bool pending_events() = 0;
/**
+ Locate to a given cycle.
+ */
+ virtual void locate(uint32_t start, uint32_t sample_count) = 0;
+
+ /**
Process audio for `sample_count` frames.
If the return value is non-zero, events have been processed and are
diff --git a/src/Configuration.cpp b/src/Configuration.cpp
index 13e2d722..e165118f 100644
--- a/src/Configuration.cpp
+++ b/src/Configuration.cpp
@@ -1,6 +1,6 @@
/*
This file is part of Ingen.
- Copyright 2007-2015 David Robillard <http://drobilla.net/>
+ Copyright 2007-2016 David Robillard <http://drobilla.net/>
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
@@ -19,6 +19,7 @@
#include <string.h>
#include <iostream>
+#include <thread>
#include <glibmm/fileutils.h>
#include <glibmm/miscutils.h>
@@ -66,8 +67,9 @@ Configuration::Configuration(Forge& forge)
add("path", "path", 'L', "Target path for loaded graph", SESSION, forge.String, Atom());
add("queueSize", "queue-size", 'q', "Event queue size", GLOBAL, forge.Int, forge.make(4096));
add("flushLog", "flush-log", 'f', "Flush logs after every entry", GLOBAL, forge.Bool, forge.make(false));
- add("dump", "dump", 'd', "Print debug output", GLOBAL, forge.Bool, forge.make(false));
- add("trace", "trace", 't', "Show LV2 plugin trace messages", GLOBAL, forge.Bool, forge.make(false));
+ add("dump", "dump", 'd', "Print debug output", SESSION, forge.Bool, forge.make(false));
+ add("trace", "trace", 't', "Show LV2 plugin trace messages", SESSION, forge.Bool, forge.make(false));
+ add("threads", "threads", 'p', "Number of processing threads", GLOBAL, forge.Int, forge.make(int32_t(std::max(std::thread::hardware_concurrency(), 1U))));
add("humanNames", "human-names", 0, "Show human names in GUI", GUI, forge.Bool, forge.make(true));
add("portLabels", "port-labels", 0, "Show port labels in GUI", GUI, forge.Bool, forge.make(true));
add("graphDirectory", "graph-directory", 0, "Default directory for opening graphs", GUI, forge.String, Atom());
diff --git a/src/server/CompiledGraph.cpp b/src/server/CompiledGraph.cpp
index 21bd5337..fdf59cd4 100644
--- a/src/server/CompiledGraph.cpp
+++ b/src/server/CompiledGraph.cpp
@@ -1,6 +1,6 @@
/*
This file is part of Ingen.
- Copyright 2015 David Robillard <http://drobilla.net/>
+ Copyright 2015-2016 David Robillard <http://drobilla.net/>
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
@@ -16,6 +16,7 @@
#include <algorithm>
+#include "ingen/ColorContext.hpp"
#include "ingen/Configuration.hpp"
#include "ingen/Log.hpp"
@@ -92,7 +93,6 @@ CompiledGraph::compile_graph(GraphImpl* graph)
if (b.providers().empty()) {
// Block has no dependencies, add to initial working set
- _log.info(fmt("Initial: %1%\n") % b.path());
blocks.insert(&b);
}
}
@@ -127,8 +127,11 @@ CompiledGraph::compile_graph(GraphImpl* graph)
_master.simplify();
- if (graph->engine().world()->conf().option("dump").get<int32_t>()) {
- dump(std::cout);
+ if (graph->engine().world()->conf().option("trace").get<int32_t>()) {
+ dump([this](const std::string& msg) {
+ ColorContext ctx(stderr, ColorContext::Color::YELLOW);
+ fwrite(msg.c_str(), 1, msg.size(), stderr);
+ });
}
}
@@ -184,18 +187,8 @@ CompiledGraph::compile_dependant(const BlockImpl* root,
void
CompiledGraph::compile_block(BlockImpl* n, Task& task, std::set<BlockImpl*>& k)
{
- static unsigned indent = 0;
-
switch (n->get_mark()) {
case BlockImpl::Mark::UNVISITED:
- indent += 4;
- for (unsigned i = 0; i < indent; ++i) {
- _log.info(" ");
- }
-
- _log.info(fmt("Compile block %1% (%2% dependants, %3% providers) {\n")
- % n->path() % n->dependants().size() % n->providers().size());
-
n->set_mark(BlockImpl::Mark::VISITING);
// Execute this task before the dependants to follow
@@ -215,11 +208,6 @@ CompiledGraph::compile_block(BlockImpl* n, Task& task, std::set<BlockImpl*>& k)
task.push_back(par);
}
n->set_mark(BlockImpl::Mark::VISITED);
- for (unsigned i = 0; i < indent; ++i) {
- _log.info(" ");
- }
- _log.info("} " + n->path() + "\n");
- indent -= 4;
break;
case BlockImpl::Mark::VISITING:
@@ -237,74 +225,12 @@ CompiledGraph::run(RunContext& context)
}
void
-CompiledGraph::dump(std::ostream& os) const
-{
- os << "(compiled-graph " << _path;
- _master.dump(os, 2, false);
- os << ")" << std::endl;
-}
-
-void
-CompiledGraph::Task::run(RunContext& context)
-{
- switch (_mode) {
- case Mode::SINGLE:
- _block->process(context);
- break;
- case Mode::SEQUENTIAL:
- case Mode::PARALLEL:
- for (Task& task : *this) {
- task.run(context);
- }
- break;
- }
-}
-
-void
-CompiledGraph::Task::simplify()
-{
- if (_mode != Mode::SINGLE) {
- for (std::vector<Task>::iterator t = begin(); t != end();) {
- t->simplify();
- if (t->mode() != Mode::SINGLE && t->empty()) {
- // Empty task, erase
- t = erase(t);
- } else if (t->mode() == _mode) {
- // Subtask with the same type, fold child into parent
- const Task child(*t);
- t = erase(t);
- t = insert(t, child.begin(), child.end());
- } else {
- ++t;
- }
- }
-
- if (size() == 1) {
- const Task t(front());
- *this = t;
- }
- }
-}
-
-void
-CompiledGraph::Task::dump(std::ostream& os, unsigned indent, bool first) const
+CompiledGraph::dump(std::function<void (const std::string&)> sink) const
{
- if (!first) {
- os << std::endl;
- for (unsigned i = 0; i < indent; ++i) {
- os << " ";
- }
- }
-
- if (_mode == Mode::SINGLE) {
- os << _block->path();
- } else {
- os << ((_mode == Mode::SEQUENTIAL) ? "(seq " : "(par ");
- for (size_t i = 0; i < size(); ++i) {
- (*this)[i].dump(os, indent + 5, i == 0);
- }
- os << ")";
- }
+ sink("(compiled-graph ");
+ sink(_path);
+ _master.dump(sink, 2, false);
+ sink(")\n");
}
} // namespace Server
diff --git a/src/server/CompiledGraph.hpp b/src/server/CompiledGraph.hpp
index 663752e3..7dc40865 100644
--- a/src/server/CompiledGraph.hpp
+++ b/src/server/CompiledGraph.hpp
@@ -1,6 +1,6 @@
/*
This file is part of Ingen.
- Copyright 2007-2015 David Robillard <http://drobilla.net/>
+ Copyright 2007-2016 David Robillard <http://drobilla.net/>
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
@@ -17,7 +17,7 @@
#ifndef INGEN_ENGINE_COMPILEDGRAPH_HPP
#define INGEN_ENGINE_COMPILEDGRAPH_HPP
-#include <ostream>
+#include <functional>
#include <set>
#include <vector>
@@ -25,6 +25,8 @@
#include "raul/Noncopyable.hpp"
#include "raul/Path.hpp"
+#include "Task.hpp"
+
namespace Ingen {
class Log;
@@ -45,36 +47,11 @@ class CompiledGraph : public Raul::Maid::Disposable
, public Raul::Noncopyable
{
public:
- class Task : public std::vector<Task> {
- public:
- enum class Mode {
- SINGLE, ///< Single block to run
- SEQUENTIAL, ///< Elements must be run sequentially in order
- PARALLEL ///< Elements may be run in any order in parallel
- };
-
- Task(Mode mode, BlockImpl* block=NULL)
- : _mode(mode)
- , _block(block)
- {}
-
- void run(RunContext& context);
- void dump(std::ostream& os, unsigned indent, bool first) const;
- void simplify();
-
- Mode mode() const { return _mode; }
- BlockImpl* block() const { return _block; }
-
- private:
- Mode _mode; ///< Execution mode
- BlockImpl* _block; ///< Used for SINGLE only
- };
-
static CompiledGraph* compile(GraphImpl* graph);
void run(RunContext& context);
- void dump(std::ostream& os) const;
+ void dump(std::function<void (const std::string&)> sink) const;
private:
CompiledGraph(GraphImpl* graph);
diff --git a/src/server/DirectDriver.hpp b/src/server/DirectDriver.hpp
index a9800947..339d9987 100644
--- a/src/server/DirectDriver.hpp
+++ b/src/server/DirectDriver.hpp
@@ -1,6 +1,6 @@
/*
This file is part of Ingen.
- Copyright 2007-2015 David Robillard <http://drobilla.net/>
+ Copyright 2007-2016 David Robillard <http://drobilla.net/>
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
@@ -84,6 +84,8 @@ public:
virtual void append_time_events(RunContext& context,
Buffer& buffer) {}
+ virtual int real_time_priority() { return 60; }
+
private:
typedef boost::intrusive::list<EnginePort> Ports;
diff --git a/src/server/Driver.hpp b/src/server/Driver.hpp
index dc4eee1d..5a8fbed0 100644
--- a/src/server/Driver.hpp
+++ b/src/server/Driver.hpp
@@ -1,6 +1,6 @@
/*
This file is part of Ingen.
- Copyright 2007-2015 David Robillard <http://drobilla.net/>
+ Copyright 2007-2016 David Robillard <http://drobilla.net/>
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
@@ -96,6 +96,9 @@ public:
/** Append time events for this cycle to `buffer`. */
virtual void append_time_events(RunContext& context,
Buffer& buffer) = 0;
+
+ /** Return the real-time priority of the audio thread, or -1. */
+ virtual int real_time_priority() = 0;
};
} // namespace Server
diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp
index 259dbffd..1a0ab0a0 100644
--- a/src/server/Engine.cpp
+++ b/src/server/Engine.cpp
@@ -83,7 +83,6 @@ Engine::Engine(Ingen::World* world)
, _worker(new Worker(world->log(), event_queue_size()))
, _sync_worker(new Worker(world->log(), event_queue_size(), true))
, _listener(NULL)
- , _run_context(*this)
, _rand_engine(0)
, _uniform_dist(0.0f, 1.0f)
, _quit_flag(false)
@@ -96,6 +95,10 @@ Engine::Engine(Ingen::World* world)
_control_bindings = new ControlBindings(*this);
+ for (int i = 0; i < world->conf().option("threads").get<int32_t>(); ++i) {
+ _run_contexts.push_back(new RunContext(*this, i, i > 0));
+ }
+
_world->lv2_features().add_feature(_worker->schedule_feature());
_world->lv2_features().add_feature(_options);
_world->lv2_features().add_feature(
@@ -134,11 +137,12 @@ Engine::~Engine()
// Process all pending events
const FrameTime end = std::numeric_limits<FrameTime>::max();
- _run_context.locate(_run_context.end(), end - _run_context.end());
+ RunContext& ctx = run_context();
+ locate(ctx.end(), end - ctx.end());
_post_processor->set_end_time(end);
_post_processor->process();
while (!_pre_processor->empty()) {
- _pre_processor->process(_run_context, *_post_processor, 1);
+ _pre_processor->process(ctx, *_post_processor, 1);
_post_processor->process();
}
@@ -181,6 +185,64 @@ Engine::listen()
#endif
}
+void
+Engine::locate(FrameTime s, SampleCount nframes)
+{
+ for (RunContext* ctx : _run_contexts) {
+ ctx->locate(s, nframes);
+ }
+}
+
+void
+Engine::emit_notifications(FrameTime end)
+{
+ for (RunContext* ctx : _run_contexts) {
+ ctx->emit_notifications(end);
+ }
+}
+
+bool
+Engine::pending_notifications()
+{
+ for (const RunContext* ctx : _run_contexts) {
+ if (ctx->pending_notifications()) {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool
+Engine::wait_for_tasks()
+{
+ std::unique_lock<std::mutex> lock(_tasks_mutex);
+ _tasks_available.wait(lock);
+ return !_quit_flag;
+}
+
+void
+Engine::signal_tasks()
+{
+ _tasks_available.notify_all();
+}
+
+Task*
+Engine::steal_task(unsigned start_thread)
+{
+ for (unsigned i = 0; i < _run_contexts.size(); ++i) {
+ const unsigned id = (start_thread + i) % _run_contexts.size();
+ RunContext* const ctx = _run_contexts[id];
+ Task* par = ctx->task();
+ if (par) {
+ Task* t = par->steal(*ctx);
+ if (t) {
+ return t;
+ }
+ }
+ }
+ return NULL;
+}
+
SPtr<Store>
Engine::store() const
{
@@ -211,6 +273,9 @@ void
Engine::set_driver(SPtr<Driver> driver)
{
_driver = driver;
+ for (RunContext* ctx : _run_contexts) {
+ ctx->set_priority(driver->real_time_priority());
+ }
}
SampleCount
@@ -221,7 +286,7 @@ Engine::event_time()
}
const SampleCount start = _direct_driver
- ? _run_context.start()
+ ? run_context().start()
: _driver->frame_time();
/* Exactly one cycle latency (some could run ASAP if we get lucky, but not
@@ -268,7 +333,7 @@ Engine::activate()
*this, SPtr<Interface>(), -1, 0, Raul::Path("/"), graph_properties);
// Execute in "fake" process context (we are single threaded)
- RunContext context(*this);
+ RunContext context(run_context());
ev.pre_process();
ev.execute(context);
ev.post_process();
@@ -301,13 +366,13 @@ Engine::deactivate()
unsigned
Engine::run(uint32_t sample_count)
{
- _run_context.locate(_run_context.end(), sample_count);
+ RunContext& ctx = run_context();
// Apply control bindings to input
control_bindings()->pre_process(
- _run_context, _root_graph->port_impl(0)->buffer(0).get());
+ ctx, _root_graph->port_impl(0)->buffer(0).get());
- post_processor()->set_end_time(_run_context.end());
+ post_processor()->set_end_time(ctx.end());
// Process events that came in during the last cycle
// (Aiming for jitter-free 1 block event latency, ideally)
@@ -315,11 +380,11 @@ Engine::run(uint32_t sample_count)
// Run root graph
if (_root_graph) {
- _root_graph->process(_run_context);
+ _root_graph->process(ctx);
// Emit control binding feedback
control_bindings()->post_process(
- _run_context, _root_graph->port_impl(1)->buffer(0).get());
+ ctx, _root_graph->port_impl(1)->buffer(0).get());
}
return n_processed_events;
@@ -340,9 +405,9 @@ Engine::enqueue_event(Event* ev, Event::Mode mode)
unsigned
Engine::process_events()
{
- const size_t MAX_EVENTS_PER_CYCLE = _run_context.nframes() / 8;
+ const size_t MAX_EVENTS_PER_CYCLE = run_context().nframes() / 8;
return _pre_processor->process(
- _run_context, *_post_processor, MAX_EVENTS_PER_CYCLE);
+ run_context(), *_post_processor, MAX_EVENTS_PER_CYCLE);
}
void
diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp
index 16699293..2d5e053e 100644
--- a/src/server/Engine.hpp
+++ b/src/server/Engine.hpp
@@ -17,9 +17,10 @@
#ifndef INGEN_ENGINE_ENGINE_HPP
#define INGEN_ENGINE_ENGINE_HPP
-#include <random>
-
#include <boost/utility.hpp>
+#include <condition_variable>
+#include <mutex>
+#include <random>
#include "ingen/EngineBase.hpp"
#include "ingen/Interface.hpp"
@@ -115,7 +116,14 @@ public:
Worker* worker() const { return _worker; }
Worker* sync_worker() const { return _sync_worker; }
- RunContext& run_context() { return _run_context; }
+ RunContext& run_context() { return *_run_contexts[0]; }
+
+ void locate(FrameTime s, SampleCount nframes);
+ void emit_notifications(FrameTime end);
+ bool pending_notifications();
+ bool wait_for_tasks();
+ void signal_tasks();
+ Task* steal_task(unsigned start_thread);
SPtr<Store> store() const;
@@ -144,11 +152,14 @@ private:
Worker* _sync_worker;
SocketListener* _listener;
- RunContext _run_context;
+ std::vector<RunContext*> _run_contexts;
std::mt19937 _rand_engine;
std::uniform_real_distribution<float> _uniform_dist;
+ std::condition_variable _tasks_available;
+ std::mutex _tasks_mutex;
+
bool _quit_flag;
bool _direct_driver;
bool _atomic_bundles;
diff --git a/src/server/JackDriver.cpp b/src/server/JackDriver.cpp
index db567866..7f0e7ae0 100644
--- a/src/server/JackDriver.cpp
+++ b/src/server/JackDriver.cpp
@@ -1,6 +1,6 @@
/*
This file is part of Ingen.
- Copyright 2007-2015 David Robillard <http://drobilla.net/>
+ Copyright 2007-2016 David Robillard <http://drobilla.net/>
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
@@ -479,7 +479,7 @@ JackDriver::_process_cb(jack_nframes_t nframes)
_transport_state = jack_transport_query(_client, &_position);
- _engine.run_context().locate(start_of_current_cycle, nframes);
+ _engine.locate(start_of_current_cycle, nframes);
// Read input
for (auto& p : _ports) {
diff --git a/src/server/JackDriver.hpp b/src/server/JackDriver.hpp
index ef7bbd78..2d50d892 100644
--- a/src/server/JackDriver.hpp
+++ b/src/server/JackDriver.hpp
@@ -1,6 +1,6 @@
/*
This file is part of Ingen.
- Copyright 2007-2015 David Robillard <http://drobilla.net/>
+ Copyright 2007-2016 David Robillard <http://drobilla.net/>
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
@@ -23,6 +23,7 @@
#include <atomic>
#include <jack/jack.h>
+#include <jack/thread.h>
#include <jack/transport.h>
#ifdef INGEN_JACK_SESSION
#include <jack/session.h>
@@ -87,6 +88,8 @@ public:
void append_time_events(RunContext& context,
Buffer& buffer);
+ int real_time_priority() { return jack_client_real_time_priority(_client); }
+
jack_client_t* jack_client() const { return _client; }
SampleCount block_length() const { return _block_length; }
size_t seq_size() const { return _seq_size; }
diff --git a/src/server/PostProcessor.cpp b/src/server/PostProcessor.cpp
index d6016105..65323f7f 100644
--- a/src/server/PostProcessor.cpp
+++ b/src/server/PostProcessor.cpp
@@ -1,6 +1,6 @@
/*
This file is part of Ingen.
- Copyright 2007-2015 David Robillard <http://drobilla.net/>
+ Copyright 2007-2016 David Robillard <http://drobilla.net/>
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
@@ -70,7 +70,7 @@ PostProcessor::append(RunContext& context, Event* first, Event* last)
bool
PostProcessor::pending() const
{
- return _head.load() || _engine.run_context().pending_notifications();
+ return _head.load() || _engine.pending_notifications();
}
void
@@ -85,7 +85,7 @@ PostProcessor::process()
Event* next = ev->next();
if (!next || next->time() >= end_time) {
// Process audio thread notifications until end
- _engine.run_context().emit_notifications(end_time);
+ _engine.emit_notifications(end_time);
return;
}
@@ -95,7 +95,7 @@ PostProcessor::process()
ev = next;
// Process audio thread notifications up until this event's time
- _engine.run_context().emit_notifications(ev->time());
+ _engine.emit_notifications(ev->time());
// Post-process event
ev->post_process();
@@ -109,7 +109,7 @@ PostProcessor::process()
_head = ev;
// Process remaining audio thread notifications until end
- _engine.run_context().emit_notifications(end_time);
+ _engine.emit_notifications(end_time);
}
} // namespace Server
diff --git a/src/server/RunContext.cpp b/src/server/RunContext.cpp
index ee1eaf04..e107d247 100644
--- a/src/server/RunContext.cpp
+++ b/src/server/RunContext.cpp
@@ -1,6 +1,6 @@
/*
This file is part of Ingen.
- Copyright 2007-2015 David Robillard <http://drobilla.net/>
+ Copyright 2007-2016 David Robillard <http://drobilla.net/>
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
@@ -20,9 +20,11 @@
#include "Broadcaster.hpp"
#include "BufferFactory.hpp"
+#include "Driver.hpp"
#include "Engine.hpp"
#include "PortImpl.hpp"
#include "RunContext.hpp"
+#include "Task.hpp"
namespace Ingen {
namespace Server {
@@ -44,10 +46,13 @@ struct Notification
LV2_URID type;
};
-RunContext::RunContext(Engine& engine)
+RunContext::RunContext(Engine& engine, unsigned id, bool threaded)
: _engine(engine)
, _event_sink(
new Raul::RingBuffer(engine.event_queue_size() * sizeof(Notification)))
+ , _task(nullptr)
+ , _thread(threaded ? new std::thread(&RunContext::run, this) : nullptr)
+ , _id(id)
, _start(0)
, _end(0)
, _offset(0)
@@ -59,6 +64,9 @@ RunContext::RunContext(Engine& engine)
RunContext::RunContext(const RunContext& copy)
: _engine(copy._engine)
, _event_sink(copy._event_sink)
+ , _task(nullptr)
+ , _thread(nullptr)
+ , _id(copy._id)
, _start(copy._start)
, _end(copy._end)
, _offset(copy._offset)
@@ -138,5 +146,31 @@ RunContext::emit_notifications(FrameTime end)
}
}
+void
+RunContext::set_priority(int priority)
+{
+ if (_thread) {
+ pthread_t pthread = _thread->native_handle();
+ const int policy = (priority > 0) ? SCHED_FIFO : SCHED_OTHER;
+ sched_param sp;
+ sp.sched_priority = (priority > 0) ? priority : 0;
+ if (pthread_setschedparam(pthread, policy, &sp)) {
+ _engine.log().error(
+ fmt("Failed to set real-time priority of run thread (%s)\n")
+ % strerror(errno));
+ }
+ }
+}
+
+void
+RunContext::run()
+{
+ while (_engine.wait_for_tasks()) {
+ for (Task* t; (t = _engine.steal_task(0));) {
+ t->run(*this);
+ }
+ }
+}
+
} // namespace Server
} // namespace Ingen
diff --git a/src/server/RunContext.hpp b/src/server/RunContext.hpp
index 803d7829..803e0c3b 100644
--- a/src/server/RunContext.hpp
+++ b/src/server/RunContext.hpp
@@ -17,6 +17,8 @@
#ifndef INGEN_ENGINE_RUNCONTEXT_HPP
#define INGEN_ENGINE_RUNCONTEXT_HPP
+#include <thread>
+
#include "ingen/Atom.hpp"
#include "ingen/World.hpp"
#include "raul/RingBuffer.hpp"
@@ -28,6 +30,7 @@ namespace Server {
class Engine;
class PortImpl;
+class Task;
/** Graph execution context.
*
@@ -44,8 +47,19 @@ class PortImpl;
class RunContext
{
public:
- RunContext(Engine& engine);
- RunContext(const RunContext& copy);
+ /** Create a new run context.
+ *
+ * @param threaded If true, then this context is a worker which will launch
+ * a thread and execute tasks as they become available.
+ */
+ RunContext(Engine& engine, unsigned id, bool threaded);
+
+ /** Create a sub-context of `parent`.
+ *
+ * This is used to subdivide process cycles, the sub-context is
+ * lightweight and only serves to pass different time attributes.
+ */
+ RunContext(const RunContext& parent);
virtual ~RunContext();
@@ -79,18 +93,20 @@ public:
_nframes = nframes;
}
- inline void locate(const RunContext& other) {
- _start = other._start;
- _end = other._end;
- _nframes = other._nframes;
- }
-
inline void slice(SampleCount offset, SampleCount nframes) {
_offset = offset;
_nframes = nframes;
}
+ inline void set_task(Task* task) {
+ _task = task;
+ }
+
+ void set_priority(int priority);
+
inline Engine& engine() const { return _engine; }
+ inline Task* task() const { return _task; }
+ inline unsigned id() const { return _id; }
inline FrameTime start() const { return _start; }
inline FrameTime time() const { return _start + _offset; }
inline FrameTime end() const { return _end; }
@@ -101,9 +117,13 @@ public:
protected:
const RunContext& operator=(const RunContext& copy) = delete;
- Engine& _engine; ///< Engine we're running in
+ void run();
- Raul::RingBuffer* _event_sink; ///< Port updates from process context
+ Engine& _engine; ///< Engine we're running in
+ Raul::RingBuffer* _event_sink; ///< Port updates from process context
+ Task* _task; ///< Currently executing task
+ std::thread* _thread; ///< Thread (NULL for main run context)
+ unsigned _id; ///< Context ID
FrameTime _start; ///< Start frame of this cycle, timeline relative
FrameTime _end; ///< End frame of this cycle, timeline relative
diff --git a/src/server/Task.cpp b/src/server/Task.cpp
new file mode 100644
index 00000000..e15a4d4a
--- /dev/null
+++ b/src/server/Task.cpp
@@ -0,0 +1,155 @@
+/*
+ This file is part of Ingen.
+ Copyright 2015-2016 David Robillard <http://drobilla.net/>
+
+ Ingen is free software: you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free
+ Software Foundation, either version 3 of the License, or any later version.
+
+ Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with Ingen. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "BlockImpl.hpp"
+#include "Engine.hpp"
+#include "Task.hpp"
+
+namespace Ingen {
+namespace Server {
+
+void
+Task::run(RunContext& context)
+{
+ switch (_mode) {
+ case Mode::SINGLE:
+ // fprintf(stderr, "%u run %s\n", context.id(), _block->path().c_str());
+ _block->process(context);
+ break;
+ case Mode::SEQUENTIAL:
+ for (Task& task : *this) {
+ task.run(context);
+ }
+ break;
+ case Mode::PARALLEL:
+ // Initialize (not) done state of sub-tasks
+ for (Task& task : *this) {
+ task.set_done(false);
+ }
+
+ // Grab the first sub-task
+ _next = 0;
+ _done_end = 0;
+ Task* t = steal(context);
+
+ // Allow other threads to steal sub-tasks
+ context.set_task(this);
+ context.engine().signal_tasks();
+
+ // Run available tasks until this task is finished
+ for (; t; t = get_task(context)) {
+ t->run(context);
+ }
+ context.set_task(nullptr);
+ break;
+ }
+
+ set_done(true);
+}
+
+Task*
+Task::steal(RunContext& context)
+{
+ if (_mode == Mode::PARALLEL) {
+ const unsigned i = _next++;
+ if (i < size()) {
+ return &(*this)[i];
+ }
+ }
+
+ return NULL;
+}
+
+Task*
+Task::get_task(RunContext& context)
+{
+ // Attempt to "steal" a task from ourselves
+ Task* t = steal(context);
+ if (t) {
+ return t;
+ }
+
+ while (true) {
+ // Push done end index as forward as possible
+ for (; (*this)[_done_end].done(); ++_done_end) {}
+
+ if (_done_end >= size()) {
+ return NULL; // All child tasks are finished
+ }
+
+ // All child tasks claimed, but some are unfinished, steal a task
+ t = context.engine().steal_task(context.id() + 1);
+ if (t) {
+ return t;
+ }
+
+ /* All child tasks are claimed, and we failed to steal any tasks. Spin
+ to prevent blocking, though it would probably be wiser to wait for a
+ signal in non-main threads, and maybe even in the main thread
+ depending on your real-time safe philosophy... more experimentation
+ here is needed. */
+ }
+}
+
+void
+Task::simplify()
+{
+ if (_mode != Mode::SINGLE) {
+ for (std::vector<Task>::iterator t = begin(); t != end();) {
+ t->simplify();
+ if (t->mode() != Mode::SINGLE && t->empty()) {
+ // Empty task, erase
+ t = erase(t);
+ } else if (t->mode() == _mode) {
+ // Subtask with the same type, fold child into parent
+ const Task child(*t);
+ t = erase(t);
+ t = insert(t, child.begin(), child.end());
+ } else {
+ ++t;
+ }
+ }
+
+ if (size() == 1) {
+ const Task t(front());
+ *this = t;
+ }
+ }
+}
+
+void
+Task::dump(std::function<void (const std::string&)> sink, unsigned indent, bool first) const
+{
+ if (!first) {
+ sink("\n");
+ for (unsigned i = 0; i < indent; ++i) {
+ sink(" ");
+ }
+ }
+
+ if (_mode == Mode::SINGLE) {
+ sink(_block->path());
+ } else {
+ sink(((_mode == Mode::SEQUENTIAL) ? "(seq " : "(par "));
+ for (size_t i = 0; i < size(); ++i) {
+ (*this)[i].dump(sink, indent + 5, i == 0);
+ }
+ sink(")");
+ }
+}
+
+} // namespace Server
+} // namespace Ingen
diff --git a/src/server/Task.hpp b/src/server/Task.hpp
new file mode 100644
index 00000000..2c1a0cc2
--- /dev/null
+++ b/src/server/Task.hpp
@@ -0,0 +1,98 @@
+/*
+ This file is part of Ingen.
+ Copyright 2007-2016 David Robillard <http://drobilla.net/>
+
+ Ingen is free software: you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free
+ Software Foundation, either version 3 of the License, or any later version.
+
+ Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with Ingen. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef INGEN_ENGINE_TASK_HPP
+#define INGEN_ENGINE_TASK_HPP
+
+#include <cassert>
+#include <ostream>
+#include <vector>
+
+namespace Ingen {
+namespace Server {
+
+class BlockImpl;
+class RunContext;
+
+class Task : public std::vector<Task> {
+public:
+ enum class Mode {
+ SINGLE, ///< Single block to run
+ SEQUENTIAL, ///< Elements must be run sequentially in order
+ PARALLEL ///< Elements may be run in any order in parallel
+ };
+
+ Task(Mode mode, BlockImpl* block=NULL)
+ : _block(block)
+ , _mode(mode)
+ , _done_end(0)
+ , _next(0)
+ , _done(false)
+ {
+ assert(!(mode == Mode::SINGLE && !block));
+ }
+
+ Task& operator=(const Task& copy) {
+ *static_cast<std::vector<Task>*>(this) = copy;
+ _block = copy._block;
+ _mode = copy._mode;
+ _done_end = copy._done_end;
+ _next = copy._next.load();
+ _done = copy._done.load();
+ return *this;
+ }
+
+ Task(const Task& copy)
+ : std::vector<Task>(copy)
+ , _block(copy._block)
+ , _mode(copy._mode)
+ , _done_end(copy._done_end)
+ , _next(copy._next.load())
+ , _done(copy._done.load())
+ {}
+
+ /** Run task in the given context. */
+ void run(RunContext& context);
+
+ /** Pretty print task to the given stream (recursively). */
+ void dump(std::function<void (const std::string&)> sink, unsigned indent, bool first) const;
+
+ /** Simplify task expression. */
+ void simplify();
+
+ /** Steal a child task from this task (succeeds for PARALLEL only). */
+ Task* steal(RunContext& context);
+
+ Mode mode() const { return _mode; }
+ BlockImpl* block() const { return _block; }
+ bool done() const { return _done; }
+
+ void set_done(bool done) { _done = done; }
+
+private:
+ Task* get_task(RunContext& context);
+
+ BlockImpl* _block; ///< Used for SINGLE only
+ Mode _mode; ///< Execution mode
+ unsigned _done_end; ///< Index of rightmost done sub-task
+ std::atomic<unsigned> _next; ///< Index of next sub-task
+ std::atomic<bool> _done; ///< Completion phase
+};
+
+} // namespace Server
+} // namespace Ingen
+
+#endif // INGEN_ENGINE_TASK_HPP
diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp
index 0c32731c..a21cb4c9 100644
--- a/src/server/ingen_lv2.cpp
+++ b/src/server/ingen_lv2.cpp
@@ -1,6 +1,6 @@
/*
This file is part of Ingen.
- Copyright 2007-2015 David Robillard <http://drobilla.net/>
+ Copyright 2007-2016 David Robillard <http://drobilla.net/>
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
@@ -161,7 +161,7 @@ public:
}
void run(uint32_t nframes) {
- _engine.run_context().locate(_frame_time, nframes);
+ _engine.locate(_frame_time, nframes);
// Notify buffer is a Chunk with size set to the available space
_notify_capacity = ((LV2_Atom_Sequence*)_ports[1]->buffer())->atom.size;
@@ -256,6 +256,8 @@ public:
}
}
+ virtual int real_time_priority() { return 60; }
+
/** Called in run thread for events received at control input port. */
bool enqueue_message(const LV2_Atom* atom) {
if (_from_ui.write(lv2_atom_total_size(atom), atom) == 0) {
@@ -558,7 +560,7 @@ ingen_instantiate(const LV2_Descriptor* descriptor,
engine->activate();
Server::ThreadManager::single_threaded = true;
- engine->run_context().locate(0, block_length);
+ engine->locate(0, block_length);
engine->post_processor()->set_end_time(block_length);
engine->process_events();
diff --git a/src/server/wscript b/src/server/wscript
index fad48330..68c1c5f1 100644
--- a/src/server/wscript
+++ b/src/server/wscript
@@ -28,6 +28,7 @@ def build(bld):
PreProcessor.cpp
RunContext.cpp
SocketListener.cpp
+ Task.cpp
UndoStack.cpp
Worker.cpp
events/Connect.cpp
diff --git a/tests/ingen_test.cpp b/tests/ingen_test.cpp
index 12e9e9f9..6633e5c5 100644
--- a/tests/ingen_test.cpp
+++ b/tests/ingen_test.cpp
@@ -135,10 +135,16 @@ ingen_try(bool cond, const char* msg)
static void
flush_events(Ingen::World* world)
{
+ static const uint32_t block_length = 4096;
+ int count = 0;
+ uint32_t offset = 0;
while (world->engine()->pending_events()) {
- world->engine()->run(4096);
+ world->engine()->locate(offset, block_length);
+ world->engine()->run(block_length);
world->engine()->main_iteration();
g_usleep(1000);
+ ++count;
+ offset += block_length;
}
}