/*
This file is part of Ingen.
Copyright 2007-2012 David Robillard
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
Software Foundation, either version 3 of the License, or any later version.
Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU Affero General Public License for details.
You should have received a copy of the GNU Affero General Public License
along with Ingen. If not, see <http://www.gnu.org/licenses/>.
*/
#include
#include
#include
#include
#include
#include "ingen/shared/World.hpp"
#include "raul/log.hpp"
#include "HTTPClientReceiver.hpp"
#define LOG(s) s << "[HTTPClientReceiver] "
using namespace std;
using namespace Raul;
namespace Ingen {
using namespace Serialisation;
namespace Client {
/** Create a receiver and immediately request the engine's stream.
 *
 * A synchronous libsoup session is created and start() is called, which
 * queues a GET for url + "/stream" on that session.
 *
 * @param world  World whose parser is used to parse response bodies.
 * @param url    Base URL; requests are made to paths beneath it.
 * @param target Interface notified of responses and handed to the parser.
 */
HTTPClientReceiver::HTTPClientReceiver(
		Shared::World*     world,
		const std::string& url,
		SharedPtr target)
	: _target(target)
	, _world(world)
	, _url(url)
{
	_client_session = soup_session_sync_new();

	// Check the session before start() passes it to libsoup, not afterwards
	assert(_client_session);

	start(false);
}
/** Tear the receiver down by delegating to stop(). */
HTTPClientReceiver::~HTTPClientReceiver()
{
	this->stop();
}
/** Open a TCP connection to the stream port named in @a uri.
 *
 * The port is the text following the last ':' in @a uri. The host is
 * currently always 127.0.0.1 regardless of @a uri (see FIXME below).
 * On any failure an error is logged, any descriptor that was opened is
 * closed again, and _sock is left at -1.
 *
 * @param receiver Receiver stored for use by _run(), which calls its update().
 * @param uri      Stream URI ending in ":<port>".
 */
HTTPClientReceiver::Listener::Listener(HTTPClientReceiver* receiver, const std::string& uri)
	: _uri(uri)
	, _receiver(receiver)
{
	const string port_str = uri.substr(uri.find_last_of(":")+1);
	const int    port     = atoi(port_str.c_str());

	LOG(info) << "Client HTTP listen: " << uri << " (port " << port << ")" << endl;

	// atoi() yields 0 for garbage, and htons() would silently truncate an
	// out-of-range value to some other port, so reject both up front
	if (port <= 0 || port > 65535) {
		LOG(error) << "Invalid stream port: " << port_str << endl;
		_sock = -1;
		return;
	}

	// Create remote address
	struct sockaddr_in servaddr;
	memset(&servaddr, 0, sizeof(servaddr));
	servaddr.sin_family = AF_INET;
	servaddr.sin_port   = htons(port);

	// Create socket
	if ((_sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
		LOG(error) << "Error creating listening socket: " << strerror(errno) << endl;
		_sock = -1;
		return;
	}

	// Set remote address (FIXME: always localhost)
	if (inet_aton("127.0.0.1", &servaddr.sin_addr) <= 0) {
		LOG(error) << "Invalid remote IP address" << endl;
		close(_sock);  // Do not leak the descriptor opened above
		_sock = -1;
		return;
	}

	// Connect to server
	if (connect(_sock, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0) {
		// Log before close(), which may overwrite errno
		LOG(error) << "Error calling connect: " << strerror(errno) << endl;
		close(_sock);  // Do not leak the descriptor opened above
		_sock = -1;
		return;
	}
}
/** Close the stream socket, if the constructor managed to open one. */
HTTPClientReceiver::Listener::~Listener()
{
	// The constructor leaves _sock at -1 on failure; don't close() that
	if (_sock >= 0) {
		close(_sock);
		_sock = -1;
	}
}
/** Queue @a msg on the client session.
 *
 * message_callback() is registered as the callback, with this receiver
 * passed as its user data pointer.
 */
void
HTTPClientReceiver::send(SoupMessage* msg)
{
	assert(SOUP_IS_SESSION(_client_session));
	assert(SOUP_IS_MESSAGE(msg));

	void* const callback_data = this;
	soup_session_queue_message(_client_session, msg, message_callback, callback_data);
}
void
HTTPClientReceiver::close_session()
{
if (_client_session) {
SoupSession* s = _client_session;
_client_session = NULL;
soup_session_abort(s);
}
}
/** Handle a block of update text.
 *
 * Called with text read by Listener::_run() and with the body of
 * responses whose path message_callback() does not recognise.
 * Currently a no-op: the only statement is commented out, so @a str is
 * ignored.
 */
void
HTTPClientReceiver::update(const std::string& str)
{
	// Disabled: would hand str to the world's parser and log the result
	//LOG(info) << _world->parser()->parse_update(_world, _target.get(), str, _url);
}
void
HTTPClientReceiver::Listener::_run()
{
char in = '\0';
char last = '\0';
char llast = '\0';
string recv;
while (true) {
while (read(_sock, &in, 1) > 0 ) {
recv += in;
if (in == '\n' && last == '\n' && llast == '\n') {
if (!recv.empty()) {
_receiver->update(recv);
recv.clear();
last = '\0';
llast = '\0';
}
break;
}
llast = last;
last = in;
}
}
LOG(info) << "HTTP listener finished" << endl;
}
/** libsoup message callback: dispatch a response according to its request path.
 *
 * - "/"        : report success to the target.
 * - "/plugins" : report success, then parse the body with _url as the base.
 * - "/patch*"  : report success, then parse the body with an empty base.
 * - "/stream"  : the body is used as a port; a Listener is created for
 *                the request URI with its port replaced, and started.
 * - otherwise  : log an error and pass the body to update().
 *
 * Responses with no body are logged and ignored whatever the path.
 *
 * @param session Unused.
 * @param msg     The message this callback was registered for.
 * @param ptr     The HTTPClientReceiver registered with the message; the
 *                call is a no-op when this is NULL.
 */
void
HTTPClientReceiver::message_callback(SoupSession* session, SoupMessage* msg, void* ptr)
{
	if (ptr == NULL)
		return;

	HTTPClientReceiver* const me = static_cast<HTTPClientReceiver*>(ptr);

	// Constructing a std::string from a NULL char* is undefined, so check first
	SoupURI* const soup_uri = soup_message_get_uri(msg);
	if (soup_uri == NULL || soup_uri->path == NULL) {
		LOG(error) << "Client message has no path" << endl;
		return;
	}
	const string path = soup_uri->path;

	// No branch below runs without a body, so it is checked once here
	if (msg->response_body->data == NULL) {
		LOG(error) << "Empty client message" << endl;
		return;
	}
	const char* const body = msg->response_body->data;

	if (path == "/") {
		me->_target->response(0, SUCCESS);

	} else if (path == "/plugins") {
		Glib::Mutex::Lock lock(me->_mutex);
		me->_target->response(0, SUCCESS);
		me->_world->parser()->parse_string(
			me->_world,
			me->_target.get(),
			Glib::ustring(body),
			me->_url);

	} else if (path.compare(0, 6, "/patch") == 0) {
		Glib::Mutex::Lock lock(me->_mutex);
		me->_target->response(0, SUCCESS);
		me->_world->parser()->parse_string(
			me->_world,
			me->_target.get(),
			Glib::ustring(body),
			"");

	} else if (path == "/stream") {
		Glib::Mutex::Lock lock(me->_mutex);

		// soup_uri_to_string() returns a newly allocated string that the
		// caller must free; copy it and release it straight away
		char* const uri_cstr = soup_uri_to_string(soup_uri, false);
		string      uri(uri_cstr ? uri_cstr : "");
		g_free(uri_cstr);

		// Swap the request's port for the stream port given in the body
		uri = uri.substr(0, uri.find_last_of(":"));
		uri += string(":") + body;
		LOG(info) << "Stream URI: " << uri << endl;

		me->_listener = boost::shared_ptr<Listener>(new Listener(me, uri));
		me->_listener->start();

	} else {
		LOG(error) << "Unknown message: " << path << endl;
		me->update(body);
	}
}
void
HTTPClientReceiver::start(bool dump)
{
if (!_world->parser())
_world->load_module("serialisation");
SoupMessage* msg = soup_message_new("GET", (_url + "/stream").c_str());
assert(SOUP_IS_MESSAGE(msg));
soup_session_queue_message(_client_session, msg, message_callback, this);
}
void
HTTPClientReceiver::stop()
{
//unregister_client();
close_session();
}
} // namespace Client
} // namespace Ingen