summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ingen/SocketReader.hpp7
-rw-r--r--src/SocketReader.cpp46
2 files changed, 34 insertions, 19 deletions
diff --git a/ingen/SocketReader.hpp b/ingen/SocketReader.hpp
index 3c3c5f3c..22a0f4aa 100644
--- a/ingen/SocketReader.hpp
+++ b/ingen/SocketReader.hpp
@@ -45,6 +45,12 @@ protected:
virtual void on_hangup() {}
private:
+ /// Serd source function for reading from socket
+ static size_t c_recv(void* buf, size_t size, size_t nmemb, void* stream);
+
+ /// Serd error function for getting socket error status
+ static int c_err(void* stream);
+
void run();
static SerdStatus set_base_uri(SocketReader* iface,
@@ -69,6 +75,7 @@ private:
SordInserter* _inserter;
SordNode* _msg_node;
SPtr<Raul::Socket> _socket;
+ int _socket_error;
bool _exit_flag;
std::thread _thread;
};
diff --git a/src/SocketReader.cpp b/src/SocketReader.cpp
index 443c418f..c8dbfe48 100644
--- a/src/SocketReader.cpp
+++ b/src/SocketReader.cpp
@@ -46,6 +46,7 @@ SocketReader::SocketReader(ingen::World& world,
, _inserter(nullptr)
, _msg_node(nullptr)
, _socket(std::move(sock))
+ , _socket_error(0)
, _exit_flag(false)
, _thread(&SocketReader::run, this)
{}
@@ -93,23 +94,34 @@ SocketReader::write_statement(SocketReader* iface,
object_datatype, object_lang);
}
+size_t
+SocketReader::c_recv(void* buf, size_t size, size_t nmemb, void* stream)
+{
+ SocketReader* self = (SocketReader*)stream;
+
+ const ssize_t c = recv(self->_socket->fd(), buf, size * nmemb, MSG_WAITALL);
+ if (c < 0) {
+ self->_socket_error = errno;
+ return 0;
+ }
+
+ return c;
+}
+
+int
+SocketReader::c_err(void* stream)
+{
+ SocketReader* self = (SocketReader*)stream;
+
+ return self->_socket_error;
+}
+
void
SocketReader::run()
{
Sord::World* world = _world.rdf_world();
LV2_URID_Map& map = _world.uri_map().urid_map_feature()->urid_map;
- // Open socket as a FILE for reading directly with serd
- std::unique_ptr<FILE, decltype(&fclose)> f{fdopen(_socket->fd(), "r"),
- &fclose};
- if (!f) {
- _world.log().error("Failed to open connection (%1%)\n",
- strerror(errno));
- // Connection gone, exit
- _socket.reset();
- return;
- }
-
// Set up a forge to build LV2 atoms from model
SordNode* base_uri = nullptr;
SordModel* model = nullptr;
@@ -137,7 +149,8 @@ SocketReader::run()
nullptr);
serd_env_set_base_uri(_env, sord_node_to_serd_node(base_uri));
- serd_reader_start_stream(reader, f.get(), (const uint8_t*)"(socket)", false);
+ serd_reader_start_source_stream(
+ reader, c_recv, c_err, this, (const uint8_t*)"(socket)", 1);
// Make an AtomReader to call Ingen Interface methods based on Atom
AtomReader ar(_world.uri_map(), _world.uris(), _world.log(), _iface);
@@ -147,13 +160,9 @@ SocketReader::run()
pfd.events = POLLIN;
pfd.revents = 0;
- while (!_exit_flag) {
- if (feof(f.get())) {
- break; // Lost connection
- }
-
+ while (!_exit_flag && !_socket_error) {
// Wait for input to arrive at socket
- int ret = poll(&pfd, 1, -1);
+ const int ret = poll(&pfd, 1, -1);
if (ret == -1 || (pfd.revents & (POLLERR|POLLHUP|POLLNVAL))) {
on_hangup();
break; // Hangup
@@ -189,7 +198,6 @@ SocketReader::run()
std::lock_guard<std::mutex> lock(_world.rdf_mutex());
// Destroy everything
- f.reset();
sord_inserter_free(_inserter);
serd_reader_end_stream(reader);
serd_reader_free(reader);