From eb10d32d8b59f2158ba64ba55e310ba0f5f24170 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Wed, 14 May 2008 23:31:15 +0000 Subject: Fix clear patch command (ticket #18). Potential destruction race/leak fixes. More thorough thread assertions on graph object methods. git-svn-id: http://svn.drobilla.net/lad/ingen@1207 a436a847-0d15-0410-975c-d299462d15a1 --- src/libs/client/Store.cpp | 33 +++++++++---- src/libs/engine/Engine.cpp | 3 ++ src/libs/engine/NodeBase.cpp | 44 ++++++++++++------ src/libs/engine/ObjectStore.cpp | 38 +++++++++++++++ src/libs/engine/ObjectStore.hpp | 2 + src/libs/engine/PatchImpl.cpp | 40 ++++++++++++++-- src/libs/engine/PatchImpl.hpp | 3 +- src/libs/engine/events/ClearPatchEvent.cpp | 75 +++++++++++++++++------------- src/libs/engine/events/ClearPatchEvent.hpp | 16 +++++-- src/libs/engine/events/DestroyEvent.cpp | 3 +- src/libs/engine/events/DestroyEvent.hpp | 3 +- 11 files changed, 194 insertions(+), 66 deletions(-) (limited to 'src/libs') diff --git a/src/libs/client/Store.cpp b/src/libs/client/Store.cpp index 24a30064..60c03049 100644 --- a/src/libs/client/Store.cpp +++ b/src/libs/client/Store.cpp @@ -60,7 +60,6 @@ Store::clear() { _objects.clear(); _plugins.clear(); - } @@ -352,8 +351,8 @@ Store::destruction_event(const Path& path) removed.reset(); - //cerr << "Store removed object " << path - // << ", count: " << removed.use_count(); + /*cerr << "Store removed object " << path + << ", count: " << removed.use_count();*/ } void @@ -482,11 +481,28 @@ Store::patch_polyphony_event(const Path& path, uint32_t poly) void Store::patch_cleared_event(const Path& path) { - SharedPtr patch = PtrCast(object(path)); - if (patch) - for (ObjectModel::const_iterator i = patch->children_begin(); i != patch->children_end(); ++i) - if (i->second->graph_parent() == patch.get()) - destruction_event(i->second->path()); + Objects::iterator i = _objects.find(path); + if (i != _objects.end()) { + assert((*i).second->path() == path); + SharedPtr patch = PtrCast(i->second); + + Objects::iterator first_descendant = i; + ++first_descendant; + Objects::iterator descendants_end = _objects.find_descendants_end(i); + SharedPtr< Table > > removed + = _objects.yank(first_descendant, descendants_end); + + for (Objects::iterator i = removed->begin(); i != removed->end(); ++i) { + SharedPtr model = PtrCast(i->second); + assert(model); + model->signal_destroyed.emit(); + if (model->parent() == patch) + patch->remove_child(model); + } + + } else { + cerr << "[Store] Unable to find patch " << path << " to clear." << endl; + } } @@ -498,7 +514,6 @@ Store::variable_change_event(const Path& subject_path, const string& predicate, if (!value) { cerr << "ERROR: variable '" << predicate << "' has no type" << endl; } else if (subject) { - cerr << "Set variable '" << predicate << "' with type " << (int)value.type() << endl; subject->set_variable(predicate, value); } else { add_variable_orphan(subject_path, predicate, value); diff --git a/src/libs/engine/Engine.cpp b/src/libs/engine/Engine.cpp index 0bfcbde7..41ae9a43 100644 --- a/src/libs/engine/Engine.cpp +++ b/src/libs/engine/Engine.cpp @@ -41,6 +41,7 @@ #include "OSCEngineReceiver.hpp" #include "PostProcessor.hpp" #include "ProcessSlave.hpp" +#include "ThreadManager.hpp" #ifdef HAVE_JACK_MIDI #include "JackMidiDriver.hpp" #endif @@ -117,6 +118,8 @@ Engine::driver(DataType type, EventType event_type) int Engine::main() { + Thread::get().set_context(THREAD_POST_PROCESS); + // Loop until quit flag is set (by OSCReceiver) while ( ! _quit_flag) { nanosleep(&main_rate, NULL); diff --git a/src/libs/engine/NodeBase.cpp b/src/libs/engine/NodeBase.cpp index 6fe320e8..54217aae 100644 --- a/src/libs/engine/NodeBase.cpp +++ b/src/libs/engine/NodeBase.cpp @@ -27,6 +27,7 @@ #include "PortImpl.hpp" #include "PatchImpl.hpp" #include "ObjectStore.hpp" +#include "ThreadManager.hpp" using namespace std; @@ -34,19 +35,19 @@ namespace Ingen { NodeBase::NodeBase(PluginImpl* plugin, const string& name, bool polyphonic, PatchImpl* parent, SampleRate srate, size_t buffer_size) -: NodeImpl(parent, name, polyphonic), - _plugin(plugin), - _polyphony((polyphonic && parent) ? parent->internal_polyphony() : 1), - _srate(srate), - _buffer_size(buffer_size), - _activated(false), - _traversed(false), - _input_ready(1), - _process_lock(0), - _n_inputs_ready(0), - _ports(NULL), - _providers(new Raul::List()), - _dependants(new Raul::List()) + : NodeImpl(parent, name, polyphonic) + , _plugin(plugin) + , _polyphony((polyphonic && parent) ? parent->internal_polyphony() : 1) + , _srate(srate) + , _buffer_size(buffer_size) + , _activated(false) + , _traversed(false) + , _input_ready(1) + , _process_lock(0) + , _n_inputs_ready(0) + , _ports(NULL) + , _providers(new Raul::List()) + , _dependants(new Raul::List()) { assert(_plugin); assert(_polyphony > 0); @@ -56,7 +57,8 @@ NodeBase::NodeBase(PluginImpl* plugin, const string& name, bool polyphonic, Patc NodeBase::~NodeBase() { - assert(!_activated); + if (_activated) + deactivate(); delete _providers; delete _dependants; @@ -80,6 +82,7 @@ NodeBase::plugin() const void NodeBase::activate() { + assert(ThreadManager::current_thread_id() == THREAD_PRE_PROCESS); assert(!_activated); _activated = true; } @@ -88,6 +91,7 @@ NodeBase::activate() void NodeBase::deactivate() { + assert(ThreadManager::current_thread_id() == THREAD_POST_PROCESS); assert(_activated); _activated = false; } @@ -96,6 +100,8 @@ NodeBase::deactivate() bool NodeBase::prepare_poly(uint32_t poly) { + assert(ThreadManager::current_thread_id() == THREAD_PRE_PROCESS); + if (!_polyphonic) return true; @@ -110,6 +116,8 @@ NodeBase::prepare_poly(uint32_t poly) bool NodeBase::apply_poly(Raul::Maid& maid, uint32_t poly) { + assert(ThreadManager::current_thread_id() == THREAD_PROCESS); + if (!_polyphonic) return true; @@ -124,6 +132,8 @@ NodeBase::apply_poly(Raul::Maid& maid, uint32_t poly) void NodeBase::set_buffer_size(size_t size) { + assert(ThreadManager::current_thread_id() == THREAD_PROCESS); + _buffer_size = size; if (_ports) @@ -159,6 +169,7 @@ NodeBase::process_unlock() void NodeBase::wait_for_input(size_t num_providers) { + assert(ThreadManager::current_thread_id() == THREAD_PROCESS); assert(_process_lock.get() == 1); while ((unsigned)_n_inputs_ready.get() < num_providers) { @@ -175,6 +186,7 @@ NodeBase::wait_for_input(size_t num_providers) void NodeBase::signal_input_ready() { + assert(ThreadManager::current_thread_id() == THREAD_PROCESS); //cout << path() << " SIGNAL" << endl; ++_n_inputs_ready; _input_ready.post(); @@ -186,6 +198,8 @@ NodeBase::signal_input_ready() void NodeBase::pre_process(ProcessContext& context) { + assert(ThreadManager::current_thread_id() == THREAD_PROCESS); + // Mix down any ports with multiple inputs if (_ports) for (size_t i=0; i < _ports->size(); ++i) @@ -198,6 +212,8 @@ NodeBase::pre_process(ProcessContext& context) void NodeBase::post_process(ProcessContext& context) { + assert(ThreadManager::current_thread_id() == THREAD_PROCESS); + /* Write output ports */ if (_ports) for (size_t i=0; i < _ports->size(); ++i) diff --git a/src/libs/engine/ObjectStore.cpp b/src/libs/engine/ObjectStore.cpp index e8fa3f4d..4497360a 100644 --- a/src/libs/engine/ObjectStore.cpp +++ b/src/libs/engine/ObjectStore.cpp @@ -149,5 +149,43 @@ ObjectStore::remove(Objects::iterator object) } } + +/** Remove all children of an object from the store. + * + * Returned is a vector containing all descendants of the object removed + * in lexicographically sorted order by Path. + */ +SharedPtr< Table > > +ObjectStore::remove_children(const Path& path) +{ + return remove_children(_objects.find(path)); +} + + +/** Remove all children of an object from the store. + * + * Returned is a vector containing all descendants of the object removed + * in lexicographically sorted order by Path. + */ +SharedPtr< Table > > +ObjectStore::remove_children(Objects::iterator object) +{ + if (object != _objects.end()) { + Objects::iterator descendants_end = _objects.find_descendants_end(object); + + if (descendants_end != object) { + Objects::iterator first_child = object; + ++first_child; + return _objects.yank(first_child, descendants_end); + } + + } else { + cerr << "[ObjectStore] WARNING: Removing children of " << object->first << " failed." << endl; + return SharedPtr(); + } + + return SharedPtr(); +} + } // namespace Ingen diff --git a/src/libs/engine/ObjectStore.hpp b/src/libs/engine/ObjectStore.hpp index 9c316f6c..bf85d3e1 100644 --- a/src/libs/engine/ObjectStore.hpp +++ b/src/libs/engine/ObjectStore.hpp @@ -62,6 +62,8 @@ public: SharedPtr< Table > > remove(const Path& path); SharedPtr< Table > > remove(Objects::iterator i); + SharedPtr< Table > > remove_children(const Path& path); + SharedPtr< Table > > remove_children(Objects::iterator i); const Objects& objects() const { return _objects; } Objects& objects() { return _objects; } diff --git a/src/libs/engine/PatchImpl.cpp b/src/libs/engine/PatchImpl.cpp index 13576f1c..dba3d747 100644 --- a/src/libs/engine/PatchImpl.cpp +++ b/src/libs/engine/PatchImpl.cpp @@ -85,6 +85,8 @@ PatchImpl::deactivate() void PatchImpl::disable() { + assert(ThreadManager::current_thread_id() == THREAD_PROCESS); + _process = false; for (List::iterator i = _output_ports.begin(); i != _output_ports.end(); ++i) @@ -95,6 +97,8 @@ PatchImpl::disable() bool PatchImpl::prepare_internal_poly(uint32_t poly) { + assert(ThreadManager::current_thread_id() == THREAD_PRE_PROCESS); + /* TODO: ports? internal/external poly? */ for (List::iterator i = _nodes.begin(); i != _nodes.end(); ++i) @@ -112,6 +116,8 @@ PatchImpl::prepare_internal_poly(uint32_t poly) bool PatchImpl::apply_internal_poly(Raul::Maid& maid, uint32_t poly) { + assert(ThreadManager::current_thread_id() == THREAD_PROCESS); + /* TODO: ports? internal/external poly? */ for (List::iterator i = _nodes.begin(); i != _nodes.end(); ++i) @@ -239,17 +245,23 @@ PatchImpl::set_buffer_size(size_t size) NodeBase::set_buffer_size(size); assert(_buffer_size == size); - for (List::iterator j = _nodes.begin(); j != _nodes.end(); ++j) - (*j)->set_buffer_size(size); + CompiledPatch* const cp = _compiled_patch; + + for (size_t i=0; i < cp->size(); ++i) + (*cp)[i].node()->set_buffer_size(size); } // Patch specific stuff +/** Add a node. + * Preprocessing thread only. + */ void PatchImpl::add_node(List::Node* ln) { + assert(ThreadManager::current_thread_id() == THREAD_PRE_PROCESS); assert(ln != NULL); assert(ln->elem() != NULL); assert(ln->elem()->parent_patch() == this); @@ -260,11 +272,12 @@ PatchImpl::add_node(List::Node* ln) /** Remove a node. - * Realtime Safe. Preprocessing thread. + * Preprocessing thread only. */ PatchImpl::Nodes::Node* PatchImpl::remove_node(const string& name) { + assert(ThreadManager::current_thread_id() == THREAD_PRE_PROCESS); for (List::iterator i = _nodes.begin(); i != _nodes.end(); ++i) if ((*i)->name() == name) return _nodes.erase(i); @@ -273,11 +286,13 @@ PatchImpl::remove_node(const string& name) } -/** Remove a connection. Realtime safe. +/** Remove a connection. + * Process thread only. */ PatchImpl::Connections::Node* PatchImpl::remove_connection(const PortImpl* src_port, const PortImpl* dst_port) { + assert(ThreadManager::current_thread_id() == THREAD_PROCESS); bool found = false; Connections::Node* connection = NULL; for (Connections::iterator i = _connections.begin(); i != _connections.end(); ++i) { @@ -360,6 +375,23 @@ PatchImpl::remove_port(const string& name) } +/** Remove all ports from ports list used in pre-processing thread. + * + * Ports are not removed from ports array for process thread (which could be + * simultaneously running). Returned is a (inputs, outputs) pair. + * + * Realtime safe. Preprocessing thread only. + */ +void +PatchImpl::clear_ports() +{ + assert(ThreadManager::current_thread_id() == THREAD_PROCESS); + + _input_ports.clear(); + _output_ports.clear(); +} + + Raul::Array* PatchImpl::build_ports_array() const { diff --git a/src/libs/engine/PatchImpl.hpp b/src/libs/engine/PatchImpl.hpp index 94a9ebf3..209062be 100644 --- a/src/libs/engine/PatchImpl.hpp +++ b/src/libs/engine/PatchImpl.hpp @@ -88,7 +88,7 @@ public: // Patch specific stuff not inherited from Node - typedef List Nodes; + typedef List Nodes; void add_node(Nodes::Node* tn); Nodes::Node* remove_node(const string& name); @@ -105,6 +105,7 @@ public: void add_input(List::Node* port) { _input_ports.push_back(port); } ///< Preprocesser thread void add_output(List::Node* port) { _output_ports.push_back(port); } ///< Preprocessor thread List::Node* remove_port(const string& name); + void clear_ports(); void add_connection(Connections::Node* c) { _connections.push_back(c); } Connections::Node* remove_connection(const PortImpl* src_port, const PortImpl* dst_port); diff --git a/src/libs/engine/events/ClearPatchEvent.cpp b/src/libs/engine/events/ClearPatchEvent.cpp index 211ca79e..0e725e65 100644 --- a/src/libs/engine/events/ClearPatchEvent.cpp +++ b/src/libs/engine/events/ClearPatchEvent.cpp @@ -27,15 +27,19 @@ #include "NodeImpl.hpp" #include "ConnectionImpl.hpp" #include "QueuedEventSource.hpp" +#include "AudioDriver.hpp" +#include "MidiDriver.hpp" namespace Ingen { ClearPatchEvent::ClearPatchEvent(Engine& engine, SharedPtr responder, FrameTime time, QueuedEventSource* source, const string& patch_path) -: QueuedEvent(engine, responder, time, true, source), - _patch_path(patch_path), - _patch(NULL), - _process(false) + : QueuedEvent(engine, responder, time, true, source) + , _patch_path(patch_path) + , _driver_port(NULL) + , _process(false) + , _ports_array(NULL) + , _compiled_patch(NULL) { } @@ -43,18 +47,20 @@ ClearPatchEvent::ClearPatchEvent(Engine& engine, SharedPtr responder, void ClearPatchEvent::pre_process() { - cerr << "FIXME: CLEAR PATCH\n"; -#if 0 - _patch = _engine.object_store()->find_patch(_patch_path); + ObjectStore::Objects::iterator patch_iterator = _engine.object_store()->find(_patch_path); - if (_patch != NULL) { - - _process = _patch->enabled(); - - for (List::const_iterator i = _patch->nodes().begin(); i != _patch->nodes().end(); ++i) - (*i)->remove_from_store(); + if (patch_iterator != _engine.object_store()->objects().end()) { + _patch = PtrCast(patch_iterator->second); + if (_patch) { + _process = _patch->enabled(); + _removed_table = _engine.object_store()->remove_children(patch_iterator); + _patch->nodes().clear(); + _patch->connections().clear(); + _ports_array = _patch->build_ports_array(); + if (_patch->enabled()) + _compiled_patch = _patch->compile(); + } } -#endif QueuedEvent::pre_process(); } @@ -65,17 +71,31 @@ ClearPatchEvent::execute(ProcessContext& context) { QueuedEvent::execute(context); - if (_patch != NULL) { + if (_patch && _removed_table) { _patch->disable(); - cerr << "FIXME: CLEAR PATCH\n"; - //for (List::const_iterator i = _patch->nodes().begin(); i != _patch->nodes().end(); ++i) - // (*i)->remove_from_patch(); - if (_patch->compiled_patch() != NULL) { _engine.maid()->push(_patch->compiled_patch()); _patch->compiled_patch(NULL); } + + _patch->clear_ports(); + _patch->connections().clear(); + _patch->compiled_patch(_compiled_patch); + Raul::Array* old_ports = _patch->external_ports(); + _patch->external_ports(_ports_array); + _ports_array = old_ports; + + // Remove driver ports, if necessary + if (_patch->parent() == NULL) { + for (ObjectStore::Objects::iterator i = _removed_table->begin(); i != _removed_table->end(); ++i) { + SharedPtr port = PtrCast(i->second); + if (port && port->type() == DataType::AUDIO) + _driver_port = _engine.audio_driver()->remove_port(port->path()); + else if (port && port->type() == DataType::EVENT) + _driver_port = _engine.midi_driver()->remove_port(port->path()); + } + } } } @@ -84,18 +104,8 @@ void ClearPatchEvent::post_process() { if (_patch != NULL) { - // Delete all nodes - for (List::iterator i = _patch->nodes().begin(); i != _patch->nodes().end(); ++i) { - (*i)->deactivate(); - delete *i; - } - _patch->nodes().clear(); - - // Delete all connections - for (PatchImpl::Connections::iterator i = _patch->connections().begin(); i != _patch->connections().end(); ++i) - (*i).reset(); - - _patch->connections().clear(); + delete _ports_array; + delete _driver_port; // Restore patch's run state if (_process) @@ -105,8 +115,9 @@ ClearPatchEvent::post_process() // Make sure everything's sane assert(_patch->nodes().size() == 0); + assert(_patch->num_ports() == 0); assert(_patch->connections().size() == 0); - + // Reply _responder->respond_ok(); _engine.broadcaster()->send_patch_cleared(_patch_path); diff --git a/src/libs/engine/events/ClearPatchEvent.hpp b/src/libs/engine/events/ClearPatchEvent.hpp index afe66eed..0353dc01 100644 --- a/src/libs/engine/events/ClearPatchEvent.hpp +++ b/src/libs/engine/events/ClearPatchEvent.hpp @@ -20,13 +20,18 @@ #include #include +#include +#include #include "QueuedEvent.hpp" +#include "ObjectStore.hpp" +#include "PatchImpl.hpp" using std::string; namespace Ingen { class PatchImpl; +class DriverPort; /** Delete all nodes from a patch. @@ -43,9 +48,14 @@ public: void post_process(); private: - const string _patch_path; - PatchImpl* _patch; - bool _process; + const string _patch_path; + SharedPtr _patch; + DriverPort* _driver_port; + bool _process; + Raul::Array* _ports_array; ///< New (external) ports for Patch + CompiledPatch* _compiled_patch; ///< Patch's new process order + + SharedPtr< Table > > _removed_table; }; diff --git a/src/libs/engine/events/DestroyEvent.cpp b/src/libs/engine/events/DestroyEvent.cpp index d2b2d42b..4f2cba3f 100644 --- a/src/libs/engine/events/DestroyEvent.cpp +++ b/src/libs/engine/events/DestroyEvent.cpp @@ -199,8 +199,7 @@ DestroyEvent::post_process() _responder->respond_error("Unable to destroy object"); } - if (_driver_port) - delete _driver_port; + delete _driver_port; } diff --git a/src/libs/engine/events/DestroyEvent.hpp b/src/libs/engine/events/DestroyEvent.hpp index 4a82c5bb..134290b1 100644 --- a/src/libs/engine/events/DestroyEvent.hpp +++ b/src/libs/engine/events/DestroyEvent.hpp @@ -60,7 +60,6 @@ public: private: Path _path; ObjectStore::Objects::iterator _store_iterator; - SharedPtr< Table > > _removed_table; SharedPtr _node; ///< Non-NULL iff a node SharedPtr _port; ///< Non-NULL iff a port DriverPort* _driver_port; @@ -70,6 +69,8 @@ private: CompiledPatch* _compiled_patch; ///< Patch's new process order DisconnectNodeEvent* _disconnect_node_event; DisconnectPortEvent* _disconnect_port_event; + + SharedPtr< Table > > _removed_table; }; -- cgit v1.2.1