From fb6471ac9d5daefd3655bc19532a6028b5f0ead4 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Sat, 15 Nov 2008 22:56:24 +0000 Subject: Stubs for HTTP streaming. git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@1719 a436a847-0d15-0410-975c-d299462d15a1 --- src/client/HTTPClientReceiver.cpp | 48 +++++++++- src/client/HTTPClientReceiver.hpp | 17 +++- src/engine/HTTPClientSender.cpp | 191 ++++++++++++++++++++++++++++++++++++++ src/engine/HTTPClientSender.hpp | 138 +++++++++++++++++++++++++++ src/engine/HTTPEngineReceiver.cpp | 10 ++ src/engine/HTTPEngineReceiver.hpp | 2 +- src/engine/wscript | 14 ++- src/shared/HTTPSender.cpp | 81 ++++++++++++++++ src/shared/HTTPSender.hpp | 57 ++++++++++++ src/shared/wscript | 5 +- 10 files changed, 553 insertions(+), 10 deletions(-) create mode 100644 src/engine/HTTPClientSender.cpp create mode 100644 src/engine/HTTPClientSender.hpp create mode 100644 src/shared/HTTPSender.cpp create mode 100644 src/shared/HTTPSender.hpp (limited to 'src') diff --git a/src/client/HTTPClientReceiver.cpp b/src/client/HTTPClientReceiver.cpp index 624a7786..9116f853 100644 --- a/src/client/HTTPClientReceiver.cpp +++ b/src/client/HTTPClientReceiver.cpp @@ -49,11 +49,42 @@ HTTPClientReceiver::~HTTPClientReceiver() } +void +HTTPClientReceiver::Listener::_run() +{ +#if 0 + cout << "LISTENER RUN" << endl; + /*const string uri = "http://localhost:16180"; + SoupMessage* msg = soup_message_new("GET", (uri + "/stream").c_str()); + soup_message_headers_set_encoding(msg->response_headers, SOUP_ENCODING_CHUNKED); + soup_session_send_message(_session, msg);*/ + + size_t offset = 0; + soup_message_body_set_accumulate(_msg->response_body, false); + while (true) { + SoupBuffer* chunk = soup_message_body_get_chunk(_msg->response_body, offset); + if (chunk == NULL) { + //cout << "WAITING FOR DATA" << endl; + } else if (chunk->length == 0) { + cout << "CHUNKED TRANSFER COMPLETED" << endl; + break; + } else { + cout << "RECEIVED CHUNK: " << (char*)chunk->data << endl; + offset += chunk->length; + } + } + + cout << "LISTENER FINISHED" << endl; +#endif +} + + void HTTPClientReceiver::message_callback(SoupSession* session, SoupMessage* msg, void* ptr) { HTTPClientReceiver* me = (HTTPClientReceiver*)ptr; const string path = soup_message_get_uri(msg)->path; + cout << "MESSAGE: " << path << endl; if (path == "/") { me->_target->response_ok(0); me->_target->enable(); @@ -77,6 +108,12 @@ HTTPClientReceiver::message_callback(SoupSession* session, SoupMessage* msg, voi Glib::ustring(msg->response_body->data), Glib::ustring("/patch/"), Glib::ustring("")); } + } else if (path == "/stream") { + cout << "STREAM" << endl; + //me->_listener = boost::shared_ptr(new Listener(me->_session, msg)); + //me->_listener->start(); + } else { + cerr << "UNKNOWN MESSAGE: " << path << endl; } } @@ -96,17 +133,20 @@ HTTPClientReceiver::start(bool dump) } } - _session = soup_session_async_new(); + _session = soup_session_sync_new(); SoupMessage* msg; msg = soup_message_new("GET", _url.c_str()); - soup_session_queue_message (_session, msg, message_callback, this); + soup_session_queue_message(_session, msg, message_callback, this); msg = soup_message_new("GET", (_url + "/plugins").c_str()); - soup_session_queue_message (_session, msg, message_callback, this); + soup_session_queue_message(_session, msg, message_callback, this); msg = soup_message_new("GET", (_url + "/patch").c_str()); - soup_session_queue_message (_session, msg, message_callback, this); + soup_session_queue_message(_session, msg, message_callback, this); + + msg = soup_message_new("GET", (_url + "/stream").c_str()); + soup_session_queue_message(_session, msg, message_callback, this); } diff --git a/src/client/HTTPClientReceiver.hpp b/src/client/HTTPClientReceiver.hpp index bab55578..379ffe2d 100644 --- a/src/client/HTTPClientReceiver.hpp +++ b/src/client/HTTPClientReceiver.hpp @@ -21,10 +21,11 @@ #include #include #include -#include "interface/ClientInterface.hpp" -#include "serialisation/Parser.hpp" #include "redlandmm/World.hpp" #include "raul/Deletable.hpp" +#include "raul/Thread.hpp" +#include "interface/ClientInterface.hpp" +#include "serialisation/Parser.hpp" namespace Ingen { namespace Client { @@ -46,7 +47,19 @@ public: private: static void message_callback(SoupSession* session, SoupMessage* msg, void* ptr); + + class Listener : public Raul::Thread { + public: + Listener(SoupSession* session, SoupMessage* msg) : _session(session), _msg(msg) {} + void _run(); + private: + SoupSession* _session; + SoupMessage* _msg; + }; + friend class Listener; + SharedPtr _listener; + SharedPtr _target; Shared::World* _world; diff --git a/src/engine/HTTPClientSender.cpp b/src/engine/HTTPClientSender.cpp new file mode 100644 index 00000000..ae97e1ca --- /dev/null +++ b/src/engine/HTTPClientSender.cpp @@ -0,0 +1,191 @@ +/* This file is part of Ingen. + * Copyright (C) 2008 Dave Robillard + * + * Ingen is free software; you can redistribute it and/or modify it under the + * terms of the GNU General Public License as published by the Free Software + * Foundation; either version 2 of the License, or (at your option) any later + * version. + * + * Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include "raul/Atom.hpp" +#include "HTTPClientSender.hpp" + +using namespace std; +using namespace Raul; + +namespace Ingen { + +void +HTTPClientSender::response_ok(int32_t id) +{ + cout << "HTTP OK" << endl; +} + + +void +HTTPClientSender::response_error(int32_t id, const std::string& msg) +{ + cout << "HTTP ERROR" << endl; +} + + +void +HTTPClientSender::error(const std::string& msg) +{ + //send("/ingen/error", "s", msg.c_str(), LO_ARGS_END); +} + + +void HTTPClientSender::new_node(const std::string& node_path, + const std::string& plugin_uri) +{ + //send("/ingen/new_node", "ss", node_path.c_str(), plugin_uri.c_str(), LO_ARGS_END); +} + + +void +HTTPClientSender::new_port(const std::string& path, + const std::string& type, + uint32_t index, + bool is_output) +{ + //send("/ingen/new_port", "sisi", path.c_str(), index, type.c_str(), is_output, LO_ARGS_END); +} + + +void +HTTPClientSender::destroy(const std::string& path) +{ + assert(path != "/"); + + //send("/ingen/destroyed", "s", path.c_str(), LO_ARGS_END); +} + + +void +HTTPClientSender::patch_cleared(const std::string& patch_path) +{ + //send("/ingen/patch_cleared", "s", patch_path.c_str(), LO_ARGS_END); +} + + +void +HTTPClientSender::connect(const std::string& src_path, const std::string& dst_path) +{ + //send("/ingen/new_connection", "ss", src_path.c_str(), dst_path.c_str(), LO_ARGS_END); +} + + +void +HTTPClientSender::disconnect(const std::string& src_path, const std::string& dst_path) +{ + //send("/ingen/disconnection", "ss", src_path.c_str(), dst_path.c_str(), LO_ARGS_END); +} + + +void +HTTPClientSender::set_variable(const std::string& path, const std::string& key, const Atom& value) +{ + /*lo_message m = lo_message_new(); + lo_message_add_string(m, path.c_str()); + lo_message_add_string(m, key.c_str()); + Raul::AtomLiblo::lo_message_add_atom(m, value); + send_message("/ingen/set_variable", m);*/ +} + + +void +HTTPClientSender::set_property(const std::string& path, const std::string& key, const Atom& value) +{ + /*lo_message m = lo_message_new(); + lo_message_add_string(m, path.c_str()); + lo_message_add_string(m, key.c_str()); + Raul::AtomLiblo::lo_message_add_atom(m, value); + send_message("/ingen/set_property", m);*/ +} + + +void +HTTPClientSender::set_port_value(const std::string& port_path, const Raul::Atom& value) +{ + /*lo_message m = lo_message_new(); + lo_message_add_string(m, port_path.c_str()); + Raul::AtomLiblo::lo_message_add_atom(m, value); + send_message("/ingen/set_port_value", m);*/ +} + + +void +HTTPClientSender::set_voice_value(const std::string& port_path, uint32_t voice, const Raul::Atom& value) +{ + /*lo_message m = lo_message_new(); + lo_message_add_string(m, port_path.c_str()); + Raul::AtomLiblo::lo_message_add_atom(m, value); + send_message("/ingen/set_port_value", m);*/ +} + + +void +HTTPClientSender::port_activity(const std::string& port_path) +{ + //lo_send(_address, "/ingen/port_activity", "s", port_path.c_str(), LO_ARGS_END); +} + + +void +HTTPClientSender::new_plugin(const std::string& uri, + const std::string& type_uri, + const std::string& symbol, + const std::string& name) +{ + /*lo_message m = lo_message_new(); + lo_message_add_string(m, uri.c_str()); + lo_message_add_string(m, type_uri.c_str()); + lo_message_add_string(m, symbol.c_str()); + lo_message_add_string(m, name.c_str()); + send_message("/ingen/plugin", m);*/ +} + + +void +HTTPClientSender::new_patch(const std::string& path, uint32_t poly) +{ + cout << "HTTP NEW PATCH" << endl; + send_chunk(string("<").append(path).append("> a ingen:Patch")); + //send("/ingen/new_patch", "si", path.c_str(), poly, LO_ARGS_END); +} + + +void +HTTPClientSender::object_renamed(const std::string& old_path, const std::string& new_path) +{ + //send("/ingen/object_renamed", "ss", old_path.c_str(), new_path.c_str(), LO_ARGS_END); +} + + +void +HTTPClientSender::program_add(const std::string& node_path, uint32_t bank, uint32_t program, const std::string& name) +{ + /*send("/ingen/program_add", "siis", + node_path.c_str(), bank, program, name.c_str(), LO_ARGS_END);*/ +} + + +void +HTTPClientSender::program_remove(const std::string& node_path, uint32_t bank, uint32_t program) +{ + /*send("/ingen/program_remove", "sii", + node_path.c_str(), bank, program, LO_ARGS_END);*/ +} + + +} // namespace Ingen diff --git a/src/engine/HTTPClientSender.hpp b/src/engine/HTTPClientSender.hpp new file mode 100644 index 00000000..8e4f3d33 --- /dev/null +++ b/src/engine/HTTPClientSender.hpp @@ -0,0 +1,138 @@ +/* This file is part of Ingen. + * Copyright (C) 2008 Dave Robillard + * + * Ingen is free software; you can redistribute it and/or modify it under the + * terms of the GNU General Public License as published by the Free Software + * Foundation; either version 2 of the License, or (at your option) any later + * version. + * + * Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef HTTPCLIENTSENDER_H +#define HTTPCLIENTSENDER_H + +#include +#include +#include +#include +#include +#include "types.hpp" +#include "raul/Thread.hpp" +#include "interface/ClientInterface.hpp" +#include "shared/HTTPSender.hpp" + +namespace Ingen { + +namespace Shared { class EngineInterface; } + + +/** Implements ClientInterface for HTTP clients. + * Sends changes as RDF deltas over an HTTP stream + * (a single message with chunked encoding response). + * + * \ingroup engine + */ +class HTTPClientSender + : public Shared::ClientInterface + , public Raul::Thread + , public Shared::HTTPSender +{ +public: + HTTPClientSender(SoupServer* s, SoupMessage* m) + : Shared::HTTPSender(s, m) + {} + + bool enabled() const { return _enabled; } + + void enable() { _enabled = true; } + void disable() { _enabled = false; } + + void bundle_begin() { HTTPSender::bundle_begin(); } + void bundle_end() { HTTPSender::bundle_end(); } + void transfer_begin() { HTTPSender::transfer_begin(); } + void transfer_end() { HTTPSender::transfer_end(); } + + std::string uri() const { return "http://example.org/"; } + + void subscribe(Shared::EngineInterface* engine) { } + + /* *** ClientInterface Implementation Below *** */ + + //void client_registration(const std::string& url, int client_id); + + void response_ok(int32_t id); + void response_error(int32_t id, const std::string& msg); + + void error(const std::string& msg); + + virtual void new_plugin(const std::string& uri, + const std::string& type_uri, + const std::string& symbol, + const std::string& name); + + virtual void new_patch(const std::string& path, uint32_t poly); + + virtual void new_node(const std::string& path, + const std::string& plugin_uri); + + virtual void new_port(const std::string& path, + const std::string& type, + uint32_t index, + bool is_output); + + virtual void patch_cleared(const std::string& path); + + virtual void destroy(const std::string& path); + + virtual void object_renamed(const std::string& old_path, + const std::string& new_path); + + virtual void connect(const std::string& src_port_path, + const std::string& dst_port_path); + + virtual void disconnect(const std::string& src_port_path, + const std::string& dst_port_path); + + virtual void set_variable(const std::string& subject_path, + const std::string& predicate, + const Raul::Atom& value); + + virtual void set_property(const std::string& subject_path, + const std::string& predicate, + const Raul::Atom& value); + + virtual void set_port_value(const std::string& port_path, + const Raul::Atom& value); + + virtual void set_voice_value(const std::string& port_path, + uint32_t voice, + const Raul::Atom& value); + + virtual void port_activity(const std::string& port_path); + + virtual void program_add(const std::string& node_path, + uint32_t bank, + uint32_t program, + const std::string& program_name); + + virtual void program_remove(const std::string& node_path, + uint32_t bank, + uint32_t program); + +private: + std::string _url; + bool _enabled; +}; + + +} // namespace Ingen + +#endif // HTTPCLIENTSENDER_H + diff --git a/src/engine/HTTPEngineReceiver.cpp b/src/engine/HTTPEngineReceiver.cpp index 8a035175..9b6b6fb9 100644 --- a/src/engine/HTTPEngineReceiver.cpp +++ b/src/engine/HTTPEngineReceiver.cpp @@ -32,6 +32,7 @@ #include "QueuedEventSource.hpp" #include "ClientBroadcaster.hpp" #include "EngineStore.hpp" +#include "HTTPClientSender.hpp" using namespace std; using namespace Ingen::Shared; @@ -126,6 +127,7 @@ HTTPEngineReceiver::message_callback(SoupServer* server, SoupMessage* msg, const if (path == "/" || path == "") { const string r = string("@prefix rdfs: .\n") .append("\n<> rdfs:seeAlso ;") + .append("\n rdfs:seeAlso ;") .append("\n rdfs:seeAlso ."); soup_message_set_status(msg, SOUP_STATUS_OK); soup_message_set_response(msg, mime_type, SOUP_MEMORY_COPY, r.c_str(), r.length()); @@ -146,6 +148,14 @@ HTTPEngineReceiver::message_callback(SoupServer* server, SoupMessage* msg, const return; } else if (path.substr(0, 6) == "/patch") { path = '/' + path.substr(6); + } else if (path.substr(0, 7) == "/stream") { + cout << "REGISTERING CLIENT" << endl; + // FIXME: memory leak + ClientInterface* client = new HTTPClientSender(me->_server, msg); + soup_message_headers_set_encoding(msg->response_headers, SOUP_ENCODING_CHUNKED); + me->register_client(client); + return; + } else { cout << "UNKNOWN PATH: " << path << endl; soup_message_set_status(msg, SOUP_STATUS_NOT_FOUND); diff --git a/src/engine/HTTPEngineReceiver.hpp b/src/engine/HTTPEngineReceiver.hpp index 07b730b0..a6ed8825 100644 --- a/src/engine/HTTPEngineReceiver.hpp +++ b/src/engine/HTTPEngineReceiver.hpp @@ -54,7 +54,7 @@ private: GHashTable *query, SoupClientContext* client, void* data); ReceiveThread* _receive_thread; - SoupServer* _server; + SoupServer* _server; }; diff --git a/src/engine/wscript b/src/engine/wscript index face717a..1fa05b23 100644 --- a/src/engine/wscript +++ b/src/engine/wscript @@ -92,7 +92,12 @@ def build(bld): if bld.env()['HAVE_SOUP'] == 1: obj = bld.create_obj('cpp', 'shlib') - obj.source = 'QueuedEventSource.cpp QueuedEngineInterface.cpp HTTPEngineReceiver.cpp' + obj.source = ''' + QueuedEventSource.cpp + QueuedEngineInterface.cpp + HTTPClientSender.cpp + HTTPEngineReceiver.cpp + ''' obj.includes = ['.', '..', '../common', './events', '../engine'] obj.name = 'libingen_engine_http' obj.target = 'ingen_engine_http' @@ -101,7 +106,12 @@ def build(bld): if bld.env()['HAVE_LIBLO'] == 1: obj = bld.create_obj('cpp', 'shlib') - obj.source = 'QueuedEventSource.cpp QueuedEngineInterface.cpp OSCClientSender.cpp OSCEngineReceiver.cpp' + obj.source = ''' + QueuedEventSource.cpp + QueuedEngineInterface.cpp + OSCClientSender.cpp + OSCEngineReceiver.cpp + ''' obj.includes = ['.', '..', '../common', './events', '../engine'] obj.name = 'libingen_engine_osc' obj.target = 'ingen_engine_osc' diff --git a/src/shared/HTTPSender.cpp b/src/shared/HTTPSender.cpp new file mode 100644 index 00000000..20135b2a --- /dev/null +++ b/src/shared/HTTPSender.cpp @@ -0,0 +1,81 @@ +/* This file is part of Ingen. + * Copyright (C) 2008 Dave Robillard + * + * Ingen is free software; you can redistribute it and/or modify it under the + * terms of the GNU General Public License as published by the Free Software + * Foundation; either version 2 of the License, or (at your option) any later + * version. + * + * Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "HTTPSender.hpp" +#include +#include +#include +#include + +using namespace std; + +namespace Ingen { +namespace Shared { + + +HTTPSender::HTTPSender(SoupServer* server, SoupMessage* msg) + : _server(server) + , _msg(msg) +{ + soup_message_headers_set_encoding(msg->response_headers, SOUP_ENCODING_CHUNKED); + cout << "Hello?" << endl; + send_chunk("hello"); +} + + +HTTPSender::~HTTPSender() +{ + cout << "HTTP SENDER EXIT" << endl; + soup_message_body_complete(_msg->response_body); + soup_server_unpause_message(_server, _msg); +} + + +void +HTTPSender::bundle_begin() +{ + _send_state = SendingBundle; +} + + +void +HTTPSender::bundle_end() +{ + assert(_send_state == SendingBundle); + soup_message_body_append(_msg->response_body, + SOUP_MEMORY_TEMPORARY, _transfer.c_str(), _transfer.length()); + soup_server_unpause_message(_server, _msg); + _transfer = ""; + _send_state = Immediate; +} + + +void +HTTPSender::send_chunk(const std::string& buf) +{ + if (_send_state == Immediate) { + soup_message_body_append(_msg->response_body, + SOUP_MEMORY_TEMPORARY, buf.c_str(), buf.length()); + soup_server_unpause_message(_server, _msg); + } else { + _transfer.append(buf); + } +} + + +} // namespace Shared +} // namespace Ingen diff --git a/src/shared/HTTPSender.hpp b/src/shared/HTTPSender.hpp new file mode 100644 index 00000000..1025c071 --- /dev/null +++ b/src/shared/HTTPSender.hpp @@ -0,0 +1,57 @@ +/* This file is part of Ingen. + * Copyright (C) 2008 Dave Robillard + * + * Ingen is free software; you can redistribute it and/or modify it under the + * terms of the GNU General Public License as published by the Free Software + * Foundation; either version 2 of the License, or (at your option) any later + * version. + * + * Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef HTTPSENDER_H +#define HTTPSENDER_H + +#include +#include +#include + +namespace Ingen { +namespace Shared { + +class HTTPSender { +public: + HTTPSender(SoupServer* server, SoupMessage* msg); + virtual ~HTTPSender(); + + // Message bundling + void bundle_begin(); + void bundle_end(); + + // Transfers (loose bundling) + void transfer_begin() { bundle_begin(); } + void transfer_end() { bundle_end(); } + +protected: + void send_chunk(const std::string& buf); + + enum SendState { Immediate, SendingBundle }; + + SoupServer* _server; + SoupMessage* _msg; + SendState _send_state; + std::string _transfer; +}; + + +} // namespace Shared +} // namespace Ingen + +#endif // HTTPSENDER_H + diff --git a/src/shared/wscript b/src/shared/wscript index 6a5758a0..a758b8ad 100644 --- a/src/shared/wscript +++ b/src/shared/wscript @@ -12,8 +12,11 @@ def build(bld): Store.cpp runtime_paths.cpp ''' - if bld.env()['HAVE_LIBLO']: + if bld.env()['HAVE_LIBLO'] == 1: obj.source += ' OSCSender.cpp ' + if bld.env()['HAVE_SOUP'] == 1: + autowaf.use_lib(bld, obj, 'SOUP') + obj.source += ' HTTPSender.cpp ' obj.includes = ['.', '../', '../common'] obj.name = 'libingen_shared' obj.target = 'ingen_shared' -- cgit v1.2.1