Skip to content

Commit

Permalink
tests: Integrate acvp client
Browse files Browse the repository at this point in the history
Signed-off-by: Thing-han, Lim <[email protected]>
  • Loading branch information
potsrevennil authored and mkannwischer committed Nov 12, 2024
1 parent 577296e commit 196e8a7
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 117 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ quickcheck: buildall
$(MLKEM512_DIR)/bin/test_mlkem512
$(MLKEM768_DIR)/bin/test_mlkem768
$(MLKEM1024_DIR)/bin/test_mlkem1024
python3 ./test/acvp_client.py
./scripts/acvp
$(Q)echo " Functionality and ACVP tests passed!"

mlkem: \
Expand Down
42 changes: 42 additions & 0 deletions scripts/acvp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0

import os
import sys
import re

sys.path.append(f"{os.path.join(os.path.dirname(__file__), 'lib')}")
from mlkem_test import *
from util import (
path,
config_logger,
)

opts = Options()
opts.compile = False

config_logger(opts.verbose)

build_config = path("test/build/config.mk")
acvp_test_data = path("test/acvp_data/")

if not os.path.isfile(build_config):
logging.error(f"{build_config} not found")
exit(1)


def get_opt() -> bool:
with open(build_config, "r") as file:
for line in file:
# Use regex to match lines in the format "OPT := value"
match = re.match(r"^OPT\s*:=\s*(.*)$", line)
if match:
value = match.group(1).strip()
return value == "1"
logging.error(f"OPT is not defined in {build_config}")
exit(1)


opts.opt = "opt" if get_opt() else "no_opt"

Tests(opts).acvp("test/acvp_data/")
223 changes: 210 additions & 13 deletions scripts/lib/mlkem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io
import logging
import subprocess
from functools import reduce
from functools import reduce, partial
from typing import Optional, Callable, TypedDict
from util import (
TEST_TYPES,
Expand All @@ -17,6 +17,7 @@
github_summary,
logger,
)
import json

gh_env = os.environ.get("GITHUB_ENV")

Expand Down Expand Up @@ -60,6 +61,7 @@ def __init__(self, test_type: TEST_TYPES, copts: CompileOptions, opt: bool):
self.opt = opt
self.compile_mode = copts.compile_mode()
self.opt_label = "opt" if self.opt else "no_opt"
self.i = 0

