summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Robillard <d@drobilla.net>2006-07-12 06:34:30 +0000
committerDavid Robillard <d@drobilla.net>2006-07-12 06:34:30 +0000
commit7e013dc6986fa9d6dc8616d494d9de5d192c4c69 (patch)
tree89bc9d97375fafae33cf22f1020c788baa8d326c
parent120757b8cb154266aae21472a49f0c00309a7dde (diff)
downloadingen-7e013dc6986fa9d6dc8616d494d9de5d192c4c69.tar.gz
ingen-7e013dc6986fa9d6dc8616d494d9de5d192c4c69.tar.bz2
ingen-7e013dc6986fa9d6dc8616d494d9de5d192c4c69.zip
Factored out Thread (and Slave, an explicitly signal-driven thread)
git-svn-id: http://svn.drobilla.net/lad/ingen@87 a436a847-0d15-0410-975c-d299462d15a1
-rw-r--r--src/common/util/Semaphore.h2
-rw-r--r--src/libs/engine/EventSource.h2
-rw-r--r--src/libs/engine/JackAudioDriver.cpp2
-rw-r--r--src/libs/engine/Maid.h2
-rw-r--r--src/libs/engine/Makefile.am3
-rw-r--r--src/libs/engine/OSCReceiver.cpp147
-rw-r--r--src/libs/engine/OSCReceiver.h9
-rw-r--r--src/libs/engine/ObjectSender.cpp2
-rw-r--r--src/libs/engine/PostProcessor.cpp90
-rw-r--r--src/libs/engine/PostProcessor.h31
-rw-r--r--src/libs/engine/QueuedEventSource.cpp148
-rw-r--r--src/libs/engine/QueuedEventSource.h37
-rw-r--r--src/libs/engine/Slave.h60
-rw-r--r--src/libs/engine/Thread.cpp104
-rw-r--r--src/libs/engine/Thread.h63
15 files changed, 392 insertions, 310 deletions
diff --git a/src/common/util/Semaphore.h b/src/common/util/Semaphore.h
index 045984a6..fdf63802 100644
--- a/src/common/util/Semaphore.h
+++ b/src/common/util/Semaphore.h
@@ -27,7 +27,7 @@
* work in GDB. Turns out sem_wait can fail when run in GDB, and Debian
* really needs to update it's man pages.
*
- * This class remains as a pretty wrapper/abstraction that does nothing.
+ * This class remains as a trivial (yet pretty) wrapper/abstraction.
*/
class Semaphore {
public:
diff --git a/src/libs/engine/EventSource.h b/src/libs/engine/EventSource.h
index 99423dc3..273100b7 100644
--- a/src/libs/engine/EventSource.h
+++ b/src/libs/engine/EventSource.h
@@ -35,7 +35,7 @@ public:
virtual ~EventSource() {}
- virtual Event* pop_earliest_event_before(const samplecount time) = 0;
+ virtual Event* pop_earliest_before(const samplecount time) = 0;
virtual void start() = 0;
diff --git a/src/libs/engine/JackAudioDriver.cpp b/src/libs/engine/JackAudioDriver.cpp
index 9ba5de82..92794796 100644
--- a/src/libs/engine/JackAudioDriver.cpp
+++ b/src/libs/engine/JackAudioDriver.cpp
@@ -281,7 +281,7 @@ JackAudioDriver::process_events(jack_nframes_t block_start, jack_nframes_t block
// FIXME
while ((ev = reinterpret_cast<EventSource*>(om->osc_receiver())
- ->pop_earliest_event_before(block_end)) != NULL) {
+ ->pop_earliest_before(block_end)) != NULL) {
ev->execute(0); // QueuedEvents are not sample accurate
om->post_processor()->push(ev);
if (++num_events_processed > MAX_SLOW_EVENTS)
diff --git a/src/libs/engine/Maid.h b/src/libs/engine/Maid.h
index 986bad20..a3f844ef 100644
--- a/src/libs/engine/Maid.h
+++ b/src/libs/engine/Maid.h
@@ -28,7 +28,7 @@
*
* cleanup() is meant to be called periodically to free memory, often
* enough to prevent the queue from overdflowing. This is done by the
- * main thread, in OmApp.
+ * main thread (in OmApp.cpp) since it has nothing better to do.
*
* \ingroup engine
*/
diff --git a/src/libs/engine/Makefile.am b/src/libs/engine/Makefile.am
index a7fd98c1..e814b3b8 100644
--- a/src/libs/engine/Makefile.am
+++ b/src/libs/engine/Makefile.am
@@ -73,6 +73,9 @@ libingen_la_SOURCES = \
Plugin.h \
Array.h \
List.h \
+ Slave.h \
+ Thread.h \
+ Thread.cpp \
PostProcessor.h \
PostProcessor.cpp \
Connection.h \
diff --git a/src/libs/engine/OSCReceiver.cpp b/src/libs/engine/OSCReceiver.cpp
index 206cd3da..30bda10b 100644
--- a/src/libs/engine/OSCReceiver.cpp
+++ b/src/libs/engine/OSCReceiver.cpp
@@ -52,76 +52,75 @@ using Shared::ClientKey;
OSCReceiver::OSCReceiver(size_t queue_size, const char* const port)
: QueuedEngineInterface(queue_size),
_port(port),
- _is_activated(false),
- _st(NULL),
+ _server(NULL),
_osc_responder(NULL)
{
- _st = lo_server_thread_new(port, error_cb);
+ _server = lo_server_new(port, error_cb);
- if (_st == NULL) {
+ if (_server == NULL) {
cerr << "[OSC] Could not start OSC server. Aborting." << endl;
exit(EXIT_FAILURE);
} else {
- char* lo_url = lo_server_thread_get_url(_st);
+ char* lo_url = lo_server_get_url(_server);
cout << "[OSC] Started OSC server at " << lo_url << endl;
free(lo_url);
}
// For debugging, print all incoming OSC messages
- lo_server_thread_add_method(_st, NULL, NULL, generic_cb, NULL);
+ lo_server_add_method(_server, NULL, NULL, generic_cb, NULL);
// Set response address for this message.
// It's important this is first and returns nonzero.
- lo_server_thread_add_method(_st, NULL, NULL, set_response_address_cb, this);
+ lo_server_add_method(_server, NULL, NULL, set_response_address_cb, this);
// Commands
- lo_server_thread_add_method(_st, "/om/ping", "i", ping_cb, this);
- lo_server_thread_add_method(_st, "/om/ping_slow", "i", ping_slow_cb, this);
- lo_server_thread_add_method(_st, "/om/engine/quit", "i", quit_cb, this);
- //lo_server_thread_add_method(_st, "/om/engine/register_client", "is", register_client_cb, this);
- lo_server_thread_add_method(_st, "/om/engine/register_client", "i", register_client_cb, this);
- lo_server_thread_add_method(_st, "/om/engine/unregister_client", "i", unregister_client_cb, this);
- lo_server_thread_add_method(_st, "/om/engine/load_plugins", "i", load_plugins_cb, this);
- lo_server_thread_add_method(_st, "/om/engine/activate", "i", engine_activate_cb, this);
- lo_server_thread_add_method(_st, "/om/engine/deactivate", "i", engine_deactivate_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/create_patch", "isi", create_patch_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/enable_patch", "is", enable_patch_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/disable_patch", "is", disable_patch_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/clear_patch", "is", clear_patch_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/create_port", "issi", create_port_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/create_node", "issssi", create_node_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/create_node", "isssi", create_node_by_uri_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/destroy", "is", destroy_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/rename", "iss", rename_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/connect", "iss", connect_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/disconnect", "iss", disconnect_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/disconnect_all", "is", disconnect_all_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/set_port_value", "isf", set_port_value_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/set_port_value", "isif", set_port_value_voice_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/set_port_value_slow", "isf", set_port_value_slow_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/note_on", "isii", note_on_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/note_off", "isi", note_off_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/all_notes_off", "isi", all_notes_off_cb, this);
- lo_server_thread_add_method(_st, "/om/synth/midi_learn", "is", midi_learn_cb, this);
+ lo_server_add_method(_server, "/om/ping", "i", ping_cb, this);
+ lo_server_add_method(_server, "/om/ping_slow", "i", ping_slow_cb, this);
+ lo_server_add_method(_server, "/om/engine/quit", "i", quit_cb, this);
+ //lo_server_add_method(_server, "/om/engine/register_client", "is", register_client_cb, this);
+ lo_server_add_method(_server, "/om/engine/register_client", "i", register_client_cb, this);
+ lo_server_add_method(_server, "/om/engine/unregister_client", "i", unregister_client_cb, this);
+ lo_server_add_method(_server, "/om/engine/load_plugins", "i", load_plugins_cb, this);
+ lo_server_add_method(_server, "/om/engine/activate", "i", engine_activate_cb, this);
+ lo_server_add_method(_server, "/om/engine/deactivate", "i", engine_deactivate_cb, this);
+ lo_server_add_method(_server, "/om/synth/create_patch", "isi", create_patch_cb, this);
+ lo_server_add_method(_server, "/om/synth/enable_patch", "is", enable_patch_cb, this);
+ lo_server_add_method(_server, "/om/synth/disable_patch", "is", disable_patch_cb, this);
+ lo_server_add_method(_server, "/om/synth/clear_patch", "is", clear_patch_cb, this);
+ lo_server_add_method(_server, "/om/synth/create_port", "issi", create_port_cb, this);
+ lo_server_add_method(_server, "/om/synth/create_node", "issssi", create_node_cb, this);
+ lo_server_add_method(_server, "/om/synth/create_node", "isssi", create_node_by_uri_cb, this);
+ lo_server_add_method(_server, "/om/synth/destroy", "is", destroy_cb, this);
+ lo_server_add_method(_server, "/om/synth/rename", "iss", rename_cb, this);
+ lo_server_add_method(_server, "/om/synth/connect", "iss", connect_cb, this);
+ lo_server_add_method(_server, "/om/synth/disconnect", "iss", disconnect_cb, this);
+ lo_server_add_method(_server, "/om/synth/disconnect_all", "is", disconnect_all_cb, this);
+ lo_server_add_method(_server, "/om/synth/set_port_value", "isf", set_port_value_cb, this);
+ lo_server_add_method(_server, "/om/synth/set_port_value", "isif", set_port_value_voice_cb, this);
+ lo_server_add_method(_server, "/om/synth/set_port_value_slow", "isf", set_port_value_slow_cb, this);
+ lo_server_add_method(_server, "/om/synth/note_on", "isii", note_on_cb, this);
+ lo_server_add_method(_server, "/om/synth/note_off", "isi", note_off_cb, this);
+ lo_server_add_method(_server, "/om/synth/all_notes_off", "isi", all_notes_off_cb, this);
+ lo_server_add_method(_server, "/om/synth/midi_learn", "is", midi_learn_cb, this);
#ifdef HAVE_LASH
- lo_server_thread_add_method(_st, "/om/lash/restore_finished", "i", lash_restore_done_cb, this);
+ lo_server_add_method(_server, "/om/lash/restore_finished", "i", lash_restore_done_cb, this);
#endif
- lo_server_thread_add_method(_st, "/om/metadata/request", "isss", metadata_get_cb, this);
- lo_server_thread_add_method(_st, "/om/metadata/set", "isss", metadata_set_cb, this);
+ lo_server_add_method(_server, "/om/metadata/request", "isss", metadata_get_cb, this);
+ lo_server_add_method(_server, "/om/metadata/set", "isss", metadata_set_cb, this);
// Queries
- lo_server_thread_add_method(_st, "/om/request/plugins", "i", request_plugins_cb, this);
- lo_server_thread_add_method(_st, "/om/request/all_objects", "i", request_all_objects_cb, this);
- lo_server_thread_add_method(_st, "/om/request/port_value", "is", request_port_value_cb, this);
+ lo_server_add_method(_server, "/om/request/plugins", "i", request_plugins_cb, this);
+ lo_server_add_method(_server, "/om/request/all_objects", "i", request_all_objects_cb, this);
+ lo_server_add_method(_server, "/om/request/port_value", "is", request_port_value_cb, this);
// DSSI support
#ifdef HAVE_DSSI
// XXX WARNING: notice this is a catch-all
- lo_server_thread_add_method(_st, NULL, NULL, dssi_cb, this);
+ lo_server_add_method(_server, NULL, NULL, dssi_cb, this);
#endif
- lo_server_thread_add_method(_st, NULL, NULL, unknown_cb, NULL);
+ lo_server_add_method(_server, NULL, NULL, unknown_cb, NULL);
}
@@ -129,9 +128,9 @@ OSCReceiver::~OSCReceiver()
{
deactivate();
- if (_st != NULL) {
- lo_server_thread_free(_st);
- _st = NULL;
+ if (_server != NULL) {
+ lo_server_free(_server);
+ _server = NULL;
}
}
@@ -139,41 +138,49 @@ OSCReceiver::~OSCReceiver()
void
OSCReceiver::start()
{
+ set_name("OSCReceiver");
QueuedEventSource::start();
-
- if (!_is_activated) {
- lo_server_thread_start(_st);
- _is_activated = true;
- }
-
- /* Waiting on the next liblo release
- pthread_t lo_thread = lo_server_thread_get_thread(_st);
-
- sched_param sp;
- sp.sched_priority = 20;
- int result = pthread_setschedparam(lo_thread, SCHED_FIFO, &sp);
- if (!result)
- cout << "[OSC] Set OSC thread to realtime scheduling (SCHED_FIFO, priority "
- << sp.sched_priority << ")" << endl;
- else
- cout << "[OSC] Unable to set OSC thread to realtime scheduling ("
- << strerror(result) << endl;
- */
+ set_scheduling(SCHED_FIFO, 10);
}
void
OSCReceiver::stop()
{
- if (_is_activated) {
- lo_server_thread_stop(_st);
- cout << "[OSCReceiver] Stopped OSC server thread" << endl;
- _is_activated = false;
- }
+ cout << "[OSCReceiver] Stopped OSC listening thread" << endl;
QueuedEventSource::stop();
}
+/** Override the semaphore driven _run method of QueuedEngineInterface
+ * to wait on OSC messages and prepare them right away in the same thread.
+ */
+void
+OSCReceiver::_run()
+{
+ /* FIXME: Make Event() take a timestamp as a parameter, get a timestamp
+ * here and stamp all the events with the same time so they all get
+ * executed in the same cycle */
+
+ while (true) {
+ assert( ! unprepared_events());
+
+ // Wait on a message and enqueue it
+ lo_server_recv(_server);
+
+ // Enqueue every other message that is here "now"
+ // (would this provide truly atomic bundles?)
+ while (lo_server_recv_noblock(_server, 0) > 0) ;
+
+ // Process them all
+ while (unprepared_events())
+ _signalled();
+
+ // No more unprepared events
+ }
+}
+
+
/** Create a new responder for this message, if necessary.
*
* This is based on the fact that the current responder is stored in a ref
diff --git a/src/libs/engine/OSCReceiver.h b/src/libs/engine/OSCReceiver.h
index 15a30fc1..c966632f 100644
--- a/src/libs/engine/OSCReceiver.h
+++ b/src/libs/engine/OSCReceiver.h
@@ -43,6 +43,8 @@ inline static int name##_cb(LO_HANDLER_ARGS, void* osc_receiver)\
{ return ((OSCReceiver*)osc_receiver)->m_##name##_cb(path, types, argv, argc, msg); }
+/* FIXME: Make this receive and preprocess in the same thread? */
+
/** Receives OSC messages from liblo.
*
@@ -66,7 +68,9 @@ private:
// Prevent copies (undefined)
OSCReceiver(const OSCReceiver&);
OSCReceiver& operator=(const OSCReceiver&);
-
+
+ virtual void _run();
+
static void error_cb(int num, const char* msg, const char* path);
static int set_response_address_cb(LO_HANDLER_ARGS, void* osc_receiver);
static int generic_cb(LO_HANDLER_ARGS, void* osc_receiver);
@@ -112,8 +116,7 @@ private:
#endif
const char* const _port;
- bool _is_activated;
- lo_server_thread _st;
+ lo_server _server;
/** Cached OSC responder (for most recent incoming message) */
CountedPtr<OSCResponder> _osc_responder;
diff --git a/src/libs/engine/ObjectSender.cpp b/src/libs/engine/ObjectSender.cpp
index 8a5fc367..347c9ce3 100644
--- a/src/libs/engine/ObjectSender.cpp
+++ b/src/libs/engine/ObjectSender.cpp
@@ -162,7 +162,7 @@ ObjectSender::send_port(ClientInterface* client, const Port* port)
if (port->type() == DataType::FLOAT && port->buffer_size() == 1) {
sample default_value = dynamic_cast<const TypedPort<sample>*>(
port)->buffer(0)->value_at(0);
- cerr << port->path() << " sending default value " << default_value << endl;
+ //cerr << port->path() << " sending default value " << default_value << endl;
client->control_change(port->path(), default_value);
}
diff --git a/src/libs/engine/PostProcessor.cpp b/src/libs/engine/PostProcessor.cpp
index e339f757..8b68159e 100644
--- a/src/libs/engine/PostProcessor.cpp
+++ b/src/libs/engine/PostProcessor.cpp
@@ -29,94 +29,28 @@ using std::cerr; using std::cout; using std::endl;
namespace Om {
-bool PostProcessor::m_process_thread_exit_flag = false;
-
PostProcessor::PostProcessor(size_t queue_size)
-: m_events(queue_size),
- m_thread_exists(false),
- m_semaphore(0)
-{
-}
-
-
-PostProcessor::~PostProcessor()
-{
- stop();
-}
-
-
-/** Start the process thread.
- */
-void
-PostProcessor::start()
-{
- cout << "[PostProcessor] Starting." << endl;
-
- pthread_attr_t attr;
- pthread_attr_init(&attr);
- pthread_attr_setstacksize(&attr, 1500000);
-
- pthread_create(&m_process_thread, &attr, process_events, this);
- m_thread_exists = true;
-}
-
-
-/** Stop the process thread.
- */
-void
-PostProcessor::stop()
+: _events(queue_size)
{
- if (m_thread_exists) {
- m_process_thread_exit_flag = true;
- pthread_cancel(m_process_thread);
- pthread_join(m_process_thread, NULL);
- m_thread_exists = false;
- }
+ set_name("PostProcessor");
}
-/** Signal the PostProcessor to process all pending events.
+/** Post processing thread.
+ *
+ * Infinite loop that waits on the semaphore and processes every enqueued
+ * event (to be signalled at the end of every process cycle).
*/
void
-PostProcessor::signal()
+PostProcessor::_signalled()
{
- m_semaphore.post();
-}
-
-
-void*
-PostProcessor::process_events(void* osc_processer)
-{
- PostProcessor* me = (PostProcessor*)osc_processer;
- return me->m_process_events();
-}
-
-
-/** OSC message processing thread.
- */
-void*
-PostProcessor::m_process_events()
-{
- Event* ev = NULL;
-
- while (true) {
- m_semaphore.wait();
-
- if (m_process_thread_exit_flag)
- break;
-
- while (!m_events.is_empty()) {
- ev = m_events.pop();
- assert(ev != NULL);
- ev->post_process();
- om->maid()->push(ev);
- }
+ while ( ! _events.is_empty()) {
+ Event* const ev = _events.pop();
+ assert(ev);
+ ev->post_process();
+ om->maid()->push(ev);
}
-
- cout << "[PostProcessor] Exiting post processor thread." << endl;
-
- return NULL;
}
} // namespace Om
diff --git a/src/libs/engine/PostProcessor.h b/src/libs/engine/PostProcessor.h
index 2083c442..e0c384eb 100644
--- a/src/libs/engine/PostProcessor.h
+++ b/src/libs/engine/PostProcessor.h
@@ -21,6 +21,7 @@
#include "types.h"
#include "util/Queue.h"
#include "util/Semaphore.h"
+#include "Slave.h"
namespace Om {
@@ -35,44 +36,24 @@ class Event;
*
* \ingroup engine
*/
-class PostProcessor
+class PostProcessor : public Slave
{
public:
PostProcessor(size_t queue_size);
- ~PostProcessor();
- void start();
- void stop();
-
- inline void push(Event* const ev);
- void signal();
+ /** Push an event on to the process queue, realtime-safe, not thread-safe. */
+ inline void push(Event* const ev) { _events.push(ev); }
private:
// Prevent copies
PostProcessor(const PostProcessor&);
PostProcessor& operator=(const PostProcessor&);
- Queue<Event*> m_events;
-
- static void* process_events(void* me);
- void* m_process_events();
-
- pthread_t m_process_thread;
- bool m_thread_exists;
- static bool m_process_thread_exit_flag;
- Semaphore m_semaphore;
+ Queue<Event*> _events;
+ virtual void _signalled();
};
-/** Push an event on to the process queue, realtime-safe, not thread-safe.
- */
-inline void
-PostProcessor::push(Event* const ev)
-{
- m_events.push(ev);
-}
-
-
} // namespace Om
#endif // POSTPROCESSOR_H
diff --git a/src/libs/engine/QueuedEventSource.cpp b/src/libs/engine/QueuedEventSource.cpp
index d41b2201..8254f3c8 100644
--- a/src/libs/engine/QueuedEventSource.cpp
+++ b/src/libs/engine/QueuedEventSource.cpp
@@ -25,20 +25,15 @@ namespace Om {
QueuedEventSource::QueuedEventSource(size_t size)
-: m_front(0),
- m_back(0),
- m_prepared_back(0),
- m_size(size+1),
- m_thread_exists(false),
- m_prepare_thread_exit_flag(false),
- m_semaphore(0)
+: _front(0),
+ _back(0),
+ _prepared_back(0),
+ _size(size+1),
+ _blocking_semaphore(0)
{
- m_events = (QueuedEvent**)calloc(m_size, sizeof(QueuedEvent*));
+ _events = (QueuedEvent**)calloc(_size, sizeof(QueuedEvent*));
- pthread_mutex_init(&m_blocking_mutex, NULL);
- pthread_cond_init(&m_blocking_cond, NULL);
-
- mlock(m_events, m_size * sizeof(QueuedEvent*));
+ mlock(_events, _size * sizeof(QueuedEvent*));
}
@@ -46,49 +41,7 @@ QueuedEventSource::~QueuedEventSource()
{
stop();
- free(m_events);
- pthread_mutex_destroy(&m_blocking_mutex);
- pthread_cond_destroy(&m_blocking_cond);
-}
-
-
-/** Start the prepare thread.
- */
-void
-QueuedEventSource::start()
-{
- if (m_thread_exists) {
- cerr << "[QueuedEventSource] Thread already launched?" << endl;
- return;
- } else {
- cout << "[QueuedEventSource] Launching thread." << endl;
- }
-
- m_prepare_thread_exit_flag = false;
-
- pthread_attr_t attr;
- pthread_attr_init(&attr);
- pthread_attr_setstacksize(&attr, 1500000);
-
- pthread_create(&m_prepare_thread, &attr, &QueuedEventSource::prepare_loop, this);
- pthread_attr_destroy(&attr);
-
- m_thread_exists = true;
-}
-
-
-/** Destroy the prepare thread.
- */
-void
-QueuedEventSource::stop()
-{
- if (m_thread_exists) {
- m_prepare_thread_exit_flag = true;
- pthread_cancel(m_prepare_thread);
- pthread_join(m_prepare_thread, NULL);
- m_thread_exists = false;
- cout << "[QueuedEventSource] Stopped thread." << endl;
- }
+ free(_events);
}
@@ -99,13 +52,13 @@ QueuedEventSource::push(QueuedEvent* const ev)
{
assert(!ev->is_prepared());
- if (m_events[m_back] != NULL) {
+ if (_events[_back] != NULL) {
cerr << "[QueuedEventSource] Error: Queue is full! Event is lost, please report!" << endl;
delete ev;
} else {
- m_events[m_back] = ev;
- m_back = (m_back + 1) % m_size;
- m_semaphore.post();
+ _events[_back] = ev;
+ _back = (_back + 1) % _size;
+ signal();
}
}
@@ -115,18 +68,18 @@ QueuedEventSource::push(QueuedEvent* const ev)
* This method will only pop events that have been prepared, and are
* stamped before the time passed. In other words, it may return NULL
* even if there are events pending in the queue. The events returned are
- * actually QueuedEvent*s, but after this they are "normal" events and the
+ * actually QueuedEvents, but after this they are "normal" events and the
* engine deals with them just like a realtime in-band event.
*/
Event*
-QueuedEventSource::pop_earliest_event_before(const samplecount time)
+QueuedEventSource::pop_earliest_before(const samplecount time)
{
- QueuedEvent* front_event = m_events[m_front];
+ QueuedEvent* const front_event = _events[_front];
// Pop
- if (front_event != NULL && front_event->time_stamp() < time && front_event->is_prepared()) {
- m_events[m_front] = NULL;
- m_front = (m_front + 1) % m_size;
+ if (front_event && front_event->is_prepared() && front_event->time_stamp() < time) {
+ _events[_front] = NULL;
+ _front = (_front + 1) % _size;
return front_event;
} else {
return NULL;
@@ -137,63 +90,40 @@ QueuedEventSource::pop_earliest_event_before(const samplecount time)
// Private //
-
/** Signal that the blocking event is finished.
*
- * When this is called preparing will resume. This will be called by
+ * When this is called preparing will resume. This MUST be called by
* blocking events in their post_process() method.
*/
void
QueuedEventSource::unblock()
{
- /* FIXME: Make this a semaphore, and have events signal at the end of their
- * execute() methods so the preprocessor can start preparing events immediately
- * instead of waiting for the postprocessor to get around to finalizing the event? */
- pthread_mutex_lock(&m_blocking_mutex);
- pthread_cond_signal(&m_blocking_cond);
- pthread_mutex_unlock(&m_blocking_mutex);
+ _blocking_semaphore.post();
}
-void*
-QueuedEventSource::m_prepare_loop()
+void
+QueuedEventSource::_signalled()
{
- QueuedEvent* ev = NULL;
-
- while (true) {
- m_semaphore.wait();
-
- if (m_prepare_thread_exit_flag)
- break; // exit signalled
-
- ev = m_events[m_prepared_back];
- assert(ev != NULL);
-
- if (ev == NULL) {
- cerr << "[QueuedEventSource] ERROR: Signalled, but event is NULL." << endl;
- continue;
- }
-
- assert(ev != NULL);
- assert(!ev->is_prepared());
-
- if (ev->is_blocking())
- pthread_mutex_lock(&m_blocking_mutex);
-
- ev->pre_process();
-
- m_prepared_back = (m_prepared_back+1) % m_size;
-
- // If a blocking event, wait for event to finish passing through
- // the audio cycle before preparing the next event
- if (ev->is_blocking()) {
- pthread_cond_wait(&m_blocking_cond, &m_blocking_mutex);
- pthread_mutex_unlock(&m_blocking_mutex);
- }
+ QueuedEvent* const ev = _events[_prepared_back];
+ assert(ev != NULL);
+
+ if (ev == NULL) {
+ cerr << "[QueuedEventSource] ERROR: Signalled, but event is NULL." << endl;
+ return;
}
- cout << "[QueuedEventSource] Exiting slow event queue thread." << endl;
- return NULL;
+ assert(ev != NULL);
+ assert(!ev->is_prepared());
+
+ ev->pre_process();
+
+ _prepared_back = (_prepared_back+1) % _size;
+
+ // If event was blocking, wait for event to being run through the
+ // process thread before preparing the next event
+ if (ev->is_blocking())
+ _blocking_semaphore.wait();
}
diff --git a/src/libs/engine/QueuedEventSource.h b/src/libs/engine/QueuedEventSource.h
index 15fb45f1..c3e6904a 100644
--- a/src/libs/engine/QueuedEventSource.h
+++ b/src/libs/engine/QueuedEventSource.h
@@ -21,6 +21,7 @@
#include <pthread.h>
#include "types.h"
#include "util/Semaphore.h"
+#include "Slave.h"
#include "EventSource.h"
namespace Om {
@@ -35,22 +36,27 @@ class QueuedEvent;
* popping are threadsafe, as long as a single thread pushes and a single
* thread pops (ie this data structure is threadsafe, but the push and pop
* methods themselves are not).
+ *
+ * This class is it's own slave. :)
*/
-class QueuedEventSource : public EventSource
+class QueuedEventSource : public EventSource, protected Slave
{
public:
QueuedEventSource(size_t size);
~QueuedEventSource();
- Event* pop_earliest_event_before(const samplecount time);
+ void start() { Thread::start(); }
+ void stop() { Thread::stop(); }
- void unblock();
+ Event* pop_earliest_before(const samplecount time);
- void start();
- void stop();
+ void unblock();
protected:
void push(QueuedEvent* const ev);
+ bool unprepared_events() { return (_prepared_back != _back); }
+
+ virtual void _signalled(); ///< Prepare 1 event
private:
// Prevent copies (undefined)
@@ -60,21 +66,12 @@ private:
// Note that it's crucially important which functions access which of these
// variables, to maintain threadsafeness.
- size_t m_front; ///< Front of queue
- size_t m_back; ///< Back of entire queue (1 past index of back element)
- size_t m_prepared_back; ///< Back of prepared section (1 past index of back prepared element)
- const size_t m_size;
- QueuedEvent** m_events;
-
- bool m_thread_exists;
- bool m_prepare_thread_exit_flag;
- pthread_t m_prepare_thread;
- Semaphore m_semaphore; ///< Counting semaphor for driving prepare thread
- pthread_mutex_t m_blocking_mutex;
- pthread_cond_t m_blocking_cond;
-
- static void* prepare_loop(void* q) { return ((QueuedEventSource*)q)->m_prepare_loop(); }
- void* m_prepare_loop();
+ size_t _front; ///< Front of queue
+ size_t _back; ///< Back of entire queue (1 past index of back element)
+ size_t _prepared_back; ///< Back of prepared section (1 past index of back prepared element)
+ const size_t _size;
+ QueuedEvent** _events;
+ Semaphore _blocking_semaphore;
};
diff --git a/src/libs/engine/Slave.h b/src/libs/engine/Slave.h
new file mode 100644
index 00000000..e6cc8ed5
--- /dev/null
+++ b/src/libs/engine/Slave.h
@@ -0,0 +1,60 @@
+/* This file is part of Ingen. Copyright (C) 2006 Dave Robillard.
+ *
+ * Om is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU General Public License as published by the Free Software
+ * Foundation; either version 2 of the License, or (at your option) any later
+ * version.
+ *
+ * Om is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU General Public License for details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef SLAVE_H
+#define SLAVE_H
+
+#include <pthread.h>
+#include "util/Semaphore.h"
+#include "Thread.h"
+
+namespace Om {
+
+
+/** Thread driven by (realtime safe) signals.
+ *
+ * \ingroup engine
+ */
+class Slave : public Thread
+{
+public:
+ Slave() : _semaphore(0) {}
+
+ inline void signal() { _semaphore.post(); }
+
+protected:
+ virtual void _signalled() = 0;
+
+ Semaphore _semaphore;
+
+private:
+ // Prevent copies
+ Slave(const Slave&);
+ Slave& operator=(const Slave&);
+
+ void _run()
+ {
+ while (true) {
+ _semaphore.wait();
+ _signalled();
+ }
+ }
+};
+
+
+} // namespace Om
+
+#endif // SLAVE_H
diff --git a/src/libs/engine/Thread.cpp b/src/libs/engine/Thread.cpp
new file mode 100644
index 00000000..6ddfa621
--- /dev/null
+++ b/src/libs/engine/Thread.cpp
@@ -0,0 +1,104 @@
+/* This file is part of Ingen. Copyright (C) 2006 Dave Robillard.
+ *
+ * Om is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU General Public License as published by the Free Software
+ * Foundation; either version 2 of the License, or (at your option) any later
+ * version.
+ *
+ * Om is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU General Public License for details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "Thread.h"
+#include <cassert>
+#include <iostream>
+#include <pthread.h>
+
+using std::cerr; using std::cout; using std::endl;
+
+namespace Om {
+
+
+Thread::Thread()
+: _pthread_exists(false)
+{
+}
+
+
+Thread::~Thread()
+{
+ stop();
+}
+
+
+/** Start the process thread.
+ */
+void
+Thread::start()
+{
+ cout << "[" << _name << " Thread] Starting." << endl;
+
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+ pthread_attr_setstacksize(&attr, 1500000);
+
+ pthread_create(&_pthread, &attr, _static_run, this);
+ _pthread_exists = true;
+}
+
+
+/** Stop the process thread.
+ */
+void
+Thread::stop()
+{
+ if (_pthread_exists) {
+ pthread_cancel(_pthread);
+ pthread_join(_pthread, NULL);
+ _pthread_exists = false;
+ }
+}
+
+
+/** Set the scheduling policy for this thread.
+ *
+ * @param must be one of SCHED_FIFO, SCHED_RR, or SCHED_OTHER.
+ */
+void
+Thread::set_scheduling(int policy, unsigned int priority)
+{
+ sched_param sp;
+ sp.sched_priority = priority;
+ int result = pthread_setschedparam(_pthread, SCHED_FIFO, &sp);
+ if (!result) {
+ cout << "[" << _name << " Thread] Set scheduling policy to ";
+ switch (policy) {
+ case SCHED_FIFO: cout << "SCHED_FIFO"; break;
+ case SCHED_RR: cout << "SCHED_RR"; break;
+ case SCHED_OTHER: cout << "SCHED_OTHER"; break;
+ default: cout << "UNKNOWN"; break;
+ }
+ cout << ", priority " << sp.sched_priority << endl;
+ } else {
+ cout << "[" << _name << " Thread] Unable to set scheduling policy ("
+ << strerror(result) << ")" << endl;
+ }
+}
+
+
+void*
+Thread::_static_run(void* me)
+{
+ Thread* myself = (Thread*)me;
+ myself->_run();
+ // and I
+ return NULL;
+}
+
+} // namespace Om
+
diff --git a/src/libs/engine/Thread.h b/src/libs/engine/Thread.h
new file mode 100644
index 00000000..ee59ee5c
--- /dev/null
+++ b/src/libs/engine/Thread.h
@@ -0,0 +1,63 @@
+/* This file is part of Ingen. Copyright (C) 2006 Dave Robillard.
+ *
+ * Om is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU General Public License as published by the Free Software
+ * Foundation; either version 2 of the License, or (at your option) any later
+ * version.
+ *
+ * Om is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU General Public License for details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef THREAD_H
+#define THREAD_H
+
+#include <string>
+#include <pthread.h>
+
+namespace Om {
+
+
+/* FIXME: This isn't Ingen specific at all. Move it to util. */
+
+
+/** Abstract base class for all threads.
+ *
+ * \ingroup engine
+ */
+class Thread
+{
+public:
+ Thread();
+ virtual ~Thread();
+
+ virtual void start();
+ virtual void stop();
+
+ void set_name(const std::string& name) { _name = name; }
+ void set_scheduling(int policy, unsigned int priority);
+
+protected:
+ virtual void _run() = 0;
+
+ std::string _name;
+ pthread_t _pthread;
+ bool _pthread_exists;
+
+private:
+ // Prevent copies
+ Thread(const Thread&);
+ Thread& operator=(const Thread&);
+
+ static void* _static_run(void* me);
+};
+
+
+} // namespace Om
+
+#endif // THREAD_H