#!/usr/bin/env python3

"""Run the serd RDF validation test suite."""

import serd_test_util

import argparse
import datetime
import difflib
import itertools
import os
import re
import shlex
import subprocess
import sys
import tempfile
import urllib.parse


def _uri_path(uri):
    """Return the path component of `uri` as a local file path.

    If the path with its leading slash removed starts with a drive
    specification (as reported by `os.path.splitdrive`, e.g. "/C:/dir" on
    Windows), the leading slash is dropped so the result is a usable path.
    """
    path = urllib.parse.urlparse(uri).path
    drive = os.path.splitdrive(path[1:])[0]
    return path if not drive else path[1:]


def _load_rdf(filename, base_uri, command_prefix):
    """Load an RDF file as dictionaries via serdi (only supports URIs).

    Runs `command_prefix + ["-I", base_uri, filename]` and parses every
    output line that consists of three <URI> terms followed by " .".

    Returns a `(model, instances)` tuple where `model` maps
    subject -> predicate -> list of objects, and `instances` maps each
    rdf:type object to the set of subjects that have that type.

    Raises `subprocess.CalledProcessError` if the command fails.
    """
    rdf_type = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
    triple_pattern = re.compile(r"<([^ ]*)> <([^ ]*)> <([^ ]*)> \.")

    model = {}
    instances = {}

    cmd = command_prefix + ["-I", base_uri, filename]
    proc = subprocess.run(cmd, capture_output=True, check=True)
    for line in proc.stdout.splitlines():
        matches = triple_pattern.match(line.decode("utf-8"))
        if not matches:
            # Lines with literals or blank nodes are deliberately ignored
            continue

        s, p, o = matches.groups()
        model.setdefault(s, {}).setdefault(p, []).append(o)
        if p == rdf_type:
            instances.setdefault(o, set()).add(s)

    return model, instances


def _option_combinations(options):
    """Return an iterator that cycles through all combinations of options."""
    combinations = []
    for count in range(len(options) + 1):
        combinations += list(itertools.combinations(options, count))

    return itertools.cycle(combinations)


def _show_diff(from_lines, to_lines, from_filename, to_filename):
    """Write a unified diff of the given lines to stderr.

    The filenames are only used (as absolute paths) for the diff header.
    Returns True if the inputs are the same (no diff output was produced).
    """
    same = True
    for line in difflib.unified_diff(
        from_lines,
        to_lines,
        fromfile=os.path.abspath(from_filename),
        tofile=os.path.abspath(to_filename),
    ):
        sys.stderr.write(line)
        same = False

    return same


def _file_equals(patha, pathb):
    """Return True if both files exist and have identical lines.

    A missing file is reported on stderr, and any differences are written
    to stderr as a unified diff.
    """
    for path in (patha, pathb):
        if not os.access(path, os.F_OK):
            sys.stderr.write("error: missing file {}\n".format(path))
            return False

    with open(patha, "r", encoding="utf-8") as fa:
        with open(pathb, "r", encoding="utf-8") as fb:
            return _show_diff(fa.readlines(), fb.readlines(), patha, pathb)


def _file_lines_equal(patha, pathb, subst_from="", subst_to=""):
    """Return True if both files contain the same set of lines.

    Line order and duplicate lines are ignored.  A missing file is reported
    on stderr, and any differences are written to stderr as a unified diff
    of the sorted unique lines.

    NOTE(review): `subst_from` and `subst_to` are accepted for interface
    compatibility but are not used by this function.
    """
    for path in (patha, pathb):
        if not os.access(path, os.F_OK):
            sys.stderr.write("error: missing file %s\n" % path)
            return False

    # Use context managers so that the files are closed deterministically
    with open(patha, encoding="utf-8") as fa:
        la = sorted(set(fa.readlines()))
    with open(pathb, encoding="utf-8") as fb:
        lb = sorted(set(fb.readlines()))

    if la != lb:
        _show_diff(la, lb, patha, pathb)
        return False

    return True


