Skip to content

Commit

Permalink
Continuing development of A0
Browse files Browse the repository at this point in the history
  • Loading branch information
NikosDelijohn committed Sep 12, 2024
1 parent a0af523 commit 3df2811
Showing 1 changed file with 216 additions and 55 deletions.
271 changes: 216 additions & 55 deletions src/a0.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,30 @@
log.addHandler(log_file)
###########################

def compile_assembly(exit_on_error : bool = False, *instructions) -> bool:
"""Executes a sequence of bash instructions to compile the `self.asm_file`.
def compile_assembly(*instructions, exit_on_error : bool = False) -> bool:
"""
Executes a sequence of bash instructions to compile the `self.asm_file`.
Uses subprocess for each instruction and optionally exits on error.
- Parameters:
- exit_on_error (bool): If an error is encountered during
compilation and this is True, then the program terminates.
Otherwise it continues.
- *instructions (str): A sequence of bash commands
required in order to (cross) compile the assembly files.
Args:
exit_on_error (bool): If an error is encountered during
compilation and this is True, then the
program terminates. Otherwise it continues.
*instructions (str): A sequence of bash commands required in order to
(cross) compile the assembly files.
Returns:
bool: True if no message was written to ``stderr`` from any
of the executed instructions (subprocesses). False otherwise.
- Returns:
- bool: True if no message was written to `stderr` from any
of the executed instructions (subprocesses). False otherwise."""
Raises:
SystemExit: if ``stderr`` contains text and ``exit_on_error`` is True.
"""
log.debug("Compiling assembly sources.")

for cmd in instructions:

log.debug(f"Executing instruction '{cmd}'.")
log.debug(f"Executing instruction \"{cmd}\".")

