summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorDavid Robillard <d@drobilla.net>2011-05-18 17:45:39 +0000
committerDavid Robillard <d@drobilla.net>2011-05-18 17:45:39 +0000
commit177f2b48b6516ba4dee70a709ce28105f1092de2 (patch)
tree84f6d9b9a026c6d35940e99f2981bd5be537548a /src
parent27e3187270807c5e126a2e4bf66211de0f784a28 (diff)
downloadingen-177f2b48b6516ba4dee70a709ce28105f1092de2.tar.gz
ingen-177f2b48b6516ba4dee70a709ce28105f1092de2.tar.bz2
ingen-177f2b48b6516ba4dee70a709ce28105f1092de2.zip
Use an intrusive linked list for event queue rather than Raul::List.
This avoids the need to allocate list nodes, improving performance (event throughput), and making EventSource::push_queued realtime safe. Remove unused queue_size parameter from EventSource and friends. git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@3282 a436a847-0d15-0410-975c-d299462d15a1
Diffstat (limited to 'src')
-rw-r--r--src/server/Engine.cpp2
-rw-r--r--src/server/Event.hpp18
-rw-r--r--src/server/EventSource.cpp69
-rw-r--r--src/server/EventSource.hpp14
-rw-r--r--src/server/HTTPEngineReceiver.cpp2
-rw-r--r--src/server/OSCEngineReceiver.cpp4
-rw-r--r--src/server/OSCEngineReceiver.hpp2
-rw-r--r--src/server/PostProcessor.cpp55
-rw-r--r--src/server/PostProcessor.hpp28
-rw-r--r--src/server/QueuedEngineInterface.cpp4
-rw-r--r--src/server/QueuedEngineInterface.hpp2
-rw-r--r--src/server/ingen_engine.cpp3
-rw-r--r--src/server/ingen_lv2.cpp4
-rw-r--r--src/server/ingen_osc.cpp1
14 files changed, 120 insertions, 88 deletions
diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp
index 90518bd2..3aa05eba 100644
--- a/src/server/Engine.cpp
+++ b/src/server/Engine.cpp
@@ -62,7 +62,7 @@ Engine::Engine(Ingen::Shared::World* a_world)
, _maid(new Raul::Maid(event_queue_size()))
, _message_context(new MessageContext(*this))
, _node_factory(new NodeFactory(a_world))
- , _post_processor(new PostProcessor(*this, event_queue_size()))
+ , _post_processor(new PostProcessor(*this))
, _quit_flag(false)
{
if (a_world->store()) {
diff --git a/src/server/Event.hpp b/src/server/Event.hpp
index af39bfa4..e915631a 100644
--- a/src/server/Event.hpp
+++ b/src/server/Event.hpp
@@ -19,9 +19,12 @@
#define INGEN_ENGINE_EVENT_HPP
#include <cassert>
+
#include "raul/SharedPtr.hpp"
#include "raul/Deletable.hpp"
#include "raul/Path.hpp"
+#include "raul/AtomicPtr.hpp"
+
#include "types.hpp"
namespace Ingen {
@@ -56,6 +59,10 @@ public:
inline SampleCount time() const { return _time; }
+ /** Get the next event in the event process list. */
+ Event* next() const { return _next.get(); }
+ void next(Event* ev) { _next = ev; }
+
int error() { return _error; }
protected:
@@ -67,11 +74,12 @@ protected:
, _executed(false)
{}
- Engine& _engine;
- SharedPtr<Request> _request;
- FrameTime _time;
- int _error;
- bool _executed;
+ Engine& _engine;
+ SharedPtr<Request> _request;
+ Raul::AtomicPtr<Event> _next;
+ FrameTime _time;
+ int _error;
+ bool _executed;
};
} // namespace Server
diff --git a/src/server/EventSource.cpp b/src/server/EventSource.cpp
index 273a4693..cdecfd4a 100644
--- a/src/server/EventSource.cpp
+++ b/src/server/EventSource.cpp
@@ -15,19 +15,18 @@
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
-#include <sys/mman.h>
#include "EventSource.hpp"
-#include "QueuedEvent.hpp"
#include "PostProcessor.hpp"
-#include "ThreadManager.hpp"
#include "ProcessContext.hpp"
+#include "QueuedEvent.hpp"
+#include "ThreadManager.hpp"
using namespace std;
namespace Ingen {
namespace Server {
-EventSource::EventSource(size_t queue_size)
+EventSource::EventSource()
: _blocking_semaphore(0)
{
Thread::set_context(THREAD_PRE_PROCESS);
@@ -45,10 +44,22 @@ void
EventSource::push_queued(QueuedEvent* const ev)
{
assert(!ev->is_prepared());
- Raul::List<Event*>::Node* node = new Raul::List<Event*>::Node(ev);
- _events.push_back(node);
- if (_prepared_back.get() == NULL)
- _prepared_back = node;
+ assert(!ev->next());
+
+ QueuedEvent* const head = _head.get();
+ QueuedEvent* const tail = _tail.get();
+
+ if (!head) {
+ _head = ev;
+ _tail = ev;
+ } else {
+ _tail = ev;
+ tail->next(ev);
+ }
+
+ if (!_prepared_back.get()) {
+ _prepared_back = ev;
+ }
whip();
}
@@ -62,39 +73,35 @@ EventSource::process(PostProcessor& dest, ProcessContext& context, bool limit)
{
ThreadManager::assert_thread(THREAD_PROCESS);
- if (_events.empty())
+ if (!_head.get())
return;
/* Limit the maximum number of queued events to process per cycle. This
- * makes the process callback (more) realtime-safe by preventing being
- * choked by events coming in faster than they can be processed.
- * FIXME: test this and figure out a good value */
+ makes the process callback (more) realtime-safe by preventing being
+ choked by events coming in faster than they can be processed.
+ FIXME: test this and figure out a good value
+ */
const size_t MAX_QUEUED_EVENTS = context.nframes() / 32;
size_t num_events_processed = 0;
- Raul::List<Event*>::Node* head = _events.head();
- Raul::List<Event*>::Node* tail = head;
-
- if (!head)
- return;
-
- QueuedEvent* ev = (QueuedEvent*)head->elem();
+ QueuedEvent* ev = _head.get();
+ QueuedEvent* last = ev;
while (ev && ev->is_prepared() && ev->time() < context.end()) {
ev->execute(context);
- tail = head;
- head = head->next();
+ last = ev;
+ ev = (QueuedEvent*)ev->next();
++num_events_processed;
- if (limit && num_events_processed > MAX_QUEUED_EVENTS)
+ if (limit && (num_events_processed > MAX_QUEUED_EVENTS))
break;
- ev = (head ? (QueuedEvent*)head->elem() : NULL);
}
if (num_events_processed > 0) {
- Raul::List<Event*> front;
- _events.chop_front(front, num_events_processed, tail);
- dest.append(&front);
+ dest.append(_head.get(), last);
+ _head = (QueuedEvent*)last->next();
+ if (!last->next())
+ _tail = NULL;
}
}
@@ -102,19 +109,15 @@ EventSource::process(PostProcessor& dest, ProcessContext& context, bool limit)
void
EventSource::_whipped()
{
- Raul::List<Event*>::Node* pb = _prepared_back.get();
- if (!pb)
+ QueuedEvent* ev = _prepared_back.get();
+ if (!ev)
return;
- QueuedEvent* const ev = (QueuedEvent*)pb->elem();
- assert(ev);
-
assert(!ev->is_prepared());
ev->pre_process();
assert(ev->is_prepared());
- assert(_prepared_back.get() == pb);
- _prepared_back = pb->next();
+ _prepared_back = (QueuedEvent*)ev->next();
// If event was blocking, wait for event to being run through the
// process thread before preparing the next event
diff --git a/src/server/EventSource.hpp b/src/server/EventSource.hpp
index 19d87ed9..5d3c1e14 100644
--- a/src/server/EventSource.hpp
+++ b/src/server/EventSource.hpp
@@ -18,9 +18,9 @@
#ifndef INGEN_ENGINE_EVENTSOURCE_HPP
#define INGEN_ENGINE_EVENTSOURCE_HPP
+#include "raul/AtomicPtr.hpp"
#include "raul/Semaphore.hpp"
#include "raul/Slave.hpp"
-#include "raul/List.hpp"
namespace Ingen {
namespace Server {
@@ -39,12 +39,12 @@ class ProcessContext;
class EventSource : protected Raul::Slave
{
public:
- explicit EventSource(size_t queue_size);
+ explicit EventSource();
virtual ~EventSource();
void process(PostProcessor& dest, ProcessContext& context, bool limit=true);
- bool empty() { return _events.empty(); }
+ bool empty() { return !_head.get(); }
/** Signal that a blocking event is finished.
*
@@ -61,9 +61,11 @@ protected:
virtual void _whipped(); ///< Prepare 1 event
private:
- Raul::List<Event*> _events;
- Raul::AtomicPtr<Raul::List<Event*>::Node> _prepared_back;
- Raul::Semaphore _blocking_semaphore;
+ Raul::AtomicPtr<QueuedEvent> _head;
+ Raul::AtomicPtr<QueuedEvent> _prepared_back;
+ Raul::AtomicPtr<QueuedEvent> _tail;
+
+ Raul::Semaphore _blocking_semaphore;
};
} // namespace Server
diff --git a/src/server/HTTPEngineReceiver.cpp b/src/server/HTTPEngineReceiver.cpp
index b027a6b3..4284c51a 100644
--- a/src/server/HTTPEngineReceiver.cpp
+++ b/src/server/HTTPEngineReceiver.cpp
@@ -51,7 +51,7 @@ using namespace Serialisation;
namespace Server {
HTTPEngineReceiver::HTTPEngineReceiver(Engine& engine, uint16_t port)
- : QueuedEngineInterface(engine, 64) // FIXME
+ : QueuedEngineInterface(engine)
, _server(soup_server_new(SOUP_SERVER_PORT, port, NULL))
{
_receive_thread = new ReceiveThread(*this);
diff --git a/src/server/OSCEngineReceiver.cpp b/src/server/OSCEngineReceiver.cpp
index 635856da..338fb985 100644
--- a/src/server/OSCEngineReceiver.cpp
+++ b/src/server/OSCEngineReceiver.cpp
@@ -57,8 +57,8 @@ namespace Server {
* See the "Client OSC Namespace Documentation" for details.</p>
*/
-OSCEngineReceiver::OSCEngineReceiver(Engine& engine, size_t queue_size, uint16_t port)
- : QueuedEngineInterface(engine, queue_size) // FIXME
+OSCEngineReceiver::OSCEngineReceiver(Engine& engine, uint16_t port)
+ : QueuedEngineInterface(engine)
, _server(NULL)
{
_receive_thread = new ReceiveThread(*this);
diff --git a/src/server/OSCEngineReceiver.hpp b/src/server/OSCEngineReceiver.hpp
index 8a76fa01..c9585f44 100644
--- a/src/server/OSCEngineReceiver.hpp
+++ b/src/server/OSCEngineReceiver.hpp
@@ -57,7 +57,7 @@ inline static int name##_cb(LO_HANDLER_ARGS, void* myself)\
class OSCEngineReceiver : public QueuedEngineInterface
{
public:
- OSCEngineReceiver(Engine& engine, size_t queue_size, uint16_t port);
+ OSCEngineReceiver(Engine& engine, uint16_t port);
~OSCEngineReceiver();
private:
diff --git a/src/server/PostProcessor.cpp b/src/server/PostProcessor.cpp
index 25252a16..3364322a 100644
--- a/src/server/PostProcessor.cpp
+++ b/src/server/PostProcessor.cpp
@@ -15,16 +15,16 @@
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
-#include <cassert>
-#include <pthread.h>
+#include <assert.h>
+
#include "raul/log.hpp"
-#include "raul/SRSWQueue.hpp"
-#include "events/SendPortValue.hpp"
-#include "Event.hpp"
-#include "PostProcessor.hpp"
-#include "Engine.hpp"
+
#include "Driver.hpp"
+#include "Engine.hpp"
+#include "PostProcessor.hpp"
#include "ProcessContext.hpp"
+#include "QueuedEvent.hpp"
+#include "events/SendPortValue.hpp"
using namespace std;
using namespace Raul;
@@ -32,10 +32,9 @@ using namespace Raul;
namespace Ingen {
namespace Server {
-PostProcessor::PostProcessor(Engine& engine, size_t queue_size)
+PostProcessor::PostProcessor(Engine& engine)
: _engine(engine)
, _max_time(0)
- , _events(queue_size)
, _event_buffer_size(sizeof(Events::SendPortValue)) // FIXME: make generic
, _event_buffer((uint8_t*)malloc(_event_buffer_size))
{
@@ -47,6 +46,21 @@ PostProcessor::~PostProcessor()
}
void
+PostProcessor::append(QueuedEvent* first, QueuedEvent* last)
+{
+ assert(first);
+ assert(last);
+ QueuedEvent* const head = _head.get();
+ QueuedEvent* const tail = _tail.get();
+ if (!head) {
+ _head = first;
+ } else {
+ tail->next(first);
+ }
+ _tail = last;
+}
+
+void
PostProcessor::process()
{
const FrameTime end_time = _max_time.get();
@@ -75,16 +89,21 @@ PostProcessor::process()
}
/* Process normal events */
- Raul::List<Event*>::Node* n = _events.head();
- while (n) {
- if (n->elem()->time() > end_time)
+ QueuedEvent* ev = _head.get();
+ while (ev) {
+ if (ev->time() > end_time)
break;
- Raul::List<Event*>::Node* next = n->next();
- n->elem()->post_process();
- _events.erase(_events.begin());
- delete n->elem();
- delete n;
- n = next;
+
+ QueuedEvent* const next = (QueuedEvent*)ev->next();
+ ev->post_process();
+ if (next) {
+ _head = next;
+ } else {
+ _head = NULL;
+ _tail = NULL;
+ }
+ delete ev;
+ ev = next;
}
}
diff --git a/src/server/PostProcessor.hpp b/src/server/PostProcessor.hpp
index f06a182e..9cf8ea06 100644
--- a/src/server/PostProcessor.hpp
+++ b/src/server/PostProcessor.hpp
@@ -18,14 +18,15 @@
#ifndef INGEN_ENGINE_POSTPROCESSOR_HPP
#define INGEN_ENGINE_POSTPROCESSOR_HPP
-#include <pthread.h>
-#include "raul/SRSWQueue.hpp"
-#include "raul/List.hpp"
+#include "raul/AtomicInt.hpp"
+#include "raul/AtomicPtr.hpp"
+
+#include "types.hpp"
namespace Ingen {
namespace Server {
-class Event;
+class QueuedEvent;
class Engine;
/** Processor for Events after leaving the audio thread.
@@ -42,11 +43,13 @@ class Engine;
class PostProcessor
{
public:
- PostProcessor(Engine& engine, size_t queue_size);
+ PostProcessor(Engine& engine);
~PostProcessor();
- /** Push a list of events on to the process queue, realtime-safe, not thread-safe. */
- inline void append(Raul::List<Event*>* l) { _events.append(*l); }
+ /** Push a list of events on to the process queue.
+ realtime-safe, not thread-safe.
+ */
+ void append(QueuedEvent* first, QueuedEvent* last);
/** Post-process and delete all pending events */
void process();
@@ -55,11 +58,12 @@ public:
void set_end_time(FrameTime time) { _max_time = time; }
private:
- Engine& _engine;
- Raul::AtomicInt _max_time;
- Raul::List<Event*> _events;
- uint32_t _event_buffer_size;
- uint8_t* _event_buffer;
+ Engine& _engine;
+ Raul::AtomicPtr<QueuedEvent> _head;
+ Raul::AtomicPtr<QueuedEvent> _tail;
+ Raul::AtomicInt _max_time;
+ uint32_t _event_buffer_size;
+ uint8_t* _event_buffer;
};
} // namespace Server
diff --git a/src/server/QueuedEngineInterface.cpp b/src/server/QueuedEngineInterface.cpp
index 5ef9335f..958a5110 100644
--- a/src/server/QueuedEngineInterface.cpp
+++ b/src/server/QueuedEngineInterface.cpp
@@ -33,8 +33,8 @@ using namespace Raul;
namespace Ingen {
namespace Server {
-QueuedEngineInterface::QueuedEngineInterface(Engine& engine, size_t queue_size)
- : EventSource(queue_size)
+QueuedEngineInterface::QueuedEngineInterface(Engine& engine)
+ : EventSource()
, _request(new Request(this, NULL, 0))
, _engine(engine)
, _in_bundle(false)
diff --git a/src/server/QueuedEngineInterface.hpp b/src/server/QueuedEngineInterface.hpp
index 3bd77013..65ad80f7 100644
--- a/src/server/QueuedEngineInterface.hpp
+++ b/src/server/QueuedEngineInterface.hpp
@@ -48,7 +48,7 @@ class QueuedEngineInterface : public EventSource,
public ServerInterface
{
public:
- QueuedEngineInterface(Engine& engine, size_t queue_size);
+ QueuedEngineInterface(Engine& engine);
virtual ~QueuedEngineInterface();
Raul::URI uri() const { return "http://drobilla.net/ns/ingen#internal"; }
diff --git a/src/server/ingen_engine.cpp b/src/server/ingen_engine.cpp
index 568267cd..a0cda60c 100644
--- a/src/server/ingen_engine.cpp
+++ b/src/server/ingen_engine.cpp
@@ -29,8 +29,7 @@ struct IngenEngineModule : public Ingen::Shared::Module {
SharedPtr<Server::Engine> engine(new Server::Engine(world));
world->set_local_engine(engine);
SharedPtr<Server::QueuedEngineInterface> interface(
- new Server::QueuedEngineInterface(*engine.get(),
- engine->event_queue_size()));
+ new Server::QueuedEngineInterface(*engine.get()));
world->set_engine(interface);
engine->add_event_source(interface);
assert(world->local_engine() == engine);
diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp
index bd64cf34..aa3c1f56 100644
--- a/src/server/ingen_lv2.cpp
+++ b/src/server/ingen_lv2.cpp
@@ -268,9 +268,7 @@ ingen_instantiate(const LV2_Descriptor* descriptor,
plugin->world->set_local_engine(engine);
SharedPtr<Server::QueuedEngineInterface> interface(
- new Server::QueuedEngineInterface(
- *engine.get(),
- engine->event_queue_size()));
+ new Server::QueuedEngineInterface(*engine.get()));
plugin->world->set_engine(interface);
engine->add_event_source(interface);
diff --git a/src/server/ingen_osc.cpp b/src/server/ingen_osc.cpp
index bf0f09f8..f10481e5 100644
--- a/src/server/ingen_osc.cpp
+++ b/src/server/ingen_osc.cpp
@@ -29,7 +29,6 @@ struct IngenOSCModule : public Ingen::Shared::Module {
SharedPtr<Server::OSCEngineReceiver> interface(
new Server::OSCEngineReceiver(
*engine,
- engine->event_queue_size(),
world->conf()->option("engine-port").get_int32()));
engine->add_event_source(interface);
}