From 5ccd6626f03e2603426c0a97fdd4437a6bad538a Mon Sep 17 00:00:00 2001
From: Brian Healy <42810347+bfhealy@users.noreply.github.com>
Date: Wed, 10 Jan 2024 15:59:33 -0500
Subject: [PATCH] Check running jobs before feature generation job submission
 (#528)

* Check running jobs before feature generation job submission

* Rename default job names

* Bugfix number of jobs submitted

* Redefine size variable, create filter_running function

* Try simpler way to decrement job counter

* Detect and log failed jobs

* Update status_njobs for failure counting

* Define initial failed_this_round

* Make failure count more descriptive
---
 tools/generate_features_job_submission.py | 60 +++++++++++++++++++----
 tools/generate_features_slurm.py          |  4 +-
 2 files changed, 53 insertions(+), 11 deletions(-)

diff --git a/tools/generate_features_job_submission.py b/tools/generate_features_job_submission.py
index 7ad6a507..25cc4bd6 100755
--- a/tools/generate_features_job_submission.py
+++ b/tools/generate_features_job_submission.py
@@ -6,6 +6,7 @@
 import pandas as pd
 import numpy as np
 import yaml
+import subprocess
 
 BASE_DIR = pathlib.Path(__file__).parent.parent.absolute()
 
@@ -96,6 +97,30 @@ def parse_commandline():
     return args
 
 
+def filter_running(user):
+    command = f"squeue -u {user}"
+    result = subprocess.run(command, shell=True, capture_output=True, text=True)
+
+    running_jobs_count = 0
+    if result.returncode == 0:
+        running_jobs = result.stdout.splitlines()
+        # squeue output will always have 1 line for header
+        if len(running_jobs) > 1:
+            running_jobs = [x.strip().split() for x in running_jobs[1:]]
+            for job in running_jobs:
+                job_name = job[2]
+                if "ztf_fg" in job_name:
+                    running_jobs_count += 1
+    else:
+        print("Error executing the command. Exit code:", result.returncode)
+        print("Error output:", result.stderr)
+        raise ValueError()
+
+    print(f"Identified {running_jobs_count} running jobs.")
+
+    return running_jobs_count
+
+
 def filter_completed(df, resultsDir, filename, reset_running=False):
 
     start_time = time.time()
@@ -193,9 +218,14 @@ def run_job(
 
 if __name__ == '__main__':
 
+    # Start with 60s delay to allow previous submission job to conclude (esp. if running as cron job)
+    time.sleep(60)
+
     # Parse command line
     args = parse_commandline()
 
+    running_jobs_count = filter_running(args.user)
+
     dir_path = os.path.dirname(os.path.realpath(__file__))
 
     filename = args.filename
@@ -237,12 +267,13 @@ def run_job(
     print('%d jobs remaining to queue...' % nchoice)
 
     if args.doSubmit:
-        counter = 0
-        status_njobs = njobs
+        failure_count = 0
+        counter = running_jobs_count
+        status_njobs = len(df_to_complete)
         diff_njobs = 0
         # Redefine max instances if fewer jobs remain
         new_max_instances = np.min([args.max_instances, nchoice])
-        size = new_max_instances
+        size = new_max_instances - counter
         final_round = False
         if size == nchoice:
             final_round = True
@@ -271,10 +302,10 @@ def run_job(
                 print(
                     'Run "squeue -u " to check status of remaining jobs.'
                 )
+                print(f"{failure_count} jobs failed during full run.")
                 break
             else:
                 # Wait between status checks
-                os.system(f"squeue -u {args.user}")
                 print(f"Waiting {args.wait_time_minutes} minutes until next check...")
                 time.sleep(args.wait_time_minutes * 60)
 
@@ -287,12 +318,19 @@ def run_job(
                 print('%d jobs remaining to complete...' % njobs)
                 print('%d jobs remaining to queue...' % nchoice)
 
-                # Compute difference in njobs to count available instances
-                diff_njobs = status_njobs - njobs
-                status_njobs = njobs
+                running_jobs_count = filter_running(args.user)
+
+                failed_this_round = 0
+                n_jobs_diff = counter - running_jobs_count
+                n_jobs_finished = status_njobs - njobs
+                if n_jobs_finished != n_jobs_diff:
+                    failed_this_round = np.abs(n_jobs_finished - n_jobs_diff)
+                    failure_count += failed_this_round
 
-                # Decrease counter if jobs have finished
-                counter -= diff_njobs
+                status_njobs = njobs
+                counter = running_jobs_count
+                # Note that if a job has failed, it will not be re-queued until
+                # its quadrant's .running file is removed (or set --reset_running)
 
                 # Define size of the next quadrant_indices array
                 size = np.min([new_max_instances - counter, nchoice])
@@ -300,6 +338,10 @@ def run_job(
                 if size == nchoice:
                     final_round = True
 
+                print(
+                    f"Detected {failed_this_round} failed jobs this round ({failure_count} total failures)."
+                )
+
     elif args.doSubmitLoop:
         confirm = input(
             "Warning: setting --doSubmitLoop ignores limits on number of jobs to submit. Continue? (yes/no): "
diff --git a/tools/generate_features_slurm.py b/tools/generate_features_slurm.py
index d6f47c46..81151662 100755
--- a/tools/generate_features_slurm.py
+++ b/tools/generate_features_slurm.py
@@ -315,7 +315,7 @@ def check_quads_for_sources(
     parser.add_argument(
         "--job_name",
         type=str,
-        default='generate_features',
+        default='ztf_fg',
         help="job name",
     )
     parser.add_argument(
@@ -641,7 +641,7 @@ def check_quads_for_sources(
     # (Python code can also be run interactively)
     fid = open(os.path.join(slurmDir, 'slurm_submission.sub'), 'w')
     fid.write('#!/bin/bash\n')
-    fid.write(f'#SBATCH --job-name={args.job_name}_submit.job\n')
+    fid.write('#SBATCH --job-name=submit_jobs.job\n')
     fid.write(f'#SBATCH --output=../logs/{args.job_name}_submit_%A_%a.out\n')
     fid.write(f'#SBATCH --error=../logs/{args.job_name}_submit_%A_%a.err\n')
    fid.write(f'#SBATCH -p {args.submit_partition_type}\n')
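
Note (reviewer sketch, not part of the patch): the failure detection added above compares how many ztf_fg jobs left the Slurm queue with how many produced results between status checks; any mismatch is counted as failed jobs. The snippet below is a minimal, self-contained illustration of that arithmetic with made-up numbers. The values are hypothetical, the patch itself uses np.abs and live squeue/results counts, and it also increments counter as new jobs are submitted within a round.

    # Illustrative only: mirrors the patch's failure-counting arithmetic.
    # State remembered from the previous status check:
    status_njobs = 10       # quadrant jobs that still lacked results
    counter = 5             # ztf_fg jobs that were sitting in the queue

    # State observed this round:
    njobs = 8               # jobs still lacking results now
    running_jobs_count = 3  # ztf_fg jobs that squeue reports now

    n_jobs_diff = counter - running_jobs_count        # 2 jobs left the queue
    n_jobs_finished = status_njobs - njobs            # 2 jobs produced results
    failed_this_round = abs(n_jobs_finished - n_jobs_diff)  # 0 -> no failures

    # If only one job had produced results (njobs == 9), one job would have
    # left the queue without finishing, so failed_this_round would be 1 and
    # its quadrant would stay blocked until its .running file is removed
    # (or the run is restarted with --reset_running).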