summaryrefslogtreecommitdiffstats
path: root/src/engine
diff options
context:
space:
mode:
Diffstat (limited to 'src/engine')
-rw-r--r--src/engine/HTTPEngineReceiver.cpp2
-rw-r--r--src/engine/OSCEngineReceiver.cpp2
-rw-r--r--src/engine/QueuedEngineInterface.cpp6
-rw-r--r--src/engine/QueuedEngineInterface.hpp2
-rw-r--r--src/engine/QueuedEventSource.cpp26
-rw-r--r--src/engine/QueuedEventSource.hpp59
6 files changed, 22 insertions, 75 deletions
diff --git a/src/engine/HTTPEngineReceiver.cpp b/src/engine/HTTPEngineReceiver.cpp
index d31e2ab4..cf5299ed 100644
--- a/src/engine/HTTPEngineReceiver.cpp
+++ b/src/engine/HTTPEngineReceiver.cpp
@@ -41,7 +41,7 @@ namespace Ingen {
HTTPEngineReceiver::HTTPEngineReceiver(Engine& engine, uint16_t port)
- : QueuedEngineInterface(engine, 2, 2)
+ : QueuedEngineInterface(engine, 64) // FIXME
, _server(soup_server_new(SOUP_SERVER_PORT, port, NULL))
{
_receive_thread = new ReceiveThread(*this);
diff --git a/src/engine/OSCEngineReceiver.cpp b/src/engine/OSCEngineReceiver.cpp
index 8ef819e8..b2131928 100644
--- a/src/engine/OSCEngineReceiver.cpp
+++ b/src/engine/OSCEngineReceiver.cpp
@@ -48,7 +48,7 @@ namespace Ingen {
OSCEngineReceiver::OSCEngineReceiver(Engine& engine, size_t queue_size, uint16_t port)
- : QueuedEngineInterface(engine, queue_size, queue_size) // FIXME
+ : QueuedEngineInterface(engine, queue_size) // FIXME
, _server(NULL)
{
_receive_thread = new ReceiveThread(*this);
diff --git a/src/engine/QueuedEngineInterface.cpp b/src/engine/QueuedEngineInterface.cpp
index a399e955..c3bc56fe 100644
--- a/src/engine/QueuedEngineInterface.cpp
+++ b/src/engine/QueuedEngineInterface.cpp
@@ -26,8 +26,8 @@
namespace Ingen {
-QueuedEngineInterface::QueuedEngineInterface(Engine& engine, size_t queued_size, size_t stamped_size)
- : QueuedEventSource(queued_size, stamped_size)
+QueuedEngineInterface::QueuedEngineInterface(Engine& engine, size_t queue_size)
+ : QueuedEventSource(queue_size)
, _responder(new Responder(NULL, 0))
, _engine(engine)
, _in_bundle(false)
@@ -381,7 +381,7 @@ extern "C" {
Ingen::QueuedEngineInterface*
new_queued_interface(Ingen::Engine& engine)
{
- return new Ingen::QueuedEngineInterface(engine, Ingen::event_queue_size, Ingen::event_queue_size);
+ return new Ingen::QueuedEngineInterface(engine, Ingen::event_queue_size);
}
} // extern "C"
diff --git a/src/engine/QueuedEngineInterface.hpp b/src/engine/QueuedEngineInterface.hpp
index 6af848e7..481c5438 100644
--- a/src/engine/QueuedEngineInterface.hpp
+++ b/src/engine/QueuedEngineInterface.hpp
@@ -50,7 +50,7 @@ class Engine;
class QueuedEngineInterface : public QueuedEventSource, public EngineInterface
{
public:
- QueuedEngineInterface(Engine& engine, size_t queued_size, size_t stamped_size);
+ QueuedEngineInterface(Engine& engine, size_t queue_size);
virtual ~QueuedEngineInterface() {}
std::string uri() const { return "ingen:internal"; }
diff --git a/src/engine/QueuedEventSource.cpp b/src/engine/QueuedEventSource.cpp
index 7dd12049..e8c6deec 100644
--- a/src/engine/QueuedEventSource.cpp
+++ b/src/engine/QueuedEventSource.cpp
@@ -28,14 +28,13 @@ using namespace std;
namespace Ingen {
-QueuedEventSource::QueuedEventSource(size_t queued_size, size_t stamped_size)
+QueuedEventSource::QueuedEventSource(size_t queue_size)
: _front(0)
, _back(0)
, _prepared_back(0)
- , _size(queued_size+1)
+ , _size(queue_size+1)
, _blocking_semaphore(0)
, _full_semaphore(0)
- , _stamped_queue(stamped_size)
{
_events = (QueuedEvent**)calloc(_size, sizeof(QueuedEvent*));
@@ -99,8 +98,6 @@ QueuedEventSource::process(PostProcessor& dest, ProcessContext& context)
unsigned int num_events_processed = 0;
- /* FIXME: Merge these next two loops into one */
-
while ((ev = pop_earliest_queued_before(context.end()))) {
ev->execute(context);
dest.push(ev);
@@ -108,12 +105,6 @@ QueuedEventSource::process(PostProcessor& dest, ProcessContext& context)
break;
}
- while ((ev = pop_earliest_stamped_before(context.end()))) {
- ev->execute(context);
- dest.push(ev);
- ++num_events_processed;
- }
-
if (_full_semaphore.has_waiter()) {
const bool full = (((_front.get() - _back.get() + _size) % _size) == 1);
if (!full)
@@ -122,16 +113,11 @@ QueuedEventSource::process(PostProcessor& dest, ProcessContext& context)
}
-/** Pops the prepared event at the front of the prepare queue, if it exists.
+/** Pop the prepared event at the front of the prepare queue, if it exists.
*
- * This method will only pop events that have been prepared, and are
- * stamped before the time passed. In other words, it may return NULL
- * even if there are events pending in the queue. The events returned are
- * actually QueuedEvents, but after this they are "normal" events and the
- * engine deals with them just like a realtime in-band event. The engine will
- * not use the timestamps of the returned events in any way, since it is free
- * to execute these non-time-stamped events whenever it wants (at whatever rate
- * it wants).
+ * This method will only pop events that are prepared and have time stamp
+ * less than @a time. It may return NULL even if there are events pending in
+ * the queue, if they are unprepared or stamped in the future.
*/
Event*
QueuedEventSource::pop_earliest_queued_before(const SampleCount time)
diff --git a/src/engine/QueuedEventSource.hpp b/src/engine/QueuedEventSource.hpp
index 1d846897..a06de39f 100644
--- a/src/engine/QueuedEventSource.hpp
+++ b/src/engine/QueuedEventSource.hpp
@@ -49,7 +49,7 @@ class PostProcessor;
class QueuedEventSource : public EventSource, protected Raul::Slave
{
public:
- QueuedEventSource(size_t queued_size, size_t stamped_size);
+ QueuedEventSource(size_t queue_size);
~QueuedEventSource();
void activate() { Slave::start(); }
@@ -57,72 +57,33 @@ public:
void process(PostProcessor& dest, ProcessContext& context);
- void unblock();
+ /** Signal that the blocking event is finished.
+ * When this is called preparing will resume. This MUST be called by
+ * blocking events in their post_process() method. */
+ inline void unblock() { _blocking_semaphore.post(); }
protected:
- void push_queued(QueuedEvent* const ev);
- inline void push_stamped(Event* const ev) { _stamped_queue.push(ev); }
- Event* pop_earliest_queued_before(const SampleCount time);
- inline Event* pop_earliest_stamped_before(const SampleCount time);
+ void push_queued(QueuedEvent* const ev);
+ Event* pop_earliest_queued_before(const SampleCount time);
inline bool unprepared_events() { return (_prepared_back.get() != _back.get()); }
virtual void _whipped(); ///< Prepare 1 event
private:
- // Note that it's crucially important which functions access which of these
- // variables, to maintain threadsafeness.
+ // Note it's important which functions access which variables for thread safety
- //(FIXME: make this a separate class?)
// 2-part queue for events that require pre-processing:
AtomicInt _front; ///< Front of queue
- AtomicInt _back; ///< Back of entire queue (1 past index of back element)
- AtomicInt _prepared_back; ///< Back of prepared section (1 past index of back prepared element)
+ AtomicInt _back; ///< Back of entire queue (index of back event + 1)
+ AtomicInt _prepared_back; ///< Back of prepared events (index of back prepared event + 1)
const size_t _size;
QueuedEvent** _events;
Raul::Semaphore _blocking_semaphore;
-
Raul::Semaphore _full_semaphore;
-
- /** Queue for timestamped events (no pre-processing). */
- Raul::SRSWQueue<Event*> _stamped_queue;
};
-/** Pops the realtime (timestamped, not preprocessed) event off the realtime queue.
- *
- * Engine will use the sample timestamps of returned events directly and execute the
- * event with sample accuracy. Timestamps in the past will be bumped forward to
- * the beginning of the cycle (offset 0), when eg. skipped cycles occur.
- */
-inline Event*
-QueuedEventSource::pop_earliest_stamped_before(const SampleCount time)
-{
- Event* ret = NULL;
-
- if (!_stamped_queue.empty()) {
- if (_stamped_queue.front()->time() < time) {
- ret = _stamped_queue.front();
- _stamped_queue.pop();
- }
- }
-
- return ret;
-}
-
-
-/** Signal that the blocking event is finished.
- *
- * When this is called preparing will resume. This MUST be called by
- * blocking events in their post_process() method.
- */
-inline void
-QueuedEventSource::unblock()
-{
- _blocking_semaphore.post();
-}
-
-
} // namespace Ingen
#endif // QUEUEDEVENTSOURCE_H