From c94231abbc601652e73423ec6e43a1e241024a17 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Tue, 17 Jul 2012 20:32:37 +0000 Subject: Implement worker extension correctly (for one voice only). Woring sequence port I/O for LV2 nodes. git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@4543 a436a847-0d15-0410-975c-d299462d15a1 --- src/server/Buffer.cpp | 17 +++-- src/server/Buffer.hpp | 3 + src/server/Engine.cpp | 4 + src/server/Engine.hpp | 6 +- src/server/InputPort.cpp | 2 +- src/server/LV2Info.cpp | 3 - src/server/LV2Node.cpp | 37 ++++++++-- src/server/LV2Node.hpp | 30 +++++++- src/server/LV2RequestRunFeature.hpp | 86 ---------------------- src/server/MessageContext.cpp | 2 +- src/server/NodeImpl.hpp | 4 +- src/server/OutputPort.cpp | 2 +- src/server/PatchImpl.hpp | 2 + src/server/Worker.cpp | 143 ++++++++++++++++++++++++++++++++++++ src/server/Worker.hpp | 61 +++++++++++++++ src/server/events/SetPortValue.cpp | 10 ++- src/server/wscript | 1 + src/shared/URIs.cpp | 1 + 18 files changed, 300 insertions(+), 114 deletions(-) delete mode 100644 src/server/LV2RequestRunFeature.hpp create mode 100644 src/server/Worker.cpp create mode 100644 src/server/Worker.hpp (limited to 'src') diff --git a/src/server/Buffer.cpp b/src/server/Buffer.cpp index 85fe727c..5438b4ce 100644 --- a/src/server/Buffer.cpp +++ b/src/server/Buffer.cpp @@ -43,12 +43,6 @@ Buffer::Buffer(BufferFactory& bufs, LV2_URID type, uint32_t capacity) , _next(NULL) , _refs(0) { - if (capacity > UINT32_MAX) { - Raul::error << "Event buffer size " << capacity << " too large, aborting." - << std::endl; - throw std::bad_alloc(); - } - #ifdef HAVE_POSIX_MEMALIGN int ret = posix_memalign((void**)&_atom, 16, capacity); #else @@ -56,7 +50,7 @@ Buffer::Buffer(BufferFactory& bufs, LV2_URID type, uint32_t capacity) int ret = (_atom != NULL) ? 0 : -1; #endif - if (ret != 0) { + if (ret) { Raul::error << "Failed to allocate event buffer." << std::endl; throw std::bad_alloc(); } @@ -144,6 +138,15 @@ Buffer::prepare_write(Context& context) } } +void +Buffer::prepare_output_write(Context& context) +{ + if (_type == _factory.uris().atom_Sequence) { + _atom->type = (LV2_URID)_factory.uris().atom_Chunk; + _atom->size = _capacity - sizeof(LV2_Atom_Sequence); + } +} + bool Buffer::append_event(int64_t frames, uint32_t size, diff --git a/src/server/Buffer.hpp b/src/server/Buffer.hpp index f0a84910..ee8f2361 100644 --- a/src/server/Buffer.hpp +++ b/src/server/Buffer.hpp @@ -55,6 +55,9 @@ public: LV2_URID type() const { return _type; } uint32_t capacity() const { return _capacity; } + /// Sequence buffers only + void prepare_output_write(Context& context); + /// Sequence buffers only bool append_event(int64_t frames, uint32_t size, diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp index 85ca530c..30350d52 100644 --- a/src/server/Engine.cpp +++ b/src/server/Engine.cpp @@ -40,6 +40,7 @@ #include "PreProcessor.hpp" #include "ProcessContext.hpp" #include "ThreadManager.hpp" +#include "Worker.hpp" using namespace std; @@ -59,6 +60,7 @@ Engine::Engine(Ingen::Shared::World* a_world) , _post_processor(new PostProcessor(*this)) , _event_writer(new EventWriter(*this)) , _root_patch(NULL) + , _worker(new Worker(event_queue_size())) , _message_context(*this) , _process_context(*this) , _quit_flag(false) @@ -75,6 +77,8 @@ Engine::Engine(Ingen::Shared::World* a_world) } _control_bindings = new ControlBindings(*this); + + _world->lv2_features().add_feature(_worker->schedule_feature()); } Engine::~Engine() diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp index 11b7b35b..4ee14988 100644 --- a/src/server/Engine.hpp +++ b/src/server/Engine.hpp @@ -47,6 +47,7 @@ class PostProcessor; class PreProcessor; class ProcessContext; class PatchImpl; +class Worker; /** The engine which executes the process graph. @@ -101,6 +102,7 @@ public: NodeFactory* node_factory() const { return _node_factory; } PostProcessor* post_processor() const { return _post_processor; } PatchImpl* root_patch() const { return _root_patch; } + Worker* worker() const { return _worker; } MessageContext& message_context() { return _message_context; } ProcessContext& process_context() { return _process_context; } @@ -121,8 +123,8 @@ private: PreProcessor* _pre_processor; PostProcessor* _post_processor; EventWriter* _event_writer; - - PatchImpl* _root_patch; + PatchImpl* _root_patch; + Worker* _worker; MessageContext _message_context; ProcessContext _process_context; diff --git a/src/server/InputPort.cpp b/src/server/InputPort.cpp index 2510edfa..25059242 100644 --- a/src/server/InputPort.cpp +++ b/src/server/InputPort.cpp @@ -212,7 +212,7 @@ InputPort::post_process(Context& context) if (_buffer_type == _bufs.uris().atom_Sequence) { // Clear events received via a SetPortValue for (uint32_t v = 0; v < _poly; ++v) { - buffer(v)->clear(); + buffer(v)->prepare_write(context); } } _set_by_user = false; diff --git a/src/server/LV2Info.cpp b/src/server/LV2Info.cpp index 7f985492..085036c1 100644 --- a/src/server/LV2Info.cpp +++ b/src/server/LV2Info.cpp @@ -27,7 +27,6 @@ #include "ingen/shared/LV2Features.hpp" #include "LV2Info.hpp" -#include "LV2RequestRunFeature.hpp" #include "LV2ResizeFeature.hpp" namespace Ingen { @@ -55,8 +54,6 @@ LV2Info::LV2Info(Ingen::Shared::World* world) world->lv2_features().add_feature( SharedPtr(new ResizeFeature())); - world->lv2_features().add_feature( - SharedPtr(new RequestRunFeature())); } LV2Info::~LV2Info() diff --git a/src/server/LV2Node.cpp b/src/server/LV2Node.cpp index 0ee5c500..88cfff73 100644 --- a/src/server/LV2Node.cpp +++ b/src/server/LV2Node.cpp @@ -387,9 +387,8 @@ LV2Node::instantiate(BufferFactory& bufs) // FIXME: Polyphony + worker? if (lilv_plugin_has_feature(plug, info->work_schedule)) { _worker_iface = (LV2_Worker_Interface*) - lilv_instance_get_extension_data( - (LilvInstance*)(*_instances)[0].get(), - LV2_WORKER__interface); + lilv_instance_get_extension_data(instance(0), + LV2_WORKER__interface); } return ret; @@ -413,11 +412,25 @@ LV2Node::deactivate() lilv_instance_deactivate(instance(i)); } +LV2_Worker_Status +LV2Node::work_respond(LV2_Worker_Respond_Handle handle, + uint32_t size, + const void* data) +{ + LV2Node* node = (LV2Node*)handle; + LV2Node::Response* r = new LV2Node::Response(size, data); + node->_responses.push_back(*r); + return LV2_WORKER_SUCCESS; +} + void -LV2Node::work(MessageContext& context, uint32_t size, const void* data) +LV2Node::work(uint32_t size, const void* data) { if (_worker_iface) { - _worker_iface->work(instance(0), NULL, NULL, size, data); + LV2_Handle inst = lilv_instance_get_handle(instance(0)); + if (_worker_iface->work(inst, work_respond, this, size, data)) { + Raul::error(Raul::fmt("Error calling %1% work method\n") % _path); + } } } @@ -429,6 +442,20 @@ LV2Node::process(ProcessContext& context) for (uint32_t i = 0; i < _polyphony; ++i) lilv_instance_run(instance(i), context.nframes()); + if (_worker_iface) { + LV2_Handle inst = lilv_instance_get_handle(instance(0)); + while (!_responses.empty()) { + Response& r = _responses.front(); + _worker_iface->work_response(inst, r.size, r.data); + _responses.pop_front(); + context.engine().maid()->push(&r); + } + + if (_worker_iface->end_run) { + _worker_iface->end_run(inst); + } + } + NodeImpl::post_process(context); } diff --git a/src/server/LV2Node.hpp b/src/server/LV2Node.hpp index 03f8bf43..18d96dc8 100644 --- a/src/server/LV2Node.hpp +++ b/src/server/LV2Node.hpp @@ -55,7 +55,7 @@ public: void activate(BufferFactory& bufs); void deactivate(); - void work(MessageContext& context, uint32_t size, const void* data); + void work(uint32_t size, const void* data); void process(ProcessContext& context); @@ -74,10 +74,38 @@ protected: typedef Raul::Array< SharedPtr > Instances; + struct Response : public Raul::Deletable + , public Raul::Noncopyable + , public boost::intrusive::slist_base_hook<> + { + inline Response(uint32_t s, const void* d) + : size(s) + , data(malloc(s)) + { + memcpy(data, d, s); + } + + ~Response() { + free(data); + } + + const uint32_t size; + void* const data; + }; + + typedef boost::intrusive::slist, + boost::intrusive::constant_time_size + > Responses; + + static LV2_Worker_Status work_respond( + LV2_Worker_Respond_Handle handle, uint32_t size, const void* data); + LV2Plugin* _lv2_plugin; Instances* _instances; Instances* _prepared_instances; LV2_Worker_Interface* _worker_iface; + Responses _responses; SharedPtr _features; }; diff --git a/src/server/LV2RequestRunFeature.hpp b/src/server/LV2RequestRunFeature.hpp deleted file mode 100644 index aae05e3b..00000000 --- a/src/server/LV2RequestRunFeature.hpp +++ /dev/null @@ -1,86 +0,0 @@ -/* - This file is part of Ingen. - Copyright 2007-2012 David Robillard - - Ingen is free software: you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free - Software Foundation, either version 3 of the License, or any later version. - - Ingen is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. - - You should have received a copy of the GNU Affero General Public License - along with Ingen. If not, see . -*/ - -#ifndef INGEN_ENGINE_LV2_REQUEST_RUN_FEATURE_HPP -#define INGEN_ENGINE_LV2_REQUEST_RUN_FEATURE_HPP - -#include "lv2/lv2plug.in/ns/ext/worker/worker.h" - -#include "raul/log.hpp" - -#include "ingen/shared/LV2Features.hpp" - -#include "Driver.hpp" -#include "Engine.hpp" -#include "MessageContext.hpp" -#include "NodeImpl.hpp" -#include "PortImpl.hpp" - -namespace Ingen { -namespace Server { - -struct RequestRunFeature : public Ingen::Shared::LV2Features::Feature { - struct Info { - inline Info(Shared::World* w, Node* n) : world(w), node(n) {} - Shared::World* world; - Node* node; - }; - - static LV2_Worker_Status - schedule_work(LV2_Worker_Schedule_Handle handle, - uint32_t size, - const void* data) - { - Info* info = reinterpret_cast(handle); - if (!info->world->engine()) - return LV2_WORKER_ERR_UNKNOWN; - - Engine* engine = (Engine*)info->world->engine().get(); - engine->message_context().run( - engine->process_context(), - dynamic_cast(info->node), - engine->driver()->frame_time()); - - return LV2_WORKER_SUCCESS; - } - - static void delete_feature(LV2_Feature* feature) { - free(feature->data); - free(feature); - } - - SharedPtr feature(Shared::World* world, Node* n) { - const NodeImpl* node = dynamic_cast(n); - if (!node) - return SharedPtr(); - - LV2_Worker_Schedule* data = (LV2_Worker_Schedule*)malloc( - sizeof(LV2_Worker_Schedule)); - data->handle = new Info(world, n); - data->schedule_work = schedule_work; - - LV2_Feature* f = (LV2_Feature*)malloc(sizeof(LV2_Feature)); - f->URI = LV2_WORKER__schedule; - f->data = data; - - return SharedPtr(f, &delete_feature); - } -}; - -} // namespace Server -} // namespace Ingen - -#endif // INGEN_ENGINE_LV2_REQUEST_RUN_FEATURE_HPP diff --git a/src/server/MessageContext.cpp b/src/server/MessageContext.cpp index 3a4f9742..e0c6b010 100644 --- a/src/server/MessageContext.cpp +++ b/src/server/MessageContext.cpp @@ -105,7 +105,7 @@ void MessageContext::execute(const Request& req) { NodeImpl* node = req.node; - node->work(*this, 0, NULL); + node->message_process(*this); } } // namespace Server diff --git a/src/server/NodeImpl.hpp b/src/server/NodeImpl.hpp index 36cc65d3..f97c6d90 100644 --- a/src/server/NodeImpl.hpp +++ b/src/server/NodeImpl.hpp @@ -121,9 +121,7 @@ public: virtual void learn() {} /** Run the node for one instant in the non-realtime worker thread. */ - virtual void work(MessageContext& context, - uint32_t size, - const void* data) {} + virtual void message_process(MessageContext& context) {} /** Do whatever needs doing in the process thread before process() is called */ virtual void pre_process(ProcessContext& context); diff --git a/src/server/OutputPort.cpp b/src/server/OutputPort.cpp index 56da75d9..eee852f5 100644 --- a/src/server/OutputPort.cpp +++ b/src/server/OutputPort.cpp @@ -63,7 +63,7 @@ void OutputPort::pre_process(Context& context) { for (uint32_t v = 0; v < _poly; ++v) - _buffers->at(v)->prepare_write(context); + _buffers->at(v)->prepare_output_write(context); } void diff --git a/src/server/PatchImpl.hpp b/src/server/PatchImpl.hpp index d1b11187..c3791712 100644 --- a/src/server/PatchImpl.hpp +++ b/src/server/PatchImpl.hpp @@ -155,6 +155,8 @@ public: uint32_t internal_poly() const { return _poly_pre; } uint32_t internal_poly_process() const { return _poly_process; } + Engine& engine() { return _engine; } + private: inline void compile_recursive(NodeImpl* n, CompiledPatch* output) const; void process_parallel(ProcessContext& context); diff --git a/src/server/Worker.cpp b/src/server/Worker.cpp new file mode 100644 index 00000000..c3cebffd --- /dev/null +++ b/src/server/Worker.cpp @@ -0,0 +1,143 @@ +/* + This file is part of Ingen. + Copyright 2007-2012 David Robillard + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see . +*/ + +#include "ingen/shared/LV2Features.hpp" +#include "lv2/lv2plug.in/ns/ext/worker/worker.h" +#include "raul/log.hpp" + +#include "Driver.hpp" +#include "Engine.hpp" +#include "MessageContext.hpp" +#include "LV2Node.hpp" +#include "PatchImpl.hpp" +#include "Worker.hpp" + +namespace Ingen { +namespace Server { + +/// A message in the Worker::_requests ring +struct MessageHeader { + LV2Node* node; ///< Node this message is from + uint32_t size; ///< Size of following data + // `size' bytes of data follow here +}; + +static LV2_Worker_Status +schedule(LV2_Worker_Schedule_Handle handle, + uint32_t size, + const void* data) +{ + LV2Node* node = (LV2Node*)handle; + Engine& engine = node->parent_patch()->engine(); + Worker* worker = engine.worker(); + + return worker->request(node, size, data); +} + +LV2_Worker_Status +Worker::request(LV2Node* node, + uint32_t size, + const void* data) +{ + if (_requests.write_space() < sizeof(MessageHeader) + size) { + Raul::error("Work request ring overflow\n"); + return LV2_WORKER_ERR_NO_SPACE; + } + + const MessageHeader msg = { node, size }; + if (_requests.write(sizeof(msg), &msg) != sizeof(msg)) { + Raul::error("Error writing header to work request ring\n"); + return LV2_WORKER_ERR_UNKNOWN; + } + if (_requests.write(size, data) != size) { + Raul::error("Error writing body to work request ring\n"); + return LV2_WORKER_ERR_UNKNOWN; + } + + _sem.post(); + + return LV2_WORKER_SUCCESS; +} + +static void +delete_feature(LV2_Feature* feature) +{ + free(feature->data); + free(feature); +} + +SharedPtr +Worker::Schedule::feature(Shared::World* world, Node* n) +{ + LV2Node* node = dynamic_cast(n); + if (!node) { + return SharedPtr(); + } + + LV2_Worker_Schedule* data = (LV2_Worker_Schedule*)malloc( + sizeof(LV2_Worker_Schedule)); + data->handle = node; + data->schedule_work = schedule; + + LV2_Feature* f = (LV2_Feature*)malloc(sizeof(LV2_Feature)); + f->URI = LV2_WORKER__schedule; + f->data = data; + + return SharedPtr(f, &delete_feature); + +} + +Worker::Worker(uint32_t buffer_size) + : Raul::Thread("Worker") + , _schedule(new Schedule()) + , _sem(0) + , _requests(buffer_size) + , _responses(buffer_size) + , _buffer((uint8_t*)malloc(buffer_size)) + , _buffer_size(buffer_size) +{ + start(); +} + +void +Worker::_run() +{ + while (!_exit_flag) { + _sem.wait(); + MessageHeader msg; + if (_requests.read_space() > sizeof(msg)) { + if (_requests.read(sizeof(msg), &msg) != sizeof(msg)) { + Raul::error("Error reading header from work request ring\n"); + continue; + } + + if (msg.size >= _buffer_size - sizeof(msg)) { + Raul::error("Corrupt work request ring\n"); + return; + } + + if (_requests.read(msg.size, _buffer) != msg.size) { + Raul::error("Error reading body from work request ring\n"); + continue; + } + + msg.node->work(msg.size, _buffer); + } + } +} + +} // namespace Server +} // namespace Ingen diff --git a/src/server/Worker.hpp b/src/server/Worker.hpp new file mode 100644 index 00000000..c390564e --- /dev/null +++ b/src/server/Worker.hpp @@ -0,0 +1,61 @@ +/* + This file is part of Ingen. + Copyright 2007-2012 David Robillard + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see . +*/ + +#ifndef INGEN_ENGINE_WORKER_HPP +#define INGEN_ENGINE_WORKER_HPP + +#include "ingen/shared/LV2Features.hpp" +#include "lv2/lv2plug.in/ns/ext/worker/worker.h" +#include "raul/RingBuffer.hpp" +#include "raul/Semaphore.hpp" +#include "raul/Thread.hpp" + +namespace Ingen { +namespace Server { + +class LV2Node; + +class Worker : public Raul::Thread +{ +public: + Worker(uint32_t buffer_size); + + struct Schedule : public Shared::LV2Features::Feature { + SharedPtr feature(Shared::World* world, Node* n); + }; + + LV2_Worker_Status request(LV2Node* node, + uint32_t size, + const void* data); + + SharedPtr schedule_feature() { return _schedule; } + +private: + SharedPtr _schedule; + + Raul::Semaphore _sem; + Raul::RingBuffer _requests; + Raul::RingBuffer _responses; + uint8_t* _buffer; + uint32_t _buffer_size; + + virtual void _run(); +}; + +} // namespace Server +} // namespace Ingen + +#endif // INGEN_ENGINE_WORKER_HPP diff --git a/src/server/events/SetPortValue.cpp b/src/server/events/SetPortValue.cpp index e2abcb17..673c9984 100644 --- a/src/server/events/SetPortValue.cpp +++ b/src/server/events/SetPortValue.cpp @@ -113,10 +113,12 @@ SetPortValue::apply(Context& context) } } else if (buf->type() == uris.atom_Sequence) { buf->prepare_write(context); // FIXME: incorrect - if (!buf->append_event(_time - context.start(), - _value.size(), - _value.type(), - (const uint8_t*)_value.get_body())) { + if (buf->append_event(_time - context.start(), + _value.size(), + _value.type(), + (const uint8_t*)_value.get_body())) { + _port->raise_set_by_user_flag(); + } else { Raul::warn(Raul::fmt("Error writing to port %1%\n") % _port_path); } } else { diff --git a/src/server/wscript b/src/server/wscript index 610ffb25..95fcb706 100644 --- a/src/server/wscript +++ b/src/server/wscript @@ -30,6 +30,7 @@ def build(bld): PreProcessor.cpp ProcessContext.cpp ProcessSlave.cpp + Worker.cpp events/Connect.cpp events/CreateNode.cpp events/CreatePatch.cpp diff --git a/src/shared/URIs.cpp b/src/shared/URIs.cpp index 277946d5..347a7a4a 100644 --- a/src/shared/URIs.cpp +++ b/src/shared/URIs.cpp @@ -40,6 +40,7 @@ URIs::URIs(Shared::Forge& f, URIMap* map) , atom_AtomPort (forge, map, LV2_ATOM__AtomPort) , atom_Blank (forge, map, LV2_ATOM__Blank) , atom_Bool (forge, map, LV2_ATOM__Bool) + , atom_Chunk (forge, map, LV2_ATOM__Chunk) , atom_Float (forge, map, LV2_ATOM__Float) , atom_Int (forge, map, LV2_ATOM__Int) , atom_Resource (forge, map, LV2_ATOM__Resource) -- cgit v1.2.1