From 32874d5dcb4867d966f25cd9b2953c9644d97027 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Tue, 23 Jan 2007 02:02:39 +0000 Subject: More work on multi-writer queue (SRMWQueue), proper unit test. git-svn-id: http://svn.drobilla.net/lad/raul@267 a436a847-0d15-0410-975c-d299462d15a1 --- tests/queue_test.cpp | 200 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 200 insertions(+) (limited to 'tests') diff --git a/tests/queue_test.cpp b/tests/queue_test.cpp index dc9928d..0b8bbd3 100644 --- a/tests/queue_test.cpp +++ b/tests/queue_test.cpp @@ -1,12 +1,210 @@ #include #include +#include +#include +#include +#include +#include +#include #include "raul/SRSWQueue.h" #include "raul/SRMWQueue.h" +#include "raul/Thread.h" +#include "raul/AtomicInt.h" using namespace std; using namespace Raul; +static const unsigned NUM_DATA = 10; +static const unsigned QUEUE_SIZE = 1024*1024; +static const unsigned NUM_WRITERS = 2; +static const unsigned PUSHES_PER_ITERATION = 2; +// Data to read/write using actions pumped through the queue +struct Record { + Record() : read_count(0), write_count(0) {} + + AtomicInt read_count; + AtomicInt write_count; +}; + +Record data[NUM_DATA]; + + +// Actions pumped through the queue to manipulate data +struct WriteAction { + WriteAction(unsigned idx) + : index(idx)/*, has_read(false)*/ {} + + inline void read() const { + //cout << "READ " << index << "\r\n"; + //assert(!has_read); + ++(data[index].read_count); + //has_read = true; + }; + + unsigned index; + //bool has_read; +}; + + +// The victim +SRMWQueue queue(QUEUE_SIZE); + + +class WriteThread : public Thread { +protected: + void _run() { + + cout << "Writer starting.\r\n"; + + // Wait for everything to get ready + sleep(2); + + while (true) { + for (unsigned j=0; j < PUSHES_PER_ITERATION; ++j) { + unsigned i = rand() % NUM_DATA; + if (queue.push(WriteAction(i))) { + ++(data[i].write_count); + //cout << "WRITE " << i << "\r\n"; + } else { + cerr << "FAILED WRITE\r\n"; + } + } + + // FIXME: remove! + //if (rand() % 20) + // usleep(1); + + // This thread will never cancel without this here since + // all the stuff about is cancellation point free + // (good! RT safe) + pthread_testcancel(); + } + + cout << "Writer exiting." << endl; + } +}; + + +// Returns 0 if all read count/write count pairs are equal, +// otherwise how far off total count was +unsigned +data_is_sane() +{ + unsigned ret = 0; + for (unsigned i=0; i < NUM_DATA; ++i) { + unsigned diff = abs(data[i].read_count.get() - data[i].write_count.get()); + ret += diff; + } + + return ret; +} + + +void +dump_data() +{ + for (unsigned i=0; i < NUM_DATA; ++i) { + cout << i << ":\t" << data[i].read_count.get() + << "\t : \t" << data[i].write_count.get(); + if (data[i].read_count.get() == data[i].write_count.get()) + cout << "\t OK" << endl; + else + cout << "\t FAIL" << endl; + } +} + + + +int main() +{ + unsigned long total_processed = 0; + + vector writers(NUM_WRITERS, new WriteThread()); + + struct termios orig_term; + struct termios raw_term; + + cfmakeraw(&raw_term); + if (tcgetattr(0, &orig_term) != 0) return 1; //save terminal settings + if (tcsetattr(0, TCSANOW, &raw_term) != 0) return 1; //set to raw + fcntl(0, F_SETFL, O_NONBLOCK); //set to nonblocking IO on stdin + + + for (unsigned i=0; i < NUM_WRITERS; ++i) { + writers[i]->set_name(string("Writer ") + (char)('0' + i)); + writers[i]->start(); + } + + // Read + while (getchar() == -1) { + unsigned count = 0; + while (count < queue.capacity() && !queue.empty()) { + WriteAction action = queue.front(); + queue.pop(); + action.read(); + ++count; + ++total_processed; + } + + /*if (count > 0) + cout << "Processed " << count << " requests\t\t" + << "(total " << total_processed << ")\r\n";*/ + + //if (total_processed > 0 && total_processed % 128l == 0) + // cout << "Total processed: " << total_processed << "\r\n"; + } + + if (tcsetattr(0, TCSANOW, &orig_term) != 0) return 1; //restore + + cout << "Finishing." << endl; + + // Stop the writers + for (unsigned i=0; i < NUM_WRITERS; ++i) + writers[i]->stop(); + + cout << "\n\n****************** DONE *********************\n\n"; + + unsigned leftovers = 0; + + // Drain anything left in the queue + while (!queue.empty()) { + WriteAction action = queue.front(); + queue.pop(); + action.read(); + leftovers++; + ++total_processed; + } + + if (leftovers > 0) + cout << "Processed " << leftovers << " leftovers." << endl; + + + cout << "\n\n*********************************************\n\n"; + + cout << "Total processed: " << total_processed << endl; + if (total_processed > INT_MAX) + cout << "(Counter had to wrap)" << endl; + else + cout << "(Counter did NOT have to wrap)" << endl; + + + unsigned diff = data_is_sane(); + + if (diff == 0) { + cout << "PASS" << endl; + } else { + cout << "FAILED BY " << diff << endl; + // dump_data(); + } + + dump_data(); + + return 0; +} + + +#if 0 int main() { //SRSWQueue q(10); @@ -49,3 +247,5 @@ int main() return 0; } +#endif + -- cgit v1.2.1