#include "raul/AtomLiblo.hpp"
#include "raul/Path.hpp"
#include "raul/SharedPtr.hpp"
#include "raul/log.hpp"
#include "ingen_config.h"
#include "ingen/Interface.hpp"
#include "../server/ClientBroadcaster.hpp"
#include "../server/Engine.hpp"
#include "OSCClientSender.hpp"
#include "OSCEngineReceiver.hpp"
#define LOG(s) s << "[OSCEngineReceiver] "
using namespace std;
using namespace Raul;
namespace Ingen {
namespace Server {
/** @page engine_osc_namespace Server OSC Namespace Documentation
*
* These are the commands the engine recognizes. A client can control every
* aspect of the engine entirely with these commands.
*
* All commands on this page are in the "control band". If a client needs to
* know about the state of the engine, it must listen to the "notification band".
* See the "Client OSC Namespace Documentation" for details.
*/
OSCEngineReceiver::OSCEngineReceiver(Engine& engine,
SharedPtr interface,
uint16_t port)
: _engine(engine)
, _interface(interface)
, _server(NULL)
{
_receive_thread = new ReceiveThread(*this);
char port_str[6];
snprintf(port_str, sizeof(port_str), "%u", port);
_server = lo_server_new(port_str, error_cb);
if (_server == NULL) {
LOG(error) << "Could not start OSC server. Aborting." << endl;
exit(EXIT_FAILURE);
} else {
char* lo_url = lo_server_get_url(_server);
LOG(info) << "Started OSC server at " << lo_url << endl;
free(lo_url);
}
#ifdef RAUL_LOG_DEBUG
lo_server_add_method(_server, NULL, NULL, generic_cb, NULL);
#endif
// Set response address for this message.
// It's important this is first and returns nonzero.
lo_server_add_method(_server, NULL, NULL, set_response_address_cb, this);
#ifdef LIBLO_BUNDLES
lo_server_add_bundle_handlers(_server, bundle_start_cb, bundle_end_cb, this);
#endif
// Commands
lo_server_add_method(_server, "/ping", "i", ping_cb, this);
lo_server_add_method(_server, "/ping_queued", "i", ping_slow_cb, this);
lo_server_add_method(_server, "/register_client", "i", register_client_cb, this);
lo_server_add_method(_server, "/unregister_client", "i", unregister_client_cb, this);
lo_server_add_method(_server, "/put", NULL, put_cb, this);
lo_server_add_method(_server, "/delta_begin", NULL, delta_begin_cb, this);
lo_server_add_method(_server, "/delta_remove", NULL, delta_remove_cb, this);
lo_server_add_method(_server, "/delta_add", NULL, delta_add_cb, this);
lo_server_add_method(_server, "/delta_end", NULL, delta_end_cb, this);
lo_server_add_method(_server, "/move", "iss", move_cb, this);
lo_server_add_method(_server, "/delete", "is", del_cb, this);
lo_server_add_method(_server, "/connect", "iss", connect_cb, this);
lo_server_add_method(_server, "/disconnect", "iss", disconnect_cb, this);
lo_server_add_method(_server, "/disconnect_all", "iss", disconnect_all_cb, this);
lo_server_add_method(_server, "/note_on", "isii", note_on_cb, this);
lo_server_add_method(_server, "/note_off", "isi", note_off_cb, this);
lo_server_add_method(_server, "/all_notes_off", "isi", all_notes_off_cb, this);
lo_server_add_method(_server, "/learn", "is", learn_cb, this);
lo_server_add_method(_server, "/set_property", NULL, set_property_cb, this);
// Queries
lo_server_add_method(_server, "/get", "is", get_cb, this);
lo_server_add_method(_server, NULL, NULL, unknown_cb, NULL);
_receive_thread->set_name("OSCEngineReceiver Listener");
_receive_thread->start();
_receive_thread->set_scheduling(SCHED_FIFO, 5);
}
OSCEngineReceiver::~OSCEngineReceiver()
{
_receive_thread->stop();
delete _receive_thread;
if (_server != NULL) {
lo_server_free(_server);
_server = NULL;
}
}
/** Override the semaphore driven _run method of ServerInterfaceImpl
* to wait on OSC messages and prepare them right away in the same thread.
*/
void
OSCEngineReceiver::ReceiveThread::_run()
{
/* get a timestamp here and stamp all the events with the same time so
* they all get executed in the same cycle */
while (true) {
// Wait on a message and enqueue it
lo_server_recv(_receiver._server);
// Enqueue every other message that is here "now"
// (would this provide truly atomic bundles?)
while (lo_server_recv_noblock(_receiver._server, 0) > 0) {}
}
}
/** Create a new request for this message, if necessary.
*
* This is based on the fact that the current request is stored in a ref
* counted pointer, and events just take a reference to that. Thus, events
* may delete their request if we've since switched to a new one, or the
* same one can stay around and serve a series of events.
* Hooray for reference counting.
*
* If this message came from the same source as the last message, no allocation
* of requests or lo_addresses or any of it needs to be done. Unfortunately
* the only way to check is by comparing URLs, because liblo addresses suck.
* Lack of a fast liblo address comparison really sucks here, in any case.
*/
int
OSCEngineReceiver::set_response_address_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg, void* user_data)
{
OSCEngineReceiver* const me = reinterpret_cast(user_data);
if (argc < 1 || types[0] != 'i') // Not a valid Ingen message
return 0; // Save liblo the trouble
const int32_t id = argv[0]->i;
const lo_address addr = lo_message_get_source(msg);
char* const url = lo_address_get_url(addr);
if (id != -1) {
// TODO: Cache client
ClientInterface* client = me->_engine.broadcaster()->client(url);
me->_interface->set_response_id(id);
} else {
me->_interface->disable_responses();
}
free(url);
// If this returns 0 no OSC commands will work
return 1;
}
#ifdef LIBLO_BUNDLES
int
OSCEngineReceiver::_bundle_start_cb(lo_timetag time)
{
info << "BUNDLE START" << endl;
return 0;
}
int
OSCEngineReceiver::_bundle_end_cb()
{
info << "BUNDLE END" << endl;
return 0;
}
#endif
void
OSCEngineReceiver::error_cb(int num, const char* msg, const char* path)
{
error << "liblo server error" << num;
if (path) {
error << " for path `" << path << "'";
}
error << " (" << msg << ")" << endl;
}
/** @page engine_osc_namespace
* /ping
* @arg @p response-id :: Integer
*/
int
OSCEngineReceiver::_ping_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
const lo_address addr = lo_message_get_source(msg);
if (lo_send(addr, "/ok", "i", argv[0]->i) < 0)
warn << "Unable to send response (" << lo_address_errstr(addr) << ")" << endl;
return 0;
}
/** @page engine_osc_namespace
* /ping_queued
* @arg @p response-id :: Integer
*
* @par
* Reply to sender with a successful response after going through the
* event queue. This is useful for checking if the engine is actually active,
* or for sending after several events as a sentinel and wait on it's response,
* to know when all previous events have finished processing.
*/
int
OSCEngineReceiver::_ping_slow_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
_interface->ping();
return 0;
}
/** @page engine_osc_namespace
* /register_client
* @arg @p response-id :: Integer
*
* @par
* Register a new client with the engine. The incoming address will be
* used for the new registered client.
*/
int
OSCEngineReceiver::_register_client_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
lo_address addr = lo_message_get_source(msg);
char* const url = lo_address_get_url(addr);
ClientInterface* client = new OSCClientSender(
(const char*)url,
_engine.world()->conf()->option("packet-size").get_int32());
_interface->register_client(client);
free(url);
return 0;
}
/** @page engine_osc_namespace
* /unregister_client
* @arg @p response-id :: Integer
*
* @par
* Unregister a client.
*/
int
OSCEngineReceiver::_unregister_client_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
lo_address addr = lo_message_get_source(msg);
char* url = lo_address_get_url(addr);
_interface->unregister_client(url);
free(url);
return 0;
}
/** @page engine_osc_namespace
* /get
* @arg @p response-id :: Integer
* @arg @p uri :: URI String
*
* @par
* Request all properties of an object.
*/
int
OSCEngineReceiver::_get_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
_interface->get(&argv[1]->s);
return 0;
}
/** @page engine_osc_namespace
* /put
* @arg @p response-id :: Integer
* @arg @p path :: String
* @arg @p context :: URI String
* @arg @p predicate :: URI String
* @arg @p value
* @arg @p ...
*
* @par
* PUT a set of properties to a path.
*/
int
OSCEngineReceiver::_put_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
const char* obj_path = &argv[1]->s;
const char* ctx = &argv[2]->s;
Resource::Properties prop;
for (int i = 3; i < argc-1; i += 2)
prop.insert(
make_pair(&argv[i]->s,
AtomLiblo::lo_arg_to_atom(_engine.world()->forge(), types[i+1], argv[i+1])));
_interface->put(obj_path, prop, Resource::uri_to_graph(ctx));
return 0;
}
int
OSCEngineReceiver::_delta_begin_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
const char* obj_path = &argv[1]->s;
assert(_delta_remove.empty());
assert(_delta_add.empty());
_delta_uri = obj_path;
return 0;
}
int
OSCEngineReceiver::_delta_remove_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
_delta_remove.insert(
make_pair(&argv[1]->s,
AtomLiblo::lo_arg_to_atom(_engine.world()->forge(), types[2], argv[2])));
return 0;
}
int
OSCEngineReceiver::_delta_add_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
_delta_add.insert(
make_pair(&argv[1]->s,
AtomLiblo::lo_arg_to_atom(_engine.world()->forge(), types[2], argv[2])));
return 0;
}
int
OSCEngineReceiver::_delta_end_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
_interface->delta(_delta_uri, _delta_remove, _delta_add);
_delta_uri = Raul::URI();
_delta_remove.clear();
_delta_add.clear();
return 0;
}
/** @page engine_osc_namespace
* /move
* @arg @p response-id :: Integer
* @arg @p old-path :: String
* @arg @p new-path :: String
*
* @par
* MOVE an object to a new path.
*/
int
OSCEngineReceiver::_move_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
const char* old_path = &argv[1]->s;
const char* new_path = &argv[2]->s;
_interface->move(old_path, new_path);
return 0;
}
/** @page engine_osc_namespace
* /delete
* @arg @p response-id :: Integer
* @arg @p path :: String
*
* @par
* DELETE an object.
*/
int
OSCEngineReceiver::_del_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
const char* uri = &argv[1]->s;
_interface->del(uri);
return 0;
}
/** @page engine_osc_namespace
* /connect
* @arg @p response-id :: Integer
* @arg @p src-port-path :: String
* @arg @p dst-port-path :: String
*
* @par
* Connect two ports (which must be in the same patch).
*/
int
OSCEngineReceiver::_connect_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
const char* src_port_path = &argv[1]->s;
const char* dst_port_path = &argv[2]->s;
_interface->connect(src_port_path, dst_port_path);
return 0;
}
/** @page engine_osc_namespace
* /disconnect
* @arg @p response-id :: Integer
* @arg @p src-port-path :: String
* @arg @p dst-port-path :: String
*
* @par
* Disconnect two ports.
*/
int
OSCEngineReceiver::_disconnect_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
const char* src_port_path = &argv[1]->s;
const char* dst_port_path = &argv[2]->s;
_interface->disconnect(src_port_path, dst_port_path);
return 0;
}
/** @page engine_osc_namespace
* /disconnect_all
* @arg @p response-id :: Integer
* @arg @p patch-path :: String
* @arg @p object-path :: String
*
* @par
* Disconnect all connections to/from a node/port.
*/
int
OSCEngineReceiver::_disconnect_all_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
const char* patch_path = &argv[1]->s;
const char* object_path = &argv[2]->s;
_interface->disconnect_all(patch_path, object_path);
return 0;
}
/** @page engine_osc_namespace
* /note_on
* @arg @p response-id :: Integer
* @arg @p node-path :: String
* @arg @p note-num (int)
* @arg @p velocity (int)
*
* @par
* Trigger a note-on, just as if it came from MIDI.
*/
int
OSCEngineReceiver::_note_on_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
/*
const char* node_path = &argv[1]->s;
const uint8_t note_num = argv[2]->i;
const uint8_t velocity = argv[3]->i;
*/
warn << "TODO: OSC note on" << endl;
//note_on(node_path, note_num, velocity);
return 0;
}
/** @page engine_osc_namespace
* /note_off
* @arg @p response-id :: Integer
* @arg @p node-path :: String
* @arg @p note-num :: Integer
*
* @par
* Trigger a note-off, just as if it came from MIDI.
*/
int
OSCEngineReceiver::_note_off_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
/*
const char* patch_path = &argv[1]->s;
const uint8_t note_num = argv[2]->i;
*/
warn << "TODO: OSC note off" << endl;
//note_off(patch_path, note_num);
return 0;
}
/** @page engine_osc_namespace
* /all_notes_off
* @arg @p response-id :: Integer
* @arg @p patch-path :: String
*
* @par
* Trigger a note-off for all voices, just as if it came from MIDI.
*/
int
OSCEngineReceiver::_all_notes_off_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
/*
const char* patch_path = &argv[1]->s;
*/
warn << "TODO: OSC all notes off" << endl;
//all_notes_off(patch_path);
return 0;
}
/** @page engine_osc_namespace
* /set_property
* @arg @p response-id :: Integer
* @arg @p uri :: URI String
* @arg @p key :: URI String
* @arg @p value :: String
*
* @par
* Set a property on a graph object.
*/
int
OSCEngineReceiver::_set_property_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg)
{
if (argc != 4 || types[0] != 'i' || types[1] != 's' || types[2] != 's')
return 1;
const char* object_path = &argv[1]->s;
const char* key = &argv[2]->s;
Raul::Atom value = Raul::AtomLiblo::lo_arg_to_atom(_engine.world()->forge(), types[3], argv[3]);
_interface->set_property(object_path, key, value);
return 0;
}
// Static Callbacks //
// Display incoming OSC messages (for debugging purposes)
int
OSCEngineReceiver::generic_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg, void* user_data)
{
printf("[OSCEngineReceiver] %s (%s)\t", path, types);
for (int i=0; i < argc; ++i) {
lo_arg_pp(lo_type(types[i]), argv[i]);
printf("\t");
}
printf("\n");
return 1; // not handled
}
int
OSCEngineReceiver::unknown_cb(const char* path, const char* types, lo_arg** argv, int argc, lo_message msg, void* user_data)
{
const lo_address addr = lo_message_get_source(msg);
char* const url = lo_address_get_url(addr);
warn << "Unknown OSC command " << path << " (" << types << ") "
<< "received from " << url << endl;
string error_msg = "Unknown command: ";
error_msg.append(path).append(" ").append(types);
lo_send(addr, "/error", "s", error_msg.c_str(), LO_ARGS_END);
free(url);
return 0;
}
} // namespace Server
} // namespace Ingen