diff options
Diffstat (limited to 'src/server/Engine.cpp')
-rw-r--r-- | src/server/Engine.cpp | 526 |
1 files changed, 526 insertions, 0 deletions
diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp new file mode 100644 index 00000000..a7476845 --- /dev/null +++ b/src/server/Engine.cpp @@ -0,0 +1,526 @@ +/* + This file is part of Ingen. + Copyright 2007-2017 David Robillard <http://drobilla.net/> + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "ingen_config.h" + +#include <sys/mman.h> + +#include <limits> +#include <thread> + +#include "lv2/lv2plug.in/ns/ext/buf-size/buf-size.h" +#include "lv2/lv2plug.in/ns/ext/state/state.h" + +#include "events/CreateGraph.hpp" +#include "ingen/AtomReader.hpp" +#include "ingen/Configuration.hpp" +#include "ingen/Log.hpp" +#include "ingen/Store.hpp" +#include "ingen/StreamWriter.hpp" +#include "ingen/Tee.hpp" +#include "ingen/URIs.hpp" +#include "ingen/World.hpp" +#include "ingen/types.hpp" +#include "raul/Maid.hpp" + +#include "BlockFactory.hpp" +#include "Broadcaster.hpp" +#include "BufferFactory.hpp" +#include "ControlBindings.hpp" +#include "DirectDriver.hpp" +#include "Driver.hpp" +#include "Engine.hpp" +#include "Event.hpp" +#include "EventWriter.hpp" +#include "GraphImpl.hpp" +#include "LV2Options.hpp" +#include "PostProcessor.hpp" +#include "PreProcessContext.hpp" +#include "PreProcessor.hpp" +#include "RunContext.hpp" +#include "ThreadManager.hpp" +#include "UndoStack.hpp" +#include "Worker.hpp" +#ifdef HAVE_SOCKET +#include "SocketListener.hpp" +#endif + +namespace Ingen { +namespace Server { + +INGEN_THREAD_LOCAL unsigned ThreadManager::flags(0); +bool ThreadManager::single_threaded(true); + +Engine::Engine(Ingen::World* world) + : _world(world) + , _options(new LV2Options(world->uris())) + , _buffer_factory(new BufferFactory(*this, world->uris())) + , _maid(new Raul::Maid) + , _worker(new Worker(world->log(), event_queue_size())) + , _sync_worker(new Worker(world->log(), event_queue_size(), true)) + , _broadcaster(new Broadcaster()) + , _control_bindings(new ControlBindings(*this)) + , _block_factory(new BlockFactory(world)) + , _undo_stack(new UndoStack(_world->uris(), _world->uri_map())) + , _redo_stack(new UndoStack(_world->uris(), _world->uri_map())) + , _post_processor(new PostProcessor(*this)) + , _pre_processor(new PreProcessor(*this)) + , _event_writer(new EventWriter(*this)) + , _interface(_event_writer) + , _atom_interface( + new AtomReader(world->uri_map(), world->uris(), world->log(), *_interface)) + , _root_graph(nullptr) + , _cycle_start_time(0) + , _rand_engine(0) + , _uniform_dist(0.0f, 1.0f) + , _quit_flag(false) + , _reset_load_flag(false) + , _atomic_bundles(world->conf().option("atomic-bundles").get<int32_t>()) + , _activated(false) +{ + if (!world->store()) { + world->set_store(SPtr<Ingen::Store>(new Store())); + } + + for (int i = 0; i < world->conf().option("threads").get<int32_t>(); ++i) { + Raul::RingBuffer* ring = new Raul::RingBuffer(24 * event_queue_size()); + _notifications.push_back(ring); + _run_contexts.push_back(new RunContext(*this, ring, i, i > 0)); + } + + _world->lv2_features().add_feature(_worker->schedule_feature()); + _world->lv2_features().add_feature(_options); + _world->lv2_features().add_feature( + SPtr<LV2Features::Feature>( + new LV2Features::EmptyFeature(LV2_BUF_SIZE__powerOf2BlockLength))); + _world->lv2_features().add_feature( + SPtr<LV2Features::Feature>( + new LV2Features::EmptyFeature(LV2_BUF_SIZE__fixedBlockLength))); + _world->lv2_features().add_feature( + SPtr<LV2Features::Feature>( + new LV2Features::EmptyFeature(LV2_BUF_SIZE__boundedBlockLength))); + _world->lv2_features().add_feature( + SPtr<LV2Features::Feature>( + new LV2Features::EmptyFeature(LV2_STATE__loadDefaultState))); + + if (world->conf().option("dump").get<int32_t>()) { + _interface = std::make_shared<Tee>( + Tee::Sinks{ + _event_writer, + std::make_shared<StreamWriter>(world->uri_map(), + world->uris(), + URI("ingen:/engine"), + stderr, + ColorContext::Color::MAGENTA)}); + } +} + +Engine::~Engine() +{ + _root_graph = nullptr; + deactivate(); + + // Process all pending events + const FrameTime end = std::numeric_limits<FrameTime>::max(); + RunContext& ctx = run_context(); + locate(ctx.end(), end - ctx.end()); + _post_processor->set_end_time(end); + _post_processor->process(); + while (!_pre_processor->empty()) { + _pre_processor->process(ctx, *_post_processor, 1); + _post_processor->process(); + } + + _atom_interface.reset(); + + // Delete run contexts + _quit_flag = true; + _tasks_available.notify_all(); + for (RunContext* ctx : _run_contexts) { + ctx->join(); + delete ctx; + } + for (Raul::RingBuffer* ring : _notifications) { + delete ring; + } + + const SPtr<Store> store = this->store(); + if (store) { + for (auto& s : *store.get()) { + if (!dynamic_ptr_cast<NodeImpl>(s.second)->parent()) { + s.second.reset(); + } + } + store->clear(); + } + + _world->set_store(SPtr<Ingen::Store>()); +} + +void +Engine::listen() +{ +#ifdef HAVE_SOCKET + _listener = UPtr<SocketListener>(new SocketListener(*this)); +#endif +} + +void +Engine::advance(SampleCount nframes) +{ + for (RunContext* ctx : _run_contexts) { + ctx->locate(ctx->start() + nframes, block_length()); + } +} + +void +Engine::locate(FrameTime s, SampleCount nframes) +{ + for (RunContext* ctx : _run_contexts) { + ctx->locate(s, nframes); + } +} + +void +Engine::set_root_graph(GraphImpl* graph) +{ + _root_graph = graph; +} + +void +Engine::flush_events(const std::chrono::milliseconds& sleep_ms) +{ + bool finished = !pending_events(); + while (!finished) { + // Run one audio block to execute prepared events + run(block_length()); + advance(block_length()); + + // Run one main iteration to post-process events + main_iteration(); + + // Sleep before continuing if there are still events to process + if (!(finished = !pending_events())) { + std::this_thread::sleep_for(sleep_ms); + } + } +} + +void +Engine::emit_notifications(FrameTime end) +{ + for (RunContext* ctx : _run_contexts) { + ctx->emit_notifications(end); + } +} + +bool +Engine::pending_notifications() +{ + for (const RunContext* ctx : _run_contexts) { + if (ctx->pending_notifications()) { + return true; + } + } + return false; +} + +bool +Engine::wait_for_tasks() +{ + if (!_quit_flag) { + std::unique_lock<std::mutex> lock(_tasks_mutex); + _tasks_available.wait(lock); + } + return !_quit_flag; +} + +void +Engine::signal_tasks_available() +{ + _tasks_available.notify_all(); +} + +Task* +Engine::steal_task(unsigned start_thread) +{ + for (unsigned i = 0; i < _run_contexts.size(); ++i) { + const unsigned id = (start_thread + i) % _run_contexts.size(); + RunContext* const ctx = _run_contexts[id]; + Task* par = ctx->task(); + if (par) { + Task* t = par->steal(*ctx); + if (t) { + return t; + } + } + } + return nullptr; +} + +SPtr<Store> +Engine::store() const +{ + return _world->store(); +} + +SampleRate +Engine::sample_rate() const +{ + return _driver->sample_rate(); +} + +SampleCount +Engine::block_length() const +{ + return _driver->block_length(); +} + +size_t +Engine::sequence_size() const +{ + return _driver->seq_size(); +} + +size_t +Engine::event_queue_size() const +{ + return world()->conf().option("queue-size").get<int32_t>(); +} + +void +Engine::quit() +{ + _quit_flag = true; +} + +Properties +Engine::load_properties() const +{ + const Ingen::URIs& uris = world()->uris(); + + return { { uris.ingen_meanRunLoad, + uris.forge.make(floorf(_run_load.mean) / 100.0f) }, + { uris.ingen_minRunLoad, + uris.forge.make(_run_load.min / 100.0f) }, + { uris.ingen_maxRunLoad, + uris.forge.make(_run_load.max / 100.0f) } }; +} + +bool +Engine::main_iteration() +{ + _post_processor->process(); + _maid->cleanup(); + + if (_run_load.changed) { + _broadcaster->put(URI("ingen:/engine"), load_properties()); + _run_load.changed = false; + } + + return !_quit_flag; +} + +void +Engine::set_driver(SPtr<Driver> driver) +{ + _driver = driver; + for (RunContext* ctx : _run_contexts) { + ctx->set_priority(driver->real_time_priority()); + ctx->set_rate(driver->sample_rate()); + } + + _buffer_factory->set_block_length(driver->block_length()); + _options->set(sample_rate(), + block_length(), + buffer_factory()->default_size(_world->uris().atom_Sequence)); +} + +SampleCount +Engine::event_time() +{ + if (ThreadManager::single_threaded) { + return 0; + } + + return _driver->frame_time() + _driver->block_length(); +} + +uint64_t +Engine::current_time() const +{ + return _clock.now_microseconds(); +} + +void +Engine::reset_load() +{ + _reset_load_flag = true; +} + +void +Engine::init(double sample_rate, uint32_t block_length, size_t seq_size) +{ + set_driver(SPtr<Driver>(new DirectDriver(*this, sample_rate, block_length, seq_size))); +} + +bool +Engine::supports_dynamic_ports() const +{ + return !_driver || _driver->dynamic_ports(); +} + +bool +Engine::activate() +{ + if (!_driver) { + return false; + } + + ThreadManager::single_threaded = true; + + const Ingen::URIs& uris = world()->uris(); + + if (!_root_graph) { + // No root graph has been loaded, create an empty one + const Properties properties = { + {uris.rdf_type, uris.ingen_Graph}, + {uris.ingen_polyphony, + Property(_world->forge().make(1), + Resource::Graph::INTERNAL)}}; + + enqueue_event( + new Events::CreateGraph( + *this, SPtr<Interface>(), -1, 0, Raul::Path("/"), properties)); + + flush_events(std::chrono::milliseconds(10)); + if (!_root_graph) { + return false; + } + } + + _driver->activate(); + _root_graph->enable(); + + ThreadManager::single_threaded = false; + _activated = true; + + return true; +} + +void +Engine::deactivate() +{ + if (_driver) { + _driver->deactivate(); + } + + if (_root_graph) { + _root_graph->deactivate(); + } + + ThreadManager::single_threaded = true; + _activated = false; +} + +unsigned +Engine::run(uint32_t sample_count) +{ + RunContext& ctx = run_context(); + _cycle_start_time = current_time(); + + post_processor()->set_end_time(ctx.end()); + + // Process events that came in during the last cycle + // (Aiming for jitter-free 1 block event latency, ideally) + const unsigned n_processed_events = process_events(); + + // Reset load if graph structure has changed + if (_reset_load_flag) { + _run_load = Load(); + _reset_load_flag = false; + } + + // Run root graph + if (_root_graph) { + // Apply control bindings to input + control_bindings()->pre_process( + ctx, _root_graph->port_impl(0)->buffer(0).get()); + + // Run root graph for this cycle + _root_graph->process(ctx); + + // Emit control binding feedback + control_bindings()->post_process( + ctx, _root_graph->port_impl(1)->buffer(0).get()); + } + + // Update load for this cycle + if (ctx.duration() > 0) { + _run_load.update(current_time() - _cycle_start_time, ctx.duration()); + } + + return n_processed_events; +} + +bool +Engine::pending_events() const +{ + return !_pre_processor->empty() || _post_processor->pending(); +} + +void +Engine::enqueue_event(Event* ev, Event::Mode mode) +{ + _pre_processor->event(ev, mode); +} + +unsigned +Engine::process_events() +{ + const size_t MAX_EVENTS_PER_CYCLE = run_context().nframes() / 8; + return _pre_processor->process( + run_context(), *_post_processor, MAX_EVENTS_PER_CYCLE); +} + +unsigned +Engine::process_all_events() +{ + return _pre_processor->process(run_context(), *_post_processor, 0); +} + +Log& +Engine::log() const +{ + return _world->log(); +} + +void +Engine::register_client(SPtr<Interface> client) +{ + log().info(fmt("Registering client <%1%>\n") % client->uri().c_str()); + _broadcaster->register_client(client); +} + +bool +Engine::unregister_client(SPtr<Interface> client) +{ + log().info(fmt("Unregistering client <%1%>\n") % client->uri().c_str()); + return _broadcaster->unregister_client(client); +} + +} // namespace Server +} // namespace Ingen |