summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/gui/ThreadedLoader.cpp31
-rw-r--r--src/gui/ThreadedLoader.hpp9
-rw-r--r--src/server/Engine.cpp2
-rw-r--r--src/server/PreProcessor.cpp25
-rw-r--r--src/server/PreProcessor.hpp13
-rw-r--r--src/server/Worker.cpp3
-rw-r--r--src/socket/Socket.cpp9
-rw-r--r--src/socket/Socket.hpp6
-rw-r--r--src/socket/SocketListener.cpp10
-rw-r--r--src/socket/SocketReader.cpp1
10 files changed, 72 insertions, 37 deletions
diff --git a/src/gui/ThreadedLoader.cpp b/src/gui/ThreadedLoader.cpp
index f7c77c99..a6c9b1ed 100644
--- a/src/gui/ThreadedLoader.cpp
+++ b/src/gui/ThreadedLoader.cpp
@@ -32,8 +32,9 @@ namespace Ingen {
namespace GUI {
ThreadedLoader::ThreadedLoader(App& app, SharedPtr<Interface> engine)
- : Raul::Slave("Loader")
+ : Raul::Thread("Loader")
, _app(app)
+ , _sem(0)
, _engine(engine)
{
if (parser())
@@ -42,6 +43,12 @@ ThreadedLoader::ThreadedLoader(App& app, SharedPtr<Interface> engine)
warn << "Failed to load ingen_serialisation module, load disabled." << endl;
}
+ThreadedLoader::~ThreadedLoader()
+{
+ _exit_flag = true;
+ _sem.post();
+}
+
SharedPtr<Serialisation::Parser>
ThreadedLoader::parser()
{
@@ -54,16 +61,16 @@ ThreadedLoader::parser()
}
void
-ThreadedLoader::_whipped()
+ThreadedLoader::_run()
{
- _mutex.lock();
-
- while ( ! _events.empty() ) {
- _events.front()();
- _events.pop_front();
+ while (_sem.wait() && !_exit_flag) {
+ _mutex.lock();
+ while (!_events.empty()) {
+ _events.front()();
+ _events.pop_front();
+ }
+ _mutex.unlock();
}
-
- _mutex.unlock();
}
void
@@ -96,9 +103,8 @@ ThreadedLoader::load_patch(bool merge,
engine_symbol,
engine_data)));
- whip();
-
_mutex.unlock();
+ _sem.post();
}
void
@@ -112,8 +118,7 @@ ThreadedLoader::save_patch(SharedPtr<const Client::PatchModel> model,
model, filename)));
_mutex.unlock();
-
- whip();
+ _sem.post();
}
void
diff --git a/src/gui/ThreadedLoader.hpp b/src/gui/ThreadedLoader.hpp
index e7815eeb..273c72f9 100644
--- a/src/gui/ThreadedLoader.hpp
+++ b/src/gui/ThreadedLoader.hpp
@@ -27,7 +27,7 @@
#include "ingen/Interface.hpp"
#include "ingen/serialisation/Parser.hpp"
#include "ingen/serialisation/Serialiser.hpp"
-#include "raul/Slave.hpp"
+#include "raul/Semaphore.hpp"
#include "raul/Thread.hpp"
namespace Ingen {
@@ -44,12 +44,14 @@ namespace GUI {
*
* \ingroup GUI
*/
-class ThreadedLoader : public Raul::Slave
+class ThreadedLoader : public Raul::Thread
{
public:
ThreadedLoader(App& app,
SharedPtr<Interface> engine);
+ ~ThreadedLoader();
+
void load_patch(bool merge,
const Glib::ustring& document_uri,
boost::optional<Raul::Path> engine_parent,
@@ -68,9 +70,10 @@ private:
/** Returns nothing and takes no parameters (because they have all been bound) */
typedef sigc::slot<void> Closure;
- void _whipped();
+ void _run();
App& _app;
+ Raul::Semaphore _sem;
SharedPtr<Interface> _engine;
Glib::Mutex _mutex;
std::list<Closure> _events;
diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp
index eeb28e5a..c7fef892 100644
--- a/src/server/Engine.cpp
+++ b/src/server/Engine.cpp
@@ -258,7 +258,7 @@ Engine::activate()
void
Engine::deactivate()
{
- _pre_processor->stop();
+ _pre_processor->join();
if (_driver) {
_driver->deactivate();
diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp
index 9db13fd1..13746113 100644
--- a/src/server/PreProcessor.cpp
+++ b/src/server/PreProcessor.cpp
@@ -26,14 +26,14 @@ namespace Ingen {
namespace Server {
PreProcessor::PreProcessor()
- : Raul::Slave("PreProcessor")
+ : Raul::Thread("PreProcessor")
+ , _sem(0)
{
start();
}
PreProcessor::~PreProcessor()
{
- stop();
}
void
@@ -61,7 +61,7 @@ PreProcessor::event(Event* const ev)
_prepared_back = ev;
}
- whip();
+ _sem.post();
}
unsigned
@@ -110,18 +110,21 @@ PreProcessor::process(ProcessContext& context, PostProcessor& dest, bool limit)
/** Pre-process a single event */
void
-PreProcessor::_whipped()
+PreProcessor::_run()
{
ThreadManager::set_flag(THREAD_PRE_PROCESS);
- Event* ev = _prepared_back.get();
- if (!ev)
- return;
+ while (_sem.wait() && !_exit_flag) {
+ Event* const ev = _prepared_back.get();
+ if (!ev) {
+ return;
+ }
- assert(!ev->is_prepared());
- ev->pre_process();
- assert(ev->is_prepared());
+ assert(!ev->is_prepared());
+ ev->pre_process();
+ assert(ev->is_prepared());
- _prepared_back = (Event*)ev->next();
+ _prepared_back = (Event*)ev->next();
+ }
}
} // namespace Server
diff --git a/src/server/PreProcessor.hpp b/src/server/PreProcessor.hpp
index fd886139..9b7fbd44 100644
--- a/src/server/PreProcessor.hpp
+++ b/src/server/PreProcessor.hpp
@@ -20,7 +20,8 @@
#include <glibmm/thread.h>
#include "raul/AtomicPtr.hpp"
-#include "raul/Slave.hpp"
+#include "raul/Semaphore.hpp"
+#include "raul/Thread.hpp"
namespace Ingen {
namespace Server {
@@ -29,13 +30,18 @@ class Event;
class PostProcessor;
class ProcessContext;
-class PreProcessor : public Raul::Slave
+class PreProcessor : public Raul::Thread
{
public:
explicit PreProcessor();
~PreProcessor();
+ virtual void join() {
+ _exit_flag = true;
+ _sem.post();
+ }
+
/** Return true iff no events are enqueued. */
inline bool empty() const { return !_head.get(); }
@@ -52,10 +58,11 @@ public:
bool limit = true);
protected:
- virtual void _whipped(); ///< Prepare 1 event
+ virtual void _run();
private:
Glib::Mutex _mutex;
+ Raul::Semaphore _sem;
Raul::AtomicPtr<Event> _head;
Raul::AtomicPtr<Event> _prepared_back;
Raul::AtomicPtr<Event> _tail;
diff --git a/src/server/Worker.cpp b/src/server/Worker.cpp
index d17482bf..2c1c51a0 100644
--- a/src/server/Worker.cpp
+++ b/src/server/Worker.cpp
@@ -119,8 +119,7 @@ Worker::~Worker()
void
Worker::_run()
{
- while (!_exit_flag) {
- _sem.wait();
+ while (_sem.wait() && !_exit_flag) {
MessageHeader msg;
if (_requests.read_space() > sizeof(msg)) {
if (_requests.read(sizeof(msg), &msg) != sizeof(msg)) {
diff --git a/src/socket/Socket.cpp b/src/socket/Socket.cpp
index 13a684bc..998d1cdb 100644
--- a/src/socket/Socket.cpp
+++ b/src/socket/Socket.cpp
@@ -188,5 +188,14 @@ Socket::close()
}
}
+void
+Socket::shutdown()
+{
+ if (_sock != -1) {
+ ::shutdown(_sock, SHUT_RDWR);
+ _sock = -1;
+ }
+}
+
} // namespace Ingen
} // namespace Socket
diff --git a/src/socket/Socket.hpp b/src/socket/Socket.hpp
index 49d03d48..1406d87e 100644
--- a/src/socket/Socket.hpp
+++ b/src/socket/Socket.hpp
@@ -81,6 +81,12 @@ public:
/** Close the socket. */
void close();
+ /** Shut down the socket.
+ * This terminates any connections associated with the sockets, and will
+ * (unlike close()) cause a poll on the socket to return.
+ */
+ void shutdown();
+
private:
bool set_addr(const Raul::URI& uri);
diff --git a/src/socket/SocketListener.cpp b/src/socket/SocketListener.cpp
index 936dd0e0..58797fa8 100644
--- a/src/socket/SocketListener.cpp
+++ b/src/socket/SocketListener.cpp
@@ -65,7 +65,9 @@ SocketListener::SocketListener(Ingen::World& world)
SocketListener::~SocketListener()
{
- stop();
+ _exit_flag = true;
+ _unix_sock.shutdown();
+ _net_sock.shutdown();
join();
_unix_sock.close();
_net_sock.close();
@@ -92,10 +94,12 @@ SocketListener::_run()
++nfds;
}
- while (!_exit_flag) {
+ while (true) {
// Wait for input to arrive at a socket
int ret = poll(pfds, nfds, -1);
- if (ret == -1) {
+ if (_exit_flag) {
+ break;
+ } else if (ret == -1) {
LOG(Raul::error) << "Poll error: " << strerror(errno) << std::endl;
break;
} else if (ret == 0) {
diff --git a/src/socket/SocketReader.cpp b/src/socket/SocketReader.cpp
index b73b9eb0..2301ba69 100644
--- a/src/socket/SocketReader.cpp
+++ b/src/socket/SocketReader.cpp
@@ -46,7 +46,6 @@ SocketReader::SocketReader(Ingen::World& world,
SocketReader::~SocketReader()
{
- stop();
join();
}