From 24d998447070dbfef3eaf7762dce7e97c3903801 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Sun, 16 Nov 2008 02:49:22 +0000 Subject: TCP notification stream support (not fully implemented yet, but transport stuff is working). Support multiple event sources in the engine. Clean up HTTP/TCP stuff. git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@1721 a436a847-0d15-0410-975c-d299462d15a1 --- src/client/ClientStore.cpp | 8 +- src/client/ClientStore.hpp | 2 +- src/client/HTTPClientReceiver.cpp | 101 +++++++++++++++++++------ src/client/HTTPClientReceiver.hpp | 6 +- src/client/OSCClientReceiver.cpp | 6 +- src/client/OSCClientReceiver.hpp | 2 +- src/client/SigClientInterface.hpp | 6 +- src/client/ThreadedSigClientInterface.hpp | 8 +- src/common/interface/ClientInterface.hpp | 2 +- src/engine/ClientBroadcaster.cpp | 4 +- src/engine/ClientBroadcaster.hpp | 2 +- src/engine/Engine.cpp | 21 +++--- src/engine/Engine.hpp | 10 ++- src/engine/HTTPClientSender.cpp | 6 +- src/engine/HTTPClientSender.hpp | 8 +- src/engine/HTTPEngineReceiver.cpp | 14 ++-- src/engine/OSCClientSender.cpp | 8 +- src/engine/OSCClientSender.hpp | 2 +- src/engine/events/SendPortActivityEvent.cpp | 2 +- src/gui/ConnectWindow.cpp | 2 +- src/ingen/main.cpp | 9 ++- src/shared/HTTPSender.cpp | 111 ++++++++++++++++++++++++---- src/shared/HTTPSender.hpp | 21 ++++-- 23 files changed, 252 insertions(+), 109 deletions(-) (limited to 'src') diff --git a/src/client/ClientStore.cpp b/src/client/ClientStore.cpp index fbcf5929..cf9e06a0 100644 --- a/src/client/ClientStore.cpp +++ b/src/client/ClientStore.cpp @@ -55,7 +55,7 @@ ClientStore::ClientStore(SharedPtr engine, SharedPtrsignal_property_change.connect(sigc::mem_fun(this, &ClientStore::set_property)); emitter->signal_port_value.connect(sigc::mem_fun(this, &ClientStore::set_port_value)); emitter->signal_voice_value.connect(sigc::mem_fun(this, &ClientStore::set_voice_value)); - emitter->signal_port_activity.connect(sigc::mem_fun(this, &ClientStore::port_activity)); + emitter->signal_activity.connect(sigc::mem_fun(this, &ClientStore::activity)); } @@ -557,13 +557,13 @@ ClientStore::set_voice_value(const string& port_path, uint32_t voice, const Raul void -ClientStore::port_activity(const Path& port_path) +ClientStore::activity(const Path& path) { - SharedPtr port = PtrCast(object(port_path)); + SharedPtr port = PtrCast(object(path)); if (port) port->signal_activity.emit(); else - cerr << "ERROR: activity for nonexistant port " << port_path << endl; + cerr << "ERROR: activity for nonexistant port " << path << endl; } diff --git a/src/client/ClientStore.hpp b/src/client/ClientStore.hpp index 291f9af9..944ab752 100644 --- a/src/client/ClientStore.hpp +++ b/src/client/ClientStore.hpp @@ -118,7 +118,7 @@ private: // Slots for SigClientInterface signals void rename(const Path& old_path, const Path& new_path); void patch_cleared(const Path& path); - void port_activity(const Path& port_path); + void activity(const Path& path); bool attempt_connection(const Path& src_port_path, const Path& dst_port_path, bool add_orphan=false); diff --git a/src/client/HTTPClientReceiver.cpp b/src/client/HTTPClientReceiver.cpp index 9116f853..572ff548 100644 --- a/src/client/HTTPClientReceiver.cpp +++ b/src/client/HTTPClientReceiver.cpp @@ -20,6 +20,8 @@ #include #include #include +#include +#include #include "module/Module.hpp" #include "HTTPClientReceiver.hpp" @@ -49,33 +51,73 @@ HTTPClientReceiver::~HTTPClientReceiver() } +HTTPClientReceiver::Listener::~Listener() +{ + close(_sock); +} + +HTTPClientReceiver::Listener::Listener(SoupSession* session, const std::string uri) + : _uri(uri) + , _session(session) +{ + string port_str = uri.substr(uri.find_last_of(":")+1); + int port = atoi(port_str.c_str()); + + cout << "HTTP listen URI: " << uri << " port: " << port << endl; + + struct sockaddr_in servaddr; + + // Create listen address + memset(&servaddr, 0, sizeof(servaddr)); + servaddr.sin_family = AF_INET; + servaddr.sin_port = htons(port); + + // Create listen socket + if ((_sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + cerr << "Error creating listening socket: %s" << strerror(errno) << endl; + _sock = -1; + return; + } + + // Set remote address (FIXME: always localhost) + if (inet_aton("127.0.0.1", &servaddr.sin_addr) <= 0) { + cerr << "Invalid remote IP address" << endl; + _sock = -1; + return; + } + + // Connect to server + if (connect(_sock, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0) { + cerr << "Error calling connect: " << strerror(errno) << endl; + _sock = -1; + return; + } +} + + void HTTPClientReceiver::Listener::_run() { -#if 0 - cout << "LISTENER RUN" << endl; - /*const string uri = "http://localhost:16180"; - SoupMessage* msg = soup_message_new("GET", (uri + "/stream").c_str()); - soup_message_headers_set_encoding(msg->response_headers, SOUP_ENCODING_CHUNKED); - soup_session_send_message(_session, msg);*/ - - size_t offset = 0; - soup_message_body_set_accumulate(_msg->response_body, false); + char in = '\0'; + char last = '\0'; + string recv = ""; + while (true) { - SoupBuffer* chunk = soup_message_body_get_chunk(_msg->response_body, offset); - if (chunk == NULL) { - //cout << "WAITING FOR DATA" << endl; - } else if (chunk->length == 0) { - cout << "CHUNKED TRANSFER COMPLETED" << endl; - break; - } else { - cout << "RECEIVED CHUNK: " << (char*)chunk->data << endl; - offset += chunk->length; + while (read(_sock, &in, 1) > 0 ) { + recv += in; + if (last == '\n' && in == '\n') { + if (recv != "") { + cout << "RECEIVED UPDATE:\n" << recv << endl; + recv = ""; + last = '\0'; + } + break; + } + last = in; } } - cout << "LISTENER FINISHED" << endl; -#endif + cout << "HTTP listener finished" << endl; } @@ -84,10 +126,10 @@ HTTPClientReceiver::message_callback(SoupSession* session, SoupMessage* msg, voi { HTTPClientReceiver* me = (HTTPClientReceiver*)ptr; const string path = soup_message_get_uri(msg)->path; - cout << "MESSAGE: " << path << endl; if (path == "/") { me->_target->response_ok(0); me->_target->enable(); + } else if (path == "/plugins") { if (msg->response_body->data == NULL) { cout << "ERROR: Empty response" << endl; @@ -98,6 +140,7 @@ HTTPClientReceiver::message_callback(SoupSession* session, SoupMessage* msg, voi Glib::ustring(msg->response_body->data), Glib::ustring("."), Glib::ustring("")); } + } else if (path == "/patch") { if (msg->response_body->data == NULL) { cout << "ERROR: Empty response" << endl; @@ -108,10 +151,19 @@ HTTPClientReceiver::message_callback(SoupSession* session, SoupMessage* msg, voi Glib::ustring(msg->response_body->data), Glib::ustring("/patch/"), Glib::ustring("")); } + } else if (path == "/stream") { - cout << "STREAM" << endl; - //me->_listener = boost::shared_ptr(new Listener(me->_session, msg)); - //me->_listener->start(); + if (msg->response_body->data == NULL) { + cout << "ERROR: Empty response" << endl; + } else { + string uri = string(soup_uri_to_string(soup_message_get_uri(msg), false)); + uri = uri.substr(0, uri.find_last_of(":")); + uri += string(":") + msg->response_body->data; + cout << "Stream URI: " << uri << endl; + me->_listener = boost::shared_ptr(new Listener(me->_session, uri)); + me->_listener->start(); + } + } else { cerr << "UNKNOWN MESSAGE: " << path << endl; } @@ -163,3 +215,4 @@ HTTPClientReceiver::stop() } // namespace Client } // namespace Ingen + diff --git a/src/client/HTTPClientReceiver.hpp b/src/client/HTTPClientReceiver.hpp index 379ffe2d..015a551f 100644 --- a/src/client/HTTPClientReceiver.hpp +++ b/src/client/HTTPClientReceiver.hpp @@ -50,11 +50,13 @@ private: class Listener : public Raul::Thread { public: - Listener(SoupSession* session, SoupMessage* msg) : _session(session), _msg(msg) {} + Listener(SoupSession* session, const std::string uri); + ~Listener(); void _run(); private: + std::string _uri; + int _sock; SoupSession* _session; - SoupMessage* _msg; }; friend class Listener; diff --git a/src/client/OSCClientReceiver.cpp b/src/client/OSCClientReceiver.cpp index fa191206..bc8659df 100644 --- a/src/client/OSCClientReceiver.cpp +++ b/src/client/OSCClientReceiver.cpp @@ -155,7 +155,7 @@ OSCClientReceiver::setup_callbacks() lo_server_thread_add_method(_st, "/ingen/set_property", NULL, set_property_cb, this); lo_server_thread_add_method(_st, "/ingen/set_port_value", "sf", set_port_value_cb, this); lo_server_thread_add_method(_st, "/ingen/set_voice_value", "sif", set_voice_value_cb, this); - lo_server_thread_add_method(_st, "/ingen/port_activity", "s", port_activity_cb, this); + lo_server_thread_add_method(_st, "/ingen/activity", "s", activity_cb, this); lo_server_thread_add_method(_st, "/ingen/program_add", "siis", program_add_cb, this); lo_server_thread_add_method(_st, "/ingen/program_remove", "sii", program_remove_cb, this); } @@ -321,11 +321,11 @@ OSCClientReceiver::_set_voice_value_cb(const char* path, const char* types, lo_a int -OSCClientReceiver::_port_activity_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg) +OSCClientReceiver::_activity_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg) { const char* const port_path = &argv[0]->s; - _target->port_activity(port_path); + _target->activity(port_path); return 0; } diff --git a/src/client/OSCClientReceiver.hpp b/src/client/OSCClientReceiver.hpp index ea5871b3..9203f096 100644 --- a/src/client/OSCClientReceiver.hpp +++ b/src/client/OSCClientReceiver.hpp @@ -95,7 +95,7 @@ private: LO_HANDLER(set_property); LO_HANDLER(set_port_value); LO_HANDLER(set_voice_value); - LO_HANDLER(port_activity); + LO_HANDLER(activity); LO_HANDLER(program_add); LO_HANDLER(program_remove); }; diff --git a/src/client/SigClientInterface.hpp b/src/client/SigClientInterface.hpp index 63586832..36ca44b9 100644 --- a/src/client/SigClientInterface.hpp +++ b/src/client/SigClientInterface.hpp @@ -66,7 +66,7 @@ public: sigc::signal signal_property_change; sigc::signal signal_port_value; sigc::signal signal_voice_value; - sigc::signal signal_port_activity; + sigc::signal signal_activity; sigc::signal signal_program_add; sigc::signal signal_program_remove; @@ -139,8 +139,8 @@ protected: void set_voice_value(const string& port_path, uint32_t voice, const Raul::Atom& value) { if (_enabled) signal_voice_value.emit(port_path, voice, value); } - void port_activity(const string& port_path) - { if (_enabled) signal_port_activity.emit(port_path); } + void activity(const string& port_path) + { if (_enabled) signal_activity.emit(port_path); } void program_add(const string& path, uint32_t bank, uint32_t program, const string& name) { if (_enabled) signal_program_add.emit(path, bank, program, name); } diff --git a/src/client/ThreadedSigClientInterface.hpp b/src/client/ThreadedSigClientInterface.hpp index f31ba37b..968954bc 100644 --- a/src/client/ThreadedSigClientInterface.hpp +++ b/src/client/ThreadedSigClientInterface.hpp @@ -64,7 +64,7 @@ public: , variable_change_slot(signal_variable_change.make_slot()) , property_change_slot(signal_property_change.make_slot()) , port_value_slot(signal_port_value.make_slot()) - , port_activity_slot(signal_port_activity.make_slot()) + , activity_slot(signal_activity.make_slot()) , program_add_slot(signal_program_add.make_slot()) , program_remove_slot(signal_program_remove.make_slot()) { @@ -133,8 +133,8 @@ public: void set_voice_value(const string& port_path, uint32_t voice, const Raul::Atom& value) { push_sig(sigc::bind(voice_value_slot, port_path, voice, value)); } - void port_activity(const string& port_path) - { push_sig(sigc::bind(port_activity_slot, port_path)); } + void activity(const string& port_path) + { push_sig(sigc::bind(activity_slot, port_path)); } void program_add(const string& path, uint32_t bank, uint32_t program, const string& name) { push_sig(sigc::bind(program_add_slot, path, bank, program, name)); } @@ -172,7 +172,7 @@ private: sigc::slot property_change_slot; sigc::slot port_value_slot; sigc::slot voice_value_slot; - sigc::slot port_activity_slot; + sigc::slot activity_slot; sigc::slot program_add_slot; sigc::slot program_remove_slot; }; diff --git a/src/common/interface/ClientInterface.hpp b/src/common/interface/ClientInterface.hpp index 07c00fd7..d904ea9d 100644 --- a/src/common/interface/ClientInterface.hpp +++ b/src/common/interface/ClientInterface.hpp @@ -72,7 +72,7 @@ public: virtual void object_renamed(const std::string& old_path, const std::string& new_path) = 0; - virtual void port_activity(const std::string& port_path) = 0; + virtual void activity(const std::string& path) = 0; virtual void program_add(const std::string& node_path, uint32_t bank, diff --git a/src/engine/ClientBroadcaster.cpp b/src/engine/ClientBroadcaster.cpp index f13743f6..19cb0cd4 100644 --- a/src/engine/ClientBroadcaster.cpp +++ b/src/engine/ClientBroadcaster.cpp @@ -222,10 +222,10 @@ ClientBroadcaster::send_port_value(const string& port_path, const Raul::Atom& va void -ClientBroadcaster::send_port_activity(const string& port_path) +ClientBroadcaster::send_activity(const string& path) { for (Clients::const_iterator i = _clients.begin(); i != _clients.end(); ++i) - (*i).second->port_activity(port_path); + (*i).second->activity(path); } diff --git a/src/engine/ClientBroadcaster.hpp b/src/engine/ClientBroadcaster.hpp index 8f9afca3..2de4b1b9 100644 --- a/src/engine/ClientBroadcaster.hpp +++ b/src/engine/ClientBroadcaster.hpp @@ -77,7 +77,7 @@ public: void send_variable_change(const string& node_path, const string& key, const Raul::Atom& value); void send_property_change(const string& node_path, const string& key, const Raul::Atom& value); void send_port_value(const string& port_path, const Raul::Atom& value); - void send_port_activity(const string& port_path); + void send_activity(const string& path); void send_program_add(const string& node_path, int bank, int program, const string& name); void send_program_remove(const string& node_path, int bank, int program); diff --git a/src/engine/Engine.cpp b/src/engine/Engine.cpp index d221b4f9..eb9e2aa8 100644 --- a/src/engine/Engine.cpp +++ b/src/engine/Engine.cpp @@ -165,12 +165,9 @@ Engine::main_iteration() void -Engine::set_event_source(SharedPtr source) +Engine::add_event_source(SharedPtr source) { - if (_event_source) - cerr << "Warning: Dropped event source (engine interface)" << endl; - - _event_source = source; + _event_sources.insert(source); } @@ -192,8 +189,8 @@ Engine::activate(size_t parallelism) if (!_midi_driver) _midi_driver = new DummyMidiDriver(); - if (_event_source) - _event_source->activate(); + for (EventSources::iterator i = _event_sources.begin(); i != _event_sources.end(); ++i) + (*i)->activate(); // Create root patch @@ -229,8 +226,8 @@ Engine::deactivate() if (!_activated) return; - if (_event_source) - _event_source->deactivate(); + for (EventSources::iterator i = _event_sources.begin(); i != _event_sources.end(); ++i) + (*i)->deactivate(); /*for (Tree::iterator i = _engine_store->objects().begin(); i != _engine_store->objects().end(); ++i) @@ -257,7 +254,7 @@ Engine::deactivate() _post_processor->process(); _audio_driver.reset(); - _event_source.reset(); + _event_sources.clear(); _activated = false; } @@ -266,8 +263,8 @@ Engine::deactivate() void Engine::process_events(ProcessContext& context) { - if (_event_source) - _event_source->process(*_post_processor, context); + for (EventSources::iterator i = _event_sources.begin(); i != _event_sources.end(); ++i) + (*i)->process(*_post_processor, context); } diff --git a/src/engine/Engine.hpp b/src/engine/Engine.hpp index 8edf37c3..261f135f 100644 --- a/src/engine/Engine.hpp +++ b/src/engine/Engine.hpp @@ -21,6 +21,7 @@ #include "config.h" #include #include +#include #include #include "raul/SharedPtr.hpp" #include "module/global.hpp" @@ -81,7 +82,6 @@ public: virtual bool activated() { return _activated; } Raul::Maid* maid() const { return _maid; } - EventSource* event_source() const { return _event_source.get(); } AudioDriver* audio_driver() const { return _audio_driver.get(); } MidiDriver* midi_driver() const { return _midi_driver; } OSCDriver* osc_driver() const { return _osc_driver; } @@ -97,10 +97,10 @@ public: /** Set the driver for the given data type (replacing the old) */ virtual void set_driver(DataType type, SharedPtr driver); - - virtual void set_event_source(SharedPtr source); virtual void set_midi_driver(MidiDriver* driver); + virtual void add_event_source(SharedPtr source); + Ingen::Shared::World* world() { return _world; } typedef std::vector ProcessSlaves; @@ -108,9 +108,11 @@ public: inline ProcessSlaves& process_slaves() { return _process_slaves; } private: + typedef std::set< SharedPtr > EventSources; + EventSources _event_sources; + ProcessSlaves _process_slaves; Ingen::Shared::World* _world; - SharedPtr _event_source; SharedPtr _audio_driver; MidiDriver* _midi_driver; OSCDriver* _osc_driver; diff --git a/src/engine/HTTPClientSender.cpp b/src/engine/HTTPClientSender.cpp index ae97e1ca..ec60cb44 100644 --- a/src/engine/HTTPClientSender.cpp +++ b/src/engine/HTTPClientSender.cpp @@ -135,9 +135,9 @@ HTTPClientSender::set_voice_value(const std::string& port_path, uint32_t voice, void -HTTPClientSender::port_activity(const std::string& port_path) +HTTPClientSender::activity(const std::string& path) { - //lo_send(_address, "/ingen/port_activity", "s", port_path.c_str(), LO_ARGS_END); + //lo_send(_address, "/ingen/activity", "s", port_path.c_str(), LO_ARGS_END); } @@ -159,9 +159,7 @@ HTTPClientSender::new_plugin(const std::string& uri, void HTTPClientSender::new_patch(const std::string& path, uint32_t poly) { - cout << "HTTP NEW PATCH" << endl; send_chunk(string("<").append(path).append("> a ingen:Patch")); - //send("/ingen/new_patch", "si", path.c_str(), poly, LO_ARGS_END); } diff --git a/src/engine/HTTPClientSender.hpp b/src/engine/HTTPClientSender.hpp index 8e4f3d33..57aaed0e 100644 --- a/src/engine/HTTPClientSender.hpp +++ b/src/engine/HTTPClientSender.hpp @@ -41,12 +41,10 @@ namespace Shared { class EngineInterface; } */ class HTTPClientSender : public Shared::ClientInterface - , public Raul::Thread , public Shared::HTTPSender { public: - HTTPClientSender(SoupServer* s, SoupMessage* m) - : Shared::HTTPSender(s, m) + HTTPClientSender() {} bool enabled() const { return _enabled; } @@ -58,7 +56,7 @@ public: void bundle_end() { HTTPSender::bundle_end(); } void transfer_begin() { HTTPSender::transfer_begin(); } void transfer_end() { HTTPSender::transfer_end(); } - + std::string uri() const { return "http://example.org/"; } void subscribe(Shared::EngineInterface* engine) { } @@ -115,7 +113,7 @@ public: uint32_t voice, const Raul::Atom& value); - virtual void port_activity(const std::string& port_path); + virtual void activity(const std::string& path); virtual void program_add(const std::string& node_path, uint32_t bank, diff --git a/src/engine/HTTPEngineReceiver.cpp b/src/engine/HTTPEngineReceiver.cpp index 9b6b6fb9..82bebc0c 100644 --- a/src/engine/HTTPEngineReceiver.cpp +++ b/src/engine/HTTPEngineReceiver.cpp @@ -148,16 +148,20 @@ HTTPEngineReceiver::message_callback(SoupServer* server, SoupMessage* msg, const return; } else if (path.substr(0, 6) == "/patch") { path = '/' + path.substr(6); + } else if (path.substr(0, 7) == "/stream") { - cout << "REGISTERING CLIENT" << endl; - // FIXME: memory leak - ClientInterface* client = new HTTPClientSender(me->_server, msg); - soup_message_headers_set_encoding(msg->response_headers, SOUP_ENCODING_CHUNKED); + HTTPClientSender* client = new HTTPClientSender(); me->register_client(client); + + // Respond with port number of stream for client + const int port = client->listen_port(); + char buf[32]; + snprintf(buf, 32, "%d", port); + soup_message_set_status(msg, SOUP_STATUS_OK); + soup_message_set_response(msg, mime_type, SOUP_MEMORY_COPY, buf, strlen(buf)); return; } else { - cout << "UNKNOWN PATH: " << path << endl; soup_message_set_status(msg, SOUP_STATUS_NOT_FOUND); soup_message_set_response(msg, "text/plain", SOUP_MEMORY_STATIC, "Unknown path\n\n", 14); diff --git a/src/engine/OSCClientSender.cpp b/src/engine/OSCClientSender.cpp index d9893241..aa99c1d8 100644 --- a/src/engine/OSCClientSender.cpp +++ b/src/engine/OSCClientSender.cpp @@ -278,16 +278,16 @@ OSCClientSender::set_voice_value(const std::string& port_path, uint32_t voice, c /** \page client_osc_namespace - *

\b /ingen/port_activity - Notification of activity for a port (e.g. MIDI messages) - * \arg \b path (string) - Path of port

\n \n + *

\b /ingen/activity - Notification of "activity" (e.g. port message blinkenlights) + * \arg \b path (string) - Path of object

\n \n */ void -OSCClientSender::port_activity(const std::string& port_path) +OSCClientSender::activity(const std::string& path) { if (!_enabled) return; - lo_send(_address, "/ingen/port_activity", "s", port_path.c_str(), LO_ARGS_END); + lo_send(_address, "/ingen/activity", "s", path.c_str(), LO_ARGS_END); } diff --git a/src/engine/OSCClientSender.hpp b/src/engine/OSCClientSender.hpp index 3de967ab..879484c8 100644 --- a/src/engine/OSCClientSender.hpp +++ b/src/engine/OSCClientSender.hpp @@ -114,7 +114,7 @@ public: uint32_t voice, const Raul::Atom& value); - virtual void port_activity(const std::string& port_path); + virtual void activity(const std::string& path); virtual void program_add(const std::string& node_path, uint32_t bank, diff --git a/src/engine/events/SendPortActivityEvent.cpp b/src/engine/events/SendPortActivityEvent.cpp index 0ab3abdd..3a408d8d 100644 --- a/src/engine/events/SendPortActivityEvent.cpp +++ b/src/engine/events/SendPortActivityEvent.cpp @@ -26,7 +26,7 @@ namespace Ingen { void SendPortActivityEvent::post_process() { - _engine.broadcaster()->send_port_activity(_port->path()); + _engine.broadcaster()->send_activity(_port->path()); } diff --git a/src/gui/ConnectWindow.cpp b/src/gui/ConnectWindow.cpp index e52e83ac..a9221c04 100644 --- a/src/gui/ConnectWindow.cpp +++ b/src/gui/ConnectWindow.cpp @@ -223,7 +223,7 @@ ConnectWindow::connect(bool existing) new QueuedEngineInterface(*world->local_engine, Ingen::event_queue_size, Ingen::event_queue_size)); world->engine = interface; - world->local_engine->set_event_source(interface); + world->local_engine->add_event_source(interface); } SharedPtr client(new SigClientInterface()); diff --git a/src/ingen/main.cpp b/src/ingen/main.cpp index 8d155c5c..4c73f1de 100644 --- a/src/ingen/main.cpp +++ b/src/ingen/main.cpp @@ -137,7 +137,7 @@ main(int argc, char** argv) Ingen::QueuedEngineInterface* (*new_interface)(Ingen::Engine& engine); if (engine_osc_module->get_symbol("new_queued_interface", (void*&)new_interface)) { SharedPtr interface(new_interface(*engine)); - world->local_engine->set_event_source(interface); + world->local_engine->add_event_source(interface); engine_interface = interface; world->engine = engine_interface; } @@ -149,7 +149,7 @@ main(int argc, char** argv) if (engine_osc_module->get_symbol("new_osc_receiver", (void*&)new_receiver)) { SharedPtr source(new_receiver(*engine, pre_processor_queue_size, args.engine_port_arg)); - world->local_engine->set_event_source(source); + world->local_engine->add_event_source(source); } } #endif @@ -158,8 +158,9 @@ main(int argc, char** argv) // FIXE: leak Ingen::HTTPEngineReceiver* (*new_receiver)(Ingen::Engine& engine, uint16_t port); if (engine_http_module->get_symbol("new_http_receiver", (void*&)new_receiver)) { - HTTPEngineReceiver* receiver = new_receiver( - *world->local_engine, args.engine_port_arg); + boost::shared_ptr receiver(new_receiver( + *world->local_engine, args.engine_port_arg)); + world->local_engine->add_event_source(receiver); receiver->activate(); } } diff --git a/src/shared/HTTPSender.cpp b/src/shared/HTTPSender.cpp index 20135b2a..7f760786 100644 --- a/src/shared/HTTPSender.cpp +++ b/src/shared/HTTPSender.cpp @@ -20,6 +20,12 @@ #include #include #include +#include +#include +#include +#include + +#include using namespace std; @@ -27,53 +33,126 @@ namespace Ingen { namespace Shared { -HTTPSender::HTTPSender(SoupServer* server, SoupMessage* msg) - : _server(server) - , _msg(msg) +HTTPSender::HTTPSender() + : _listen_port(-1) + , _listen_sock(-1) + , _client_sock(-1) + , _send_state(Immediate) { - soup_message_headers_set_encoding(msg->response_headers, SOUP_ENCODING_CHUNKED); - cout << "Hello?" << endl; - send_chunk("hello"); + Thread::set_name("HTTP Sender"); + + struct sockaddr_in addr; + + // Create listen address + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_ANY); + + // Create listen socket + if ((_listen_sock = socket(AF_INET, SOCK_STREAM, 0)) < 0 ) { + fprintf(stderr, "Error creating listening socket (%s)\n", strerror(errno)); + exit(EXIT_FAILURE); + } + + // Bind our socket addresss to the listening socket + if (bind(_listen_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { + fprintf(stderr, "Error calling bind (%s)\n", strerror(errno)); + _listen_sock = -1; + } + + // Find port number + socklen_t length = sizeof(addr); + if (getsockname(_listen_sock, (struct sockaddr*)&addr, &length) == -1) { + fprintf(stderr, "Error calling getsockname (%s)\n", strerror(errno)); + _listen_sock = -1; + return; + } + + if (listen(_listen_sock, 1) < 0 ) { + cerr << "Error calling listen: %s" << strerror(errno) << endl; + _listen_sock = -1; + return; + } + + _listen_port = ntohs(addr.sin_port); + cout << "Opening event stream on TCP port " << _listen_port << endl; + start(); } HTTPSender::~HTTPSender() { - cout << "HTTP SENDER EXIT" << endl; - soup_message_body_complete(_msg->response_body); - soup_server_unpause_message(_server, _msg); + stop(); + if (_listen_sock != -1) + close(_listen_sock); + if (_client_sock != -1) + close(_client_sock); +} + +void +HTTPSender::_run() +{ + if (_listen_sock == -1) { + cerr << "Unable to open socket, exiting sender thread" << endl; + return; + } + + // Accept connection + if ((_client_sock = accept(_listen_sock, NULL, NULL) ) < 0) { + cerr << "Error calling accept: " << strerror(errno) << endl; + return; + } + + // Hold connection open and write when signalled + while (true) { + _mutex.lock(); + _signal.wait(_mutex); + + write(_client_sock, _transfer.c_str(), _transfer.length()); + write(_client_sock, "\n\n", 2); + + _mutex.unlock(); + } + + close(_listen_sock); + _listen_sock = -1; } void HTTPSender::bundle_begin() { + _mutex.lock(); _send_state = SendingBundle; + _transfer = ""; + _mutex.unlock(); } void HTTPSender::bundle_end() { + _mutex.lock(); assert(_send_state == SendingBundle); - soup_message_body_append(_msg->response_body, - SOUP_MEMORY_TEMPORARY, _transfer.c_str(), _transfer.length()); - soup_server_unpause_message(_server, _msg); - _transfer = ""; + _signal.broadcast(); _send_state = Immediate; + _mutex.unlock(); } void HTTPSender::send_chunk(const std::string& buf) { + _mutex.lock(); + if (_send_state == Immediate) { - soup_message_body_append(_msg->response_body, - SOUP_MEMORY_TEMPORARY, buf.c_str(), buf.length()); - soup_server_unpause_message(_server, _msg); + _transfer = ""; + _signal.broadcast(); } else { _transfer.append(buf); } + + _mutex.unlock(); } diff --git a/src/shared/HTTPSender.hpp b/src/shared/HTTPSender.hpp index 1025c071..1077b76d 100644 --- a/src/shared/HTTPSender.hpp +++ b/src/shared/HTTPSender.hpp @@ -20,14 +20,15 @@ #include #include -#include +#include +#include "raul/Thread.hpp" namespace Ingen { namespace Shared { -class HTTPSender { +class HTTPSender : public Raul::Thread { public: - HTTPSender(SoupServer* server, SoupMessage* msg); + HTTPSender(); virtual ~HTTPSender(); // Message bundling @@ -38,13 +39,21 @@ public: void transfer_begin() { bundle_begin(); } void transfer_end() { bundle_end(); } + int listen_port() const { return _listen_port; } + protected: - void send_chunk(const std::string& buf); + void _run(); + void send_chunk(const std::string& buf); + enum SendState { Immediate, SendingBundle }; - SoupServer* _server; - SoupMessage* _msg; + Glib::Mutex _mutex; + Glib::Cond _signal; + + int _listen_port; + int _listen_sock; + int _client_sock; SendState _send_state; std::string _transfer; }; -- cgit v1.2.1