summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Robillard <d@drobilla.net>2019-03-16 22:31:41 +0100
committerDavid Robillard <d@drobilla.net>2019-03-17 00:31:46 +0100
commite72d4813bbe1b4f1ebb6569ea1464bbe963d8831 (patch)
treee5b40caa0947d9acc9c3511001edbe94a600adb0
parent4602f03c965debfc86b9c46b2e0fff785d44214e (diff)
downloadautowaf-e72d4813bbe1b4f1ebb6569ea1464bbe963d8831.tar.gz
autowaf-e72d4813bbe1b4f1ebb6569ea1464bbe963d8831.tar.bz2
autowaf-e72d4813bbe1b4f1ebb6569ea1464bbe963d8831.zip
Rewrite test framework
-rw-r--r--extras/autowaf.py482
1 files changed, 277 insertions, 205 deletions
diff --git a/extras/autowaf.py b/extras/autowaf.py
index 1ad7198..3717ba4 100644
--- a/extras/autowaf.py
+++ b/extras/autowaf.py
@@ -10,6 +10,7 @@ from waflib.TaskGen import feature, before, after
global g_is_child
g_is_child = False
+NONEMPTY = -10
if sys.platform == 'win32':
lib_path_name = 'PATH'
@@ -782,19 +783,6 @@ class ExecutionEnvironment:
def __exit__(self, type, value, traceback):
os.environ = self.original_environ
-def cd_to_build_dir(ctx, appname):
- top_level = (len(ctx.stack_path) > 1)
- if top_level:
- os.chdir(os.path.join('build', appname))
- else:
- os.chdir('build')
-
-def cd_to_orig_dir(ctx, child):
- if child:
- os.chdir(os.path.join('..', '..'))
- else:
- os.chdir('..')
-
def show_diff(from_lines, to_lines, from_filename, to_filename):
import difflib
import sys
@@ -829,211 +817,295 @@ def bench_time():
else:
return time.time()
+class TestOutput:
+ """Test output that is truthy if result is as expected"""
+ def __init__(self, expected, result=None):
+ self.stdout = self.stderr = None
+ self.expected = expected
+ self.result = result
+
+ def __bool__(self):
+ return self.expected is None or self.result == self.expected
+
+ __nonzero__ = __bool__
+
+def is_string(s):
+ if sys.version_info[0] < 3:
+ return isinstance(s, basestring)
+ return isinstance(s, str)
+
+class TestScope:
+ """Scope for running tests that maintains pass/fail statistics"""
+ def __init__(self, tst, name, defaults):
+ self.tst = tst
+ self.name = name
+ self.defaults = defaults
+ self.n_failed = 0
+ self.n_total = 0
+
+ def run(self, test, **kwargs):
+ if callable(test):
+ output = self._run_callable(test, **kwargs)
+ elif type(test) == list:
+ if 'name' not in kwargs:
+ import pipes
+ kwargs['name'] = ' '.join(map(pipes.quote, test))
+
+ output = self._run_command(test, **kwargs)
+ else:
+ raise Exception("Unknown test type")
+
+ if not output:
+ self.tst.log_bad('FAILED', kwargs['name'])
+
+ return self.tst.test_result(output)
+
+ def _run_callable(self, test, **kwargs):
+ expected = kwargs['expected'] if 'expected' in kwargs else True
+ return TestOutput(expected, test())
+
+ def _run_command(self, test, **kwargs):
+ if 'stderr' in kwargs and kwargs['stderr'] == NONEMPTY:
+ # Run with a temp file for stderr and check that it is non-empty
+ import tempfile
+ with tempfile.TemporaryFile(mode='w') as stderr:
+ kwargs['stderr'] = stderr
+ output = self.run(test, **kwargs)
+ return (output if not output else
+ self.run(
+ lambda: stderr.tell() > 0,
+ name=kwargs['name'] + ' error message'))
+
+ try:
+ # Run with stdout and stderr set to the appropriate streams
+ out_stream = self._stream('stdout', kwargs)
+ err_stream = self._stream('stderr', kwargs)
+ return self._exec(test, **kwargs)
+ finally:
+ out_stream = out_stream.close() if out_stream else None
+ err_stream = err_stream.close() if err_stream else None
+
+ def _stream(self, stream_name, kwargs):
+ s = kwargs[stream_name] if stream_name in kwargs else None
+ if is_string(s):
+ kwargs[stream_name] = open(s, 'wb')
+ return kwargs[stream_name]
+ return None
+
+ def _exec(self,
+ test,
+ expected=0,
+ name='',
+ stdin=None,
+ stdout=None,
+ stderr=None,
+ verbosity=1):
+ def stream(s):
+ return open(s, 'wb') if type(s) == str else s
+
+ if verbosity > 1:
+ self.tst.log_good('RUN ', name)
+
+ if Options.options.test_wrapper:
+ test = [Options.options.test_wrapper] + test
+
+ output = TestOutput(expected)
+ with open(os.devnull, 'wb') as null:
+ out = null if verbosity < 3 and not stdout else stdout
+ err = null if verbosity < 2 and not stderr else stderr
+ proc = subprocess.Popen(test, stdin=stdin, stdout=out, stderr=err)
+ output.stdout, output.stderr = proc.communicate()
+ output.result = proc.returncode
+
+ if output and verbosity > 0:
+ self.tst.log_good(' OK', name)
+
+ return output
+
class TestContext(Build.BuildContext):
"runs test suite"
fun = cmd = 'test'
-def pre_test(ctx, appname, dirs=['src']):
- Logs.pprint('GREEN', '\n[==========] Running %s tests' % appname)
-
- if not hasattr(ctx, 'autowaf_tests_total'):
- ctx.autowaf_tests_start_time = bench_time()
- ctx.autowaf_tests_total = 0
- ctx.autowaf_tests_failed = 0
- ctx.autowaf_local_tests_total = 0
- ctx.autowaf_local_tests_failed = 0
- ctx.autowaf_tests = {}
-
- ctx.autowaf_tests[appname] = {'total': 0, 'failed': 0}
-
- cd_to_build_dir(ctx, appname)
- if not ctx.env.NO_COVERAGE:
- diropts = ''
- for i in dirs:
- diropts += ' -d ' + i
- clear_log = open('lcov-clear.log', 'w')
+ def __init__(self, **kwargs):
+ super(TestContext, self).__init__(**kwargs)
+ self.start_time = bench_time()
+ self.recursed = False
+
+ defaults = {'verbosity': Options.options.verbose}
+ self.stack = [TestScope(self, Context.g_module.APPNAME, defaults)]
+
+ def defaults(self):
+ return self.stack[-1].defaults
+
+ def finalize(self):
+ if self.stack[-1].n_failed > 0:
+ sys.exit(1)
+
+ super(TestContext, self).finalize()
+
+ def run(self, test, **kwargs):
+ self.stack[-1].run(test, **self.args(**kwargs))
+
+ def log_good(self, title, fmt, *args):
+ Logs.pprint('GREEN', '[%s] %s' % (title.center(10), fmt % args))
+
+ def log_bad(self, title, fmt, *args):
+ Logs.pprint('RED', '[%s] %s' % (title.center(10), fmt % args))
+
+ def pre_recurse(self, node):
+ wscript_module = Context.load_module(node.abspath())
+ group_name = wscript_module.APPNAME
+ self.stack.append(TestScope(self, group_name, self.defaults()))
+ self.recursed = True
+
+ bld_dir = node.get_bld().parent
+ if bld_dir != self.path.get_bld():
+ Logs.info('')
+
+ Logs.info("Waf: Entering directory `%s'\n", bld_dir)
+ os.chdir(str(bld_dir))
+
+ if str(node.parent) == Context.top_dir:
+ self.clear_coverage()
+
+ self.log_good('=' * 10, 'Running %s tests', group_name)
+ super(TestContext, self).pre_recurse(node)
+
+ def test_result(self, success):
+ self.stack[-1].n_total += 1
+ self.stack[-1].n_failed += 1 if not success else 0
+ return success
+
+ def pop(self):
+ scope = self.stack.pop()
+ self.stack[-1].n_total += scope.n_total
+ self.stack[-1].n_failed += scope.n_failed
+ return scope
+
+ def post_recurse(self, node):
+ super(TestContext, self).post_recurse(node)
+
+ scope = self.pop()
+ duration = (bench_time() - self.start_time) * 1000.0
+ if self.recursed and len(self.stack) == 1:
+ Logs.info('')
+
+ self.log_good('=' * 10, '%d tests from %s ran (%d ms total)',
+ scope.n_total, scope.name, duration)
+
+ if not self.env.NO_COVERAGE:
+ if str(node.parent) == Context.top_dir:
+ self.gen_coverage()
+
+ if os.path.exists('coverage/index.html'):
+ self.log_good('COVERAGE', '<file://%s>',
+ os.path.abspath('coverage/index.html'))
+
+ successes = scope.n_total - scope.n_failed
+ Logs.pprint('GREEN', '[ PASSED ] %d tests' % successes)
+ if scope.n_failed > 0:
+ Logs.pprint('RED', '[ FAILED ] %d tests' % scope.n_failed)
+
+ os.chdir(str(self.path))
+
+ def execute(self):
+ self.restore()
+ if not self.all_envs:
+ self.load_envs()
+
+ if not self.env.BUILD_TESTS:
+ self.fatal('Configuration does not include tests')
+
+ with ExecutionEnvironment(self.env.AUTOWAF_RUN_ENV) as env:
+ if self.defaults()['verbosity'] > 0:
+ Logs.pprint('GREEN', str(env) + '\n')
+ self.recurse([self.run_dir])
+
+ def src_path(self, path):
+ return os.path.relpath(os.path.join(str(self.path), path))
+
+ def args(self, **kwargs):
+ all_kwargs = self.defaults().copy()
+ all_kwargs.update(kwargs)
+ return all_kwargs
+
+ def test_group(self, name, **kwargs):
+ return TestGroup(
+ self, self.stack[-1].name, name, **self.args(**kwargs))
+
+ def set_test_defaults(self, **kwargs):
+ """Set default arguments to be passed to all tests"""
+ self.stack[-1].defaults.update(kwargs)
+
+ def clear_coverage(self):
+ """Zero old coverage data"""
try:
- try:
- # Clear coverage data
- subprocess.call(('lcov %s -z' % diropts).split(),
- stdout=clear_log, stderr=clear_log)
- except Exception:
- Logs.warn('Failed to run lcov, no coverage report generated')
- finally:
- clear_log.close()
-
-class TestFailed(Exception):
- pass
-
-def post_test(ctx, appname, dirs=['src'], remove=['*boost*', 'c++*']):
- if not ctx.env.NO_COVERAGE:
- diropts = ''
- for i in dirs:
- diropts += ' -d ' + i
- coverage_log = open('lcov-coverage.log', 'w')
- coverage_lcov = open('coverage.lcov', 'w')
- coverage_stripped_lcov = open('coverage-stripped.lcov', 'w')
+ with open('cov-clear.log', 'w') as log:
+ subprocess.call(['lcov', '-z', '-d', str(self.path)],
+ stdout=log, stderr=log)
+
+ except Exception:
+ Logs.warn('Failed to run lcov to clear old coverage data')
+
+ def gen_coverage(self):
+ """Generate coverage data and report"""
try:
- try:
- base = '.'
- if g_is_child:
- base = '..'
-
- # Generate coverage data
- lcov_cmd = 'lcov -c %s -b %s' % (diropts, base)
- if ctx.env.LLVM_COV:
- lcov_cmd += ' --gcov-tool %s' % ctx.env.LLVM_COV[0]
- subprocess.call(lcov_cmd.split(),
- stdout=coverage_lcov, stderr=coverage_log)
-
- # Strip unwanted stuff
- subprocess.call(
- ['lcov', '--remove', 'coverage.lcov'] + remove,
- stdout=coverage_stripped_lcov, stderr=coverage_log)
-
- # Generate HTML coverage output
- if not os.path.isdir('coverage'):
- os.makedirs('coverage')
- subprocess.call(
- 'genhtml -o coverage coverage-stripped.lcov'.split(),
- stdout=coverage_log, stderr=coverage_log)
-
- except Exception:
- Logs.warn('Failed to run lcov, no coverage report generated')
- finally:
- coverage_stripped_lcov.close()
- coverage_lcov.close()
- coverage_log.close()
-
- duration = (bench_time() - ctx.autowaf_tests_start_time) * 1000.0
- total_tests = ctx.autowaf_tests[appname]['total']
- failed_tests = ctx.autowaf_tests[appname]['failed']
- passed_tests = total_tests - failed_tests
- Logs.pprint('GREEN', '\n[==========] %d tests from %s ran (%d ms total)' % (
- total_tests, appname, duration))
- if not ctx.env.NO_COVERAGE:
- Logs.pprint('GREEN', '[----------] Coverage: <file://%s>'
- % os.path.abspath('coverage/index.html'))
-
- Logs.pprint('GREEN', '[ PASSED ] %d tests' % passed_tests)
- if failed_tests > 0:
- Logs.pprint('RED', '[ FAILED ] %d tests' % failed_tests)
- raise TestFailed('Tests from %s failed' % appname)
- Logs.pprint('', '')
-
- top_level = (len(ctx.stack_path) > 1)
- if top_level:
- cd_to_orig_dir(ctx, top_level)
-
-def run_test(ctx,
- appname,
- test,
- desired_status=0,
- dirs=['src'],
- name='',
- header=False,
- quiet=False):
- """Run an individual test.
-
- `test` is either a shell command string, or a list of [name, return status]
- for displaying tests implemented in the calling Python code.
- """
+ with open('cov.lcov', 'w') as out:
+ with open('cov.log', 'w') as err:
+ subprocess.call(['lcov', '-c', '--no-external',
+ '--rc', 'lcov_branch_coverage=1',
+ '-b', '.',
+ '-d', str(self.path)],
+ stdout=out, stderr=err)
+
+ if not os.path.isdir('coverage'):
+ os.makedirs('coverage')
+
+ with open('genhtml.log', 'w') as log:
+ subprocess.call(['genhtml',
+ '-o', 'coverage',
+ '--rc', 'genhtml_branch_coverage=1',
+ 'cov.lcov'],
+ stdout=log, stderr=log)
- ctx.autowaf_tests_total += 1
- ctx.autowaf_local_tests_total += 1
- ctx.autowaf_tests[appname]['total'] += 1
+ except Exception:
+ Logs.warn('Failed to run lcov to generate coverage report')
- out = (None, None)
- if type(test) == list:
- name = test[0]
- returncode = test[1]
- elif callable(test):
- returncode = test()
- else:
- s = test
- if isinstance(test, type([])):
- s = ' '.join(test)
- if header and not quiet:
- Logs.pprint('Green', '\n[ RUN ] %s' % s)
- cmd = test
- if Options.options.test_wrapper:
- cmd = Options.options.test_wrapper + ' ' + test
- if name == '':
- name = test
-
- proc = subprocess.Popen(cmd, shell=True,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- out = proc.communicate()
- returncode = proc.returncode
-
- success = desired_status is None or returncode == desired_status
- if success:
- if not quiet:
- Logs.pprint('GREEN', '[ OK ] %s' % name)
- else:
- Logs.pprint('RED', '[ FAILED ] %s' % name)
- ctx.autowaf_tests_failed += 1
- ctx.autowaf_local_tests_failed += 1
- ctx.autowaf_tests[appname]['failed'] += 1
- if type(test) != list and not callable(test):
- Logs.pprint('RED', test)
-
- if Options.options.verbose and type(test) != list and not callable(test):
- sys.stdout.write(out[0].decode('utf-8'))
- sys.stderr.write(out[1].decode('utf-8'))
-
- return (success, out)
-
-def tests_name(ctx, appname, name='*'):
- if name == '*':
- return appname
- else:
- return '%s.%s' % (appname, name)
+class TestGroup:
+ def __init__(self, tst, suitename, name, **kwargs):
+ self.tst = tst
+ self.suitename = suitename
+ self.name = name
+ self.kwargs = kwargs
+ self.start_time = bench_time()
+ tst.stack.append(TestScope(tst, name, tst.defaults()))
-def begin_tests(ctx, appname, name='*'):
- ctx.autowaf_local_tests_failed = 0
- ctx.autowaf_local_tests_total = 0
- ctx.autowaf_local_tests_start_time = bench_time()
- Logs.pprint('GREEN', '\n[----------] %s' % (
- tests_name(ctx, appname, name)))
+ def label(self):
+ return self.suitename + '.%s' % self.name if self.name else ''
- class Handle:
- def __enter__(self):
- pass
+ def args(self, **kwargs):
+ all_kwargs = self.tst.args(**self.kwargs)
+ all_kwargs.update(kwargs)
+ return all_kwargs
- def __exit__(self, type, value, traceback):
- end_tests(ctx, appname, name)
+ def run(self, test, **kwargs):
+ self.tst.stack[-1].run(test, **self.args(**kwargs))
- return Handle()
+ def __enter__(self):
+ if 'verbosity' in self.kwargs and self.kwargs['verbosity'] > 0:
+ self.tst.log_good('-' * 10, self.label())
+ return self
-def end_tests(ctx, appname, name='*'):
- duration = (bench_time() - ctx.autowaf_local_tests_start_time) * 1000.0
- total = ctx.autowaf_local_tests_total
- failures = ctx.autowaf_local_tests_failed
- if failures == 0:
- Logs.pprint('GREEN', '[----------] %d tests from %s (%d ms total)' % (
- ctx.autowaf_local_tests_total, tests_name(ctx, appname, name), duration))
- else:
- Logs.pprint('RED', '[----------] %d/%d tests from %s (%d ms total)' % (
- total - failures, total, tests_name(ctx, appname, name), duration))
-
-def run_tests(ctx,
- appname,
- tests,
- desired_status=0,
- dirs=['src'],
- name='*',
- headers=False):
- begin_tests(ctx, appname, name)
-
- diropts = ''
- for i in dirs:
- diropts += ' -d ' + i
-
- for i in tests:
- run_test(ctx, appname, i, desired_status, dirs, i, headers)
-
- end_tests(ctx, appname, name)
+ def __exit__(self, type, value, traceback):
+ duration = (bench_time() - self.start_time) * 1000.0
+ scope = self.tst.pop()
+ n_passed = scope.n_total - scope.n_failed
+ if scope.n_failed == 0:
+ self.tst.log_good('-' * 10, '%d tests from %s (%d ms total)',
+ scope.n_total, self.label(), duration)
+ else:
+ self.tst.log_bad('-' * 10, '%d/%d tests from %s (%d ms total)',
+ n_passed, scope.n_total, self.label(), duration)
def run_ldconfig(ctx):
should_run = (ctx.cmd == 'install' and