def validation_test_suite(
    manifest_path,
    schemas,
    base_uri,
    report_filename,
    isyntax,
    command_prefix,
):
    """Run all tests in a test suite manifest.

    manifest_path: path to the manifest file; test inputs are looked up in
        the same directory.
    schemas: list of schema file arguments appended to every command.
    base_uri: base URI used when loading the manifest.
    report_filename: if truthy, an EARL assertion for every test is appended
        to this file.
    isyntax: accepted for interface compatibility, currently unused.
    command_prefix: list of leading command arguments (wrapper and serdi).

    Returns the number of failed tests.
    """
    mf = "http://www.w3.org/2001/sw/DataAccess/tests/test-manifest#"
    test_dir = os.path.dirname(manifest_path)

    model, instances = serd_test_util.load_rdf(
        manifest_path, base_uri, command_prefix
    )

    # Mirror the test directory layout, relative to the common ancestor of
    # the working directory and the test directory, for output files
    top_dir = os.path.commonpath([os.getcwd(), os.path.abspath(test_dir)])
    out_test_dir = os.path.relpath(test_dir, top_dir)

    os.makedirs(out_test_dir, exist_ok=True)

    asserter = ""
    if os.getenv("USER") == "drobilla":
        asserter = "http://drobilla.net/drobilla#me"

    class Results:
        """Counters for the number of tests run and failed."""

        def __init__(self):
            self.n_tests = 0
            self.n_failures = 0

    def run_tests(test_class, tests, expected_return, results):
        """Run every test in `tests` and accumulate counts in `results`.

        If `expected_return` is 0, the command must succeed.  Otherwise it
        must fail and print something to stderr.
        """
        for test in sorted(tests):
            test_uri = model[test][mf + "action"][0]
            test_uri_path = _uri_path(test_uri)
            test_name = os.path.basename(test_uri_path)
            test_path = os.path.join(test_dir, test_name)

            command = (
                command_prefix
                + [
                    "-V",
                    "-I",
                    test_uri,
                    test_path,
                ]
                + schemas
            )

            # Shell-quoted form of the command for error messages
            command_str = " ".join(shlex.quote(c) for c in command)

            out_filename = os.path.join(out_test_dir, test_name + ".out")

            results.n_tests += 1

            if expected_return == 0:  # Positive test
                with open(out_filename, "w") as stdout:
                    proc = subprocess.run(command, check=False, stdout=stdout)

                passed = proc.returncode == 0
                if not passed:
                    sys.stderr.write(
                        "error: Unexpected failure of command: {}\n".format(
                            command_str
                        )
                    )

            else:  # Negative test
                with open(out_filename, "w") as stdout:
                    with tempfile.TemporaryFile() as stderr:
                        proc = subprocess.run(
                            command, check=False, stdout=stdout, stderr=stderr
                        )

                        # Check whether anything was written to stderr
                        stderr.seek(0, 2)  # Seek to end
                        silent = stderr.tell() == 0

                passed = proc.returncode != 0
                if not passed:
                    sys.stderr.write(
                        "error: Unexpected success of command: {}\n".format(
                            command_str
                        )
                    )

                # A negative test must also print an error message
                if silent:
                    passed = False
                    sys.stderr.write(
                        "error: No error message printed by command: {}\n".format(
                            command_str
                        )
                    )

            # Count each failed test exactly once
            if not passed:
                results.n_failures += 1

            # Write test report entry
            if report_filename:
                with open(report_filename, "a") as report:
                    report.write(
                        serd_test_util.earl_assertion(test, passed, asserter)
                    )

    # Run all test types in the test suite
    results = Results()
    ns_serd = "http://drobilla.net/ns/serd#"
    for test_class, class_tests in instances.items():
        if test_class.startswith(ns_serd):
            expected = 1 if "Negative" in test_class else 0
            run_tests(test_class, class_tests, expected, results)

    # Print result summary
    if results.n_failures > 0:
        sys.stderr.write(
            "error: {}/{} tests failed\n".format(
                results.n_failures, results.n_tests
            )
        )
    else:
        sys.stdout.write("All {} tests passed\n".format(results.n_tests))

    return results.n_failures


def main():
    """Run the command line tool.

    Returns the number of failed tests, for use as the process exit status.
    """
    parser = argparse.ArgumentParser(
        usage="%(prog)s [OPTION]... MANIFEST BASE_URI SCHEMA...",
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    parser.add_argument("--report", help="path to write result report to")
    parser.add_argument("--serdi", default="serdi", help="path to serdi")
    parser.add_argument("--syntax", default="turtle", help="input syntax")
    parser.add_argument("--wrapper", default="", help="executable wrapper")
    parser.add_argument("manifest", help="test suite manifest.ttl file")
    parser.add_argument("base_uri", help="base URI for tests")
    parser.add_argument("schema", nargs="+", help="schema file")

    args = parser.parse_args(sys.argv[1:])

    # The wrapper may itself contain arguments, so split it like a shell
    command_prefix = shlex.split(args.wrapper) + [args.serdi]

    return validation_test_suite(
        args.manifest,
        args.schema,
        args.base_uri,
        args.report,
        args.syntax,
        command_prefix,
    )


if __name__ == "__main__":
    try:
        sys.exit(main())
    except subprocess.CalledProcessError as e:
        # Forward any captured error output from the failed command
        if e.stderr is not None:
            sys.stderr.write(e.stderr.decode("utf-8"))

        sys.stderr.write("error: %s\n" % e)
        sys.exit(e.returncode)