diff options
-rw-r--r-- | raul/SRMWQueue.hpp | 202 | ||||
-rw-r--r-- | raul/SRSWQueue.hpp | 147 | ||||
-rw-r--r-- | test/build_test.cpp | 2 | ||||
-rw-r--r-- | test/queue_test.cpp | 242 | ||||
-rw-r--r-- | wscript | 1 |
5 files changed, 0 insertions, 594 deletions
diff --git a/raul/SRMWQueue.hpp b/raul/SRMWQueue.hpp deleted file mode 100644 index 769364c..0000000 --- a/raul/SRMWQueue.hpp +++ /dev/null @@ -1,202 +0,0 @@ -/* - This file is part of Raul. - Copyright 2007-2014 David Robillard <http://drobilla.net> - - Raul is free software: you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation, either version 3 of the License, or any later version. - - Raul is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with Raul. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef RAUL_SRMW_QUEUE_HPP -#define RAUL_SRMW_QUEUE_HPP - -#include <cassert> -#include <cstdlib> -#include <cmath> - -#include "raul/Noncopyable.hpp" - -namespace Raul { - -/** Realtime-safe single-reader multi-writer queue (aka lock-free ringbuffer) - * - * Implemented as a dequeue in a fixed array. Both push and pop are realtime - * safe, but only push is threadsafe. In other words, multiple threads can push - * data into this queue for a single thread to consume. - * - * The interface is intentionally as similar to std::queue as possible, but - * note the additional thread restrictions imposed (e.g. empty() is only - * legal to call in the read thread). - * - * Obey the threading restrictions documented here, or horrible nasty (possibly - * undetected) errors will occur. - * - * If you only need a single writer, use SRSWQueue. This is slightly more - * computationally expensive, and allocates an additional size words of memory (ie - * if you're using this for ints or pointers etc, SRMWQueue will be twice the size - * of SRSWQueue for the same queue size. Additionally, the size of this queue must - * be a power of 2 (SRSWQueue does not have this limitation). - * - * \ingroup raul - */ -template <typename T> -class SRMWQueue : Noncopyable -{ -public: - explicit SRMWQueue(size_t size); - ~SRMWQueue(); - - // Any thread: - - inline size_t capacity() const { return _size-1; } - - // Write thread(s): - - inline bool full() const; - inline bool push(const T& obj); - - // Read thread: - - inline bool empty() const; - inline T& front() const; - inline void pop(); - -private: - - // Note that _front needn't be atomic since it's only used by reader - - unsigned _front; ///< Circular index of element at front of queue (READER ONLY) - std::atomic<int> _back; ///< Circular index 1 past element at back of queue (WRITERS ONLY) - std::atomic<int> _write_space; ///< Remaining free space for new elements (all threads) - const unsigned _size; ///< Size of `_objects` (you can store _size-1 objects) - - T* const _objects; ///< Fixed array containing queued elements - std::atomic<bool>* const _valid; ///< Parallel array to _objects, whether loc is written or not -}; - -template<typename T> -SRMWQueue<T>::SRMWQueue(size_t size) - : _front(0) - , _back(0) - , _write_space(int(size)) - , _size(unsigned(size) + 1) - , _objects((T*)calloc(_size, sizeof(T))) - , _valid(new std::atomic<bool>[_size]) -{ - assert(log2(size) - (int)log2(size) == 0); - assert(size > 1); - assert(_size-1 == (unsigned)_write_space.load()); - - for (unsigned i = 0; i < _size; ++i) { - _valid[i] = false; - } -} - -template <typename T> -SRMWQueue<T>::~SRMWQueue() -{ - delete[] _valid; - free(_objects); -} - -/** Return whether the queue is full. - * - * Write thread(s) only. - */ -template <typename T> -inline bool -SRMWQueue<T>::full() const -{ - return (_write_space.load() <= 0); -} - -/** Push an item onto the back of the SRMWQueue - realtime-safe, not thread-safe. - * - * Write thread(s) only. - * - * @returns true if `elem` was successfully pushed onto the queue, - * false otherwise (queue is full). - */ -template <typename T> -inline bool -SRMWQueue<T>::push(const T& elem) -{ - const int old_write_space = _write_space--; - const bool already_full = (old_write_space <= 0); - - /* Technically right here pop could be called in the reader thread and - * make space available, but no harm in failing anyway - this queue - * really isn't designed to be filled... */ - - if (already_full) { - /* if multiple threads simultaneously get here, _write_space may be 0 - * or negative. The next call to pop() will set _write_space back to - * a sane value. Note that _write_space is not exposed, so this is okay - * (... assuming this code is correct) */ - - return false; - - } else { - // Note: _size must be a power of 2 for this to not explode when _back overflows - const unsigned write_index = unsigned(_back++) % _size; - - assert(!_valid[write_index]); - _objects[write_index] = elem; - _valid[write_index] = true; - - return true; - } -} - -/** Return whether the queue is empty. - * - * Read thread only. - */ -template <typename T> -inline bool -SRMWQueue<T>::empty() const -{ - return (!_valid[_front].load()); -} - -/** Return the element at the front of the queue without removing it. - * - * It is a fatal error to call front() when the queue is empty. - * Read thread only. - */ -template <typename T> -inline T& -SRMWQueue<T>::front() const -{ - return _objects[_front]; -} - -/** Pop an item off the front of the queue - realtime-safe, NOT thread-safe. - * - * It is a fatal error to call pop() if the queue is empty. - * Read thread only. - */ -template <typename T> -inline void -SRMWQueue<T>::pop() -{ - _valid[_front] = false; - - _front = (_front + 1) % (_size); - - if (_write_space.load() < 0) - _write_space = 1; - else - ++_write_space; -} - -} // namespace Raul - -#endif // RAUL_SRMW_QUEUE_HPP diff --git a/raul/SRSWQueue.hpp b/raul/SRSWQueue.hpp deleted file mode 100644 index 5e2cf4e..0000000 --- a/raul/SRSWQueue.hpp +++ /dev/null @@ -1,147 +0,0 @@ -/* - This file is part of Raul. - Copyright 2007-2015 David Robillard <http://drobilla.net> - - Raul is free software: you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation, either version 3 of the License, or any later version. - - Raul is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with Raul. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef RAUL_SRSW_QUEUE_HPP -#define RAUL_SRSW_QUEUE_HPP - -#include <atomic> -#include <cassert> - -#include "raul/Noncopyable.hpp" - -namespace Raul { - -/** Realtime-safe single-reader single-writer queue (aka lock-free ringbuffer) - * - * This is appropriate for a cross-thread queue of fixed size object. If you - * need to do variable sized reads and writes, use Raul::RingBuffer instead. - * - * Implemented as a dequeue in a fixed array. This is read/write thread-safe, - * pushing and popping may occur simultaneously by seperate threads, but - * the push and pop operations themselves are not thread-safe (ie. there can - * be at most 1 read and at most 1 writer thread). - * - * \ingroup raul - */ -template <typename T> -class SRSWQueue : Noncopyable -{ -public: - /** @param size Size in number of elements */ - explicit SRSWQueue(size_t size); - ~SRSWQueue(); - - // Any thread: - - inline size_t capacity() const { return _size-1; } - - // Write thread(s): - - inline bool full() const; - inline bool push(const T& obj); - - // Read thread: - - inline bool empty() const; - inline T& front() const; - inline void pop(); - -private: - std::atomic<size_t> _front; ///< Index to front of queue - std::atomic<size_t> _back; ///< Index to back of queue (one past end) - const size_t _size; ///< Size of `_objects` (at most _size-1) - T* const _objects; ///< Fixed array containing queued elements -}; - -template<typename T> -SRSWQueue<T>::SRSWQueue(size_t size) - : _front(0) - , _back(0) - , _size(size + 1) - , _objects(new T[_size]) -{ - assert(size > 1); -} - -template <typename T> -SRSWQueue<T>::~SRSWQueue() -{ - delete[] _objects; -} - -/** Return whether or not the queue is empty. - */ -template <typename T> -inline bool -SRSWQueue<T>::empty() const -{ - return (_back.load() == _front.load()); -} - -/** Return whether or not the queue is full. - */ -template <typename T> -inline bool -SRSWQueue<T>::full() const -{ - return (((_front.load() - _back.load() + _size) % _size) == 1); -} - -/** Return the element at the front of the queue without removing it - */ -template <typename T> -inline T& -SRSWQueue<T>::front() const -{ - return _objects[_front.load()]; -} - -/** Push an item onto the back of the SRSWQueue - realtime-safe, not thread-safe. - * - * @returns true if `elem` was successfully pushed onto the queue, - * false otherwise (queue is full). - */ -template <typename T> -inline bool -SRSWQueue<T>::push(const T& elem) -{ - if (full()) { - return false; - } else { - unsigned back = _back.load(); - _objects[back] = elem; - _back = (back + 1) % _size; - return true; - } -} - -/** Pop an item off the front of the queue - realtime-safe, not thread-safe. - * - * It is a fatal error to call pop() when the queue is empty. - */ -template <typename T> -inline void -SRSWQueue<T>::pop() -{ - assert(!empty()); - assert(_size > 0); - - _front = (_front.load() + 1) % (_size); -} - -} // namespace Raul - -#endif // RAUL_SRSW_QUEUE_HPP diff --git a/test/build_test.cpp b/test/build_test.cpp index dabc207..ea5bf2b 100644 --- a/test/build_test.cpp +++ b/test/build_test.cpp @@ -23,8 +23,6 @@ #include "raul/Path.hpp" #include "raul/Process.hpp" #include "raul/RingBuffer.hpp" -#include "raul/SRMWQueue.hpp" -#include "raul/SRSWQueue.hpp" #include "raul/Semaphore.hpp" #include "raul/Socket.hpp" #include "raul/Symbol.hpp" diff --git a/test/queue_test.cpp b/test/queue_test.cpp deleted file mode 100644 index 8f9ab61..0000000 --- a/test/queue_test.cpp +++ /dev/null @@ -1,242 +0,0 @@ -/* - This file is part of Raul. - Copyright 2007-2015 David Robillard <http://drobilla.net> - - Raul is free software: you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation, either version 3 of the License, or any later version. - - Raul is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with Raul. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <limits.h> -#include <stdio.h> -#include <stdlib.h> - -#include <algorithm> -#include <atomic> -#include <iostream> -#include <string> -#include <thread> -#include <vector> - -#include "raul/SRMWQueue.hpp" -#include "raul/SRSWQueue.hpp" - -using namespace std; -using namespace Raul; - -namespace { - -const unsigned NUM_DATA = 10; -const unsigned QUEUE_SIZE = 128; -const unsigned NUM_WRITERS = 2; -const unsigned PUSHES_PER_ITERATION = 3; - -// Data to read/write using actions pumped through the queue -struct Record { - Record() : read_count(0), write_count(0) {} - - std::atomic<int> read_count; - std::atomic<int> write_count; -}; - -Record data[NUM_DATA]; - -// Actions pumped through the queue to manipulate data -struct WriteAction { - WriteAction(unsigned idx) : index(idx) {} - - inline void read() const { - ++(data[index].read_count); - } - - unsigned index; -}; - -// The victim -SRMWQueue<WriteAction> queue(QUEUE_SIZE); - -void -test_write(bool* exit_flag) -{ - while (!*exit_flag) { - for (unsigned j=0; j < PUSHES_PER_ITERATION; ++j) { - unsigned i = unsigned(rand()) % NUM_DATA; - if (queue.push(WriteAction(i))) { - ++(data[i].write_count); - //cout << "WRITE " << i << "\r\n"; - } else { - //cerr << "FAILED WRITE\r\n"; - } - } - } - - cout << "Writer exiting." << endl; -} - -// Returns 0 if all read count/write count pairs are equal, -// otherwise how far off total count was -unsigned -data_is_sane() -{ - unsigned ret = 0; - for (unsigned i = 0; i < NUM_DATA; ++i) { - ret += unsigned(abs(data[i].read_count.load() - - data[i].write_count.load())); - } - - return ret; -} - -} // namespace - -int -main() -{ - size_t total_processed = 0; - - cout << "Testing size" << endl; - for (unsigned i = 0; i < queue.capacity(); ++i) { - queue.push(i); - if (i == queue.capacity()-1) { - if (!queue.full()) { - cerr << "ERROR: Should be full at " << i - << " (size " << queue.capacity() << ")" << endl; - return -1; - } - } else { - if (queue.full()) { - cerr << "ERROR: Prematurely full at " << i - << " (size " << queue.capacity() << ")" << endl; - return -1; - } - } - } - - for (size_t i = 0; i < queue.capacity(); ++i) - queue.pop(); - - if (!queue.empty()) { - cerr << "ERROR: Should be empty" << endl; - return -1; - } - - cout << "Testing concurrent reading/writing" << endl; - bool exit_flags[NUM_WRITERS]; - vector<std::thread*> writers(NUM_WRITERS, NULL); - - for (unsigned i = 0; i < NUM_WRITERS; ++i) { - exit_flags[i] = false; - writers[i] = new std::thread(test_write, &exit_flags[i]); - } - - // Read - unsigned count = 0; - for (unsigned i = 0; i < 10000000; ++i) { - while (count < queue.capacity() && !queue.empty()) { - WriteAction action = queue.front(); - queue.pop(); - action.read(); - ++count; - ++total_processed; - } - - /*if (count > 0) - cout << "Processed " << count << " requests\t\t" - << "(total " << total_processed << ")\r\n"; - - if (total_processed > 0 && total_processed % 128l == 0) - cout << "Total processed: " << total_processed << "\r\n";*/ - } - - cout << "Processed " << total_processed << " requests" << endl; - - // Stop the writers - for (unsigned i = 0; i < NUM_WRITERS; ++i) { - exit_flags[i] = true; - writers[i]->join(); - delete writers[i]; - } - - //cout << "\n\n****************** DONE *********************\n\n"; - - unsigned leftovers = 0; - - // Drain anything left in the queue - while (!queue.empty()) { - WriteAction action = queue.front(); - queue.pop(); - action.read(); - leftovers++; - ++total_processed; - } - - if (leftovers > 0) - cout << "Processed " << leftovers << " leftovers." << endl; - - //cout << "\n\n*********************************************\n\n"; - - cout << "Total processed: " << total_processed << endl; - if (total_processed > INT_MAX) - cout << "(Counter had to wrap)" << endl; - else - cout << "(Counter did NOT have to wrap)" << endl; - - const unsigned diff = data_is_sane(); - if (diff != 0) { - cout << "FAILED BY " << diff << endl; - } - - return diff == 0 ? EXIT_SUCCESS : EXIT_FAILURE; -} - -#if 0 -int main() -{ - //SRSWQueue<int> q(10); - SRMWQueue<int> q(10); - - cout << "New queue. Should be empty: " << q.empty() << endl; - cout << "Capacity: " << q.capacity() << endl; - //cout << "Fill: " << q.fill() << endl; - - for (uint i=0; i < 5; ++i) { - q.push(i); - assert(!q.full()); - q.pop(); - } - cout << "Pushed and popped 5 elements. Queue should be empty: " << q.empty() << endl; - //cout << "Fill: " << q.fill() << endl; - - for (uint i=10; i < 20; ++i) { - assert(q.push(i)); - } - cout << "Pushed 10 elements. Queue should be full: " << q.full() << endl; - //cout << "Fill: " << q.fill() << endl; - - cout << "The digits 10->19 should print: " << endl; - while (!q.empty()) { - int foo = q.front(); - q.pop(); - cout << "Popped: " << foo << endl; - } - cout << "Queue should be empty: " << q.empty() << endl; - //cout << "Fill: " << q.fill() << endl; - - cout << "Attempting to add eleven elements to queue of size 10. Only first 10 should succeed:" << endl; - for (uint i=20; i <= 39; ++i) { - cout << i; - //cout << " - Fill: " << q.fill(); - cout << " - full: " << q.full(); - cout << ", succeeded: " << q.push(i) << endl; - } - - return 0; -} -#endif @@ -64,7 +64,6 @@ tests = ''' test/double_buffer_test test/maid_test test/path_test - test/queue_test test/ringbuffer_test test/sem_test test/socket_test |