From da4c1fcad194f4f3f399f6a4a731df34567c95ef Mon Sep 17 00:00:00 2001 From: David Robillard Date: Thu, 16 Aug 2012 00:59:35 +0000 Subject: Remove Raul::Slave class. Merge Thread::stop() and Thread::join(). Clean thread shut down without the use of pthread_cancel(). git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@4708 a436a847-0d15-0410-975c-d299462d15a1 --- src/gui/ThreadedLoader.cpp | 31 ++++++++++++++++++------------- src/gui/ThreadedLoader.hpp | 9 ++++++--- src/server/Engine.cpp | 2 +- src/server/PreProcessor.cpp | 25 ++++++++++++++----------- src/server/PreProcessor.hpp | 13 ++++++++++--- src/server/Worker.cpp | 3 +-- src/socket/Socket.cpp | 9 +++++++++ src/socket/Socket.hpp | 6 ++++++ src/socket/SocketListener.cpp | 10 +++++++--- src/socket/SocketReader.cpp | 1 - 10 files changed, 72 insertions(+), 37 deletions(-) diff --git a/src/gui/ThreadedLoader.cpp b/src/gui/ThreadedLoader.cpp index f7c77c99..a6c9b1ed 100644 --- a/src/gui/ThreadedLoader.cpp +++ b/src/gui/ThreadedLoader.cpp @@ -32,8 +32,9 @@ namespace Ingen { namespace GUI { ThreadedLoader::ThreadedLoader(App& app, SharedPtr engine) - : Raul::Slave("Loader") + : Raul::Thread("Loader") , _app(app) + , _sem(0) , _engine(engine) { if (parser()) @@ -42,6 +43,12 @@ ThreadedLoader::ThreadedLoader(App& app, SharedPtr engine) warn << "Failed to load ingen_serialisation module, load disabled." << endl; } +ThreadedLoader::~ThreadedLoader() +{ + _exit_flag = true; + _sem.post(); +} + SharedPtr ThreadedLoader::parser() { @@ -54,16 +61,16 @@ ThreadedLoader::parser() } void -ThreadedLoader::_whipped() +ThreadedLoader::_run() { - _mutex.lock(); - - while ( ! _events.empty() ) { - _events.front()(); - _events.pop_front(); + while (_sem.wait() && !_exit_flag) { + _mutex.lock(); + while (!_events.empty()) { + _events.front()(); + _events.pop_front(); + } + _mutex.unlock(); } - - _mutex.unlock(); } void @@ -96,9 +103,8 @@ ThreadedLoader::load_patch(bool merge, engine_symbol, engine_data))); - whip(); - _mutex.unlock(); + _sem.post(); } void @@ -112,8 +118,7 @@ ThreadedLoader::save_patch(SharedPtr model, model, filename))); _mutex.unlock(); - - whip(); + _sem.post(); } void diff --git a/src/gui/ThreadedLoader.hpp b/src/gui/ThreadedLoader.hpp index e7815eeb..273c72f9 100644 --- a/src/gui/ThreadedLoader.hpp +++ b/src/gui/ThreadedLoader.hpp @@ -27,7 +27,7 @@ #include "ingen/Interface.hpp" #include "ingen/serialisation/Parser.hpp" #include "ingen/serialisation/Serialiser.hpp" -#include "raul/Slave.hpp" +#include "raul/Semaphore.hpp" #include "raul/Thread.hpp" namespace Ingen { @@ -44,12 +44,14 @@ namespace GUI { * * \ingroup GUI */ -class ThreadedLoader : public Raul::Slave +class ThreadedLoader : public Raul::Thread { public: ThreadedLoader(App& app, SharedPtr engine); + ~ThreadedLoader(); + void load_patch(bool merge, const Glib::ustring& document_uri, boost::optional engine_parent, @@ -68,9 +70,10 @@ private: /** Returns nothing and takes no parameters (because they have all been bound) */ typedef sigc::slot Closure; - void _whipped(); + void _run(); App& _app; + Raul::Semaphore _sem; SharedPtr _engine; Glib::Mutex _mutex; std::list _events; diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp index eeb28e5a..c7fef892 100644 --- a/src/server/Engine.cpp +++ b/src/server/Engine.cpp @@ -258,7 +258,7 @@ Engine::activate() void Engine::deactivate() { - _pre_processor->stop(); + _pre_processor->join(); if (_driver) { _driver->deactivate(); diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp index 9db13fd1..13746113 100644 --- a/src/server/PreProcessor.cpp +++ b/src/server/PreProcessor.cpp @@ -26,14 +26,14 @@ namespace Ingen { namespace Server { PreProcessor::PreProcessor() - : Raul::Slave("PreProcessor") + : Raul::Thread("PreProcessor") + , _sem(0) { start(); } PreProcessor::~PreProcessor() { - stop(); } void @@ -61,7 +61,7 @@ PreProcessor::event(Event* const ev) _prepared_back = ev; } - whip(); + _sem.post(); } unsigned @@ -110,18 +110,21 @@ PreProcessor::process(ProcessContext& context, PostProcessor& dest, bool limit) /** Pre-process a single event */ void -PreProcessor::_whipped() +PreProcessor::_run() { ThreadManager::set_flag(THREAD_PRE_PROCESS); - Event* ev = _prepared_back.get(); - if (!ev) - return; + while (_sem.wait() && !_exit_flag) { + Event* const ev = _prepared_back.get(); + if (!ev) { + return; + } - assert(!ev->is_prepared()); - ev->pre_process(); - assert(ev->is_prepared()); + assert(!ev->is_prepared()); + ev->pre_process(); + assert(ev->is_prepared()); - _prepared_back = (Event*)ev->next(); + _prepared_back = (Event*)ev->next(); + } } } // namespace Server diff --git a/src/server/PreProcessor.hpp b/src/server/PreProcessor.hpp index fd886139..9b7fbd44 100644 --- a/src/server/PreProcessor.hpp +++ b/src/server/PreProcessor.hpp @@ -20,7 +20,8 @@ #include #include "raul/AtomicPtr.hpp" -#include "raul/Slave.hpp" +#include "raul/Semaphore.hpp" +#include "raul/Thread.hpp" namespace Ingen { namespace Server { @@ -29,13 +30,18 @@ class Event; class PostProcessor; class ProcessContext; -class PreProcessor : public Raul::Slave +class PreProcessor : public Raul::Thread { public: explicit PreProcessor(); ~PreProcessor(); + virtual void join() { + _exit_flag = true; + _sem.post(); + } + /** Return true iff no events are enqueued. */ inline bool empty() const { return !_head.get(); } @@ -52,10 +58,11 @@ public: bool limit = true); protected: - virtual void _whipped(); ///< Prepare 1 event + virtual void _run(); private: Glib::Mutex _mutex; + Raul::Semaphore _sem; Raul::AtomicPtr _head; Raul::AtomicPtr _prepared_back; Raul::AtomicPtr _tail; diff --git a/src/server/Worker.cpp b/src/server/Worker.cpp index d17482bf..2c1c51a0 100644 --- a/src/server/Worker.cpp +++ b/src/server/Worker.cpp @@ -119,8 +119,7 @@ Worker::~Worker() void Worker::_run() { - while (!_exit_flag) { - _sem.wait(); + while (_sem.wait() && !_exit_flag) { MessageHeader msg; if (_requests.read_space() > sizeof(msg)) { if (_requests.read(sizeof(msg), &msg) != sizeof(msg)) { diff --git a/src/socket/Socket.cpp b/src/socket/Socket.cpp index 13a684bc..998d1cdb 100644 --- a/src/socket/Socket.cpp +++ b/src/socket/Socket.cpp @@ -188,5 +188,14 @@ Socket::close() } } +void +Socket::shutdown() +{ + if (_sock != -1) { + ::shutdown(_sock, SHUT_RDWR); + _sock = -1; + } +} + } // namespace Ingen } // namespace Socket diff --git a/src/socket/Socket.hpp b/src/socket/Socket.hpp index 49d03d48..1406d87e 100644 --- a/src/socket/Socket.hpp +++ b/src/socket/Socket.hpp @@ -81,6 +81,12 @@ public: /** Close the socket. */ void close(); + /** Shut down the socket. + * This terminates any connections associated with the sockets, and will + * (unlike close()) cause a poll on the socket to return. + */ + void shutdown(); + private: bool set_addr(const Raul::URI& uri); diff --git a/src/socket/SocketListener.cpp b/src/socket/SocketListener.cpp index 936dd0e0..58797fa8 100644 --- a/src/socket/SocketListener.cpp +++ b/src/socket/SocketListener.cpp @@ -65,7 +65,9 @@ SocketListener::SocketListener(Ingen::World& world) SocketListener::~SocketListener() { - stop(); + _exit_flag = true; + _unix_sock.shutdown(); + _net_sock.shutdown(); join(); _unix_sock.close(); _net_sock.close(); @@ -92,10 +94,12 @@ SocketListener::_run() ++nfds; } - while (!_exit_flag) { + while (true) { // Wait for input to arrive at a socket int ret = poll(pfds, nfds, -1); - if (ret == -1) { + if (_exit_flag) { + break; + } else if (ret == -1) { LOG(Raul::error) << "Poll error: " << strerror(errno) << std::endl; break; } else if (ret == 0) { diff --git a/src/socket/SocketReader.cpp b/src/socket/SocketReader.cpp index b73b9eb0..2301ba69 100644 --- a/src/socket/SocketReader.cpp +++ b/src/socket/SocketReader.cpp @@ -46,7 +46,6 @@ SocketReader::SocketReader(Ingen::World& world, SocketReader::~SocketReader() { - stop(); join(); } -- cgit v1.2.1