summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Robillard <d@drobilla.net>2013-02-04 00:54:25 +0000
committerDavid Robillard <d@drobilla.net>2013-02-04 00:54:25 +0000
commit46c3a486eee4b2ef69d9cc4f9e2701082c64d7c8 (patch)
treeecf7a89fb1c28a619106dd78ccd257a1ff52c70a
parent69f98d63f5bd22c82208fef5fbc2a61613541bd7 (diff)
downloadingen-46c3a486eee4b2ef69d9cc4f9e2701082c64d7c8.tar.gz
ingen-46c3a486eee4b2ef69d9cc4f9e2701082c64d7c8.tar.bz2
ingen-46c3a486eee4b2ef69d9cc4f9e2701082c64d7c8.zip
Replace Raul::thread with std::thread.
git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@5047 a436a847-0d15-0410-975c-d299462d15a1
-rw-r--r--src/gui/ThreadedLoader.cpp12
-rw-r--r--src/gui/ThreadedLoader.hpp9
-rw-r--r--src/server/Broadcaster.cpp11
-rw-r--r--src/server/Broadcaster.hpp7
-rw-r--r--src/server/BufferFactory.hpp6
-rw-r--r--src/server/Engine.cpp4
-rw-r--r--src/server/InternalPlugin.hpp6
-rw-r--r--src/server/PreProcessor.cpp17
-rw-r--r--src/server/PreProcessor.hpp18
-rw-r--r--src/server/ThreadManager.hpp2
-rw-r--r--src/server/Worker.cpp12
-rw-r--r--src/server/Worker.hpp9
-rw-r--r--src/server/ingen_lv2.cpp54
-rw-r--r--src/socket/SocketClient.hpp4
-rw-r--r--src/socket/SocketListener.cpp128
-rw-r--r--src/socket/SocketListener.hpp46
-rw-r--r--src/socket/SocketReader.cpp14
-rw-r--r--src/socket/SocketReader.hpp9
-rw-r--r--src/socket/ingen_socket_server.cpp118
-rw-r--r--src/socket/wscript1
-rw-r--r--tests/ingen_test.cpp1
21 files changed, 199 insertions, 289 deletions
diff --git a/src/gui/ThreadedLoader.cpp b/src/gui/ThreadedLoader.cpp
index 597a728d..5c33c1de 100644
--- a/src/gui/ThreadedLoader.cpp
+++ b/src/gui/ThreadedLoader.cpp
@@ -31,14 +31,13 @@ namespace Ingen {
namespace GUI {
ThreadedLoader::ThreadedLoader(App& app, SPtr<Interface> engine)
- : Raul::Thread()
- , _app(app)
+ : _app(app)
, _sem(0)
, _engine(engine)
+ , _exit_flag(false)
+ , _thread(&ThreadedLoader::run, this)
{
- if (parser()) {
- start();
- } else {
+ if (!parser()) {
app.log().warn("Parser unavailable, graph loading disabled\n");
}
}
@@ -47,6 +46,7 @@ ThreadedLoader::~ThreadedLoader()
{
_exit_flag = true;
_sem.post();
+ _thread.join();
}
SPtr<Serialisation::Parser>
@@ -61,7 +61,7 @@ ThreadedLoader::parser()
}
void
-ThreadedLoader::_run()
+ThreadedLoader::run()
{
while (_sem.wait() && !_exit_flag) {
_mutex.lock();
diff --git a/src/gui/ThreadedLoader.hpp b/src/gui/ThreadedLoader.hpp
index 815cfeec..7f4de063 100644
--- a/src/gui/ThreadedLoader.hpp
+++ b/src/gui/ThreadedLoader.hpp
@@ -17,6 +17,8 @@
#ifndef INGEN_GUI_THREADEDLOADER_HPP
#define INGEN_GUI_THREADEDLOADER_HPP
+#include <thread>
+
#include <cassert>
#include <list>
#include <string>
@@ -28,7 +30,6 @@
#include "ingen/serialisation/Parser.hpp"
#include "ingen/serialisation/Serialiser.hpp"
#include "raul/Semaphore.hpp"
-#include "raul/Thread.hpp"
namespace Ingen {
namespace GUI {
@@ -44,7 +45,7 @@ namespace GUI {
*
* \ingroup GUI
*/
-class ThreadedLoader : public Raul::Thread
+class ThreadedLoader
{
public:
ThreadedLoader(App& app,
@@ -70,13 +71,15 @@ private:
/** Returns nothing and takes no parameters (because they have all been bound) */
typedef sigc::slot<void> Closure;
- void _run();
+ void run();
App& _app;
Raul::Semaphore _sem;
SPtr<Interface> _engine;
Glib::Mutex _mutex;
std::list<Closure> _events;
+ bool _exit_flag;
+ std::thread _thread;
};
} // namespace GUI
diff --git a/src/server/Broadcaster.cpp b/src/server/Broadcaster.cpp
index 8e53164f..fa8624df 100644
--- a/src/server/Broadcaster.cpp
+++ b/src/server/Broadcaster.cpp
@@ -15,7 +15,6 @@
*/
#include <utility>
-#include <glibmm/thread.h>
#include "ingen/Interface.hpp"
@@ -33,7 +32,7 @@ Broadcaster::Broadcaster()
Broadcaster::~Broadcaster()
{
- Glib::Mutex::Lock lock(_clients_mutex);
+ std::lock_guard<std::mutex> lock(_clients_mutex);
_clients.clear();
_broadcastees.clear();
}
@@ -44,7 +43,7 @@ void
Broadcaster::register_client(const Raul::URI& uri,
SPtr<Interface> client)
{
- Glib::Mutex::Lock lock(_clients_mutex);
+ std::lock_guard<std::mutex> lock(_clients_mutex);
_clients[uri] = client;
}
@@ -55,7 +54,7 @@ Broadcaster::register_client(const Raul::URI& uri,
bool
Broadcaster::unregister_client(const Raul::URI& uri)
{
- Glib::Mutex::Lock lock(_clients_mutex);
+ std::lock_guard<std::mutex> lock(_clients_mutex);
const size_t erased = _clients.erase(uri);
_broadcastees.erase(uri);
return (erased > 0);
@@ -78,7 +77,7 @@ Broadcaster::set_broadcast(const Raul::URI& client, bool broadcast)
SPtr<Interface>
Broadcaster::client(const Raul::URI& uri)
{
- Glib::Mutex::Lock lock(_clients_mutex);
+ std::lock_guard<std::mutex> lock(_clients_mutex);
Clients::iterator i = _clients.find(uri);
if (i != _clients.end()) {
return (*i).second;
@@ -90,7 +89,7 @@ Broadcaster::client(const Raul::URI& uri)
void
Broadcaster::send_plugins(const BlockFactory::Plugins& plugins)
{
- Glib::Mutex::Lock lock(_clients_mutex);
+ std::lock_guard<std::mutex> lock(_clients_mutex);
for (const auto& c : _clients) {
send_plugins_to(c.second.get(), plugins);
}
diff --git a/src/server/Broadcaster.hpp b/src/server/Broadcaster.hpp
index ba471bc2..3a7eaf44 100644
--- a/src/server/Broadcaster.hpp
+++ b/src/server/Broadcaster.hpp
@@ -20,11 +20,10 @@
#include <atomic>
#include <list>
#include <map>
+#include <mutex>
#include <set>
#include <string>
-#include <glibmm/thread.h>
-
#include "ingen/Interface.hpp"
#include "ingen/types.hpp"
@@ -85,7 +84,7 @@ public:
void send_plugins_to(Interface*, const BlockFactory::Plugins& plugin_list);
#define BROADCAST(msg, ...) \
- Glib::Mutex::Lock lock(_clients_mutex); \
+ std::lock_guard<std::mutex> lock(_clients_mutex); \
for (const auto& c : _clients) \
c.second->msg(__VA_ARGS__)
@@ -147,7 +146,7 @@ private:
typedef std::map< Raul::URI, SPtr<Interface> > Clients;
- Glib::Mutex _clients_mutex;
+ std::mutex _clients_mutex;
Clients _clients;
std::set<Raul::URI> _broadcastees;
std::atomic<bool> _must_broadcast;
diff --git a/src/server/BufferFactory.hpp b/src/server/BufferFactory.hpp
index fc9c5515..3bbc542e 100644
--- a/src/server/BufferFactory.hpp
+++ b/src/server/BufferFactory.hpp
@@ -19,9 +19,7 @@
#include <atomic>
#include <map>
-
-#undef nil
-#include <glibmm/thread.h>
+#include <mutex>
#include "ingen/Forge.hpp"
#include "ingen/URIs.hpp"
@@ -88,7 +86,7 @@ private:
std::atomic<Buffer*> _free_sequence;
std::atomic<Buffer*> _free_object;
- Glib::Mutex _mutex;
+ std::mutex _mutex;
Engine& _engine;
URIs& _uris;
uint32_t _seq_size;
diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp
index 3d76a032..ce253b75 100644
--- a/src/server/Engine.cpp
+++ b/src/server/Engine.cpp
@@ -192,8 +192,6 @@ Engine::activate()
_buffer_factory->set_block_length(_driver->block_length());
_options->set(*this);
- _pre_processor->start();
-
const Ingen::URIs& uris = world()->uris();
Forge& forge = world()->forge();
@@ -283,8 +281,6 @@ Engine::activate()
void
Engine::deactivate()
{
- _pre_processor->join();
-
if (_driver) {
_driver->deactivate();
}
diff --git a/src/server/InternalPlugin.hpp b/src/server/InternalPlugin.hpp
index 5bd842dd..fc5537bf 100644
--- a/src/server/InternalPlugin.hpp
+++ b/src/server/InternalPlugin.hpp
@@ -17,10 +17,8 @@
#ifndef INGEN_ENGINE_INTERNALPLUGIN_HPP
#define INGEN_ENGINE_INTERNALPLUGIN_HPP
-#include <cstdlib>
-
-#include <boost/utility.hpp>
-#include <glibmm/module.h>
+#include "raul/Symbol.hpp"
+#include "raul/URI.hpp"
#include "PluginImpl.hpp"
diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp
index d62fdab3..83215bee 100644
--- a/src/server/PreProcessor.cpp
+++ b/src/server/PreProcessor.cpp
@@ -14,6 +14,9 @@
along with Ingen. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <stdexcept>
+#include <iostream>
+
#include "Event.hpp"
#include "PostProcessor.hpp"
#include "PreProcessor.hpp"
@@ -26,19 +29,19 @@ namespace Ingen {
namespace Server {
PreProcessor::PreProcessor()
- : Raul::Thread()
- , _sem(0)
+ : _sem(0)
, _head(NULL)
, _prepared_back(NULL)
, _tail(NULL)
-{
- start();
-}
+ , _exit_flag(false)
+ , _thread(&PreProcessor::run, this)
+{}
PreProcessor::~PreProcessor()
{
_exit_flag = true;
_sem.post();
+ _thread.join();
}
void
@@ -46,7 +49,7 @@ PreProcessor::event(Event* const ev)
{
// TODO: Probably possible to make this lock-free with CAS
ThreadManager::assert_not_thread(THREAD_IS_REAL_TIME);
- Glib::Mutex::Lock lock(_mutex);
+ std::lock_guard<std::mutex> lock(_mutex);
assert(!ev->is_prepared());
assert(!ev->next());
@@ -114,7 +117,7 @@ PreProcessor::process(ProcessContext& context, PostProcessor& dest, bool limit)
}
void
-PreProcessor::_run()
+PreProcessor::run()
{
ThreadManager::set_flag(THREAD_PRE_PROCESS);
while (_sem.wait() && !_exit_flag) {
diff --git a/src/server/PreProcessor.hpp b/src/server/PreProcessor.hpp
index 10563ee9..d8e71428 100644
--- a/src/server/PreProcessor.hpp
+++ b/src/server/PreProcessor.hpp
@@ -18,11 +18,10 @@
#define INGEN_ENGINE_PREPROCESSOR_HPP
#include <atomic>
-
-#include <glibmm/thread.h>
+#include <thread>
+#include <mutex>
#include "raul/Semaphore.hpp"
-#include "raul/Thread.hpp"
namespace Ingen {
namespace Server {
@@ -31,18 +30,13 @@ class Event;
class PostProcessor;
class ProcessContext;
-class PreProcessor : public Raul::Thread
+class PreProcessor
{
public:
explicit PreProcessor();
~PreProcessor();
- virtual void join() {
- _exit_flag = true;
- _sem.post();
- }
-
/** Return true iff no events are enqueued. */
inline bool empty() const { return !_head.load(); }
@@ -59,14 +53,16 @@ public:
bool limit = true);
protected:
- virtual void _run();
+ void run();
private:
- Glib::Mutex _mutex;
+ std::mutex _mutex;
Raul::Semaphore _sem;
std::atomic<Event*> _head;
std::atomic<Event*> _prepared_back;
std::atomic<Event*> _tail;
+ bool _exit_flag;
+ std::thread _thread;
};
} // namespace Server
diff --git a/src/server/ThreadManager.hpp b/src/server/ThreadManager.hpp
index 445219ae..1fb1ca58 100644
--- a/src/server/ThreadManager.hpp
+++ b/src/server/ThreadManager.hpp
@@ -18,7 +18,7 @@
#define INGEN_ENGINE_THREADMANAGER_HPP
#include <cassert>
-#include "raul/Thread.hpp"
+
#include "raul/ThreadVar.hpp"
namespace Ingen {
diff --git a/src/server/Worker.cpp b/src/server/Worker.cpp
index 8afc8a20..ce7ba833 100644
--- a/src/server/Worker.cpp
+++ b/src/server/Worker.cpp
@@ -101,28 +101,26 @@ Worker::Schedule::feature(World* world, Node* n)
}
Worker::Worker(Log& log, uint32_t buffer_size)
- : Raul::Thread()
- , _schedule(new Schedule())
+ : _schedule(new Schedule())
, _log(log)
, _sem(0)
, _requests(buffer_size)
, _responses(buffer_size)
, _buffer((uint8_t*)malloc(buffer_size))
, _buffer_size(buffer_size)
-{
- start();
-}
+ , _thread(&Worker::run, this)
+{}
Worker::~Worker()
{
_exit_flag = true;
_sem.post();
- join();
+ _thread.join();
free(_buffer);
}
void
-Worker::_run()
+Worker::run()
{
while (_sem.wait() && !_exit_flag) {
MessageHeader msg;
diff --git a/src/server/Worker.hpp b/src/server/Worker.hpp
index b90e117e..d51c6559 100644
--- a/src/server/Worker.hpp
+++ b/src/server/Worker.hpp
@@ -17,11 +17,12 @@
#ifndef INGEN_ENGINE_WORKER_HPP
#define INGEN_ENGINE_WORKER_HPP
+#include <thread>
+
#include "ingen/LV2Features.hpp"
#include "lv2/lv2plug.in/ns/ext/worker/worker.h"
#include "raul/RingBuffer.hpp"
#include "raul/Semaphore.hpp"
-#include "raul/Thread.hpp"
namespace Ingen {
@@ -31,7 +32,7 @@ namespace Server {
class LV2Block;
-class Worker : public Raul::Thread
+class Worker
{
public:
Worker(Log& log, uint32_t buffer_size);
@@ -58,8 +59,10 @@ private:
Raul::RingBuffer _responses;
uint8_t* const _buffer;
const uint32_t _buffer_size;
+ bool _exit_flag;
+ std::thread _thread;
- virtual void _run();
+ void run();
};
} // namespace Server
diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp
index e3bb240a..29d6d8c6 100644
--- a/src/server/ingen_lv2.cpp
+++ b/src/server/ingen_lv2.cpp
@@ -17,6 +17,7 @@
#include <stdlib.h>
#include <string>
+#include <thread>
#include <vector>
#include <glib.h>
@@ -44,7 +45,6 @@
#include "ingen/serialisation/Serialiser.hpp"
#include "ingen/types.hpp"
#include "raul/Semaphore.hpp"
-#include "raul/Thread.hpp"
#include "Buffer.hpp"
#include "Driver.hpp"
@@ -399,35 +399,22 @@ extern "C" {
using namespace Ingen;
using namespace Ingen::Server;
-class MainThread : public Raul::Thread
+static void
+ingen_lv2_main(SPtr<Engine> engine, LV2Driver* driver)
{
-public:
- explicit MainThread(SPtr<Engine> engine,
- LV2Driver* driver)
- : Raul::Thread()
- , _engine(engine)
- , _driver(driver)
- {}
+ while (true) {
+ // Wait until there is work to be done
+ driver->main_sem().wait();
-private:
- virtual void _run() {
- while (true) {
- // Wait until there is work to be done
- _driver->main_sem().wait();
+ // Convert pending messages to events and push to pre processor
+ driver->consume_from_ui();
- // Convert pending messages to events and push to pre processor
- _driver->consume_from_ui();
-
- // Run post processor and maid to finalise events from last time
- if (!_engine->main_iteration()) {
- return;
- }
+ // Run post processor and maid to finalise events from last time
+ if (!engine->main_iteration()) {
+ return;
}
}
-
- SPtr<Engine> _engine;
- LV2Driver* _driver;
-};
+}
struct IngenPlugin {
IngenPlugin()
@@ -439,7 +426,7 @@ struct IngenPlugin {
{}
Ingen::World* world;
- MainThread* main;
+ std::thread* main;
LV2_URID_Map* map;
int argc;
char** argv;
@@ -584,8 +571,6 @@ ingen_instantiate(const LV2_Descriptor* descriptor,
LV2Driver* driver = new LV2Driver(*engine.get(), block_length, rate);
engine->set_driver(SPtr<Ingen::Server::Driver>(driver));
- plugin->main = new MainThread(engine, driver);
-
engine->activate();
Server::ThreadManager::single_threaded = true;
@@ -632,10 +617,12 @@ ingen_connect_port(LV2_Handle instance, uint32_t port, void* data)
static void
ingen_activate(LV2_Handle instance)
{
- IngenPlugin* me = (IngenPlugin*)instance;
- me->world->engine()->activate();
- //((EventWriter*)me->world->engine().get())->start();
- me->main->start();
+ IngenPlugin* me = (IngenPlugin*)instance;
+ SPtr<Server::Engine> engine = dynamic_ptr_cast<Server::Engine>(
+ me->world->engine());
+ LV2Driver* driver = (LV2Driver*)engine->driver();
+ engine->activate();
+ me->main = new std::thread(ingen_lv2_main, engine, driver);
}
static void
@@ -656,6 +643,8 @@ ingen_deactivate(LV2_Handle instance)
{
IngenPlugin* me = (IngenPlugin*)instance;
me->world->engine()->deactivate();
+ delete me->main;
+ me->main = NULL;
}
static void
@@ -664,6 +653,7 @@ ingen_cleanup(LV2_Handle instance)
IngenPlugin* me = (IngenPlugin*)instance;
me->world->set_engine(SPtr<Ingen::Server::Engine>());
me->world->set_interface(SPtr<Ingen::Interface>());
+ delete me->main;
delete me->world;
delete me;
}
diff --git a/src/socket/SocketClient.hpp b/src/socket/SocketClient.hpp
index b05cb343..f7d8b95e 100644
--- a/src/socket/SocketClient.hpp
+++ b/src/socket/SocketClient.hpp
@@ -34,9 +34,7 @@ public:
: SocketWriter(world.uri_map(), world.uris(), uri, sock)
, _respondee(respondee)
, _reader(world, *respondee.get(), sock)
- {
- _reader.start();
- }
+ {}
virtual SPtr<Interface> respondee() const {
return _respondee;
diff --git a/src/socket/SocketListener.cpp b/src/socket/SocketListener.cpp
deleted file mode 100644
index c2c1d0ee..00000000
--- a/src/socket/SocketListener.cpp
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- This file is part of Ingen.
- Copyright 2007-2012 David Robillard <http://drobilla.net/>
-
- Ingen is free software: you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free
- Software Foundation, either version 3 of the License, or any later version.
-
- Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for details.
-
- You should have received a copy of the GNU Affero General Public License
- along with Ingen. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <errno.h>
-#include <poll.h>
-
-#include <sstream>
-
-#include "ingen/AtomReader.hpp"
-#include "ingen/Configuration.hpp"
-#include "ingen/Interface.hpp"
-#include "ingen/Log.hpp"
-#include "ingen/World.hpp"
-#include "sord/sordmm.hpp"
-#include "sratom/sratom.h"
-
-#include "../server/Engine.hpp"
-#include "../server/EventWriter.hpp"
-#include "SocketListener.hpp"
-#include "SocketServer.hpp"
-
-namespace Ingen {
-namespace Socket {
-
-SocketListener::SocketListener(Ingen::World& world)
- : Raul::Thread()
- , _world(world)
- , _unix_sock(Socket::Type::UNIX)
- , _net_sock(Socket::Type::TCP)
-{
- // Create UNIX socket
- _unix_path = world.conf().option("socket").ptr<char>();
- const Raul::URI unix_uri("unix://" + _unix_path);
- if (!_unix_sock.bind(unix_uri) || !_unix_sock.listen()) {
- _world.log().error("Failed to create UNIX socket\n");
- _unix_sock.close();
- }
- _world.log().info(Raul::fmt("Listening on socket %1%\n") % unix_uri);
-
- // Create TCP socket
- int port = world.conf().option("engine-port").get<int32_t>();
- std::ostringstream ss;
- ss << "tcp://localhost:";
- ss << port;
- if (!_net_sock.bind(Raul::URI(ss.str())) || !_net_sock.listen()) {
- _world.log().error("Failed to create TCP socket\n");
- _net_sock.close();
- }
- _world.log().info(Raul::fmt("Listening on TCP port %1%\n") % port);
-
- start();
-}
-
-SocketListener::~SocketListener()
-{
- _exit_flag = true;
- _unix_sock.shutdown();
- _net_sock.shutdown();
- _unix_sock.close();
- _net_sock.close();
- join();
- unlink(_unix_path.c_str());
-}
-
-void
-SocketListener::_run()
-{
- Server::Engine* engine = (Server::Engine*)_world.engine().get();
-
- struct pollfd pfds[2];
- int nfds = 0;
- if (_unix_sock.fd() != -1) {
- pfds[nfds].fd = _unix_sock.fd();
- pfds[nfds].events = POLLIN;
- pfds[nfds].revents = 0;
- ++nfds;
- }
- if (_net_sock.fd() != -1) {
- pfds[nfds].fd = _net_sock.fd();
- pfds[nfds].events = POLLIN;
- pfds[nfds].revents = 0;
- ++nfds;
- }
-
- while (true) {
- // Wait for input to arrive at a socket
- const int ret = poll(pfds, nfds, -1);
- if (_exit_flag) {
- break;
- } else if (ret == -1) {
- _world.log().error(Raul::fmt("Poll error: %1%\n") % strerror(errno));
- break;
- } else if (ret == 0) {
- _world.log().error("Poll returned with no data\n");
- continue;
- }
-
- if (pfds[0].revents & POLLIN) {
- SPtr<Socket> conn = _unix_sock.accept();
- if (conn) {
- new SocketServer(_world, *engine, conn);
- }
- }
-
- if (pfds[1].revents & POLLIN) {
- SPtr<Socket> conn = _net_sock.accept();
- if (conn) {
- new SocketServer(_world, *engine, conn);
- }
- }
- }
-}
-
-} // namespace Ingen
-} // namespace Socket
diff --git a/src/socket/SocketListener.hpp b/src/socket/SocketListener.hpp
deleted file mode 100644
index bea55da2..00000000
--- a/src/socket/SocketListener.hpp
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- This file is part of Ingen.
- Copyright 2007-2012 David Robillard <http://drobilla.net/>
-
- Ingen is free software: you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free
- Software Foundation, either version 3 of the License, or any later version.
-
- Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for details.
-
- You should have received a copy of the GNU Affero General Public License
- along with Ingen. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <string>
-
-#include "raul/Thread.hpp"
-
-#include "Socket.hpp"
-
-namespace Ingen {
-
-class Interface;
-class World;
-
-namespace Socket {
-
-class SocketListener : public Raul::Thread
-{
-public:
- explicit SocketListener(Ingen::World& world);
- ~SocketListener();
-
-private:
- virtual void _run();
-
- Ingen::World& _world;
- std::string _unix_path;
- Socket _unix_sock;
- Socket _net_sock;
-};
-
-} // namespace Ingen
-} // namespace Socket
diff --git a/src/socket/SocketReader.cpp b/src/socket/SocketReader.cpp
index 4ff65b3b..d3e6affc 100644
--- a/src/socket/SocketReader.cpp
+++ b/src/socket/SocketReader.cpp
@@ -33,20 +33,20 @@ namespace Socket {
SocketReader::SocketReader(Ingen::World& world,
Interface& iface,
SPtr<Socket> sock)
- : Raul::Thread()
- , _world(world)
+ : _world(world)
, _iface(iface)
, _inserter(NULL)
, _msg_node(NULL)
, _socket(sock)
-{
- start();
-}
+ , _exit_flag(false)
+ , _thread(&SocketReader::run, this)
+{}
SocketReader::~SocketReader()
{
+ _exit_flag = true;
_socket->shutdown();
- join();
+ _thread.join();
}
SerdStatus
@@ -86,7 +86,7 @@ SocketReader::write_statement(SocketReader* iface,
}
void
-SocketReader::_run()
+SocketReader::run()
{
Sord::World* world = _world.rdf_world();
LV2_URID_Map* map = &_world.uri_map().urid_map_feature()->urid_map;
diff --git a/src/socket/SocketReader.hpp b/src/socket/SocketReader.hpp
index ea2bcd8e..489b7e3f 100644
--- a/src/socket/SocketReader.hpp
+++ b/src/socket/SocketReader.hpp
@@ -17,7 +17,8 @@
#ifndef INGEN_SOCKET_SOCKET_READER_HPP
#define INGEN_SOCKET_SOCKET_READER_HPP
-#include "raul/Thread.hpp"
+#include <thread>
+
#include "sord/sord.h"
#include "Socket.hpp"
@@ -30,7 +31,7 @@ class World;
namespace Socket {
/** Calls Interface methods based on Turtle messages received via socket. */
-class SocketReader : public Raul::Thread
+class SocketReader
{
public:
SocketReader(World& world,
@@ -40,7 +41,7 @@ public:
~SocketReader();
private:
- virtual void _run();
+ void run();
static SerdStatus set_base_uri(SocketReader* iface,
const SerdNode* uri_node);
@@ -64,6 +65,8 @@ private:
SordInserter* _inserter;
SordNode* _msg_node;
SPtr<Socket> _socket;
+ bool _exit_flag;
+ std::thread _thread;
};
} // namespace Ingen
diff --git a/src/socket/ingen_socket_server.cpp b/src/socket/ingen_socket_server.cpp
index 37b05bae..0a9d7494 100644
--- a/src/socket/ingen_socket_server.cpp
+++ b/src/socket/ingen_socket_server.cpp
@@ -14,31 +14,133 @@
along with Ingen. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <errno.h>
+#include <poll.h>
+
+#include <sstream>
+#include <thread>
+
+#include "ingen/Configuration.hpp"
#include "ingen/Module.hpp"
#include "ingen/World.hpp"
#include "../server/Engine.hpp"
#include "../server/EventWriter.hpp"
-#include "SocketListener.hpp"
+#include "Socket.hpp"
+#include "SocketServer.hpp"
+
+#define UNIX_SCHEME "unix://"
+
+namespace Ingen {
+namespace Socket {
+
+static void
+ingen_listen(Ingen::World* world, Socket* unix_sock, Socket* net_sock)
+{
+ const std::string unix_path(world->conf().option("socket").ptr<char>());
+
+ SPtr<Server::Engine> engine = dynamic_ptr_cast<Server::Engine>(
+ world->engine());
+
+ // Bind UNIX socket
+ const Raul::URI unix_uri(UNIX_SCHEME + unix_path);
+ if (!unix_sock->bind(unix_uri) || !unix_sock->listen()) {
+ world->log().error("Failed to create UNIX socket\n");
+ unix_sock->close();
+ }
+ world->log().info(Raul::fmt("Listening on socket %1%\n") % unix_uri);
-using namespace Ingen;
+ // Bind TCP socket
+ const int port = world->conf().option("engine-port").get<int32_t>();
+ std::ostringstream ss;
+ ss << "tcp://localhost:";
+ ss << port;
+ if (!net_sock->bind(Raul::URI(ss.str())) || !net_sock->listen()) {
+ world->log().error("Failed to create TCP socket\n");
+ net_sock->close();
+ }
+ world->log().info(Raul::fmt("Listening on TCP port %1%\n") % port);
-struct IngenSocketServerModule : public Ingen::Module {
- void load(Ingen::World* world) {
- listener = SPtr<Ingen::Socket::SocketListener>(
- new Ingen::Socket::SocketListener(*world));
+ struct pollfd pfds[2];
+ int nfds = 0;
+ if (unix_sock->fd() != -1) {
+ pfds[nfds].fd = unix_sock->fd();
+ pfds[nfds].events = POLLIN;
+ pfds[nfds].revents = 0;
+ ++nfds;
+ }
+ if (net_sock->fd() != -1) {
+ pfds[nfds].fd = net_sock->fd();
+ pfds[nfds].events = POLLIN;
+ pfds[nfds].revents = 0;
+ ++nfds;
}
- SPtr<Ingen::Socket::SocketListener> listener;
+ while (true) {
+ // Wait for input to arrive at a socket
+ const int ret = poll(pfds, nfds, -1);
+ if (ret == -1) {
+ world->log().error(Raul::fmt("Poll error: %1%\n") % strerror(errno));
+ break;
+ } else if ((pfds[0].revents & POLLHUP) || pfds[1].revents & POLLHUP) {
+ break;
+ } else if (ret == 0) {
+ world->log().error("Poll returned with no data\n");
+ continue;
+ }
+
+ if (pfds[0].revents & POLLIN) {
+ SPtr<Socket> conn = unix_sock->accept();
+ if (conn) {
+ new SocketServer(*world, *engine, conn);
+ }
+ }
+
+ if (pfds[1].revents & POLLIN) {
+ SPtr<Socket> conn = net_sock->accept();
+ if (conn) {
+ new SocketServer(*world, *engine, conn);
+ }
+ }
+ }
+}
+
+struct ServerModule : public Ingen::Module
+{
+ ServerModule()
+ : unix_sock(Socket::Type::UNIX)
+ , net_sock(Socket::Type::TCP)
+ {}
+
+ ~ServerModule() {
+ unix_sock.shutdown();
+ net_sock.shutdown();
+ thread->join();
+ unlink(unix_sock.uri().substr(strlen(UNIX_SCHEME)).c_str());
+ }
+
+ void load(World* world) {
+ world = world;
+ thread = std::unique_ptr<std::thread>(
+ new std::thread(ingen_listen, world, &unix_sock, &net_sock));
+ }
+
+ World* world;
+ Socket unix_sock;
+ Socket net_sock;
+ std::unique_ptr<std::thread> thread;
};
+} // namespace Socket
+} // namespace Ingen
+
extern "C" {
Ingen::Module*
ingen_module_load()
{
- return new IngenSocketServerModule();
+ return new Ingen::Socket::ServerModule();
}
} // extern "C"
diff --git a/src/socket/wscript b/src/socket/wscript
index f6705c37..e503e8a2 100644
--- a/src/socket/wscript
+++ b/src/socket/wscript
@@ -5,7 +5,6 @@ def build(bld):
if bld.is_defined('HAVE_SOCKET'):
obj = bld(features = 'cxx cxxshlib',
source = ['Socket.cpp',
- 'SocketListener.cpp',
'SocketReader.cpp',
'SocketWriter.cpp',
'ingen_socket_server.cpp'],
diff --git a/tests/ingen_test.cpp b/tests/ingen_test.cpp
index bdc535e8..f0300728 100644
--- a/tests/ingen_test.cpp
+++ b/tests/ingen_test.cpp
@@ -28,7 +28,6 @@
#include <glibmm/timer.h>
#include "raul/Path.hpp"
-#include "raul/Thread.hpp"
#include "serd/serd.h"
#include "sord/sordmm.hpp"