/* This file is part of Ingen. Copyright 2007-2012 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or any later version. Ingen is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. You should have received a copy of the GNU Affero General Public License along with Ingen. If not, see . */ #include #include #include #include #include #include "ingen/shared/World.hpp" #include "raul/log.hpp" #include "HTTPClientReceiver.hpp" #define LOG(s) s << "[HTTPClientReceiver] " using namespace std; using namespace Raul; namespace Ingen { using namespace Serialisation; namespace Client { HTTPClientReceiver::HTTPClientReceiver( Shared::World* world, const std::string& url, SharedPtr target) : _target(target) , _world(world) , _url(url) { _client_session = soup_session_sync_new(); start(false); assert(_client_session); } HTTPClientReceiver::~HTTPClientReceiver() { stop(); } HTTPClientReceiver::Listener::Listener(HTTPClientReceiver* receiver, const std::string& uri) : _uri(uri) , _receiver(receiver) { const string port_str = uri.substr(uri.find_last_of(":")+1); int port = atoi(port_str.c_str()); LOG(info) << "Client HTTP listen: " << uri << " (port " << port << ")" << endl; struct sockaddr_in servaddr; // Create listen address memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(port); // Create listen socket if ((_sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) { LOG(error) << "Error creating listening socket: %s" << strerror(errno) << endl; _sock = -1; return; } // Set remote address (FIXME: always localhost) if (inet_aton("127.0.0.1", &servaddr.sin_addr) <= 0) { LOG(error) << "Invalid remote IP address" << endl; _sock = -1; return; } // Connect to server if (connect(_sock, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0) { LOG(error) << "Error calling connect: " << strerror(errno) << endl; _sock = -1; return; } } HTTPClientReceiver::Listener::~Listener() { close(_sock); } void HTTPClientReceiver::send(SoupMessage* msg) { assert(SOUP_IS_SESSION(_client_session)); assert(SOUP_IS_MESSAGE(msg)); soup_session_queue_message(_client_session, msg, message_callback, this); } void HTTPClientReceiver::close_session() { if (_client_session) { SoupSession* s = _client_session; _client_session = NULL; soup_session_abort(s); } } void HTTPClientReceiver::update(const std::string& str) { //LOG(info) << _world->parser()->parse_update(_world, _target.get(), str, _url); } void HTTPClientReceiver::Listener::_run() { char in = '\0'; char last = '\0'; char llast = '\0'; string recv; while (true) { while (read(_sock, &in, 1) > 0 ) { recv += in; if (in == '\n' && last == '\n' && llast == '\n') { if (!recv.empty()) { _receiver->update(recv); recv.clear(); last = '\0'; llast = '\0'; } break; } llast = last; last = in; } } LOG(info) << "HTTP listener finished" << endl; } void HTTPClientReceiver::message_callback(SoupSession* session, SoupMessage* msg, void* ptr) { if (ptr == NULL) return; HTTPClientReceiver* me = (HTTPClientReceiver*)ptr; const string path = soup_message_get_uri(msg)->path; if (msg->response_body->data == NULL) { LOG(error) << "Empty client message" << endl; return; } if (path == "/") { me->_target->response(0, SUCCESS); } else if (path == "/plugins") { if (msg->response_body->data == NULL) { LOG(error) << "Empty response" << endl; } else { Glib::Mutex::Lock lock(me->_mutex); me->_target->response(0, SUCCESS); me->_world->parser()->parse_string(me->_world, me->_target.get(), Glib::ustring(msg->response_body->data), me->_url); } } else if (path.substr(0, 6) == "/patch") { if (msg->response_body->data == NULL) { LOG(error) << "Empty response" << endl; } else { Glib::Mutex::Lock lock(me->_mutex); me->_target->response(0, SUCCESS); me->_world->parser()->parse_string( me->_world, me->_target.get(), Glib::ustring(msg->response_body->data), ""); } } else if (path == "/stream") { if (msg->response_body->data == NULL) { LOG(error) << "Empty response" << endl; } else { Glib::Mutex::Lock lock(me->_mutex); string uri = string(soup_uri_to_string(soup_message_get_uri(msg), false)); uri = uri.substr(0, uri.find_last_of(":")); uri += string(":") + msg->response_body->data; LOG(info) << "Stream URI: " << uri << endl; me->_listener = boost::shared_ptr(new Listener(me, uri)); me->_listener->start(); } } else { LOG(error) << "Unknown message: " << path << endl; me->update(msg->response_body->data); } } void HTTPClientReceiver::start(bool dump) { if (!_world->parser()) _world->load_module("serialisation"); SoupMessage* msg = soup_message_new("GET", (_url + "/stream").c_str()); assert(SOUP_IS_MESSAGE(msg)); soup_session_queue_message(_client_session, msg, message_callback, this); } void HTTPClientReceiver::stop() { //unregister_client(); close_session(); } } // namespace Client } // namespace Ingen