summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Robillard <d@drobilla.net>2007-01-23 02:02:39 +0000
committerDavid Robillard <d@drobilla.net>2007-01-23 02:02:39 +0000
commit32874d5dcb4867d966f25cd9b2953c9644d97027 (patch)
treec38428ae638d7b2b67884ee894947849d88de144
parentaa454620bff8b6c5dcd4e0e379726307ad1918da (diff)
downloadraul-32874d5dcb4867d966f25cd9b2953c9644d97027.tar.gz
raul-32874d5dcb4867d966f25cd9b2953c9644d97027.tar.bz2
raul-32874d5dcb4867d966f25cd9b2953c9644d97027.zip
More work on multi-writer queue (SRMWQueue), proper unit test.
git-svn-id: http://svn.drobilla.net/lad/raul@267 a436a847-0d15-0410-975c-d299462d15a1
-rw-r--r--raul/AtomicInt.h4
-rw-r--r--raul/SRMWQueue.h60
-rw-r--r--tests/queue_test.cpp200
3 files changed, 242 insertions, 22 deletions
diff --git a/raul/AtomicInt.h b/raul/AtomicInt.h
index 2678f31..cc50fb9 100644
--- a/raul/AtomicInt.h
+++ b/raul/AtomicInt.h
@@ -55,7 +55,9 @@ public:
inline AtomicInt& operator--() // prefix
{ g_atomic_int_add(&_val, -1); return *this; }
- /** Set value to newval iff current value is oldval */
+ /** Set value to newval iff current value is oldval.
+ * @return whether set succeeded.
+ */
inline bool compare_and_exchange(int oldval, int newval)
{ return g_atomic_int_compare_and_exchange(&_val, oldval, newval); }
diff --git a/raul/SRMWQueue.h b/raul/SRMWQueue.h
index 5b7273a..3f4e1cd 100644
--- a/raul/SRMWQueue.h
+++ b/raul/SRMWQueue.h
@@ -19,6 +19,7 @@
#include <cassert>
#include <cstdlib>
+#include <cmath>
#include <boost/utility.hpp>
#include "raul/AtomicInt.h"
@@ -41,8 +42,11 @@ namespace Raul {
* Obey the threading restrictions documented here, or horrible nasty (possibly
* undetected) errors will occur.
*
- * This is slightly more expensive than the SRMWSRMWQueue. Use that if you don't
- * require multiple writers.
+ * If you only need a single writer, use SRSWQueue. This is slightly more
+ * computationally expensive, and allocates an additional size words of memory (ie
+ * if you're using this for ints or pointers etc, SRMWQueue will be twice the size
+ * of SRSWQueue for the same queue size. Additionally, the size of this queue must
+ * be a power of 2 (SRSWQueue does not have this limitation).
*
* \ingroup raul
*/
@@ -76,11 +80,13 @@ private:
// Note that _front doesn't need to be an AtomicInt since it's only accessed
// by the (single) reader thread
- int _front; ///< Circular index of element at front of queue (READER ONLY)
- AtomicInt _back; ///< Circular index 1 past element at back of queue (WRITERS ONLY)
- AtomicInt _space; ///< Remaining free space for new elements (all threads)
- const int _size; ///< Size of @ref _objects (you can store _size-1 objects)
- T* const _objects; ///< Fixed array containing queued elements
+ unsigned _front; ///< Circular index of element at front of queue (READER ONLY)
+ AtomicInt _back; ///< Circular index 1 past element at back of queue (WRITERS ONLY)
+ AtomicInt _write_space; ///< Remaining free space for new elements (all threads)
+ const unsigned _size; ///< Size of @ref _objects (you can store _size-1 objects)
+
+ T* const _objects; ///< Fixed array containing queued elements
+ AtomicInt* const _valid; ///< Parallel array to _objects, whether loc is written or not
};
@@ -88,12 +94,18 @@ template<typename T>
SRMWQueue<T>::SRMWQueue(size_t size)
: _front(0)
, _back(0)
- , _space(size)
+ , _write_space(size)
, _size(size+1)
, _objects((T*)calloc(_size, sizeof(T)))
+ , _valid((AtomicInt*)calloc(_size, sizeof(AtomicInt)))
{
+ assert(log2(size) - (int)log2(size) == 0);
assert(size > 1);
- assert(_size-1 == _space.get());
+ assert(_size-1 == (unsigned)_write_space.get());
+
+ for (unsigned i=0; i < _size; ++i) {
+ assert(_valid[i].get() == 0);
+ }
}
@@ -112,7 +124,7 @@ template <typename T>
inline bool
SRMWQueue<T>::full() const
{
- return ( _space.get() <= 0 );
+ return (_write_space.get() <= 0);
}
@@ -127,9 +139,8 @@ template <typename T>
inline bool
SRMWQueue<T>::push(const T& elem)
{
- const int old_space = _space.exchange_and_add(-1);
-
- const bool already_full = ( old_space <= 0 );
+ const int old_write_space = _write_space.exchange_and_add(-1);
+ const bool already_full = ( old_write_space <= 0 );
/* Technically right here pop could be called in the reader thread and
* make space available, but no harm in failing anyway - this queue
@@ -137,17 +148,21 @@ SRMWQueue<T>::push(const T& elem)
if (already_full) {
- /* if multiple threads simultaneously get here, _space may be 0
- * or negative. The next call to pop() will set _space back to
- * a sane value. Note that _space is not exposed, so this is okay
+ /* if multiple threads simultaneously get here, _write_space may be 0
+ * or negative. The next call to pop() will set _write_space back to
+ * a sane value. Note that _write_space is not exposed, so this is okay
* (... assuming this code is correct) */
return false;
} else {
- const int write_index = _back.exchange_and_add(1) % _size;
+ // Note: _size must be a power of 2 for this to not explode when _back overflows
+ const unsigned write_index = (unsigned)_back.exchange_and_add(1) % _size;
+
+ assert(_valid[write_index] == 0);
_objects[write_index] = elem;
+ ++(_valid[write_index]);
return true;
@@ -163,7 +178,7 @@ template <typename T>
inline bool
SRMWQueue<T>::empty() const
{
- return ( _space == capacity() );
+ return (_valid[_front].get() == 0);
}
@@ -191,12 +206,15 @@ template <typename T>
inline void
SRMWQueue<T>::pop()
{
+ assert(_valid[_front] == 1);
+ --(_valid[_front]);
+
_front = (_front + 1) % (_size);
- if (_space.get() < 0)
- _space = 1;
+ if (_write_space.get() < 0)
+ _write_space = 1;
else
- ++_space;
+ ++_write_space;
}
diff --git a/tests/queue_test.cpp b/tests/queue_test.cpp
index dc9928d..0b8bbd3 100644
--- a/tests/queue_test.cpp
+++ b/tests/queue_test.cpp
@@ -1,12 +1,210 @@
#include <iostream>
#include <string>
+#include <vector>
+#include <algorithm>
+#include <stdio.h>
+#include <stdlib.h>
+#include <fcntl.h>
+#include <termios.h>
#include "raul/SRSWQueue.h"
#include "raul/SRMWQueue.h"
+#include "raul/Thread.h"
+#include "raul/AtomicInt.h"
using namespace std;
using namespace Raul;
+static const unsigned NUM_DATA = 10;
+static const unsigned QUEUE_SIZE = 1024*1024;
+static const unsigned NUM_WRITERS = 2;
+static const unsigned PUSHES_PER_ITERATION = 2;
+// Data to read/write using actions pumped through the queue
+struct Record {
+ Record() : read_count(0), write_count(0) {}
+
+ AtomicInt read_count;
+ AtomicInt write_count;
+};
+
+Record data[NUM_DATA];
+
+
+// Actions pumped through the queue to manipulate data
+struct WriteAction {
+ WriteAction(unsigned idx)
+ : index(idx)/*, has_read(false)*/ {}
+
+ inline void read() const {
+ //cout << "READ " << index << "\r\n";
+ //assert(!has_read);
+ ++(data[index].read_count);
+ //has_read = true;
+ };
+
+ unsigned index;
+ //bool has_read;
+};
+
+
+// The victim
+SRMWQueue<WriteAction> queue(QUEUE_SIZE);
+
+
+class WriteThread : public Thread {
+protected:
+ void _run() {
+
+ cout << "Writer starting.\r\n";
+
+ // Wait for everything to get ready
+ sleep(2);
+
+ while (true) {
+ for (unsigned j=0; j < PUSHES_PER_ITERATION; ++j) {
+ unsigned i = rand() % NUM_DATA;
+ if (queue.push(WriteAction(i))) {
+ ++(data[i].write_count);
+ //cout << "WRITE " << i << "\r\n";
+ } else {
+ cerr << "FAILED WRITE\r\n";
+ }
+ }
+
+ // FIXME: remove!
+ //if (rand() % 20)
+ // usleep(1);
+
+ // This thread will never cancel without this here since
+ // all the stuff about is cancellation point free
+ // (good! RT safe)
+ pthread_testcancel();
+ }
+
+ cout << "Writer exiting." << endl;
+ }
+};
+
+
+// Returns 0 if all read count/write count pairs are equal,
+// otherwise how far off total count was
+unsigned
+data_is_sane()
+{
+ unsigned ret = 0;
+ for (unsigned i=0; i < NUM_DATA; ++i) {
+ unsigned diff = abs(data[i].read_count.get() - data[i].write_count.get());
+ ret += diff;
+ }
+
+ return ret;
+}
+
+
+void
+dump_data()
+{
+ for (unsigned i=0; i < NUM_DATA; ++i) {
+ cout << i << ":\t" << data[i].read_count.get()
+ << "\t : \t" << data[i].write_count.get();
+ if (data[i].read_count.get() == data[i].write_count.get())
+ cout << "\t OK" << endl;
+ else
+ cout << "\t FAIL" << endl;
+ }
+}
+
+
+
+int main()
+{
+ unsigned long total_processed = 0;
+
+ vector<WriteThread*> writers(NUM_WRITERS, new WriteThread());
+
+ struct termios orig_term;
+ struct termios raw_term;
+
+ cfmakeraw(&raw_term);
+ if (tcgetattr(0, &orig_term) != 0) return 1; //save terminal settings
+ if (tcsetattr(0, TCSANOW, &raw_term) != 0) return 1; //set to raw
+ fcntl(0, F_SETFL, O_NONBLOCK); //set to nonblocking IO on stdin
+
+
+ for (unsigned i=0; i < NUM_WRITERS; ++i) {
+ writers[i]->set_name(string("Writer ") + (char)('0' + i));
+ writers[i]->start();
+ }
+
+ // Read
+ while (getchar() == -1) {
+ unsigned count = 0;
+ while (count < queue.capacity() && !queue.empty()) {
+ WriteAction action = queue.front();
+ queue.pop();
+ action.read();
+ ++count;
+ ++total_processed;
+ }
+
+ /*if (count > 0)
+ cout << "Processed " << count << " requests\t\t"
+ << "(total " << total_processed << ")\r\n";*/
+
+ //if (total_processed > 0 && total_processed % 128l == 0)
+ // cout << "Total processed: " << total_processed << "\r\n";
+ }
+
+ if (tcsetattr(0, TCSANOW, &orig_term) != 0) return 1; //restore
+
+ cout << "Finishing." << endl;
+
+ // Stop the writers
+ for (unsigned i=0; i < NUM_WRITERS; ++i)
+ writers[i]->stop();
+
+ cout << "\n\n****************** DONE *********************\n\n";
+
+ unsigned leftovers = 0;
+
+ // Drain anything left in the queue
+ while (!queue.empty()) {
+ WriteAction action = queue.front();
+ queue.pop();
+ action.read();
+ leftovers++;
+ ++total_processed;
+ }
+
+ if (leftovers > 0)
+ cout << "Processed " << leftovers << " leftovers." << endl;
+
+
+ cout << "\n\n*********************************************\n\n";
+
+ cout << "Total processed: " << total_processed << endl;
+ if (total_processed > INT_MAX)
+ cout << "(Counter had to wrap)" << endl;
+ else
+ cout << "(Counter did NOT have to wrap)" << endl;
+
+
+ unsigned diff = data_is_sane();
+
+ if (diff == 0) {
+ cout << "PASS" << endl;
+ } else {
+ cout << "FAILED BY " << diff << endl;
+ // dump_data();
+ }
+
+ dump_data();
+
+ return 0;
+}
+
+
+#if 0
int main()
{
//SRSWQueue<int> q(10);
@@ -49,3 +247,5 @@ int main()
return 0;
}
+#endif
+