From e8df60dc3647c9deac999a7d3ccf73354ab939c4 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Wed, 26 Feb 2020 21:13:44 +0100 Subject: Set up serd to read directly from socket without a FILE intermediary This was problematic because using fdopen() conflicts with poll(). If the FILE ends up reading more than is actually processed by serd, then poll will not fire because the socket file descriptor has been fully read, even though there is pending input in the FILE object (which is buffered). Avoid this by using a custom read function and calling recv() directly. In retrospect, this was stupid, but it seemed convenient to be able to use the handy built-in support for reading from a FILE in serd. Now, the client and server are using send() and recv() directly on the socket, as things should be. I am not sure if MSG_WAITALL is the best idea here, or if it's really important at all, but it seems like a good idea. --- ingen/SocketReader.hpp | 7 +++++++ src/SocketReader.cpp | 46 +++++++++++++++++++++++++++------------------- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/ingen/SocketReader.hpp b/ingen/SocketReader.hpp index 3c3c5f3c..22a0f4aa 100644 --- a/ingen/SocketReader.hpp +++ b/ingen/SocketReader.hpp @@ -45,6 +45,12 @@ protected: virtual void on_hangup() {} private: + /// Serd source function for reading from socket + static size_t c_recv(void* buf, size_t size, size_t nmemb, void* stream); + + /// Serd error function for getting socket error status + static int c_err(void* stream); + void run(); static SerdStatus set_base_uri(SocketReader* iface, @@ -69,6 +75,7 @@ private: SordInserter* _inserter; SordNode* _msg_node; SPtr _socket; + int _socket_error; bool _exit_flag; std::thread _thread; }; diff --git a/src/SocketReader.cpp b/src/SocketReader.cpp index 443c418f..c8dbfe48 100644 --- a/src/SocketReader.cpp +++ b/src/SocketReader.cpp @@ -46,6 +46,7 @@ SocketReader::SocketReader(ingen::World& world, , _inserter(nullptr) , _msg_node(nullptr) , _socket(std::move(sock)) + , _socket_error(0) , _exit_flag(false) , _thread(&SocketReader::run, this) {} @@ -93,23 +94,34 @@ SocketReader::write_statement(SocketReader* iface, object_datatype, object_lang); } +size_t +SocketReader::c_recv(void* buf, size_t size, size_t nmemb, void* stream) +{ + SocketReader* self = (SocketReader*)stream; + + const ssize_t c = recv(self->_socket->fd(), buf, size * nmemb, MSG_WAITALL); + if (c < 0) { + self->_socket_error = errno; + return 0; + } + + return c; +} + +int +SocketReader::c_err(void* stream) +{ + SocketReader* self = (SocketReader*)stream; + + return self->_socket_error; +} + void SocketReader::run() { Sord::World* world = _world.rdf_world(); LV2_URID_Map& map = _world.uri_map().urid_map_feature()->urid_map; - // Open socket as a FILE for reading directly with serd - std::unique_ptr f{fdopen(_socket->fd(), "r"), - &fclose}; - if (!f) { - _world.log().error("Failed to open connection (%1%)\n", - strerror(errno)); - // Connection gone, exit - _socket.reset(); - return; - } - // Set up a forge to build LV2 atoms from model SordNode* base_uri = nullptr; SordModel* model = nullptr; @@ -137,7 +149,8 @@ SocketReader::run() nullptr); serd_env_set_base_uri(_env, sord_node_to_serd_node(base_uri)); - serd_reader_start_stream(reader, f.get(), (const uint8_t*)"(socket)", false); + serd_reader_start_source_stream( + reader, c_recv, c_err, this, (const uint8_t*)"(socket)", 1); // Make an AtomReader to call Ingen Interface methods based on Atom AtomReader ar(_world.uri_map(), _world.uris(), _world.log(), _iface); @@ -147,13 +160,9 @@ SocketReader::run() pfd.events = POLLIN; pfd.revents = 0; - while (!_exit_flag) { - if (feof(f.get())) { - break; // Lost connection - } - + while (!_exit_flag && !_socket_error) { // Wait for input to arrive at socket - int ret = poll(&pfd, 1, -1); + const int ret = poll(&pfd, 1, -1); if (ret == -1 || (pfd.revents & (POLLERR|POLLHUP|POLLNVAL))) { on_hangup(); break; // Hangup @@ -189,7 +198,6 @@ SocketReader::run() std::lock_guard lock(_world.rdf_mutex()); // Destroy everything - f.reset(); sord_inserter_free(_inserter); serd_reader_end_stream(reader); serd_reader_free(reader); -- cgit v1.2.1