From 9ae720fdc60d7e40b1b8be7c1133a57acb4e564c Mon Sep 17 00:00:00 2001 From: David Robillard Date: Thu, 3 May 2012 02:56:09 +0000 Subject: Separate EventWriter interface from EventQueue. git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@4319 a436a847-0d15-0410-975c-d299462d15a1 --- src/server/Engine.cpp | 3 +- src/server/EventQueue.cpp | 3 +- src/server/EventWriter.cpp | 188 ++++++++++++++++++++++++++++++++++++ src/server/EventWriter.hpp | 102 ++++++++++++++++++++ src/server/ServerInterfaceImpl.cpp | 190 ------------------------------------- src/server/ServerInterfaceImpl.hpp | 105 -------------------- src/server/ingen_engine.cpp | 10 +- src/server/ingen_lv2.cpp | 20 ++-- src/server/wscript | 2 +- src/socket/ingen_socket_server.cpp | 14 +-- 10 files changed, 319 insertions(+), 318 deletions(-) create mode 100644 src/server/EventWriter.cpp create mode 100644 src/server/EventWriter.hpp delete mode 100644 src/server/ServerInterfaceImpl.cpp delete mode 100644 src/server/ServerInterfaceImpl.hpp diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp index 07c3663e..f793ad22 100644 --- a/src/server/Engine.cpp +++ b/src/server/Engine.cpp @@ -28,6 +28,7 @@ #include "ingen/shared/LV2URIMap.hpp" #include "ingen/shared/Store.hpp" #include "ingen/shared/URIs.hpp" + #include "BufferFactory.hpp" #include "ClientBroadcaster.hpp" #include "ControlBindings.hpp" @@ -36,12 +37,12 @@ #include "EngineStore.hpp" #include "Event.hpp" #include "EventSource.hpp" +#include "EventWriter.hpp" #include "MessageContext.hpp" #include "NodeFactory.hpp" #include "PatchImpl.hpp" #include "PostProcessor.hpp" #include "ProcessContext.hpp" -#include "ServerInterfaceImpl.hpp" #include "ThreadManager.hpp" using namespace std; diff --git a/src/server/EventQueue.cpp b/src/server/EventQueue.cpp index 8cb979ca..272273cd 100644 --- a/src/server/EventQueue.cpp +++ b/src/server/EventQueue.cpp @@ -29,11 +29,12 @@ EventQueue::EventQueue() { Thread::set_context(THREAD_PRE_PROCESS); set_name("EventQueue"); + start(); } EventQueue::~EventQueue() { - Thread::stop(); + stop(); } /** Push an unprepared event onto the queue. diff --git a/src/server/EventWriter.cpp b/src/server/EventWriter.cpp new file mode 100644 index 00000000..4725835b --- /dev/null +++ b/src/server/EventWriter.cpp @@ -0,0 +1,188 @@ +/* + This file is part of Ingen. + Copyright 2007-2012 David Robillard + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see . +*/ + +#include + +#include "raul/log.hpp" + +#include "ingen/shared/URIs.hpp" + +#include "ClientBroadcaster.hpp" +#include "Driver.hpp" +#include "Engine.hpp" +#include "EventQueue.hpp" +#include "EventWriter.hpp" +#include "events.hpp" + +#define LOG(s) s << "[EventWriter] " + +using namespace std; +using namespace Raul; + +namespace Ingen { +namespace Server { + +EventWriter::EventWriter(Engine& engine, EventSink& sink) + : _request_client(NULL) + , _sink(sink) + , _request_id(-1) + , _engine(engine) + , _in_bundle(false) +{ +} + +EventWriter::~EventWriter() +{ +} + +SampleCount +EventWriter::now() const +{ + // Exactly one cycle latency (some could run ASAP if we get lucky, but not always, and a slight + // constant latency is far better than jittery lower (average) latency + if (_engine.driver()) + return _engine.driver()->frame_time() + _engine.driver()->block_length(); + else + return 0; +} + +void +EventWriter::set_response_id(int32_t id) +{ + if (!_request_client) { // Kludge + _request_client = _engine.broadcaster()->client( + "http://drobilla.net/ns/ingen#internal"); + } + _request_id = id; +} + +/* *** ServerInterface implementation below here *** */ + +// Bundle commands + +void +EventWriter::bundle_begin() +{ + _in_bundle = true; +} + +void +EventWriter::bundle_end() +{ + _in_bundle = false; +} + +// Object commands + +void +EventWriter::put(const URI& uri, + const Resource::Properties& properties, + const Resource::Graph ctx) +{ + _sink.event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), true, ctx, uri, properties)); +} + +void +EventWriter::delta(const URI& uri, + const Resource::Properties& remove, + const Resource::Properties& add) +{ + _sink.event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), false, Resource::DEFAULT, uri, add, remove)); +} + +void +EventWriter::move(const Path& old_path, + const Path& new_path) +{ + _sink.event(new Events::Move(_engine, _request_client, _request_id, now(), old_path, new_path)); +} + +void +EventWriter::del(const URI& uri) +{ + if (uri == "ingen:engine") { + if (_request_client) { + _request_client->response(_request_id, SUCCESS); + } + _engine.quit(); + } else { + _sink.event(new Events::Delete(_engine, _request_client, _request_id, now(), uri)); + } +} + +void +EventWriter::connect(const Path& tail_path, + const Path& head_path) +{ + _sink.event(new Events::Connect(_engine, _request_client, _request_id, now(), tail_path, head_path)); + +} + +void +EventWriter::disconnect(const Path& src, + const Path& dst) +{ + if (!Path::is_path(src) && !Path::is_path(dst)) { + LOG(Raul::error) << "Bad disconnect request " << src + << " => " << dst << std::endl; + return; + } + + _sink.event(new Events::Disconnect(_engine, _request_client, _request_id, now(), + Path(src.str()), Path(dst.str()))); +} + +void +EventWriter::disconnect_all(const Path& patch_path, + const Path& path) +{ + _sink.event(new Events::DisconnectAll(_engine, _request_client, _request_id, now(), patch_path, path)); +} + +void +EventWriter::set_property(const URI& uri, + const URI& predicate, + const Atom& value) +{ + if (uri == "ingen:engine" && predicate == "ingen:enabled" + && value.type() == _engine.world()->forge().Bool) { + if (value.get_bool()) { + _engine.activate(); + _sink.event(new Events::Ping(_engine, _request_client, _request_id, now())); + } else { + _sink.event(new Events::Deactivate(_engine, _request_client, _request_id, now())); + } + } else { + Resource::Properties remove; + remove.insert(make_pair(predicate, _engine.world()->uris()->wildcard)); + Resource::Properties add; + add.insert(make_pair(predicate, value)); + _sink.event(new Events::SetMetadata( + _engine, _request_client, _request_id, now(), false, Resource::DEFAULT, + uri, add, remove)); + } +} + +// Requests // + +void +EventWriter::get(const URI& uri) +{ + _sink.event(new Events::Get(_engine, _request_client, _request_id, now(), uri)); +} + +} // namespace Server +} // namespace Ingen diff --git a/src/server/EventWriter.hpp b/src/server/EventWriter.hpp new file mode 100644 index 00000000..771d22f5 --- /dev/null +++ b/src/server/EventWriter.hpp @@ -0,0 +1,102 @@ +/* + This file is part of Ingen. + Copyright 2007-2012 David Robillard + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see . +*/ + +#ifndef INGEN_ENGINE_EVENTWRITER_HPP +#define INGEN_ENGINE_EVENTWRITER_HPP + +#include +#include +#include + +#include "ingen/Interface.hpp" +#include "ingen/Resource.hpp" +#include "raul/SharedPtr.hpp" + +#include "types.hpp" + +namespace Ingen { +namespace Server { + +class Engine; +class EventSink; + +/** An Interface that creates and writes Events to an EventSink. + * + * This is where Interface calls get turned into Events which are actually + * processed by the engine to do things. + */ +class EventWriter : public Interface +{ +public: + explicit EventWriter(Engine& engine, EventSink& sink); + virtual ~EventWriter(); + + Raul::URI uri() const { return "http://drobilla.net/ns/ingen#internal"; } + + void set_response_interface(Interface* iface) { _request_client = iface; } + + virtual void set_response_id(int32_t id); + + virtual void bundle_begin(); + + virtual void bundle_end(); + + virtual void put(const Raul::URI& path, + const Resource::Properties& properties, + const Resource::Graph g=Resource::DEFAULT); + + virtual void delta(const Raul::URI& path, + const Resource::Properties& remove, + const Resource::Properties& add); + + virtual void move(const Raul::Path& old_path, + const Raul::Path& new_path); + + virtual void connect(const Raul::Path& tail, + const Raul::Path& head); + + virtual void disconnect(const Raul::Path& tail, + const Raul::Path& head); + + virtual void set_property(const Raul::URI& subject_path, + const Raul::URI& predicate, + const Raul::Atom& value); + + virtual void del(const Raul::URI& uri); + + virtual void disconnect_all(const Raul::Path& parent_patch_path, + const Raul::Path& path); + + virtual void get(const Raul::URI& uri); + + virtual void response(int32_t id, Status status) {} ///< N/A + virtual void error(const std::string& msg) {} ///< N/A + +protected: + Interface* _request_client; + EventSink& _sink; + int32_t _request_id; + Engine& _engine; + bool _in_bundle; ///< True iff a bundle is currently being received + +private: + SampleCount now() const; +}; + +} // namespace Server +} // namespace Ingen + +#endif // INGEN_ENGINE_QUEUEDENGINEINTERFACE_HPP diff --git a/src/server/ServerInterfaceImpl.cpp b/src/server/ServerInterfaceImpl.cpp deleted file mode 100644 index d3f151e7..00000000 --- a/src/server/ServerInterfaceImpl.cpp +++ /dev/null @@ -1,190 +0,0 @@ -/* - This file is part of Ingen. - Copyright 2007-2012 David Robillard - - Ingen is free software: you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free - Software Foundation, either version 3 of the License, or any later version. - - Ingen is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. - - You should have received a copy of the GNU Affero General Public License - along with Ingen. If not, see . -*/ - -#include - -#include "raul/log.hpp" - -#include "ingen/shared/URIs.hpp" - -#include "ClientBroadcaster.hpp" -#include "Driver.hpp" -#include "Engine.hpp" -#include "EventQueue.hpp" -#include "ServerInterfaceImpl.hpp" -#include "events.hpp" - -#define LOG(s) s << "[ServerInterfaceImpl] " - -using namespace std; -using namespace Raul; - -namespace Ingen { -namespace Server { - -ServerInterfaceImpl::ServerInterfaceImpl(Engine& engine) - : EventQueue() - , _request_client(NULL) - , _request_id(-1) - , _engine(engine) - , _in_bundle(false) -{ - start(); -} - -ServerInterfaceImpl::~ServerInterfaceImpl() -{ - stop(); -} - -SampleCount -ServerInterfaceImpl::now() const -{ - // Exactly one cycle latency (some could run ASAP if we get lucky, but not always, and a slight - // constant latency is far better than jittery lower (average) latency - if (_engine.driver()) - return _engine.driver()->frame_time() + _engine.driver()->block_length(); - else - return 0; -} - -void -ServerInterfaceImpl::set_response_id(int32_t id) -{ - if (!_request_client) { // Kludge - _request_client = _engine.broadcaster()->client( - "http://drobilla.net/ns/ingen#internal"); - } - _request_id = id; -} - -/* *** ServerInterface implementation below here *** */ - -// Bundle commands - -void -ServerInterfaceImpl::bundle_begin() -{ - _in_bundle = true; -} - -void -ServerInterfaceImpl::bundle_end() -{ - _in_bundle = false; -} - -// Object commands - -void -ServerInterfaceImpl::put(const URI& uri, - const Resource::Properties& properties, - const Resource::Graph ctx) -{ - event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), true, ctx, uri, properties)); -} - -void -ServerInterfaceImpl::delta(const URI& uri, - const Resource::Properties& remove, - const Resource::Properties& add) -{ - event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), false, Resource::DEFAULT, uri, add, remove)); -} - -void -ServerInterfaceImpl::move(const Path& old_path, - const Path& new_path) -{ - event(new Events::Move(_engine, _request_client, _request_id, now(), old_path, new_path)); -} - -void -ServerInterfaceImpl::del(const URI& uri) -{ - if (uri == "ingen:engine") { - if (_request_client) { - _request_client->response(_request_id, SUCCESS); - } - _engine.quit(); - } else { - event(new Events::Delete(_engine, _request_client, _request_id, now(), uri)); - } -} - -void -ServerInterfaceImpl::connect(const Path& tail_path, - const Path& head_path) -{ - event(new Events::Connect(_engine, _request_client, _request_id, now(), tail_path, head_path)); - -} - -void -ServerInterfaceImpl::disconnect(const Path& src, - const Path& dst) -{ - if (!Path::is_path(src) && !Path::is_path(dst)) { - LOG(Raul::error) << "Bad disconnect request " << src - << " => " << dst << std::endl; - return; - } - - event(new Events::Disconnect(_engine, _request_client, _request_id, now(), - Path(src.str()), Path(dst.str()))); -} - -void -ServerInterfaceImpl::disconnect_all(const Path& patch_path, - const Path& path) -{ - event(new Events::DisconnectAll(_engine, _request_client, _request_id, now(), patch_path, path)); -} - -void -ServerInterfaceImpl::set_property(const URI& uri, - const URI& predicate, - const Atom& value) -{ - if (uri == "ingen:engine" && predicate == "ingen:enabled" - && value.type() == _engine.world()->forge().Bool) { - if (value.get_bool()) { - _engine.activate(); - event(new Events::Ping(_engine, _request_client, _request_id, now())); - } else { - event(new Events::Deactivate(_engine, _request_client, _request_id, now())); - } - } else { - Resource::Properties remove; - remove.insert(make_pair(predicate, _engine.world()->uris()->wildcard)); - Resource::Properties add; - add.insert(make_pair(predicate, value)); - event(new Events::SetMetadata( - _engine, _request_client, _request_id, now(), false, Resource::DEFAULT, - uri, add, remove)); - } -} - -// Requests // - -void -ServerInterfaceImpl::get(const URI& uri) -{ - event(new Events::Get(_engine, _request_client, _request_id, now(), uri)); -} - -} // namespace Server -} // namespace Ingen diff --git a/src/server/ServerInterfaceImpl.hpp b/src/server/ServerInterfaceImpl.hpp deleted file mode 100644 index f34afaae..00000000 --- a/src/server/ServerInterfaceImpl.hpp +++ /dev/null @@ -1,105 +0,0 @@ -/* - This file is part of Ingen. - Copyright 2007-2012 David Robillard - - Ingen is free software: you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free - Software Foundation, either version 3 of the License, or any later version. - - Ingen is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. - - You should have received a copy of the GNU Affero General Public License - along with Ingen. If not, see . -*/ - -#ifndef INGEN_ENGINE_QUEUEDENGINEINTERFACE_HPP -#define INGEN_ENGINE_QUEUEDENGINEINTERFACE_HPP - -#include -#include -#include -#include "raul/SharedPtr.hpp" -#include "ingen/Interface.hpp" -#include "ingen/Resource.hpp" -#include "EventQueue.hpp" -#include "types.hpp" - -namespace Ingen { -namespace Server { - -class Engine; - -/** A queued (preprocessed) event source / interface. - * - * This is both an Interface and an EventSource, calling Interface methods - * will result in events in the EventSource. - * - * Responses occur through the event mechanism (which notified clients in - * event post_process methods) and are related to an event by an integer ID. - * If you do not register a request, you have no way of knowing if your calls - * are successful. - */ -class ServerInterfaceImpl : public EventQueue, - public Interface -{ -public: - explicit ServerInterfaceImpl(Engine& engine); - virtual ~ServerInterfaceImpl(); - - Raul::URI uri() const { return "http://drobilla.net/ns/ingen#internal"; } - - void set_response_interface(Interface* iface) { _request_client = iface; } - - virtual void set_response_id(int32_t id); - - virtual void bundle_begin(); - - virtual void bundle_end(); - - virtual void put(const Raul::URI& path, - const Resource::Properties& properties, - const Resource::Graph g=Resource::DEFAULT); - - virtual void delta(const Raul::URI& path, - const Resource::Properties& remove, - const Resource::Properties& add); - - virtual void move(const Raul::Path& old_path, - const Raul::Path& new_path); - - virtual void connect(const Raul::Path& tail, - const Raul::Path& head); - - virtual void disconnect(const Raul::Path& tail, - const Raul::Path& head); - - virtual void set_property(const Raul::URI& subject_path, - const Raul::URI& predicate, - const Raul::Atom& value); - - virtual void del(const Raul::URI& uri); - - virtual void disconnect_all(const Raul::Path& parent_patch_path, - const Raul::Path& path); - - virtual void get(const Raul::URI& uri); - - virtual void response(int32_t id, Status status) {} ///< N/A - virtual void error(const std::string& msg) {} ///< N/A - -protected: - Interface* _request_client; - int32_t _request_id; - Engine& _engine; - bool _in_bundle; ///< True iff a bundle is currently being received - -private: - SampleCount now() const; -}; - -} // namespace Server -} // namespace Ingen - -#endif // INGEN_ENGINE_QUEUEDENGINEINTERFACE_HPP diff --git a/src/server/ingen_engine.cpp b/src/server/ingen_engine.cpp index 84d38819..28659ec6 100644 --- a/src/server/ingen_engine.cpp +++ b/src/server/ingen_engine.cpp @@ -17,7 +17,8 @@ #include "ingen/shared/Module.hpp" #include "ingen/shared/World.hpp" #include "Engine.hpp" -#include "ServerInterfaceImpl.hpp" +#include "EventWriter.hpp" +#include "EventQueue.hpp" #include "util.hpp" using namespace Ingen; @@ -27,10 +28,11 @@ struct IngenEngineModule : public Ingen::Shared::Module { Server::set_denormal_flags(); SharedPtr engine(new Server::Engine(world)); world->set_local_engine(engine); - SharedPtr interface( - new Server::ServerInterfaceImpl(*engine.get())); + SharedPtr queue(new Server::EventQueue()); + SharedPtr interface( + new Server::EventWriter(*engine.get(), *queue.get())); world->set_engine(interface); - engine->add_event_source(interface); + engine->add_event_source(queue); assert(world->local_engine() == engine); } }; diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp index 21f1299d..94e7344d 100644 --- a/src/server/ingen_lv2.cpp +++ b/src/server/ingen_lv2.cpp @@ -45,10 +45,11 @@ #include "AudioBuffer.hpp" #include "Driver.hpp" #include "Engine.hpp" +#include "EventQueue.hpp" +#include "EventWriter.hpp" #include "PatchImpl.hpp" #include "PostProcessor.hpp" #include "ProcessContext.hpp" -#include "ServerInterfaceImpl.hpp" #include "ThreadManager.hpp" #define NS_INGEN "http://drobilla.net/ns/ingen#" @@ -425,11 +426,12 @@ ingen_instantiate(const LV2_Descriptor* descriptor, plugin->main = new MainThread(engine); plugin->main->set_name("Main"); - SharedPtr interface( - new Server::ServerInterfaceImpl(*engine.get())); + SharedPtr queue(new Server::EventQueue()); + SharedPtr interface( + new Server::EventWriter(*engine.get(), *queue.get())); plugin->world->set_engine(interface); - engine->add_event_source(interface); + engine->add_event_source(queue); Raul::Thread::get().set_context(Server::THREAD_PRE_PROCESS); Server::ThreadManager::single_threaded = true; @@ -450,17 +452,15 @@ ingen_instantiate(const LV2_Descriptor* descriptor, engine->post_processor()->set_end_time(UINT_MAX); - // TODO: Load only necessary plugins - //plugin->world->engine()->get("ingen:plugins"); - interface->process(*engine->post_processor(), context, false); + queue->process(*engine->post_processor(), context, false); engine->post_processor()->process(); plugin->world->parser()->parse_file(plugin->world, plugin->world->engine().get(), patch->filename); - while (!interface->empty()) { - interface->process(*engine->post_processor(), context, false); + while (!queue->empty()) { + queue->process(*engine->post_processor(), context, false); engine->post_processor()->process(); } @@ -493,7 +493,7 @@ ingen_activate(LV2_Handle instance) { IngenPlugin* me = (IngenPlugin*)instance; me->world->local_engine()->activate(); - ((ServerInterfaceImpl*)me->world->engine().get())->start(); + //((EventWriter*)me->world->engine().get())->start(); me->main->start(); } diff --git a/src/server/wscript b/src/server/wscript index faaeb38a..823583ba 100644 --- a/src/server/wscript +++ b/src/server/wscript @@ -14,6 +14,7 @@ def build(bld): EngineStore.cpp Event.cpp EventQueue.cpp + EventWriter.cpp GraphObjectImpl.cpp InputPort.cpp InternalPlugin.cpp @@ -32,7 +33,6 @@ def build(bld): PostProcessor.cpp ProcessContext.cpp ProcessSlave.cpp - ServerInterfaceImpl.cpp events/Connect.cpp events/CreateNode.cpp events/CreatePatch.cpp diff --git a/src/socket/ingen_socket_server.cpp b/src/socket/ingen_socket_server.cpp index 7ee20217..19257add 100644 --- a/src/socket/ingen_socket_server.cpp +++ b/src/socket/ingen_socket_server.cpp @@ -20,7 +20,8 @@ #include "ingen/shared/World.hpp" #include "../server/Engine.hpp" -#include "../server/ServerInterfaceImpl.hpp" +#include "../server/EventWriter.hpp" +#include "../server/EventQueue.hpp" #include "SocketListener.hpp" @@ -29,15 +30,16 @@ using namespace Ingen; struct IngenSocketServerModule : public Ingen::Shared::Module { void load(Ingen::Shared::World* world) { Server::Engine* engine = (Server::Engine*)world->local_engine().get(); - SharedPtr interface( - new Server::ServerInterfaceImpl(*engine)); - receiver = SharedPtr( + SharedPtr queue(new Server::EventQueue()); + SharedPtr interface( + new Server::EventWriter(*engine, *queue.get())); + listener = SharedPtr( new Ingen::Socket::SocketListener(*world, interface)); - engine->add_event_source(interface); + engine->add_event_source(queue); } - SharedPtr receiver; + SharedPtr listener; }; extern "C" { -- cgit v1.2.1