From 0d4792af695d94a1e476adcdb65eed8a501ddbd4 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Fri, 21 Oct 2011 21:45:14 +0000 Subject: Make OSCEngineReceiver and HTTPEngineReceiver has-a ServerInterfaceImpl (instead of is-a). Towards further modularization... git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@3570 a436a847-0d15-0410-975c-d299462d15a1 --- src/server/EventSource.cpp | 10 +++++++ src/server/EventSource.hpp | 8 +++--- src/server/HTTPEngineReceiver.cpp | 31 +++++++++++++------- src/server/HTTPEngineReceiver.hpp | 23 +++++++++------ src/server/OSCEngineReceiver.cpp | 58 ++++++++++++++++++++------------------ src/server/OSCEngineReceiver.hpp | 19 ++++++++----- src/server/Request.hpp | 8 ++---- src/server/ServerInterfaceImpl.cpp | 24 ++++++++-------- src/server/ServerInterfaceImpl.hpp | 7 +++-- src/server/events/Delete.cpp | 1 - src/server/events/SetMetadata.cpp | 2 +- src/server/ingen_http.cpp | 9 +++++- src/server/ingen_osc.cpp | 8 +++++- 13 files changed, 126 insertions(+), 82 deletions(-) diff --git a/src/server/EventSource.cpp b/src/server/EventSource.cpp index 2711451e..0b7a0a79 100644 --- a/src/server/EventSource.cpp +++ b/src/server/EventSource.cpp @@ -63,6 +63,16 @@ EventSource::push_queued(Event* const ev) whip(); } +/** Prepare all unprepared events. + */ +void +EventSource::prepare_all() +{ + while (unprepared_events()) { + whip(); + } +} + /** Process all events for a cycle. * * Executed events will be pushed to @a dest. diff --git a/src/server/EventSource.hpp b/src/server/EventSource.hpp index 462a2cd8..8a72184e 100644 --- a/src/server/EventSource.hpp +++ b/src/server/EventSource.hpp @@ -35,21 +35,21 @@ class ProcessContext; * (realtime audio thread) and executes them, then they are sent to the * PostProcessor and finalised (post-processing thread). */ -class EventSource : protected Raul::Slave +class EventSource : public Raul::Slave { public: explicit EventSource(); virtual ~EventSource(); + void prepare_all(); void process(PostProcessor& dest, ProcessContext& context, bool limit=true); - bool empty() { return !_head.get(); } + inline bool unprepared_events() const { return (_prepared_back.get() != NULL); } + inline bool empty() const { return !_head.get(); } protected: void push_queued(Event* const ev); - inline bool unprepared_events() { return (_prepared_back.get() != NULL); } - virtual void _whipped(); ///< Prepare 1 event private: diff --git a/src/server/HTTPEngineReceiver.cpp b/src/server/HTTPEngineReceiver.cpp index 15362a80..62882bf2 100644 --- a/src/server/HTTPEngineReceiver.cpp +++ b/src/server/HTTPEngineReceiver.cpp @@ -37,6 +37,7 @@ #include "EventSource.hpp" #include "HTTPClientSender.hpp" #include "HTTPEngineReceiver.hpp" +#include "ServerInterfaceImpl.hpp" #include "ThreadManager.hpp" #define LOG(s) s << "[HTTPEngineReceiver] " @@ -50,8 +51,11 @@ using namespace Serialisation; namespace Server { -HTTPEngineReceiver::HTTPEngineReceiver(Engine& engine, uint16_t port) - : ServerInterfaceImpl(engine) +HTTPEngineReceiver::HTTPEngineReceiver(Engine& engine, + SharedPtr interface, + uint16_t port) + : _engine(engine) + , _interface(interface) , _server(soup_server_new(SOUP_SERVER_PORT, port, NULL)) { _receive_thread = new ReceiveThread(*this); @@ -63,8 +67,8 @@ HTTPEngineReceiver::HTTPEngineReceiver(Engine& engine, uint16_t port) if (!engine.world()->parser() || !engine.world()->serialiser()) engine.world()->load_module("serialisation"); - Thread::set_name("HTTPEngineReceiver"); - start(); + _interface->set_name("HTTPEngineReceiver"); + _interface->start(); _receive_thread->set_name("HTTPEngineReceiver Listener"); _receive_thread->start(); } @@ -72,7 +76,7 @@ HTTPEngineReceiver::HTTPEngineReceiver(Engine& engine, uint16_t port) HTTPEngineReceiver::~HTTPEngineReceiver() { _receive_thread->stop(); - stop(); + _interface->stop(); delete _receive_thread; if (_server) { @@ -81,6 +85,12 @@ HTTPEngineReceiver::~HTTPEngineReceiver() } } +void +HTTPEngineReceiver::ReceiveThread::whip() +{ + _receiver._interface->prepare_all(); +} + void HTTPEngineReceiver::message_callback(SoupServer* server, SoupMessage* msg, @@ -89,7 +99,8 @@ HTTPEngineReceiver::message_callback(SoupServer* server, SoupClientContext* client, void* data) { - HTTPEngineReceiver* me = (HTTPEngineReceiver*)data; + HTTPEngineReceiver* me = (HTTPEngineReceiver*)data; + ServerInterfaceImpl* interface = me->_interface.get(); using namespace Ingen::Shared; @@ -123,7 +134,7 @@ HTTPEngineReceiver::message_callback(SoupServer* server, } else if (msg->method == SOUP_METHOD_GET && path.substr(0, 8) == "/plugins") { // FIXME: kludge #if 0 - me->get("ingen:plugins"); + interface->get("ingen:plugins"); me->_receive_thread->whip(); serialiser->start_to_string("/", base_uri); @@ -143,7 +154,7 @@ HTTPEngineReceiver::message_callback(SoupServer* server, } else if (path.substr(0, 7) == "/stream") { HTTPClientSender* client = new HTTPClientSender(me->_engine); - me->register_client(client); + interface->register_client(client); // Respond with port number of stream for client const int port = client->listen_port(); @@ -204,11 +215,11 @@ HTTPEngineReceiver::message_callback(SoupServer* server, return; } - parser->parse_string(me->_engine.world(), me, msg->request_body->data, base_uri); + parser->parse_string(me->_engine.world(), interface, msg->request_body->data, base_uri); soup_message_set_status(msg, SOUP_STATUS_OK); } else if (msg->method == SOUP_METHOD_DELETE) { - me->del(path); + interface->del(path); soup_message_set_status(msg, SOUP_STATUS_OK); } else { diff --git a/src/server/HTTPEngineReceiver.hpp b/src/server/HTTPEngineReceiver.hpp index f942ee96..67e47ec4 100644 --- a/src/server/HTTPEngineReceiver.hpp +++ b/src/server/HTTPEngineReceiver.hpp @@ -22,7 +22,7 @@ #include -#include "ServerInterfaceImpl.hpp" +#include "raul/Thread.hpp" typedef struct _SoupServer SoupServer; typedef struct _SoupMessage SoupMessage; @@ -31,20 +31,23 @@ typedef struct SoupClientContext SoupClientContext; namespace Ingen { namespace Server { -class HTTPEngineReceiver : public ServerInterfaceImpl +class ServerInterfaceImpl; +class Engine; + +class HTTPEngineReceiver { public: - HTTPEngineReceiver(Engine& engine, uint16_t port); + HTTPEngineReceiver(Engine& engine, + SharedPtr interface, + uint16_t port); + ~HTTPEngineReceiver(); private: struct ReceiveThread : public Raul::Thread { explicit ReceiveThread(HTTPEngineReceiver& receiver) : _receiver(receiver) {} virtual void _run(); - virtual void whip() { - while (_receiver.unprepared_events()) - _receiver.whip(); - } + virtual void whip(); private: HTTPEngineReceiver& _receiver; }; @@ -54,8 +57,10 @@ private: static void message_callback(SoupServer* server, SoupMessage* msg, const char* path, GHashTable *query, SoupClientContext* client, void* data); - ReceiveThread* _receive_thread; - SoupServer* _server; + Engine& _engine; + SharedPtr _interface; + ReceiveThread* _receive_thread; + SoupServer* _server; }; } // namespace Server diff --git a/src/server/OSCEngineReceiver.cpp b/src/server/OSCEngineReceiver.cpp index 4a24989a..cac49524 100644 --- a/src/server/OSCEngineReceiver.cpp +++ b/src/server/OSCEngineReceiver.cpp @@ -36,6 +36,7 @@ #include "Engine.hpp" #include "OSCClientSender.hpp" #include "OSCEngineReceiver.hpp" +#include "ServerInterfaceImpl.hpp" #include "ThreadManager.hpp" #define LOG(s) s << "[OSCEngineReceiver] " @@ -56,8 +57,11 @@ namespace Server { * See the "Client OSC Namespace Documentation" for details.

*/ -OSCEngineReceiver::OSCEngineReceiver(Engine& engine, uint16_t port) - : ServerInterfaceImpl(engine) +OSCEngineReceiver::OSCEngineReceiver(Engine& engine, + SharedPtr interface, + uint16_t port) + : _engine(engine) + , _interface(interface) , _server(NULL) { _receive_thread = new ReceiveThread(*this); @@ -117,8 +121,8 @@ OSCEngineReceiver::OSCEngineReceiver(Engine& engine, uint16_t port) lo_server_add_method(_server, NULL, NULL, unknown_cb, NULL); - Thread::set_name("OSCEngineReceiver"); - start(); + //_interface->set_name("OSCEngineReceiver"); + //_interface->start(); _receive_thread->set_name("OSCEngineReceiver Listener"); _receive_thread->start(); _receive_thread->set_scheduling(SCHED_FIFO, 5); @@ -127,7 +131,7 @@ OSCEngineReceiver::OSCEngineReceiver(Engine& engine, uint16_t port) OSCEngineReceiver::~OSCEngineReceiver() { _receive_thread->stop(); - stop(); + //_interface->stop(); delete _receive_thread; if (_server != NULL) { @@ -156,11 +160,9 @@ OSCEngineReceiver::ReceiveThread::_run() // Enqueue every other message that is here "now" // (would this provide truly atomic bundles?) - while (lo_server_recv_noblock(_receiver._server, 0) > 0) - if (_receiver.unprepared_events()) - _receiver.whip(); + while (lo_server_recv_noblock(_receiver._server, 0) > 0) {} - // No more unprepared events + _receiver._interface->prepare_all();; } } @@ -190,21 +192,21 @@ OSCEngineReceiver::set_response_address_cb(const char* path, const char* types, const lo_address addr = lo_message_get_source(msg); char* const url = lo_address_get_url(addr); - const SharedPtr r = me->_request; + const SharedPtr r = me->_interface->request(); /* Different address than last time, have to do a lookup */ if (!r || !r->client() || strcmp(url, r->client()->uri().c_str())) { ClientInterface* client = me->_engine.broadcaster()->client(url); - if (client) - me->_request = SharedPtr(new Request(me, client, id)); - else - me->_request = SharedPtr(new Request(me)); + Request* req = (client) + ? new Request(client, id) + : new Request(); + me->_interface->set_request(SharedPtr(req)); } if (id != -1) { - me->set_next_response_id(id); + me->_interface->set_next_response_id(id); } else { - me->disable_responses(); + me->_interface->disable_responses(); } free(url); @@ -265,7 +267,7 @@ OSCEngineReceiver::_ping_cb(const char* path, const char* types, lo_arg** argv, int OSCEngineReceiver::_ping_slow_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg) { - ping(); + _interface->ping(); return 0; } @@ -286,7 +288,7 @@ OSCEngineReceiver::_register_client_cb(const char* path, const char* types, lo_a ClientInterface* client = new OSCClientSender( (const char*)url, _engine.world()->conf()->option("packet-size").get_int32()); - register_client(client); + _interface->register_client(client); free(url); return 0; @@ -305,7 +307,7 @@ OSCEngineReceiver::_unregister_client_cb(const char* path, const char* types, lo lo_address addr = lo_message_get_source(msg); char* url = lo_address_get_url(addr); - unregister_client(url); + _interface->unregister_client(url); free(url); return 0; @@ -322,7 +324,7 @@ OSCEngineReceiver::_unregister_client_cb(const char* path, const char* types, lo int OSCEngineReceiver::_get_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg) { - get(&argv[1]->s); + _interface->get(&argv[1]->s); return 0; } @@ -347,7 +349,7 @@ OSCEngineReceiver::_put_cb(const char* path, const char* types, lo_arg** argv, i for (int i = 3; i < argc-1; i += 2) prop.insert(make_pair(&argv[i]->s, AtomLiblo::lo_arg_to_atom(types[i+1], argv[i+1]))); - put(obj_path, prop, Resource::uri_to_graph(ctx)); + _interface->put(obj_path, prop, Resource::uri_to_graph(ctx)); return 0; } @@ -380,7 +382,7 @@ OSCEngineReceiver::_delta_add_cb(const char* path, const char* types, lo_arg** a int OSCEngineReceiver::_delta_end_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg) { - delta(_delta_uri, _delta_remove, _delta_add); + _interface->delta(_delta_uri, _delta_remove, _delta_add); _delta_uri = Raul::URI(); _delta_remove.clear(); _delta_add.clear(); @@ -402,7 +404,7 @@ OSCEngineReceiver::_move_cb(const char* path, const char* types, lo_arg** argv, const char* old_path = &argv[1]->s; const char* new_path = &argv[2]->s; - move(old_path, new_path); + _interface->move(old_path, new_path); return 0; } @@ -419,7 +421,7 @@ OSCEngineReceiver::_del_cb(const char* path, const char* types, lo_arg** argv, i { const char* uri = &argv[1]->s; - del(uri); + _interface->del(uri); return 0; } @@ -438,7 +440,7 @@ OSCEngineReceiver::_connect_cb(const char* path, const char* types, lo_arg** arg const char* src_port_path = &argv[1]->s; const char* dst_port_path = &argv[2]->s; - connect(src_port_path, dst_port_path); + _interface->connect(src_port_path, dst_port_path); return 0; } @@ -457,7 +459,7 @@ OSCEngineReceiver::_disconnect_cb(const char* path, const char* types, lo_arg** const char* src_port_path = &argv[1]->s; const char* dst_port_path = &argv[2]->s; - disconnect(src_port_path, dst_port_path); + _interface->disconnect(src_port_path, dst_port_path); return 0; } @@ -476,7 +478,7 @@ OSCEngineReceiver::_disconnect_all_cb(const char* path, const char* types, lo_ar const char* patch_path = &argv[1]->s; const char* object_path = &argv[2]->s; - disconnect_all(patch_path, object_path); + _interface->disconnect_all(patch_path, object_path); return 0; } @@ -565,7 +567,7 @@ OSCEngineReceiver::_set_property_cb(const char* path, const char* types, lo_arg* Raul::Atom value = Raul::AtomLiblo::lo_arg_to_atom(types[3], argv[3]); - set_property(object_path, key, value); + _interface->set_property(object_path, key, value); return 0; } diff --git a/src/server/OSCEngineReceiver.hpp b/src/server/OSCEngineReceiver.hpp index 4282b0a8..a165afc0 100644 --- a/src/server/OSCEngineReceiver.hpp +++ b/src/server/OSCEngineReceiver.hpp @@ -20,16 +20,16 @@ #include #include -#include "ServerInterfaceImpl.hpp" + #include "Request.hpp" #include "ingen-config.h" namespace Ingen { + namespace Server { -class JackDriver; -class NodeFactory; -class PatchImpl; +class Engine; +class ServerInterfaceImpl; /* Some boilerplate killing macros... */ #define LO_HANDLER_ARGS const char* path, const char* types, lo_arg** argv, int argc, lo_message msg @@ -54,10 +54,13 @@ inline static int name##_cb(LO_HANDLER_ARGS, void* myself)\ * * \ingroup engine */ -class OSCEngineReceiver : public ServerInterfaceImpl +class OSCEngineReceiver { public: - OSCEngineReceiver(Engine& engine, uint16_t port); + OSCEngineReceiver(Engine& engine, + SharedPtr interface, + uint16_t port); + ~OSCEngineReceiver(); private: @@ -116,7 +119,9 @@ private: LO_HANDLER(set_property); LO_HANDLER(property_set); - lo_server _server; + Engine& _engine; + SharedPtr _interface; + lo_server _server; }; } // namespace Server diff --git a/src/server/Request.hpp b/src/server/Request.hpp index da3ebece..0e0b60b8 100644 --- a/src/server/Request.hpp +++ b/src/server/Request.hpp @@ -38,15 +38,12 @@ namespace Server { class Request { public: - Request(EventSource* source=0, - ClientInterface* client=0, + Request(ClientInterface* client=0, int32_t id=1) - : _source(source) - , _client(client) + : _client(client) , _id(id) {} - EventSource* source() { return _source; } int32_t id() const { return _id; } void set_id(int32_t id) { _id = id; } @@ -64,7 +61,6 @@ public: } private: - EventSource* _source; ClientInterface* _client; int32_t _id; }; diff --git a/src/server/ServerInterfaceImpl.cpp b/src/server/ServerInterfaceImpl.cpp index 1109a388..fcf9cc00 100644 --- a/src/server/ServerInterfaceImpl.cpp +++ b/src/server/ServerInterfaceImpl.cpp @@ -35,7 +35,7 @@ namespace Server { ServerInterfaceImpl::ServerInterfaceImpl(Engine& engine) : EventSource() - , _request(new Request(this, NULL, 0)) + , _request(new Request(NULL, 0)) , _engine(engine) , _in_bundle(false) { @@ -80,7 +80,7 @@ ServerInterfaceImpl::register_client(ClientInterface* client) { push_queued(new Events::RegisterClient(_engine, _request, now(), client->uri(), client)); if (!_request) { - _request = SharedPtr(new Request(this, client, 1)); + _request = SharedPtr(new Request(client, 1)); } else { _request->set_id(1); _request->set_client(client); @@ -115,23 +115,23 @@ ServerInterfaceImpl::bundle_end() void ServerInterfaceImpl::put(const URI& uri, - const Resource::Properties& properties, - const Resource::Graph ctx) + const Resource::Properties& properties, + const Resource::Graph ctx) { push_queued(new Events::SetMetadata(_engine, _request, now(), true, ctx, uri, properties)); } void ServerInterfaceImpl::delta(const URI& uri, - const Resource::Properties& remove, - const Resource::Properties& add) + const Resource::Properties& remove, + const Resource::Properties& add) { push_queued(new Events::SetMetadata(_engine, _request, now(), false, Resource::DEFAULT, uri, add, remove)); } void ServerInterfaceImpl::move(const Path& old_path, - const Path& new_path) + const Path& new_path) { push_queued(new Events::Move(_engine, _request, now(), old_path, new_path)); } @@ -149,7 +149,7 @@ ServerInterfaceImpl::del(const URI& uri) void ServerInterfaceImpl::connect(const Path& src_port_path, - const Path& dst_port_path) + const Path& dst_port_path) { push_queued(new Events::Connect(_engine, _request, now(), src_port_path, dst_port_path)); @@ -157,7 +157,7 @@ ServerInterfaceImpl::connect(const Path& src_port_path, void ServerInterfaceImpl::disconnect(const URI& src, - const URI& dst) + const URI& dst) { if (!Path::is_path(src) && !Path::is_path(dst)) { std::cerr << "Bad disconnect request " << src << " => " << dst << std::endl; @@ -170,15 +170,15 @@ ServerInterfaceImpl::disconnect(const URI& src, void ServerInterfaceImpl::disconnect_all(const Path& patch_path, - const Path& path) + const Path& path) { push_queued(new Events::DisconnectAll(_engine, _request, now(), patch_path, path)); } void ServerInterfaceImpl::set_property(const URI& uri, - const URI& predicate, - const Atom& value) + const URI& predicate, + const Atom& value) { if (uri == "ingen:engine" && predicate == "ingen:enabled" && value.type() == Atom::BOOL) { diff --git a/src/server/ServerInterfaceImpl.hpp b/src/server/ServerInterfaceImpl.hpp index 6c65dcb9..1afbafce 100644 --- a/src/server/ServerInterfaceImpl.hpp +++ b/src/server/ServerInterfaceImpl.hpp @@ -45,7 +45,7 @@ class Engine; * are successful. */ class ServerInterfaceImpl : public EventSource, - public ServerInterface + public ServerInterface { public: ServerInterfaceImpl(Engine& engine); @@ -97,9 +97,12 @@ public: virtual void ping(); virtual void get(const Raul::URI& uri); -protected: + SharedPtr request() { return _request; } + void set_request(SharedPtr r) { _request = r; } + virtual void disable_responses(); +protected: SharedPtr _request; ///< NULL if responding disabled Engine& _engine; bool _in_bundle; ///< True iff a bundle is currently being received diff --git a/src/server/events/Delete.cpp b/src/server/events/Delete.cpp index 0b975298..d0fbb5fa 100644 --- a/src/server/events/Delete.cpp +++ b/src/server/events/Delete.cpp @@ -53,7 +53,6 @@ Delete::Delete(Engine& engine, , _lock(engine.engine_store()->lock(), Glib::NOT_LOCK) { assert(request); - assert(request->source()); if (Raul::Path::is_path(uri)) _path = Raul::Path(uri.str()); diff --git a/src/server/events/SetMetadata.cpp b/src/server/events/SetMetadata.cpp index a304d245..4133803b 100644 --- a/src/server/events/SetMetadata.cpp +++ b/src/server/events/SetMetadata.cpp @@ -126,7 +126,7 @@ SetMetadata::pre_process() Shared::ResourceImpl::type(uris, _properties, is_patch, is_node, is_port, is_output); // Create a separate request without a source so EventSource isn't unblocked twice - SharedPtr sub_request(new Request(NULL, _request->client(), _request->id())); + SharedPtr sub_request(new Request(_request->client(), _request->id())); if (is_patch) { uint32_t poly = 1; diff --git a/src/server/ingen_http.cpp b/src/server/ingen_http.cpp index dabd7360..c0bc0ce4 100644 --- a/src/server/ingen_http.cpp +++ b/src/server/ingen_http.cpp @@ -18,6 +18,7 @@ #include "ingen/shared/Module.hpp" #include "ingen/shared/World.hpp" #include "HTTPEngineReceiver.hpp" +#include "ServerInterfaceImpl.hpp" #include "Engine.hpp" using namespace std; @@ -26,12 +27,18 @@ using namespace Ingen; struct IngenHTTPModule : public Ingen::Shared::Module { void load(Ingen::Shared::World* world) { Server::Engine* engine = (Server::Engine*)world->local_engine().get(); - SharedPtr interface( + SharedPtr interface( + new Server::ServerInterfaceImpl(*engine)); + + receiver = SharedPtr( new Server::HTTPEngineReceiver( *engine, + interface, world->conf()->option("engine-port").get_int32())); engine->add_event_source(interface); } + + SharedPtr receiver; }; extern "C" { diff --git a/src/server/ingen_osc.cpp b/src/server/ingen_osc.cpp index 5b0ba285..024a93ce 100644 --- a/src/server/ingen_osc.cpp +++ b/src/server/ingen_osc.cpp @@ -18,6 +18,7 @@ #include "ingen/shared/Module.hpp" #include "ingen/shared/World.hpp" #include "OSCEngineReceiver.hpp" +#include "ServerInterfaceImpl.hpp" #include "Engine.hpp" using namespace std; @@ -26,12 +27,17 @@ using namespace Ingen; struct IngenOSCModule : public Ingen::Shared::Module { void load(Ingen::Shared::World* world) { Server::Engine* engine = (Server::Engine*)world->local_engine().get(); - SharedPtr interface( + SharedPtr interface( + new Server::ServerInterfaceImpl(*engine)); + receiver = SharedPtr( new Server::OSCEngineReceiver( *engine, + interface, world->conf()->option("engine-port").get_int32())); engine->add_event_source(interface); } + + SharedPtr receiver; }; extern "C" { -- cgit v1.2.1