Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ipc: Handle failing RTL simulations #97

Merged
merged 1 commit into from
Feb 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions target/common/test/SnitchSim.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# SPDX-License-Identifier: SHL-0.51
#
# Paul Scheffler <[email protected]>
# Luca Colagrande <[email protected]>
#
# This class implements a minimal wrapping IPC server for `tb_lib`.
# `__main__` shows a demonstrator for it, running a simulation and accessing its memory.
Expand All @@ -14,6 +15,12 @@
import subprocess
import struct
import functools
import threading
import time
import signal

# Simulation monitor polling period (in seconds)
SIM_MONITOR_POLL_PERIOD = 2


class SnitchSim:
Expand All @@ -38,16 +45,34 @@ def start(self):
# Open FIFOs
self.tx = open(tx_fd, 'wb', buffering=0) # Unbuffered
self.rx = open(rx_fd, 'rb')
# Create thread to monitor simulation
self.stop_sim_monitor = threading.Event()
self.sim_monitor = threading.Thread(target=self.__monitor_sim)
self.sim_monitor.start()

def __sim_active(func):
@functools.wraps(func)
def inner(self, *args, **kwargs):
if self.sim is None:
raise RuntimeError(f'Snitch is not running (simulation `{self.sim_bin}`'
f'binary `{self.snitch_bin}`)')
return func(self, *args, **kwargs)
# Catch SIGINT raised by simulation monitor
try:
return func(self, *args, **kwargs)
except KeyboardInterrupt:
print('Simulation monitor detected a simulation failure.')
self.stop_sim_monitor.set()
self.sim_monitor.join()
sys.exit(1)
return inner

def __monitor_sim(self):
while not self.stop_sim_monitor.is_set():
if self.sim.poll() is not None:
# Raise SIGINT to interrupt main thread, could be blocked on a read
os.kill(os.getpid(), signal.SIGINT)
time.sleep(SIM_MONITOR_POLL_PERIOD)

@__sim_active
def read(self, addr: int, length: int) -> bytes:
op = struct.pack('=QQQ', 0, addr, length)
Expand All @@ -73,15 +98,20 @@ def poll(self, addr: int, mask32: int, exp32: int):
bytestring = self.rx.read(4)
return int.from_bytes(bytestring, byteorder='little')

# Simulator can exit only once TX FIFO closes
@__sim_active
def finish(self, wait_for_sim: bool = True):
# Close FIFOs (simulator can exit only once TX FIFO closes)
self.rx.close()
self.tx.close()
# Close simulation monitor
self.stop_sim_monitor.set()
self.sim_monitor.join()
# Wait for simulation or terminate
if (wait_for_sim):
self.sim.wait()
else:
self.sim.terminate()
# Cleanup
self.tmpdir.cleanup()
self.sim = None

Expand Down
Loading