From 52011c3b045d9f49a4bc25c9f545bb35e5c2a0a9 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Sat, 12 May 2012 07:04:11 +0000 Subject: More work towards checking contexts via parameter rather than thread magic. git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@4376 a436a847-0d15-0410-975c-d299462d15a1 --- src/server/BufferFactory.cpp | 9 +++++---- src/server/BufferFactory.hpp | 6 +++++- src/server/DuplexPort.cpp | 12 +++++++----- src/server/DuplexPort.hpp | 3 ++- src/server/EdgeImpl.cpp | 6 +++--- src/server/Engine.cpp | 9 ++++----- src/server/Engine.hpp | 20 ++++++++++++++++---- src/server/InputPort.cpp | 11 ++++++----- src/server/InputPort.hpp | 3 ++- src/server/JackDriver.cpp | 4 +++- src/server/LV2RequestRunFeature.hpp | 3 ++- src/server/MessageContext.cpp | 4 ++-- src/server/MessageContext.hpp | 2 +- src/server/NodeImpl.cpp | 3 ++- src/server/OutputPort.cpp | 8 +++++--- src/server/OutputPort.hpp | 3 ++- src/server/PatchImpl.cpp | 4 ++-- src/server/PortImpl.cpp | 5 ++++- src/server/PortImpl.hpp | 7 ++++--- src/server/PreProcessor.cpp | 2 -- src/server/events/Connect.cpp | 3 ++- src/server/events/Disconnect.cpp | 9 ++++++--- src/server/events/SetPortValue.cpp | 8 +++++--- src/server/ingen_lv2.cpp | 1 - 24 files changed, 90 insertions(+), 55 deletions(-) diff --git a/src/server/BufferFactory.cpp b/src/server/BufferFactory.cpp index cc02ba08..2726d7e3 100644 --- a/src/server/BufferFactory.cpp +++ b/src/server/BufferFactory.cpp @@ -88,7 +88,10 @@ BufferFactory::default_buffer_size(LV2_URID type) } BufferRef -BufferFactory::get(LV2_URID type, uint32_t capacity, bool force_create) +BufferFactory::get(Context& context, + LV2_URID type, + uint32_t capacity, + bool force_create) { Raul::AtomicPtr& head_ptr = free_list(type); Buffer* try_head = NULL; @@ -104,7 +107,7 @@ BufferFactory::get(LV2_URID type, uint32_t capacity, bool force_create) } if (!try_head) { - if (!ThreadManager::thread_is(THREAD_PROCESS)) { + if (!_engine.is_process_context(context)) { return create(type, capacity); } else { assert(false); @@ -126,8 +129,6 @@ BufferFactory::silent_buffer() BufferRef BufferFactory::create(LV2_URID type, uint32_t capacity) { - ThreadManager::assert_not_thread(THREAD_PROCESS); - Buffer* buffer = NULL; if (capacity == 0) { diff --git a/src/server/BufferFactory.hpp b/src/server/BufferFactory.hpp index 75cb0b63..ee6ca5d9 100644 --- a/src/server/BufferFactory.hpp +++ b/src/server/BufferFactory.hpp @@ -40,6 +40,7 @@ namespace Shared { class URIs; } namespace Server { +class Context; class Engine; class BufferFactory { @@ -50,7 +51,10 @@ public: static uint32_t audio_buffer_size(SampleCount nframes); uint32_t default_buffer_size(LV2_URID type); - BufferRef get(LV2_URID type, uint32_t capacity, bool force_create=false); + BufferRef get(Context& context, + LV2_URID type, + uint32_t capacity, + bool force_create = false); BufferRef silent_buffer(); diff --git a/src/server/DuplexPort.cpp b/src/server/DuplexPort.cpp index 1a62d562..7da3dd5e 100644 --- a/src/server/DuplexPort.cpp +++ b/src/server/DuplexPort.cpp @@ -55,14 +55,16 @@ DuplexPort::DuplexPort( } bool -DuplexPort::get_buffers(BufferFactory& bufs, +DuplexPort::get_buffers(Context& context, + BufferFactory& bufs, Raul::Array* buffers, uint32_t poly) const { - if (_is_output) - return InputPort::get_buffers(bufs, buffers, poly); - else - return OutputPort::get_buffers(bufs, buffers, poly); + if (_is_output) { + return InputPort::get_buffers(context, bufs, buffers, poly); + } else { + return OutputPort::get_buffers(context, bufs, buffers, poly); + } } /** Prepare for the execution of parent patch */ diff --git a/src/server/DuplexPort.hpp b/src/server/DuplexPort.hpp index 5b8f30e0..d0349f91 100644 --- a/src/server/DuplexPort.hpp +++ b/src/server/DuplexPort.hpp @@ -52,7 +52,8 @@ public: virtual ~DuplexPort() {} - bool get_buffers(BufferFactory& bufs, + bool get_buffers(Context& context, + BufferFactory& bufs, Raul::Array* buffers, uint32_t poly) const; diff --git a/src/server/EdgeImpl.cpp b/src/server/EdgeImpl.cpp index b783587d..50e69ec9 100644 --- a/src/server/EdgeImpl.cpp +++ b/src/server/EdgeImpl.cpp @@ -89,7 +89,7 @@ EdgeImpl::get_sources(Context& context, LV2_Atom obj; _queue->peek(sizeof(LV2_Atom), &obj); BufferRef buf = context.engine().buffer_factory()->get( - head()->buffer_type(), sizeof(LV2_Atom) + obj.size); + context, head()->buffer_type(), sizeof(LV2_Atom) + obj.size); void* data = buf->port_data(PortType::ATOM, context.offset()); _queue->read(sizeof(LV2_Atom) + obj.size, (LV2_Atom*)data); srcs[num_srcs++] = buf.get(); @@ -124,8 +124,8 @@ EdgeImpl::queue(Context& context) LV2_Atom_Sequence* seq = (LV2_Atom_Sequence*)src_buf->atom(); LV2_ATOM_SEQUENCE_FOREACH(seq, ev) { _queue->write(sizeof(LV2_Atom) + ev->body.size, &ev->body); - context.engine().message_context()->run( - _head->parent_node(), context.start() + ev->time.frames); + context.engine().message_context().run( + context, _head->parent_node(), context.start() + ev->time.frames); } } diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp index edd0b4a4..f1c714d3 100644 --- a/src/server/Engine.cpp +++ b/src/server/Engine.cpp @@ -57,12 +57,12 @@ Engine::Engine(Ingen::Shared::World* a_world) , _broadcaster(new ClientBroadcaster()) , _control_bindings(NULL) , _maid(new Raul::Maid(event_queue_size())) - , _message_context(new MessageContext(*this)) , _node_factory(new NodeFactory(a_world)) , _pre_processor(new PreProcessor()) , _post_processor(new PostProcessor(*this)) , _event_writer(new EventWriter(*this)) , _root_patch(NULL) + , _message_context(*this) , _process_context(*this) , _quit_flag(false) { @@ -93,7 +93,6 @@ Engine::~Engine() delete _pre_processor; delete _post_processor; delete _node_factory; - delete _message_context; delete _control_bindings; delete _broadcaster; delete _event_writer; @@ -150,7 +149,7 @@ Engine::activate() _buffer_factory->set_block_length(_driver->block_length()); - _message_context->Thread::start(); + _message_context.Thread::start(); const Ingen::Shared::URIs& uris = world()->uris(); Shared::Forge& forge = world()->forge(); @@ -255,8 +254,8 @@ Engine::run(uint32_t sample_count) _process_context, _root_patch->port_impl(1)->buffer(0).get()); // Signal message context to run if necessary - if (message_context()->has_requests()) { - message_context()->signal(_process_context); + if (_message_context.has_requests()) { + _message_context.signal(_process_context); } } diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp index c32c398a..bd211d56 100644 --- a/src/server/Engine.hpp +++ b/src/server/Engine.hpp @@ -24,6 +24,7 @@ #include "raul/SharedPtr.hpp" #include "ProcessContext.hpp" +#include "MessageContext.hpp" namespace Raul { class Maid; } @@ -75,10 +76,19 @@ public: void set_driver(SharedPtr driver); + /** Return true iff any events are pending. */ bool pending_events(); + + /** Enqueue an event to be processed (non-realtime threads only). */ void enqueue_event(Event* ev); + + /** Process events (process thread only). */ void process_events(); + bool is_process_context(const Context& context) const { + return &context == &_process_context; + } + Ingen::Shared::World* world() const { return _world; } EventWriter* interface() const { return _event_writer; } @@ -87,11 +97,12 @@ public: ControlBindings* control_bindings() const { return _control_bindings; } Driver* driver() const { return _driver.get(); } Raul::Maid* maid() const { return _maid; } - MessageContext* message_context() const { return _message_context; } NodeFactory* node_factory() const { return _node_factory; } PostProcessor* post_processor() const { return _post_processor; } PatchImpl* root_patch() const { return _root_patch; } - ProcessContext& process_context() { return _process_context; } + + MessageContext& message_context() { return _message_context; } + ProcessContext& process_context() { return _process_context; } SharedPtr engine_store() const; @@ -105,13 +116,14 @@ private: ControlBindings* _control_bindings; SharedPtr _driver; Raul::Maid* _maid; - MessageContext* _message_context; NodeFactory* _node_factory; PreProcessor* _pre_processor; PostProcessor* _post_processor; EventWriter* _event_writer; - PatchImpl* _root_patch; + PatchImpl* _root_patch; + + MessageContext _message_context; ProcessContext _process_context; bool _quit_flag; diff --git a/src/server/InputPort.cpp b/src/server/InputPort.cpp index 4239ae89..29229eb0 100644 --- a/src/server/InputPort.cpp +++ b/src/server/InputPort.cpp @@ -78,12 +78,13 @@ InputPort::apply_poly(ProcessContext& context, Raul::Maid& maid, uint32_t poly) * @return true iff buffers are locally owned by the port */ bool -InputPort::get_buffers(BufferFactory& bufs, +InputPort::get_buffers(Context& context, + BufferFactory& bufs, Raul::Array* buffers, uint32_t poly) const { - size_t num_edges = (ThreadManager::thread_is(THREAD_PROCESS)) - ? _edges.size() : _num_edges; + const bool is_process_context = bufs.engine().is_process_context(context); + size_t num_edges = is_process_context ? _edges.size() : _num_edges; if (is_a(PortType::AUDIO) && num_edges == 0) { // Audio input with no edges, use shared zero buffer @@ -92,7 +93,7 @@ InputPort::get_buffers(BufferFactory& bufs, return false; } else if (num_edges == 1) { - if (ThreadManager::thread_is(THREAD_PROCESS)) { + if (is_process_context) { if (!_edges.front().must_mix() && !_edges.front().must_queue()) { // Single non-mixing conneciton, use buffers directly @@ -105,7 +106,7 @@ InputPort::get_buffers(BufferFactory& bufs, // Otherwise, allocate local buffers for (uint32_t v = 0; v < poly; ++v) { - buffers->at(v) = _bufs.get(buffer_type(), _buffer_size); + buffers->at(v) = _bufs.get(context, buffer_type(), _buffer_size); buffers->at(v)->clear(); } return true; diff --git a/src/server/InputPort.hpp b/src/server/InputPort.hpp index 2565a48d..0dbf3550 100644 --- a/src/server/InputPort.hpp +++ b/src/server/InputPort.hpp @@ -73,7 +73,8 @@ public: bool apply_poly(ProcessContext& context, Raul::Maid& maid, uint32_t poly); - bool get_buffers(BufferFactory& bufs, + bool get_buffers(Context& context, + BufferFactory& bufs, Raul::Array* buffers, uint32_t poly) const; diff --git a/src/server/JackDriver.cpp b/src/server/JackDriver.cpp index 46234af0..c5c9abe9 100644 --- a/src/server/JackDriver.cpp +++ b/src/server/JackDriver.cpp @@ -64,7 +64,9 @@ JackPort::JackPort(JackDriver* driver, DuplexPort* patch_port) , _driver(driver) , _jack_port(NULL) { - patch_port->setup_buffers(*driver->_engine.buffer_factory(), patch_port->poly()); + patch_port->setup_buffers(driver->_engine.message_context(), + *driver->_engine.buffer_factory(), + patch_port->poly()); create(); } diff --git a/src/server/LV2RequestRunFeature.hpp b/src/server/LV2RequestRunFeature.hpp index fddab4b8..aae05e3b 100644 --- a/src/server/LV2RequestRunFeature.hpp +++ b/src/server/LV2RequestRunFeature.hpp @@ -49,7 +49,8 @@ struct RequestRunFeature : public Ingen::Shared::LV2Features::Feature { return LV2_WORKER_ERR_UNKNOWN; Engine* engine = (Engine*)info->world->engine().get(); - engine->message_context()->run( + engine->message_context().run( + engine->process_context(), dynamic_cast(info->node), engine->driver()->frame_time()); diff --git a/src/server/MessageContext.cpp b/src/server/MessageContext.cpp index 4a476bd6..f6e16ebf 100644 --- a/src/server/MessageContext.cpp +++ b/src/server/MessageContext.cpp @@ -42,9 +42,9 @@ MessageContext::MessageContext(Engine& engine) } void -MessageContext::run(NodeImpl* node, FrameTime time) +MessageContext::run(Context& context, NodeImpl* node, FrameTime time) { - if (ThreadManager::thread_is(THREAD_PROCESS)) { + if (_engine.is_process_context(context)) { const Request r(time, node); _requests.write(sizeof(Request), &r); // signal() will be called at the end of this process cycle diff --git a/src/server/MessageContext.hpp b/src/server/MessageContext.hpp index 4d2ec826..1ec238d4 100644 --- a/src/server/MessageContext.hpp +++ b/src/server/MessageContext.hpp @@ -48,7 +48,7 @@ public: /** Schedule a message context run at a certain time. * Safe to call from either process thread or pre-process thread. */ - void run(NodeImpl* node, FrameTime time); + void run(Context& context, NodeImpl* node, FrameTime time); protected: struct Request { diff --git a/src/server/NodeImpl.cpp b/src/server/NodeImpl.cpp index 1778e277..ab0783f4 100644 --- a/src/server/NodeImpl.cpp +++ b/src/server/NodeImpl.cpp @@ -23,6 +23,7 @@ #include "AudioBuffer.hpp" #include "ClientBroadcaster.hpp" +#include "Engine.hpp" #include "EngineStore.hpp" #include "NodeImpl.hpp" #include "PatchImpl.hpp" @@ -88,7 +89,7 @@ NodeImpl::activate(BufferFactory& bufs) for (uint32_t p = 0; p < num_ports(); ++p) { PortImpl* const port = _ports->at(p); - port->setup_buffers(bufs, port->poly()); + port->setup_buffers(bufs.engine().message_context(), bufs, port->poly()); port->connect_buffers(); for (uint32_t v = 0; v < _polyphony; ++v) { Buffer* const buf = port->buffer(v).get(); diff --git a/src/server/OutputPort.cpp b/src/server/OutputPort.cpp index 16984e3b..f1f3dfd7 100644 --- a/src/server/OutputPort.cpp +++ b/src/server/OutputPort.cpp @@ -18,6 +18,7 @@ #include "Buffer.hpp" #include "BufferFactory.hpp" +#include "Engine.hpp" #include "NodeImpl.hpp" #include "OutputPort.hpp" #include "ProcessContext.hpp" @@ -44,16 +45,17 @@ OutputPort::OutputPort(BufferFactory& bufs, _broadcast = true; - setup_buffers(bufs, poly); + setup_buffers(bufs.engine().message_context(), bufs, poly); } bool -OutputPort::get_buffers(BufferFactory& bufs, +OutputPort::get_buffers(Context& context, + BufferFactory& bufs, Raul::Array* buffers, uint32_t poly) const { for (uint32_t v = 0; v < poly; ++v) - buffers->at(v) = bufs.get(buffer_type(), _buffer_size); + buffers->at(v) = bufs.get(context, buffer_type(), _buffer_size); return true; } diff --git a/src/server/OutputPort.hpp b/src/server/OutputPort.hpp index 2338f5cf..0ca62d5d 100644 --- a/src/server/OutputPort.hpp +++ b/src/server/OutputPort.hpp @@ -51,7 +51,8 @@ public: virtual ~OutputPort() {} - bool get_buffers(BufferFactory& bufs, + bool get_buffers(Context& context, + BufferFactory& bufs, Raul::Array* buffers, uint32_t poly) const; diff --git a/src/server/PatchImpl.cpp b/src/server/PatchImpl.cpp index dd20fbf1..9a21ee23 100644 --- a/src/server/PatchImpl.cpp +++ b/src/server/PatchImpl.cpp @@ -131,14 +131,14 @@ PatchImpl::apply_internal_poly(ProcessContext& context, for (uint32_t j = 0; j < (*i)->num_ports(); ++j) { PortImpl* const port = (*i)->port_impl(j); if (port->is_input() && dynamic_cast(port)->direct_connect()) - port->setup_buffers(bufs, port->poly()); + port->setup_buffers(context, bufs, port->poly()); port->connect_buffers(context.offset()); } } const bool polyphonic = parent_patch() && (poly == parent_patch()->internal_poly()); for (Ports::iterator i = _outputs.begin(); i != _outputs.end(); ++i) - (*i)->setup_buffers(bufs, polyphonic ? poly : 1); + (*i)->setup_buffers(context, bufs, polyphonic ? poly : 1); _internal_poly = poly; diff --git a/src/server/PortImpl.cpp b/src/server/PortImpl.cpp index e9bb39ac..8cb333ba 100644 --- a/src/server/PortImpl.cpp +++ b/src/server/PortImpl.cpp @@ -140,7 +140,10 @@ void PortImpl::prepare_poly_buffers(BufferFactory& bufs) { if (_prepared_buffers) - get_buffers(bufs, _prepared_buffers, _prepared_buffers->size()); + get_buffers(bufs.engine().message_context(), + bufs, + _prepared_buffers, + _prepared_buffers->size()); } bool diff --git a/src/server/PortImpl.hpp b/src/server/PortImpl.hpp index da2ba237..503e619f 100644 --- a/src/server/PortImpl.hpp +++ b/src/server/PortImpl.hpp @@ -107,12 +107,13 @@ public: /** Empty buffer contents completely (ie silence) */ virtual void clear_buffers(); - virtual bool get_buffers(BufferFactory& bufs, + virtual bool get_buffers(Context& context, + BufferFactory& bufs, Raul::Array* buffers, uint32_t poly) const = 0; - void setup_buffers(BufferFactory& bufs, uint32_t poly) { - get_buffers(bufs, _buffers, poly); + void setup_buffers(Context& context, BufferFactory& bufs, uint32_t poly) { + get_buffers(context, bufs, _buffers, poly); } virtual void connect_buffers(SampleCount offset=0); diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp index e733771c..c1eb1dd3 100644 --- a/src/server/PreProcessor.cpp +++ b/src/server/PreProcessor.cpp @@ -40,8 +40,6 @@ PreProcessor::~PreProcessor() void PreProcessor::event(Event* const ev) { - ThreadManager::assert_not_thread(THREAD_PROCESS); - // TODO: Probably possible to make this lock-free with CAS Glib::Mutex::Lock lock(_mutex); diff --git a/src/server/events/Connect.cpp b/src/server/events/Connect.cpp index e0b42424..e2d6cae7 100644 --- a/src/server/events/Connect.cpp +++ b/src/server/events/Connect.cpp @@ -143,7 +143,8 @@ Connect::pre_process() } _buffers = new Raul::Array(_dst_input_port->poly()); - _dst_input_port->get_buffers(*_engine.buffer_factory(), + _dst_input_port->get_buffers(_engine.message_context(), + *_engine.buffer_factory(), _buffers, _dst_input_port->poly()); if (_patch->enabled()) diff --git a/src/server/events/Disconnect.cpp b/src/server/events/Disconnect.cpp index d6fd999e..035a6f87 100644 --- a/src/server/events/Disconnect.cpp +++ b/src/server/events/Disconnect.cpp @@ -93,8 +93,10 @@ Disconnect::Impl::Impl(Engine& e, if (_dst_input_port->num_edges() == 0) { _buffers = new Raul::Array(_dst_input_port->poly()); - _dst_input_port->get_buffers(*_engine.buffer_factory(), - _buffers, _dst_input_port->poly()); + _dst_input_port->get_buffers(_engine.message_context(), + *_engine.buffer_factory(), + _buffers, + _dst_input_port->poly()); const bool is_control = _dst_input_port->is_a(PortType::CONTROL) || _dst_input_port->is_a(PortType::CV); @@ -188,7 +190,8 @@ Disconnect::Impl::execute(ProcessContext& context, bool set_dst_buffers) if (_buffers) { _engine.maid()->push(_dst_input_port->set_buffers(context, _buffers)); } else { - _dst_input_port->setup_buffers(*_engine.buffer_factory(), + _dst_input_port->setup_buffers(context, + *_engine.buffer_factory(), _dst_input_port->poly()); } _dst_input_port->connect_buffers(); diff --git a/src/server/events/SetPortValue.cpp b/src/server/events/SetPortValue.cpp index db3a72e4..1fb02cc4 100644 --- a/src/server/events/SetPortValue.cpp +++ b/src/server/events/SetPortValue.cpp @@ -84,9 +84,11 @@ SetPortValue::pre_process() // Port is a message context port, set its value and // call the plugin's message run function once if (_port && _port->parent_node()->context() == Context::MESSAGE) { - apply(*_engine.message_context()); - _engine.message_context()->run(_port->parent_node(), - _engine.driver()->frame_time() + _engine.driver()->block_length()); + apply(_engine.message_context()); + _engine.message_context().run( + _engine.message_context(), + _port->parent_node(), + _engine.driver()->frame_time() + _engine.driver()->block_length()); } if (_port) { diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp index e9852534..1f459c15 100644 --- a/src/server/ingen_lv2.cpp +++ b/src/server/ingen_lv2.cpp @@ -214,7 +214,6 @@ public: } void flush_to_ui() { - assert(ThreadManager::thread_is(THREAD_PROCESS)); assert(_ports.size() >= 2); LV2_Atom_Sequence* seq = (LV2_Atom_Sequence*)_ports[1]->buffer(); -- cgit v1.2.1