summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Robillard <d@drobilla.net>2020-02-26 21:13:44 +0100
committerDavid Robillard <d@drobilla.net>2020-02-26 21:13:44 +0100
commite8df60dc3647c9deac999a7d3ccf73354ab939c4 (patch)
treefdf2e4bc4c8c0179b5466174000264fcbe52ee31
parent96840d3fc46ec6531a3c5b53eee1c0713723f9a7 (diff)
downloadingen-e8df60dc3647c9deac999a7d3ccf73354ab939c4.tar.gz
ingen-e8df60dc3647c9deac999a7d3ccf73354ab939c4.tar.bz2
ingen-e8df60dc3647c9deac999a7d3ccf73354ab939c4.zip
Set up serd to read directly from socket without a FILE intermediary
This was problematic because using fdopen() conflicts with poll(). If the FILE ends up reading more than is actually processed by serd, then poll will not fire because the socket file descriptor has been fully read, even though there is pending input in the FILE object (which is buffered). Avoid this by using a custom read function and calling recv() directly. In retrospect, this was stupid, but it seemed convenient to be able to use the handy built-in support for reading from a FILE in serd. Now, the client and server are using send() and recv() directly on the socket, as things should be. I am not sure if MSG_WAITALL is the best idea here, or if it's really important at all, but it seems like a good idea.
-rw-r--r--ingen/SocketReader.hpp7
-rw-r--r--src/SocketReader.cpp46
2 files changed, 34 insertions, 19 deletions
diff --git a/ingen/SocketReader.hpp b/ingen/SocketReader.hpp
index 3c3c5f3c..22a0f4aa 100644
--- a/ingen/SocketReader.hpp
+++ b/ingen/SocketReader.hpp
@@ -45,6 +45,12 @@ protected:
virtual void on_hangup() {}
private:
+ /// Serd source function for reading from socket
+ static size_t c_recv(void* buf, size_t size, size_t nmemb, void* stream);
+
+ /// Serd error function for getting socket error status
+ static int c_err(void* stream);
+
void run();
static SerdStatus set_base_uri(SocketReader* iface,
@@ -69,6 +75,7 @@ private:
SordInserter* _inserter;
SordNode* _msg_node;
SPtr<Raul::Socket> _socket;
+ int _socket_error;
bool _exit_flag;
std::thread _thread;
};
diff --git a/src/SocketReader.cpp b/src/SocketReader.cpp
index 443c418f..c8dbfe48 100644
--- a/src/SocketReader.cpp
+++ b/src/SocketReader.cpp
@@ -46,6 +46,7 @@ SocketReader::SocketReader(ingen::World& world,
, _inserter(nullptr)
, _msg_node(nullptr)
, _socket(std::move(sock))
+ , _socket_error(0)
, _exit_flag(false)
, _thread(&SocketReader::run, this)
{}
@@ -93,23 +94,34 @@ SocketReader::write_statement(SocketReader* iface,
object_datatype, object_lang);
}
+size_t
+SocketReader::c_recv(void* buf, size_t size, size_t nmemb, void* stream)
+{
+ SocketReader* self = (SocketReader*)stream;
+
+ const ssize_t c = recv(self->_socket->fd(), buf, size * nmemb, MSG_WAITALL);
+ if (c < 0) {
+ self->_socket_error = errno;
+ return 0;
+ }
+
+ return c;
+}
+
+int
+SocketReader::c_err(void* stream)
+{
+ SocketReader* self = (SocketReader*)stream;
+
+ return self->_socket_error;
+}
+
void
SocketReader::run()
{
Sord::World* world = _world.rdf_world();
LV2_URID_Map& map = _world.uri_map().urid_map_feature()->urid_map;
- // Open socket as a FILE for reading directly with serd
- std::unique_ptr<FILE, decltype(&fclose)> f{fdopen(_socket->fd(), "r"),
- &fclose};
- if (!f) {
- _world.log().error("Failed to open connection (%1%)\n",
- strerror(errno));
- // Connection gone, exit
- _socket.reset();
- return;
- }
-
// Set up a forge to build LV2 atoms from model
SordNode* base_uri = nullptr;
SordModel* model = nullptr;
@@ -137,7 +149,8 @@ SocketReader::run()
nullptr);
serd_env_set_base_uri(_env, sord_node_to_serd_node(base_uri));
- serd_reader_start_stream(reader, f.get(), (const uint8_t*)"(socket)", false);
+ serd_reader_start_source_stream(
+ reader, c_recv, c_err, this, (const uint8_t*)"(socket)", 1);
// Make an AtomReader to call Ingen Interface methods based on Atom
AtomReader ar(_world.uri_map(), _world.uris(), _world.log(), _iface);
@@ -147,13 +160,9 @@ SocketReader::run()
pfd.events = POLLIN;
pfd.revents = 0;
- while (!_exit_flag) {
- if (feof(f.get())) {
- break; // Lost connection
- }
-
+ while (!_exit_flag && !_socket_error) {
// Wait for input to arrive at socket
- int ret = poll(&pfd, 1, -1);
+ const int ret = poll(&pfd, 1, -1);
if (ret == -1 || (pfd.revents & (POLLERR|POLLHUP|POLLNVAL))) {
on_hangup();
break; // Hangup
@@ -189,7 +198,6 @@ SocketReader::run()
std::lock_guard<std::mutex> lock(_world.rdf_mutex());
// Destroy everything
- f.reset();
sord_inserter_free(_inserter);
serd_reader_end_stream(reader);
serd_reader_free(reader);