def compile_schemes(
self,
Expand Down Expand Up @@ -120,19 +122,20 @@ def run_scheme(
self,
scheme: SCHEME,
actual_proc: Callable[[bytes], str] = None,
expect_proc: Callable[[SCHEME], str] = None,
expect_proc: Callable[[SCHEME, str], tuple[bool, str]] = None,
cmd_prefix: [str] = [],
extra_args: [str] = [],
):
"""Run the binary in all different ways"""
log = logger(self.test_type, scheme, self.cross_prefix, self.opt)
log = logger(self.test_type, scheme, self.cross_prefix, self.opt, self.i)
self.i += 1

bin = self.test_type.bin_path(scheme)
if not os.path.isfile(bin):
log.error(f"{bin} does not exists")
sys.exit(1)

cmd = [f"./{bin}"]
cmd = [f"{bin}"]
if self.cross_prefix and platform.system() != "Darwin":
log.info(f"Emulating {bin} with QEMU")
if "x86_64" in self.cross_prefix:
Expand Down Expand Up @@ -162,10 +165,9 @@ def run_scheme(
)
elif actual_proc is not None and expect_proc is not None:
actual = actual_proc(p.stdout)
expect = expect_proc(scheme)
result = actual != expect
result, err = expect_proc(scheme, actual)
if result:
log.error(f"failed, expecting {expect}, but getting {actual}")
log.error(f"{err}")
else:
log.info(f"passed")
else:
Expand Down Expand Up @@ -197,11 +199,30 @@ def compile(
extra_make_args,
)

def run_scheme(
self,
opt: bool,
scheme: SCHEME,
actual_proc: Callable[[bytes], str] = None,
expect_proc: Callable[[SCHEME, str], tuple[bool, str]] = None,
prefix: [str] = [],
extra_args: [str] = [],
) -> TypedDict:
k = "opt" if opt else "no_opt"

results = {}
results[k] = {}
results[k][scheme] = self.ts[k].run_scheme(
scheme, actual_proc, expect_proc, prefix, extra_args
)

return results

def run_schemes(
self,
opt: bool,
actual_proc: Callable[[bytes], str] = None,
expect_proc: Callable[[SCHEME], str] = None,
expect_proc: Callable[[SCHEME, str], tuple[bool, str]] = None,
cmd_prefix: [str] = [],
extra_args: [str] = [],
) -> TypedDict:
Expand All @@ -225,7 +246,7 @@ def run_schemes(
results[k][scheme] = result

title = "## " + (self.compile_mode) + " " + (k.capitalize()) + " Tests"
github_summary(title, self.test_type, results[k])
github_summary(title, self.test_type.desc(), results[k])

if gh_env is not None:
print(f"::endgroup::")
Expand Down Expand Up @@ -257,6 +278,7 @@ def __init__(self, opts: Options):
self._func = Test_Implementations(TEST_TYPES.MLKEM, copts)
self._nistkat = Test_Implementations(TEST_TYPES.NISTKAT, copts)
self._kat = Test_Implementations(TEST_TYPES.KAT, copts)
self._acvp = Test_Implementations(TEST_TYPES.ACVP, copts)
self._bench = Test_Implementations(TEST_TYPES.BENCH, copts)
self._bench_components = Test_Implementations(
TEST_TYPES.BENCH_COMPONENTS, copts
Expand All @@ -269,16 +291,21 @@ def __init__(self, opts: Options):
def _run_func(self, opt: bool):
"""Underlying function for functional test"""

def expect(scheme: SCHEME) -> str:
def expect(scheme: SCHEME, actual: str) -> tuple[bool, str]:
sk_bytes = parse_meta(scheme, "length-secret-key")
pk_bytes = parse_meta(scheme, "length-public-key")
ct_bytes = parse_meta(scheme, "length-ciphertext")

return (
expect = (
f"CRYPTO_SECRETKEYBYTES: {sk_bytes}\n"
+ f"CRYPTO_PUBLICKEYBYTES: {pk_bytes}\n"
+ f"CRYPTO_CIPHERTEXTBYTES: {ct_bytes}\n"
)
fail = expect != actual
return (
fail,
f"Failed, expecting {expect}, but getting {actual}" if fail else "",
)

return self._func.run_schemes(
opt,
Expand Down Expand Up @@ -306,10 +333,19 @@ def _func(opt: bool):
exit(1)

def _run_nistkat(self, opt: bool):
def expect_proc(scheme: SCHEME, actual: str) -> tuple[bool, str]:
expect = parse_meta(scheme, "nistkat-sha256")
fail = expect != actual

return (
fail,
f"Failed, expecting {expect}, but getting {actual}" if fail else "",
)

return self._nistkat.run_schemes(
opt,
actual_proc=sha256sum,
expect_proc=lambda scheme: parse_meta(scheme, "nistkat-sha256"),
expect_proc=expect_proc,
)

def nistkat(self):
Expand All @@ -331,10 +367,19 @@ def _nistkat(opt: bool):
exit(1)

def _run_kat(self, opt: bool):
def expect_proc(scheme: SCHEME, actual: str) -> tuple[bool, str]:
expect = parse_meta(scheme, "kat-sha256")
fail = expect != actual

return (
fail,
f"Failed, expecting {expect}, but getting {actual}" if fail else "",
)

return self._kat.run_schemes(
opt,
actual_proc=sha256sum,
expect_proc=lambda scheme: parse_meta(scheme, "kat-sha256"),
expect_proc=expect_proc,
)

def kat(self):
Expand All @@ -356,6 +401,158 @@ def _kat(opt: bool):
if fail:
exit(1)

def _run_acvp(self, opt: bool, acvp_dir: str = "test/acvp_data"):
acvp_keygen_json = f"{acvp_dir}/acvp_keygen_internalProjection.json"
acvp_encapDecap_json = f"{acvp_dir}/acvp_encapDecap_internalProjection.json"

with open(acvp_keygen_json, "r") as f:
acvp_keygen_data = json.load(f)

with open(acvp_encapDecap_json, "r") as f:
acvp_encapDecap_data = json.load(f)

def actual_proc(bs: bytes) -> str:
return bs.decode("utf-8")

def _expect_proc(
tc: TypedDict, scheme: SCHEME, actual: str
) -> tuple[bool, str]:
fail = False
err = ""
for l in actual.splitlines():
(k, v) = l.split("=")
if v != tc[k]:
fail = True
err = (
err
+ f"Failed, Mismatching result for {k}: expect {tc[k]}, but got {v}\n"
)
return (fail, err)

opt_label = "opt" if opt else "no_opt"

def init_results() -> TypedDict:
results = {}
results[opt_label] = {}
for s in SCHEME:
results[opt_label][s] = False
return results

fail = False
results = init_results()
# encapDecap
if gh_env is not None:
print(
f"::group::run {self.compile_mode} {opt_label} {TEST_TYPES.ACVP.desc()} encapDecap"
)

for i, tg in enumerate(acvp_encapDecap_data["testGroups"]):
scheme = SCHEME.from_str(tg["parameterSet"])

for tc in tg["tests"]:
if tg["function"] == "encapsulation":
extra_args = [
"encapDecap",
"AFT",
"encapsulation",
f"ek={tc['ek']}",
f"m={tc['m']}",
]

elif tg["function"] == "decapsulation":
extra_args = [
"encapDecap",
"VAL",
"decapsulation",
f"dk={tg['dk']}",
f"c={tc['c']}",
]

rs = self._acvp.run_scheme(
opt,
scheme,
extra_args=extra_args,
actual_proc=actual_proc,
expect_proc=partial(_expect_proc, tc),
)
for k, r in rs.items():
results[k][scheme] = results[k][scheme] or r[scheme]

if gh_env is not None:
print(f"::endgroup::")

for k, result in results.items():
title = (
"## " + (self._acvp.compile_mode) + " " + (k.capitalize()) + " Tests"
)
github_summary(title, f"{TEST_TYPES.ACVP.desc()} encapDecap", result)

fail = reduce(lambda acc, c: acc or c, result.values(), fail)

results = init_results()

if gh_env is not None:
print(
f"::group::run {self.compile_mode} {opt_label} {TEST_TYPES.ACVP.desc()} keyGen"
)

for i, tg in enumerate(acvp_keygen_data["testGroups"]):
scheme = SCHEME.from_str(tg["parameterSet"])

for tc in tg["tests"]:
extra_args = [
"keyGen",
"AFT",
f"z={tc['z']}",
f"d={tc['d']}",
]

rs = self._acvp.run_scheme(
opt,
scheme,
extra_args=extra_args,
actual_proc=actual_proc,
expect_proc=partial(_expect_proc, tc),
)
for k, r in rs.items():
results[k][scheme] = results[k][scheme] or r[scheme]

if gh_env is not None:
print(f"::endgroup::")

for k, result in results.items():
title = (
"## "
+ (self._acvp.ts[k].compile_mode)
+ " "
+ (k.capitalize())
+ " Tests"
)
github_summary(title, f"{TEST_TYPES.ACVP.desc()} keyGen", result)

fail = reduce(lambda acc, c: acc or c, result.values(), fail)

return fail

def acvp(self, acvp_dir: str):
config_logger(self.verbose)

def _acvp(opt: bool):
if self.compile:
self._acvp.compile(opt)
if self.run:
return self._run_acvp(opt, acvp_dir)

fail = False

if self.opt.lower() == "all" or self.opt.lower() == "no_opt":
fail = fail or _acvp(False)
if self.opt.lower() == "all" or self.opt.lower() == "opt":
fail = fail or _acvp(True)

if fail:
exit(1)

def _run_bench(
self, t: Test_Implementations, opt: bool, run_as_root: bool, exec_wrapper: str
) -> TypedDict:
Expand Down
Loading

0 comments on commit 196e8a7

Please sign in to comment.