diff --git a/scripts/lib/mlkem_test.py b/scripts/lib/mlkem_test.py index 66a79c1b3..aebbaa16a 100644 --- a/scripts/lib/mlkem_test.py +++ b/scripts/lib/mlkem_test.py @@ -260,6 +260,25 @@ def run_schemes( else: return results + def test( + self, + opt: bool, + compile: bool, + run: bool, + extra_make_envs={}, + extra_make_args=[], + actual_proc: Callable[[bytes], str] = None, + expect_proc: Callable[[SCHEME, str], tuple[bool, str]] = None, + cmd_prefix: [str] = [], + extra_args: [str] = [], + ) -> TypedDict: + if compile: + self.compile(opt, extra_make_envs, extra_make_args) + if run: + return self.run_schemes( + opt, actual_proc, expect_proc, cmd_prefix, extra_args + ) + """ Underlying functional tests @@ -272,23 +291,16 @@ def __init__(self, opts: Options): copts = CompileOptions( opts.cross_prefix, opts.cflags, opts.arch_flags, opts.auto, opts.verbose ) + + self._f = lambda t: Test_Implementations(t, copts) self.opt = opts.opt self.verbose = opts.verbose - self._func = Test_Implementations(TEST_TYPES.MLKEM, copts) - self._nistkat = Test_Implementations(TEST_TYPES.NISTKAT, copts) - self._kat = Test_Implementations(TEST_TYPES.KAT, copts) - self._acvp = Test_Implementations(TEST_TYPES.ACVP, copts) - self._bench = Test_Implementations(TEST_TYPES.BENCH, copts) - self._bench_components = Test_Implementations( - TEST_TYPES.BENCH_COMPONENTS, copts - ) - self.compile_mode = copts.compile_mode() self.compile = opts.compile self.run = opts.run - def _run_func(self, opt: bool): + def _func(self, opt: bool, compile: bool, run: bool) -> TypedDict: """Underlying function for functional test""" def expect(scheme: SCHEME, actual: str) -> tuple[bool, str]: @@ -307,8 +319,10 @@ def expect(scheme: SCHEME, actual: str) -> tuple[bool, str]: f"Failed, expecting {expect}, but getting {actual}" if fail else "", ) - return self._func.run_schemes( + return self._f(TEST_TYPES.MLKEM).test( opt, + compile, + run, actual_proc=lambda result: str(result, encoding="utf-8"), expect_proc=expect, ) @@ -316,23 +330,16 @@ def expect(scheme: SCHEME, actual: str) -> tuple[bool, str]: def func(self): config_logger(self.verbose) - def _func(opt: bool): - - if self.compile: - self._func.compile(opt) - if self.run: - return self._run_func(opt) - fail = False if self.opt.lower() == "all" or self.opt.lower() == "no_opt": - fail = fail or _func(False) + fail = fail or self._func(False, self.compile, self.run) if self.opt.lower() == "all" or self.opt.lower() == "opt": - fail = fail or _func(True) + fail = fail or self._func(True, self.compile, self.run) if fail: exit(1) - def _run_nistkat(self, opt: bool): + def _nistkat(self, opt: bool, compile: bool, run: bool) -> TypedDict: def expect_proc(scheme: SCHEME, actual: str) -> tuple[bool, str]: expect = parse_meta(scheme, "nistkat-sha256") fail = expect != actual @@ -342,8 +349,10 @@ def expect_proc(scheme: SCHEME, actual: str) -> tuple[bool, str]: f"Failed, expecting {expect}, but getting {actual}" if fail else "", ) - return self._nistkat.run_schemes( + return self._f(TEST_TYPES.NISTKAT).test( opt, + compile, + run, actual_proc=sha256sum, expect_proc=expect_proc, ) @@ -351,22 +360,16 @@ def expect_proc(scheme: SCHEME, actual: str) -> tuple[bool, str]: def nistkat(self): config_logger(self.verbose) - def _nistkat(opt: bool): - if self.compile: - self._nistkat.compile(opt) - if self.run: - return self._run_nistkat(opt) - fail = False if self.opt.lower() == "all" or self.opt.lower() == "no_opt": - fail = fail or _nistkat(False) + fail = fail or self._nistkat(False, self.compile, self.run) if self.opt.lower() == "all" or self.opt.lower() == "opt": - fail = fail or _nistkat(True) + fail = fail or self._nistkat(True, self.compile, self.run) if fail: exit(1) - def _run_kat(self, opt: bool): + def _kat(self, opt: bool, compile: bool, run: bool) -> TypedDict: def expect_proc(scheme: SCHEME, actual: str) -> tuple[bool, str]: expect = parse_meta(scheme, "kat-sha256") fail = expect != actual @@ -376,8 +379,10 @@ def expect_proc(scheme: SCHEME, actual: str) -> tuple[bool, str]: f"Failed, expecting {expect}, but getting {actual}" if fail else "", ) - return self._kat.run_schemes( + return self._f(TEST_TYPES.KAT).test( opt, + compile, + run, actual_proc=sha256sum, expect_proc=expect_proc, ) @@ -385,176 +390,169 @@ def expect_proc(scheme: SCHEME, actual: str) -> tuple[bool, str]: def kat(self): config_logger(self.verbose) - def _kat(opt: bool): - if self.compile: - self._kat.compile(opt) - if self.run: - return self._run_kat(opt) - fail = False if self.opt.lower() == "all" or self.opt.lower() == "no_opt": - fail = fail or _kat(False) + fail = fail or self._kat(False, self.compile, self.run) if self.opt.lower() == "all" or self.opt.lower() == "opt": - fail = fail or _kat(True) + fail = fail or self._kat(True, self.compile, self.run) if fail: exit(1) - def _run_acvp(self, opt: bool, acvp_dir: str = "test/acvp_data"): - acvp_keygen_json = f"{acvp_dir}/acvp_keygen_internalProjection.json" - acvp_encapDecap_json = f"{acvp_dir}/acvp_encapDecap_internalProjection.json" + def _acvp( + self, opt: bool, compmile: bool, run: bool, acvp_dir: str = "test/acvp_data" + ): + if compile: + self._f(TEST_TYPES.ACVP).compile(opt) + if run: + acvp_keygen_json = f"{acvp_dir}/acvp_keygen_internalProjection.json" + acvp_encapDecap_json = f"{acvp_dir}/acvp_encapDecap_internalProjection.json" + + with open(acvp_keygen_json, "r") as f: + acvp_keygen_data = json.load(f) + + with open(acvp_encapDecap_json, "r") as f: + acvp_encapDecap_data = json.load(f) + + def actual_proc(bs: bytes) -> str: + return bs.decode("utf-8") + + def _expect_proc( + tc: TypedDict, scheme: SCHEME, actual: str + ) -> tuple[bool, str]: + fail = False + err = "" + for l in actual.splitlines(): + (k, v) = l.split("=") + if v != tc[k]: + fail = True + err = ( + err + + f"Failed, Mismatching result for {k}: expect {tc[k]}, but got {v}\n" + ) + return (fail, err) + + opt_label = "opt" if opt else "no_opt" + + def init_results() -> TypedDict: + results = {} + results[opt_label] = {} + for s in SCHEME: + results[opt_label][s] = False + return results - with open(acvp_keygen_json, "r") as f: - acvp_keygen_data = json.load(f) + fail = False + results = init_results() + # encapDecap + if gh_env is not None: + print( + f"::group::run {self.compile_mode} {opt_label} {TEST_TYPES.ACVP.desc()} encapDecap" + ) - with open(acvp_encapDecap_json, "r") as f: - acvp_encapDecap_data = json.load(f) + for i, tg in enumerate(acvp_encapDecap_data["testGroups"]): + scheme = SCHEME.from_str(tg["parameterSet"]) + + for tc in tg["tests"]: + if tg["function"] == "encapsulation": + extra_args = [ + "encapDecap", + "AFT", + "encapsulation", + f"ek={tc['ek']}", + f"m={tc['m']}", + ] + + elif tg["function"] == "decapsulation": + extra_args = [ + "encapDecap", + "VAL", + "decapsulation", + f"dk={tg['dk']}", + f"c={tc['c']}", + ] + + rs = self._f(TEST_TYPES.ACVP).run_scheme( + opt, + scheme, + extra_args=extra_args, + actual_proc=actual_proc, + expect_proc=partial(_expect_proc, tc), + ) + for k, r in rs.items(): + results[k][scheme] = results[k][scheme] or r[scheme] - def actual_proc(bs: bytes) -> str: - return bs.decode("utf-8") + if gh_env is not None: + print(f"::endgroup::") - def _expect_proc( - tc: TypedDict, scheme: SCHEME, actual: str - ) -> tuple[bool, str]: - fail = False - err = "" - for l in actual.splitlines(): - (k, v) = l.split("=") - if v != tc[k]: - fail = True - err = ( - err - + f"Failed, Mismatching result for {k}: expect {tc[k]}, but got {v}\n" - ) - return (fail, err) + for k, result in results.items(): + title = "## " + (self.compile_mode) + " " + (k.capitalize()) + " Tests" + github_summary(title, f"{TEST_TYPES.ACVP.desc()} encapDecap", result) - opt_label = "opt" if opt else "no_opt" + fail = reduce(lambda acc, c: acc or c, result.values(), fail) - def init_results() -> TypedDict: - results = {} - results[opt_label] = {} - for s in SCHEME: - results[opt_label][s] = False - return results + results = init_results() - fail = False - results = init_results() - # encapDecap - if gh_env is not None: - print( - f"::group::run {self.compile_mode} {opt_label} {TEST_TYPES.ACVP.desc()} encapDecap" - ) + if gh_env is not None: + print( + f"::group::run {self.compile_mode} {opt_label} {TEST_TYPES.ACVP.desc()} keyGen" + ) - for i, tg in enumerate(acvp_encapDecap_data["testGroups"]): - scheme = SCHEME.from_str(tg["parameterSet"]) + for i, tg in enumerate(acvp_keygen_data["testGroups"]): + scheme = SCHEME.from_str(tg["parameterSet"]) - for tc in tg["tests"]: - if tg["function"] == "encapsulation": + for tc in tg["tests"]: extra_args = [ - "encapDecap", + "keyGen", "AFT", - "encapsulation", - f"ek={tc['ek']}", - f"m={tc['m']}", + f"z={tc['z']}", + f"d={tc['d']}", ] - elif tg["function"] == "decapsulation": - extra_args = [ - "encapDecap", - "VAL", - "decapsulation", - f"dk={tg['dk']}", - f"c={tc['c']}", - ] - - rs = self._acvp.run_scheme( - opt, - scheme, - extra_args=extra_args, - actual_proc=actual_proc, - expect_proc=partial(_expect_proc, tc), - ) - for k, r in rs.items(): - results[k][scheme] = results[k][scheme] or r[scheme] - - if gh_env is not None: - print(f"::endgroup::") - - for k, result in results.items(): - title = ( - "## " + (self._acvp.compile_mode) + " " + (k.capitalize()) + " Tests" - ) - github_summary(title, f"{TEST_TYPES.ACVP.desc()} encapDecap", result) - - fail = reduce(lambda acc, c: acc or c, result.values(), fail) - - results = init_results() - - if gh_env is not None: - print( - f"::group::run {self.compile_mode} {opt_label} {TEST_TYPES.ACVP.desc()} keyGen" - ) - - for i, tg in enumerate(acvp_keygen_data["testGroups"]): - scheme = SCHEME.from_str(tg["parameterSet"]) - - for tc in tg["tests"]: - extra_args = [ - "keyGen", - "AFT", - f"z={tc['z']}", - f"d={tc['d']}", - ] - - rs = self._acvp.run_scheme( - opt, - scheme, - extra_args=extra_args, - actual_proc=actual_proc, - expect_proc=partial(_expect_proc, tc), - ) - for k, r in rs.items(): - results[k][scheme] = results[k][scheme] or r[scheme] + rs = self._f(TEST_TYPES.ACVP).run_scheme( + opt, + scheme, + extra_args=extra_args, + actual_proc=actual_proc, + expect_proc=partial(_expect_proc, tc), + ) + for k, r in rs.items(): + results[k][scheme] = results[k][scheme] or r[scheme] - if gh_env is not None: - print(f"::endgroup::") + if gh_env is not None: + print(f"::endgroup::") - for k, result in results.items(): - title = ( - "## " - + (self._acvp.ts[k].compile_mode) - + " " - + (k.capitalize()) - + " Tests" - ) - github_summary(title, f"{TEST_TYPES.ACVP.desc()} keyGen", result) + for k, result in results.items(): + title = "## " + (self.compile_mode) + " " + (k.capitalize()) + " Tests" + github_summary(title, f"{TEST_TYPES.ACVP.desc()} keyGen", result) - fail = reduce(lambda acc, c: acc or c, result.values(), fail) + fail = reduce(lambda acc, c: acc or c, result.values(), fail) - return fail + return fail def acvp(self, acvp_dir: str): config_logger(self.verbose) - def _acvp(opt: bool): - if self.compile: - self._acvp.compile(opt) - if self.run: - return self._run_acvp(opt, acvp_dir) - fail = False if self.opt.lower() == "all" or self.opt.lower() == "no_opt": - fail = fail or _acvp(False) + fail = fail or self._acvp(False, self.compile, self.run, acvp_dir) if self.opt.lower() == "all" or self.opt.lower() == "opt": - fail = fail or _acvp(True) + fail = fail or self._acvp(True, self.compile, self.run, acvp_dir) if fail: exit(1) - def _run_bench( - self, t: Test_Implementations, opt: bool, run_as_root: bool, exec_wrapper: str + def _bench( + self, + t: TEST_TYPES, + opt: bool, + compile: bool, + run: bool, + cycles, + run_as_root: bool, + exec_wrapper: str, + mac_taskpolicy, ) -> TypedDict: cmd_prefix = [] if run_as_root: @@ -563,12 +561,25 @@ def _run_bench( ) cmd_prefix.append("sudo") + if mac_taskpolicy: + if exec_wrapper: + logging.error(f"cannot set both --mac-taskpolicy and --exec-wrapper") + sys.exit(1) + else: + exec_wrapper = f"taskpolicy -c {mac_taskpolicy}" + if exec_wrapper: logging.info(f"Running with customized wrapper.") exec_wrapper = exec_wrapper.split(" ") cmd_prefix = cmd_prefix + exec_wrapper - return t.run_schemes(opt, cmd_prefix=cmd_prefix) + return self._f(t).test( + opt, + compile, + run, + extra_make_args=[f"CYCLES={cycles}"], + cmd_prefix=cmd_prefix, + ) def bench( self, @@ -582,41 +593,42 @@ def bench( config_logger(self.verbose) if components is False: - t = self._bench + t = TEST_TYPES.BENCH else: - t = self._bench_components + t = TEST_TYPES.BENCH_COMPONENTS output = False - if mac_taskpolicy: - if exec_wrapper: - logging.error(f"cannot set both --mac-taskpolicy and --exec-wrapper") - sys.exit(1) - else: - exec_wrapper = f"taskpolicy -c {mac_taskpolicy}" - - # NOTE: We haven't yet decided how to output both opt/no-opt benchmark results if self.opt.lower() == "all": - if self.compile: - t.compile(False, extra_make_args=[f"CYCLES={cycles}"]) - if self.run: - self._run_bench(t, False, run_as_root, exec_wrapper) - if self.compile: - t.compile(True, extra_make_args=[f"CYCLES={cycles}"]) - if self.run: - resultss = self._run_bench(t, True, run_as_root, exec_wrapper) + # NOTE: We haven't yet decided how to output both opt/no-opt benchmark results + self._bench( + t, + False, + self.compile, + self.run, + cycles, + run_as_root, + exec_wrapper, + mac_taskpolicy, + ) + resultss = self._bench( + t, + True, + self.compile, + self.run, + cycles, + run_as_root, + exec_wrapper, + mac_taskpolicy, + ) else: - if self.compile: - t.compile( - True if self.opt.lower() == "opt" else False, - extra_make_args=[f"CYCLES={cycles}"], - ) - if self.run: - resultss = self._run_bench( - t, - True if self.opt.lower() == "opt" else False, - run_as_root, - exec_wrapper, - ) + resultss = self._bench( + t, + self.compile, + self.run, + True if self.opt.lower() == "opt" else False, + run_as_root, + exec_wrapper, + ) if resultss is None: exit(0) @@ -657,33 +669,26 @@ def all(self, func: bool, kat: bool, nistkat: bool, acvp: bool): def all(opt: bool): code = 0 + fs = [ + *([self._func] if func else []), + *([self._nistkat] if nistkat else []), + *([self._kat] if kat else []), + *([self._acvp] if acvp else []), + ] + if self.compile: - compiles = [ - *([self._func.compile] if func else []), - *([self._nistkat.compile] if nistkat else []), - *([self._kat.compile] if kat else []), - *([self._acvp.compile] if acvp else []), - ] - - for f in compiles: + for f in fs: try: - f(opt) + f(opt, True, False) except SystemExit as e: code = code or e sys.stdout.flush() if self.run: - runs = [ - *([self._run_func] if func else []), - *([self._run_nistkat] if nistkat else []), - *([self._run_kat] if kat else []), - *([self._run_acvp] if acvp else []), - ] - - for f in runs: + for f in fs: try: - code = code or int(f(opt)) + code = code or int(f(opt, False, True)) except SystemExit as e: code = code or e