diff --git a/controller.py b/controller.py index 1495035..4eb5392 100755 --- a/controller.py +++ b/controller.py @@ -19,8 +19,9 @@ import argparse import hashlib import logging -from multiprocessing import Manager, Process +from multiprocessing import Manager, Process, Value from pathlib import Path +import signal import subprocess import sys import tables @@ -47,6 +48,24 @@ clogger = logging.getLogger(__name__) +stop_signal_received = Value("i", 0) + + +def signal_handler(signum, frame): + global stop_signal_received + stop_signal_received.value = 1 + + +def register_signal_handlers(): + signal.signal( + signal.SIGTERM, + signal_handler, + ) + signal.signal( + signal.SIGINT, + signal_handler, + ) + def build_ranges_dict(fault_dict): """ @@ -580,6 +599,7 @@ def controller( hdf5mode, queue_output, len(faultlist), + stop_signal_received, compressionlevel, logger_postprocess, log_config, @@ -610,9 +630,26 @@ def controller( continue goldenrun_data[keyword] = pd.DataFrame(goldenrun_data[keyword]) - pbar = tqdm(total=len(faultlist), desc="Simulating faults", disable=not len(faultlist)) + # Handlers are used for a graceful exit, in case of a signal + register_signal_handlers() + + pbar = tqdm( + total=len(faultlist), desc="Simulating faults", disable=not len(faultlist) + ) itter = 0 while 1: + if stop_signal_received.value == 1: + clogger.info( + "Stop signal received, finishing the current write operation..." + ) + + p_logger.join() + + for p in p_list: + p["process"].kill() + + break + if len(p_list) == 0 and itter == len(faultlist): clogger.debug("Done inserting qemu jobs") break diff --git a/hdf5logger.py b/hdf5logger.py index d4b337c..d2568ee 100644 --- a/hdf5logger.py +++ b/hdf5logger.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import signal import logging import time @@ -25,6 +26,14 @@ logger = logging.getLogger(__name__) +def register_signal_handlers(): + """ + Ignore signals, they will be handled by the controller.py anyway + """ + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + + # Tables for storing the elements from queue class translation_block_exec_table(tables.IsDescription): tb = tables.UInt64Col() @@ -431,7 +440,7 @@ endtable.close() -def process_backup(f, configgroup, exp, myfilter): +def process_backup(f, configgroup, exp, myfilter, stop_signal): process_config(f, configgroup, exp["config"], myfilter) fault_expanded_group = f.create_group( @@ -444,6 +453,9 @@ for exp_number in tqdm( range(len(exp["expanded_faultlist"])), desc="Creating backup" ): + if stop_signal.value == 1: + break + exp_group = f.create_group( fault_expanded_group, exp_name.format(exp_number), "Group containing faults" ) @@ -463,12 +475,15 @@ mode, queue_output, num_exp, + stop_signal, compressionlevel, logger_postprocess=None, log_goldenrun=True, log_config=False, overwrite_faults=False, ): + register_signal_handlers() + prctl.set_name("logger") prctl.set_proctitle("logger") f = tables.open_file(hdf5path, mode, max_group_width=65536) @@ -492,6 +507,8 @@ n._f_remove(recursive=True) while num_exp > 0 or log_goldenrun or log_pregoldenrun or log_config: + if stop_signal.value == 1: + break # readout queue and get next output from qemu. Will block exp = queue_output.get() t1 = time.time() @@ -537,7 +554,7 @@ "/", "Backup", "Group containing backup and run information" ) - process_backup(f, exp_group, exp, myfilter) + process_backup(f, exp_group, exp, myfilter, stop_signal) log_config = False continue else: