From 68c1a2e677775e489cff4beb38ef17c1efeae4e3 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Mon, 27 Mar 2023 15:42:23 -0400 Subject: Factor out common test runner utilities --- test/run_test_suite.py | 167 ++++++++++------------------------------ test/serd_test_util/__init__.py | 145 ++++++++++++++++++++++++++++++++++ 2 files changed, 184 insertions(+), 128 deletions(-) create mode 100644 test/serd_test_util/__init__.py (limited to 'test') diff --git a/test/run_test_suite.py b/test/run_test_suite.py index 94571f72..554c5e01 100755 --- a/test/run_test_suite.py +++ b/test/run_test_suite.py @@ -1,53 +1,20 @@ #!/usr/bin/env python3 -# Copyright 2022 David Robillard +# Copyright 2022-2023 David Robillard # SPDX-License-Identifier: ISC """Run an RDF test suite with serdi.""" import argparse -import datetime import difflib import itertools import os -import re import shlex import subprocess import sys import tempfile -import urllib.parse - - -def earl_assertion(test, passed, asserter): - """Return a Turtle description of an assertion for the test report.""" - - asserter_str = "" - if asserter is not None: - asserter_str = "\n\tearl:assertedBy <%s> ;" % asserter - - return """ -[] -\ta earl:Assertion ;%s -\tearl:subject ; -\tearl:test <%s> ; -\tearl:result [ -\t\ta earl:TestResult ; -\t\tearl:outcome %s ; -\t\tdc:date "%s"^^xsd:dateTime -\t] . -""" % ( - asserter_str, - test, - "earl:passed" if passed else "earl:failed", - datetime.datetime.now().replace(microsecond=0).isoformat(), - ) - -def log_error(message): - """Log an error message to stderr""" - - sys.stderr.write("error: ") - sys.stderr.write(message) +import serd_test_util def test_thru( @@ -103,22 +70,12 @@ def test_thru( thru_cmd, check=True, capture_output=True, encoding="utf-8" ) - if not _lines_equal( + return serd_test_util.lines_equal( check_lines, proc.stdout.splitlines(True), check_path, thru_path, - ): - log_error("Corrupted round-trip output {}\n".format(thru_cmd)) - return 1 - - return 0 - - -def _uri_path(uri): - path = urllib.parse.urlparse(uri).path - drive = os.path.splitdrive(path[1:])[0] - return path if not drive else path[1:] + ) def _test_input_syntax(test_class): @@ -151,37 +108,6 @@ def _test_output_syntax(test_class): raise Exception("Unknown test class <{}>".format(test_class)) -def _load_rdf(filename, base_uri, command_prefix): - """Load an RDF file as dictionaries via serdi (only supports URIs).""" - - rdf_type = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type" - model = {} - instances = {} - - cmd = command_prefix + [filename, base_uri] - proc = subprocess.run( - cmd, capture_output=True, encoding="utf-8", check=True - ) - for line in proc.stdout.splitlines(): - matches = re.match(r"<([^ ]*)> <([^ ]*)> <([^ ]*)> \.", line) - if matches: - s, p, o = (matches.group(1), matches.group(2), matches.group(3)) - if s not in model: - model[s] = {p: [o]} - elif p not in model[s]: - model[s][p] = [o] - else: - model[s][p].append(o) - - if p == rdf_type: - if o not in instances: - instances[o] = set([s]) - else: - instances[o].update([s]) - - return model, instances - - def _option_combinations(options): """Return an iterator that cycles through all combinations of options.""" @@ -192,20 +118,6 @@ def _option_combinations(options): return itertools.cycle(combinations) -def _lines_equal(from_lines, to_lines, from_filename, to_filename): - same = True - for line in difflib.unified_diff( - from_lines, - to_lines, - fromfile=os.path.abspath(from_filename), - tofile=os.path.abspath(to_filename), - ): - sys.stderr.write(line) - same = False - - return same - - def test_suite( manifest_path, base_uri, @@ -218,17 +130,14 @@ def test_suite( mf = "http://www.w3.org/2001/sw/DataAccess/tests/test-manifest#" test_dir = os.path.dirname(manifest_path) - model, instances = _load_rdf(manifest_path, base_uri, command_prefix) + model, instances = serd_test_util.load_rdf( + manifest_path, base_uri, command_prefix + ) asserter = "" if os.getenv("USER") == "drobilla": asserter = "http://drobilla.net/drobilla#me" - class Results: - def __init__(self): - self.n_tests = 0 - self.n_failures = 0 - def run_tests(test_class, tests, expected_return, results): thru_flags = [["-e"], ["-f"], ["-b"], ["-r", "http://example.org/"]] osyntax = _test_output_syntax(test_class) @@ -241,7 +150,7 @@ def test_suite( for test in sorted(tests): test_uri = model[test][mf + "action"][0] - test_uri_path = _uri_path(test_uri) + test_uri_path = serd_test_util.uri_path(test_uri) test_name = os.path.basename(test_uri_path) test_path = os.path.join(test_dir, test_name) @@ -255,10 +164,11 @@ def test_suite( with tempfile.TemporaryFile("w+", encoding="utf-8") as out: proc = subprocess.run(command, check=False, stdout=out) if proc.returncode == 0: + results.test_passed() passed = True else: - results.n_failures += 1 - log_error( + results.test_failed() + serd_test_util.error( "Unexpected failure of command: {}\n".format( command_string ) @@ -267,21 +177,23 @@ def test_suite( if proc.returncode == 0 and mf + "result" in model[test]: # Check output against expected output from test suite check_uri = model[test][mf + "result"][0] - check_filename = os.path.basename(_uri_path(check_uri)) + check_filename = os.path.basename( + serd_test_util.uri_path(check_uri) + ) check_path = os.path.join(test_dir, check_filename) with open(check_path, "r", encoding="utf-8") as check: check_lines = check.readlines() out.seek(0) - if not _lines_equal( + if not serd_test_util.lines_equal( check_lines, out.readlines(), check_path, out_filename, ): - results.n_failures += 1 - log_error( + results.test_failed() + serd_test_util.error( "Output {} does not match {}\n".format( out_filename, check_path ) @@ -289,16 +201,21 @@ def test_suite( # Run round-trip test check.seek(0) - results.n_failures += test_thru( - test_uri, - test_path, - check_lines, - check_path, - out_test_dir, - list(next(thru_options_iter)), - isyntax, - osyntax, - command_prefix, + results.check( + test_thru( + test_uri, + test_path, + check_lines, + check_path, + out_test_dir, + list(next(thru_options_iter)), + isyntax, + osyntax, + command_prefix, + ), + "Corrupted round-trip of {}\n".format( + test_uri + ), ) else: # Negative test @@ -314,7 +231,7 @@ def test_suite( passed = True else: results.n_failures += 1 - log_error( + serd_test_util.error( "Unexpected success of command: {}\n".format( command_string ) @@ -324,7 +241,7 @@ def test_suite( stderr.seek(0, 2) # Seek to end if stderr.tell() == 0: # Empty results.n_failures += 1 - log_error( + serd_test_util.error( "No error message printed by: {}\n".format( command_string ) @@ -333,10 +250,12 @@ def test_suite( # Write test report entry if report_filename: with open(report_filename, "a", encoding="utf-8") as report: - report.write(earl_assertion(test, passed, asserter)) + report.write( + serd_test_util.earl_assertion(test, passed, asserter) + ) # Run all test types in the test suite - results = Results() + results = serd_test_util.Results() ns_rdftest = "http://www.w3.org/ns/rdftest#" for test_class, instances in instances.items(): if test_class.startswith(ns_rdftest): @@ -347,15 +266,7 @@ def test_suite( ) run_tests(test_class, instances, expected, results) - # Print result summary - if results.n_failures > 0: - log_error( - "{}/{} tests failed\n".format(results.n_failures, results.n_tests) - ) - else: - sys.stdout.write("All {} tests passed\n".format(results.n_tests)) - - return results.n_failures + return serd_test_util.print_result_summary(results) def main(): diff --git a/test/serd_test_util/__init__.py b/test/serd_test_util/__init__.py new file mode 100644 index 00000000..844c454c --- /dev/null +++ b/test/serd_test_util/__init__.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python3 + +# Copyright 2022-2023 David Robillard +# SPDX-License-Identifier: ISC + +"""Utilities for data-driven tests.""" + +import datetime +import difflib +import os +import re +import subprocess +import sys +import urllib.parse + + +class Results: + """Counts of test executions and failures.""" + + def __init__(self): + self.n_tests = 0 + self.n_failures = 0 + + def test_passed(self): + """Record a successful test.""" + self.n_tests += 1 + + def test_failed(self): + """Record a failed test.""" + self.n_tests += 1 + self.n_failures += 1 + + def check(self, condition, message=None): + """Check a test condition and update counts accordingly.""" + if not condition: + self.test_failed() + if message is not None: + error(message) + else: + self.test_passed() + + +def error(message): + """Log an error message to stderr.""" + + sys.stderr.write("error: ") + sys.stderr.write(message) + + +def print_result_summary(results): + """Print test result summary to stdout or stderr as appropriate.""" + + if results.n_tests <= 0: + error("No tests found\n") + return -1 + + failed, total = (results.n_failures, results.n_tests) + if failed == 0: + sys.stdout.write("All {} tests passed\n".format(total)) + else: + error("{}/{} tests failed\n".format(failed, total)) + + return failed + + +def uri_path(uri): + """Return the path component of a URI.""" + + path = urllib.parse.urlparse(uri).path + drive = os.path.splitdrive(path[1:])[0] + return path if not drive else path[1:] + + +def earl_assertion(test, passed, asserter): + """Return a Turtle description of an assertion for the test report.""" + + asserter_str = "" + if asserter is not None: + asserter_str = "\n\tearl:assertedBy <%s> ;" % asserter + + return """ +[] +\ta earl:Assertion ;%s +\tearl:subject ; +\tearl:test <%s> ; +\tearl:result [ +\t\ta earl:TestResult ; +\t\tearl:outcome %s ; +\t\tdc:date "%s"^^xsd:dateTime +\t] . +""" % ( + asserter_str, + test, + "earl:passed" if passed else "earl:failed", + datetime.datetime.now().replace(microsecond=0).isoformat(), + ) + + +def load_rdf(filename, base_uri, command_prefix): + """Load an RDF file as dictionaries via serdi (only supports URIs).""" + + rdf_type = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type" + model = {} + instances = {} + + cmd = command_prefix + [filename, base_uri] + proc = subprocess.run( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True + ) + for line in proc.stdout.splitlines(): + matches = re.match( + r"<([^ ]*)> <([^ ]*)> <([^ ]*)> \.", line.decode("utf-8") + ) + if matches: + s, p, o = (matches.group(1), matches.group(2), matches.group(3)) + if s not in model: + model[s] = {p: [o]} + elif p not in model[s]: + model[s][p] = [o] + else: + model[s][p].append(o) + + if p == rdf_type: + if o not in instances: + instances[o] = set([s]) + else: + instances[o].update([s]) + + return model, instances + + +def lines_equal(from_lines, to_lines, from_filename, to_filename): + """Return true if from_lines equals to_lines, or print a diff.""" + + same = True + for line in difflib.unified_diff( + from_lines, + to_lines, + fromfile=os.path.abspath(from_filename), + tofile=os.path.abspath(to_filename), + ): + sys.stderr.write(line) + same = False + + return same -- cgit v1.2.1