diff options
-rw-r--r-- | ingen/SocketReader.hpp | 7 | ||||
-rw-r--r-- | src/SocketReader.cpp | 46 |
2 files changed, 34 insertions, 19 deletions
diff --git a/ingen/SocketReader.hpp b/ingen/SocketReader.hpp index 3c3c5f3c..22a0f4aa 100644 --- a/ingen/SocketReader.hpp +++ b/ingen/SocketReader.hpp @@ -45,6 +45,12 @@ protected: virtual void on_hangup() {} private: + /// Serd source function for reading from socket + static size_t c_recv(void* buf, size_t size, size_t nmemb, void* stream); + + /// Serd error function for getting socket error status + static int c_err(void* stream); + void run(); static SerdStatus set_base_uri(SocketReader* iface, @@ -69,6 +75,7 @@ private: SordInserter* _inserter; SordNode* _msg_node; SPtr<Raul::Socket> _socket; + int _socket_error; bool _exit_flag; std::thread _thread; }; diff --git a/src/SocketReader.cpp b/src/SocketReader.cpp index 443c418f..c8dbfe48 100644 --- a/src/SocketReader.cpp +++ b/src/SocketReader.cpp @@ -46,6 +46,7 @@ SocketReader::SocketReader(ingen::World& world, , _inserter(nullptr) , _msg_node(nullptr) , _socket(std::move(sock)) + , _socket_error(0) , _exit_flag(false) , _thread(&SocketReader::run, this) {} @@ -93,23 +94,34 @@ SocketReader::write_statement(SocketReader* iface, object_datatype, object_lang); } +size_t +SocketReader::c_recv(void* buf, size_t size, size_t nmemb, void* stream) +{ + SocketReader* self = (SocketReader*)stream; + + const ssize_t c = recv(self->_socket->fd(), buf, size * nmemb, MSG_WAITALL); + if (c < 0) { + self->_socket_error = errno; + return 0; + } + + return c; +} + +int +SocketReader::c_err(void* stream) +{ + SocketReader* self = (SocketReader*)stream; + + return self->_socket_error; +} + void SocketReader::run() { Sord::World* world = _world.rdf_world(); LV2_URID_Map& map = _world.uri_map().urid_map_feature()->urid_map; - // Open socket as a FILE for reading directly with serd - std::unique_ptr<FILE, decltype(&fclose)> f{fdopen(_socket->fd(), "r"), - &fclose}; - if (!f) { - _world.log().error("Failed to open connection (%1%)\n", - strerror(errno)); - // Connection gone, exit - _socket.reset(); - return; - } - // Set up a forge to build LV2 atoms from model SordNode* base_uri = nullptr; SordModel* model = nullptr; @@ -137,7 +149,8 @@ SocketReader::run() nullptr); serd_env_set_base_uri(_env, sord_node_to_serd_node(base_uri)); - serd_reader_start_stream(reader, f.get(), (const uint8_t*)"(socket)", false); + serd_reader_start_source_stream( + reader, c_recv, c_err, this, (const uint8_t*)"(socket)", 1); // Make an AtomReader to call Ingen Interface methods based on Atom AtomReader ar(_world.uri_map(), _world.uris(), _world.log(), _iface); @@ -147,13 +160,9 @@ SocketReader::run() pfd.events = POLLIN; pfd.revents = 0; - while (!_exit_flag) { - if (feof(f.get())) { - break; // Lost connection - } - + while (!_exit_flag && !_socket_error) { // Wait for input to arrive at socket - int ret = poll(&pfd, 1, -1); + const int ret = poll(&pfd, 1, -1); if (ret == -1 || (pfd.revents & (POLLERR|POLLHUP|POLLNVAL))) { on_hangup(); break; // Hangup @@ -189,7 +198,6 @@ SocketReader::run() std::lock_guard<std::mutex> lock(_world.rdf_mutex()); // Destroy everything - f.reset(); sord_inserter_free(_inserter); serd_reader_end_stream(reader); serd_reader_free(reader); |