summaryrefslogtreecommitdiffstats
path: root/src/server/Engine.cpp
diff options
context:
space:
mode:
authorDavid Robillard <d@drobilla.net>2016-09-12 22:37:22 +0800
committerDavid Robillard <d@drobilla.net>2016-10-02 12:24:56 -0400
commit9b8bce71893ef450992f82a28a6a0287c479baaf (patch)
tree8c9c240e32f8201d2f999a2de2baaca6281783c3 /src/server/Engine.cpp
parent938456884934a74a2850c02edc17575021131709 (diff)
downloadingen-9b8bce71893ef450992f82a28a6a0287c479baaf.tar.gz
ingen-9b8bce71893ef450992f82a28a6a0287c479baaf.tar.bz2
ingen-9b8bce71893ef450992f82a28a6a0287c479baaf.zip
Add parallel graph execution
Diffstat (limited to 'src/server/Engine.cpp')
-rw-r--r--src/server/Engine.cpp89
1 files changed, 77 insertions, 12 deletions
diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp
index 259dbffd..1a0ab0a0 100644
--- a/src/server/Engine.cpp
+++ b/src/server/Engine.cpp
@@ -83,7 +83,6 @@ Engine::Engine(Ingen::World* world)
, _worker(new Worker(world->log(), event_queue_size()))
, _sync_worker(new Worker(world->log(), event_queue_size(), true))
, _listener(NULL)
- , _run_context(*this)
, _rand_engine(0)
, _uniform_dist(0.0f, 1.0f)
, _quit_flag(false)
@@ -96,6 +95,10 @@ Engine::Engine(Ingen::World* world)
_control_bindings = new ControlBindings(*this);
+ for (int i = 0; i < world->conf().option("threads").get<int32_t>(); ++i) {
+ _run_contexts.push_back(new RunContext(*this, i, i > 0));
+ }
+
_world->lv2_features().add_feature(_worker->schedule_feature());
_world->lv2_features().add_feature(_options);
_world->lv2_features().add_feature(
@@ -134,11 +137,12 @@ Engine::~Engine()
// Process all pending events
const FrameTime end = std::numeric_limits<FrameTime>::max();
- _run_context.locate(_run_context.end(), end - _run_context.end());
+ RunContext& ctx = run_context();
+ locate(ctx.end(), end - ctx.end());
_post_processor->set_end_time(end);
_post_processor->process();
while (!_pre_processor->empty()) {
- _pre_processor->process(_run_context, *_post_processor, 1);
+ _pre_processor->process(ctx, *_post_processor, 1);
_post_processor->process();
}
@@ -181,6 +185,64 @@ Engine::listen()
#endif
}
+void
+Engine::locate(FrameTime s, SampleCount nframes)
+{
+ for (RunContext* ctx : _run_contexts) {
+ ctx->locate(s, nframes);
+ }
+}
+
+void
+Engine::emit_notifications(FrameTime end)
+{
+ for (RunContext* ctx : _run_contexts) {
+ ctx->emit_notifications(end);
+ }
+}
+
+bool
+Engine::pending_notifications()
+{
+ for (const RunContext* ctx : _run_contexts) {
+ if (ctx->pending_notifications()) {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool
+Engine::wait_for_tasks()
+{
+ std::unique_lock<std::mutex> lock(_tasks_mutex);
+ _tasks_available.wait(lock);
+ return !_quit_flag;
+}
+
+void
+Engine::signal_tasks()
+{
+ _tasks_available.notify_all();
+}
+
+Task*
+Engine::steal_task(unsigned start_thread)
+{
+ for (unsigned i = 0; i < _run_contexts.size(); ++i) {
+ const unsigned id = (start_thread + i) % _run_contexts.size();
+ RunContext* const ctx = _run_contexts[id];
+ Task* par = ctx->task();
+ if (par) {
+ Task* t = par->steal(*ctx);
+ if (t) {
+ return t;
+ }
+ }
+ }
+ return NULL;
+}
+
SPtr<Store>
Engine::store() const
{
@@ -211,6 +273,9 @@ void
Engine::set_driver(SPtr<Driver> driver)
{
_driver = driver;
+ for (RunContext* ctx : _run_contexts) {
+ ctx->set_priority(driver->real_time_priority());
+ }
}
SampleCount
@@ -221,7 +286,7 @@ Engine::event_time()
}
const SampleCount start = _direct_driver
- ? _run_context.start()
+ ? run_context().start()
: _driver->frame_time();
/* Exactly one cycle latency (some could run ASAP if we get lucky, but not
@@ -268,7 +333,7 @@ Engine::activate()
*this, SPtr<Interface>(), -1, 0, Raul::Path("/"), graph_properties);
// Execute in "fake" process context (we are single threaded)
- RunContext context(*this);
+ RunContext context(run_context());
ev.pre_process();
ev.execute(context);
ev.post_process();
@@ -301,13 +366,13 @@ Engine::deactivate()
unsigned
Engine::run(uint32_t sample_count)
{
- _run_context.locate(_run_context.end(), sample_count);
+ RunContext& ctx = run_context();
// Apply control bindings to input
control_bindings()->pre_process(
- _run_context, _root_graph->port_impl(0)->buffer(0).get());
+ ctx, _root_graph->port_impl(0)->buffer(0).get());
- post_processor()->set_end_time(_run_context.end());
+ post_processor()->set_end_time(ctx.end());
// Process events that came in during the last cycle
// (Aiming for jitter-free 1 block event latency, ideally)
@@ -315,11 +380,11 @@ Engine::run(uint32_t sample_count)
// Run root graph
if (_root_graph) {
- _root_graph->process(_run_context);
+ _root_graph->process(ctx);
// Emit control binding feedback
control_bindings()->post_process(
- _run_context, _root_graph->port_impl(1)->buffer(0).get());
+ ctx, _root_graph->port_impl(1)->buffer(0).get());
}
return n_processed_events;
@@ -340,9 +405,9 @@ Engine::enqueue_event(Event* ev, Event::Mode mode)
unsigned
Engine::process_events()
{
- const size_t MAX_EVENTS_PER_CYCLE = _run_context.nframes() / 8;
+ const size_t MAX_EVENTS_PER_CYCLE = run_context().nframes() / 8;
return _pre_processor->process(
- _run_context, *_post_processor, MAX_EVENTS_PER_CYCLE);
+ run_context(), *_post_processor, MAX_EVENTS_PER_CYCLE);
}
void