summaryrefslogtreecommitdiffstats
path: root/test/queue_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'test/queue_test.cpp')
-rw-r--r--test/queue_test.cpp254
1 files changed, 254 insertions, 0 deletions
diff --git a/test/queue_test.cpp b/test/queue_test.cpp
new file mode 100644
index 0000000..49ba6f2
--- /dev/null
+++ b/test/queue_test.cpp
@@ -0,0 +1,254 @@
+#include <iostream>
+#include <string>
+#include <vector>
+#include <algorithm>
+#include <stdio.h>
+#include <stdlib.h>
+#include "raul/SRSWQueue.hpp"
+#include "raul/SRMWQueue.hpp"
+#include "raul/Thread.hpp"
+#include "raul/AtomicInt.hpp"
+
+using namespace std;
+using namespace Raul;
+
+static const unsigned NUM_DATA = 10;
+static const unsigned QUEUE_SIZE = 128;
+static const unsigned NUM_WRITERS = 2;
+static const unsigned PUSHES_PER_ITERATION = 3;
+
+// Data to read/write using actions pumped through the queue
+struct Record {
+ Record() : read_count(0), write_count(0) {}
+
+ AtomicInt read_count;
+ AtomicInt write_count;
+};
+
+Record data[NUM_DATA];
+
+
+// Actions pumped through the queue to manipulate data
+struct WriteAction {
+ WriteAction(unsigned idx) : index(idx) {}
+
+ inline void read() const {
+ ++(data[index].read_count);
+ };
+
+ unsigned index;
+};
+
+
+// The victim
+SRMWQueue<WriteAction> queue(QUEUE_SIZE);
+
+
+class WriteThread : public Thread {
+protected:
+ void _run() {
+ // Wait for everything to get ready
+ sleep(1);
+
+ while (true) {
+ for (unsigned j=0; j < PUSHES_PER_ITERATION; ++j) {
+ unsigned i = rand() % NUM_DATA;
+ if (queue.push(WriteAction(i))) {
+ ++(data[i].write_count);
+ //cout << "WRITE " << i << "\r\n";
+ } else {
+ //cerr << "FAILED WRITE\r\n";
+ }
+ }
+
+ // This thread will never cancel without this here since
+ // this loop is hard RT safe and thus cancellation point free
+ pthread_testcancel();
+ }
+
+ cout << "Writer exiting." << endl;
+ }
+};
+
+
+// Returns 0 if all read count/write count pairs are equal,
+// otherwise how far off total count was
+unsigned
+data_is_sane()
+{
+ unsigned ret = 0;
+ for (unsigned i = 0; i < NUM_DATA; ++i) {
+ unsigned diff = abs(data[i].read_count.get() - data[i].write_count.get());
+ ret += diff;
+ }
+
+ return ret;
+}
+
+
+void
+dump_data()
+{
+ for (unsigned i = 0; i < NUM_DATA; ++i) {
+ cout << i << ":\t" << data[i].read_count.get()
+ << "\t : \t" << data[i].write_count.get();
+ if (data[i].read_count.get() == data[i].write_count.get())
+ cout << "\t OK" << endl;
+ else
+ cout << "\t FAIL" << endl;
+ }
+}
+
+
+int
+main()
+{
+ unsigned long total_processed = 0;
+
+ cout << "Testing size" << endl;
+ for (unsigned i=0; i < queue.capacity(); ++i) {
+ queue.push(i);
+ if (i == queue.capacity()-1) {
+ if (!queue.full()) {
+ cerr << "ERROR: Should be full at " << i
+ << " (size " << queue.capacity() << ")" << endl;
+ return -1;
+ }
+ } else {
+ if (queue.full()) {
+ cerr << "ERROR: Prematurely full at " << i
+ << " (size " << queue.capacity() << ")" << endl;
+ return -1;
+ }
+ }
+ }
+
+ for (unsigned i = 0; i < queue.capacity(); ++i)
+ queue.pop();
+
+ if (!queue.empty()) {
+ cerr << "ERROR: Should be empty" << endl;
+ return -1;
+ }
+
+ cout << "Testing concurrent reading/writing" << endl;
+ vector<WriteThread*> writers(NUM_WRITERS, new WriteThread());
+
+ for (unsigned i=0; i < NUM_WRITERS; ++i) {
+ writers[i]->set_name(string("Writer ") + (char)('0' + i));
+ writers[i]->start();
+ }
+
+ sleep(1);
+
+ // Read
+ unsigned count = 0;
+ for (unsigned i = 0; i < 10000000; ++i) {
+ while (count < queue.capacity() && !queue.empty()) {
+ WriteAction action = queue.front();
+ queue.pop();
+ action.read();
+ ++count;
+ ++total_processed;
+ }
+
+ /*if (count > 0)
+ cout << "Processed " << count << " requests\t\t"
+ << "(total " << total_processed << ")\r\n";
+
+ if (total_processed > 0 && total_processed % 128l == 0)
+ cout << "Total processed: " << total_processed << "\r\n";*/
+ }
+
+ cout << "Processed " << total_processed << " requests" << endl;
+
+ // Stop the writers
+ for (unsigned i=0; i < NUM_WRITERS; ++i)
+ writers[i]->stop();
+
+ //cout << "\n\n****************** DONE *********************\n\n";
+
+ unsigned leftovers = 0;
+
+ // Drain anything left in the queue
+ while (!queue.empty()) {
+ WriteAction action = queue.front();
+ queue.pop();
+ action.read();
+ leftovers++;
+ ++total_processed;
+ }
+
+ if (leftovers > 0)
+ cout << "Processed " << leftovers << " leftovers." << endl;
+
+
+ //cout << "\n\n*********************************************\n\n";
+
+ cout << "Total processed: " << total_processed << endl;
+ if (total_processed > INT_MAX)
+ cout << "(Counter had to wrap)" << endl;
+ else
+ cout << "(Counter did NOT have to wrap)" << endl;
+
+
+ const unsigned diff = data_is_sane();
+
+ if (diff == 0) {
+ return EXIT_SUCCESS;
+ } else {
+ cout << "FAILED BY " << diff << endl;
+ return EXIT_FAILURE;
+ }
+
+ //dump_data();
+
+ return 0;
+}
+
+
+#if 0
+int main()
+{
+ //SRSWQueue<int> q(10);
+ SRMWQueue<int> q(10);
+
+ cout << "New queue. Should be empty: " << q.empty() << endl;
+ cout << "Capacity: " << q.capacity() << endl;
+ //cout << "Fill: " << q.fill() << endl;
+
+ for (uint i=0; i < 5; ++i) {
+ q.push(i);
+ assert(!q.full());
+ q.pop();
+ }
+ cout << "Pushed and popped 5 elements. Queue should be empty: " << q.empty() << endl;
+ //cout << "Fill: " << q.fill() << endl;
+
+ for (uint i=10; i < 20; ++i) {
+ assert(q.push(i));
+ }
+ cout << "Pushed 10 elements. Queue should be full: " << q.full() << endl;
+ //cout << "Fill: " << q.fill() << endl;
+
+ cout << "The digits 10->19 should print: " << endl;
+ while (!q.empty()) {
+ int foo = q.front();
+ q.pop();
+ cout << "Popped: " << foo << endl;
+ }
+ cout << "Queue should be empty: " << q.empty() << endl;
+ //cout << "Fill: " << q.fill() << endl;
+
+ cout << "Attempting to add eleven elements to queue of size 10. Only first 10 should succeed:" << endl;
+ for (uint i=20; i <= 39; ++i) {
+ cout << i;
+ //cout << " - Fill: " << q.fill();
+ cout << " - full: " << q.full();
+ cout << ", succeeded: " << q.push(i) << endl;
+ }
+
+ return 0;
+}
+#endif
+