#include #include #include #include #include #include #include #include #include "raul/SRSWQueue.hpp" #include "raul/SRMWQueue.hpp" #include "raul/Thread.hpp" #include "raul/AtomicInt.hpp" using namespace std; using namespace Raul; static const unsigned NUM_DATA = 10; static const unsigned QUEUE_SIZE = 1024*1024; static const unsigned NUM_WRITERS = 2; static const unsigned PUSHES_PER_ITERATION = 2; // Data to read/write using actions pumped through the queue struct Record { Record() : read_count(0), write_count(0) {} AtomicInt read_count; AtomicInt write_count; }; Record data[NUM_DATA]; // Actions pumped through the queue to manipulate data struct WriteAction { WriteAction(unsigned idx) : index(idx)/*, has_read(false)*/ {} inline void read() const { //cout << "READ " << index << "\r\n"; //assert(!has_read); ++(data[index].read_count); //has_read = true; }; unsigned index; //bool has_read; }; // The victim SRMWQueue queue(QUEUE_SIZE); class WriteThread : public Thread { protected: void _run() { cout << "Writer starting.\r\n"; // Wait for everything to get ready sleep(2); while (true) { for (unsigned j=0; j < PUSHES_PER_ITERATION; ++j) { unsigned i = rand() % NUM_DATA; if (queue.push(WriteAction(i))) { ++(data[i].write_count); //cout << "WRITE " << i << "\r\n"; } else { cerr << "FAILED WRITE\r\n"; } } // FIXME: remove! //if (rand() % 20) // usleep(1); // This thread will never cancel without this here since // all the stuff about is cancellation point free // (good! RT safe) pthread_testcancel(); } cout << "Writer exiting." << endl; } }; // Returns 0 if all read count/write count pairs are equal, // otherwise how far off total count was unsigned data_is_sane() { unsigned ret = 0; for (unsigned i=0; i < NUM_DATA; ++i) { unsigned diff = abs(data[i].read_count.get() - data[i].write_count.get()); ret += diff; } return ret; } void dump_data() { for (unsigned i=0; i < NUM_DATA; ++i) { cout << i << ":\t" << data[i].read_count.get() << "\t : \t" << data[i].write_count.get(); if (data[i].read_count.get() == data[i].write_count.get()) cout << "\t OK" << endl; else cout << "\t FAIL" << endl; } } int main() { unsigned long total_processed = 0; cout << "Testing size" << endl; for (unsigned i=0; i < queue.capacity(); ++i) { queue.push(i); if (i == queue.capacity()-1) { if (!queue.full()) { cerr << "ERROR: Should be full at " << i << " (size " << queue.capacity() << ")" << endl; return -1; } } else { if (queue.full()) { cerr << "ERROR: Prematurely full at " << i << " (size " << queue.capacity() << ")" << endl; return -1; } } } for (unsigned i=0; i < queue.capacity(); ++i) queue.pop(); if (!queue.empty()) { cerr << "ERROR: Should be empty" << endl; return -1; } cout << "Testing concurrent reading/writing" << endl; vector writers(NUM_WRITERS, new WriteThread()); struct termios orig_term; struct termios raw_term; cfmakeraw(&raw_term); if (tcgetattr(0, &orig_term) != 0) return 1; //save terminal settings if (tcsetattr(0, TCSANOW, &raw_term) != 0) return 1; //set to raw fcntl(0, F_SETFL, O_NONBLOCK); //set to nonblocking IO on stdin for (unsigned i=0; i < NUM_WRITERS; ++i) { writers[i]->set_name(string("Writer ") + (char)('0' + i)); writers[i]->start(); } // Read while (getchar() == -1) { unsigned count = 0; while (count < queue.capacity() && !queue.empty()) { WriteAction action = queue.front(); queue.pop(); action.read(); ++count; ++total_processed; } /*if (count > 0) cout << "Processed " << count << " requests\t\t" << "(total " << total_processed << ")\r\n";*/ //if (total_processed > 0 && total_processed % 128l == 0) // cout << "Total processed: " << total_processed << "\r\n"; } if (tcsetattr(0, TCSANOW, &orig_term) != 0) return 1; //restore cout << "Finishing." << endl; // Stop the writers for (unsigned i=0; i < NUM_WRITERS; ++i) writers[i]->stop(); cout << "\n\n****************** DONE *********************\n\n"; unsigned leftovers = 0; // Drain anything left in the queue while (!queue.empty()) { WriteAction action = queue.front(); queue.pop(); action.read(); leftovers++; ++total_processed; } if (leftovers > 0) cout << "Processed " << leftovers << " leftovers." << endl; cout << "\n\n*********************************************\n\n"; cout << "Total processed: " << total_processed << endl; if (total_processed > INT_MAX) cout << "(Counter had to wrap)" << endl; else cout << "(Counter did NOT have to wrap)" << endl; unsigned diff = data_is_sane(); if (diff == 0) { cout << "PASS" << endl; } else { cout << "FAILED BY " << diff << endl; // dump_data(); } dump_data(); return 0; } #if 0 int main() { //SRSWQueue q(10); SRMWQueue q(10); cout << "New queue. Should be empty: " << q.empty() << endl; cout << "Capacity: " << q.capacity() << endl; //cout << "Fill: " << q.fill() << endl; for (uint i=0; i < 5; ++i) { q.push(i); assert(!q.full()); q.pop(); } cout << "Pushed and popped 5 elements. Queue should be empty: " << q.empty() << endl; //cout << "Fill: " << q.fill() << endl; for (uint i=10; i < 20; ++i) { assert(q.push(i)); } cout << "Pushed 10 elements. Queue should be full: " << q.full() << endl; //cout << "Fill: " << q.fill() << endl; cout << "The digits 10->19 should print: " << endl; while (!q.empty()) { int foo = q.front(); q.pop(); cout << "Popped: " << foo << endl; } cout << "Queue should be empty: " << q.empty() << endl; //cout << "Fill: " << q.fill() << endl; cout << "Attempting to add eleven elements to queue of size 10. Only first 10 should succeed:" << endl; for (uint i=20; i <= 39; ++i) { cout << i; //cout << " - Fill: " << q.fill(); cout << " - full: " << q.full(); cout << ", succeeded: " << q.push(i) << endl; } return 0; } #endif