diff options
Diffstat (limited to 'src/server')
59 files changed, 1256 insertions, 496 deletions
diff --git a/src/server/BlockFactory.cpp b/src/server/BlockFactory.cpp index b715ecb7..de7c2de2 100644 --- a/src/server/BlockFactory.cpp +++ b/src/server/BlockFactory.cpp @@ -21,8 +21,8 @@ #include "ingen/LV2Features.hpp" #include "ingen/Log.hpp" #include "ingen/World.hpp" +#include "internals/BlockDelay.hpp" #include "internals/Controller.hpp" -#include "internals/Delay.hpp" #include "internals/Note.hpp" #include "internals/Time.hpp" #include "internals/Trigger.hpp" @@ -111,12 +111,12 @@ void BlockFactory::load_internal_plugins() { Ingen::URIs& uris = _world->uris(); + InternalPlugin* block_delay_plug = BlockDelayNode::internal_plugin(uris); + _plugins.insert(make_pair(block_delay_plug->uri(), block_delay_plug)); + InternalPlugin* controller_plug = ControllerNode::internal_plugin(uris); _plugins.insert(make_pair(controller_plug->uri(), controller_plug)); - InternalPlugin* delay_plug = DelayNode::internal_plugin(uris); - _plugins.insert(make_pair(delay_plug->uri(), delay_plug)); - InternalPlugin* note_plug = NoteNode::internal_plugin(uris); _plugins.insert(make_pair(note_plug->uri(), note_plug)); diff --git a/src/server/BlockImpl.cpp b/src/server/BlockImpl.cpp index 02611137..d3a5b02d 100644 --- a/src/server/BlockImpl.cpp +++ b/src/server/BlockImpl.cpp @@ -42,10 +42,10 @@ BlockImpl::BlockImpl(PluginImpl* plugin, , _plugin(plugin) , _ports(NULL) , _polyphony((polyphonic && parent) ? parent->internal_poly() : 1) + , _mark(Mark::UNVISITED) , _polyphonic(polyphonic) , _activated(false) , _enabled(true) - , _traversed(false) { assert(_plugin); assert(_polyphony > 0); diff --git a/src/server/BlockImpl.hpp b/src/server/BlockImpl.hpp index 47eaa6eb..2d7211ab 100644 --- a/src/server/BlockImpl.hpp +++ b/src/server/BlockImpl.hpp @@ -178,9 +178,10 @@ public: uint32_t num_ports() const { return _ports ? _ports->size() : 0; } virtual uint32_t polyphony() const { return _polyphony; } - /** Used by the process order finding algorithm (ie during connections) */ - bool traversed() const { return _traversed; } - void traversed(bool b) { _traversed = b; } + /** Mark used during graph compilation */ + enum class Mark { UNVISITED, VISITING, VISITED }; + Mark get_mark() const { return _mark; } + void set_mark(Mark m) { _mark = m; } protected: PortImpl* nth_port_by_type(uint32_t n, bool input, PortType type); @@ -190,10 +191,10 @@ protected: uint32_t _polyphony; std::set<BlockImpl*> _providers; ///< Blocks connected to this one's input ports std::set<BlockImpl*> _dependants; ///< Blocks this one's output ports are connected to + Mark _mark; ///< Mark for graph compilation algorithm bool _polyphonic; bool _activated; bool _enabled; - bool _traversed; ///< Flag for process order algorithm }; } // namespace Server diff --git a/src/server/BufferFactory.cpp b/src/server/BufferFactory.cpp index aeaa0d44..749e83d0 100644 --- a/src/server/BufferFactory.cpp +++ b/src/server/BufferFactory.cpp @@ -75,6 +75,12 @@ BufferFactory::audio_buffer_size(SampleCount nframes) } uint32_t +BufferFactory::audio_buffer_size() const +{ + return _engine.driver()->block_length() * sizeof(Sample); +} + +uint32_t BufferFactory::default_size(LV2_URID type) const { if (type == _uris.atom_Float) { diff --git a/src/server/BufferFactory.hpp b/src/server/BufferFactory.hpp index 2fae3244..ccb01899 100644 --- a/src/server/BufferFactory.hpp +++ b/src/server/BufferFactory.hpp @@ -46,7 +46,9 @@ public: ~BufferFactory(); static uint32_t audio_buffer_size(SampleCount nframes); - uint32_t default_size(LV2_URID type) const; + + uint32_t audio_buffer_size() const; + uint32_t default_size(LV2_URID type) const; BufferRef get_buffer(LV2_URID type, LV2_URID value_type, diff --git a/src/server/CompiledGraph.cpp b/src/server/CompiledGraph.cpp new file mode 100644 index 00000000..fdf59cd4 --- /dev/null +++ b/src/server/CompiledGraph.cpp @@ -0,0 +1,237 @@ +/* + This file is part of Ingen. + Copyright 2015-2016 David Robillard <http://drobilla.net/> + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <algorithm> + +#include "ingen/ColorContext.hpp" +#include "ingen/Configuration.hpp" +#include "ingen/Log.hpp" + +#include "CompiledGraph.hpp" +#include "Engine.hpp" +#include "GraphImpl.hpp" +#include "ThreadManager.hpp" + +namespace Ingen { +namespace Server { + +/** Graph contains ambiguous feedback with no delay nodes. */ +class FeedbackException : public std::exception { +public: + FeedbackException(const BlockImpl* node, const BlockImpl* root=NULL) + : node(node) + , root(root) + {} + + const BlockImpl* node; + const BlockImpl* root; +}; + +CompiledGraph::CompiledGraph(GraphImpl* graph) + : _log(graph->engine().log()) + , _path(graph->path()) + , _master(Task::Mode::SEQUENTIAL) +{ + compile_graph(graph); +} + +CompiledGraph* +CompiledGraph::compile(GraphImpl* graph) +{ + try { + return new CompiledGraph(graph); + } catch (FeedbackException e) { + Log& log = graph->engine().log(); + if (e.node && e.root) { + log.error(fmt("Feedback compiling %1% from %2%\n") + % e.node->path() % e.root->path()); + } else { + log.error(fmt("Feedback compiling %1%\n") + % e.node->path()); + } + return NULL; + } +} + +void +CompiledGraph::compile_set(const std::set<BlockImpl*>& blocks, + Task& task, + std::set<BlockImpl*>& k) +{ + // Keep compiling working set until all nodes are visited + for (BlockImpl* block : blocks) { + // Each block is the start of a new sequential task + Task seq(Task::Mode::SEQUENTIAL); + compile_block(block, seq, k); + task.push_back(seq); + } +} + +void +CompiledGraph::compile_graph(GraphImpl* graph) +{ + ThreadManager::assert_thread(THREAD_PRE_PROCESS); + + // Start with sink nodes (no outputs, or connected only to graph outputs) + std::set<BlockImpl*> blocks; + for (auto& b : graph->blocks()) { + // Mark all blocks as unvisited initially + b.set_mark(BlockImpl::Mark::UNVISITED); + + if (b.providers().empty()) { + // Block has no dependencies, add to initial working set + blocks.insert(&b); + } + } + + // Compile initial working set into master task + Task start(Task::Mode::PARALLEL); + std::set<BlockImpl*> next; + compile_set(blocks, start, next); + _master.push_back(start); + + // Keep compiling working set until all connected nodes are visited + while (!next.empty()) { + blocks.clear(); + // The working set is a parallel task... + Task par(Task::Mode::PARALLEL); + for (BlockImpl* block : next) { + // ... where each block is the start of a new sequential task + Task seq(Task::Mode::SEQUENTIAL); + compile_block(block, seq, blocks); + par.push_back(seq); + } + _master.push_back(par); + next = blocks; + } + + // Compile any nodes that weren't reached (disconnected cycles) + for (auto& b : graph->blocks()) { + if (b.get_mark() == BlockImpl::Mark::UNVISITED) { + compile_block(&b, _master, next); + } + } + + _master.simplify(); + + if (graph->engine().world()->conf().option("trace").get<int32_t>()) { + dump([this](const std::string& msg) { + ColorContext ctx(stderr, ColorContext::Color::YELLOW); + fwrite(msg.c_str(), 1, msg.size(), stderr); + }); + } +} + +/** Throw a FeedbackException iff `dependant` has `root` as a dependency. */ +static void +check_feedback(const BlockImpl* root, BlockImpl* dependant) +{ + if (dependant == root) { + throw FeedbackException(root); + } + + for (auto& d : dependant->dependants()) { + const BlockImpl::Mark mark = d->get_mark(); + switch (mark) { + case BlockImpl::Mark::UNVISITED: + d->set_mark(BlockImpl::Mark::VISITING); + check_feedback(root, d); + break; + case BlockImpl::Mark::VISITING: + throw FeedbackException(d, root); + case BlockImpl::Mark::VISITED: + break; + } + d->set_mark(mark); + } +} + +void +CompiledGraph::compile_dependant(const BlockImpl* root, + BlockImpl* block, + Task& task, + std::set<BlockImpl*>& k) +{ + if (block->providers().size() > 1) { + /* Dependant has other providers, so this is the start of a sequential task. + Add dependant to future working set and stop traversal. */ + check_feedback(root, block); + k.insert(block); + } else { + // Dependant has only this provider, add here + if (task.mode() == Task::Mode::PARALLEL) { + // Inside a parallel task, compile into a new sequential child + Task seq(Task::Mode::SEQUENTIAL); + compile_block(block, seq, k); + task.push_back(seq); + } else { + // Append to given sequential task + compile_block(block, task, k); + } + } +} + +void +CompiledGraph::compile_block(BlockImpl* n, Task& task, std::set<BlockImpl*>& k) +{ + switch (n->get_mark()) { + case BlockImpl::Mark::UNVISITED: + n->set_mark(BlockImpl::Mark::VISITING); + + // Execute this task before the dependants to follow + task.push_back(Task(Task::Mode::SINGLE, n)); + + if (n->dependants().size() < 2) { + // Single dependant, append to this sequential task + for (auto& d : n->dependants()) { + compile_dependant(n, d, task, k); + } + } else { + // Multiple dependants, create a new parallel task + Task par(Task::Mode::PARALLEL); + for (auto& d : n->dependants()) { + compile_dependant(n, d, par, k); + } + task.push_back(par); + } + n->set_mark(BlockImpl::Mark::VISITED); + break; + + case BlockImpl::Mark::VISITING: + throw FeedbackException(n); + + case BlockImpl::Mark::VISITED: + break; + } +} + +void +CompiledGraph::run(RunContext& context) +{ + _master.run(context); +} + +void +CompiledGraph::dump(std::function<void (const std::string&)> sink) const +{ + sink("(compiled-graph "); + sink(_path); + _master.dump(sink, 2, false); + sink(")\n"); +} + +} // namespace Server +} // namespace Ingen diff --git a/src/server/CompiledGraph.hpp b/src/server/CompiledGraph.hpp index 9f4071a5..7dc40865 100644 --- a/src/server/CompiledGraph.hpp +++ b/src/server/CompiledGraph.hpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard <http://drobilla.net/> + Copyright 2007-2016 David Robillard <http://drobilla.net/> Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -17,41 +17,58 @@ #ifndef INGEN_ENGINE_COMPILEDGRAPH_HPP #define INGEN_ENGINE_COMPILEDGRAPH_HPP +#include <functional> +#include <set> #include <vector> -#include <list> #include "raul/Maid.hpp" #include "raul/Noncopyable.hpp" +#include "raul/Path.hpp" + +#include "Task.hpp" namespace Ingen { + +class Log; + namespace Server { class BlockImpl; +class GraphImpl; +class RunContext; -/** All information required about a block to execute it in an audio thread. +/** A graph ``compiled'' into a quickly executable form. + * + * This is a flat sequence of nodes ordered such that the process thread can + * execute the nodes in order and have nodes always executed before any of + * their dependencies. */ -class CompiledBlock { +class CompiledGraph : public Raul::Maid::Disposable + , public Raul::Noncopyable +{ public: - CompiledBlock(BlockImpl* b) : _block(b) {} + static CompiledGraph* compile(GraphImpl* graph); + + void run(RunContext& context); - BlockImpl* block() const { return _block; } + void dump(std::function<void (const std::string&)> sink) const; private: - BlockImpl* _block; -}; + CompiledGraph(GraphImpl* graph); -/** A graph ``compiled'' into a flat structure with the correct order so - * the audio thread(s) can execute it without threading problems (since - * the preprocessor thread modifies the graph). - * - * The blocks contained here are sorted in the order they must be executed. - * The parallel processing algorithm guarantees no block will be executed - * before its providers, using this order as well as semaphores. - */ -class CompiledGraph : public std::vector<CompiledBlock> - , public Raul::Maid::Disposable - , public Raul::Noncopyable -{ + typedef std::set<BlockImpl*> BlockSet; + + void compile_graph(GraphImpl* graph); + void compile_set(const BlockSet& blocks, Task& task, BlockSet& k); + void compile_block(BlockImpl* block, Task& task, BlockSet& k); + void compile_dependant(const BlockImpl* root, + BlockImpl* block, + Task& task, + BlockSet& k); + + Log& _log; + const Raul::Path _path; + Task _master; }; } // namespace Server diff --git a/src/server/DirectDriver.hpp b/src/server/DirectDriver.hpp index a9800947..339d9987 100644 --- a/src/server/DirectDriver.hpp +++ b/src/server/DirectDriver.hpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard <http://drobilla.net/> + Copyright 2007-2016 David Robillard <http://drobilla.net/> Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -84,6 +84,8 @@ public: virtual void append_time_events(RunContext& context, Buffer& buffer) {} + virtual int real_time_priority() { return 60; } + private: typedef boost::intrusive::list<EnginePort> Ports; diff --git a/src/server/Driver.hpp b/src/server/Driver.hpp index dc4eee1d..5a8fbed0 100644 --- a/src/server/Driver.hpp +++ b/src/server/Driver.hpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard <http://drobilla.net/> + Copyright 2007-2016 David Robillard <http://drobilla.net/> Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -96,6 +96,9 @@ public: /** Append time events for this cycle to `buffer`. */ virtual void append_time_events(RunContext& context, Buffer& buffer) = 0; + + /** Return the real-time priority of the audio thread, or -1. */ + virtual int real_time_priority() = 0; }; } // namespace Server diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp index 259dbffd..0b468ce1 100644 --- a/src/server/Engine.cpp +++ b/src/server/Engine.cpp @@ -47,6 +47,7 @@ #include "GraphImpl.hpp" #include "LV2Options.hpp" #include "PostProcessor.hpp" +#include "PreProcessContext.hpp" #include "PreProcessor.hpp" #include "RunContext.hpp" #include "ThreadManager.hpp" @@ -83,10 +84,11 @@ Engine::Engine(Ingen::World* world) , _worker(new Worker(world->log(), event_queue_size())) , _sync_worker(new Worker(world->log(), event_queue_size(), true)) , _listener(NULL) - , _run_context(*this) + , _cycle_start_time(0) , _rand_engine(0) , _uniform_dist(0.0f, 1.0f) , _quit_flag(false) + , _reset_load_flag(false) , _direct_driver(true) , _atomic_bundles(world->conf().option("atomic-bundles").get<int32_t>()) { @@ -96,6 +98,10 @@ Engine::Engine(Ingen::World* world) _control_bindings = new ControlBindings(*this); + for (int i = 0; i < world->conf().option("threads").get<int32_t>(); ++i) { + _run_contexts.push_back(new RunContext(*this, i, i > 0)); + } + _world->lv2_features().add_feature(_worker->schedule_feature()); _world->lv2_features().add_feature(_options); _world->lv2_features().add_feature( @@ -134,11 +140,12 @@ Engine::~Engine() // Process all pending events const FrameTime end = std::numeric_limits<FrameTime>::max(); - _run_context.locate(_run_context.end(), end - _run_context.end()); + RunContext& ctx = run_context(); + locate(ctx.end(), end - ctx.end()); _post_processor->set_end_time(end); _post_processor->process(); while (!_pre_processor->empty()) { - _pre_processor->process(_run_context, *_post_processor, 1); + _pre_processor->process(ctx, *_post_processor, 1); _post_processor->process(); } @@ -181,6 +188,64 @@ Engine::listen() #endif } +void +Engine::locate(FrameTime s, SampleCount nframes) +{ + for (RunContext* ctx : _run_contexts) { + ctx->locate(s, nframes); + } +} + +void +Engine::emit_notifications(FrameTime end) +{ + for (RunContext* ctx : _run_contexts) { + ctx->emit_notifications(end); + } +} + +bool +Engine::pending_notifications() +{ + for (const RunContext* ctx : _run_contexts) { + if (ctx->pending_notifications()) { + return true; + } + } + return false; +} + +bool +Engine::wait_for_tasks() +{ + std::unique_lock<std::mutex> lock(_tasks_mutex); + _tasks_available.wait(lock); + return !_quit_flag; +} + +void +Engine::signal_tasks() +{ + _tasks_available.notify_all(); +} + +Task* +Engine::steal_task(unsigned start_thread) +{ + for (unsigned i = 0; i < _run_contexts.size(); ++i) { + const unsigned id = (start_thread + i) % _run_contexts.size(); + RunContext* const ctx = _run_contexts[id]; + Task* par = ctx->task(); + if (par) { + Task* t = par->steal(*ctx); + if (t) { + return t; + } + } + } + return NULL; +} + SPtr<Store> Engine::store() const { @@ -202,8 +267,29 @@ Engine::quit() bool Engine::main_iteration() { + const Ingen::URIs& uris = world()->uris(); + _post_processor->process(); _maid->cleanup(); + + if (_event_load.changed) { + _broadcaster->set_property(Raul::URI("ingen:/engine"), + uris.ingen_maxEventLoad, + uris.forge.make(_event_load.max / 100.0f)); + _event_load.changed = false; + } + + if (_run_load.changed) { + _broadcaster->put(Raul::URI("ingen:/engine"), + { { uris.ingen_meanRunLoad, + uris.forge.make(floorf(_run_load.mean) / 100.0f) }, + { uris.ingen_minRunLoad, + uris.forge.make(_run_load.min / 100.0f) }, + { uris.ingen_maxRunLoad, + uris.forge.make(_run_load.max / 100.0f) } }); + _run_load.changed = false; + } + return !_quit_flag; } @@ -211,6 +297,10 @@ void Engine::set_driver(SPtr<Driver> driver) { _driver = driver; + for (RunContext* ctx : _run_contexts) { + ctx->set_priority(driver->real_time_priority()); + ctx->set_rate(driver->sample_rate()); + } } SampleCount @@ -220,14 +310,31 @@ Engine::event_time() return 0; } - const SampleCount start = _direct_driver - ? _run_context.start() + // FIXME: Jitter with direct driver + const SampleCount now = _direct_driver + ? run_context().start() : _driver->frame_time(); - /* Exactly one cycle latency (some could run ASAP if we get lucky, but not - always, and a slight constant latency is far better than jittery lower - (average) latency */ - return start + _driver->block_length(); + return now + _driver->block_length(); +} + +uint64_t +Engine::current_time(const RunContext& context) const +{ + struct timespec time; +#ifdef CLOCK_MONOTONIC_RAW + clock_gettime(CLOCK_MONOTONIC_RAW, &time); +#else + clock_gettime(CLOCK_MONOTONIC, &time); +#endif + + return (uint64_t)time.tv_sec * 1e6 + (uint64_t)time.tv_nsec / 1e3; +} + +void +Engine::reset_load() +{ + _reset_load_flag = true; } void @@ -268,9 +375,10 @@ Engine::activate() *this, SPtr<Interface>(), -1, 0, Raul::Path("/"), graph_properties); // Execute in "fake" process context (we are single threaded) - RunContext context(*this); - ev.pre_process(); - ev.execute(context); + PreProcessContext pctx; + RunContext rctx(run_context()); + ev.pre_process(pctx); + ev.execute(rctx); ev.post_process(); _root_graph = ev.graph(); @@ -301,25 +409,37 @@ Engine::deactivate() unsigned Engine::run(uint32_t sample_count) { - _run_context.locate(_run_context.end(), sample_count); + RunContext& ctx = run_context(); + _cycle_start_time = current_time(ctx); // Apply control bindings to input control_bindings()->pre_process( - _run_context, _root_graph->port_impl(0)->buffer(0).get()); + ctx, _root_graph->port_impl(0)->buffer(0).get()); - post_processor()->set_end_time(_run_context.end()); + post_processor()->set_end_time(ctx.end()); // Process events that came in during the last cycle // (Aiming for jitter-free 1 block event latency, ideally) const unsigned n_processed_events = process_events(); + const uint64_t t_events = current_time(ctx); // Run root graph if (_root_graph) { - _root_graph->process(_run_context); + _root_graph->process(ctx); // Emit control binding feedback control_bindings()->post_process( - _run_context, _root_graph->port_impl(1)->buffer(0).get()); + ctx, _root_graph->port_impl(1)->buffer(0).get()); + } + + // Update load for this cycle + if (ctx.duration() > 0) { + _event_load.update(t_events - _cycle_start_time, ctx.duration()); + _run_load.update(current_time(ctx) - t_events, ctx.duration()); + if (_reset_load_flag) { + _run_load = Load(); + _reset_load_flag = false; + } } return n_processed_events; @@ -340,9 +460,9 @@ Engine::enqueue_event(Event* ev, Event::Mode mode) unsigned Engine::process_events() { - const size_t MAX_EVENTS_PER_CYCLE = _run_context.nframes() / 8; + const size_t MAX_EVENTS_PER_CYCLE = run_context().nframes() / 8; return _pre_processor->process( - _run_context, *_post_processor, MAX_EVENTS_PER_CYCLE); + run_context(), *_post_processor, MAX_EVENTS_PER_CYCLE); } void diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp index 16699293..fc80d64b 100644 --- a/src/server/Engine.hpp +++ b/src/server/Engine.hpp @@ -17,9 +17,10 @@ #ifndef INGEN_ENGINE_ENGINE_HPP #define INGEN_ENGINE_ENGINE_HPP -#include <random> - #include <boost/utility.hpp> +#include <condition_variable> +#include <mutex> +#include <random> #include "ingen/EngineBase.hpp" #include "ingen/Interface.hpp" @@ -88,8 +89,27 @@ public: void set_driver(SPtr<Driver> driver); + /** Return the frame time to execute an event that arrived now. + * + * This aims to return a time one cycle from "now", so that events ideally + * have 1 cycle of latency with no jitter. + */ SampleCount event_time(); + /** Return the time this cycle began processing in microseconds. + * + * This value is comparable to the value returned by current_time(). + */ + inline uint64_t cycle_start_time(const RunContext& context) const { + return _cycle_start_time; + } + + /** Return the current time in microseconds. */ + uint64_t current_time(const RunContext& context) const; + + /** Reset the load statistics (when the expected DSP load changes). */ + void reset_load(); + /** Enqueue an event to be processed (non-realtime threads only). */ void enqueue_event(Event* ev, Event::Mode mode=Event::Mode::NORMAL); @@ -115,16 +135,53 @@ public: Worker* worker() const { return _worker; } Worker* sync_worker() const { return _sync_worker; } - RunContext& run_context() { return _run_context; } + RunContext& run_context() { return *_run_contexts[0]; } + + void locate(FrameTime s, SampleCount nframes); + void emit_notifications(FrameTime end); + bool pending_notifications(); + bool wait_for_tasks(); + void signal_tasks(); + Task* steal_task(unsigned start_thread); SPtr<Store> store() const; size_t event_queue_size() const; - bool atomic_bundles() const { return _atomic_bundles; } + size_t n_threads() const { return _run_contexts.size(); } + bool atomic_bundles() const { return _atomic_bundles; } private: Ingen::World* _world; + struct Load { + void update(uint64_t time, uint64_t available) { + const uint64_t load = time * 100 / available; + if (load < min) { + min = load; + changed = true; + } + if (load > max) { + max = load; + changed = true; + } + if (++n == 1) { + mean = load; + } else { + const float a = mean + ((float)load - mean) / (float)++n; + if (a != mean) { + changed = floorf(a) != floorf(mean); + mean = a; + } + } + } + + uint64_t min = std::numeric_limits<uint64_t>::max(); + uint64_t max = 0; + float mean = 0.0f; + uint64_t n = 0; + bool changed = false; + }; + BlockFactory* _block_factory; Broadcaster* _broadcaster; BufferFactory* _buffer_factory; @@ -144,12 +201,19 @@ private: Worker* _sync_worker; SocketListener* _listener; - RunContext _run_context; + std::vector<RunContext*> _run_contexts; + uint64_t _cycle_start_time; + Load _event_load; + Load _run_load; std::mt19937 _rand_engine; std::uniform_real_distribution<float> _uniform_dist; + std::condition_variable _tasks_available; + std::mutex _tasks_mutex; + bool _quit_flag; + bool _reset_load_flag; bool _direct_driver; bool _atomic_bundles; }; diff --git a/src/server/Event.hpp b/src/server/Event.hpp index 8ed25c0f..203e5d1d 100644 --- a/src/server/Event.hpp +++ b/src/server/Event.hpp @@ -35,6 +35,7 @@ namespace Server { class Engine; class RunContext; +class PreProcessContext; /** An event (command) to perform some action on Ingen. * @@ -60,7 +61,7 @@ public: enum class Execution { NORMAL, BLOCK, UNBLOCK }; /** Pre-process event before execution (non-realtime). */ - virtual bool pre_process() = 0; + virtual bool pre_process(PreProcessContext& ctx) = 0; /** Execute this event in the audio thread (realtime). */ virtual void execute(RunContext& context) = 0; diff --git a/src/server/GraphImpl.cpp b/src/server/GraphImpl.cpp index 86966b7c..c1ee6a2b 100644 --- a/src/server/GraphImpl.cpp +++ b/src/server/GraphImpl.cpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard <http://drobilla.net/> + Copyright 2007-2016 David Robillard <http://drobilla.net/> Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -233,11 +233,8 @@ GraphImpl::process(RunContext& context) void GraphImpl::run(RunContext& context) { - if (_compiled_graph && _compiled_graph->size() > 0) { - // Run all blocks - for (size_t i = 0; i < _compiled_graph->size(); ++i) { - (*_compiled_graph)[i].block()->process(context); - } + if (_compiled_graph) { + _compiled_graph->run(context); } } @@ -250,10 +247,11 @@ GraphImpl::set_buffer_size(RunContext& context, BlockImpl::set_buffer_size(context, bufs, type, size); if (_compiled_graph) { - for (size_t i = 0; i < _compiled_graph->size(); ++i) { - const CompiledBlock& block = (*_compiled_graph)[i]; - block.block()->set_buffer_size(context, bufs, type, size); - } + // FIXME + // for (size_t i = 0; i < _compiled_graph->size(); ++i) { + // const CompiledBlock& block = (*_compiled_graph)[i]; + // block.block()->set_buffer_size(context, bufs, type, size); + // } } } @@ -299,13 +297,15 @@ GraphImpl::has_arc(const PortImpl* tail, const PortImpl* dst_port) const return (i != _arcs.end()); } -void -GraphImpl::set_compiled_graph(CompiledGraph* cg) +CompiledGraph* +GraphImpl::swap_compiled_graph(CompiledGraph* cg) { - if (_compiled_graph && _compiled_graph != cg) { - _engine.maid()->dispose(_compiled_graph); + CompiledGraph* const old = _compiled_graph; + if (old && cg != old) { + _engine.reset_load(); } _compiled_graph = cg; + return old; } uint32_t @@ -353,55 +353,5 @@ GraphImpl::build_ports_array() return result; } -static inline void -compile_recursive(BlockImpl* n, CompiledGraph* output) -{ - if (n == NULL || n->traversed()) - return; - - n->traversed(true); - assert(output != NULL); - - for (auto& p : n->providers()) - if (!p->traversed()) - compile_recursive(p, output); - - output->push_back(CompiledBlock(n)); -} - -CompiledGraph* -GraphImpl::compile() -{ - ThreadManager::assert_thread(THREAD_PRE_PROCESS); - - CompiledGraph* const compiled_graph = new CompiledGraph(); - - for (auto& b : _blocks) { - b.traversed(false); - } - - for (auto& b : _blocks) { - // Either a sink or connected to our output ports: - if (!b.traversed() && b.dependants().empty()) { - compile_recursive(&b, compiled_graph); - } - } - - // Traverse any blocks we didn't hit yet - for (auto& b : _blocks) { - if (!b.traversed()) { - compile_recursive(&b, compiled_graph); - } - } - - if (compiled_graph->size() != _blocks.size()) { - _engine.log().error(fmt("Failed to compile graph %1%\n") % _path); - delete compiled_graph; - return NULL; - } - - return compiled_graph; -} - } // namespace Server } // namespace Ingen diff --git a/src/server/GraphImpl.hpp b/src/server/GraphImpl.hpp index 7352da39..f7d0be32 100644 --- a/src/server/GraphImpl.hpp +++ b/src/server/GraphImpl.hpp @@ -19,6 +19,8 @@ #include <cstdlib> +#include "ingen/ingen.h" + #include "BlockImpl.hpp" #include "CompiledGraph.hpp" #include "DuplexPort.hpp" @@ -160,21 +162,12 @@ public: bool has_arc(const PortImpl* tail, const PortImpl* head) const; - void set_compiled_graph(CompiledGraph* cp); + /** Set a new compiled graph to run, and return the old one. */ + CompiledGraph* swap_compiled_graph(CompiledGraph* cp) INGEN_WARN_UNUSED_RESULT; Raul::Array<PortImpl*>* external_ports() { return _ports; } void external_ports(Raul::Array<PortImpl*>* pa) { _ports = pa; } - /** Compile the graph into a version suitable for real-time execution. - * - * The CompiledGraph is a flat list that the graph will execute in order - * when its run() method is called. The returned object is newly allocated - * and owned by the caller. This function is non-realtime and does not - * affect processing, to take effect the returned object must be installed - * in the audio thread with set_compiled_graph(). - */ - CompiledGraph* compile(); - Raul::Array<PortImpl*>* build_ports_array(); /** Whether to run this graph's DSP bits in the audio thread */ @@ -195,7 +188,7 @@ private: Ports _inputs; ///< Pre-process thread only Ports _outputs; ///< Pre-process thread only Blocks _blocks; ///< Pre-process thread only - bool _process; + bool _process; ///< True iff graph is enabled }; } // namespace Server diff --git a/src/server/InternalPlugin.cpp b/src/server/InternalPlugin.cpp index 075bc67d..3d065fe3 100644 --- a/src/server/InternalPlugin.cpp +++ b/src/server/InternalPlugin.cpp @@ -16,7 +16,7 @@ #include "ingen/URIs.hpp" #include "internals/Controller.hpp" -#include "internals/Delay.hpp" +#include "internals/BlockDelay.hpp" #include "internals/Note.hpp" #include "internals/Time.hpp" #include "internals/Trigger.hpp" @@ -50,10 +50,10 @@ InternalPlugin::instantiate(BufferFactory& bufs, { const SampleCount srate = engine.driver()->sample_rate(); - if (uri() == NS_INTERNALS "Controller") { + if (uri() == NS_INTERNALS "BlockDelay") { + return new BlockDelayNode(this, bufs, symbol, polyphonic, parent, srate); + } else if (uri() == NS_INTERNALS "Controller") { return new ControllerNode(this, bufs, symbol, polyphonic, parent, srate); - } else if (uri() == NS_INTERNALS "Delay") { - return new DelayNode(this, bufs, symbol, polyphonic, parent, srate); } else if (uri() == NS_INTERNALS "Note") { return new NoteNode(this, bufs, symbol, polyphonic, parent, srate); } else if (uri() == NS_INTERNALS "Time") { diff --git a/src/server/JackDriver.cpp b/src/server/JackDriver.cpp index db567866..7f0e7ae0 100644 --- a/src/server/JackDriver.cpp +++ b/src/server/JackDriver.cpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard <http://drobilla.net/> + Copyright 2007-2016 David Robillard <http://drobilla.net/> Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -479,7 +479,7 @@ JackDriver::_process_cb(jack_nframes_t nframes) _transport_state = jack_transport_query(_client, &_position); - _engine.run_context().locate(start_of_current_cycle, nframes); + _engine.locate(start_of_current_cycle, nframes); // Read input for (auto& p : _ports) { diff --git a/src/server/JackDriver.hpp b/src/server/JackDriver.hpp index ef7bbd78..2d50d892 100644 --- a/src/server/JackDriver.hpp +++ b/src/server/JackDriver.hpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard <http://drobilla.net/> + Copyright 2007-2016 David Robillard <http://drobilla.net/> Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -23,6 +23,7 @@ #include <atomic> #include <jack/jack.h> +#include <jack/thread.h> #include <jack/transport.h> #ifdef INGEN_JACK_SESSION #include <jack/session.h> @@ -87,6 +88,8 @@ public: void append_time_events(RunContext& context, Buffer& buffer); + int real_time_priority() { return jack_client_real_time_priority(_client); } + jack_client_t* jack_client() const { return _client; } SampleCount block_length() const { return _block_length; } size_t seq_size() const { return _seq_size; } diff --git a/src/server/PostProcessor.cpp b/src/server/PostProcessor.cpp index d6016105..6c709518 100644 --- a/src/server/PostProcessor.cpp +++ b/src/server/PostProcessor.cpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard <http://drobilla.net/> + Copyright 2007-2016 David Robillard <http://drobilla.net/> Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -30,7 +30,7 @@ class Sentinel : public Event { public: Sentinel(Engine& engine) : Event(engine) {} - bool pre_process() { return false; } + bool pre_process(PreProcessContext& ctx) { return false; } void execute(RunContext& context) {} void post_process() {} }; @@ -70,7 +70,7 @@ PostProcessor::append(RunContext& context, Event* first, Event* last) bool PostProcessor::pending() const { - return _head.load() || _engine.run_context().pending_notifications(); + return _head.load() || _engine.pending_notifications(); } void @@ -85,7 +85,7 @@ PostProcessor::process() Event* next = ev->next(); if (!next || next->time() >= end_time) { // Process audio thread notifications until end - _engine.run_context().emit_notifications(end_time); + _engine.emit_notifications(end_time); return; } @@ -95,7 +95,7 @@ PostProcessor::process() ev = next; // Process audio thread notifications up until this event's time - _engine.run_context().emit_notifications(ev->time()); + _engine.emit_notifications(ev->time()); // Post-process event ev->post_process(); @@ -109,7 +109,7 @@ PostProcessor::process() _head = ev; // Process remaining audio thread notifications until end - _engine.run_context().emit_notifications(end_time); + _engine.emit_notifications(end_time); } } // namespace Server diff --git a/src/server/PreProcessContext.hpp b/src/server/PreProcessContext.hpp new file mode 100644 index 00000000..bf1115e8 --- /dev/null +++ b/src/server/PreProcessContext.hpp @@ -0,0 +1,70 @@ +/* + This file is part of Ingen. + Copyright 2007-2016 David Robillard <http://drobilla.net/> + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef INGEN_ENGINE_PREPROCESSCONTEXT_HPP +#define INGEN_ENGINE_PREPROCESSCONTEXT_HPP + +#include <unordered_set> + +#include "GraphImpl.hpp" + +namespace Ingen { +namespace Server { + +/** Event pre-processing context. + * + * \ingroup engine + */ +class PreProcessContext +{ +public: + typedef std::unordered_set<GraphImpl*> DirtyGraphs; + + /** Return true iff an atomic bundle is currently being pre-processed. */ + bool in_bundle() const { return _in_bundle; } + + /** Set/unset atomic bundle flag. */ + void set_in_bundle(bool b) { _in_bundle = b; } + + /** Return true iff graph should be compiled now (after a change). + * + * This may return false when an atomic bundle is deferring compilation, in + * which case the graph is flagged as dirty for later compilation. + */ + bool must_compile(GraphImpl* graph) { + if (!graph->enabled()) { + return false; + } else if (_in_bundle) { + _dirty_graphs.insert(graph); + return false; + } else { + return true; + } + } + + /** Return all graphs that require compilation after an atomic bundle. */ + const DirtyGraphs& dirty_graphs() const { return _dirty_graphs; } + DirtyGraphs& dirty_graphs() { return _dirty_graphs; } + +private: + DirtyGraphs _dirty_graphs; + bool _in_bundle = false; +}; + +} // namespace Server +} // namespace Ingen + +#endif // INGEN_ENGINE_PREPROCESSCONTEXT_HPP diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp index f0008afc..9933bde2 100644 --- a/src/server/PreProcessor.cpp +++ b/src/server/PreProcessor.cpp @@ -18,10 +18,12 @@ #include "ingen/AtomSink.hpp" #include "ingen/AtomWriter.hpp" +#include "ingen/Configuration.hpp" #include "Engine.hpp" #include "Event.hpp" #include "PostProcessor.hpp" +#include "PreProcessContext.hpp" #include "PreProcessor.hpp" #include "RunContext.hpp" #include "ThreadManager.hpp" @@ -131,6 +133,14 @@ PreProcessor::process(RunContext& context, PostProcessor& dest, size_t limit) } if (n_processed > 0) { + Engine& engine = context.engine(); + if (engine.world()->conf().option("trace").get<int32_t>()) { + const uint64_t start = engine.cycle_start_time(context); + const uint64_t end = engine.current_time(context); + fprintf(stderr, "Processed %zu events in %zu us\n", + n_processed, end - start); + } + Event* next = (Event*)last->next(); last->next(NULL); dest.append(context, head, last); @@ -149,6 +159,8 @@ PreProcessor::process(RunContext& context, PostProcessor& dest, size_t limit) void PreProcessor::run() { + PreProcessContext ctx; + UndoStack& undo_stack = *_engine.undo_stack(); UndoStack& redo_stack = *_engine.redo_stack(); AtomWriter undo_writer( @@ -168,7 +180,7 @@ PreProcessor::run() } assert(!ev->is_prepared()); - if (ev->pre_process()) { + if (ev->pre_process(ctx)) { switch (ev->get_mode()) { case Event::Mode::NORMAL: case Event::Mode::REDO: diff --git a/src/server/RunContext.cpp b/src/server/RunContext.cpp index ee1eaf04..e107d247 100644 --- a/src/server/RunContext.cpp +++ b/src/server/RunContext.cpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard <http://drobilla.net/> + Copyright 2007-2016 David Robillard <http://drobilla.net/> Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -20,9 +20,11 @@ #include "Broadcaster.hpp" #include "BufferFactory.hpp" +#include "Driver.hpp" #include "Engine.hpp" #include "PortImpl.hpp" #include "RunContext.hpp" +#include "Task.hpp" namespace Ingen { namespace Server { @@ -44,10 +46,13 @@ struct Notification LV2_URID type; }; -RunContext::RunContext(Engine& engine) +RunContext::RunContext(Engine& engine, unsigned id, bool threaded) : _engine(engine) , _event_sink( new Raul::RingBuffer(engine.event_queue_size() * sizeof(Notification))) + , _task(nullptr) + , _thread(threaded ? new std::thread(&RunContext::run, this) : nullptr) + , _id(id) , _start(0) , _end(0) , _offset(0) @@ -59,6 +64,9 @@ RunContext::RunContext(Engine& engine) RunContext::RunContext(const RunContext& copy) : _engine(copy._engine) , _event_sink(copy._event_sink) + , _task(nullptr) + , _thread(nullptr) + , _id(copy._id) , _start(copy._start) , _end(copy._end) , _offset(copy._offset) @@ -138,5 +146,31 @@ RunContext::emit_notifications(FrameTime end) } } +void +RunContext::set_priority(int priority) +{ + if (_thread) { + pthread_t pthread = _thread->native_handle(); + const int policy = (priority > 0) ? SCHED_FIFO : SCHED_OTHER; + sched_param sp; + sp.sched_priority = (priority > 0) ? priority : 0; + if (pthread_setschedparam(pthread, policy, &sp)) { + _engine.log().error( + fmt("Failed to set real-time priority of run thread (%s)\n") + % strerror(errno)); + } + } +} + +void +RunContext::run() +{ + while (_engine.wait_for_tasks()) { + for (Task* t; (t = _engine.steal_task(0));) { + t->run(*this); + } + } +} + } // namespace Server } // namespace Ingen diff --git a/src/server/RunContext.hpp b/src/server/RunContext.hpp index 803d7829..eba30545 100644 --- a/src/server/RunContext.hpp +++ b/src/server/RunContext.hpp @@ -17,6 +17,8 @@ #ifndef INGEN_ENGINE_RUNCONTEXT_HPP #define INGEN_ENGINE_RUNCONTEXT_HPP +#include <thread> + #include "ingen/Atom.hpp" #include "ingen/World.hpp" #include "raul/RingBuffer.hpp" @@ -28,6 +30,7 @@ namespace Server { class Engine; class PortImpl; +class Task; /** Graph execution context. * @@ -44,8 +47,19 @@ class PortImpl; class RunContext { public: - RunContext(Engine& engine); - RunContext(const RunContext& copy); + /** Create a new run context. + * + * @param threaded If true, then this context is a worker which will launch + * a thread and execute tasks as they become available. + */ + RunContext(Engine& engine, unsigned id, bool threaded); + + /** Create a sub-context of `parent`. + * + * This is used to subdivide process cycles, the sub-context is + * lightweight and only serves to pass different time attributes. + */ + RunContext(const RunContext& parent); virtual ~RunContext(); @@ -73,42 +87,63 @@ public: /** Return true iff any notifications are pending. */ bool pending_notifications() const { return _event_sink->read_space(); } + /** Return the duration of this cycle in microseconds. + * + * This is the cycle length in frames (nframes) converted to microseconds, + * that is, the amount of real time that this cycle's audio represents. + * Note that this is unrelated to the amount of time available to execute a + * cycle (other than the fact that it must be processed in significantly + * less time to avoid a dropout when running in real time). + */ + inline uint64_t duration() const { + return (uint64_t)_nframes * 1e6 / _rate; + } + inline void locate(FrameTime s, SampleCount nframes) { _start = s; _end = s + nframes; _nframes = nframes; } - inline void locate(const RunContext& other) { - _start = other._start; - _end = other._end; - _nframes = other._nframes; - } - inline void slice(SampleCount offset, SampleCount nframes) { _offset = offset; _nframes = nframes; } + inline void set_task(Task* task) { + _task = task; + } + + void set_priority(int priority); + void set_rate(SampleCount rate) { _rate = rate; } + inline Engine& engine() const { return _engine; } + inline Task* task() const { return _task; } + inline unsigned id() const { return _id; } inline FrameTime start() const { return _start; } inline FrameTime time() const { return _start + _offset; } inline FrameTime end() const { return _end; } inline SampleCount offset() const { return _offset; } inline SampleCount nframes() const { return _nframes; } + inline SampleCount rate() const { return _rate; } inline bool realtime() const { return _realtime; } protected: const RunContext& operator=(const RunContext& copy) = delete; - Engine& _engine; ///< Engine we're running in + void run(); - Raul::RingBuffer* _event_sink; ///< Port updates from process context + Engine& _engine; ///< Engine we're running in + Raul::RingBuffer* _event_sink; ///< Port updates from process context + Task* _task; ///< Currently executing task + std::thread* _thread; ///< Thread (NULL for main run context) + unsigned _id; ///< Context ID FrameTime _start; ///< Start frame of this cycle, timeline relative FrameTime _end; ///< End frame of this cycle, timeline relative SampleCount _offset; ///< Offset into data buffers SampleCount _nframes; ///< Number of frames past offset to process + SampleCount _rate; ///< Sample rate in Hz bool _realtime; ///< True iff context is hard realtime bool _copy; ///< True iff this is a copy (shared event_sink) }; diff --git a/src/server/Task.cpp b/src/server/Task.cpp new file mode 100644 index 00000000..e15a4d4a --- /dev/null +++ b/src/server/Task.cpp @@ -0,0 +1,155 @@ +/* + This file is part of Ingen. + Copyright 2015-2016 David Robillard <http://drobilla.net/> + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "BlockImpl.hpp" +#include "Engine.hpp" +#include "Task.hpp" + +namespace Ingen { +namespace Server { + +void +Task::run(RunContext& context) +{ + switch (_mode) { + case Mode::SINGLE: + // fprintf(stderr, "%u run %s\n", context.id(), _block->path().c_str()); + _block->process(context); + break; + case Mode::SEQUENTIAL: + for (Task& task : *this) { + task.run(context); + } + break; + case Mode::PARALLEL: + // Initialize (not) done state of sub-tasks + for (Task& task : *this) { + task.set_done(false); + } + + // Grab the first sub-task + _next = 0; + _done_end = 0; + Task* t = steal(context); + + // Allow other threads to steal sub-tasks + context.set_task(this); + context.engine().signal_tasks(); + + // Run available tasks until this task is finished + for (; t; t = get_task(context)) { + t->run(context); + } + context.set_task(nullptr); + break; + } + + set_done(true); +} + +Task* +Task::steal(RunContext& context) +{ + if (_mode == Mode::PARALLEL) { + const unsigned i = _next++; + if (i < size()) { + return &(*this)[i]; + } + } + + return NULL; +} + +Task* +Task::get_task(RunContext& context) +{ + // Attempt to "steal" a task from ourselves + Task* t = steal(context); + if (t) { + return t; + } + + while (true) { + // Push done end index as forward as possible + for (; (*this)[_done_end].done(); ++_done_end) {} + + if (_done_end >= size()) { + return NULL; // All child tasks are finished + } + + // All child tasks claimed, but some are unfinished, steal a task + t = context.engine().steal_task(context.id() + 1); + if (t) { + return t; + } + + /* All child tasks are claimed, and we failed to steal any tasks. Spin + to prevent blocking, though it would probably be wiser to wait for a + signal in non-main threads, and maybe even in the main thread + depending on your real-time safe philosophy... more experimentation + here is needed. */ + } +} + +void +Task::simplify() +{ + if (_mode != Mode::SINGLE) { + for (std::vector<Task>::iterator t = begin(); t != end();) { + t->simplify(); + if (t->mode() != Mode::SINGLE && t->empty()) { + // Empty task, erase + t = erase(t); + } else if (t->mode() == _mode) { + // Subtask with the same type, fold child into parent + const Task child(*t); + t = erase(t); + t = insert(t, child.begin(), child.end()); + } else { + ++t; + } + } + + if (size() == 1) { + const Task t(front()); + *this = t; + } + } +} + +void +Task::dump(std::function<void (const std::string&)> sink, unsigned indent, bool first) const +{ + if (!first) { + sink("\n"); + for (unsigned i = 0; i < indent; ++i) { + sink(" "); + } + } + + if (_mode == Mode::SINGLE) { + sink(_block->path()); + } else { + sink(((_mode == Mode::SEQUENTIAL) ? "(seq " : "(par ")); + for (size_t i = 0; i < size(); ++i) { + (*this)[i].dump(sink, indent + 5, i == 0); + } + sink(")"); + } +} + +} // namespace Server +} // namespace Ingen diff --git a/src/server/Task.hpp b/src/server/Task.hpp new file mode 100644 index 00000000..2c1a0cc2 --- /dev/null +++ b/src/server/Task.hpp @@ -0,0 +1,98 @@ +/* + This file is part of Ingen. + Copyright 2007-2016 David Robillard <http://drobilla.net/> + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef INGEN_ENGINE_TASK_HPP +#define INGEN_ENGINE_TASK_HPP + +#include <cassert> +#include <ostream> +#include <vector> + +namespace Ingen { +namespace Server { + +class BlockImpl; +class RunContext; + +class Task : public std::vector<Task> { +public: + enum class Mode { + SINGLE, ///< Single block to run + SEQUENTIAL, ///< Elements must be run sequentially in order + PARALLEL ///< Elements may be run in any order in parallel + }; + + Task(Mode mode, BlockImpl* block=NULL) + : _block(block) + , _mode(mode) + , _done_end(0) + , _next(0) + , _done(false) + { + assert(!(mode == Mode::SINGLE && !block)); + } + + Task& operator=(const Task& copy) { + *static_cast<std::vector<Task>*>(this) = copy; + _block = copy._block; + _mode = copy._mode; + _done_end = copy._done_end; + _next = copy._next.load(); + _done = copy._done.load(); + return *this; + } + + Task(const Task& copy) + : std::vector<Task>(copy) + , _block(copy._block) + , _mode(copy._mode) + , _done_end(copy._done_end) + , _next(copy._next.load()) + , _done(copy._done.load()) + {} + + /** Run task in the given context. */ + void run(RunContext& context); + + /** Pretty print task to the given stream (recursively). */ + void dump(std::function<void (const std::string&)> sink, unsigned indent, bool first) const; + + /** Simplify task expression. */ + void simplify(); + + /** Steal a child task from this task (succeeds for PARALLEL only). */ + Task* steal(RunContext& context); + + Mode mode() const { return _mode; } + BlockImpl* block() const { return _block; } + bool done() const { return _done; } + + void set_done(bool done) { _done = done; } + +private: + Task* get_task(RunContext& context); + + BlockImpl* _block; ///< Used for SINGLE only + Mode _mode; ///< Execution mode + unsigned _done_end; ///< Index of rightmost done sub-task + std::atomic<unsigned> _next; ///< Index of next sub-task + std::atomic<bool> _done; ///< Completion phase +}; + +} // namespace Server +} // namespace Ingen + +#endif // INGEN_ENGINE_TASK_HPP diff --git a/src/server/UndoStack.cpp b/src/server/UndoStack.cpp index 87391bc7..de4c64ca 100644 --- a/src/server/UndoStack.cpp +++ b/src/server/UndoStack.cpp @@ -44,7 +44,7 @@ UndoStack::start_entry() } bool -UndoStack::write(const LV2_Atom* msg) +UndoStack::write(const LV2_Atom* msg, int32_t default_id) { _stack.back().push_event(msg); return true; diff --git a/src/server/UndoStack.hpp b/src/server/UndoStack.hpp index d8d60e84..6ce6475f 100644 --- a/src/server/UndoStack.hpp +++ b/src/server/UndoStack.hpp @@ -78,7 +78,7 @@ public: UndoStack(URIs& uris, URIMap& map) : _uris(uris), _map(map), _depth(0) {} int start_entry(); - bool write(const LV2_Atom* msg); + bool write(const LV2_Atom* msg, int32_t default_id=0); int finish_entry(); bool empty() const { return _stack.empty(); } diff --git a/src/server/events/Connect.cpp b/src/server/events/Connect.cpp index 66f726a6..35084ea6 100644 --- a/src/server/events/Connect.cpp +++ b/src/server/events/Connect.cpp @@ -26,6 +26,8 @@ #include "InputPort.hpp" #include "OutputPort.hpp" #include "PortImpl.hpp" +#include "PreProcessContext.hpp" +#include "internals/BlockDelay.hpp" #include "types.hpp" namespace Ingen { @@ -48,9 +50,9 @@ Connect::Connect(Engine& engine, {} bool -Connect::pre_process() +Connect::pre_process(PreProcessContext& ctx) { - std::unique_lock<std::mutex> lock(_engine.store()->mutex()); + std::lock_guard<std::mutex> lock(_engine.store()->mutex()); Node* tail = _engine.store()->get(_tail_path); if (!tail) { @@ -111,19 +113,31 @@ Connect::pre_process() provider... */ if (tail_block != head_block && tail_block->parent() == head_block->parent()) { + // Connection is between blocks inside a graph, compile graph + // The tail block is now a dependency (provider) of the head block head_block->providers().insert(tail_block); - tail_block->dependants().insert(head_block); + + if (!dynamic_cast<Internals::BlockDelayNode*>(tail_block)) { + /* Arcs leaving a delay node are ignored for the purposes of + compilation, since the output is from the previous cycle and + does not affect execution order. Otherwise, the head block is + now a dependant of the head block. */ + tail_block->dependants().insert(head_block); + } + + if (ctx.must_compile(_graph)) { + if (!(_compiled_graph = CompiledGraph::compile(_graph))) { + head_block->providers().erase(tail_block); + tail_block->dependants().erase(head_block); + return Event::pre_process_done(Status::COMPILATION_FAILED); + } + } } _graph->add_arc(_arc); _head->increment_num_arcs(); - tail_output->inherit_neighbour(_head, _tail_remove, _tail_add); - _head->inherit_neighbour(tail_output, _head_remove, _head_add); - - lock.unlock(); - if (!_head->is_driver_port()) { _voices = new Raul::Array<PortImpl::Voice>(_head->poly()); _head->get_buffers(*_engine.buffer_factory(), @@ -132,9 +146,8 @@ Connect::pre_process() false); } - if (_graph->enabled()) { - _compiled_graph = _graph->compile(); - } + tail_output->inherit_neighbour(_head, _tail_remove, _tail_add); + _head->inherit_neighbour(tail_output, _head_remove, _head_add); return Event::pre_process_done(Status::SUCCESS); } @@ -148,7 +161,9 @@ Connect::execute(RunContext& context) _engine.maid()->dispose(_head->set_voices(context, _voices)); } _head->connect_buffers(); - _graph->set_compiled_graph(_compiled_graph); + if (_compiled_graph) { + _compiled_graph = _graph->swap_compiled_graph(_compiled_graph); + } } } @@ -167,6 +182,8 @@ Connect::post_process() Node::path_to_uri(_tail_path), _tail_remove, _tail_add); } } + + delete _compiled_graph; } void diff --git a/src/server/events/Connect.hpp b/src/server/events/Connect.hpp index 84b2854b..9b98b167 100644 --- a/src/server/events/Connect.hpp +++ b/src/server/events/Connect.hpp @@ -52,7 +52,7 @@ public: const Raul::Path& tail, const Raul::Path& head); - bool pre_process(); + bool pre_process(PreProcessContext& ctx); void execute(RunContext& context); void post_process(); void undo(Interface& target); diff --git a/src/server/events/Copy.cpp b/src/server/events/Copy.cpp index 0ff53843..30ffb43d 100644 --- a/src/server/events/Copy.cpp +++ b/src/server/events/Copy.cpp @@ -26,6 +26,7 @@ #include "Engine.hpp" #include "EnginePort.hpp" #include "GraphImpl.hpp" +#include "PreProcessContext.hpp" #include "events/Copy.hpp" namespace Ingen { @@ -48,9 +49,9 @@ Copy::Copy(Engine& engine, {} bool -Copy::pre_process() +Copy::pre_process(PreProcessContext& ctx) { - std::unique_lock<std::mutex> lock(_engine.store()->mutex()); + std::lock_guard<std::mutex> lock(_engine.store()->mutex()); if (Node::uri_is_path(_old_uri)) { // Old URI is a path within the engine @@ -69,16 +70,16 @@ Copy::pre_process() if (Node::uri_is_path(_new_uri)) { // Copy to path within the engine - return engine_to_engine(); + return engine_to_engine(ctx); } else if (_new_uri.scheme() == "file") { // Copy to filesystem path (i.e. save) - return engine_to_filesystem(); + return engine_to_filesystem(ctx); } else { return Event::pre_process_done(Status::BAD_REQUEST); } } else if (_old_uri.scheme() == "file") { if (Node::uri_is_path(_new_uri)) { - filesystem_to_engine(); + return filesystem_to_engine(ctx); } else { // Ingen is not your file manager return Event::pre_process_done(Status::BAD_REQUEST); @@ -89,7 +90,7 @@ Copy::pre_process() } bool -Copy::engine_to_engine() +Copy::engine_to_engine(PreProcessContext& ctx) { // Only support a single source for now const Raul::Path new_path = Node::uri_to_path(_new_uri); @@ -125,8 +126,8 @@ Copy::engine_to_engine() _engine.store()->add(_block); // Compile graph with new block added for insertion in audio thread - if (_parent->enabled()) { - _compiled_graph = _parent->compile(); + if (ctx.must_compile(_parent)) { + _compiled_graph = CompiledGraph::compile(_parent); } return Event::pre_process_done(Status::SUCCESS); @@ -142,7 +143,7 @@ ends_with(const std::string& str, const std::string& end) } bool -Copy::engine_to_filesystem() +Copy::engine_to_filesystem(PreProcessContext& ctx) { // Ensure source is a graph SPtr<GraphImpl> graph = dynamic_ptr_cast<GraphImpl>(_old_block); @@ -168,7 +169,7 @@ Copy::engine_to_filesystem() } bool -Copy::filesystem_to_engine() +Copy::filesystem_to_engine(PreProcessContext& ctx) { if (!_engine.world()->parser()) { return Event::pre_process_done(Status::INTERNAL_ERROR); @@ -196,9 +197,8 @@ Copy::filesystem_to_engine() void Copy::execute(RunContext& context) { - if (_block) { - _parent->set_compiled_graph(_compiled_graph); - _compiled_graph = NULL; // Graph takes ownership + if (_block && _compiled_graph) { + _compiled_graph = _parent->swap_compiled_graph(_compiled_graph); } } @@ -209,6 +209,7 @@ Copy::post_process() if (respond() == Status::SUCCESS) { _engine.broadcaster()->copy(_old_uri, _new_uri); } + delete _compiled_graph; } void diff --git a/src/server/events/Copy.hpp b/src/server/events/Copy.hpp index 68ee31da..55310757 100644 --- a/src/server/events/Copy.hpp +++ b/src/server/events/Copy.hpp @@ -46,15 +46,15 @@ public: const Raul::URI& old_uri, const Raul::URI& new_uri); - bool pre_process(); + bool pre_process(PreProcessContext& ctx); void execute(RunContext& context); void post_process(); void undo(Interface& target); private: - bool engine_to_engine(); - bool engine_to_filesystem(); - bool filesystem_to_engine(); + bool engine_to_engine(PreProcessContext& ctx); + bool engine_to_filesystem(PreProcessContext& ctx); + bool filesystem_to_engine(PreProcessContext& ctx); const Raul::URI _old_uri; const Raul::URI _new_uri; diff --git a/src/server/events/CreateBlock.cpp b/src/server/events/CreateBlock.cpp index 28afe4b2..231df4e2 100644 --- a/src/server/events/CreateBlock.cpp +++ b/src/server/events/CreateBlock.cpp @@ -28,6 +28,7 @@ #include "GraphImpl.hpp" #include "PluginImpl.hpp" #include "PortImpl.hpp" +#include "PreProcessContext.hpp" namespace Ingen { namespace Server { @@ -53,7 +54,7 @@ CreateBlock::~CreateBlock() } bool -CreateBlock::pre_process() +CreateBlock::pre_process(PreProcessContext& ctx) { typedef Resource::Properties::const_iterator iterator; @@ -137,8 +138,8 @@ CreateBlock::pre_process() /* Compile graph with new block added for insertion in audio thread TODO: Since the block is not connected at this point, a full compilation could be avoided and the block simply appended. */ - if (_graph->enabled()) { - _compiled_graph = _graph->compile(); + if (ctx.must_compile(_graph)) { + _compiled_graph = CompiledGraph::compile(_graph); } _update.put_block(_block); @@ -149,9 +150,8 @@ CreateBlock::pre_process() void CreateBlock::execute(RunContext& context) { - if (_block) { - _graph->set_compiled_graph(_compiled_graph); - _compiled_graph = NULL; // Graph takes ownership + if (_status == Status::SUCCESS && _compiled_graph) { + _compiled_graph = _graph->swap_compiled_graph(_compiled_graph); } } diff --git a/src/server/events/CreateBlock.hpp b/src/server/events/CreateBlock.hpp index 00205c6a..b0aa6aa4 100644 --- a/src/server/events/CreateBlock.hpp +++ b/src/server/events/CreateBlock.hpp @@ -47,7 +47,7 @@ public: ~CreateBlock(); - bool pre_process(); + bool pre_process(PreProcessContext& ctx); void execute(RunContext& context); void post_process(); void undo(Interface& target); diff --git a/src/server/events/CreateGraph.cpp b/src/server/events/CreateGraph.cpp index ca36f258..40f539f7 100644 --- a/src/server/events/CreateGraph.cpp +++ b/src/server/events/CreateGraph.cpp @@ -23,6 +23,7 @@ #include "Driver.hpp" #include "Engine.hpp" #include "GraphImpl.hpp" +#include "PreProcessContext.hpp" #include "events/CreateGraph.hpp" #include "events/CreatePort.hpp" @@ -51,17 +52,18 @@ CreateGraph::build_child_events() // Properties common to both ports Resource::Properties control_properties; - control_properties.put(uris.rdf_type, uris.atom_AtomPort); control_properties.put(uris.atom_bufferType, uris.atom_Sequence); control_properties.put(uris.atom_supports, uris.patch_Message); - control_properties.put(uris.rsz_minimumSize, uris.forge.make(4096)); + control_properties.put(uris.lv2_designation, uris.lv2_control); control_properties.put(uris.lv2_portProperty, uris.lv2_connectionOptional); + control_properties.put(uris.rdf_type, uris.atom_AtomPort); + control_properties.put(uris.rsz_minimumSize, uris.forge.make(4096)); - // Add control input + // Add control port (message receive) Resource::Properties in_properties(control_properties); + in_properties.put(uris.lv2_index, uris.forge.make(0)); in_properties.put(uris.lv2_name, uris.forge.alloc("Control")); in_properties.put(uris.rdf_type, uris.lv2_InputPort); - in_properties.put(uris.lv2_index, uris.forge.make(0)); in_properties.put(uris.ingen_canvasX, uris.forge.make(32.0f), Resource::Graph::EXTERNAL); in_properties.put(uris.ingen_canvasY, uris.forge.make(32.0f), @@ -74,11 +76,11 @@ CreateGraph::build_child_events() _path.child(Raul::Symbol("control")), in_properties))); - // Add control out + // Add notify port (message respond) Resource::Properties out_properties(control_properties); + out_properties.put(uris.lv2_index, uris.forge.make(1)); out_properties.put(uris.lv2_name, uris.forge.alloc("Notify")); out_properties.put(uris.rdf_type, uris.lv2_OutputPort); - out_properties.put(uris.lv2_index, uris.forge.make(1)); out_properties.put(uris.ingen_canvasX, uris.forge.make(128.0f), Resource::Graph::EXTERNAL); out_properties.put(uris.ingen_canvasY, uris.forge.make(32.0f), @@ -92,7 +94,7 @@ CreateGraph::build_child_events() } bool -CreateGraph::pre_process() +CreateGraph::pre_process(PreProcessContext& ctx) { if (_engine.store()->get(_path)) { return Event::pre_process_done(Status::EXISTS, _path); @@ -163,7 +165,9 @@ CreateGraph::pre_process() _parent->add_block(*_graph); if (_parent->enabled()) { _graph->enable(); - _compiled_graph = _parent->compile(); + } + if (ctx.must_compile(_parent)) { + _compiled_graph = CompiledGraph::compile(_parent); } } @@ -179,7 +183,7 @@ CreateGraph::pre_process() // Build and pre-process child events to create standard ports build_child_events(); for (SPtr<Event> ev : _child_events) { - ev->pre_process(); + ev->pre_process(ctx); } return Event::pre_process_done(Status::SUCCESS); @@ -189,8 +193,8 @@ void CreateGraph::execute(RunContext& context) { if (_graph) { - if (_parent) { - _parent->set_compiled_graph(_compiled_graph); + if (_parent && _compiled_graph) { + _compiled_graph = _parent->swap_compiled_graph(_compiled_graph); } for (SPtr<Event> ev : _child_events) { @@ -213,6 +217,8 @@ CreateGraph::post_process() } } _child_events.clear(); + + delete _compiled_graph; } void diff --git a/src/server/events/CreateGraph.hpp b/src/server/events/CreateGraph.hpp index 9cae32ba..794742ac 100644 --- a/src/server/events/CreateGraph.hpp +++ b/src/server/events/CreateGraph.hpp @@ -44,7 +44,7 @@ public: const Raul::Path& path, const Resource::Properties& properties); - bool pre_process(); + bool pre_process(PreProcessContext& ctx); void execute(RunContext& context); void post_process(); void undo(Interface& target); diff --git a/src/server/events/CreatePort.cpp b/src/server/events/CreatePort.cpp index 9384ce09..60e57a71 100644 --- a/src/server/events/CreatePort.cpp +++ b/src/server/events/CreatePort.cpp @@ -86,7 +86,7 @@ CreatePort::CreatePort(Engine& engine, } bool -CreatePort::pre_process() +CreatePort::pre_process(PreProcessContext& ctx) { if (_port_type == PortType::UNKNOWN) { return Event::pre_process_done(Status::UNKNOWN_TYPE, _path); diff --git a/src/server/events/CreatePort.hpp b/src/server/events/CreatePort.hpp index c002df59..f3e2092d 100644 --- a/src/server/events/CreatePort.hpp +++ b/src/server/events/CreatePort.hpp @@ -51,7 +51,7 @@ public: const Raul::Path& path, const Resource::Properties& properties); - bool pre_process(); + bool pre_process(PreProcessContext& ctx); void execute(RunContext& context); void post_process(); void undo(Interface& target); diff --git a/src/server/events/Delete.cpp b/src/server/events/Delete.cpp index 5ca70a3a..bab2594a 100644 --- a/src/server/events/Delete.cpp +++ b/src/server/events/Delete.cpp @@ -29,6 +29,7 @@ #include "GraphImpl.hpp" #include "PluginImpl.hpp" #include "PortImpl.hpp" +#include "PreProcessContext.hpp" namespace Ingen { namespace Server { @@ -54,10 +55,11 @@ Delete::Delete(Engine& engine, Delete::~Delete() { delete _disconnect_event; + delete _compiled_graph; } bool -Delete::pre_process() +Delete::pre_process(PreProcessContext& ctx) { if (_path.is_root() || _path == "/control" || _path == "/notify") { return Event::pre_process_done(Status::NOT_DELETABLE, _path); @@ -84,26 +86,29 @@ Delete::pre_process() } // Take a writer lock while we modify the store - std::unique_lock<std::mutex> lock(_engine.store()->mutex()); + std::lock_guard<std::mutex> lock(_engine.store()->mutex()); _engine.store()->remove(iter, _removed_objects); if (_block) { parent->remove_block(*_block); _disconnect_event = new DisconnectAll(_engine, parent, _block.get()); - _disconnect_event->pre_process(); + _disconnect_event->pre_process(ctx); - if (parent->enabled()) { - _compiled_graph = parent->compile(); + if (ctx.must_compile(parent)) { + _compiled_graph = CompiledGraph::compile(parent); } } else if (_port) { parent->remove_port(*_port); _disconnect_event = new DisconnectAll(_engine, parent, _port.get()); - _disconnect_event->pre_process(); + _disconnect_event->pre_process(ctx); + + if (ctx.must_compile(parent)) { + _compiled_graph = CompiledGraph::compile(parent); + } if (parent->enabled()) { - _compiled_graph = parent->compile(); - _ports_array = parent->build_ports_array(); + _ports_array = parent->build_ports_array(); assert(_ports_array->size() == parent->num_ports_non_rt()); } @@ -137,8 +142,8 @@ Delete::execute(RunContext& context) } } - if (parent) { - parent->set_compiled_graph(_compiled_graph); + if (parent && _compiled_graph) { + _compiled_graph = parent->swap_compiled_graph(_compiled_graph); } } diff --git a/src/server/events/Delete.hpp b/src/server/events/Delete.hpp index 5cd40b14..fd797804 100644 --- a/src/server/events/Delete.hpp +++ b/src/server/events/Delete.hpp @@ -54,7 +54,7 @@ public: ~Delete(); - bool pre_process(); + bool pre_process(PreProcessContext& ctx); void execute(RunContext& context); void post_process(); void undo(Interface& target); diff --git a/src/server/events/Delta.cpp b/src/server/events/Delta.cpp index 49ea27ff..62d08367 100644 --- a/src/server/events/Delta.cpp +++ b/src/server/events/Delta.cpp @@ -79,6 +79,7 @@ Delta::~Delta() delete s; delete _create_event; + delete _compiled_graph; } void @@ -98,7 +99,6 @@ Delta::add_set_event(const char* port_symbol, _engine, _request_client, _request_id, _time, port, Atom(size, type, value), true); - ev->pre_process(); _set_events.push_back(ev); } @@ -155,7 +155,7 @@ get_file_node(LilvWorld* lworld, const URIs& uris, const Atom& value) */ bool -Delta::pre_process() +Delta::pre_process(PreProcessContext& ctx) { const Ingen::URIs& uris = _engine.world()->uris(); @@ -231,7 +231,7 @@ Delta::pre_process() path, _properties); } if (_create_event) { - if (_create_event->pre_process()) { + if (_create_event->pre_process(ctx)) { _object = _engine.store()->get(path); // Get object for setting } else { return Event::pre_process_done(Status::CREATION_FAILED, _subject); @@ -322,7 +322,6 @@ Delta::pre_process() } else if (key == uris.ingen_value || key == uris.ingen_activity) { SetPortValue* ev = new SetPortValue( _engine, _request_client, _request_id, _time, port, value); - ev->pre_process(); _set_events.push_back(ev); } else if (key == uris.midi_binding) { if (port->is_a(PortType::CONTROL) || port->is_a(PortType::CV)) { @@ -374,8 +373,11 @@ Delta::pre_process() if (value.type() == uris.forge.Bool) { op = SpecialType::ENABLE; // FIXME: defer this until all other metadata has been processed - if (value.get<int32_t>() && !_graph->enabled()) - _compiled_graph = _graph->compile(); + if (value.get<int32_t>() && !_graph->enabled()) { + if (!(_compiled_graph = CompiledGraph::compile(_graph))) { + _status = Status::COMPILATION_FAILED; + } + } } else { _status = Status::BAD_VALUE_TYPE; } @@ -446,6 +448,10 @@ Delta::pre_process() _types.push_back(op); } + for (auto& s : _set_events) { + s->pre_process(ctx); + } + if (poly_changed) { lock.unlock(); _poly_lock.lock(); @@ -493,7 +499,7 @@ Delta::execute(RunContext& context) if (_graph) { if (value.get<int32_t>()) { if (_compiled_graph) { - _graph->set_compiled_graph(_compiled_graph); + _compiled_graph = _graph->swap_compiled_graph(_compiled_graph); } _graph->enable(); } else { diff --git a/src/server/events/Delta.hpp b/src/server/events/Delta.hpp index 8b00fd3a..9e6cbecd 100644 --- a/src/server/events/Delta.hpp +++ b/src/server/events/Delta.hpp @@ -38,7 +38,7 @@ namespace Server { class CompiledGraph; class Engine; class GraphImpl; -class ProcessContext; +class RunContext; namespace Events { @@ -73,7 +73,7 @@ public: uint32_t size, uint32_t type); - bool pre_process(); + bool pre_process(PreProcessContext& ctx); void execute(RunContext& context); void post_process(); void undo(Interface& target); diff --git a/src/server/events/Disconnect.cpp b/src/server/events/Disconnect.cpp index 8d47ac5a..476a0cee 100644 --- a/src/server/events/Disconnect.cpp +++ b/src/server/events/Disconnect.cpp @@ -29,6 +29,7 @@ #include "InputPort.hpp" #include "OutputPort.hpp" #include "PortImpl.hpp" +#include "PreProcessContext.hpp" #include "RunContext.hpp" #include "ThreadManager.hpp" #include "events/Disconnect.hpp" @@ -55,6 +56,7 @@ Disconnect::Disconnect(Engine& engine, Disconnect::~Disconnect() { delete _impl; + delete _compiled_graph; } Disconnect::Impl::Impl(Engine& e, @@ -112,9 +114,9 @@ Disconnect::Impl::Impl(Engine& e, } bool -Disconnect::pre_process() +Disconnect::pre_process(PreProcessContext& ctx) { - std::unique_lock<std::mutex> lock(_engine.store()->mutex()); + std::lock_guard<std::mutex> lock(_engine.store()->mutex()); if (_tail_path.parent().parent() != _head_path.parent().parent() && _tail_path.parent() != _head_path.parent().parent() @@ -166,8 +168,9 @@ Disconnect::pre_process() dynamic_cast<OutputPort*>(tail), dynamic_cast<InputPort*>(head)); - if (_graph->enabled()) - _compiled_graph = _graph->compile(); + if (ctx.must_compile(_graph)) { + _compiled_graph = CompiledGraph::compile(_graph); + } return Event::pre_process_done(Status::SUCCESS); } @@ -204,7 +207,9 @@ Disconnect::execute(RunContext& context) { if (_status == Status::SUCCESS) { if (_impl->execute(context, true)) { - _graph->set_compiled_graph(_compiled_graph); + if (_compiled_graph) { + _compiled_graph = _graph->swap_compiled_graph(_compiled_graph); + } } else { _status = Status::NOT_FOUND; } diff --git a/src/server/events/Disconnect.hpp b/src/server/events/Disconnect.hpp index 69d9469c..19ffcf3b 100644 --- a/src/server/events/Disconnect.hpp +++ b/src/server/events/Disconnect.hpp @@ -54,7 +54,7 @@ public: ~Disconnect(); - bool pre_process(); + bool pre_process(PreProcessContext& ctx); void execute(RunContext& context); void post_process(); void undo(Interface& target); diff --git a/src/server/events/DisconnectAll.cpp b/src/server/events/DisconnectAll.cpp index 380aced5..2dea6a76 100644 --- a/src/server/events/DisconnectAll.cpp +++ b/src/server/events/DisconnectAll.cpp @@ -31,6 +31,7 @@ #include "InputPort.hpp" #include "OutputPort.hpp" #include "PortImpl.hpp" +#include "PreProcessContext.hpp" #include "events/Disconnect.hpp" #include "events/DisconnectAll.hpp" #include "util.hpp" @@ -76,10 +77,12 @@ DisconnectAll::~DisconnectAll() { for (auto& i : _impls) delete i; + + delete _compiled_graph; } bool -DisconnectAll::pre_process() +DisconnectAll::pre_process(PreProcessContext& ctx) { std::unique_lock<std::mutex> lock(_engine.store()->mutex(), std::defer_lock); @@ -136,8 +139,11 @@ DisconnectAll::pre_process() dynamic_cast<InputPort*>(a->head()))); } - if (!_deleting && _parent->enabled()) - _compiled_graph = _parent->compile(); + if (!_deleting && ctx.must_compile(_parent)) { + if (!(_compiled_graph = CompiledGraph::compile(_parent))) { + return Event::pre_process_done(Status::COMPILATION_FAILED); + } + } return Event::pre_process_done(Status::SUCCESS); } @@ -152,7 +158,9 @@ DisconnectAll::execute(RunContext& context) } } - _parent->set_compiled_graph(_compiled_graph); + if (_compiled_graph) { + _compiled_graph = _parent->swap_compiled_graph(_compiled_graph); + } } void diff --git a/src/server/events/DisconnectAll.hpp b/src/server/events/DisconnectAll.hpp index 1ddfc536..1fc1f757 100644 --- a/src/server/events/DisconnectAll.hpp +++ b/src/server/events/DisconnectAll.hpp @@ -56,7 +56,7 @@ public: ~DisconnectAll(); - bool pre_process(); + bool pre_process(PreProcessContext& ctx); void execute(RunContext& context); void post_process(); void undo(Interface& target); diff --git a/src/server/events/Get.cpp b/src/server/events/Get.cpp index fa56f23a..dfa16ef5 100644 --- a/src/server/events/Get.cpp +++ b/src/server/events/Get.cpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard <http://drobilla.net/> + Copyright 2007-2016 David Robillard <http://drobilla.net/> Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -46,9 +46,9 @@ Get::Get(Engine& engine, {} bool -Get::pre_process() +Get::pre_process(PreProcessContext& ctx) { - std::unique_lock<std::mutex> lock(_engine.store()->mutex()); + std::lock_guard<std::mutex> lock(_engine.store()->mutex()); if (_uri == "ingen:/plugins") { _plugins = _engine.block_factory()->plugins(); @@ -90,10 +90,14 @@ Get::post_process() } else if (_uri == "ingen:/engine") { // TODO: Keep a proper RDF model of the engine URIs& uris = _engine.world()->uris(); - _request_client->set_property( + _request_client->put( Raul::URI("ingen:/engine"), - uris.param_sampleRate, - uris.forge.make(int32_t(_engine.driver()->sample_rate()))); + { { uris.param_sampleRate, + uris.forge.make(int32_t(_engine.driver()->sample_rate())) }, + { uris.bufsz_maxBlockLength, + uris.forge.make(int32_t(_engine.driver()->block_length())) }, + { uris.ingen_numThreads, + uris.forge.make(int32_t(_engine.n_threads())) } }); } else { _response.send(_request_client.get()); } diff --git a/src/server/events/Get.hpp b/src/server/events/Get.hpp index f685df21..35955d9a 100644 --- a/src/server/events/Get.hpp +++ b/src/server/events/Get.hpp @@ -47,7 +47,7 @@ public: SampleCount timestamp, const Raul::URI& uri); - bool pre_process(); + bool pre_process(PreProcessContext& ctx); void execute(RunContext& context) {} void post_process(); diff --git a/src/server/events/Mark.cpp b/src/server/events/Mark.cpp index 11690487..c72cc14f 100644 --- a/src/server/events/Mark.cpp +++ b/src/server/events/Mark.cpp @@ -15,6 +15,7 @@ */ #include "Engine.hpp" +#include "PreProcessContext.hpp" #include "UndoStack.hpp" #include "events/Mark.hpp" @@ -32,8 +33,15 @@ Mark::Mark(Engine& engine, , _depth(0) {} +Mark::~Mark() +{ + for (const auto& g : _compiled_graphs) { + delete g.second; + } +} + bool -Mark::pre_process() +Mark::pre_process(PreProcessContext& ctx) { UndoStack* const stack = ((_mode == Mode::UNDO) ? _engine.redo_stack() @@ -41,10 +49,21 @@ Mark::pre_process() switch (_type) { case Type::BUNDLE_START: + ctx.set_in_bundle(true); _depth = stack->start_entry(); break; case Type::BUNDLE_END: _depth = stack->finish_entry(); + ctx.set_in_bundle(false); + if (!ctx.dirty_graphs().empty()) { + for (GraphImpl* g : ctx.dirty_graphs()) { + CompiledGraph* cg = CompiledGraph::compile(g); + if (cg) { + _compiled_graphs.insert(std::make_pair(g, cg)); + } + } + ctx.dirty_graphs().clear(); + } break; } @@ -53,7 +72,11 @@ Mark::pre_process() void Mark::execute(RunContext& context) -{} +{ + for (auto& g : _compiled_graphs) { + g.second = g.first->swap_compiled_graph(g.second); + } +} void Mark::post_process() diff --git a/src/server/events/Mark.hpp b/src/server/events/Mark.hpp index d2db0834..68ba5149 100644 --- a/src/server/events/Mark.hpp +++ b/src/server/events/Mark.hpp @@ -44,15 +44,20 @@ public: SampleCount timestamp, Type type); - bool pre_process(); + ~Mark(); + + bool pre_process(PreProcessContext& ctx); void execute(RunContext& context); void post_process(); Execution get_execution() const; private: - Type _type; - int _depth; + typedef std::map<GraphImpl*, CompiledGraph*> CompiledGraphs; + + CompiledGraphs _compiled_graphs; + Type _type; + int _depth; }; } // namespace Events diff --git a/src/server/events/Move.cpp b/src/server/events/Move.cpp index b30c2ec2..aee74ab6 100644 --- a/src/server/events/Move.cpp +++ b/src/server/events/Move.cpp @@ -46,9 +46,9 @@ Move::~Move() } bool -Move::pre_process() +Move::pre_process(PreProcessContext& ctx) { - std::unique_lock<std::mutex> lock(_engine.store()->mutex()); + std::lock_guard<std::mutex> lock(_engine.store()->mutex()); if (!_old_path.parent().is_parent_of(_new_path)) { return Event::pre_process_done(Status::PARENT_DIFFERS, _new_path); diff --git a/src/server/events/Move.hpp b/src/server/events/Move.hpp index ef308a01..c45c73aa 100644 --- a/src/server/events/Move.hpp +++ b/src/server/events/Move.hpp @@ -45,7 +45,7 @@ public: ~Move(); - bool pre_process(); + bool pre_process(PreProcessContext& ctx); void execute(RunContext& context); void post_process(); void undo(Interface& target); diff --git a/src/server/events/SetPortValue.cpp b/src/server/events/SetPortValue.cpp index 0fac88c2..5bcb6dbf 100644 --- a/src/server/events/SetPortValue.cpp +++ b/src/server/events/SetPortValue.cpp @@ -53,7 +53,7 @@ SetPortValue::~SetPortValue() } bool -SetPortValue::pre_process() +SetPortValue::pre_process(PreProcessContext& ctx) { if (_port->is_output()) { return Event::pre_process_done(Status::DIRECTION_MISMATCH, _port->path()); diff --git a/src/server/events/SetPortValue.hpp b/src/server/events/SetPortValue.hpp index 7b49096f..aac5e033 100644 --- a/src/server/events/SetPortValue.hpp +++ b/src/server/events/SetPortValue.hpp @@ -47,7 +47,7 @@ public: ~SetPortValue(); - bool pre_process(); + bool pre_process(PreProcessContext& ctx); void execute(RunContext& context); void post_process(); diff --git a/src/server/events/Undo.cpp b/src/server/events/Undo.cpp index 0510d50f..83279744 100644 --- a/src/server/events/Undo.cpp +++ b/src/server/events/Undo.cpp @@ -34,7 +34,7 @@ Undo::Undo(Engine& engine, {} bool -Undo::pre_process() +Undo::pre_process(PreProcessContext& ctx) { UndoStack* const stack = _is_redo ? _engine.redo_stack() : _engine.undo_stack(); const Event::Mode mode = _is_redo ? Event::Mode::REDO : Event::Mode::UNDO; diff --git a/src/server/events/Undo.hpp b/src/server/events/Undo.hpp index c95daea9..300c74f2 100644 --- a/src/server/events/Undo.hpp +++ b/src/server/events/Undo.hpp @@ -38,7 +38,7 @@ public: SampleCount timestamp, bool is_redo); - bool pre_process(); + bool pre_process(PreProcessContext& ctx); void execute(RunContext& context); void post_process(); diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp index 0c32731c..cc01f68a 100644 --- a/src/server/ingen_lv2.cpp +++ b/src/server/ingen_lv2.cpp @@ -1,6 +1,6 @@ /* This file is part of Ingen. - Copyright 2007-2015 David Robillard <http://drobilla.net/> + Copyright 2007-2016 David Robillard <http://drobilla.net/> Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free @@ -161,7 +161,7 @@ public: } void run(uint32_t nframes) { - _engine.run_context().locate(_frame_time, nframes); + _engine.locate(_frame_time, nframes); // Notify buffer is a Chunk with size set to the available space _notify_capacity = ((LV2_Atom_Sequence*)_ports[1]->buffer())->atom.size; @@ -256,6 +256,8 @@ public: } } + virtual int real_time_priority() { return 60; } + /** Called in run thread for events received at control input port. */ bool enqueue_message(const LV2_Atom* atom) { if (_from_ui.write(lv2_atom_total_size(atom), atom) == 0) { @@ -272,7 +274,7 @@ public: /** AtomSink::write implementation called by the PostProcessor in the main * thread to write responses to the UI. */ - bool write(const LV2_Atom* atom) { + bool write(const LV2_Atom* atom, int32_t default_id) { // Called from post-processor in main thread while (_to_ui.write(lv2_atom_total_size(atom), atom) == 0) { // Overflow, wait until ring is drained next cycle @@ -558,7 +560,7 @@ ingen_instantiate(const LV2_Descriptor* descriptor, engine->activate(); Server::ThreadManager::single_threaded = true; - engine->run_context().locate(0, block_length); + engine->locate(0, block_length); engine->post_processor()->set_end_time(block_length); engine->process_events(); diff --git a/src/server/internals/BlockDelay.cpp b/src/server/internals/BlockDelay.cpp new file mode 100644 index 00000000..3dee2feb --- /dev/null +++ b/src/server/internals/BlockDelay.cpp @@ -0,0 +1,89 @@ +/* + This file is part of Ingen. + Copyright 2007-2015 David Robillard <http://drobilla.net/> + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <limits.h> + +#include <cmath> + +#include "ingen/URIs.hpp" +#include "raul/Array.hpp" +#include "raul/Maid.hpp" + +#include "Buffer.hpp" +#include "InputPort.hpp" +#include "InternalPlugin.hpp" +#include "OutputPort.hpp" +#include "RunContext.hpp" +#include "internals/BlockDelay.hpp" + +namespace Ingen { +namespace Server { +namespace Internals { + +InternalPlugin* BlockDelayNode::internal_plugin(URIs& uris) { + return new InternalPlugin( + uris, Raul::URI(NS_INTERNALS "BlockDelay"), Raul::Symbol("blockDelay")); +} + +BlockDelayNode::BlockDelayNode(InternalPlugin* plugin, + BufferFactory& bufs, + const Raul::Symbol& symbol, + bool polyphonic, + GraphImpl* parent, + SampleRate srate) + : InternalBlock(plugin, symbol, polyphonic, parent, srate) +{ + const Ingen::URIs& uris = bufs.uris(); + _ports = new Raul::Array<PortImpl*>(2); + + _in_port = new InputPort(bufs, this, Raul::Symbol("in"), 0, 1, + PortType::AUDIO, 0, bufs.forge().make(0.0f)); + _in_port->set_property(uris.lv2_name, bufs.forge().alloc("In")); + _ports->at(0) = _in_port; + + _out_port = new OutputPort(bufs, this, Raul::Symbol("out"), 0, 1, + PortType::AUDIO, 0, bufs.forge().make(0.0f)); + _out_port->set_property(uris.lv2_name, bufs.forge().alloc("Out")); + _ports->at(1) = _out_port; +} + +BlockDelayNode::~BlockDelayNode() +{ + _buffer.reset(); +} + +void +BlockDelayNode::activate(BufferFactory& bufs) +{ + _buffer = bufs.get_buffer( + bufs.uris().atom_Sound, 0, bufs.audio_buffer_size(), false, true); + + BlockImpl::activate(bufs); +} + +void +BlockDelayNode::run(RunContext& context) +{ + // Copy buffer from last cycle to output + _out_port->buffer(0)->copy(context, _buffer.get()); + + // Copy input from this cycle to buffer + _buffer->copy(context, _in_port->buffer(0).get()); +} + +} // namespace Internals +} // namespace Server +} // namespace Ingen diff --git a/src/server/internals/Delay.hpp b/src/server/internals/BlockDelay.hpp index 0dc5da30..e1ef5311 100644 --- a/src/server/internals/Delay.hpp +++ b/src/server/internals/BlockDelay.hpp @@ -14,11 +14,10 @@ along with Ingen. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef INGEN_INTERNALS_DELAY_HPP -#define INGEN_INTERNALS_DELAY_HPP - -#include <math.h> +#ifndef INGEN_INTERNALS_BLOCKDELAY_HPP +#define INGEN_INTERNALS_BLOCKDELAY_HPP +#include "BufferRef.hpp" #include "InternalBlock.hpp" #include "types.hpp" @@ -32,17 +31,17 @@ class BufferFactory; namespace Internals { -class DelayNode : public InternalBlock +class BlockDelayNode : public InternalBlock { public: - DelayNode(InternalPlugin* plugin, - BufferFactory& bufs, - const Raul::Symbol& symbol, - bool polyphonic, - GraphImpl* parent, - SampleRate srate); + BlockDelayNode(InternalPlugin* plugin, + BufferFactory& bufs, + const Raul::Symbol& symbol, + bool polyphonic, + GraphImpl* parent, + SampleRate srate); - ~DelayNode(); + ~BlockDelayNode(); void activate(BufferFactory& bufs); @@ -50,24 +49,14 @@ public: static InternalPlugin* internal_plugin(URIs& uris); - float delay_samples() const { return _delay_samples; } - private: - inline float& buffer_at(int64_t phase) const { return _buffer[phase & _buffer_mask]; } - - InputPort* _delay_port; InputPort* _in_port; OutputPort* _out_port; - float* _buffer; - uint32_t _buffer_length; - uint32_t _buffer_mask; - uint64_t _write_phase; - float _last_delay_time; - float _delay_samples; + BufferRef _buffer; }; } // namespace Server } // namespace Ingen } // namespace Internals -#endif // INGEN_INTERNALS_DELAY_HPP +#endif // INGEN_INTERNALS_BLOCKDELAY_HPP diff --git a/src/server/internals/Delay.cpp b/src/server/internals/Delay.cpp deleted file mode 100644 index 6ac97008..00000000 --- a/src/server/internals/Delay.cpp +++ /dev/null @@ -1,205 +0,0 @@ -/* - This file is part of Ingen. - Copyright 2007-2015 David Robillard <http://drobilla.net/> - - Ingen is free software: you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free - Software Foundation, either version 3 of the License, or any later version. - - Ingen is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. - - You should have received a copy of the GNU Affero General Public License - along with Ingen. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <limits.h> - -#include <cmath> - -#include "ingen/URIs.hpp" -#include "raul/Array.hpp" -#include "raul/Maid.hpp" - -#include "Buffer.hpp" -#include "Driver.hpp" -#include "Engine.hpp" -#include "GraphImpl.hpp" -#include "InputPort.hpp" -#include "InternalPlugin.hpp" -#include "OutputPort.hpp" -#include "RunContext.hpp" -#include "internals/Delay.hpp" -#include "util.hpp" - -#define CALC_DELAY(delaytime) \ - (f_clamp (delaytime * (float)sample_rate, 1.0f, (float)(buffer_mask + 1))) - -using namespace std; - -namespace Ingen { -namespace Server { -namespace Internals { - -static const float MAX_DELAY_SECONDS = 8.0f; - -InternalPlugin* DelayNode::internal_plugin(URIs& uris) { - return new InternalPlugin( - uris, Raul::URI(NS_INTERNALS "Delay"), Raul::Symbol("delay")); -} - -DelayNode::DelayNode(InternalPlugin* plugin, - BufferFactory& bufs, - const Raul::Symbol& symbol, - bool polyphonic, - GraphImpl* parent, - SampleRate srate) - : InternalBlock(plugin, symbol, polyphonic, parent, srate) - , _buffer(0) - , _buffer_length(0) - , _buffer_mask(0) - , _write_phase(0) -{ - const Ingen::URIs& uris = bufs.uris(); - _ports = new Raul::Array<PortImpl*>(3); - - const float default_delay = 1.0f; - _last_delay_time = default_delay; - _delay_samples = default_delay; - - _delay_port = new InputPort(bufs, this, Raul::Symbol("delay"), 1, _polyphony, - PortType::CONTROL, 0, bufs.forge().make(default_delay)); - _delay_port->set_property(uris.lv2_name, bufs.forge().alloc("Delay")); - _delay_port->set_property(uris.lv2_default, bufs.forge().make(default_delay)); - _delay_port->set_property(uris.lv2_minimum, bufs.forge().make((float)(1.0/(double)srate))); - _delay_port->set_property(uris.lv2_maximum, bufs.forge().make(MAX_DELAY_SECONDS)); - _ports->at(0) = _delay_port; - - _in_port = new InputPort(bufs, this, Raul::Symbol("in"), 0, 1, - PortType::AUDIO, 0, bufs.forge().make(0.0f)); - _in_port->set_property(uris.lv2_name, bufs.forge().alloc("Input")); - _ports->at(1) = _in_port; - - _out_port = new OutputPort(bufs, this, Raul::Symbol("out"), 0, 1, - PortType::AUDIO, 0, bufs.forge().make(0.0f)); - _out_port->set_property(uris.lv2_name, - bufs.forge().alloc("Output")); - _ports->at(2) = _out_port; - - //_buffer = bufs.get(PortType::AUDIO, bufs.audio_buffer_size(buffer_length_frames), true); - -} - -DelayNode::~DelayNode() -{ - //_buffer.reset(); - free(_buffer); -} - -void -DelayNode::activate(BufferFactory& bufs) -{ - BlockImpl::activate(bufs); - const SampleRate rate = bufs.engine().driver()->sample_rate(); - const SampleCount min_size = MAX_DELAY_SECONDS * rate; - - // Smallest power of two larger than min_size - SampleCount size = 1; - while (size < min_size) - size <<= 1; - - _buffer = (float*)calloc(size, sizeof(float)); - _buffer_mask = size - 1; - _buffer_length = size; - //_buffer->clear(); - _write_phase = 0; -} - -static inline float f_clamp(float x, float a, float b) -{ - const float x1 = fabs(x - a); - const float x2 = fabs(x - b); - - x = x1 + a + b; - x -= x2; - x *= 0.5; - - return x; -} - -static inline float cube_interp(const float fr, const float inm1, const float - in, const float inp1, const float inp2) -{ - return in + 0.5f * fr * ( - inp1 - inm1 + fr * ( - 4.0f * inp1 + 2.0f * inm1 - 5.0f * in - inp2 + fr * ( - 3.0f * (in - inp1) - inm1 + inp2))); -} - -void -DelayNode::run(RunContext& context) -{ - Buffer* const delay_buf = _delay_port->buffer(0).get(); - Buffer* const in_buf = _in_port->buffer(0).get(); - Buffer* const out_buf = _out_port->buffer(0).get(); - - DelayNode* plugin_data = this; - - const float* const in = in_buf->samples(); - float* const out = out_buf->samples(); - const float delay_time = delay_buf->samples()[0]; - const uint32_t buffer_mask = plugin_data->_buffer_mask; - const SampleRate sample_rate = context.engine().driver()->sample_rate(); - float delay_samples = plugin_data->_delay_samples; - int64_t write_phase = plugin_data->_write_phase; - const uint32_t sample_count = context.nframes(); - - if (write_phase == 0) { - _last_delay_time = delay_time; - _delay_samples = delay_samples = CALC_DELAY(delay_time); - } - - if (delay_time == _last_delay_time) { - const int64_t idelay_samples = (int64_t)delay_samples; - const float frac = delay_samples - idelay_samples; - - for (uint32_t i = 0; i < sample_count; i++) { - int64_t read_phase = write_phase - (int64_t)delay_samples; - const float read = cube_interp(frac, - buffer_at(read_phase - 1), - buffer_at(read_phase), - buffer_at(read_phase + 1), - buffer_at(read_phase + 2)); - buffer_at(write_phase++) = in[i]; - out[i] = read; - } - } else { - const float next_delay_samples = CALC_DELAY(delay_time); - const float delay_samples_slope = (next_delay_samples - delay_samples) / sample_count; - - for (uint32_t i = 0; i < sample_count; i++) { - delay_samples += delay_samples_slope; - write_phase++; - const int64_t read_phase = write_phase - (int64_t)delay_samples; - const int64_t idelay_samples = (int64_t)delay_samples; - const float frac = delay_samples - idelay_samples; - const float read = cube_interp(frac, - buffer_at(read_phase - 1), - buffer_at(read_phase), - buffer_at(read_phase + 1), - buffer_at(read_phase + 2)); - buffer_at(write_phase) = in[i]; - out[i] = read; - } - - _last_delay_time = delay_time; - _delay_samples = delay_samples; - } - - _write_phase = write_phase; -} - -} // namespace Internals -} // namespace Server -} // namespace Ingen diff --git a/src/server/wscript b/src/server/wscript index 6f5ffb86..68c1c5f1 100644 --- a/src/server/wscript +++ b/src/server/wscript @@ -9,6 +9,7 @@ def build(bld): Broadcaster.cpp Buffer.cpp BufferFactory.cpp + CompiledGraph.cpp ClientUpdate.cpp ControlBindings.cpp DuplexPort.cpp @@ -27,6 +28,7 @@ def build(bld): PreProcessor.cpp RunContext.cpp SocketListener.cpp + Task.cpp UndoStack.cpp Worker.cpp events/Connect.cpp @@ -44,8 +46,8 @@ def build(bld): events/SetPortValue.cpp events/Undo.cpp ingen_engine.cpp + internals/BlockDelay.cpp internals/Controller.cpp - internals/Delay.cpp internals/Note.cpp internals/Time.cpp internals/Trigger.cpp |