with subprocess.Popen(
["/bin/bash", "-c", cmd],
Expand All @@ -57,14 +63,16 @@ def compile_assembly(exit_on_error : bool = False, *instructions) -> bool:
stdout, stderr = process.communicate()

if stderr:

log.debug(f"Error during execution of {cmd}\n\
---------[MESSAGE]---------\n\
{'-'.join(stderr.splitlines())}\n\
---------------------------\n")

if exit_on_error:

log.fatal(f"Unrecoverable Error during compilation of assembly files. Exiting...")
log.critical("Unrecoverable Error during compilation of \
assembly files. Exiting...")
exit(1)

return False
Expand All @@ -74,14 +82,15 @@ def compile_assembly(exit_on_error : bool = False, *instructions) -> bool:

return True

def zip_archive(archive_name : str, *files) -> str:
"""Generates a .zip archive of arbitrary files.
def zip_archive(archive_name: str, *files) -> str:
"""
Generates a .zip archive of arbitrary files.
- Paremeters:
- archive_name (str): The filename (stem) of the zip archive.
Paremeters:
archive_name (str): The filename (stem) of the zip archive.
- Returns:
- str: The generated archive path string.
Returns:
str: The generated archive path string.
"""
archive = pathlib.Path(archive_name)
archive.mkdir(exist_ok = True)
Expand All @@ -104,99 +113,251 @@ def zip_archive(archive_name : str, *files) -> str:
shutil.rmtree(archive)
return zip_filename

def coverage(coverage_formula : str) -> float:
# TODO: Compute coverage (eval trick), or add it to zoix.CSVFaultReport?
...

class A0():
"""Implements the A0 compaction algorithm of https://doi.org/10.1109/TC.2016.2643663"""
"""Implements the A0 compaction algorithm of
https://doi.org/10.1109/TC.2016.2643663"""

def __init__(self, isa : str, *a0_asm_sources : str, **a0_settings) -> "A0":
def __init__(self, isa: str, *a0_asm_sources: str, **a0_settings) -> "A0":

log.debug(f"Generating AssemblyHandlers for {a0_asm_sources}")
self.assembly_sources : list[asm.AssemblyHandler] = [
self.assembly_sources: list[asm.AssemblyHandler] = [
asm.AssemblyHandler(isa, pathlib.Path(asm_file), chunksize = 1) # chunksize is always 1 for A0
for asm_file in a0_asm_sources ]

self.assembly_compilation_instructions = a0_settings.get("assembly_compilation_instructions")
for asm_file in a0_asm_sources]

self.assembly_compilation_instructions = \
a0_settings.get("assembly_compilation_instructions")
self.fsim_report = zoix.CSVFaultReport(
fault_summary = pathlib.Path(a0_settings.get("fsim_fault_summary")),
fault_report = pathlib.Path(a0_settings.get("fsim_fault_report")))
self.sff_config = a0_settings.get("sff_config")
self.coverage_formula = a0_settings.get("coverage_formula")
log.debug(f"Fault reports set to {self.fsim_report=}")

self.zoix_compilation_args : list[str] = a0_settings.get("vcs_compilation_instructions")
log.debug(f"VCS Compilation instructions for HDL sources set to {self.zoix_compilation_args}")

self.zoix_lsim_args : list[str] = a0_settings.get("logic_simulation_instructions")
self.zoix_lsim_kwargs : dict[str, float | re.Pattern | int | list] = \
{ k : eval(v) for k, v in a0_settings.get("logic_simulation_options").items() }
{k : eval(v) for k, v in
a0_settings.get("logic_simulation_options").items()}

self.zoix_fcm_file : pathlib.Path = pathlib.Path(a0_settings.get("fcm_file"))
self.zoix_fcm_kwargs : dict[str, str] = a0_settings.get("fcm_options")

self.zoix_fsim_args : list[str] = a0_settings.get("fault_simulation_instructions")
self.zoix_fsim_kwargs : dict[str, float] = \
{ k : eval(v) for k, v in a0_settings.get("fault_simulation_options").items() }
{k : eval(v) for k, v in
a0_settings.get("fault_simulation_options").items()}

self.vc_zoix = zoix.ZoixInvoker()

@staticmethod
def evaluate(previous_result: tuple[int, float],
new_result: tuple[int, float]) -> bool:
"""
Evaluates the new results with respect to the previous ones.
Specifically, if new tat <= old tat and
if new coverage >= old coverage.
Args:
previous_result (tuple[int, float]): the old tat value (int) and
coverage (float) values.
new_result (tuple[int, float]): the new tat value (int) and
coverage values.
Returns:
bool: ``True`` if new tat <= old tat and new coverage >= old
coverage. ``False`` otherwise
"""

old_tat, old_coverage = previous_result
new_tat, new_coverage = new_result

return (new_tat <= old_tat) and (new_coverage >= old_coverage)

def get_coverage(self) -> float:
...

def pre_run(self) -> tuple[int, float]:
"""
Extracts the initial test application time and coverage of the STL.
The test application time is extracted by a logic simulation of the STL
whereas the coverage is computed by performing a fault simulation.
Returns:
tuple[int, float]: The test application time (index 0) and the
coverage of the STL (index 1)
Raises:
SystemExit: If HDL sources cannot be compiled (if specified) or
if logic simulation cannot be performed.
TimeoutError: if logic or fault simulation timed out.
"""

vc_zoix = self.vc_zoix
fault_report = self.fsim_report

test_application_time = list()

compile_assembly(*self.assembly_compilation_instructions)

if self.zoix_compilation_args:

comp = vc_zoix.compile_sources(*self.zoix_compilation_args)

if comp == zoix.Compilation.ERROR:

log.critical("Unable to compile HDL sources!")
exit(1)

try:

lsim = vc_zoix.logic_simulate(*self.zoix_lsim_args,
**self.zoix_lsim_kwargs,
tat_value = test_application_time)

except zoix.LogicSimulationException:

log.critical("Unable to perform logic simulation for tat \
computation")
exit(1)

def run(self) -> None:
if lsim == zoix.LogicSimulation.TIMEOUT:

raise TimeoutError("Logic simulation timed out")


fsim = vc_zoix.fault_simulate(*self.zoix_fsim_args,
**self.zoix_fsim_kwargs)

if fsim == zoix.FaultSimulation.TIMEOUT:

raise TimeoutError("Fault simulation timed out")

fault_list = fault_report.parse_fault_report()

coverage = \
fault_report.compute_flist_coverage(fault_list,
self.sff_config,
self.coverage_formula)

return (test_application_time.pop(), coverage)

def run(self,
initial_stl_stats: tuple[int, float],
times_to_shuffle: int = 100) -> None:

# Step 1: Compute initial stats of the STL
initial_tat, coverage = initial_stl_stats
log.debug(f"Initial coverage {coverage}, TaT {initial_tat}")

vc_zoix = self.vc_zoix
fault_report = self.fsim_report

vc_zoix = zoix.ZoixInvoker()
vc_zoix.create_fcm_script(self.zoix_fcm_file, **self.zoix_fcm_kwargs)

# Keep a backup of all sources since
# they will be modified in-place.
zip_archive("../backup", *[ asm.get_asm_source() for asm in self.assembly_sources])
zip_archive("../backup", *[asm.get_asm_source() for asm
in self.assembly_sources])

all_instructions : list[asm.Codeline] = \
[ (asm_id, codeline) for asm_id, asm in enumerate(self.assembly_sources, start = 1) for codeline in asm.get_code() ]
[(asm_id, codeline) for asm_id, asm in
enumerate(self.assembly_sources) for codeline in asm.get_code()]

for i in range(100):
for i in range(times_to_shuffle):
random.shuffle(all_instructions)

# Step 0: Initial fault simulation for the
# computation of the fault coverage
compile_assembly(*self.assembly_compilation_instructions)
# Step 2: Select instructions in a random order
while len(all_instructions) != 0:

if self.zoix_compilation_args:
vc_zoix.compile_sources(*self.zoix_compilation_args)
asm_id, codeline = all_instructions.pop(0)
asm_source_file = self.assembly_sources[asm_id].get_asm_source()

print(f"Removing {codeline} of assembly source {asm_source_file}")

# Step 3: Removal of the selected instruction
handler = self.assembly_sources[asm_id]
handler.remove(codeline)

asm_compilation = \
compile_assembly(*self.assembly_compilation_instructions)

# Continue ... TODO
if not asm_compilation:

print(f"Assembly source {asm_source_file} does not compile \
after the removal of: {codeline}.")
continue

test_application_time = list()

if self.zoix_compilation_args:

comp = vc_zoix.compile_sources(*self.zoix_compilation_args)

if comp == zoix.Compilation.ERROR:

log.critical("Unable to compile HDL sources!")
exit(1)

try:

lsim = vc_zoix.logic_simulate(*self.zoix_lsim_args,
**self.zoix_lsim_kwargs,
tat_value = test_application_time)

except zoix.LogicSimulationException:

log.critical("Unable to perform logic simulation for tat \
computation")
exit(1)

if lsim == zoix.LogicSimulation.TIMEOUT:

raise TimeoutError("Logic simulation timed out")

# TODO: Continue here...

return True

def main():
"""Sandbox/Testing Env"""

a0_sources = [ "../sandbox/sbst_01/src/tests/test1.S", "../sandbox/sbst_02/src/tests/test1_short_div.S" ]
a0_sources = [ "../cv32e40p/sbst/tests/test1.S"]
a0_settings = {
"assembly_compilation_instructions" : ["make all"],
"fsim_fault_summary" : "mock_fault_summary",
"fsim_fault_report" : "mock_fault_report",
"vcs_compilation_instructions" : ["make all"],
"logic_simulation_instructions" : ["make vcs/lsim/gate/shell"],
"assembly_compilation_instructions" : ["make -C ../cv32e40p/sbst all"],
"fsim_fault_summary" : "../cv32e40p/run/vc-z01x/fsim_out_csv_files/DEFAULT_summary.csv",
"fsim_fault_report" : "../cv32e40p/run/vc-z01x/fsim_out_csv_files/DEFAULT_faultlist.csv",
"sff_config" : "../cv32e40p/fsim/config.sff",
"coverage_formula" : "(DD + DN)/(NA + DA + DN + DD + SU)",
"vcs_compilation_instructions" : [],
"logic_simulation_instructions" : ["make -C ../cv32e40p vcs/sim/gate/shell"],
"logic_simulation_options" : {
"timeout" : "120.0",
"success_regexp" : 're.compile(r"\$finish[^0-9]+([0-9]+)[m|u|n|p]s", re.DOTALL)',
"timeout" : "20.0",
"success_regexp" : 're.compile(r"test application time = ([0-9]+)", re.DOTALL)',
"tat_capture_group" : "1"
},
"fcm_file" : "fcm.tcl",
"fcm_file" : "../cv32e40p/fsim/fcm.tcl",
"fcm_options" : {
"set_config" : "-global_max_jobs 64",
"create_testcases" : '-name {"test1"} -exec ${::env(VCS_WORK_DIR)}/simv -args "./simv +firmware=${::env(FIRMWARE)}" -fsim_args "-fsim=fault+dictionary"',
"fsim" : "-verbose",
"report" : "-campaign cv32e40p -report fsim_out.rpt -overwrite",
"report" : "-campaign cv32e40p -report fsim_out_hier.rpt -overwrite -hierarchical 3"
"report" : "-campaign cv32e40p -report fsim_out -csv -overwrite",
},
"fault_simulation_instructions" : ["make vcs/fsim/gate/shell"],
"fault_simulation_instructions" : ["make -C ../cv32e40p vcs/fgen/saf", "make -C ../cv32e40p vcs/fsim/gate/shell"],
"fault_simulation_options" : {
"timeout" : "None",
"allow" : '[re.compile(r"Info\: Connected to started server")]'
}
}

isa = asm.ISA(pathlib.Path("../langs/riscv.isa"))

A = A0(isa, *a0_sources, **a0_settings)

A.run()
tat, coverage = A.pre_run()
A.run((tat, coverage))

if __name__ == "__main__":
main()

0 comments on commit 3df2811

Please sign in to comment.