From fed7aaf0901e1b26dcc2bbb222f67b11f6b9d291 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Mon, 12 Sep 2016 22:37:22 +0800 Subject: Add parallel graph execution --- src/Configuration.cpp | 8 ++- src/server/CompiledGraph.cpp | 98 ++++----------------------- src/server/CompiledGraph.hpp | 33 ++------- src/server/DirectDriver.hpp | 4 +- src/server/Driver.hpp | 5 +- src/server/Engine.cpp | 89 +++++++++++++++++++++---- src/server/Engine.hpp | 19 ++++-- src/server/JackDriver.cpp | 4 +- src/server/JackDriver.hpp | 5 +- src/server/PostProcessor.cpp | 10 +-- src/server/RunContext.cpp | 38 ++++++++++- src/server/RunContext.hpp | 40 ++++++++--- src/server/Task.cpp | 155 +++++++++++++++++++++++++++++++++++++++++++ src/server/Task.hpp | 98 +++++++++++++++++++++++++++ src/server/ingen_lv2.cpp | 8 ++- src/server/wscript | 1 + 16 files changed, 457 insertions(+), 158 deletions(-) create mode 100644 src/server/Task.cpp create mode 100644 src/server/Task.hpp (limited to 'src') diff --git a/src/Configuration.cpp b/src/Configuration.cpp index 13e2d722..e165118f 100644 --- a/src/Configuration.cpp +++ b/src/Configuration.cpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard + Copyright 2007-2016 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -66,8 +67,9 @@ Configuration::Configuration(Forge& forge) add("path", "path", 'L', "Target path for loaded graph", SESSION, forge.String, Atom()); add("queueSize", "queue-size", 'q', "Event queue size", GLOBAL, forge.Int, forge.make(4096)); add("flushLog", "flush-log", 'f', "Flush logs after every entry", GLOBAL, forge.Bool, forge.make(false)); - add("dump", "dump", 'd', "Print debug output", GLOBAL, forge.Bool, forge.make(false)); - add("trace", "trace", 't', "Show LV2 plugin trace messages", GLOBAL, forge.Bool, forge.make(false)); + add("dump", "dump", 'd', "Print debug output", SESSION, forge.Bool, forge.make(false)); + add("trace", "trace", 't', "Show LV2 plugin trace messages", SESSION, forge.Bool, forge.make(false)); + add("threads", "threads", 'p', "Number of processing threads", GLOBAL, forge.Int, forge.make(int32_t(std::max(std::thread::hardware_concurrency(), 1U)))); add("humanNames", "human-names", 0, "Show human names in GUI", GUI, forge.Bool, forge.make(true)); add("portLabels", "port-labels", 0, "Show port labels in GUI", GUI, forge.Bool, forge.make(true)); add("graphDirectory", "graph-directory", 0, "Default directory for opening graphs", GUI, forge.String, Atom()); diff --git a/src/server/CompiledGraph.cpp b/src/server/CompiledGraph.cpp index 21bd5337..fdf59cd4 100644 --- a/src/server/CompiledGraph.cpp +++ b/src/server/CompiledGraph.cpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2015 David Robillard + Copyright 2015-2016 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -16,6 +16,7 @@ #include +#include "ingen/ColorContext.hpp" #include "ingen/Configuration.hpp" #include "ingen/Log.hpp" @@ -92,7 +93,6 @@ CompiledGraph::compile_graph(GraphImpl* graph) if (b.providers().empty()) { // Block has no dependencies, add to initial working set - _log.info(fmt("Initial: %1%\n") % b.path()); blocks.insert(&b); } } @@ -127,8 +127,11 @@ CompiledGraph::compile_graph(GraphImpl* graph) _master.simplify(); - if (graph->engine().world()->conf().option("dump").get()) { - dump(std::cout); + if (graph->engine().world()->conf().option("trace").get()) { + dump([this](const std::string& msg) { + ColorContext ctx(stderr, ColorContext::Color::YELLOW); + fwrite(msg.c_str(), 1, msg.size(), stderr); + }); } } @@ -184,18 +187,8 @@ CompiledGraph::compile_dependant(const BlockImpl* root, void CompiledGraph::compile_block(BlockImpl* n, Task& task, std::set& k) { - static unsigned indent = 0; - switch (n->get_mark()) { case BlockImpl::Mark::UNVISITED: - indent += 4; - for (unsigned i = 0; i < indent; ++i) { - _log.info(" "); - } - - _log.info(fmt("Compile block %1% (%2% dependants, %3% providers) {\n") - % n->path() % n->dependants().size() % n->providers().size()); - n->set_mark(BlockImpl::Mark::VISITING); // Execute this task before the dependants to follow @@ -215,11 +208,6 @@ CompiledGraph::compile_block(BlockImpl* n, Task& task, std::set& k) task.push_back(par); } n->set_mark(BlockImpl::Mark::VISITED); - for (unsigned i = 0; i < indent; ++i) { - _log.info(" "); - } - _log.info("} " + n->path() + "\n"); - indent -= 4; break; case BlockImpl::Mark::VISITING: @@ -237,74 +225,12 @@ CompiledGraph::run(RunContext& context) } void -CompiledGraph::dump(std::ostream& os) const -{ - os << "(compiled-graph " << _path; - _master.dump(os, 2, false); - os << ")" << std::endl; -} - -void -CompiledGraph::Task::run(RunContext& context) -{ - switch (_mode) { - case Mode::SINGLE: - _block->process(context); - break; - case Mode::SEQUENTIAL: - case Mode::PARALLEL: - for (Task& task : *this) { - task.run(context); - } - break; - } -} - -void -CompiledGraph::Task::simplify() -{ - if (_mode != Mode::SINGLE) { - for (std::vector::iterator t = begin(); t != end();) { - t->simplify(); - if (t->mode() != Mode::SINGLE && t->empty()) { - // Empty task, erase - t = erase(t); - } else if (t->mode() == _mode) { - // Subtask with the same type, fold child into parent - const Task child(*t); - t = erase(t); - t = insert(t, child.begin(), child.end()); - } else { - ++t; - } - } - - if (size() == 1) { - const Task t(front()); - *this = t; - } - } -} - -void -CompiledGraph::Task::dump(std::ostream& os, unsigned indent, bool first) const +CompiledGraph::dump(std::function sink) const { - if (!first) { - os << std::endl; - for (unsigned i = 0; i < indent; ++i) { - os << " "; - } - } - - if (_mode == Mode::SINGLE) { - os << _block->path(); - } else { - os << ((_mode == Mode::SEQUENTIAL) ? "(seq " : "(par "); - for (size_t i = 0; i < size(); ++i) { - (*this)[i].dump(os, indent + 5, i == 0); - } - os << ")"; - } + sink("(compiled-graph "); + sink(_path); + _master.dump(sink, 2, false); + sink(")\n"); } } // namespace Server diff --git a/src/server/CompiledGraph.hpp b/src/server/CompiledGraph.hpp index 663752e3..7dc40865 100644 --- a/src/server/CompiledGraph.hpp +++ b/src/server/CompiledGraph.hpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard + Copyright 2007-2016 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -17,7 +17,7 @@ #ifndef INGEN_ENGINE_COMPILEDGRAPH_HPP #define INGEN_ENGINE_COMPILEDGRAPH_HPP -#include +#include #include #include @@ -25,6 +25,8 @@ #include "raul/Noncopyable.hpp" #include "raul/Path.hpp" +#include "Task.hpp" + namespace Ingen { class Log; @@ -45,36 +47,11 @@ class CompiledGraph : public Raul::Maid::Disposable , public Raul::Noncopyable { public: - class Task : public std::vector { - public: - enum class Mode { - SINGLE, ///< Single block to run - SEQUENTIAL, ///< Elements must be run sequentially in order - PARALLEL ///< Elements may be run in any order in parallel - }; - - Task(Mode mode, BlockImpl* block=NULL) - : _mode(mode) - , _block(block) - {} - - void run(RunContext& context); - void dump(std::ostream& os, unsigned indent, bool first) const; - void simplify(); - - Mode mode() const { return _mode; } - BlockImpl* block() const { return _block; } - - private: - Mode _mode; ///< Execution mode - BlockImpl* _block; ///< Used for SINGLE only - }; - static CompiledGraph* compile(GraphImpl* graph); void run(RunContext& context); - void dump(std::ostream& os) const; + void dump(std::function sink) const; private: CompiledGraph(GraphImpl* graph); diff --git a/src/server/DirectDriver.hpp b/src/server/DirectDriver.hpp index a9800947..339d9987 100644 --- a/src/server/DirectDriver.hpp +++ b/src/server/DirectDriver.hpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard + Copyright 2007-2016 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -84,6 +84,8 @@ public: virtual void append_time_events(RunContext& context, Buffer& buffer) {} + virtual int real_time_priority() { return 60; } + private: typedef boost::intrusive::list Ports; diff --git a/src/server/Driver.hpp b/src/server/Driver.hpp index dc4eee1d..5a8fbed0 100644 --- a/src/server/Driver.hpp +++ b/src/server/Driver.hpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard + Copyright 2007-2016 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -96,6 +96,9 @@ public: /** Append time events for this cycle to `buffer`. */ virtual void append_time_events(RunContext& context, Buffer& buffer) = 0; + + /** Return the real-time priority of the audio thread, or -1. */ + virtual int real_time_priority() = 0; }; } // namespace Server diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp index 097b38fc..4d3332ef 100644 --- a/src/server/Engine.cpp +++ b/src/server/Engine.cpp @@ -83,7 +83,6 @@ Engine::Engine(Ingen::World* world) , _worker(new Worker(world->log(), event_queue_size())) , _sync_worker(new Worker(world->log(), event_queue_size(), true)) , _listener(NULL) - , _run_context(*this) , _rand_engine(0) , _uniform_dist(0.0f, 1.0f) , _quit_flag(false) @@ -96,6 +95,10 @@ Engine::Engine(Ingen::World* world) _control_bindings = new ControlBindings(*this); + for (int i = 0; i < world->conf().option("threads").get(); ++i) { + _run_contexts.push_back(new RunContext(*this, i, i > 0)); + } + _world->lv2_features().add_feature(_worker->schedule_feature()); _world->lv2_features().add_feature(_options); _world->lv2_features().add_feature( @@ -134,11 +137,12 @@ Engine::~Engine() // Process all pending events const FrameTime end = std::numeric_limits::max(); - _run_context.locate(_run_context.end(), end - _run_context.end()); + RunContext& ctx = run_context(); + locate(ctx.end(), end - ctx.end()); _post_processor->set_end_time(end); _post_processor->process(); while (!_pre_processor->empty()) { - _pre_processor->process(_run_context, *_post_processor, 1); + _pre_processor->process(ctx, *_post_processor, 1); _post_processor->process(); } @@ -181,6 +185,64 @@ Engine::listen() #endif } +void +Engine::locate(FrameTime s, SampleCount nframes) +{ + for (RunContext* ctx : _run_contexts) { + ctx->locate(s, nframes); + } +} + +void +Engine::emit_notifications(FrameTime end) +{ + for (RunContext* ctx : _run_contexts) { + ctx->emit_notifications(end); + } +} + +bool +Engine::pending_notifications() +{ + for (const RunContext* ctx : _run_contexts) { + if (ctx->pending_notifications()) { + return true; + } + } + return false; +} + +bool +Engine::wait_for_tasks() +{ + std::unique_lock lock(_tasks_mutex); + _tasks_available.wait(lock); + return !_quit_flag; +} + +void +Engine::signal_tasks() +{ + _tasks_available.notify_all(); +} + +Task* +Engine::steal_task(unsigned start_thread) +{ + for (unsigned i = 0; i < _run_contexts.size(); ++i) { + const unsigned id = (start_thread + i) % _run_contexts.size(); + RunContext* const ctx = _run_contexts[id]; + Task* par = ctx->task(); + if (par) { + Task* t = par->steal(*ctx); + if (t) { + return t; + } + } + } + return NULL; +} + SPtr Engine::store() const { @@ -211,6 +273,9 @@ void Engine::set_driver(SPtr driver) { _driver = driver; + for (RunContext* ctx : _run_contexts) { + ctx->set_priority(driver->real_time_priority()); + } } SampleCount @@ -221,7 +286,7 @@ Engine::event_time() } const SampleCount start = _direct_driver - ? _run_context.start() + ? run_context().start() : _driver->frame_time(); /* Exactly one cycle latency (some could run ASAP if we get lucky, but not @@ -268,7 +333,7 @@ Engine::activate() *this, SPtr(), -1, 0, Raul::Path("/"), graph_properties); // Execute in "fake" process context (we are single threaded) - RunContext context(*this); + RunContext context(run_context()); ev.pre_process(); ev.execute(context); ev.post_process(); @@ -301,13 +366,13 @@ Engine::deactivate() unsigned Engine::run(uint32_t sample_count) { - _run_context.locate(_run_context.end(), sample_count); + RunContext& ctx = run_context(); // Apply control bindings to input control_bindings()->pre_process( - _run_context, _root_graph->port_impl(0)->buffer(0).get()); + ctx, _root_graph->port_impl(0)->buffer(0).get()); - post_processor()->set_end_time(_run_context.end()); + post_processor()->set_end_time(ctx.end()); // Process events that came in during the last cycle // (Aiming for jitter-free 1 block event latency, ideally) @@ -315,11 +380,11 @@ Engine::run(uint32_t sample_count) // Run root graph if (_root_graph) { - _root_graph->process(_run_context); + _root_graph->process(ctx); // Emit control binding feedback control_bindings()->post_process( - _run_context, _root_graph->port_impl(1)->buffer(0).get()); + ctx, _root_graph->port_impl(1)->buffer(0).get()); } return n_processed_events; @@ -340,9 +405,9 @@ Engine::enqueue_event(Event* ev, Event::Mode mode) unsigned Engine::process_events() { - const size_t MAX_EVENTS_PER_CYCLE = _run_context.nframes() / 8; + const size_t MAX_EVENTS_PER_CYCLE = run_context().nframes() / 8; return _pre_processor->process( - _run_context, *_post_processor, MAX_EVENTS_PER_CYCLE); + run_context(), *_post_processor, MAX_EVENTS_PER_CYCLE); } void diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp index 16699293..2d5e053e 100644 --- a/src/server/Engine.hpp +++ b/src/server/Engine.hpp @@ -17,9 +17,10 @@ #ifndef INGEN_ENGINE_ENGINE_HPP #define INGEN_ENGINE_ENGINE_HPP -#include - #include +#include +#include +#include #include "ingen/EngineBase.hpp" #include "ingen/Interface.hpp" @@ -115,7 +116,14 @@ public: Worker* worker() const { return _worker; } Worker* sync_worker() const { return _sync_worker; } - RunContext& run_context() { return _run_context; } + RunContext& run_context() { return *_run_contexts[0]; } + + void locate(FrameTime s, SampleCount nframes); + void emit_notifications(FrameTime end); + bool pending_notifications(); + bool wait_for_tasks(); + void signal_tasks(); + Task* steal_task(unsigned start_thread); SPtr store() const; @@ -144,11 +152,14 @@ private: Worker* _sync_worker; SocketListener* _listener; - RunContext _run_context; + std::vector _run_contexts; std::mt19937 _rand_engine; std::uniform_real_distribution _uniform_dist; + std::condition_variable _tasks_available; + std::mutex _tasks_mutex; + bool _quit_flag; bool _direct_driver; bool _atomic_bundles; diff --git a/src/server/JackDriver.cpp b/src/server/JackDriver.cpp index db567866..7f0e7ae0 100644 --- a/src/server/JackDriver.cpp +++ b/src/server/JackDriver.cpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard + Copyright 2007-2016 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -479,7 +479,7 @@ JackDriver::_process_cb(jack_nframes_t nframes) _transport_state = jack_transport_query(_client, &_position); - _engine.run_context().locate(start_of_current_cycle, nframes); + _engine.locate(start_of_current_cycle, nframes); // Read input for (auto& p : _ports) { diff --git a/src/server/JackDriver.hpp b/src/server/JackDriver.hpp index ef7bbd78..2d50d892 100644 --- a/src/server/JackDriver.hpp +++ b/src/server/JackDriver.hpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard + Copyright 2007-2016 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -23,6 +23,7 @@ #include #include +#include #include #ifdef INGEN_JACK_SESSION #include @@ -87,6 +88,8 @@ public: void append_time_events(RunContext& context, Buffer& buffer); + int real_time_priority() { return jack_client_real_time_priority(_client); } + jack_client_t* jack_client() const { return _client; } SampleCount block_length() const { return _block_length; } size_t seq_size() const { return _seq_size; } diff --git a/src/server/PostProcessor.cpp b/src/server/PostProcessor.cpp index d6016105..65323f7f 100644 --- a/src/server/PostProcessor.cpp +++ b/src/server/PostProcessor.cpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard + Copyright 2007-2016 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -70,7 +70,7 @@ PostProcessor::append(RunContext& context, Event* first, Event* last) bool PostProcessor::pending() const { - return _head.load() || _engine.run_context().pending_notifications(); + return _head.load() || _engine.pending_notifications(); } void @@ -85,7 +85,7 @@ PostProcessor::process() Event* next = ev->next(); if (!next || next->time() >= end_time) { // Process audio thread notifications until end - _engine.run_context().emit_notifications(end_time); + _engine.emit_notifications(end_time); return; } @@ -95,7 +95,7 @@ PostProcessor::process() ev = next; // Process audio thread notifications up until this event's time - _engine.run_context().emit_notifications(ev->time()); + _engine.emit_notifications(ev->time()); // Post-process event ev->post_process(); @@ -109,7 +109,7 @@ PostProcessor::process() _head = ev; // Process remaining audio thread notifications until end - _engine.run_context().emit_notifications(end_time); + _engine.emit_notifications(end_time); } } // namespace Server diff --git a/src/server/RunContext.cpp b/src/server/RunContext.cpp index ee1eaf04..e107d247 100644 --- a/src/server/RunContext.cpp +++ b/src/server/RunContext.cpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard + Copyright 2007-2016 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -20,9 +20,11 @@ #include "Broadcaster.hpp" #include "BufferFactory.hpp" +#include "Driver.hpp" #include "Engine.hpp" #include "PortImpl.hpp" #include "RunContext.hpp" +#include "Task.hpp" namespace Ingen { namespace Server { @@ -44,10 +46,13 @@ struct Notification LV2_URID type; }; -RunContext::RunContext(Engine& engine) +RunContext::RunContext(Engine& engine, unsigned id, bool threaded) : _engine(engine) , _event_sink( new Raul::RingBuffer(engine.event_queue_size() * sizeof(Notification))) + , _task(nullptr) + , _thread(threaded ? new std::thread(&RunContext::run, this) : nullptr) + , _id(id) , _start(0) , _end(0) , _offset(0) @@ -59,6 +64,9 @@ RunContext::RunContext(Engine& engine) RunContext::RunContext(const RunContext& copy) : _engine(copy._engine) , _event_sink(copy._event_sink) + , _task(nullptr) + , _thread(nullptr) + , _id(copy._id) , _start(copy._start) , _end(copy._end) , _offset(copy._offset) @@ -138,5 +146,31 @@ RunContext::emit_notifications(FrameTime end) } } +void +RunContext::set_priority(int priority) +{ + if (_thread) { + pthread_t pthread = _thread->native_handle(); + const int policy = (priority > 0) ? SCHED_FIFO : SCHED_OTHER; + sched_param sp; + sp.sched_priority = (priority > 0) ? priority : 0; + if (pthread_setschedparam(pthread, policy, &sp)) { + _engine.log().error( + fmt("Failed to set real-time priority of run thread (%s)\n") + % strerror(errno)); + } + } +} + +void +RunContext::run() +{ + while (_engine.wait_for_tasks()) { + for (Task* t; (t = _engine.steal_task(0));) { + t->run(*this); + } + } +} + } // namespace Server } // namespace Ingen diff --git a/src/server/RunContext.hpp b/src/server/RunContext.hpp index 803d7829..803e0c3b 100644 --- a/src/server/RunContext.hpp +++ b/src/server/RunContext.hpp @@ -17,6 +17,8 @@ #ifndef INGEN_ENGINE_RUNCONTEXT_HPP #define INGEN_ENGINE_RUNCONTEXT_HPP +#include + #include "ingen/Atom.hpp" #include "ingen/World.hpp" #include "raul/RingBuffer.hpp" @@ -28,6 +30,7 @@ namespace Server { class Engine; class PortImpl; +class Task; /** Graph execution context. * @@ -44,8 +47,19 @@ class PortImpl; class RunContext { public: - RunContext(Engine& engine); - RunContext(const RunContext& copy); + /** Create a new run context. + * + * @param threaded If true, then this context is a worker which will launch + * a thread and execute tasks as they become available. + */ + RunContext(Engine& engine, unsigned id, bool threaded); + + /** Create a sub-context of `parent`. + * + * This is used to subdivide process cycles, the sub-context is + * lightweight and only serves to pass different time attributes. + */ + RunContext(const RunContext& parent); virtual ~RunContext(); @@ -79,18 +93,20 @@ public: _nframes = nframes; } - inline void locate(const RunContext& other) { - _start = other._start; - _end = other._end; - _nframes = other._nframes; - } - inline void slice(SampleCount offset, SampleCount nframes) { _offset = offset; _nframes = nframes; } + inline void set_task(Task* task) { + _task = task; + } + + void set_priority(int priority); + inline Engine& engine() const { return _engine; } + inline Task* task() const { return _task; } + inline unsigned id() const { return _id; } inline FrameTime start() const { return _start; } inline FrameTime time() const { return _start + _offset; } inline FrameTime end() const { return _end; } @@ -101,9 +117,13 @@ public: protected: const RunContext& operator=(const RunContext& copy) = delete; - Engine& _engine; ///< Engine we're running in + void run(); - Raul::RingBuffer* _event_sink; ///< Port updates from process context + Engine& _engine; ///< Engine we're running in + Raul::RingBuffer* _event_sink; ///< Port updates from process context + Task* _task; ///< Currently executing task + std::thread* _thread; ///< Thread (NULL for main run context) + unsigned _id; ///< Context ID FrameTime _start; ///< Start frame of this cycle, timeline relative FrameTime _end; ///< End frame of this cycle, timeline relative diff --git a/src/server/Task.cpp b/src/server/Task.cpp new file mode 100644 index 00000000..e15a4d4a --- /dev/null +++ b/src/server/Task.cpp @@ -0,0 +1,155 @@ +/* + This file is part of Ingen. + Copyright 2015-2016 David Robillard + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see . +*/ + +#include "BlockImpl.hpp" +#include "Engine.hpp" +#include "Task.hpp" + +namespace Ingen { +namespace Server { + +void +Task::run(RunContext& context) +{ + switch (_mode) { + case Mode::SINGLE: + // fprintf(stderr, "%u run %s\n", context.id(), _block->path().c_str()); + _block->process(context); + break; + case Mode::SEQUENTIAL: + for (Task& task : *this) { + task.run(context); + } + break; + case Mode::PARALLEL: + // Initialize (not) done state of sub-tasks + for (Task& task : *this) { + task.set_done(false); + } + + // Grab the first sub-task + _next = 0; + _done_end = 0; + Task* t = steal(context); + + // Allow other threads to steal sub-tasks + context.set_task(this); + context.engine().signal_tasks(); + + // Run available tasks until this task is finished + for (; t; t = get_task(context)) { + t->run(context); + } + context.set_task(nullptr); + break; + } + + set_done(true); +} + +Task* +Task::steal(RunContext& context) +{ + if (_mode == Mode::PARALLEL) { + const unsigned i = _next++; + if (i < size()) { + return &(*this)[i]; + } + } + + return NULL; +} + +Task* +Task::get_task(RunContext& context) +{ + // Attempt to "steal" a task from ourselves + Task* t = steal(context); + if (t) { + return t; + } + + while (true) { + // Push done end index as forward as possible + for (; (*this)[_done_end].done(); ++_done_end) {} + + if (_done_end >= size()) { + return NULL; // All child tasks are finished + } + + // All child tasks claimed, but some are unfinished, steal a task + t = context.engine().steal_task(context.id() + 1); + if (t) { + return t; + } + + /* All child tasks are claimed, and we failed to steal any tasks. Spin + to prevent blocking, though it would probably be wiser to wait for a + signal in non-main threads, and maybe even in the main thread + depending on your real-time safe philosophy... more experimentation + here is needed. */ + } +} + +void +Task::simplify() +{ + if (_mode != Mode::SINGLE) { + for (std::vector::iterator t = begin(); t != end();) { + t->simplify(); + if (t->mode() != Mode::SINGLE && t->empty()) { + // Empty task, erase + t = erase(t); + } else if (t->mode() == _mode) { + // Subtask with the same type, fold child into parent + const Task child(*t); + t = erase(t); + t = insert(t, child.begin(), child.end()); + } else { + ++t; + } + } + + if (size() == 1) { + const Task t(front()); + *this = t; + } + } +} + +void +Task::dump(std::function sink, unsigned indent, bool first) const +{ + if (!first) { + sink("\n"); + for (unsigned i = 0; i < indent; ++i) { + sink(" "); + } + } + + if (_mode == Mode::SINGLE) { + sink(_block->path()); + } else { + sink(((_mode == Mode::SEQUENTIAL) ? "(seq " : "(par ")); + for (size_t i = 0; i < size(); ++i) { + (*this)[i].dump(sink, indent + 5, i == 0); + } + sink(")"); + } +} + +} // namespace Server +} // namespace Ingen diff --git a/src/server/Task.hpp b/src/server/Task.hpp new file mode 100644 index 00000000..2c1a0cc2 --- /dev/null +++ b/src/server/Task.hpp @@ -0,0 +1,98 @@ +/* + This file is part of Ingen. + Copyright 2007-2016 David Robillard + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see . +*/ + +#ifndef INGEN_ENGINE_TASK_HPP +#define INGEN_ENGINE_TASK_HPP + +#include +#include +#include + +namespace Ingen { +namespace Server { + +class BlockImpl; +class RunContext; + +class Task : public std::vector { +public: + enum class Mode { + SINGLE, ///< Single block to run + SEQUENTIAL, ///< Elements must be run sequentially in order + PARALLEL ///< Elements may be run in any order in parallel + }; + + Task(Mode mode, BlockImpl* block=NULL) + : _block(block) + , _mode(mode) + , _done_end(0) + , _next(0) + , _done(false) + { + assert(!(mode == Mode::SINGLE && !block)); + } + + Task& operator=(const Task& copy) { + *static_cast*>(this) = copy; + _block = copy._block; + _mode = copy._mode; + _done_end = copy._done_end; + _next = copy._next.load(); + _done = copy._done.load(); + return *this; + } + + Task(const Task& copy) + : std::vector(copy) + , _block(copy._block) + , _mode(copy._mode) + , _done_end(copy._done_end) + , _next(copy._next.load()) + , _done(copy._done.load()) + {} + + /** Run task in the given context. */ + void run(RunContext& context); + + /** Pretty print task to the given stream (recursively). */ + void dump(std::function sink, unsigned indent, bool first) const; + + /** Simplify task expression. */ + void simplify(); + + /** Steal a child task from this task (succeeds for PARALLEL only). */ + Task* steal(RunContext& context); + + Mode mode() const { return _mode; } + BlockImpl* block() const { return _block; } + bool done() const { return _done; } + + void set_done(bool done) { _done = done; } + +private: + Task* get_task(RunContext& context); + + BlockImpl* _block; ///< Used for SINGLE only + Mode _mode; ///< Execution mode + unsigned _done_end; ///< Index of rightmost done sub-task + std::atomic _next; ///< Index of next sub-task + std::atomic _done; ///< Completion phase +}; + +} // namespace Server +} // namespace Ingen + +#endif // INGEN_ENGINE_TASK_HPP diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp index 0c32731c..a21cb4c9 100644 --- a/src/server/ingen_lv2.cpp +++ b/src/server/ingen_lv2.cpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard + Copyright 2007-2016 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -161,7 +161,7 @@ public: } void run(uint32_t nframes) { - _engine.run_context().locate(_frame_time, nframes); + _engine.locate(_frame_time, nframes); // Notify buffer is a Chunk with size set to the available space _notify_capacity = ((LV2_Atom_Sequence*)_ports[1]->buffer())->atom.size; @@ -256,6 +256,8 @@ public: } } + virtual int real_time_priority() { return 60; } + /** Called in run thread for events received at control input port. */ bool enqueue_message(const LV2_Atom* atom) { if (_from_ui.write(lv2_atom_total_size(atom), atom) == 0) { @@ -558,7 +560,7 @@ ingen_instantiate(const LV2_Descriptor* descriptor, engine->activate(); Server::ThreadManager::single_threaded = true; - engine->run_context().locate(0, block_length); + engine->locate(0, block_length); engine->post_processor()->set_end_time(block_length); engine->process_events(); diff --git a/src/server/wscript b/src/server/wscript index fad48330..68c1c5f1 100644 --- a/src/server/wscript +++ b/src/server/wscript @@ -28,6 +28,7 @@ def build(bld): PreProcessor.cpp RunContext.cpp SocketListener.cpp + Task.cpp UndoStack.cpp Worker.cpp events/Connect.cpp -- cgit v1.2.1