simulate.py: Hot-fix for leakage of infinite processes
colluca committed Nov 24, 2023
1 parent 523e549 commit 1b0786c
Showing 2 changed files with 144 additions and 112 deletions.
3 changes: 2 additions & 1 deletion target/snitch_cluster/.gitignore
@@ -6,4 +6,5 @@
/work-vsim/
/work-vlt/
/work-vcs/
/*.log
/*.log
/runs/
253 changes: 142 additions & 111 deletions util/sim/simulate.py
@@ -8,18 +8,19 @@
# TODO colluca: timeout feature

import argparse
import multiprocessing
from pathlib import Path
import subprocess
from termcolor import colored, cprint
import os
import signal
import re
import sys
import time
import yaml


BANSHEE_CFG = 'src/banshee.yaml'
LOG_FILE = 'run.txt'

# Tool settings
SIMULATORS = ['vsim', 'banshee', 'verilator', 'vcs', 'other']
@@ -38,6 +39,10 @@
    'vcs': '{sim_bin} {elf}'
}

# Globals
running_tests = []
terminated = False


def parser():
    # Argument parsing
@@ -57,6 +62,12 @@ def parser():
        action='store',
        nargs='?',
        help='Override default path to simulator binary')
    parser.add_argument(
        '--run-dir',
        action='store',
        default='runs',
        nargs='?',
        help='Parent directory of each test run directory')
    parser.add_argument(
        '--dry-run',
        action='store_true',
@@ -84,6 +95,19 @@ def parser():
    return parser


# Handle SIGTERM signal and forward it to the process group.
# To avoid recursion, forward the signal to the group only
# the first time the signal is received.
def sigterm_handler(sig, frame):
    global terminated
    if not terminated:
        terminated = True
        print('SIGTERM signal received')
        pgid = os.getpgid(0)
        print(f'Forwarding SIGTERM to process group {pgid}')
        os.killpg(pgid, signal.SIGTERM)

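The handler relies on child simulations inheriting the parent's process group, so a single os.killpg reaches every running simulator, while the `terminated` flag keeps the handler from re-forwarding the SIGTERM it sends to itself. A minimal standalone sketch of the same pattern (illustrative only, not from the diff; assumes a Unix `sleep` binary):

import os
import signal
import subprocess

terminated = False

def handler(sig, frame):
    global terminated
    if not terminated:  # guard: the killpg below signals this process too
        terminated = True
        os.killpg(os.getpgid(0), signal.SIGTERM)

signal.signal(signal.SIGTERM, handler)
# Popen children inherit the process group by default, so killpg reaches them
children = [subprocess.Popen(['sleep', '60']) for _ in range(3)]
for p in children:
    p.wait()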

# Get tests from a test list file
def get_tests(testlist_path):
    testlist_path = Path(testlist_path).absolute()
@@ -103,80 +127,48 @@ def multiple_processes(args):
    return args.n_procs != 1


def run_simulation(cmd, simulator, test, quiet=False):
    # Defaults
    result = 1
    log = ''

    # Spawn simulation subprocess
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                         universal_newlines=True)

    # Poll simulation subprocess and log its output
    while p.poll() is None:
        line = p.stdout.readline()
        log += line
        if not quiet:
            print(line, end='', flush=True)

        # When simulating with vsim or vcs, we need to parse the simulation
        # log to catch the application's return code
        if simulator in ['vsim', 'vcs']:
            # Capture success
            regex_success = r'\[SUCCESS\] Program finished successfully'
            match_success = re.search(regex_success, line)
            if match_success:
                result = 0
            else:
                regex_fail = r'\[FAILURE\] Finished with exit code\s+(\d+)'
                match = re.search(regex_fail, line)
                if match:
                    exit_code = match.group(1)
                    result = check_exit_code(test, exit_code)

    # Check if the subprocess terminated correctly
    exit_code = p.poll()
    # In Banshee and Verilator the exit code of the Snitch binary is returned
    # through the exit code of the simulation command
    if simulator in ['banshee', 'verilator']:
        result = check_exit_code(test, exit_code)
    # For custom commands the return code is that of the command
    elif simulator == 'other':
        result = exit_code
    # For standard simulation commands the simulated Snitch binary exit
    # code is overridden only if the simulator failed
    else:
        if exit_code != 0:
            result = exit_code

    return result, log
def get_run_dir(test, args):
    run_dir = Path.cwd()
    if 'rundir' in test:
        run_dir = test['rundir']
    elif args.run_dir:
        run_dir = Path(args.run_dir) / Path(test['elf']).stem
    return run_dir

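As a worked example of the resolution order (illustrative only, with hypothetical paths): a test entry with an explicit 'rundir' key wins; otherwise the directory is derived from the --run-dir argument and the ELF stem.

from argparse import Namespace
from pathlib import Path

test = {'elf': 'sw/apps/axpy/build/axpy.elf'}  # hypothetical test entry
args = Namespace(run_dir='runs')               # the --run-dir default
# No 'rundir' key, so the run directory is <run-dir>/<ELF stem>
print(Path(args.run_dir) / Path(test['elf']).stem)  # runs/axpy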

def run_test(test, args):
def launch_simulation(test, args):
    # Extract args
    simulator = args.simulator
    dry_run = args.dry_run
    testlist = args.testlist
    quiet = multiple_processes(args)

    # Simulator binary can be overridden on the command-line or test-wise
    sim_bin = SIMULATOR_BINS[simulator]
    if args.sim_bin:
        sim_bin = args.sim_bin
    if 'sim_bin' in test:
        sim_bin = test['sim_bin']

    # Check if simulator is supported for this test
    if 'simulators' in test:
        if simulator not in test['simulators']:
            return (0, '')
    # Make path to sim_bin absolute such that it can be run from a different directory than the
    # current working directory
    sim_bin = Path.cwd() / sim_bin

    # Construct path to executable
    elf = Path(test['elf'])
    if testlist:
        elf = Path(testlist).absolute().parent / elf
    cprint(f'Run test {colored(elf, "cyan")}', attrs=["bold"])

    # Check if the simulation should be run in a specific directory.
    # This is useful, e.g. to preserve the logs of multiple simulations
    # which are executed in parallel
    run_dir = get_run_dir(test, args)
    os.makedirs(run_dir, exist_ok=True)

    # Check if simulator is supported for this test
    if 'simulators' in test:
        if simulator not in test['simulators']:
            return None

    # Construct simulation command (override only supported for RTL)
    if 'cmd' in test and simulator != 'banshee':
        cmd = test['cmd']
@@ -186,27 +178,79 @@ def run_test(test, args):
        cmd = SIMULATOR_CMDS[simulator]
        cmd = cmd.format(sim_bin=sim_bin, elf=elf)

    # Check if the simulation should be run in a specific directory.
    # This is useful, e.g. to preserve the logs of multiple simulations
    # which are executed in parallel
    if 'rundir' in test:
        cmd = f'cd {test["rundir"]} && {cmd}'
    if not quiet or args.verbose:
        print(f'$ {cmd}', flush=True)

    # Run simulation
    result = 0
    log = ''
    print(f'$ {cmd}', flush=True)
    if not dry_run:
        result, log = run_simulation(cmd, simulator, test, quiet)
        # Create file for stdout
        stdout_path = run_dir / LOG_FILE
        stdout = open(stdout_path, 'w')
        # TODO: create args directly, not from a string
        args = cmd.split()
        # Make path to sim_bin absolute such that it can be run from a different directory than the
        # current working directory
        args[0] = Path.cwd() / args[0]
        p = subprocess.Popen(args, stdout=stdout, stderr=subprocess.STDOUT,
                             cwd=run_dir, universal_newlines=True)
        return p

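Stripped of the test-specific plumbing, the launch step is: create the run directory, open a per-run log, and start the simulator with its output and working directory redirected there. A self-contained sketch (illustrative only, using `echo` as a stand-in command):

import subprocess
from pathlib import Path

run_dir = Path('runs/example')  # hypothetical run directory
run_dir.mkdir(parents=True, exist_ok=True)
with open(run_dir / 'run.txt', 'w') as stdout:
    # stderr is folded into the same log file
    p = subprocess.Popen(['echo', 'hello'], stdout=stdout,
                         stderr=subprocess.STDOUT, cwd=run_dir,
                         universal_newlines=True)
    p.wait()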

def check_simulation(test, args, proc):
    elf = test['elf']
    result = 1

    # Check if simulator is supported for this test
    simulator = args.simulator
    if 'simulators' in test:
        if simulator not in test['simulators']:
            return 0

    # Construct simulation command (override only supported for RTL)
    if 'cmd' in test and simulator != 'banshee':
        simulator = 'other'

    # Read simulation log
    run_dir = get_run_dir(test, args)
    log_path = Path(run_dir) / LOG_FILE
    with open(log_path, 'r') as log:
        for line in log.readlines():

            # When simulating with vsim or vcs, we need to parse the simulation
            # log to catch the application's return code
            if simulator in ['vsim', 'vcs']:
                # Capture success
                regex_success = r'\[SUCCESS\] Program finished successfully'
                match_success = re.search(regex_success, line)
                if match_success:
                    result = 0
                else:
                    regex_fail = r'\[FAILURE\] Finished with exit code\s+(\d+)'
                    match = re.search(regex_fail, line)
                    if match:
                        exit_code = match.group(1)
                        result = check_exit_code(test, exit_code)

    # Check if the subprocess terminated correctly
    exit_code = proc.returncode
    # In Banshee and Verilator the exit code of the Snitch binary is returned
    # through the exit code of the simulation command
    if simulator in ['banshee', 'verilator']:
        result = check_exit_code(test, exit_code)
    # For custom commands the return code is that of the command
    elif simulator == 'other':
        result = exit_code
    # For standard simulation commands the simulated Snitch binary exit
    # code is overridden only if the simulator failed
    else:
        if exit_code != 0:
            result = exit_code

    # Report failure or success
    if result != 0:
        cprint(f'{elf} test failed', 'red', attrs=['bold'], flush=True)
    else:
        cprint(f'{elf} test passed', 'green', attrs=['bold'], flush=True)

    return (result, log)
    return result

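The vsim/vcs branch of this check boils down to scanning the log for the SUCCESS/FAILURE markers. The same logic in isolation (illustrative only, with a hypothetical log line; the real code hands the captured code to check_exit_code instead of converting it directly):

import re

log_lines = ['[FAILURE] Finished with exit code  7']  # hypothetical log
result = 1
for line in log_lines:
    if re.search(r'\[SUCCESS\] Program finished successfully', line):
        result = 0
    else:
        match = re.search(r'\[FAILURE\] Finished with exit code\s+(\d+)', line)
        if match:
            result = int(match.group(1))
print(result)  # 7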

def print_failed_test(test):
@@ -225,50 +269,37 @@ def print_test_summary(failed_tests, args):


def run_tests(tests, args):

    # Create a process Pool
    with multiprocessing.Pool(args.n_procs) as pool:

        # Create a shared object which parent and child processes can access
        # concurrently to terminate the pool early as soon as one process fails
        exit_early = multiprocessing.Value('B')
        exit_early.value = 0

        # Define callback for early exit
        def completion_callback(return_value):
            result = return_value[0]
            log = return_value[1]
            if args.early_exit and result != 0:
                exit_early.value = 1
            # Printing the log all at once here, rather than line-by-line
            # in run_simulation, ensures that the logs of different processes
            # are not interleaved in stdout.
            # However, as we prefer line-by-line printing when a single process
            # is used, we have to make sure we don't print twice.
            if args.verbose and multiple_processes(args):
                print(log)

        # Queue tests to process pool
        results = []
        for test in tests:
            result = pool.apply_async(run_test, args=(test, args), callback=completion_callback)
            results.append(result)

        # Wait for all tests to complete
        running = range(len(tests))
        while len(running) != 0 and not exit_early.value:
            time.sleep(1)
            running = [i for i in running if not results[i].ready()]

    # Query test results
    failed_tests = []
    for test, result in zip(tests, results):
        if result.ready() and result.get()[0] != 0:
            failed_tests.append(test)

    print_test_summary(failed_tests, args)

    return len(failed_tests)
    global running_tests

    signal.signal(signal.SIGTERM, sigterm_handler)

    # Spawn a process for every test, wait for all running tests to terminate and check results
    n_failed_tests = 0
    while len(tests) or len(running_tests):
        # If there are tests left to run and fewer running tests than the maximum
        # number of processes we are allowed to execute in parallel, spawn a new test
        if len(tests) and len(running_tests) < args.n_procs:
            test = tests.pop(0)
            running_tests.append({'test': test, 'proc': launch_simulation(test, args)})
        # Remove completed tests from the running tests list and move them to the completed list
        idcs = [i for i, run in enumerate(running_tests) if args.dry_run or
                run['proc'].poll() is not None]
        completed_tests = [running_tests.pop(i) for i in sorted(idcs, reverse=True)]
        # Check completed tests
        for run in completed_tests:
            result = check_simulation(run['test'], args, run['proc'])
            n_failed_tests += (result != 0)
        time.sleep(0.5)

    # # Query test results
    # failed_tests = []
    # for test, result in zip(tests, results):
    #     if result.ready() and result.get()[0] != 0:
    #         failed_tests.append(test)
    # print_test_summary(failed_tests, args)

    print(f'{n_failed_tests} tests failed')
    return n_failed_tests

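The replacement scheduler is a bounded-parallelism polling loop: top up the running set, reap finished processes, repeat. The same skeleton in isolation (illustrative only, with `sleep` placeholder jobs):

import subprocess
import time

pending = [['sleep', '1']] * 4  # placeholder jobs
running = []
max_procs = 2
n_failed = 0
while pending or running:
    # Spawn at most one new job per iteration, up to the limit
    if pending and len(running) < max_procs:
        running.append(subprocess.Popen(pending.pop(0)))
    # Reap finished jobs and tally failures
    for p in [p for p in running if p.poll() is not None]:
        running.remove(p)
        n_failed += (p.returncode != 0)
    time.sleep(0.5)
print(f'{n_failed} jobs failed')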

def main():
