From 7e013dc6986fa9d6dc8616d494d9de5d192c4c69 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Wed, 12 Jul 2006 06:34:30 +0000 Subject: Factored out Thread (and Slave, an explicitly signal-driven thread) git-svn-id: http://svn.drobilla.net/lad/ingen@87 a436a847-0d15-0410-975c-d299462d15a1 --- src/libs/engine/EventSource.h | 2 +- src/libs/engine/JackAudioDriver.cpp | 2 +- src/libs/engine/Maid.h | 2 +- src/libs/engine/Makefile.am | 3 + src/libs/engine/OSCReceiver.cpp | 147 +++++++++++++++++---------------- src/libs/engine/OSCReceiver.h | 9 ++- src/libs/engine/ObjectSender.cpp | 2 +- src/libs/engine/PostProcessor.cpp | 90 +++------------------ src/libs/engine/PostProcessor.h | 31 ++----- src/libs/engine/QueuedEventSource.cpp | 148 +++++++++------------------------- src/libs/engine/QueuedEventSource.h | 37 ++++----- src/libs/engine/Slave.h | 60 ++++++++++++++ src/libs/engine/Thread.cpp | 104 ++++++++++++++++++++++++ src/libs/engine/Thread.h | 63 +++++++++++++++ 14 files changed, 391 insertions(+), 309 deletions(-) create mode 100644 src/libs/engine/Slave.h create mode 100644 src/libs/engine/Thread.cpp create mode 100644 src/libs/engine/Thread.h (limited to 'src/libs/engine') diff --git a/src/libs/engine/EventSource.h b/src/libs/engine/EventSource.h index 99423dc3..273100b7 100644 --- a/src/libs/engine/EventSource.h +++ b/src/libs/engine/EventSource.h @@ -35,7 +35,7 @@ public: virtual ~EventSource() {} - virtual Event* pop_earliest_event_before(const samplecount time) = 0; + virtual Event* pop_earliest_before(const samplecount time) = 0; virtual void start() = 0; diff --git a/src/libs/engine/JackAudioDriver.cpp b/src/libs/engine/JackAudioDriver.cpp index 9ba5de82..92794796 100644 --- a/src/libs/engine/JackAudioDriver.cpp +++ b/src/libs/engine/JackAudioDriver.cpp @@ -281,7 +281,7 @@ JackAudioDriver::process_events(jack_nframes_t block_start, jack_nframes_t block // FIXME while ((ev = reinterpret_cast(om->osc_receiver()) - ->pop_earliest_event_before(block_end)) != NULL) { + ->pop_earliest_before(block_end)) != NULL) { ev->execute(0); // QueuedEvents are not sample accurate om->post_processor()->push(ev); if (++num_events_processed > MAX_SLOW_EVENTS) diff --git a/src/libs/engine/Maid.h b/src/libs/engine/Maid.h index 986bad20..a3f844ef 100644 --- a/src/libs/engine/Maid.h +++ b/src/libs/engine/Maid.h @@ -28,7 +28,7 @@ * * cleanup() is meant to be called periodically to free memory, often * enough to prevent the queue from overdflowing. This is done by the - * main thread, in OmApp. + * main thread (in OmApp.cpp) since it has nothing better to do. * * \ingroup engine */ diff --git a/src/libs/engine/Makefile.am b/src/libs/engine/Makefile.am index a7fd98c1..e814b3b8 100644 --- a/src/libs/engine/Makefile.am +++ b/src/libs/engine/Makefile.am @@ -73,6 +73,9 @@ libingen_la_SOURCES = \ Plugin.h \ Array.h \ List.h \ + Slave.h \ + Thread.h \ + Thread.cpp \ PostProcessor.h \ PostProcessor.cpp \ Connection.h \ diff --git a/src/libs/engine/OSCReceiver.cpp b/src/libs/engine/OSCReceiver.cpp index 206cd3da..30bda10b 100644 --- a/src/libs/engine/OSCReceiver.cpp +++ b/src/libs/engine/OSCReceiver.cpp @@ -52,76 +52,75 @@ using Shared::ClientKey; OSCReceiver::OSCReceiver(size_t queue_size, const char* const port) : QueuedEngineInterface(queue_size), _port(port), - _is_activated(false), - _st(NULL), + _server(NULL), _osc_responder(NULL) { - _st = lo_server_thread_new(port, error_cb); + _server = lo_server_new(port, error_cb); - if (_st == NULL) { + if (_server == NULL) { cerr << "[OSC] Could not start OSC server. Aborting." << endl; exit(EXIT_FAILURE); } else { - char* lo_url = lo_server_thread_get_url(_st); + char* lo_url = lo_server_get_url(_server); cout << "[OSC] Started OSC server at " << lo_url << endl; free(lo_url); } // For debugging, print all incoming OSC messages - lo_server_thread_add_method(_st, NULL, NULL, generic_cb, NULL); + lo_server_add_method(_server, NULL, NULL, generic_cb, NULL); // Set response address for this message. // It's important this is first and returns nonzero. - lo_server_thread_add_method(_st, NULL, NULL, set_response_address_cb, this); + lo_server_add_method(_server, NULL, NULL, set_response_address_cb, this); // Commands - lo_server_thread_add_method(_st, "/om/ping", "i", ping_cb, this); - lo_server_thread_add_method(_st, "/om/ping_slow", "i", ping_slow_cb, this); - lo_server_thread_add_method(_st, "/om/engine/quit", "i", quit_cb, this); - //lo_server_thread_add_method(_st, "/om/engine/register_client", "is", register_client_cb, this); - lo_server_thread_add_method(_st, "/om/engine/register_client", "i", register_client_cb, this); - lo_server_thread_add_method(_st, "/om/engine/unregister_client", "i", unregister_client_cb, this); - lo_server_thread_add_method(_st, "/om/engine/load_plugins", "i", load_plugins_cb, this); - lo_server_thread_add_method(_st, "/om/engine/activate", "i", engine_activate_cb, this); - lo_server_thread_add_method(_st, "/om/engine/deactivate", "i", engine_deactivate_cb, this); - lo_server_thread_add_method(_st, "/om/synth/create_patch", "isi", create_patch_cb, this); - lo_server_thread_add_method(_st, "/om/synth/enable_patch", "is", enable_patch_cb, this); - lo_server_thread_add_method(_st, "/om/synth/disable_patch", "is", disable_patch_cb, this); - lo_server_thread_add_method(_st, "/om/synth/clear_patch", "is", clear_patch_cb, this); - lo_server_thread_add_method(_st, "/om/synth/create_port", "issi", create_port_cb, this); - lo_server_thread_add_method(_st, "/om/synth/create_node", "issssi", create_node_cb, this); - lo_server_thread_add_method(_st, "/om/synth/create_node", "isssi", create_node_by_uri_cb, this); - lo_server_thread_add_method(_st, "/om/synth/destroy", "is", destroy_cb, this); - lo_server_thread_add_method(_st, "/om/synth/rename", "iss", rename_cb, this); - lo_server_thread_add_method(_st, "/om/synth/connect", "iss", connect_cb, this); - lo_server_thread_add_method(_st, "/om/synth/disconnect", "iss", disconnect_cb, this); - lo_server_thread_add_method(_st, "/om/synth/disconnect_all", "is", disconnect_all_cb, this); - lo_server_thread_add_method(_st, "/om/synth/set_port_value", "isf", set_port_value_cb, this); - lo_server_thread_add_method(_st, "/om/synth/set_port_value", "isif", set_port_value_voice_cb, this); - lo_server_thread_add_method(_st, "/om/synth/set_port_value_slow", "isf", set_port_value_slow_cb, this); - lo_server_thread_add_method(_st, "/om/synth/note_on", "isii", note_on_cb, this); - lo_server_thread_add_method(_st, "/om/synth/note_off", "isi", note_off_cb, this); - lo_server_thread_add_method(_st, "/om/synth/all_notes_off", "isi", all_notes_off_cb, this); - lo_server_thread_add_method(_st, "/om/synth/midi_learn", "is", midi_learn_cb, this); + lo_server_add_method(_server, "/om/ping", "i", ping_cb, this); + lo_server_add_method(_server, "/om/ping_slow", "i", ping_slow_cb, this); + lo_server_add_method(_server, "/om/engine/quit", "i", quit_cb, this); + //lo_server_add_method(_server, "/om/engine/register_client", "is", register_client_cb, this); + lo_server_add_method(_server, "/om/engine/register_client", "i", register_client_cb, this); + lo_server_add_method(_server, "/om/engine/unregister_client", "i", unregister_client_cb, this); + lo_server_add_method(_server, "/om/engine/load_plugins", "i", load_plugins_cb, this); + lo_server_add_method(_server, "/om/engine/activate", "i", engine_activate_cb, this); + lo_server_add_method(_server, "/om/engine/deactivate", "i", engine_deactivate_cb, this); + lo_server_add_method(_server, "/om/synth/create_patch", "isi", create_patch_cb, this); + lo_server_add_method(_server, "/om/synth/enable_patch", "is", enable_patch_cb, this); + lo_server_add_method(_server, "/om/synth/disable_patch", "is", disable_patch_cb, this); + lo_server_add_method(_server, "/om/synth/clear_patch", "is", clear_patch_cb, this); + lo_server_add_method(_server, "/om/synth/create_port", "issi", create_port_cb, this); + lo_server_add_method(_server, "/om/synth/create_node", "issssi", create_node_cb, this); + lo_server_add_method(_server, "/om/synth/create_node", "isssi", create_node_by_uri_cb, this); + lo_server_add_method(_server, "/om/synth/destroy", "is", destroy_cb, this); + lo_server_add_method(_server, "/om/synth/rename", "iss", rename_cb, this); + lo_server_add_method(_server, "/om/synth/connect", "iss", connect_cb, this); + lo_server_add_method(_server, "/om/synth/disconnect", "iss", disconnect_cb, this); + lo_server_add_method(_server, "/om/synth/disconnect_all", "is", disconnect_all_cb, this); + lo_server_add_method(_server, "/om/synth/set_port_value", "isf", set_port_value_cb, this); + lo_server_add_method(_server, "/om/synth/set_port_value", "isif", set_port_value_voice_cb, this); + lo_server_add_method(_server, "/om/synth/set_port_value_slow", "isf", set_port_value_slow_cb, this); + lo_server_add_method(_server, "/om/synth/note_on", "isii", note_on_cb, this); + lo_server_add_method(_server, "/om/synth/note_off", "isi", note_off_cb, this); + lo_server_add_method(_server, "/om/synth/all_notes_off", "isi", all_notes_off_cb, this); + lo_server_add_method(_server, "/om/synth/midi_learn", "is", midi_learn_cb, this); #ifdef HAVE_LASH - lo_server_thread_add_method(_st, "/om/lash/restore_finished", "i", lash_restore_done_cb, this); + lo_server_add_method(_server, "/om/lash/restore_finished", "i", lash_restore_done_cb, this); #endif - lo_server_thread_add_method(_st, "/om/metadata/request", "isss", metadata_get_cb, this); - lo_server_thread_add_method(_st, "/om/metadata/set", "isss", metadata_set_cb, this); + lo_server_add_method(_server, "/om/metadata/request", "isss", metadata_get_cb, this); + lo_server_add_method(_server, "/om/metadata/set", "isss", metadata_set_cb, this); // Queries - lo_server_thread_add_method(_st, "/om/request/plugins", "i", request_plugins_cb, this); - lo_server_thread_add_method(_st, "/om/request/all_objects", "i", request_all_objects_cb, this); - lo_server_thread_add_method(_st, "/om/request/port_value", "is", request_port_value_cb, this); + lo_server_add_method(_server, "/om/request/plugins", "i", request_plugins_cb, this); + lo_server_add_method(_server, "/om/request/all_objects", "i", request_all_objects_cb, this); + lo_server_add_method(_server, "/om/request/port_value", "is", request_port_value_cb, this); // DSSI support #ifdef HAVE_DSSI // XXX WARNING: notice this is a catch-all - lo_server_thread_add_method(_st, NULL, NULL, dssi_cb, this); + lo_server_add_method(_server, NULL, NULL, dssi_cb, this); #endif - lo_server_thread_add_method(_st, NULL, NULL, unknown_cb, NULL); + lo_server_add_method(_server, NULL, NULL, unknown_cb, NULL); } @@ -129,9 +128,9 @@ OSCReceiver::~OSCReceiver() { deactivate(); - if (_st != NULL) { - lo_server_thread_free(_st); - _st = NULL; + if (_server != NULL) { + lo_server_free(_server); + _server = NULL; } } @@ -139,41 +138,49 @@ OSCReceiver::~OSCReceiver() void OSCReceiver::start() { + set_name("OSCReceiver"); QueuedEventSource::start(); - - if (!_is_activated) { - lo_server_thread_start(_st); - _is_activated = true; - } - - /* Waiting on the next liblo release - pthread_t lo_thread = lo_server_thread_get_thread(_st); - - sched_param sp; - sp.sched_priority = 20; - int result = pthread_setschedparam(lo_thread, SCHED_FIFO, &sp); - if (!result) - cout << "[OSC] Set OSC thread to realtime scheduling (SCHED_FIFO, priority " - << sp.sched_priority << ")" << endl; - else - cout << "[OSC] Unable to set OSC thread to realtime scheduling (" - << strerror(result) << endl; - */ + set_scheduling(SCHED_FIFO, 10); } void OSCReceiver::stop() { - if (_is_activated) { - lo_server_thread_stop(_st); - cout << "[OSCReceiver] Stopped OSC server thread" << endl; - _is_activated = false; - } + cout << "[OSCReceiver] Stopped OSC listening thread" << endl; QueuedEventSource::stop(); } +/** Override the semaphore driven _run method of QueuedEngineInterface + * to wait on OSC messages and prepare them right away in the same thread. + */ +void +OSCReceiver::_run() +{ + /* FIXME: Make Event() take a timestamp as a parameter, get a timestamp + * here and stamp all the events with the same time so they all get + * executed in the same cycle */ + + while (true) { + assert( ! unprepared_events()); + + // Wait on a message and enqueue it + lo_server_recv(_server); + + // Enqueue every other message that is here "now" + // (would this provide truly atomic bundles?) + while (lo_server_recv_noblock(_server, 0) > 0) ; + + // Process them all + while (unprepared_events()) + _signalled(); + + // No more unprepared events + } +} + + /** Create a new responder for this message, if necessary. * * This is based on the fact that the current responder is stored in a ref diff --git a/src/libs/engine/OSCReceiver.h b/src/libs/engine/OSCReceiver.h index 15a30fc1..c966632f 100644 --- a/src/libs/engine/OSCReceiver.h +++ b/src/libs/engine/OSCReceiver.h @@ -43,6 +43,8 @@ inline static int name##_cb(LO_HANDLER_ARGS, void* osc_receiver)\ { return ((OSCReceiver*)osc_receiver)->m_##name##_cb(path, types, argv, argc, msg); } +/* FIXME: Make this receive and preprocess in the same thread? */ + /** Receives OSC messages from liblo. * @@ -66,7 +68,9 @@ private: // Prevent copies (undefined) OSCReceiver(const OSCReceiver&); OSCReceiver& operator=(const OSCReceiver&); - + + virtual void _run(); + static void error_cb(int num, const char* msg, const char* path); static int set_response_address_cb(LO_HANDLER_ARGS, void* osc_receiver); static int generic_cb(LO_HANDLER_ARGS, void* osc_receiver); @@ -112,8 +116,7 @@ private: #endif const char* const _port; - bool _is_activated; - lo_server_thread _st; + lo_server _server; /** Cached OSC responder (for most recent incoming message) */ CountedPtr _osc_responder; diff --git a/src/libs/engine/ObjectSender.cpp b/src/libs/engine/ObjectSender.cpp index 8a5fc367..347c9ce3 100644 --- a/src/libs/engine/ObjectSender.cpp +++ b/src/libs/engine/ObjectSender.cpp @@ -162,7 +162,7 @@ ObjectSender::send_port(ClientInterface* client, const Port* port) if (port->type() == DataType::FLOAT && port->buffer_size() == 1) { sample default_value = dynamic_cast*>( port)->buffer(0)->value_at(0); - cerr << port->path() << " sending default value " << default_value << endl; + //cerr << port->path() << " sending default value " << default_value << endl; client->control_change(port->path(), default_value); } diff --git a/src/libs/engine/PostProcessor.cpp b/src/libs/engine/PostProcessor.cpp index e339f757..8b68159e 100644 --- a/src/libs/engine/PostProcessor.cpp +++ b/src/libs/engine/PostProcessor.cpp @@ -29,94 +29,28 @@ using std::cerr; using std::cout; using std::endl; namespace Om { -bool PostProcessor::m_process_thread_exit_flag = false; - PostProcessor::PostProcessor(size_t queue_size) -: m_events(queue_size), - m_thread_exists(false), - m_semaphore(0) -{ -} - - -PostProcessor::~PostProcessor() -{ - stop(); -} - - -/** Start the process thread. - */ -void -PostProcessor::start() -{ - cout << "[PostProcessor] Starting." << endl; - - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setstacksize(&attr, 1500000); - - pthread_create(&m_process_thread, &attr, process_events, this); - m_thread_exists = true; -} - - -/** Stop the process thread. - */ -void -PostProcessor::stop() +: _events(queue_size) { - if (m_thread_exists) { - m_process_thread_exit_flag = true; - pthread_cancel(m_process_thread); - pthread_join(m_process_thread, NULL); - m_thread_exists = false; - } + set_name("PostProcessor"); } -/** Signal the PostProcessor to process all pending events. +/** Post processing thread. + * + * Infinite loop that waits on the semaphore and processes every enqueued + * event (to be signalled at the end of every process cycle). */ void -PostProcessor::signal() +PostProcessor::_signalled() { - m_semaphore.post(); -} - - -void* -PostProcessor::process_events(void* osc_processer) -{ - PostProcessor* me = (PostProcessor*)osc_processer; - return me->m_process_events(); -} - - -/** OSC message processing thread. - */ -void* -PostProcessor::m_process_events() -{ - Event* ev = NULL; - - while (true) { - m_semaphore.wait(); - - if (m_process_thread_exit_flag) - break; - - while (!m_events.is_empty()) { - ev = m_events.pop(); - assert(ev != NULL); - ev->post_process(); - om->maid()->push(ev); - } + while ( ! _events.is_empty()) { + Event* const ev = _events.pop(); + assert(ev); + ev->post_process(); + om->maid()->push(ev); } - - cout << "[PostProcessor] Exiting post processor thread." << endl; - - return NULL; } } // namespace Om diff --git a/src/libs/engine/PostProcessor.h b/src/libs/engine/PostProcessor.h index 2083c442..e0c384eb 100644 --- a/src/libs/engine/PostProcessor.h +++ b/src/libs/engine/PostProcessor.h @@ -21,6 +21,7 @@ #include "types.h" #include "util/Queue.h" #include "util/Semaphore.h" +#include "Slave.h" namespace Om { @@ -35,44 +36,24 @@ class Event; * * \ingroup engine */ -class PostProcessor +class PostProcessor : public Slave { public: PostProcessor(size_t queue_size); - ~PostProcessor(); - void start(); - void stop(); - - inline void push(Event* const ev); - void signal(); + /** Push an event on to the process queue, realtime-safe, not thread-safe. */ + inline void push(Event* const ev) { _events.push(ev); } private: // Prevent copies PostProcessor(const PostProcessor&); PostProcessor& operator=(const PostProcessor&); - Queue m_events; - - static void* process_events(void* me); - void* m_process_events(); - - pthread_t m_process_thread; - bool m_thread_exists; - static bool m_process_thread_exit_flag; - Semaphore m_semaphore; + Queue _events; + virtual void _signalled(); }; -/** Push an event on to the process queue, realtime-safe, not thread-safe. - */ -inline void -PostProcessor::push(Event* const ev) -{ - m_events.push(ev); -} - - } // namespace Om #endif // POSTPROCESSOR_H diff --git a/src/libs/engine/QueuedEventSource.cpp b/src/libs/engine/QueuedEventSource.cpp index d41b2201..8254f3c8 100644 --- a/src/libs/engine/QueuedEventSource.cpp +++ b/src/libs/engine/QueuedEventSource.cpp @@ -25,20 +25,15 @@ namespace Om { QueuedEventSource::QueuedEventSource(size_t size) -: m_front(0), - m_back(0), - m_prepared_back(0), - m_size(size+1), - m_thread_exists(false), - m_prepare_thread_exit_flag(false), - m_semaphore(0) +: _front(0), + _back(0), + _prepared_back(0), + _size(size+1), + _blocking_semaphore(0) { - m_events = (QueuedEvent**)calloc(m_size, sizeof(QueuedEvent*)); + _events = (QueuedEvent**)calloc(_size, sizeof(QueuedEvent*)); - pthread_mutex_init(&m_blocking_mutex, NULL); - pthread_cond_init(&m_blocking_cond, NULL); - - mlock(m_events, m_size * sizeof(QueuedEvent*)); + mlock(_events, _size * sizeof(QueuedEvent*)); } @@ -46,49 +41,7 @@ QueuedEventSource::~QueuedEventSource() { stop(); - free(m_events); - pthread_mutex_destroy(&m_blocking_mutex); - pthread_cond_destroy(&m_blocking_cond); -} - - -/** Start the prepare thread. - */ -void -QueuedEventSource::start() -{ - if (m_thread_exists) { - cerr << "[QueuedEventSource] Thread already launched?" << endl; - return; - } else { - cout << "[QueuedEventSource] Launching thread." << endl; - } - - m_prepare_thread_exit_flag = false; - - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setstacksize(&attr, 1500000); - - pthread_create(&m_prepare_thread, &attr, &QueuedEventSource::prepare_loop, this); - pthread_attr_destroy(&attr); - - m_thread_exists = true; -} - - -/** Destroy the prepare thread. - */ -void -QueuedEventSource::stop() -{ - if (m_thread_exists) { - m_prepare_thread_exit_flag = true; - pthread_cancel(m_prepare_thread); - pthread_join(m_prepare_thread, NULL); - m_thread_exists = false; - cout << "[QueuedEventSource] Stopped thread." << endl; - } + free(_events); } @@ -99,13 +52,13 @@ QueuedEventSource::push(QueuedEvent* const ev) { assert(!ev->is_prepared()); - if (m_events[m_back] != NULL) { + if (_events[_back] != NULL) { cerr << "[QueuedEventSource] Error: Queue is full! Event is lost, please report!" << endl; delete ev; } else { - m_events[m_back] = ev; - m_back = (m_back + 1) % m_size; - m_semaphore.post(); + _events[_back] = ev; + _back = (_back + 1) % _size; + signal(); } } @@ -115,18 +68,18 @@ QueuedEventSource::push(QueuedEvent* const ev) * This method will only pop events that have been prepared, and are * stamped before the time passed. In other words, it may return NULL * even if there are events pending in the queue. The events returned are - * actually QueuedEvent*s, but after this they are "normal" events and the + * actually QueuedEvents, but after this they are "normal" events and the * engine deals with them just like a realtime in-band event. */ Event* -QueuedEventSource::pop_earliest_event_before(const samplecount time) +QueuedEventSource::pop_earliest_before(const samplecount time) { - QueuedEvent* front_event = m_events[m_front]; + QueuedEvent* const front_event = _events[_front]; // Pop - if (front_event != NULL && front_event->time_stamp() < time && front_event->is_prepared()) { - m_events[m_front] = NULL; - m_front = (m_front + 1) % m_size; + if (front_event && front_event->is_prepared() && front_event->time_stamp() < time) { + _events[_front] = NULL; + _front = (_front + 1) % _size; return front_event; } else { return NULL; @@ -137,63 +90,40 @@ QueuedEventSource::pop_earliest_event_before(const samplecount time) // Private // - /** Signal that the blocking event is finished. * - * When this is called preparing will resume. This will be called by + * When this is called preparing will resume. This MUST be called by * blocking events in their post_process() method. */ void QueuedEventSource::unblock() { - /* FIXME: Make this a semaphore, and have events signal at the end of their - * execute() methods so the preprocessor can start preparing events immediately - * instead of waiting for the postprocessor to get around to finalizing the event? */ - pthread_mutex_lock(&m_blocking_mutex); - pthread_cond_signal(&m_blocking_cond); - pthread_mutex_unlock(&m_blocking_mutex); + _blocking_semaphore.post(); } -void* -QueuedEventSource::m_prepare_loop() +void +QueuedEventSource::_signalled() { - QueuedEvent* ev = NULL; - - while (true) { - m_semaphore.wait(); - - if (m_prepare_thread_exit_flag) - break; // exit signalled - - ev = m_events[m_prepared_back]; - assert(ev != NULL); - - if (ev == NULL) { - cerr << "[QueuedEventSource] ERROR: Signalled, but event is NULL." << endl; - continue; - } - - assert(ev != NULL); - assert(!ev->is_prepared()); - - if (ev->is_blocking()) - pthread_mutex_lock(&m_blocking_mutex); - - ev->pre_process(); - - m_prepared_back = (m_prepared_back+1) % m_size; - - // If a blocking event, wait for event to finish passing through - // the audio cycle before preparing the next event - if (ev->is_blocking()) { - pthread_cond_wait(&m_blocking_cond, &m_blocking_mutex); - pthread_mutex_unlock(&m_blocking_mutex); - } + QueuedEvent* const ev = _events[_prepared_back]; + assert(ev != NULL); + + if (ev == NULL) { + cerr << "[QueuedEventSource] ERROR: Signalled, but event is NULL." << endl; + return; } - cout << "[QueuedEventSource] Exiting slow event queue thread." << endl; - return NULL; + assert(ev != NULL); + assert(!ev->is_prepared()); + + ev->pre_process(); + + _prepared_back = (_prepared_back+1) % _size; + + // If event was blocking, wait for event to being run through the + // process thread before preparing the next event + if (ev->is_blocking()) + _blocking_semaphore.wait(); } diff --git a/src/libs/engine/QueuedEventSource.h b/src/libs/engine/QueuedEventSource.h index 15fb45f1..c3e6904a 100644 --- a/src/libs/engine/QueuedEventSource.h +++ b/src/libs/engine/QueuedEventSource.h @@ -21,6 +21,7 @@ #include #include "types.h" #include "util/Semaphore.h" +#include "Slave.h" #include "EventSource.h" namespace Om { @@ -35,22 +36,27 @@ class QueuedEvent; * popping are threadsafe, as long as a single thread pushes and a single * thread pops (ie this data structure is threadsafe, but the push and pop * methods themselves are not). + * + * This class is it's own slave. :) */ -class QueuedEventSource : public EventSource +class QueuedEventSource : public EventSource, protected Slave { public: QueuedEventSource(size_t size); ~QueuedEventSource(); - Event* pop_earliest_event_before(const samplecount time); + void start() { Thread::start(); } + void stop() { Thread::stop(); } - void unblock(); + Event* pop_earliest_before(const samplecount time); - void start(); - void stop(); + void unblock(); protected: void push(QueuedEvent* const ev); + bool unprepared_events() { return (_prepared_back != _back); } + + virtual void _signalled(); ///< Prepare 1 event private: // Prevent copies (undefined) @@ -60,21 +66,12 @@ private: // Note that it's crucially important which functions access which of these // variables, to maintain threadsafeness. - size_t m_front; ///< Front of queue - size_t m_back; ///< Back of entire queue (1 past index of back element) - size_t m_prepared_back; ///< Back of prepared section (1 past index of back prepared element) - const size_t m_size; - QueuedEvent** m_events; - - bool m_thread_exists; - bool m_prepare_thread_exit_flag; - pthread_t m_prepare_thread; - Semaphore m_semaphore; ///< Counting semaphor for driving prepare thread - pthread_mutex_t m_blocking_mutex; - pthread_cond_t m_blocking_cond; - - static void* prepare_loop(void* q) { return ((QueuedEventSource*)q)->m_prepare_loop(); } - void* m_prepare_loop(); + size_t _front; ///< Front of queue + size_t _back; ///< Back of entire queue (1 past index of back element) + size_t _prepared_back; ///< Back of prepared section (1 past index of back prepared element) + const size_t _size; + QueuedEvent** _events; + Semaphore _blocking_semaphore; }; diff --git a/src/libs/engine/Slave.h b/src/libs/engine/Slave.h new file mode 100644 index 00000000..e6cc8ed5 --- /dev/null +++ b/src/libs/engine/Slave.h @@ -0,0 +1,60 @@ +/* This file is part of Ingen. Copyright (C) 2006 Dave Robillard. + * + * Om is free software; you can redistribute it and/or modify it under the + * terms of the GNU General Public License as published by the Free Software + * Foundation; either version 2 of the License, or (at your option) any later + * version. + * + * Om is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef SLAVE_H +#define SLAVE_H + +#include +#include "util/Semaphore.h" +#include "Thread.h" + +namespace Om { + + +/** Thread driven by (realtime safe) signals. + * + * \ingroup engine + */ +class Slave : public Thread +{ +public: + Slave() : _semaphore(0) {} + + inline void signal() { _semaphore.post(); } + +protected: + virtual void _signalled() = 0; + + Semaphore _semaphore; + +private: + // Prevent copies + Slave(const Slave&); + Slave& operator=(const Slave&); + + void _run() + { + while (true) { + _semaphore.wait(); + _signalled(); + } + } +}; + + +} // namespace Om + +#endif // SLAVE_H diff --git a/src/libs/engine/Thread.cpp b/src/libs/engine/Thread.cpp new file mode 100644 index 00000000..6ddfa621 --- /dev/null +++ b/src/libs/engine/Thread.cpp @@ -0,0 +1,104 @@ +/* This file is part of Ingen. Copyright (C) 2006 Dave Robillard. + * + * Om is free software; you can redistribute it and/or modify it under the + * terms of the GNU General Public License as published by the Free Software + * Foundation; either version 2 of the License, or (at your option) any later + * version. + * + * Om is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "Thread.h" +#include +#include +#include + +using std::cerr; using std::cout; using std::endl; + +namespace Om { + + +Thread::Thread() +: _pthread_exists(false) +{ +} + + +Thread::~Thread() +{ + stop(); +} + + +/** Start the process thread. + */ +void +Thread::start() +{ + cout << "[" << _name << " Thread] Starting." << endl; + + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setstacksize(&attr, 1500000); + + pthread_create(&_pthread, &attr, _static_run, this); + _pthread_exists = true; +} + + +/** Stop the process thread. + */ +void +Thread::stop() +{ + if (_pthread_exists) { + pthread_cancel(_pthread); + pthread_join(_pthread, NULL); + _pthread_exists = false; + } +} + + +/** Set the scheduling policy for this thread. + * + * @param must be one of SCHED_FIFO, SCHED_RR, or SCHED_OTHER. + */ +void +Thread::set_scheduling(int policy, unsigned int priority) +{ + sched_param sp; + sp.sched_priority = priority; + int result = pthread_setschedparam(_pthread, SCHED_FIFO, &sp); + if (!result) { + cout << "[" << _name << " Thread] Set scheduling policy to "; + switch (policy) { + case SCHED_FIFO: cout << "SCHED_FIFO"; break; + case SCHED_RR: cout << "SCHED_RR"; break; + case SCHED_OTHER: cout << "SCHED_OTHER"; break; + default: cout << "UNKNOWN"; break; + } + cout << ", priority " << sp.sched_priority << endl; + } else { + cout << "[" << _name << " Thread] Unable to set scheduling policy (" + << strerror(result) << ")" << endl; + } +} + + +void* +Thread::_static_run(void* me) +{ + Thread* myself = (Thread*)me; + myself->_run(); + // and I + return NULL; +} + +} // namespace Om + diff --git a/src/libs/engine/Thread.h b/src/libs/engine/Thread.h new file mode 100644 index 00000000..ee59ee5c --- /dev/null +++ b/src/libs/engine/Thread.h @@ -0,0 +1,63 @@ +/* This file is part of Ingen. Copyright (C) 2006 Dave Robillard. + * + * Om is free software; you can redistribute it and/or modify it under the + * terms of the GNU General Public License as published by the Free Software + * Foundation; either version 2 of the License, or (at your option) any later + * version. + * + * Om is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef THREAD_H +#define THREAD_H + +#include +#include + +namespace Om { + + +/* FIXME: This isn't Ingen specific at all. Move it to util. */ + + +/** Abstract base class for all threads. + * + * \ingroup engine + */ +class Thread +{ +public: + Thread(); + virtual ~Thread(); + + virtual void start(); + virtual void stop(); + + void set_name(const std::string& name) { _name = name; } + void set_scheduling(int policy, unsigned int priority); + +protected: + virtual void _run() = 0; + + std::string _name; + pthread_t _pthread; + bool _pthread_exists; + +private: + // Prevent copies + Thread(const Thread&); + Thread& operator=(const Thread&); + + static void* _static_run(void* me); +}; + + +} // namespace Om + +#endif // THREAD_H -- cgit v1.2.1