diff options
author | David Robillard <d@drobilla.net> | 2007-01-23 02:02:39 +0000 |
---|---|---|
committer | David Robillard <d@drobilla.net> | 2007-01-23 02:02:39 +0000 |
commit | 32874d5dcb4867d966f25cd9b2953c9644d97027 (patch) | |
tree | c38428ae638d7b2b67884ee894947849d88de144 /raul | |
parent | aa454620bff8b6c5dcd4e0e379726307ad1918da (diff) | |
download | raul-32874d5dcb4867d966f25cd9b2953c9644d97027.tar.gz raul-32874d5dcb4867d966f25cd9b2953c9644d97027.tar.bz2 raul-32874d5dcb4867d966f25cd9b2953c9644d97027.zip |
More work on multi-writer queue (SRMWQueue), proper unit test.
git-svn-id: http://svn.drobilla.net/lad/raul@267 a436a847-0d15-0410-975c-d299462d15a1
Diffstat (limited to 'raul')
-rw-r--r-- | raul/AtomicInt.h | 4 | ||||
-rw-r--r-- | raul/SRMWQueue.h | 60 |
2 files changed, 42 insertions, 22 deletions
diff --git a/raul/AtomicInt.h b/raul/AtomicInt.h index 2678f31..cc50fb9 100644 --- a/raul/AtomicInt.h +++ b/raul/AtomicInt.h @@ -55,7 +55,9 @@ public: inline AtomicInt& operator--() // prefix { g_atomic_int_add(&_val, -1); return *this; } - /** Set value to newval iff current value is oldval */ + /** Set value to newval iff current value is oldval. + * @return whether set succeeded. + */ inline bool compare_and_exchange(int oldval, int newval) { return g_atomic_int_compare_and_exchange(&_val, oldval, newval); } diff --git a/raul/SRMWQueue.h b/raul/SRMWQueue.h index 5b7273a..3f4e1cd 100644 --- a/raul/SRMWQueue.h +++ b/raul/SRMWQueue.h @@ -19,6 +19,7 @@ #include <cassert> #include <cstdlib> +#include <cmath> #include <boost/utility.hpp> #include "raul/AtomicInt.h" @@ -41,8 +42,11 @@ namespace Raul { * Obey the threading restrictions documented here, or horrible nasty (possibly * undetected) errors will occur. * - * This is slightly more expensive than the SRMWSRMWQueue. Use that if you don't - * require multiple writers. + * If you only need a single writer, use SRSWQueue. This is slightly more + * computationally expensive, and allocates an additional size words of memory (ie + * if you're using this for ints or pointers etc, SRMWQueue will be twice the size + * of SRSWQueue for the same queue size. Additionally, the size of this queue must + * be a power of 2 (SRSWQueue does not have this limitation). * * \ingroup raul */ @@ -76,11 +80,13 @@ private: // Note that _front doesn't need to be an AtomicInt since it's only accessed // by the (single) reader thread - int _front; ///< Circular index of element at front of queue (READER ONLY) - AtomicInt _back; ///< Circular index 1 past element at back of queue (WRITERS ONLY) - AtomicInt _space; ///< Remaining free space for new elements (all threads) - const int _size; ///< Size of @ref _objects (you can store _size-1 objects) - T* const _objects; ///< Fixed array containing queued elements + unsigned _front; ///< Circular index of element at front of queue (READER ONLY) + AtomicInt _back; ///< Circular index 1 past element at back of queue (WRITERS ONLY) + AtomicInt _write_space; ///< Remaining free space for new elements (all threads) + const unsigned _size; ///< Size of @ref _objects (you can store _size-1 objects) + + T* const _objects; ///< Fixed array containing queued elements + AtomicInt* const _valid; ///< Parallel array to _objects, whether loc is written or not }; @@ -88,12 +94,18 @@ template<typename T> SRMWQueue<T>::SRMWQueue(size_t size) : _front(0) , _back(0) - , _space(size) + , _write_space(size) , _size(size+1) , _objects((T*)calloc(_size, sizeof(T))) + , _valid((AtomicInt*)calloc(_size, sizeof(AtomicInt))) { + assert(log2(size) - (int)log2(size) == 0); assert(size > 1); - assert(_size-1 == _space.get()); + assert(_size-1 == (unsigned)_write_space.get()); + + for (unsigned i=0; i < _size; ++i) { + assert(_valid[i].get() == 0); + } } @@ -112,7 +124,7 @@ template <typename T> inline bool SRMWQueue<T>::full() const { - return ( _space.get() <= 0 ); + return (_write_space.get() <= 0); } @@ -127,9 +139,8 @@ template <typename T> inline bool SRMWQueue<T>::push(const T& elem) { - const int old_space = _space.exchange_and_add(-1); - - const bool already_full = ( old_space <= 0 ); + const int old_write_space = _write_space.exchange_and_add(-1); + const bool already_full = ( old_write_space <= 0 ); /* Technically right here pop could be called in the reader thread and * make space available, but no harm in failing anyway - this queue @@ -137,17 +148,21 @@ SRMWQueue<T>::push(const T& elem) if (already_full) { - /* if multiple threads simultaneously get here, _space may be 0 - * or negative. The next call to pop() will set _space back to - * a sane value. Note that _space is not exposed, so this is okay + /* if multiple threads simultaneously get here, _write_space may be 0 + * or negative. The next call to pop() will set _write_space back to + * a sane value. Note that _write_space is not exposed, so this is okay * (... assuming this code is correct) */ return false; } else { - const int write_index = _back.exchange_and_add(1) % _size; + // Note: _size must be a power of 2 for this to not explode when _back overflows + const unsigned write_index = (unsigned)_back.exchange_and_add(1) % _size; + + assert(_valid[write_index] == 0); _objects[write_index] = elem; + ++(_valid[write_index]); return true; @@ -163,7 +178,7 @@ template <typename T> inline bool SRMWQueue<T>::empty() const { - return ( _space == capacity() ); + return (_valid[_front].get() == 0); } @@ -191,12 +206,15 @@ template <typename T> inline void SRMWQueue<T>::pop() { + assert(_valid[_front] == 1); + --(_valid[_front]); + _front = (_front + 1) % (_size); - if (_space.get() < 0) - _space = 1; + if (_write_space.get() < 0) + _write_space = 1; else - ++_space; + ++_write_space; } |