diff options
author | David Robillard <d@drobilla.net> | 2020-02-26 21:13:44 +0100 |
---|---|---|
committer | David Robillard <d@drobilla.net> | 2020-02-26 21:13:44 +0100 |
commit | e8df60dc3647c9deac999a7d3ccf73354ab939c4 (patch) | |
tree | fdf2e4bc4c8c0179b5466174000264fcbe52ee31 | |
parent | 96840d3fc46ec6531a3c5b53eee1c0713723f9a7 (diff) | |
download | ingen-e8df60dc3647c9deac999a7d3ccf73354ab939c4.tar.gz ingen-e8df60dc3647c9deac999a7d3ccf73354ab939c4.tar.bz2 ingen-e8df60dc3647c9deac999a7d3ccf73354ab939c4.zip |
Set up serd to read directly from socket without a FILE intermediary
This was problematic because using fdopen() conflicts with poll(). If the FILE
ends up reading more than is actually processed by serd, then poll will not
fire because the socket file descriptor has been fully read, even though there
is pending input in the FILE object (which is buffered).
Avoid this by using a custom read function and calling recv() directly. In
retrospect, this was stupid, but it seemed convenient to be able to use the
handy built-in support for reading from a FILE in serd.
Now, the client and server are using send() and recv() directly on the socket,
as things should be. I am not sure if MSG_WAITALL is the best idea here, or if
it's really important at all, but it seems like a good idea.
-rw-r--r-- | ingen/SocketReader.hpp | 7 | ||||
-rw-r--r-- | src/SocketReader.cpp | 46 |
2 files changed, 34 insertions, 19 deletions
diff --git a/ingen/SocketReader.hpp b/ingen/SocketReader.hpp index 3c3c5f3c..22a0f4aa 100644 --- a/ingen/SocketReader.hpp +++ b/ingen/SocketReader.hpp @@ -45,6 +45,12 @@ protected: virtual void on_hangup() {} private: + /// Serd source function for reading from socket + static size_t c_recv(void* buf, size_t size, size_t nmemb, void* stream); + + /// Serd error function for getting socket error status + static int c_err(void* stream); + void run(); static SerdStatus set_base_uri(SocketReader* iface, @@ -69,6 +75,7 @@ private: SordInserter* _inserter; SordNode* _msg_node; SPtr<Raul::Socket> _socket; + int _socket_error; bool _exit_flag; std::thread _thread; }; diff --git a/src/SocketReader.cpp b/src/SocketReader.cpp index 443c418f..c8dbfe48 100644 --- a/src/SocketReader.cpp +++ b/src/SocketReader.cpp @@ -46,6 +46,7 @@ SocketReader::SocketReader(ingen::World& world, , _inserter(nullptr) , _msg_node(nullptr) , _socket(std::move(sock)) + , _socket_error(0) , _exit_flag(false) , _thread(&SocketReader::run, this) {} @@ -93,23 +94,34 @@ SocketReader::write_statement(SocketReader* iface, object_datatype, object_lang); } +size_t +SocketReader::c_recv(void* buf, size_t size, size_t nmemb, void* stream) +{ + SocketReader* self = (SocketReader*)stream; + + const ssize_t c = recv(self->_socket->fd(), buf, size * nmemb, MSG_WAITALL); + if (c < 0) { + self->_socket_error = errno; + return 0; + } + + return c; +} + +int +SocketReader::c_err(void* stream) +{ + SocketReader* self = (SocketReader*)stream; + + return self->_socket_error; +} + void SocketReader::run() { Sord::World* world = _world.rdf_world(); LV2_URID_Map& map = _world.uri_map().urid_map_feature()->urid_map; - // Open socket as a FILE for reading directly with serd - std::unique_ptr<FILE, decltype(&fclose)> f{fdopen(_socket->fd(), "r"), - &fclose}; - if (!f) { - _world.log().error("Failed to open connection (%1%)\n", - strerror(errno)); - // Connection gone, exit - _socket.reset(); - return; - } - // Set up a forge to build LV2 atoms from model SordNode* base_uri = nullptr; SordModel* model = nullptr; @@ -137,7 +149,8 @@ SocketReader::run() nullptr); serd_env_set_base_uri(_env, sord_node_to_serd_node(base_uri)); - serd_reader_start_stream(reader, f.get(), (const uint8_t*)"(socket)", false); + serd_reader_start_source_stream( + reader, c_recv, c_err, this, (const uint8_t*)"(socket)", 1); // Make an AtomReader to call Ingen Interface methods based on Atom AtomReader ar(_world.uri_map(), _world.uris(), _world.log(), _iface); @@ -147,13 +160,9 @@ SocketReader::run() pfd.events = POLLIN; pfd.revents = 0; - while (!_exit_flag) { - if (feof(f.get())) { - break; // Lost connection - } - + while (!_exit_flag && !_socket_error) { // Wait for input to arrive at socket - int ret = poll(&pfd, 1, -1); + const int ret = poll(&pfd, 1, -1); if (ret == -1 || (pfd.revents & (POLLERR|POLLHUP|POLLNVAL))) { on_hangup(); break; // Hangup @@ -189,7 +198,6 @@ SocketReader::run() std::lock_guard<std::mutex> lock(_world.rdf_mutex()); // Destroy everything - f.reset(); sord_inserter_free(_inserter); serd_reader_end_stream(reader); serd_reader_free(reader); |