From c67392abf59b500fe405101d7ad896d9da869e47 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Tue, 5 Jan 2010 03:51:50 +0000 Subject: Realtime safe buffer reference handling. Turns out that dropping a shared_ptr is not realtime safe, even if you use a realtime safe deleter. Instead, instrusive_ptr is used for buffer references, so a buffer reference may safely be dropped in the audio thread (in which case it will be recycled by the BufferFactory). Faster, cleaner, better. git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@2341 a436a847-0d15-0410-975c-d299462d15a1 --- src/engine/AudioBuffer.cpp | 4 ++-- src/engine/AudioBuffer.hpp | 2 +- src/engine/Buffer.hpp | 35 ++++++++++++++++++++++++++++------- src/engine/BufferFactory.cpp | 20 ++++++++++---------- src/engine/BufferFactory.hpp | 9 +++++++-- src/engine/ConnectionImpl.cpp | 7 ++++--- src/engine/ConnectionImpl.hpp | 12 ++++++------ src/engine/EventBuffer.cpp | 4 ++-- src/engine/EventBuffer.hpp | 2 +- src/engine/LADSPANode.cpp | 2 +- src/engine/LADSPANode.hpp | 3 ++- src/engine/LV2Node.cpp | 2 +- src/engine/LV2Node.hpp | 3 ++- src/engine/NodeBase.hpp | 9 +++++---- src/engine/NodeImpl.hpp | 3 ++- src/engine/ObjectBuffer.cpp | 4 ++-- src/engine/ObjectBuffer.hpp | 2 +- src/engine/ObjectSender.cpp | 3 ++- src/engine/PortImpl.cpp | 4 ++-- src/engine/PortImpl.hpp | 10 +++++----- 20 files changed, 86 insertions(+), 54 deletions(-) diff --git a/src/engine/AudioBuffer.cpp b/src/engine/AudioBuffer.cpp index b564a3ba..693fa503 100644 --- a/src/engine/AudioBuffer.cpp +++ b/src/engine/AudioBuffer.cpp @@ -36,8 +36,8 @@ namespace Ingen { using namespace Shared; -AudioBuffer::AudioBuffer(Shared::PortType type, size_t size) - : ObjectBuffer(size + sizeof(LV2_Object) +AudioBuffer::AudioBuffer(BufferFactory& factory, Shared::PortType type, size_t size) + : ObjectBuffer(factory, size + sizeof(LV2_Object) + (type == PortType::AUDIO ? sizeof(LV2_Vector_Body) : 0)) , _state(OK) , _set_value(0) diff --git a/src/engine/AudioBuffer.hpp b/src/engine/AudioBuffer.hpp index 8b0e95a5..38e4d293 100644 --- a/src/engine/AudioBuffer.hpp +++ b/src/engine/AudioBuffer.hpp @@ -33,7 +33,7 @@ namespace Ingen { class AudioBuffer : public ObjectBuffer { public: - AudioBuffer(Shared::PortType type, size_t capacity); + AudioBuffer(BufferFactory& factory, Shared::PortType type, size_t capacity); void clear(); diff --git a/src/engine/Buffer.hpp b/src/engine/Buffer.hpp index 160e6144..61455603 100644 --- a/src/engine/Buffer.hpp +++ b/src/engine/Buffer.hpp @@ -15,28 +15,33 @@ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ -#ifndef BUFFER_H -#define BUFFER_H +#ifndef BUFFER_HPP +#define BUFFER_HPP #include #include #include +#include #include "raul/Deletable.hpp" #include "raul/SharedPtr.hpp" -#include "types.hpp" #include "interface/PortType.hpp" +#include "types.hpp" +#include "BufferFactory.hpp" namespace Ingen { class Context; class Engine; +class BufferFactory; class Buffer : public boost::noncopyable, public Raul::Deletable { public: - Buffer(Shared::PortType type, size_t size) - : _type(type) + Buffer(BufferFactory& factory, Shared::PortType type, size_t size) + : _factory(factory) + , _type(type) , _size(size) + , _refs(0) {} /** Clear contents and reset state */ @@ -58,7 +63,16 @@ public: Shared::PortType type() const { return _type; } size_t size() const { return _size; } + inline void ref() { ++_refs; } + + inline void deref() { + assert(_refs > 0); + if ((--_refs) == 0) + _factory.recycle(this); + } + protected: + BufferFactory& _factory; Shared::PortType _type; size_t _size; @@ -67,9 +81,16 @@ protected: private: Buffer* _next; ///< Intrusive linked list for BufferFactory + size_t _refs; ///< Intrusive reference count for intrusive_ptr }; - } // namespace Ingen -#endif // BUFFER_H + +namespace boost { + inline void intrusive_ptr_add_ref(Ingen::Buffer* b) { b->ref(); } + inline void intrusive_ptr_release(Ingen::Buffer* b) { b->deref(); } +} + + +#endif // BUFFER_HPP diff --git a/src/engine/BufferFactory.cpp b/src/engine/BufferFactory.cpp index aff44254..8a5617fa 100644 --- a/src/engine/BufferFactory.cpp +++ b/src/engine/BufferFactory.cpp @@ -45,7 +45,7 @@ struct BufferDeleter { }; -SharedPtr +BufferFactory::Ref BufferFactory::get(Shared::PortType type, size_t size, bool force_create) { Raul::AtomicPtr& head_ptr = free_list(type); @@ -66,16 +66,16 @@ BufferFactory::get(Shared::PortType type, size_t size, bool force_create) return create(type, size); } else { cerr << "ERROR: Failed to obtain buffer" << endl; - return SharedPtr(); + return Ref(); } } try_head->_next = NULL; - return SharedPtr(try_head, BufferDeleter(*this)); + return Ref(try_head); } -SharedPtr +BufferFactory::Ref BufferFactory::create(Shared::PortType type, size_t size) { assert(ThreadManager::current_thread_id() != THREAD_PROCESS); @@ -85,7 +85,7 @@ BufferFactory::create(Shared::PortType type, size_t size) if (type.is_control()) { if (size == 0) size = sizeof(LV2_Object) + sizeof(float); - AudioBuffer* ret = new AudioBuffer(type, size); + AudioBuffer* ret = new AudioBuffer(*this, type, size); ret->object()->type = _map->object_class_vector; ((LV2_Vector_Body*)ret->object()->body)->elem_type = _map->object_class_float32; buffer = ret; @@ -93,23 +93,23 @@ BufferFactory::create(Shared::PortType type, size_t size) if (size == 0) size = sizeof(LV2_Object) + sizeof(LV2_Vector_Body) + _engine.audio_driver()->buffer_size() * sizeof(float); - AudioBuffer* ret = new AudioBuffer(type, size); + AudioBuffer* ret = new AudioBuffer(*this, type, size); ret->object()->type = _map->object_class_float32; buffer = ret; } else if (type.is_events()) { if (size == 0) size = _engine.audio_driver()->buffer_size() * 4; // FIXME - buffer = new EventBuffer(size); + buffer = new EventBuffer(*this, size); } else if (type.is_value() || type.is_message()) { if (size == 0) size = 32; // FIXME - buffer = new ObjectBuffer(std::max(size, sizeof(LV2_Object) + sizeof(void*))); + buffer = new ObjectBuffer(*this, std::max(size, sizeof(LV2_Object) + sizeof(void*))); } else { cout << "ERROR: Failed to create buffer of unknown type" << endl; - return SharedPtr(); + return Ref(); } - return SharedPtr(buffer, BufferDeleter(*this)); + return Ref(buffer); } void diff --git a/src/engine/BufferFactory.hpp b/src/engine/BufferFactory.hpp index d6e0b70c..8fddb70f 100644 --- a/src/engine/BufferFactory.hpp +++ b/src/engine/BufferFactory.hpp @@ -19,6 +19,7 @@ #define BUFFER_FACTORY_H #include +#include #include "interface/PortType.hpp" #include "glibmm/thread.h" #include "raul/RingBuffer.hpp" @@ -29,6 +30,7 @@ namespace Ingen { using namespace Shared; class Engine; +class Buffer; namespace Shared { class LV2URIMap; } @@ -36,13 +38,16 @@ class BufferFactory { public: BufferFactory(Engine& engine, SharedPtr map); - SharedPtr get(Shared::PortType type, size_t size=0, bool force_create=false); + typedef boost::intrusive_ptr Ref; + + Ref get(Shared::PortType type, size_t size=0, bool force_create=false); private: friend class BufferDeleter; + friend class Buffer; void recycle(Buffer* buf); - SharedPtr create(Shared::PortType type, size_t size=0); + Ref create(Shared::PortType type, size_t size=0); inline Raul::AtomicPtr& free_list(Shared::PortType type) { switch (type.symbol()) { diff --git a/src/engine/ConnectionImpl.cpp b/src/engine/ConnectionImpl.cpp index f934fc21..6317c632 100644 --- a/src/engine/ConnectionImpl.cpp +++ b/src/engine/ConnectionImpl.cpp @@ -17,6 +17,7 @@ #include #include "raul/Maid.hpp" +#include "raul/IntrusivePtr.hpp" #include "AudioBuffer.hpp" #include "BufferFactory.hpp" #include "ConnectionImpl.hpp" @@ -103,13 +104,13 @@ void ConnectionImpl::process(Context& context) { if (must_queue()) { - SharedPtr src_buf = PtrCast(_src_port->buffer(0)); + IntrusivePtr src_buf = PtrCast(_src_port->buffer(0)); if (!src_buf) { cerr << "ERROR: Queued connection but source is not an EventBuffer" << endl; return; } - SharedPtr local_buf = PtrCast(_local_buffer); + IntrusivePtr local_buf = PtrCast(_local_buffer); if (!local_buf) { cerr << "ERROR: Queued connection but source is not an EventBuffer" << endl; return; @@ -138,7 +139,7 @@ ConnectionImpl::queue(Context& context) if (!must_queue()) return; - SharedPtr src_buf = PtrCast(_src_port->buffer(0)); + IntrusivePtr src_buf = PtrCast(_src_port->buffer(0)); if (!src_buf) { cerr << "ERROR: Queued connection but source is not an EventBuffer" << endl; return; diff --git a/src/engine/ConnectionImpl.hpp b/src/engine/ConnectionImpl.hpp index 9daea1f1..63d2ecc4 100644 --- a/src/engine/ConnectionImpl.hpp +++ b/src/engine/ConnectionImpl.hpp @@ -70,7 +70,7 @@ public: * buffer, and will return accordingly (e.g. the same buffer for every * voice in a mono->poly connection). */ - inline SharedPtr buffer(uint32_t voice) const { + inline BufferFactory::Ref buffer(uint32_t voice) const { if (must_mix() || must_queue()) { return _local_buffer; } else if ( ! _src_port->polyphonic()) { @@ -95,11 +95,11 @@ protected: Raul::RingBuffer* _queue; - BufferFactory& _bufs; - PortImpl* const _src_port; - PortImpl* const _dst_port; - SharedPtr _local_buffer; - bool _pending_disconnection; + BufferFactory& _bufs; + PortImpl* const _src_port; + PortImpl* const _dst_port; + BufferFactory::Ref _local_buffer; + bool _pending_disconnection; }; diff --git a/src/engine/EventBuffer.cpp b/src/engine/EventBuffer.cpp index da6b5745..4f7f6b5f 100644 --- a/src/engine/EventBuffer.cpp +++ b/src/engine/EventBuffer.cpp @@ -33,8 +33,8 @@ using namespace Shared; /** Allocate a new event buffer. * \a capacity is in bytes (not number of events). */ -EventBuffer::EventBuffer(size_t capacity) - : Buffer(PortType(PortType::EVENTS), capacity) +EventBuffer::EventBuffer(BufferFactory& factory, size_t capacity) + : Buffer(factory, PortType(PortType::EVENTS), capacity) , _latest_frames(0) , _latest_subframes(0) { diff --git a/src/engine/EventBuffer.hpp b/src/engine/EventBuffer.hpp index d3acbdd2..d094e4a4 100644 --- a/src/engine/EventBuffer.hpp +++ b/src/engine/EventBuffer.hpp @@ -29,7 +29,7 @@ namespace Ingen { class EventBuffer : public Buffer { public: - EventBuffer(size_t capacity); + EventBuffer(BufferFactory& factory, size_t capacity); ~EventBuffer(); void* port_data(Shared::PortType port_type) { return _data; } diff --git a/src/engine/LADSPANode.cpp b/src/engine/LADSPANode.cpp index b89d2bca..3d405b5c 100644 --- a/src/engine/LADSPANode.cpp +++ b/src/engine/LADSPANode.cpp @@ -300,7 +300,7 @@ LADSPANode::process(ProcessContext& context) void -LADSPANode::set_port_buffer(uint32_t voice, uint32_t port_num, SharedPtr buf) +LADSPANode::set_port_buffer(uint32_t voice, uint32_t port_num, BufferFactory::Ref buf) { assert(voice < _polyphony); _descriptor->connect_port((*_instances)[voice], port_num, diff --git a/src/engine/LADSPANode.hpp b/src/engine/LADSPANode.hpp index 29f8fc5b..ef7db579 100644 --- a/src/engine/LADSPANode.hpp +++ b/src/engine/LADSPANode.hpp @@ -21,6 +21,7 @@ #include #include #include +#include "raul/IntrusivePtr.hpp" #include "types.hpp" #include "NodeBase.hpp" #include "PluginImpl.hpp" @@ -55,7 +56,7 @@ public: void process(ProcessContext& context); - void set_port_buffer(uint32_t voice, uint32_t port_num, SharedPtr buf); + void set_port_buffer(uint32_t voice, uint32_t port_num, IntrusivePtr buf); protected: void get_port_limits(unsigned long port_index, diff --git a/src/engine/LV2Node.cpp b/src/engine/LV2Node.cpp index e2f91927..6f39782b 100644 --- a/src/engine/LV2Node.cpp +++ b/src/engine/LV2Node.cpp @@ -367,7 +367,7 @@ LV2Node::process(ProcessContext& context) void -LV2Node::set_port_buffer(uint32_t voice, uint32_t port_num, SharedPtr buf) +LV2Node::set_port_buffer(uint32_t voice, uint32_t port_num, BufferFactory::Ref buf) { assert(voice < _polyphony); slv2_instance_connect_port((*_instances)[voice], port_num, diff --git a/src/engine/LV2Node.hpp b/src/engine/LV2Node.hpp index 64b74315..7f1714f1 100644 --- a/src/engine/LV2Node.hpp +++ b/src/engine/LV2Node.hpp @@ -20,6 +20,7 @@ #include #include "slv2/slv2.h" +#include "raul/IntrusivePtr.hpp" #include "contexts.lv2/contexts.h" #include "types.hpp" #include "NodeBase.hpp" @@ -58,7 +59,7 @@ public: void process(ProcessContext& context); - void set_port_buffer(uint32_t voice, uint32_t port_num, SharedPtr buf); + void set_port_buffer(uint32_t voice, uint32_t port_num, IntrusivePtr buf); protected: LV2Plugin* _lv2_plugin; diff --git a/src/engine/NodeBase.hpp b/src/engine/NodeBase.hpp index 7a7db8e1..cf7ae0d6 100644 --- a/src/engine/NodeBase.hpp +++ b/src/engine/NodeBase.hpp @@ -21,13 +21,14 @@ #include "types.hpp" #include #include -#include "raul/Semaphore.hpp" -#include "raul/AtomicInt.hpp" #include "raul/Array.hpp" #include "raul/Atom.hpp" +#include "raul/AtomicInt.hpp" +#include "raul/IntrusivePtr.hpp" +#include "raul/Semaphore.hpp" +#include "contexts.lv2/contexts.h" #include "interface/Port.hpp" #include "NodeImpl.hpp" -#include "contexts.lv2/contexts.h" namespace Ingen { @@ -82,7 +83,7 @@ public: virtual void process(ProcessContext& context) = 0; virtual void post_process(Context& context); - virtual void set_port_buffer(uint32_t voice, uint32_t port_num, SharedPtr buf) {} + virtual void set_port_buffer(uint32_t voice, uint32_t port_num, IntrusivePtr buf) {} virtual void set_buffer_size(BufferFactory& bufs, size_t size); diff --git a/src/engine/NodeImpl.hpp b/src/engine/NodeImpl.hpp index 55ff5417..cbb212b7 100644 --- a/src/engine/NodeImpl.hpp +++ b/src/engine/NodeImpl.hpp @@ -19,6 +19,7 @@ #define NODEIMPL_H #include +#include "raul/IntrusivePtr.hpp" #include "interface/Node.hpp" #include "GraphObjectImpl.hpp" @@ -133,7 +134,7 @@ public: */ virtual void process(ProcessContext& context) = 0; - virtual void set_port_buffer(uint32_t voice, uint32_t port_num, SharedPtr buf) = 0; + virtual void set_port_buffer(uint32_t voice, uint32_t port_num, IntrusivePtr buf) = 0; virtual uint32_t num_ports() const = 0; diff --git a/src/engine/ObjectBuffer.cpp b/src/engine/ObjectBuffer.cpp index 6104e19b..9611f124 100644 --- a/src/engine/ObjectBuffer.cpp +++ b/src/engine/ObjectBuffer.cpp @@ -37,8 +37,8 @@ using namespace Shared; /** Allocate a new object buffer. * \a capacity is in bytes, including LV2_Object header */ -ObjectBuffer::ObjectBuffer(size_t capacity) - : Buffer(PortType(PortType::VALUE), capacity) +ObjectBuffer::ObjectBuffer(BufferFactory& factory, size_t capacity) + : Buffer(factory, PortType(PortType::VALUE), capacity) { //cerr << "Creating Object Buffer capacity = " << capacity << endl; assert(capacity >= sizeof(LV2_Object)); diff --git a/src/engine/ObjectBuffer.hpp b/src/engine/ObjectBuffer.hpp index 64c52299..821d1028 100644 --- a/src/engine/ObjectBuffer.hpp +++ b/src/engine/ObjectBuffer.hpp @@ -29,7 +29,7 @@ class Context; class ObjectBuffer : public Buffer { public: - ObjectBuffer(size_t capacity); + ObjectBuffer(BufferFactory& factory, size_t capacity); void clear(); diff --git a/src/engine/ObjectSender.cpp b/src/engine/ObjectSender.cpp index 75c25567..c25bf9c4 100644 --- a/src/engine/ObjectSender.cpp +++ b/src/engine/ObjectSender.cpp @@ -142,7 +142,8 @@ ObjectSender::send_port(ClientInterface* client, const PortImpl* port, bool bund // Send control value if (port->type() == PortType::CONTROL) { - const Sample& value = PtrCast(port->buffer(0))->value_at(0); + //const Sample& value = PtrCast(port->buffer(0))->value_at(0); + const Sample& value = ((const AudioBuffer*)port->buffer(0).get())->value_at(0); client->set_port_value(port->path(), value); } diff --git a/src/engine/PortImpl.cpp b/src/engine/PortImpl.cpp index 45ee76da..bbda42ea 100644 --- a/src/engine/PortImpl.cpp +++ b/src/engine/PortImpl.cpp @@ -59,7 +59,7 @@ PortImpl::PortImpl(BufferFactory& bufs, , _set_by_user(false) , _last_broadcasted_value(_value.type() == Atom::FLOAT ? _value.get_float() : 0.0f) // default? , _context(Context::AUDIO) - , _buffers(new Array< SharedPtr >(poly)) + , _buffers(new Array(poly)) , _prepared_buffers(NULL) { assert(node != NULL); @@ -113,7 +113,7 @@ PortImpl::prepare_poly(BufferFactory& bufs, uint32_t poly) /* FIXME: poly never goes down, harsh on memory.. */ if (poly > _poly) { - _prepared_buffers = new Array< SharedPtr >(poly, *_buffers); + _prepared_buffers = new Array(poly, *_buffers); for (uint32_t i = _poly; i < _prepared_buffers->size(); ++i) _prepared_buffers->at(i) = bufs.get(_type, _buffer_size); } diff --git a/src/engine/PortImpl.hpp b/src/engine/PortImpl.hpp index e004e03f..70e8e7ab 100644 --- a/src/engine/PortImpl.hpp +++ b/src/engine/PortImpl.hpp @@ -72,10 +72,10 @@ public: const Raul::Atom& value() const { return _value; } void set_value(const Raul::Atom& v) { _value = v; } - inline SharedPtr buffer(uint32_t voice) const { + inline BufferFactory::Ref buffer(uint32_t voice) const { return _buffers->at(voice); } - inline SharedPtr prepared_buffer(uint32_t voice) const { + inline BufferFactory::Ref prepared_buffer(uint32_t voice) const { return _prepared_buffers->at(voice); } @@ -130,11 +130,11 @@ protected: bool _set_by_user; Raul::Atom _last_broadcasted_value; - Context::ID _context; - Raul::Array< SharedPtr >* _buffers; + Context::ID _context; + Raul::Array* _buffers; // Dynamic polyphony - Raul::Array< SharedPtr >* _prepared_buffers; + Raul::Array* _prepared_buffers; friend class Engine; virtual ~PortImpl(); -- cgit v1.2.1