summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorDavid Robillard <d@drobilla.net>2008-11-16 02:49:22 +0000
committerDavid Robillard <d@drobilla.net>2008-11-16 02:49:22 +0000
commit24d998447070dbfef3eaf7762dce7e97c3903801 (patch)
tree0feffd6ca3c4459e0a7ff6fad9cf48b7816f2cd7 /src
parentfb6471ac9d5daefd3655bc19532a6028b5f0ead4 (diff)
downloadingen-24d998447070dbfef3eaf7762dce7e97c3903801.tar.gz
ingen-24d998447070dbfef3eaf7762dce7e97c3903801.tar.bz2
ingen-24d998447070dbfef3eaf7762dce7e97c3903801.zip
TCP notification stream support (not fully implemented yet, but transport stuff is working).
Support multiple event sources in the engine. Clean up HTTP/TCP stuff. git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@1721 a436a847-0d15-0410-975c-d299462d15a1
Diffstat (limited to 'src')
-rw-r--r--src/client/ClientStore.cpp8
-rw-r--r--src/client/ClientStore.hpp2
-rw-r--r--src/client/HTTPClientReceiver.cpp101
-rw-r--r--src/client/HTTPClientReceiver.hpp6
-rw-r--r--src/client/OSCClientReceiver.cpp6
-rw-r--r--src/client/OSCClientReceiver.hpp2
-rw-r--r--src/client/SigClientInterface.hpp6
-rw-r--r--src/client/ThreadedSigClientInterface.hpp8
-rw-r--r--src/common/interface/ClientInterface.hpp2
-rw-r--r--src/engine/ClientBroadcaster.cpp4
-rw-r--r--src/engine/ClientBroadcaster.hpp2
-rw-r--r--src/engine/Engine.cpp21
-rw-r--r--src/engine/Engine.hpp10
-rw-r--r--src/engine/HTTPClientSender.cpp6
-rw-r--r--src/engine/HTTPClientSender.hpp8
-rw-r--r--src/engine/HTTPEngineReceiver.cpp14
-rw-r--r--src/engine/OSCClientSender.cpp8
-rw-r--r--src/engine/OSCClientSender.hpp2
-rw-r--r--src/engine/events/SendPortActivityEvent.cpp2
-rw-r--r--src/gui/ConnectWindow.cpp2
-rw-r--r--src/ingen/main.cpp9
-rw-r--r--src/shared/HTTPSender.cpp111
-rw-r--r--src/shared/HTTPSender.hpp21
23 files changed, 252 insertions, 109 deletions
diff --git a/src/client/ClientStore.cpp b/src/client/ClientStore.cpp
index fbcf5929..cf9e06a0 100644
--- a/src/client/ClientStore.cpp
+++ b/src/client/ClientStore.cpp
@@ -55,7 +55,7 @@ ClientStore::ClientStore(SharedPtr<EngineInterface> engine, SharedPtr<SigClientI
emitter->signal_property_change.connect(sigc::mem_fun(this, &ClientStore::set_property));
emitter->signal_port_value.connect(sigc::mem_fun(this, &ClientStore::set_port_value));
emitter->signal_voice_value.connect(sigc::mem_fun(this, &ClientStore::set_voice_value));
- emitter->signal_port_activity.connect(sigc::mem_fun(this, &ClientStore::port_activity));
+ emitter->signal_activity.connect(sigc::mem_fun(this, &ClientStore::activity));
}
@@ -557,13 +557,13 @@ ClientStore::set_voice_value(const string& port_path, uint32_t voice, const Raul
void
-ClientStore::port_activity(const Path& port_path)
+ClientStore::activity(const Path& path)
{
- SharedPtr<PortModel> port = PtrCast<PortModel>(object(port_path));
+ SharedPtr<PortModel> port = PtrCast<PortModel>(object(path));
if (port)
port->signal_activity.emit();
else
- cerr << "ERROR: activity for nonexistant port " << port_path << endl;
+ cerr << "ERROR: activity for nonexistant port " << path << endl;
}
diff --git a/src/client/ClientStore.hpp b/src/client/ClientStore.hpp
index 291f9af9..944ab752 100644
--- a/src/client/ClientStore.hpp
+++ b/src/client/ClientStore.hpp
@@ -118,7 +118,7 @@ private:
// Slots for SigClientInterface signals
void rename(const Path& old_path, const Path& new_path);
void patch_cleared(const Path& path);
- void port_activity(const Path& port_path);
+ void activity(const Path& path);
bool attempt_connection(const Path& src_port_path, const Path& dst_port_path, bool add_orphan=false);
diff --git a/src/client/HTTPClientReceiver.cpp b/src/client/HTTPClientReceiver.cpp
index 9116f853..572ff548 100644
--- a/src/client/HTTPClientReceiver.cpp
+++ b/src/client/HTTPClientReceiver.cpp
@@ -20,6 +20,8 @@
#include <cstring>
#include <iostream>
#include <sstream>
+#include <sys/socket.h>
+#include <errno.h>
#include "module/Module.hpp"
#include "HTTPClientReceiver.hpp"
@@ -49,33 +51,73 @@ HTTPClientReceiver::~HTTPClientReceiver()
}
+HTTPClientReceiver::Listener::~Listener()
+{
+ close(_sock);
+}
+
+HTTPClientReceiver::Listener::Listener(SoupSession* session, const std::string uri)
+ : _uri(uri)
+ , _session(session)
+{
+ string port_str = uri.substr(uri.find_last_of(":")+1);
+ int port = atoi(port_str.c_str());
+
+ cout << "HTTP listen URI: " << uri << " port: " << port << endl;
+
+ struct sockaddr_in servaddr;
+
+ // Create listen address
+ memset(&servaddr, 0, sizeof(servaddr));
+ servaddr.sin_family = AF_INET;
+ servaddr.sin_port = htons(port);
+
+ // Create listen socket
+ if ((_sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ cerr << "Error creating listening socket: %s" << strerror(errno) << endl;
+ _sock = -1;
+ return;
+ }
+
+ // Set remote address (FIXME: always localhost)
+ if (inet_aton("127.0.0.1", &servaddr.sin_addr) <= 0) {
+ cerr << "Invalid remote IP address" << endl;
+ _sock = -1;
+ return;
+ }
+
+ // Connect to server
+ if (connect(_sock, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0) {
+ cerr << "Error calling connect: " << strerror(errno) << endl;
+ _sock = -1;
+ return;
+ }
+}
+
+
void
HTTPClientReceiver::Listener::_run()
{
-#if 0
- cout << "LISTENER RUN" << endl;
- /*const string uri = "http://localhost:16180";
- SoupMessage* msg = soup_message_new("GET", (uri + "/stream").c_str());
- soup_message_headers_set_encoding(msg->response_headers, SOUP_ENCODING_CHUNKED);
- soup_session_send_message(_session, msg);*/
-
- size_t offset = 0;
- soup_message_body_set_accumulate(_msg->response_body, false);
+ char in = '\0';
+ char last = '\0';
+ string recv = "";
+
while (true) {
- SoupBuffer* chunk = soup_message_body_get_chunk(_msg->response_body, offset);
- if (chunk == NULL) {
- //cout << "WAITING FOR DATA" << endl;
- } else if (chunk->length == 0) {
- cout << "CHUNKED TRANSFER COMPLETED" << endl;
- break;
- } else {
- cout << "RECEIVED CHUNK: " << (char*)chunk->data << endl;
- offset += chunk->length;
+ while (read(_sock, &in, 1) > 0 ) {
+ recv += in;
+ if (last == '\n' && in == '\n') {
+ if (recv != "") {
+ cout << "RECEIVED UPDATE:\n" << recv << endl;
+ recv = "";
+ last = '\0';
+ }
+ break;
+ }
+ last = in;
}
}
- cout << "LISTENER FINISHED" << endl;
-#endif
+ cout << "HTTP listener finished" << endl;
}
@@ -84,10 +126,10 @@ HTTPClientReceiver::message_callback(SoupSession* session, SoupMessage* msg, voi
{
HTTPClientReceiver* me = (HTTPClientReceiver*)ptr;
const string path = soup_message_get_uri(msg)->path;
- cout << "MESSAGE: " << path << endl;
if (path == "/") {
me->_target->response_ok(0);
me->_target->enable();
+
} else if (path == "/plugins") {
if (msg->response_body->data == NULL) {
cout << "ERROR: Empty response" << endl;
@@ -98,6 +140,7 @@ HTTPClientReceiver::message_callback(SoupSession* session, SoupMessage* msg, voi
Glib::ustring(msg->response_body->data),
Glib::ustring("."), Glib::ustring(""));
}
+
} else if (path == "/patch") {
if (msg->response_body->data == NULL) {
cout << "ERROR: Empty response" << endl;
@@ -108,10 +151,19 @@ HTTPClientReceiver::message_callback(SoupSession* session, SoupMessage* msg, voi
Glib::ustring(msg->response_body->data),
Glib::ustring("/patch/"), Glib::ustring(""));
}
+
} else if (path == "/stream") {
- cout << "STREAM" << endl;
- //me->_listener = boost::shared_ptr<Listener>(new Listener(me->_session, msg));
- //me->_listener->start();
+ if (msg->response_body->data == NULL) {
+ cout << "ERROR: Empty response" << endl;
+ } else {
+ string uri = string(soup_uri_to_string(soup_message_get_uri(msg), false));
+ uri = uri.substr(0, uri.find_last_of(":"));
+ uri += string(":") + msg->response_body->data;
+ cout << "Stream URI: " << uri << endl;
+ me->_listener = boost::shared_ptr<Listener>(new Listener(me->_session, uri));
+ me->_listener->start();
+ }
+
} else {
cerr << "UNKNOWN MESSAGE: " << path << endl;
}
@@ -163,3 +215,4 @@ HTTPClientReceiver::stop()
} // namespace Client
} // namespace Ingen
+
diff --git a/src/client/HTTPClientReceiver.hpp b/src/client/HTTPClientReceiver.hpp
index 379ffe2d..015a551f 100644
--- a/src/client/HTTPClientReceiver.hpp
+++ b/src/client/HTTPClientReceiver.hpp
@@ -50,11 +50,13 @@ private:
class Listener : public Raul::Thread {
public:
- Listener(SoupSession* session, SoupMessage* msg) : _session(session), _msg(msg) {}
+ Listener(SoupSession* session, const std::string uri);
+ ~Listener();
void _run();
private:
+ std::string _uri;
+ int _sock;
SoupSession* _session;
- SoupMessage* _msg;
};
friend class Listener;
diff --git a/src/client/OSCClientReceiver.cpp b/src/client/OSCClientReceiver.cpp
index fa191206..bc8659df 100644
--- a/src/client/OSCClientReceiver.cpp
+++ b/src/client/OSCClientReceiver.cpp
@@ -155,7 +155,7 @@ OSCClientReceiver::setup_callbacks()
lo_server_thread_add_method(_st, "/ingen/set_property", NULL, set_property_cb, this);
lo_server_thread_add_method(_st, "/ingen/set_port_value", "sf", set_port_value_cb, this);
lo_server_thread_add_method(_st, "/ingen/set_voice_value", "sif", set_voice_value_cb, this);
- lo_server_thread_add_method(_st, "/ingen/port_activity", "s", port_activity_cb, this);
+ lo_server_thread_add_method(_st, "/ingen/activity", "s", activity_cb, this);
lo_server_thread_add_method(_st, "/ingen/program_add", "siis", program_add_cb, this);
lo_server_thread_add_method(_st, "/ingen/program_remove", "sii", program_remove_cb, this);
}
@@ -321,11 +321,11 @@ OSCClientReceiver::_set_voice_value_cb(const char* path, const char* types, lo_a
int
-OSCClientReceiver::_port_activity_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
+OSCClientReceiver::_activity_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
const char* const port_path = &argv[0]->s;
- _target->port_activity(port_path);
+ _target->activity(port_path);
return 0;
}
diff --git a/src/client/OSCClientReceiver.hpp b/src/client/OSCClientReceiver.hpp
index ea5871b3..9203f096 100644
--- a/src/client/OSCClientReceiver.hpp
+++ b/src/client/OSCClientReceiver.hpp
@@ -95,7 +95,7 @@ private:
LO_HANDLER(set_property);
LO_HANDLER(set_port_value);
LO_HANDLER(set_voice_value);
- LO_HANDLER(port_activity);
+ LO_HANDLER(activity);
LO_HANDLER(program_add);
LO_HANDLER(program_remove);
};
diff --git a/src/client/SigClientInterface.hpp b/src/client/SigClientInterface.hpp
index 63586832..36ca44b9 100644
--- a/src/client/SigClientInterface.hpp
+++ b/src/client/SigClientInterface.hpp
@@ -66,7 +66,7 @@ public:
sigc::signal<void, string, string, Raul::Atom> signal_property_change;
sigc::signal<void, string, Raul::Atom> signal_port_value;
sigc::signal<void, string, uint32_t, Raul::Atom> signal_voice_value;
- sigc::signal<void, string> signal_port_activity;
+ sigc::signal<void, string> signal_activity;
sigc::signal<void, string, uint32_t, uint32_t, string> signal_program_add;
sigc::signal<void, string, uint32_t, uint32_t> signal_program_remove;
@@ -139,8 +139,8 @@ protected:
void set_voice_value(const string& port_path, uint32_t voice, const Raul::Atom& value)
{ if (_enabled) signal_voice_value.emit(port_path, voice, value); }
- void port_activity(const string& port_path)
- { if (_enabled) signal_port_activity.emit(port_path); }
+ void activity(const string& port_path)
+ { if (_enabled) signal_activity.emit(port_path); }
void program_add(const string& path, uint32_t bank, uint32_t program, const string& name)
{ if (_enabled) signal_program_add.emit(path, bank, program, name); }
diff --git a/src/client/ThreadedSigClientInterface.hpp b/src/client/ThreadedSigClientInterface.hpp
index f31ba37b..968954bc 100644
--- a/src/client/ThreadedSigClientInterface.hpp
+++ b/src/client/ThreadedSigClientInterface.hpp
@@ -64,7 +64,7 @@ public:
, variable_change_slot(signal_variable_change.make_slot())
, property_change_slot(signal_property_change.make_slot())
, port_value_slot(signal_port_value.make_slot())
- , port_activity_slot(signal_port_activity.make_slot())
+ , activity_slot(signal_activity.make_slot())
, program_add_slot(signal_program_add.make_slot())
, program_remove_slot(signal_program_remove.make_slot())
{
@@ -133,8 +133,8 @@ public:
void set_voice_value(const string& port_path, uint32_t voice, const Raul::Atom& value)
{ push_sig(sigc::bind(voice_value_slot, port_path, voice, value)); }
- void port_activity(const string& port_path)
- { push_sig(sigc::bind(port_activity_slot, port_path)); }
+ void activity(const string& port_path)
+ { push_sig(sigc::bind(activity_slot, port_path)); }
void program_add(const string& path, uint32_t bank, uint32_t program, const string& name)
{ push_sig(sigc::bind(program_add_slot, path, bank, program, name)); }
@@ -172,7 +172,7 @@ private:
sigc::slot<void, string, string, Raul::Atom> property_change_slot;
sigc::slot<void, string, Raul::Atom> port_value_slot;
sigc::slot<void, string, uint32_t, Raul::Atom> voice_value_slot;
- sigc::slot<void, string> port_activity_slot;
+ sigc::slot<void, string> activity_slot;
sigc::slot<void, string, uint32_t, uint32_t, string> program_add_slot;
sigc::slot<void, string, uint32_t, uint32_t> program_remove_slot;
};
diff --git a/src/common/interface/ClientInterface.hpp b/src/common/interface/ClientInterface.hpp
index 07c00fd7..d904ea9d 100644
--- a/src/common/interface/ClientInterface.hpp
+++ b/src/common/interface/ClientInterface.hpp
@@ -72,7 +72,7 @@ public:
virtual void object_renamed(const std::string& old_path,
const std::string& new_path) = 0;
- virtual void port_activity(const std::string& port_path) = 0;
+ virtual void activity(const std::string& path) = 0;
virtual void program_add(const std::string& node_path,
uint32_t bank,
diff --git a/src/engine/ClientBroadcaster.cpp b/src/engine/ClientBroadcaster.cpp
index f13743f6..19cb0cd4 100644
--- a/src/engine/ClientBroadcaster.cpp
+++ b/src/engine/ClientBroadcaster.cpp
@@ -222,10 +222,10 @@ ClientBroadcaster::send_port_value(const string& port_path, const Raul::Atom& va
void
-ClientBroadcaster::send_port_activity(const string& port_path)
+ClientBroadcaster::send_activity(const string& path)
{
for (Clients::const_iterator i = _clients.begin(); i != _clients.end(); ++i)
- (*i).second->port_activity(port_path);
+ (*i).second->activity(path);
}
diff --git a/src/engine/ClientBroadcaster.hpp b/src/engine/ClientBroadcaster.hpp
index 8f9afca3..2de4b1b9 100644
--- a/src/engine/ClientBroadcaster.hpp
+++ b/src/engine/ClientBroadcaster.hpp
@@ -77,7 +77,7 @@ public:
void send_variable_change(const string& node_path, const string& key, const Raul::Atom& value);
void send_property_change(const string& node_path, const string& key, const Raul::Atom& value);
void send_port_value(const string& port_path, const Raul::Atom& value);
- void send_port_activity(const string& port_path);
+ void send_activity(const string& path);
void send_program_add(const string& node_path, int bank, int program, const string& name);
void send_program_remove(const string& node_path, int bank, int program);
diff --git a/src/engine/Engine.cpp b/src/engine/Engine.cpp
index d221b4f9..eb9e2aa8 100644
--- a/src/engine/Engine.cpp
+++ b/src/engine/Engine.cpp
@@ -165,12 +165,9 @@ Engine::main_iteration()
void
-Engine::set_event_source(SharedPtr<EventSource> source)
+Engine::add_event_source(SharedPtr<EventSource> source)
{
- if (_event_source)
- cerr << "Warning: Dropped event source (engine interface)" << endl;
-
- _event_source = source;
+ _event_sources.insert(source);
}
@@ -192,8 +189,8 @@ Engine::activate(size_t parallelism)
if (!_midi_driver)
_midi_driver = new DummyMidiDriver();
- if (_event_source)
- _event_source->activate();
+ for (EventSources::iterator i = _event_sources.begin(); i != _event_sources.end(); ++i)
+ (*i)->activate();
// Create root patch
@@ -229,8 +226,8 @@ Engine::deactivate()
if (!_activated)
return;
- if (_event_source)
- _event_source->deactivate();
+ for (EventSources::iterator i = _event_sources.begin(); i != _event_sources.end(); ++i)
+ (*i)->deactivate();
/*for (Tree<GraphObject*>::iterator i = _engine_store->objects().begin();
i != _engine_store->objects().end(); ++i)
@@ -257,7 +254,7 @@ Engine::deactivate()
_post_processor->process();
_audio_driver.reset();
- _event_source.reset();
+ _event_sources.clear();
_activated = false;
}
@@ -266,8 +263,8 @@ Engine::deactivate()
void
Engine::process_events(ProcessContext& context)
{
- if (_event_source)
- _event_source->process(*_post_processor, context);
+ for (EventSources::iterator i = _event_sources.begin(); i != _event_sources.end(); ++i)
+ (*i)->process(*_post_processor, context);
}
diff --git a/src/engine/Engine.hpp b/src/engine/Engine.hpp
index 8edf37c3..261f135f 100644
--- a/src/engine/Engine.hpp
+++ b/src/engine/Engine.hpp
@@ -21,6 +21,7 @@
#include "config.h"
#include <cassert>
#include <vector>
+#include <set>
#include <boost/utility.hpp>
#include "raul/SharedPtr.hpp"
#include "module/global.hpp"
@@ -81,7 +82,6 @@ public:
virtual bool activated() { return _activated; }
Raul::Maid* maid() const { return _maid; }
- EventSource* event_source() const { return _event_source.get(); }
AudioDriver* audio_driver() const { return _audio_driver.get(); }
MidiDriver* midi_driver() const { return _midi_driver; }
OSCDriver* osc_driver() const { return _osc_driver; }
@@ -97,10 +97,10 @@ public:
/** Set the driver for the given data type (replacing the old) */
virtual void set_driver(DataType type, SharedPtr<Driver> driver);
-
- virtual void set_event_source(SharedPtr<EventSource> source);
virtual void set_midi_driver(MidiDriver* driver);
+ virtual void add_event_source(SharedPtr<EventSource> source);
+
Ingen::Shared::World* world() { return _world; }
typedef std::vector<ProcessSlave*> ProcessSlaves;
@@ -108,9 +108,11 @@ public:
inline ProcessSlaves& process_slaves() { return _process_slaves; }
private:
+ typedef std::set< SharedPtr<EventSource> > EventSources;
+ EventSources _event_sources;
+
ProcessSlaves _process_slaves;
Ingen::Shared::World* _world;
- SharedPtr<EventSource> _event_source;
SharedPtr<AudioDriver> _audio_driver;
MidiDriver* _midi_driver;
OSCDriver* _osc_driver;
diff --git a/src/engine/HTTPClientSender.cpp b/src/engine/HTTPClientSender.cpp
index ae97e1ca..ec60cb44 100644
--- a/src/engine/HTTPClientSender.cpp
+++ b/src/engine/HTTPClientSender.cpp
@@ -135,9 +135,9 @@ HTTPClientSender::set_voice_value(const std::string& port_path, uint32_t voice,
void
-HTTPClientSender::port_activity(const std::string& port_path)
+HTTPClientSender::activity(const std::string& path)
{
- //lo_send(_address, "/ingen/port_activity", "s", port_path.c_str(), LO_ARGS_END);
+ //lo_send(_address, "/ingen/activity", "s", port_path.c_str(), LO_ARGS_END);
}
@@ -159,9 +159,7 @@ HTTPClientSender::new_plugin(const std::string& uri,
void
HTTPClientSender::new_patch(const std::string& path, uint32_t poly)
{
- cout << "HTTP NEW PATCH" << endl;
send_chunk(string("<").append(path).append("> a ingen:Patch"));
- //send("/ingen/new_patch", "si", path.c_str(), poly, LO_ARGS_END);
}
diff --git a/src/engine/HTTPClientSender.hpp b/src/engine/HTTPClientSender.hpp
index 8e4f3d33..57aaed0e 100644
--- a/src/engine/HTTPClientSender.hpp
+++ b/src/engine/HTTPClientSender.hpp
@@ -41,12 +41,10 @@ namespace Shared { class EngineInterface; }
*/
class HTTPClientSender
: public Shared::ClientInterface
- , public Raul::Thread
, public Shared::HTTPSender
{
public:
- HTTPClientSender(SoupServer* s, SoupMessage* m)
- : Shared::HTTPSender(s, m)
+ HTTPClientSender()
{}
bool enabled() const { return _enabled; }
@@ -58,7 +56,7 @@ public:
void bundle_end() { HTTPSender::bundle_end(); }
void transfer_begin() { HTTPSender::transfer_begin(); }
void transfer_end() { HTTPSender::transfer_end(); }
-
+
std::string uri() const { return "http://example.org/"; }
void subscribe(Shared::EngineInterface* engine) { }
@@ -115,7 +113,7 @@ public:
uint32_t voice,
const Raul::Atom& value);
- virtual void port_activity(const std::string& port_path);
+ virtual void activity(const std::string& path);
virtual void program_add(const std::string& node_path,
uint32_t bank,
diff --git a/src/engine/HTTPEngineReceiver.cpp b/src/engine/HTTPEngineReceiver.cpp
index 9b6b6fb9..82bebc0c 100644
--- a/src/engine/HTTPEngineReceiver.cpp
+++ b/src/engine/HTTPEngineReceiver.cpp
@@ -148,16 +148,20 @@ HTTPEngineReceiver::message_callback(SoupServer* server, SoupMessage* msg, const
return;
} else if (path.substr(0, 6) == "/patch") {
path = '/' + path.substr(6);
+
} else if (path.substr(0, 7) == "/stream") {
- cout << "REGISTERING CLIENT" << endl;
- // FIXME: memory leak
- ClientInterface* client = new HTTPClientSender(me->_server, msg);
- soup_message_headers_set_encoding(msg->response_headers, SOUP_ENCODING_CHUNKED);
+ HTTPClientSender* client = new HTTPClientSender();
me->register_client(client);
+
+ // Respond with port number of stream for client
+ const int port = client->listen_port();
+ char buf[32];
+ snprintf(buf, 32, "%d", port);
+ soup_message_set_status(msg, SOUP_STATUS_OK);
+ soup_message_set_response(msg, mime_type, SOUP_MEMORY_COPY, buf, strlen(buf));
return;
} else {
- cout << "UNKNOWN PATH: " << path << endl;
soup_message_set_status(msg, SOUP_STATUS_NOT_FOUND);
soup_message_set_response(msg, "text/plain", SOUP_MEMORY_STATIC,
"Unknown path\n\n", 14);
diff --git a/src/engine/OSCClientSender.cpp b/src/engine/OSCClientSender.cpp
index d9893241..aa99c1d8 100644
--- a/src/engine/OSCClientSender.cpp
+++ b/src/engine/OSCClientSender.cpp
@@ -278,16 +278,16 @@ OSCClientSender::set_voice_value(const std::string& port_path, uint32_t voice, c
/** \page client_osc_namespace
- * <p> \b /ingen/port_activity - Notification of activity for a port (e.g. MIDI messages)
- * \arg \b path (string) - Path of port </p> \n \n
+ * <p> \b /ingen/activity - Notification of "activity" (e.g. port message blinkenlights)
+ * \arg \b path (string) - Path of object </p> \n \n
*/
void
-OSCClientSender::port_activity(const std::string& port_path)
+OSCClientSender::activity(const std::string& path)
{
if (!_enabled)
return;
- lo_send(_address, "/ingen/port_activity", "s", port_path.c_str(), LO_ARGS_END);
+ lo_send(_address, "/ingen/activity", "s", path.c_str(), LO_ARGS_END);
}
diff --git a/src/engine/OSCClientSender.hpp b/src/engine/OSCClientSender.hpp
index 3de967ab..879484c8 100644
--- a/src/engine/OSCClientSender.hpp
+++ b/src/engine/OSCClientSender.hpp
@@ -114,7 +114,7 @@ public:
uint32_t voice,
const Raul::Atom& value);
- virtual void port_activity(const std::string& port_path);
+ virtual void activity(const std::string& path);
virtual void program_add(const std::string& node_path,
uint32_t bank,
diff --git a/src/engine/events/SendPortActivityEvent.cpp b/src/engine/events/SendPortActivityEvent.cpp
index 0ab3abdd..3a408d8d 100644
--- a/src/engine/events/SendPortActivityEvent.cpp
+++ b/src/engine/events/SendPortActivityEvent.cpp
@@ -26,7 +26,7 @@ namespace Ingen {
void
SendPortActivityEvent::post_process()
{
- _engine.broadcaster()->send_port_activity(_port->path());
+ _engine.broadcaster()->send_activity(_port->path());
}
diff --git a/src/gui/ConnectWindow.cpp b/src/gui/ConnectWindow.cpp
index e52e83ac..a9221c04 100644
--- a/src/gui/ConnectWindow.cpp
+++ b/src/gui/ConnectWindow.cpp
@@ -223,7 +223,7 @@ ConnectWindow::connect(bool existing)
new QueuedEngineInterface(*world->local_engine,
Ingen::event_queue_size, Ingen::event_queue_size));
world->engine = interface;
- world->local_engine->set_event_source(interface);
+ world->local_engine->add_event_source(interface);
}
SharedPtr<SigClientInterface> client(new SigClientInterface());
diff --git a/src/ingen/main.cpp b/src/ingen/main.cpp
index 8d155c5c..4c73f1de 100644
--- a/src/ingen/main.cpp
+++ b/src/ingen/main.cpp
@@ -137,7 +137,7 @@ main(int argc, char** argv)
Ingen::QueuedEngineInterface* (*new_interface)(Ingen::Engine& engine);
if (engine_osc_module->get_symbol("new_queued_interface", (void*&)new_interface)) {
SharedPtr<QueuedEngineInterface> interface(new_interface(*engine));
- world->local_engine->set_event_source(interface);
+ world->local_engine->add_event_source(interface);
engine_interface = interface;
world->engine = engine_interface;
}
@@ -149,7 +149,7 @@ main(int argc, char** argv)
if (engine_osc_module->get_symbol("new_osc_receiver", (void*&)new_receiver)) {
SharedPtr<EventSource> source(new_receiver(*engine,
pre_processor_queue_size, args.engine_port_arg));
- world->local_engine->set_event_source(source);
+ world->local_engine->add_event_source(source);
}
}
#endif
@@ -158,8 +158,9 @@ main(int argc, char** argv)
// FIXE: leak
Ingen::HTTPEngineReceiver* (*new_receiver)(Ingen::Engine& engine, uint16_t port);
if (engine_http_module->get_symbol("new_http_receiver", (void*&)new_receiver)) {
- HTTPEngineReceiver* receiver = new_receiver(
- *world->local_engine, args.engine_port_arg);
+ boost::shared_ptr<HTTPEngineReceiver> receiver(new_receiver(
+ *world->local_engine, args.engine_port_arg));
+ world->local_engine->add_event_source(receiver);
receiver->activate();
}
}
diff --git a/src/shared/HTTPSender.cpp b/src/shared/HTTPSender.cpp
index 20135b2a..7f760786 100644
--- a/src/shared/HTTPSender.cpp
+++ b/src/shared/HTTPSender.cpp
@@ -20,6 +20,12 @@
#include <iostream>
#include <unistd.h>
#include <stdarg.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <errno.h>
+
+#include <cstring>
using namespace std;
@@ -27,53 +33,126 @@ namespace Ingen {
namespace Shared {
-HTTPSender::HTTPSender(SoupServer* server, SoupMessage* msg)
- : _server(server)
- , _msg(msg)
+HTTPSender::HTTPSender()
+ : _listen_port(-1)
+ , _listen_sock(-1)
+ , _client_sock(-1)
+ , _send_state(Immediate)
{
- soup_message_headers_set_encoding(msg->response_headers, SOUP_ENCODING_CHUNKED);
- cout << "Hello?" << endl;
- send_chunk("hello");
+ Thread::set_name("HTTP Sender");
+
+ struct sockaddr_in addr;
+
+ // Create listen address
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+ // Create listen socket
+ if ((_listen_sock = socket(AF_INET, SOCK_STREAM, 0)) < 0 ) {
+ fprintf(stderr, "Error creating listening socket (%s)\n", strerror(errno));
+ exit(EXIT_FAILURE);
+ }
+
+ // Bind our socket addresss to the listening socket
+ if (bind(_listen_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
+ fprintf(stderr, "Error calling bind (%s)\n", strerror(errno));
+ _listen_sock = -1;
+ }
+
+ // Find port number
+ socklen_t length = sizeof(addr);
+ if (getsockname(_listen_sock, (struct sockaddr*)&addr, &length) == -1) {
+ fprintf(stderr, "Error calling getsockname (%s)\n", strerror(errno));
+ _listen_sock = -1;
+ return;
+ }
+
+ if (listen(_listen_sock, 1) < 0 ) {
+ cerr << "Error calling listen: %s" << strerror(errno) << endl;
+ _listen_sock = -1;
+ return;
+ }
+
+ _listen_port = ntohs(addr.sin_port);
+ cout << "Opening event stream on TCP port " << _listen_port << endl;
+ start();
}
HTTPSender::~HTTPSender()
{
- cout << "HTTP SENDER EXIT" << endl;
- soup_message_body_complete(_msg->response_body);
- soup_server_unpause_message(_server, _msg);
+ stop();
+ if (_listen_sock != -1)
+ close(_listen_sock);
+ if (_client_sock != -1)
+ close(_client_sock);
+}
+
+void
+HTTPSender::_run()
+{
+ if (_listen_sock == -1) {
+ cerr << "Unable to open socket, exiting sender thread" << endl;
+ return;
+ }
+
+ // Accept connection
+ if ((_client_sock = accept(_listen_sock, NULL, NULL) ) < 0) {
+ cerr << "Error calling accept: " << strerror(errno) << endl;
+ return;
+ }
+
+ // Hold connection open and write when signalled
+ while (true) {
+ _mutex.lock();
+ _signal.wait(_mutex);
+
+ write(_client_sock, _transfer.c_str(), _transfer.length());
+ write(_client_sock, "\n\n", 2);
+
+ _mutex.unlock();
+ }
+
+ close(_listen_sock);
+ _listen_sock = -1;
}
void
HTTPSender::bundle_begin()
{
+ _mutex.lock();
_send_state = SendingBundle;
+ _transfer = "";
+ _mutex.unlock();
}
void
HTTPSender::bundle_end()
{
+ _mutex.lock();
assert(_send_state == SendingBundle);
- soup_message_body_append(_msg->response_body,
- SOUP_MEMORY_TEMPORARY, _transfer.c_str(), _transfer.length());
- soup_server_unpause_message(_server, _msg);
- _transfer = "";
+ _signal.broadcast();
_send_state = Immediate;
+ _mutex.unlock();
}
void
HTTPSender::send_chunk(const std::string& buf)
{
+ _mutex.lock();
+
if (_send_state == Immediate) {
- soup_message_body_append(_msg->response_body,
- SOUP_MEMORY_TEMPORARY, buf.c_str(), buf.length());
- soup_server_unpause_message(_server, _msg);
+ _transfer = "";
+ _signal.broadcast();
} else {
_transfer.append(buf);
}
+
+ _mutex.unlock();
}
diff --git a/src/shared/HTTPSender.hpp b/src/shared/HTTPSender.hpp
index 1025c071..1077b76d 100644
--- a/src/shared/HTTPSender.hpp
+++ b/src/shared/HTTPSender.hpp
@@ -20,14 +20,15 @@
#include <stdint.h>
#include <string>
-#include <libsoup/soup.h>
+#include <glibmm/thread.h>
+#include "raul/Thread.hpp"
namespace Ingen {
namespace Shared {
-class HTTPSender {
+class HTTPSender : public Raul::Thread {
public:
- HTTPSender(SoupServer* server, SoupMessage* msg);
+ HTTPSender();
virtual ~HTTPSender();
// Message bundling
@@ -38,13 +39,21 @@ public:
void transfer_begin() { bundle_begin(); }
void transfer_end() { bundle_end(); }
+ int listen_port() const { return _listen_port; }
+
protected:
- void send_chunk(const std::string& buf);
+ void _run();
+ void send_chunk(const std::string& buf);
+
enum SendState { Immediate, SendingBundle };
- SoupServer* _server;
- SoupMessage* _msg;
+ Glib::Mutex _mutex;
+ Glib::Cond _signal;
+
+ int _listen_port;
+ int _listen_sock;
+ int _client_sock;
SendState _send_state;
std::string _transfer;
};