From e72d4813bbe1b4f1ebb6569ea1464bbe963d8831 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Sat, 16 Mar 2019 22:31:41 +0100 Subject: Rewrite test framework --- extras/autowaf.py | 482 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 277 insertions(+), 205 deletions(-) diff --git a/extras/autowaf.py b/extras/autowaf.py index 1ad7198..3717ba4 100644 --- a/extras/autowaf.py +++ b/extras/autowaf.py @@ -10,6 +10,7 @@ from waflib.TaskGen import feature, before, after global g_is_child g_is_child = False +NONEMPTY = -10 if sys.platform == 'win32': lib_path_name = 'PATH' @@ -782,19 +783,6 @@ class ExecutionEnvironment: def __exit__(self, type, value, traceback): os.environ = self.original_environ -def cd_to_build_dir(ctx, appname): - top_level = (len(ctx.stack_path) > 1) - if top_level: - os.chdir(os.path.join('build', appname)) - else: - os.chdir('build') - -def cd_to_orig_dir(ctx, child): - if child: - os.chdir(os.path.join('..', '..')) - else: - os.chdir('..') - def show_diff(from_lines, to_lines, from_filename, to_filename): import difflib import sys @@ -829,211 +817,295 @@ def bench_time(): else: return time.time() +class TestOutput: + """Test output that is truthy if result is as expected""" + def __init__(self, expected, result=None): + self.stdout = self.stderr = None + self.expected = expected + self.result = result + + def __bool__(self): + return self.expected is None or self.result == self.expected + + __nonzero__ = __bool__ + +def is_string(s): + if sys.version_info[0] < 3: + return isinstance(s, basestring) + return isinstance(s, str) + +class TestScope: + """Scope for running tests that maintains pass/fail statistics""" + def __init__(self, tst, name, defaults): + self.tst = tst + self.name = name + self.defaults = defaults + self.n_failed = 0 + self.n_total = 0 + + def run(self, test, **kwargs): + if callable(test): + output = self._run_callable(test, **kwargs) + elif type(test) == list: + if 'name' not in kwargs: + import pipes + kwargs['name'] = ' '.join(map(pipes.quote, test)) + + output = self._run_command(test, **kwargs) + else: + raise Exception("Unknown test type") + + if not output: + self.tst.log_bad('FAILED', kwargs['name']) + + return self.tst.test_result(output) + + def _run_callable(self, test, **kwargs): + expected = kwargs['expected'] if 'expected' in kwargs else True + return TestOutput(expected, test()) + + def _run_command(self, test, **kwargs): + if 'stderr' in kwargs and kwargs['stderr'] == NONEMPTY: + # Run with a temp file for stderr and check that it is non-empty + import tempfile + with tempfile.TemporaryFile(mode='w') as stderr: + kwargs['stderr'] = stderr + output = self.run(test, **kwargs) + return (output if not output else + self.run( + lambda: stderr.tell() > 0, + name=kwargs['name'] + ' error message')) + + try: + # Run with stdout and stderr set to the appropriate streams + out_stream = self._stream('stdout', kwargs) + err_stream = self._stream('stderr', kwargs) + return self._exec(test, **kwargs) + finally: + out_stream = out_stream.close() if out_stream else None + err_stream = err_stream.close() if err_stream else None + + def _stream(self, stream_name, kwargs): + s = kwargs[stream_name] if stream_name in kwargs else None + if is_string(s): + kwargs[stream_name] = open(s, 'wb') + return kwargs[stream_name] + return None + + def _exec(self, + test, + expected=0, + name='', + stdin=None, + stdout=None, + stderr=None, + verbosity=1): + def stream(s): + return open(s, 'wb') if type(s) == str else s + + if verbosity > 1: + self.tst.log_good('RUN ', name) + + if Options.options.test_wrapper: + test = [Options.options.test_wrapper] + test + + output = TestOutput(expected) + with open(os.devnull, 'wb') as null: + out = null if verbosity < 3 and not stdout else stdout + err = null if verbosity < 2 and not stderr else stderr + proc = subprocess.Popen(test, stdin=stdin, stdout=out, stderr=err) + output.stdout, output.stderr = proc.communicate() + output.result = proc.returncode + + if output and verbosity > 0: + self.tst.log_good(' OK', name) + + return output + class TestContext(Build.BuildContext): "runs test suite" fun = cmd = 'test' -def pre_test(ctx, appname, dirs=['src']): - Logs.pprint('GREEN', '\n[==========] Running %s tests' % appname) - - if not hasattr(ctx, 'autowaf_tests_total'): - ctx.autowaf_tests_start_time = bench_time() - ctx.autowaf_tests_total = 0 - ctx.autowaf_tests_failed = 0 - ctx.autowaf_local_tests_total = 0 - ctx.autowaf_local_tests_failed = 0 - ctx.autowaf_tests = {} - - ctx.autowaf_tests[appname] = {'total': 0, 'failed': 0} - - cd_to_build_dir(ctx, appname) - if not ctx.env.NO_COVERAGE: - diropts = '' - for i in dirs: - diropts += ' -d ' + i - clear_log = open('lcov-clear.log', 'w') + def __init__(self, **kwargs): + super(TestContext, self).__init__(**kwargs) + self.start_time = bench_time() + self.recursed = False + + defaults = {'verbosity': Options.options.verbose} + self.stack = [TestScope(self, Context.g_module.APPNAME, defaults)] + + def defaults(self): + return self.stack[-1].defaults + + def finalize(self): + if self.stack[-1].n_failed > 0: + sys.exit(1) + + super(TestContext, self).finalize() + + def run(self, test, **kwargs): + self.stack[-1].run(test, **self.args(**kwargs)) + + def log_good(self, title, fmt, *args): + Logs.pprint('GREEN', '[%s] %s' % (title.center(10), fmt % args)) + + def log_bad(self, title, fmt, *args): + Logs.pprint('RED', '[%s] %s' % (title.center(10), fmt % args)) + + def pre_recurse(self, node): + wscript_module = Context.load_module(node.abspath()) + group_name = wscript_module.APPNAME + self.stack.append(TestScope(self, group_name, self.defaults())) + self.recursed = True + + bld_dir = node.get_bld().parent + if bld_dir != self.path.get_bld(): + Logs.info('') + + Logs.info("Waf: Entering directory `%s'\n", bld_dir) + os.chdir(str(bld_dir)) + + if str(node.parent) == Context.top_dir: + self.clear_coverage() + + self.log_good('=' * 10, 'Running %s tests', group_name) + super(TestContext, self).pre_recurse(node) + + def test_result(self, success): + self.stack[-1].n_total += 1 + self.stack[-1].n_failed += 1 if not success else 0 + return success + + def pop(self): + scope = self.stack.pop() + self.stack[-1].n_total += scope.n_total + self.stack[-1].n_failed += scope.n_failed + return scope + + def post_recurse(self, node): + super(TestContext, self).post_recurse(node) + + scope = self.pop() + duration = (bench_time() - self.start_time) * 1000.0 + if self.recursed and len(self.stack) == 1: + Logs.info('') + + self.log_good('=' * 10, '%d tests from %s ran (%d ms total)', + scope.n_total, scope.name, duration) + + if not self.env.NO_COVERAGE: + if str(node.parent) == Context.top_dir: + self.gen_coverage() + + if os.path.exists('coverage/index.html'): + self.log_good('COVERAGE', '', + os.path.abspath('coverage/index.html')) + + successes = scope.n_total - scope.n_failed + Logs.pprint('GREEN', '[ PASSED ] %d tests' % successes) + if scope.n_failed > 0: + Logs.pprint('RED', '[ FAILED ] %d tests' % scope.n_failed) + + os.chdir(str(self.path)) + + def execute(self): + self.restore() + if not self.all_envs: + self.load_envs() + + if not self.env.BUILD_TESTS: + self.fatal('Configuration does not include tests') + + with ExecutionEnvironment(self.env.AUTOWAF_RUN_ENV) as env: + if self.defaults()['verbosity'] > 0: + Logs.pprint('GREEN', str(env) + '\n') + self.recurse([self.run_dir]) + + def src_path(self, path): + return os.path.relpath(os.path.join(str(self.path), path)) + + def args(self, **kwargs): + all_kwargs = self.defaults().copy() + all_kwargs.update(kwargs) + return all_kwargs + + def test_group(self, name, **kwargs): + return TestGroup( + self, self.stack[-1].name, name, **self.args(**kwargs)) + + def set_test_defaults(self, **kwargs): + """Set default arguments to be passed to all tests""" + self.stack[-1].defaults.update(kwargs) + + def clear_coverage(self): + """Zero old coverage data""" try: - try: - # Clear coverage data - subprocess.call(('lcov %s -z' % diropts).split(), - stdout=clear_log, stderr=clear_log) - except Exception: - Logs.warn('Failed to run lcov, no coverage report generated') - finally: - clear_log.close() - -class TestFailed(Exception): - pass - -def post_test(ctx, appname, dirs=['src'], remove=['*boost*', 'c++*']): - if not ctx.env.NO_COVERAGE: - diropts = '' - for i in dirs: - diropts += ' -d ' + i - coverage_log = open('lcov-coverage.log', 'w') - coverage_lcov = open('coverage.lcov', 'w') - coverage_stripped_lcov = open('coverage-stripped.lcov', 'w') + with open('cov-clear.log', 'w') as log: + subprocess.call(['lcov', '-z', '-d', str(self.path)], + stdout=log, stderr=log) + + except Exception: + Logs.warn('Failed to run lcov to clear old coverage data') + + def gen_coverage(self): + """Generate coverage data and report""" try: - try: - base = '.' - if g_is_child: - base = '..' - - # Generate coverage data - lcov_cmd = 'lcov -c %s -b %s' % (diropts, base) - if ctx.env.LLVM_COV: - lcov_cmd += ' --gcov-tool %s' % ctx.env.LLVM_COV[0] - subprocess.call(lcov_cmd.split(), - stdout=coverage_lcov, stderr=coverage_log) - - # Strip unwanted stuff - subprocess.call( - ['lcov', '--remove', 'coverage.lcov'] + remove, - stdout=coverage_stripped_lcov, stderr=coverage_log) - - # Generate HTML coverage output - if not os.path.isdir('coverage'): - os.makedirs('coverage') - subprocess.call( - 'genhtml -o coverage coverage-stripped.lcov'.split(), - stdout=coverage_log, stderr=coverage_log) - - except Exception: - Logs.warn('Failed to run lcov, no coverage report generated') - finally: - coverage_stripped_lcov.close() - coverage_lcov.close() - coverage_log.close() - - duration = (bench_time() - ctx.autowaf_tests_start_time) * 1000.0 - total_tests = ctx.autowaf_tests[appname]['total'] - failed_tests = ctx.autowaf_tests[appname]['failed'] - passed_tests = total_tests - failed_tests - Logs.pprint('GREEN', '\n[==========] %d tests from %s ran (%d ms total)' % ( - total_tests, appname, duration)) - if not ctx.env.NO_COVERAGE: - Logs.pprint('GREEN', '[----------] Coverage: ' - % os.path.abspath('coverage/index.html')) - - Logs.pprint('GREEN', '[ PASSED ] %d tests' % passed_tests) - if failed_tests > 0: - Logs.pprint('RED', '[ FAILED ] %d tests' % failed_tests) - raise TestFailed('Tests from %s failed' % appname) - Logs.pprint('', '') - - top_level = (len(ctx.stack_path) > 1) - if top_level: - cd_to_orig_dir(ctx, top_level) - -def run_test(ctx, - appname, - test, - desired_status=0, - dirs=['src'], - name='', - header=False, - quiet=False): - """Run an individual test. - - `test` is either a shell command string, or a list of [name, return status] - for displaying tests implemented in the calling Python code. - """ + with open('cov.lcov', 'w') as out: + with open('cov.log', 'w') as err: + subprocess.call(['lcov', '-c', '--no-external', + '--rc', 'lcov_branch_coverage=1', + '-b', '.', + '-d', str(self.path)], + stdout=out, stderr=err) + + if not os.path.isdir('coverage'): + os.makedirs('coverage') + + with open('genhtml.log', 'w') as log: + subprocess.call(['genhtml', + '-o', 'coverage', + '--rc', 'genhtml_branch_coverage=1', + 'cov.lcov'], + stdout=log, stderr=log) - ctx.autowaf_tests_total += 1 - ctx.autowaf_local_tests_total += 1 - ctx.autowaf_tests[appname]['total'] += 1 + except Exception: + Logs.warn('Failed to run lcov to generate coverage report') - out = (None, None) - if type(test) == list: - name = test[0] - returncode = test[1] - elif callable(test): - returncode = test() - else: - s = test - if isinstance(test, type([])): - s = ' '.join(test) - if header and not quiet: - Logs.pprint('Green', '\n[ RUN ] %s' % s) - cmd = test - if Options.options.test_wrapper: - cmd = Options.options.test_wrapper + ' ' + test - if name == '': - name = test - - proc = subprocess.Popen(cmd, shell=True, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - out = proc.communicate() - returncode = proc.returncode - - success = desired_status is None or returncode == desired_status - if success: - if not quiet: - Logs.pprint('GREEN', '[ OK ] %s' % name) - else: - Logs.pprint('RED', '[ FAILED ] %s' % name) - ctx.autowaf_tests_failed += 1 - ctx.autowaf_local_tests_failed += 1 - ctx.autowaf_tests[appname]['failed'] += 1 - if type(test) != list and not callable(test): - Logs.pprint('RED', test) - - if Options.options.verbose and type(test) != list and not callable(test): - sys.stdout.write(out[0].decode('utf-8')) - sys.stderr.write(out[1].decode('utf-8')) - - return (success, out) - -def tests_name(ctx, appname, name='*'): - if name == '*': - return appname - else: - return '%s.%s' % (appname, name) +class TestGroup: + def __init__(self, tst, suitename, name, **kwargs): + self.tst = tst + self.suitename = suitename + self.name = name + self.kwargs = kwargs + self.start_time = bench_time() + tst.stack.append(TestScope(tst, name, tst.defaults())) -def begin_tests(ctx, appname, name='*'): - ctx.autowaf_local_tests_failed = 0 - ctx.autowaf_local_tests_total = 0 - ctx.autowaf_local_tests_start_time = bench_time() - Logs.pprint('GREEN', '\n[----------] %s' % ( - tests_name(ctx, appname, name))) + def label(self): + return self.suitename + '.%s' % self.name if self.name else '' - class Handle: - def __enter__(self): - pass + def args(self, **kwargs): + all_kwargs = self.tst.args(**self.kwargs) + all_kwargs.update(kwargs) + return all_kwargs - def __exit__(self, type, value, traceback): - end_tests(ctx, appname, name) + def run(self, test, **kwargs): + self.tst.stack[-1].run(test, **self.args(**kwargs)) - return Handle() + def __enter__(self): + if 'verbosity' in self.kwargs and self.kwargs['verbosity'] > 0: + self.tst.log_good('-' * 10, self.label()) + return self -def end_tests(ctx, appname, name='*'): - duration = (bench_time() - ctx.autowaf_local_tests_start_time) * 1000.0 - total = ctx.autowaf_local_tests_total - failures = ctx.autowaf_local_tests_failed - if failures == 0: - Logs.pprint('GREEN', '[----------] %d tests from %s (%d ms total)' % ( - ctx.autowaf_local_tests_total, tests_name(ctx, appname, name), duration)) - else: - Logs.pprint('RED', '[----------] %d/%d tests from %s (%d ms total)' % ( - total - failures, total, tests_name(ctx, appname, name), duration)) - -def run_tests(ctx, - appname, - tests, - desired_status=0, - dirs=['src'], - name='*', - headers=False): - begin_tests(ctx, appname, name) - - diropts = '' - for i in dirs: - diropts += ' -d ' + i - - for i in tests: - run_test(ctx, appname, i, desired_status, dirs, i, headers) - - end_tests(ctx, appname, name) + def __exit__(self, type, value, traceback): + duration = (bench_time() - self.start_time) * 1000.0 + scope = self.tst.pop() + n_passed = scope.n_total - scope.n_failed + if scope.n_failed == 0: + self.tst.log_good('-' * 10, '%d tests from %s (%d ms total)', + scope.n_total, self.label(), duration) + else: + self.tst.log_bad('-' * 10, '%d/%d tests from %s (%d ms total)', + n_passed, scope.n_total, self.label(), duration) def run_ldconfig(ctx): should_run = (ctx.cmd == 'install' and -- cgit v1.2.1