diff options
Diffstat (limited to 'src/server/PatchImpl.cpp')
-rw-r--r-- | src/server/PatchImpl.cpp | 470 |
1 files changed, 470 insertions, 0 deletions
diff --git a/src/server/PatchImpl.cpp b/src/server/PatchImpl.cpp new file mode 100644 index 00000000..72545b1b --- /dev/null +++ b/src/server/PatchImpl.cpp @@ -0,0 +1,470 @@ +/* This file is part of Ingen. + * Copyright 2007-2011 David Robillard <http://drobilla.net> + * + * Ingen is free software; you can redistribute it and/or modify it under the + * terms of the GNU General Public License as published by the Free Software + * Foundation; either version 2 of the License, or (at your option) any later + * version. + * + * Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include <cassert> +#include <cmath> +#include <string> +#include "raul/log.hpp" +#include "shared/World.hpp" +#include "shared/LV2URIMap.hpp" +#include "ThreadManager.hpp" +#include "NodeImpl.hpp" +#include "PatchImpl.hpp" +#include "PatchPlugin.hpp" +#include "PortImpl.hpp" +#include "ConnectionImpl.hpp" +#include "DuplexPort.hpp" +#include "Engine.hpp" +#include "ProcessSlave.hpp" +#include "Driver.hpp" +#include "ingen-config.h" + +using namespace std; +using namespace Raul; + +namespace Ingen { +namespace Server { + +PatchImpl::PatchImpl(Engine& engine, + const Raul::Symbol& symbol, + uint32_t poly, + PatchImpl* parent, + SampleRate srate, + uint32_t internal_poly) + : NodeImpl(new PatchPlugin(*engine.world()->uris().get(), + engine.world()->uris()->ingen_Patch.c_str(), "patch", "Ingen Patch"), + symbol, poly, parent, srate) + , _engine(engine) + , _internal_poly(internal_poly) + , _compiled_patch(NULL) + , _process(false) +{ + assert(internal_poly >= 1); +} + +PatchImpl::~PatchImpl() +{ + assert(!_activated); + + delete _compiled_patch; + delete _plugin; +} + +void +PatchImpl::activate(BufferFactory& bufs) +{ + NodeImpl::activate(bufs); + + for (List<NodeImpl*>::iterator i = _nodes.begin(); i != _nodes.end(); ++i) + (*i)->activate(bufs); + + assert(_activated); +} + +void +PatchImpl::deactivate() +{ + if (_activated) { + NodeImpl::deactivate(); + + for (List<NodeImpl*>::iterator i = _nodes.begin(); i != _nodes.end(); ++i) { + if ((*i)->activated()) + (*i)->deactivate(); + assert(!(*i)->activated()); + } + } + assert(!_activated); +} + +void +PatchImpl::disable() +{ + ThreadManager::assert_thread(THREAD_PROCESS); + + _process = false; + + for (List<PortImpl*>::iterator i = _output_ports.begin(); i != _output_ports.end(); ++i) + if ((*i)->context() == Context::AUDIO) + (*i)->clear_buffers(); +} + +bool +PatchImpl::prepare_internal_poly(BufferFactory& bufs, uint32_t poly) +{ + ThreadManager::assert_thread(THREAD_PRE_PROCESS); + + // TODO: Subpatch dynamic polyphony (i.e. changing port polyphony) + + for (List<NodeImpl*>::iterator i = _nodes.begin(); i != _nodes.end(); ++i) + (*i)->prepare_poly(bufs, poly); + + for (List<NodeImpl*>::iterator i = _nodes.begin(); i != _nodes.end(); ++i) + for (uint32_t j = 0; j < (*i)->num_ports(); ++j) + (*i)->port_impl(j)->prepare_poly_buffers(bufs); + + return true; +} + +bool +PatchImpl::apply_internal_poly(ProcessContext& context, BufferFactory& bufs, Raul::Maid& maid, uint32_t poly) +{ + ThreadManager::assert_thread(THREAD_PROCESS); + + // TODO: Subpatch dynamic polyphony (i.e. changing port polyphony) + + for (List<NodeImpl*>::iterator i = _nodes.begin(); i != _nodes.end(); ++i) + (*i)->apply_poly(maid, poly); + + for (List<NodeImpl*>::iterator i = _nodes.begin(); i != _nodes.end(); ++i) { + for (uint32_t j = 0; j < (*i)->num_ports(); ++j) { + PortImpl* const port = (*i)->port_impl(j); + if (port->is_input() && dynamic_cast<InputPort*>(port)->direct_connect()) + port->setup_buffers(bufs, port->poly()); + port->connect_buffers(context.offset()); + } + } + + const bool polyphonic = parent_patch() && (poly == parent_patch()->internal_poly()); + for (List<PortImpl*>::iterator i = _output_ports.begin(); i != _output_ports.end(); ++i) + (*i)->setup_buffers(bufs, polyphonic ? poly : 1); + + _internal_poly = poly; + + return true; +} + +/** Run the patch for the specified number of frames. + * + * Calls all Nodes in (roughly, if parallel) the order _compiled_patch specifies. + */ +void +PatchImpl::process(ProcessContext& context) +{ + if (!_process) + return; + + NodeImpl::pre_process(context); + + // Run all nodes + if (_compiled_patch && _compiled_patch->size() > 0) { + if (context.slaves().size() > 0) { + process_parallel(context); + } else { + process_single(context); + } + } + + // Queue any cross-context connections + for (CompiledPatch::QueuedConnections::iterator i = _compiled_patch->queued_connections.begin(); + i != _compiled_patch->queued_connections.end(); ++i) { + (*i)->queue(context); + } + + NodeImpl::post_process(context); +} + +void +PatchImpl::process_parallel(ProcessContext& context) +{ + size_t n_slaves = context.slaves().size(); + + CompiledPatch* const cp = _compiled_patch; + + /* Start p-1 slaves */ + + if (n_slaves >= cp->size()) + n_slaves = cp->size()-1; + + if (n_slaves > 0) { + for (size_t i = 0; i < cp->size(); ++i) + (*cp)[i].node()->reset_input_ready(); + + for (size_t i = 0; i < n_slaves; ++i) + context.slaves()[i]->whip(cp, i+1, context); + } + + /* Process ourself until everything is done + * This is analogous to ProcessSlave::_whipped(), but this is the master + * (i.e. what the main Jack process thread calls). Where ProcessSlave + * waits on input, this just skips the node and tries the next, to avoid + * waiting in the Jack thread which pisses Jack off. + */ + + size_t index = 0; + size_t num_finished = 0; // Number of consecutive finished nodes hit + + while (num_finished < cp->size()) { + CompiledNode& n = (*cp)[index]; + + if (n.node()->process_lock()) { + if (n.node()->n_inputs_ready() == n.n_providers()) { + n.node()->process(context); + + /* Signal dependants their input is ready */ + for (uint32_t i = 0; i < n.dependants().size(); ++i) + n.dependants()[i]->signal_input_ready(); + + ++num_finished; + } else { + n.node()->process_unlock(); + num_finished = 0; + } + } else { + if (n.node()->n_inputs_ready() == n.n_providers()) + ++num_finished; + else + num_finished = 0; + } + + index = (index + 1) % cp->size(); + } + + /* Tell slaves we're done in case we beat them, and pray they're + * really done by the start of next cycle. + * FIXME: This probably breaks (race) at extremely small nframes where + * ingen is the majority of the DSP load. + */ + for (uint32_t i = 0; i < n_slaves; ++i) + context.slaves()[i]->finish(); +} + +void +PatchImpl::process_single(ProcessContext& context) +{ + for (size_t i = 0; i < _compiled_patch->size(); ++i) + (*_compiled_patch)[i].node()->process(context); +} + +void +PatchImpl::set_buffer_size(Context& context, BufferFactory& bufs, PortType type, size_t size) +{ + NodeImpl::set_buffer_size(context, bufs, type, size); + + for (size_t i = 0; i < _compiled_patch->size(); ++i) + (*_compiled_patch)[i].node()->set_buffer_size(context, bufs, type, size); +} + +// Patch specific stuff + +/** Add a node. + * Preprocessing thread only. + */ +void +PatchImpl::add_node(List<NodeImpl*>::Node* ln) +{ + ThreadManager::assert_thread(THREAD_PRE_PROCESS); + assert(ln != NULL); + assert(ln->elem() != NULL); + assert(ln->elem()->parent_patch() == this); + //assert(ln->elem()->polyphony() == _internal_poly); + + _nodes.push_back(ln); +} + +/** Remove a node. + * Preprocessing thread only. + */ +PatchImpl::Nodes::Node* +PatchImpl::remove_node(const Raul::Symbol& symbol) +{ + ThreadManager::assert_thread(THREAD_PRE_PROCESS); + for (List<NodeImpl*>::iterator i = _nodes.begin(); i != _nodes.end(); ++i) + if ((*i)->symbol() == symbol) + return _nodes.erase(i); + + return NULL; +} + +void +PatchImpl::add_connection(SharedPtr<ConnectionImpl> c) +{ + ThreadManager::assert_thread(THREAD_PRE_PROCESS); + _connections.insert(make_pair(make_pair(c->src_port(), c->dst_port()), c)); +} + +/** Remove a connection. + * Preprocessing thread only. + */ +SharedPtr<ConnectionImpl> +PatchImpl::remove_connection(const PortImpl* src_port, const PortImpl* dst_port) +{ + ThreadManager::assert_thread(THREAD_PRE_PROCESS); + Connections::iterator i = _connections.find(make_pair(src_port, dst_port)); + if (i != _connections.end()) { + SharedPtr<ConnectionImpl> c = PtrCast<ConnectionImpl>(i->second); + _connections.erase(i); + return c; + } else { + error << "[PatchImpl::remove_connection] Connection not found" << endl; + return SharedPtr<ConnectionImpl>(); + } +} + +bool +PatchImpl::has_connection(const PortImpl* src_port, const PortImpl* dst_port) const +{ + ThreadManager::assert_thread(THREAD_PRE_PROCESS); + Connections::const_iterator i = _connections.find(make_pair(src_port, dst_port)); + return (i != _connections.end()); +} + +uint32_t +PatchImpl::num_ports() const +{ + if (ThreadManager::thread_is(THREAD_PROCESS)) + return NodeImpl::num_ports(); + else + return _input_ports.size() + _output_ports.size(); +} + +/** Create a port. Not realtime safe. + */ +PortImpl* +PatchImpl::create_port(BufferFactory& bufs, const string& name, PortType type, size_t buffer_size, bool is_output, bool polyphonic) +{ + if (type == PortType::UNKNOWN) { + error << "[PatchImpl::create_port] Unknown port type " << type.uri() << endl; + return NULL; + } + + assert( !(type == PortType::UNKNOWN) ); + + return new DuplexPort(bufs, this, name, num_ports(), polyphonic, _polyphony, + type, Raul::Atom(), buffer_size, is_output); +} + +/** Remove port from ports list used in pre-processing thread. + * + * Port is not removed from ports array for process thread (which could be + * simultaneously running). + * + * Realtime safe. Preprocessing thread only. + */ +List<PortImpl*>::Node* +PatchImpl::remove_port(const string& symbol) +{ + ThreadManager::assert_thread(THREAD_PRE_PROCESS); + + bool found = false; + List<PortImpl*>::Node* ret = NULL; + for (List<PortImpl*>::iterator i = _input_ports.begin(); i != _input_ports.end(); ++i) { + if ((*i)->symbol() == symbol) { + ret = _input_ports.erase(i); + found = true; + break; + } + } + + if (!found) + for (List<PortImpl*>::iterator i = _output_ports.begin(); i != _output_ports.end(); ++i) { + if ((*i)->symbol() == symbol) { + ret = _output_ports.erase(i); + found = true; + break; + } + } + + if ( ! found) + error << "[PatchImpl::remove_port] Port not found!" << endl; + + return ret; +} + +/** Remove all ports from ports list used in pre-processing thread. + * + * Ports are not removed from ports array for process thread (which could be + * simultaneously running). Returned is a (inputs, outputs) pair. + * + * Realtime safe. Preprocessing thread only. + */ +void +PatchImpl::clear_ports() +{ + ThreadManager::assert_thread(THREAD_PRE_PROCESS); + + _input_ports.clear(); + _output_ports.clear(); +} + +Raul::Array<PortImpl*>* +PatchImpl::build_ports_array() const +{ + ThreadManager::assert_thread(THREAD_PRE_PROCESS); + + Raul::Array<PortImpl*>* const result = new Raul::Array<PortImpl*>(_input_ports.size() + _output_ports.size()); + + size_t i = 0; + + for (List<PortImpl*>::const_iterator p = _input_ports.begin(); p != _input_ports.end(); ++p,++i) + result->at(i) = *p; + + for (List<PortImpl*>::const_iterator p = _output_ports.begin(); p != _output_ports.end(); ++p,++i) + result->at(i) = *p; + + return result; +} + +/** Find the process order for this Patch. + * + * The process order is a flat list that the patch will execute in order + * when it's run() method is called. Return value is a newly allocated list + * which the caller is reponsible to delete. Note that this function does + * NOT actually set the process order, it is returned so it can be inserted + * at the beginning of an audio cycle (by various Events). + * + * Not realtime safe. + */ +CompiledPatch* +PatchImpl::compile() const +{ + ThreadManager::assert_thread(THREAD_PRE_PROCESS); + + CompiledPatch* const compiled_patch = new CompiledPatch();//_nodes.size()); + + for (Nodes::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i) + (*i)->traversed(false); + + for (Nodes::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i) { + NodeImpl* const node = (*i); + // Either a sink or connected to our output ports: + if ( ( ! node->traversed()) && node->dependants()->size() == 0) + compile_recursive(node, compiled_patch); + } + + // Traverse any nodes we didn't hit yet + for (Nodes::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i) { + NodeImpl* const node = (*i); + if ( ! node->traversed()) + compile_recursive(node, compiled_patch); + } + + // Add any queued connections that must be run after a cycle + for (Connections::const_iterator i = _connections.begin(); i != _connections.end(); ++i) { + SharedPtr<ConnectionImpl> c = PtrCast<ConnectionImpl>(i->second); + if (c->src_port()->context() == Context::AUDIO && + c->dst_port()->context() == Context::MESSAGE) { + compiled_patch->queued_connections.push_back(c.get()); + } + } + + assert(compiled_patch->size() == _nodes.size()); + + return compiled_patch; +} + +} // namespace Server +} // namespace Ingen |