diff options
author | David Robillard <d@drobilla.net> | 2007-01-23 02:02:39 +0000 |
---|---|---|
committer | David Robillard <d@drobilla.net> | 2007-01-23 02:02:39 +0000 |
commit | 32874d5dcb4867d966f25cd9b2953c9644d97027 (patch) | |
tree | c38428ae638d7b2b67884ee894947849d88de144 | |
parent | aa454620bff8b6c5dcd4e0e379726307ad1918da (diff) | |
download | raul-32874d5dcb4867d966f25cd9b2953c9644d97027.tar.gz raul-32874d5dcb4867d966f25cd9b2953c9644d97027.tar.bz2 raul-32874d5dcb4867d966f25cd9b2953c9644d97027.zip |
More work on multi-writer queue (SRMWQueue), proper unit test.
git-svn-id: http://svn.drobilla.net/lad/raul@267 a436a847-0d15-0410-975c-d299462d15a1
-rw-r--r-- | raul/AtomicInt.h | 4 | ||||
-rw-r--r-- | raul/SRMWQueue.h | 60 | ||||
-rw-r--r-- | tests/queue_test.cpp | 200 |
3 files changed, 242 insertions, 22 deletions
diff --git a/raul/AtomicInt.h b/raul/AtomicInt.h index 2678f31..cc50fb9 100644 --- a/raul/AtomicInt.h +++ b/raul/AtomicInt.h @@ -55,7 +55,9 @@ public: inline AtomicInt& operator--() // prefix { g_atomic_int_add(&_val, -1); return *this; } - /** Set value to newval iff current value is oldval */ + /** Set value to newval iff current value is oldval. + * @return whether set succeeded. + */ inline bool compare_and_exchange(int oldval, int newval) { return g_atomic_int_compare_and_exchange(&_val, oldval, newval); } diff --git a/raul/SRMWQueue.h b/raul/SRMWQueue.h index 5b7273a..3f4e1cd 100644 --- a/raul/SRMWQueue.h +++ b/raul/SRMWQueue.h @@ -19,6 +19,7 @@ #include <cassert> #include <cstdlib> +#include <cmath> #include <boost/utility.hpp> #include "raul/AtomicInt.h" @@ -41,8 +42,11 @@ namespace Raul { * Obey the threading restrictions documented here, or horrible nasty (possibly * undetected) errors will occur. * - * This is slightly more expensive than the SRMWSRMWQueue. Use that if you don't - * require multiple writers. + * If you only need a single writer, use SRSWQueue. This is slightly more + * computationally expensive, and allocates an additional size words of memory (ie + * if you're using this for ints or pointers etc, SRMWQueue will be twice the size + * of SRSWQueue for the same queue size. Additionally, the size of this queue must + * be a power of 2 (SRSWQueue does not have this limitation). * * \ingroup raul */ @@ -76,11 +80,13 @@ private: // Note that _front doesn't need to be an AtomicInt since it's only accessed // by the (single) reader thread - int _front; ///< Circular index of element at front of queue (READER ONLY) - AtomicInt _back; ///< Circular index 1 past element at back of queue (WRITERS ONLY) - AtomicInt _space; ///< Remaining free space for new elements (all threads) - const int _size; ///< Size of @ref _objects (you can store _size-1 objects) - T* const _objects; ///< Fixed array containing queued elements + unsigned _front; ///< Circular index of element at front of queue (READER ONLY) + AtomicInt _back; ///< Circular index 1 past element at back of queue (WRITERS ONLY) + AtomicInt _write_space; ///< Remaining free space for new elements (all threads) + const unsigned _size; ///< Size of @ref _objects (you can store _size-1 objects) + + T* const _objects; ///< Fixed array containing queued elements + AtomicInt* const _valid; ///< Parallel array to _objects, whether loc is written or not }; @@ -88,12 +94,18 @@ template<typename T> SRMWQueue<T>::SRMWQueue(size_t size) : _front(0) , _back(0) - , _space(size) + , _write_space(size) , _size(size+1) , _objects((T*)calloc(_size, sizeof(T))) + , _valid((AtomicInt*)calloc(_size, sizeof(AtomicInt))) { + assert(log2(size) - (int)log2(size) == 0); assert(size > 1); - assert(_size-1 == _space.get()); + assert(_size-1 == (unsigned)_write_space.get()); + + for (unsigned i=0; i < _size; ++i) { + assert(_valid[i].get() == 0); + } } @@ -112,7 +124,7 @@ template <typename T> inline bool SRMWQueue<T>::full() const { - return ( _space.get() <= 0 ); + return (_write_space.get() <= 0); } @@ -127,9 +139,8 @@ template <typename T> inline bool SRMWQueue<T>::push(const T& elem) { - const int old_space = _space.exchange_and_add(-1); - - const bool already_full = ( old_space <= 0 ); + const int old_write_space = _write_space.exchange_and_add(-1); + const bool already_full = ( old_write_space <= 0 ); /* Technically right here pop could be called in the reader thread and * make space available, but no harm in failing anyway - this queue @@ -137,17 +148,21 @@ SRMWQueue<T>::push(const T& elem) if (already_full) { - /* if multiple threads simultaneously get here, _space may be 0 - * or negative. The next call to pop() will set _space back to - * a sane value. Note that _space is not exposed, so this is okay + /* if multiple threads simultaneously get here, _write_space may be 0 + * or negative. The next call to pop() will set _write_space back to + * a sane value. Note that _write_space is not exposed, so this is okay * (... assuming this code is correct) */ return false; } else { - const int write_index = _back.exchange_and_add(1) % _size; + // Note: _size must be a power of 2 for this to not explode when _back overflows + const unsigned write_index = (unsigned)_back.exchange_and_add(1) % _size; + + assert(_valid[write_index] == 0); _objects[write_index] = elem; + ++(_valid[write_index]); return true; @@ -163,7 +178,7 @@ template <typename T> inline bool SRMWQueue<T>::empty() const { - return ( _space == capacity() ); + return (_valid[_front].get() == 0); } @@ -191,12 +206,15 @@ template <typename T> inline void SRMWQueue<T>::pop() { + assert(_valid[_front] == 1); + --(_valid[_front]); + _front = (_front + 1) % (_size); - if (_space.get() < 0) - _space = 1; + if (_write_space.get() < 0) + _write_space = 1; else - ++_space; + ++_write_space; } diff --git a/tests/queue_test.cpp b/tests/queue_test.cpp index dc9928d..0b8bbd3 100644 --- a/tests/queue_test.cpp +++ b/tests/queue_test.cpp @@ -1,12 +1,210 @@ #include <iostream> #include <string> +#include <vector> +#include <algorithm> +#include <stdio.h> +#include <stdlib.h> +#include <fcntl.h> +#include <termios.h> #include "raul/SRSWQueue.h" #include "raul/SRMWQueue.h" +#include "raul/Thread.h" +#include "raul/AtomicInt.h" using namespace std; using namespace Raul; +static const unsigned NUM_DATA = 10; +static const unsigned QUEUE_SIZE = 1024*1024; +static const unsigned NUM_WRITERS = 2; +static const unsigned PUSHES_PER_ITERATION = 2; +// Data to read/write using actions pumped through the queue +struct Record { + Record() : read_count(0), write_count(0) {} + + AtomicInt read_count; + AtomicInt write_count; +}; + +Record data[NUM_DATA]; + + +// Actions pumped through the queue to manipulate data +struct WriteAction { + WriteAction(unsigned idx) + : index(idx)/*, has_read(false)*/ {} + + inline void read() const { + //cout << "READ " << index << "\r\n"; + //assert(!has_read); + ++(data[index].read_count); + //has_read = true; + }; + + unsigned index; + //bool has_read; +}; + + +// The victim +SRMWQueue<WriteAction> queue(QUEUE_SIZE); + + +class WriteThread : public Thread { +protected: + void _run() { + + cout << "Writer starting.\r\n"; + + // Wait for everything to get ready + sleep(2); + + while (true) { + for (unsigned j=0; j < PUSHES_PER_ITERATION; ++j) { + unsigned i = rand() % NUM_DATA; + if (queue.push(WriteAction(i))) { + ++(data[i].write_count); + //cout << "WRITE " << i << "\r\n"; + } else { + cerr << "FAILED WRITE\r\n"; + } + } + + // FIXME: remove! + //if (rand() % 20) + // usleep(1); + + // This thread will never cancel without this here since + // all the stuff about is cancellation point free + // (good! RT safe) + pthread_testcancel(); + } + + cout << "Writer exiting." << endl; + } +}; + + +// Returns 0 if all read count/write count pairs are equal, +// otherwise how far off total count was +unsigned +data_is_sane() +{ + unsigned ret = 0; + for (unsigned i=0; i < NUM_DATA; ++i) { + unsigned diff = abs(data[i].read_count.get() - data[i].write_count.get()); + ret += diff; + } + + return ret; +} + + +void +dump_data() +{ + for (unsigned i=0; i < NUM_DATA; ++i) { + cout << i << ":\t" << data[i].read_count.get() + << "\t : \t" << data[i].write_count.get(); + if (data[i].read_count.get() == data[i].write_count.get()) + cout << "\t OK" << endl; + else + cout << "\t FAIL" << endl; + } +} + + + +int main() +{ + unsigned long total_processed = 0; + + vector<WriteThread*> writers(NUM_WRITERS, new WriteThread()); + + struct termios orig_term; + struct termios raw_term; + + cfmakeraw(&raw_term); + if (tcgetattr(0, &orig_term) != 0) return 1; //save terminal settings + if (tcsetattr(0, TCSANOW, &raw_term) != 0) return 1; //set to raw + fcntl(0, F_SETFL, O_NONBLOCK); //set to nonblocking IO on stdin + + + for (unsigned i=0; i < NUM_WRITERS; ++i) { + writers[i]->set_name(string("Writer ") + (char)('0' + i)); + writers[i]->start(); + } + + // Read + while (getchar() == -1) { + unsigned count = 0; + while (count < queue.capacity() && !queue.empty()) { + WriteAction action = queue.front(); + queue.pop(); + action.read(); + ++count; + ++total_processed; + } + + /*if (count > 0) + cout << "Processed " << count << " requests\t\t" + << "(total " << total_processed << ")\r\n";*/ + + //if (total_processed > 0 && total_processed % 128l == 0) + // cout << "Total processed: " << total_processed << "\r\n"; + } + + if (tcsetattr(0, TCSANOW, &orig_term) != 0) return 1; //restore + + cout << "Finishing." << endl; + + // Stop the writers + for (unsigned i=0; i < NUM_WRITERS; ++i) + writers[i]->stop(); + + cout << "\n\n****************** DONE *********************\n\n"; + + unsigned leftovers = 0; + + // Drain anything left in the queue + while (!queue.empty()) { + WriteAction action = queue.front(); + queue.pop(); + action.read(); + leftovers++; + ++total_processed; + } + + if (leftovers > 0) + cout << "Processed " << leftovers << " leftovers." << endl; + + + cout << "\n\n*********************************************\n\n"; + + cout << "Total processed: " << total_processed << endl; + if (total_processed > INT_MAX) + cout << "(Counter had to wrap)" << endl; + else + cout << "(Counter did NOT have to wrap)" << endl; + + + unsigned diff = data_is_sane(); + + if (diff == 0) { + cout << "PASS" << endl; + } else { + cout << "FAILED BY " << diff << endl; + // dump_data(); + } + + dump_data(); + + return 0; +} + + +#if 0 int main() { //SRSWQueue<int> q(10); @@ -49,3 +247,5 @@ int main() return 0; } +#endif + |