From 821b53090d46b26e911db7fea1e7ac0a17bc2bf6 Mon Sep 17 00:00:00 2001 From: "Thing-han, Lim" <15379156+potsrevennil@users.noreply.github.com> Date: Tue, 5 Nov 2024 18:33:10 +0800 Subject: [PATCH] tests: Refactor tests script for better readability `State` class naming was a mistake, renaming it to Options And refactor the member functions of State into member functions of `Base`, `Test_Implementations`, `Tests` `Base`: the lowest level class for handling compilation and binary execution `Test_Implementations`: handle running one or multiple implementations `Tests`: provide top level functions for click cli commands Signed-off-by: Thing-han, Lim <15379156+potsrevennil@users.noreply.github.com> --- scripts/lib/mlkem_test.py | 461 ++++++++++++++++++++++---------------- scripts/tests | 58 +++-- 2 files changed, 292 insertions(+), 227 deletions(-) diff --git a/scripts/lib/mlkem_test.py b/scripts/lib/mlkem_test.py index 225e47536..6c3f5844a 100644 --- a/scripts/lib/mlkem_test.py +++ b/scripts/lib/mlkem_test.py @@ -16,30 +16,59 @@ github_summary, logger, ) -import copy +gh_env = os.environ.get("GITHUB_ENV") -class State(object): +class CompileOptions(object): + + def __init__( + self, cross_prefix: str, cflags: str, arch_flags: str, auto: bool, verbose: bool + ): + self.cross_prefix = cross_prefix + self.cflags = cflags + self.arch_flags = arch_flags + self.auto = auto + self.verbose = verbose + + +class Options(object): def __init__(self): - self.verbose = False self.cross_prefix = "" self.cflags = "" self.arch_flags = "" self.auto = True + self.verbose = False + self.opt = "ALL" self.compile = True self.run = True + +class Base: + + def __init__(self, test_type: TEST_TYPES, copts: CompileOptions, opt: bool): + self.test_type = test_type + self.cross_prefix = copts.cross_prefix + self.cflags = copts.cflags + self.arch_flags = copts.arch_flags + self.auto = copts.auto + self.verbose = copts.verbose + self.opt = opt + self.compile_mode = "Cross" if self.cross_prefix else "Native" + def compile_schemes( self, - test_type: TEST_TYPES, - opt: bool, extra_make_envs={}, extra_make_args=[], ): """compile or cross compile with some extra environment variables and makefile arguments""" - log = logger(test_type, "Compiling", self.cross_prefix, opt) + if gh_env is not None: + print( + f"::group::compile {self.compile_mode} {self.opt} {self.test_type.desc()}" + ) + + log = logger(self.test_type, "Compile", self.cross_prefix, self.opt) def dict2str(dict): s = "" @@ -47,23 +76,24 @@ def dict2str(dict): s += f"{k}={v} " return s - make_envs = ( - {"CFLAGS": f"{self.cflags}"} if self.cflags is not None else {} - ) | ( - {"ARCH_FLAGS": f"{self.arch_flags}"} if self.arch_flags is not None else {} - ) - extra_make_envs.update(make_envs) - extra_make_args = extra_make_args + list( - set([f"OPT={int(opt)}", f"AUTO={int(self.auto)}"]) - set(extra_make_args) + set([f"OPT={int(self.opt)}", f"AUTO={int(self.auto)}"]) + - set(extra_make_args) ) args = [ "make", f"CROSS_PREFIX={self.cross_prefix}", - f"{test_type}", + f"{self.test_type}", ] + extra_make_args + make_envs = ( + {"CFLAGS": f"{self.cflags}"} if self.cflags is not None else {} + ) | ( + {"ARCH_FLAGS": f"{self.arch_flags}"} if self.arch_flags is not None else {} + ) + extra_make_envs.update(make_envs) + log.info(dict2str(extra_make_envs) + " ".join(args)) p = subprocess.run( @@ -74,19 +104,25 @@ def dict2str(dict): if p.returncode != 0: log.error(f"make failed: {p.returncode}") + + if gh_env is not None: + print(f"::endgroup::") + + if p.returncode != 0: sys.exit(1) def run_scheme( self, - test_type: TEST_TYPES, scheme: SCHEME, - opt: bool, + actual_proc: Callable[[bytes], str] = None, + expect_proc: Callable[[SCHEME], str] = None, run_as_root=False, exec_wrapper=None, - ) -> bytes: + ): """Run the binary in all different ways""" - log = logger(test_type, scheme, self.cross_prefix, opt) - bin = test_type.bin_path(scheme, opt) + log = logger(self.test_type, scheme, self.cross_prefix, self.opt) + + bin = self.test_type.bin_path(scheme, self.opt) if not os.path.isfile(bin): log.error(f"{bin} does not exists") sys.exit(1) @@ -100,12 +136,12 @@ def run_scheme( cmd = ["qemu-aarch64"] + cmd else: log.info( - f"Emulation for {self.cross_prefix} on {platform.system()} not supported" + f"Emulation for {self.cross_prefix} on {platform.system()} not supported", ) if run_as_root: log.info( - f"Running {bin} as root -- you may need to enter your root password." + f"Running {bin} as root -- you may need to enter your root password.", ) cmd = ["sudo"] + cmd @@ -114,109 +150,108 @@ def run_scheme( exec_wrapper = exec_wrapper.split(" ") cmd = exec_wrapper + cmd - log.info(" ".join(cmd)) - result = subprocess.run( + log.debug(" ".join(cmd)) + + p = subprocess.run( cmd, capture_output=True, universal_newlines=False, ) - if result.returncode != 0: - log.error( - f"Running '{cmd}' failed: {result.returncode} {result.stderr.decode()}" - ) - sys.exit(1) - - return result.stdout + result = None - def run_schemes( - self, - test_type: TEST_TYPES, - opt: bool, - run_as_root=False, - exec_wrapper=None, - actual_proc: Callable[[bytes], str] = None, - expect_proc: Callable[[SCHEME], str] = None, - ) -> TypedDict: - fail = False - results = {} - for scheme in SCHEME: - log = logger(test_type, scheme, self.cross_prefix, opt) - result = self.run_scheme( - test_type, - scheme, - opt, - run_as_root, - exec_wrapper, + if p.returncode != 0: + log.error( + f"Running '{cmd}' failed: {p.returncode} {p.stderr.decode()}", ) - - if actual_proc is not None and expect_proc is not None: - actual = actual_proc(result) - expect = expect_proc(scheme) - f = actual != expect - fail = fail or f - if f: - log.error( - f"{scheme} failed, expecting {expect}, but getting {actual}" - ) - else: - log.info(f"{scheme} passed") - results[scheme] = f + elif actual_proc is not None and expect_proc is not None: + actual = actual_proc(p.stdout) + expect = expect_proc(scheme) + result = actual != expect + if result: + log.error(f"failed, expecting {expect}, but getting {actual}") else: - log.info(f"{scheme}") - log.info(f"\n{result.decode()}") - results[scheme] = result.decode() - - title = ( - "## " - + ("Cross" if self.cross_prefix else "Native") - + " " - + ("Opt" if opt else "Non-opt") - + " Tests" - ) - github_summary(title, test_type, results) - - if fail: - sys.exit(1) + log.info(f"passed") + else: + log.info(f"{p.stdout.decode()}") + result = p.stdout.decode() - if actual_proc is not None and expect_proc is not None: - return fail + if p.returncode != 0: + exit(p.returncode) else: - return results + return result + - def test( +class Test_Implementations: + def __init__( self, test_type: TEST_TYPES, - opt: bool, + copts: CompileOptions, + ): + self.test_type = test_type + self.ts = {} + self.ts["no_opt"] = Base(test_type, copts, False) + self.ts["opt"] = Base(test_type, copts, True) + + def compile( + self, + opt: str, extra_make_envs={}, extra_make_args=[], - actual_proc: Callable[[bytes], str] = None, - expect_proc: Callable[[SCHEME], str] = None, - run_as_root: bool = False, - exec_wrapper: str = None, ): - config_logger(self.verbose) - - if self.compile: - self.compile_schemes( - test_type, - opt, + if opt.lower() == "all" or opt.lower() == "no_opt": + self.ts["no_opt"].compile_schemes( + extra_make_envs, + extra_make_args, + ) + if opt.lower() == "all" or opt.lower() == "opt": + self.ts["opt"].compile_schemes( extra_make_envs, extra_make_args, ) - results = None - if self.run: - results = self.run_schemes( - test_type, - opt, - run_as_root, - exec_wrapper, - actual_proc, - expect_proc, + def run_schemes( + self, + opt: str, + actual_proc: Callable[[bytes], str] = None, + expect_proc: Callable[[SCHEME], str] = None, + run_as_root=False, + exec_wrapper=None, + ) -> TypedDict: + results = {} + + def run(opt: bool): + k = "opt" if opt else "no_opt" + results[k] = {} + for scheme in SCHEME: + result = self.ts[k].run_scheme( + scheme, + actual_proc, + expect_proc, + run_as_root, + exec_wrapper, + ) + + results[k][scheme] = result + + title = ( + "## " + (self.ts[k].compile_mode) + " " + (k.capitalize()) + " Tests" ) + github_summary(title, self.test_type, results[k]) - return results + if opt.lower() == "all" or opt.lower() == "no_opt": + run(False) + if opt.lower() == "all" or opt.lower() == "opt": + run(True) + + if actual_proc is not None and expect_proc is not None: + return reduce( + lambda acc, c: acc or c, + [r for rs in results.values() for r in rs.values()], + False, + ) + else: + return results """ @@ -226,7 +261,26 @@ def test( class Tests: - def func(self, state: State, opt: bool): + def __init__(self, opts: Options): + copts = CompileOptions( + opts.cross_prefix, opts.cflags, opts.arch_flags, opts.auto, opts.verbose + ) + self.opt = opts.opt + + self.verbose = opts.verbose + self._func = Test_Implementations(TEST_TYPES.MLKEM, copts) + self._nistkat = Test_Implementations(TEST_TYPES.NISTKAT, copts) + self._kat = Test_Implementations(TEST_TYPES.KAT, copts) + self._bench = Test_Implementations(TEST_TYPES.BENCH, copts) + self._bench_components = Test_Implementations( + TEST_TYPES.BENCH_COMPONENTS, copts + ) + + self.compile_mode = "Cross" if opts.cross_prefix else "Native" + self.compile = opts.compile + self.run = opts.run + + def _run_func(self): """Underlying function for functional test""" def expect(scheme: SCHEME) -> str: @@ -240,37 +294,58 @@ def expect(scheme: SCHEME) -> str: + f"CRYPTO_CIPHERTEXTBYTES: {ct_bytes}\n" ) - state.test( - TEST_TYPES.MLKEM, - opt, + return self._func.run_schemes( + self.opt, actual_proc=lambda result: str(result, encoding="utf-8"), expect_proc=expect, ) - def nistkat(self, state: State, opt: bool): - """Underlying function for nistkat test""" + def func(self): + config_logger(self.verbose) + + if self.compile: + self._func.compile(self.opt) + if self.run: + fail = self._run_func() + if fail: + exit(1) - state.test( - TEST_TYPES.NISTKAT, - opt, + def _run_nistkat(self): + return self._nistkat.run_schemes( + self.opt, actual_proc=sha256sum, expect_proc=lambda scheme: parse_meta(scheme, "nistkat-sha256"), ) - def kat(self, state: State, opt: bool): - """Underlying function for kat test""" + def nistkat(self): + config_logger(self.verbose) + + if self.compile: + self._nistkat.compile(self.opt) + if self.run: + fail = self._run_nistkat() + if fail: + exit(1) - state.test( - TEST_TYPES.KAT, - opt, + def _run_kat(self): + return self._kat.run_schemes( + self.opt, actual_proc=sha256sum, expect_proc=lambda scheme: parse_meta(scheme, "kat-sha256"), ) + def kat(self): + config_logger(self.verbose) + + if self.compile: + self._kat.compile(self.opt) + if self.run: + fail = self._run_kat() + if fail: + exit(1) + def bench( self, - state: State, - opt: bool, cycles: str, output, run_as_root: bool, @@ -278,12 +353,12 @@ def bench( mac_taskpolicy, components, ): - config_logger(state.verbose) + config_logger(self.verbose) if components is False: - bench_type = TEST_TYPES.BENCH + t = self._bench else: - bench_type = TEST_TYPES.BENCH_COMPONENTS + t = self._bench_components output = False if mac_taskpolicy: @@ -293,95 +368,97 @@ def bench( else: exec_wrapper = f"taskpolicy -c {mac_taskpolicy}" - results = state.test( - bench_type, - opt, - extra_make_args=[f"CYCLES={cycles}"], - run_as_root=run_as_root, - exec_wrapper=exec_wrapper, - ) - - if results is not None and output is not None and components is False: - import json - - with open(output, "w") as f: - v = [] - for scheme in results: - schemeStr = str(scheme) - r = results[scheme] - - # The first 3 lines of the output are expected to be - # keypair cycles=X - # encaps cycles=X - # decaps cycles=X - - lines = [line for line in r.splitlines() if "=" in line] - - d = {k.strip(): int(v) for k, v in (l.split("=") for l in lines)} - for primitive in ["keypair", "encaps", "decaps"]: - v.append( - { - "name": f"{schemeStr} {primitive}", - "unit": "cycles", - "value": d[f"{primitive} cycles"], - } - ) - f.write(json.dumps(v)) - - def all(self, state: State, opt: str, func: bool, kat: bool, nistkat: bool): - compile_mode = "cross" if state.cross_prefix else "native" + if self.compile: + t.compile(self.opt, extra_make_args=[f"CYCLES={cycles}"]) - gh_env = os.environ.get("GITHUB_ENV") + if self.run: + resultss = t.run_schemes( + self.opt, + run_as_root, + exec_wrapper, + ) - tests = [ - *([self.func] if func else []), - *([self.nistkat] if nistkat else []), - *([self.kat] if kat else []), - ] + if resultss is None: + exit(0) + + for k, results in resultss.items(): + if results is not None and output is not None and components is False: + import json + + with open(output, "w") as f: + v = [] + for scheme in results: + schemeStr = str(scheme) + r = results[scheme] + + # The first 3 lines of the output are expected to be + # keypair cycles=X + # encaps cycles=X + # decaps cycles=X + + lines = [line for line in r.splitlines() if "=" in line] + + d = { + k.strip(): int(v) for k, v in (l.split("=") for l in lines) + } + for primitive in ["keypair", "encaps", "decaps"]: + v.append( + { + "name": f"{schemeStr} {primitive}", + "unit": "cycles", + "value": d[f"{primitive} cycles"], + } + ) + f.write(json.dumps(v)) + + def all(self, func: bool, kat: bool, nistkat: bool): + config_logger(self.verbose) exit_code = 0 - if state.compile: - _state = copy.deepcopy(state) - _state.run = False - - def _compile(opt: bool): - opt_label = "opt" if opt else "no-opt" - for f in tests: + if self.compile: + compiles = [ + *([self._func.compile] if func else []), + *([self._nistkat.compile] if nistkat else []), + *([self._kat.compile] if kat else []), + ] + + def _compile(opt: str): + for f in compiles: if gh_env is not None: print( - f"::group::compile {compile_mode} {opt_label} {f.__name__.removeprefix('_')} test" + f"::group::compile {self.compile_mode} {opt} {f.__name__.removeprefix('_')} test" ) - try: - f(_state, opt) - print("") + f(opt) except SystemExit as e: exit_code = exit_code or e if gh_env is not None: - print(f"::endgroup::") + print("::endgroup::") sys.stdout.flush() - if opt.lower() == "all" or opt.lower() == "no_opt": - _compile(False) - - if opt.lower() == "all" or opt.lower() == "opt": - _compile(True) + if self.opt.lower() == "all": + _compile("no_opt") + _compile("opt") + else: + _compile(self.opt) - if state.run: - _state = state - _state.compile = False + if self.run: + runs = [ + *([self._run_func] if func else []), + *([self._run_nistkat] if nistkat else []), + *([self._run_kat] if kat else []), + ] - def _run(f, _state: State, opt: bool): - opt_label = "opt" if opt else "no-opt" + for f in runs: if gh_env is not None: print( - f"::group::run {compile_mode} {opt_label} {f.__name__.removeprefix('_')} test" + f"::group::run {self.compile_mode} {self.opt} {f.__name__.removeprefix('_')} test" ) try: - f(_state, opt) + f() except SystemExit as e: exit_code = exit_code or e @@ -389,10 +466,4 @@ def _run(f, _state: State, opt: bool): print(f"::endgroup::") sys.stdout.flush() - for f in tests: - if opt.lower() == "all" or opt.lower() == "no_opt": - _run(f, _state, False) - if opt.lower() == "all" or opt.lower() == "opt": - _run(f, _state, True) - exit(exit_code) diff --git a/scripts/tests b/scripts/tests index c9b0a422b..1ee2920b2 100755 --- a/scripts/tests +++ b/scripts/tests @@ -17,7 +17,7 @@ Click interface configuration def __callback(n): def callback(ctx, param, value): - state = ctx.ensure_object(State) + state = ctx.ensure_object(Options) state.__dict__[n] = value return value @@ -74,6 +74,16 @@ _shared_options = [ help="Allow makefile to auto configure system specific preprocessor", callback=__callback("auto"), ), + click.option( + "--opt", + expose_value=False, + nargs=1, + type=click.Choice(["ALL", "OPT", "NO_OPT"], case_sensitive=False), + show_default=True, + default="ALL", + help="Determine whether to compile/run the opt/no_opt binary or both", + callback=__callback("opt"), + ), click.option( "--compile/--no-compile", expose_value=False, @@ -162,10 +172,9 @@ def cli(): context_settings={"show_default": True}, ) @add_options(_shared_options) -@click.make_pass_decorator(State, ensure=True) -@_opt_option -def func(state: State, opt: bool): - Tests().func(state, opt) +@click.make_pass_decorator(Options, ensure=True) +def func(opts: Options): + Tests(opts).func() @cli.command( @@ -173,10 +182,9 @@ def func(state: State, opt: bool): context_settings={"show_default": True}, ) @add_options(_shared_options) -@click.make_pass_decorator(State, ensure=True) -@_opt_option -def nistkat(state: State, opt: bool): - Tests().nistkat(state, opt) +@click.make_pass_decorator(Options, ensure=True) +def nistkat(opts: Options): + Tests(opts).nistkat() @cli.command( @@ -184,10 +192,9 @@ def nistkat(state: State, opt: bool): context_settings={"show_default": True}, ) @add_options(_shared_options) -@click.make_pass_decorator(State, ensure=True) -@_opt_option -def kat(state: State, opt: bool): - Tests().kat(state, opt) +@click.make_pass_decorator(Options, ensure=True) +def kat(opts: Options): + Tests(opts).kat() @cli.command( @@ -196,11 +203,9 @@ def kat(state: State, opt: bool): ) @add_options(_shared_options) @add_options(_bench_options) -@click.make_pass_decorator(State, ensure=True) -@_opt_option +@click.make_pass_decorator(Options, ensure=True) def bench( - state: State, - opt: bool, + opts: Options, cycles: str, output, run_as_root: bool, @@ -208,9 +213,7 @@ def bench( mac_taskpolicy, components, ): - Tests().bench( - state, - opt, + Tests(opts).bench( cycles, output, run_as_root, @@ -227,14 +230,6 @@ def bench( @add_options(_shared_options) @add_options( [ - click.option( - "--opt", - nargs=1, - type=click.Choice(["ALL", "OPT", "NO_OPT"], case_sensitive=False), - show_default=True, - default="ALL", - help="Determine whether to compile/run the opt/no_opt binary or both", - ), click.option( "--func/--no-func", is_flag=True, @@ -258,15 +253,14 @@ def bench( ), ] ) -@click.make_pass_decorator(State, ensure=True) +@click.make_pass_decorator(Options, ensure=True) def all( - state: State, - opt: str, + opts: Options, func: bool, kat: bool, nistkat: bool, ): - Tests().all(state, opt, func, kat, nistkat) + Tests(opts).all(func, kat, nistkat) if __name__ == "__main__":