From 6932da9169a38a5a8eafc63357b9ede00cb46117 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Wed, 9 May 2012 16:06:59 +0000 Subject: Support TCP sockets. git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@4328 a436a847-0d15-0410-975c-d299462d15a1 --- src/socket/SocketListener.cpp | 160 +++++++++++++++++++++++++++++++++--------- src/socket/SocketListener.hpp | 26 ++++++- src/socket/SocketReader.cpp | 10 ++- 3 files changed, 158 insertions(+), 38 deletions(-) (limited to 'src/socket') diff --git a/src/socket/SocketListener.cpp b/src/socket/SocketListener.cpp index 79ad11c4..eadeedd8 100644 --- a/src/socket/SocketListener.cpp +++ b/src/socket/SocketListener.cpp @@ -14,10 +14,16 @@ along with Ingen. If not, see . */ +#include +#include +#include #include #include #include +#include +#include + #include "ingen/Interface.hpp" #include "ingen/shared/World.hpp" #include "ingen/shared/AtomReader.hpp" @@ -34,39 +40,93 @@ namespace Ingen { namespace Socket { -SocketListener::SocketListener(Ingen::Shared::World& world) - : _world(world) +bool +SocketListener::Socket::open(const std::string& uri, + int domain, + struct sockaddr* a, + socklen_t s) { - set_name("SocketListener"); + addr = a; + addr_len = s; + sock = socket(domain, SOCK_STREAM, 0); + if (sock == -1) { + return false; + } + + if (bind(sock, addr, addr_len) == -1) { + LOG(Raul::error) << "Failed to bind " << uri << std::endl; + return false; + } - // Create server socket - _sock = socket(AF_UNIX, SOCK_STREAM, 0); - if (_sock == -1) { - LOG(Raul::error) << "Failed to create socket" << std::endl; - return; + if (listen(sock, 64) == -1) { + LOG(Raul::error) << "Failed to listen on " << uri << std::endl; + return false; + } else { + LOG(Raul::info) << "Listening on " << uri << std::endl; } - _sock_path = world.conf()->option("socket").get_string(); + return true; +} + +int +SocketListener::Socket::accept() +{ + // Accept connection from client + socklen_t client_addr_len = addr_len; + struct sockaddr* client_addr = (struct sockaddr*)calloc( + 1, client_addr_len); + + int conn = ::accept(sock, client_addr, &client_addr_len); + if (conn == -1) { + LOG(Raul::error) << "Error accepting connection: " + << strerror(errno) << std::endl; + } + + return conn; +} + +void +SocketListener::Socket::close() +{ + if (sock != -1) { + ::close(sock); + sock = -1; + } +} - // Make server socket address - struct sockaddr_un addr; - memset(&addr, 0, sizeof(struct sockaddr_un)); - addr.sun_family = AF_UNIX; - strncpy(addr.sun_path, _sock_path.c_str(), sizeof(addr.sun_path) - 1); +SocketListener::SocketListener(Ingen::Shared::World& world) + : _world(world) +{ + set_name("SocketListener"); - // Bind socket to address - if (bind(_sock, (struct sockaddr*)&addr, - sizeof(struct sockaddr_un)) == -1) { - LOG(Raul::error) << "Failed to bind socket" << std::endl; - return; + // Create UNIX socket + _unix_path = world.conf()->option("socket").get_string(); + std::string unix_uri = "turtle+unix://" + _unix_path; + struct sockaddr_un* uaddr = (struct sockaddr_un*)calloc( + 1, sizeof(struct sockaddr_un)); + uaddr->sun_family = AF_UNIX; + strncpy(uaddr->sun_path, _unix_path.c_str(), sizeof(uaddr->sun_path) - 1); + if (!_unix_sock.open(unix_uri, AF_UNIX, + (struct sockaddr*)uaddr, + sizeof(struct sockaddr_un))) { + LOG(Raul::error) << "Failed to create UNIX socket" << std::endl; } - // Mark socket as a passive socket for accepting incoming connections - if (listen(_sock, 64) == -1) { - LOG(Raul::error) << "Failed to listen on socket" << std::endl; + // Create TCP socket + int port = world.conf()->option("engine-port").get_int(); + std::ostringstream ss; + ss << "turtle+tcp:///localhost:"; + ss << port; + struct sockaddr_in* naddr = (struct sockaddr_in*)calloc( + 1, sizeof(struct sockaddr_in)); + naddr->sin_family = AF_INET; + naddr->sin_port = htons(port); + if (!_net_sock.open(ss.str(), AF_INET, + (struct sockaddr*)naddr, + sizeof(struct sockaddr_in))) { + LOG(Raul::error) << "Failed to create TCP socket" << std::endl; } - LOG(Raul::info) << "Listening on socket at " << _sock_path << std::endl; start(); } @@ -74,27 +134,57 @@ SocketListener::~SocketListener() { stop(); join(); - close(_sock); - unlink(_sock_path.c_str()); + _unix_sock.close(); + _net_sock.close(); + unlink(_unix_path.c_str()); } void SocketListener::_run() { + Server::Engine* engine = (Server::Engine*)_world.local_engine().get(); + + struct pollfd pfds[2]; + int nfds = 0; + if (_unix_sock.sock != -1) { + pfds[nfds].fd = _unix_sock.sock; + pfds[nfds].events = POLLIN; + pfds[nfds].revents = 0; + ++nfds; + } + if (_net_sock.sock != -1) { + pfds[nfds].fd = _net_sock.sock; + pfds[nfds].events = POLLIN; + pfds[nfds].revents = 0; + ++nfds; + } + while (!_exit_flag) { - // Accept connection from client - socklen_t client_addr_size = sizeof(struct sockaddr_un); - struct sockaddr_un client_addr; - int conn = accept(_sock, (struct sockaddr*)&client_addr, - &client_addr_size); - if (conn == -1) { - LOG(Raul::error) << "Error accepting connection" << std::endl; + // Wait for input to arrive at a socket + int ret = poll(pfds, nfds, -1); + if (ret == -1) { + LOG(Raul::error) << "Poll error: " << strerror(errno) << std::endl; + break; + } else if (ret == 0) { + LOG(Raul::error) << "Poll returned with no data" << std::endl; continue; } - // Make an new interface/thread to handle the connection - Server::Engine* engine = (Server::Engine*)_world.local_engine().get(); - new SocketReader(_world, *engine->interface(), conn); + if (pfds[0].revents & POLLIN) { + int conn = _unix_sock.accept(); + if (conn != -1) { + // Make an new interface/thread to handle the connection + new SocketReader(_world, *engine->interface(), conn); + } + } + + if (pfds[1].revents & POLLIN) { + int conn = _net_sock.accept(); + if (conn != -1) { + // Make an new interface/thread to handle the connection + new SocketReader(_world, *engine->interface(), conn); + } + } } } diff --git a/src/socket/SocketListener.hpp b/src/socket/SocketListener.hpp index 10e9d1d2..6d70c78a 100644 --- a/src/socket/SocketListener.hpp +++ b/src/socket/SocketListener.hpp @@ -14,6 +14,8 @@ along with Ingen. If not, see . */ +#include + #include #include "raul/SharedPtr.hpp" @@ -33,12 +35,32 @@ public: SocketListener(Ingen::Shared::World& world); ~SocketListener(); + struct Socket { + Socket() : addr(NULL), addr_len(0), sock(-1) {} + ~Socket() { close(); } + + bool open(const std::string& uri, + int domain, + struct sockaddr* addr, + socklen_t addr_len); + + int accept(); + + void close(); + + std::string uri; + struct sockaddr* addr; + socklen_t addr_len; + int sock; + }; + private: virtual void _run(); Ingen::Shared::World& _world; - std::string _sock_path; - int _sock; + std::string _unix_path; + Socket _unix_sock; + Socket _net_sock; }; } // namespace Ingen diff --git a/src/socket/SocketReader.cpp b/src/socket/SocketReader.cpp index 8b3ed7a1..2d1b5c14 100644 --- a/src/socket/SocketReader.cpp +++ b/src/socket/SocketReader.cpp @@ -142,9 +142,13 @@ SocketReader::_run() pfd.revents = 0; while (!_exit_flag) { + if (feof(f)) { + break; // Lost connection + } + // Wait for input to arrive at socket int ret = poll(&pfd, 1, -1); - if (ret == -1 || (pfd.revents & (POLLHUP | POLLNVAL))) { + if (ret == -1 || (pfd.revents & (POLLERR|POLLHUP|POLLNVAL))) { break; // Hangup } else if (!ret) { continue; // No data, shouldn't happen @@ -156,6 +160,10 @@ SocketReader::_run() continue; // Read nothing, e.g. just whitespace } else if (st) { fprintf(stderr, "Read error: %s\n", serd_strerror(st)); + continue; + } else if (!_msg_node) { + LOG(Raul::error) << "Received empty message" << std::endl; + continue; } // Build an LV2_Atom at chunk.buf from the message -- cgit v1.2.1