Check running jobs before feature generation job submission (#528)
* Check running jobs before feature generation job submission

* Rename default job names

* Bugfix number of jobs submitted

* Redefine size variable, create filter_running function

* Try simpler way to decrement job counter

* Detect and log failed jobs

* Update status_njobs for failure counting

* Define initial failed_this_round

* Make failure count more descriptive
bfhealy authored Jan 10, 2024
1 parent aee212f commit 5ccd662
Showing 2 changed files with 53 additions and 11 deletions.
60 changes: 51 additions & 9 deletions tools/generate_features_job_submission.py
@@ -6,6 +6,7 @@
 import pandas as pd
 import numpy as np
 import yaml
+import subprocess


 BASE_DIR = pathlib.Path(__file__).parent.parent.absolute()
@@ -96,6 +97,30 @@ def parse_commandline():
     return args


+def filter_running(user):
+    command = f"squeue -u {user}"
+    result = subprocess.run(command, shell=True, capture_output=True, text=True)
+
+    running_jobs_count = 0
+    if result.returncode == 0:
+        running_jobs = result.stdout.splitlines()
+        # squeue output will always have 1 line for header
+        if len(running_jobs) > 1:
+            running_jobs = [x.strip().split() for x in running_jobs[1:]]
+            for job in running_jobs:
+                job_name = job[2]
+                if "ztf_fg" in job_name:
+                    running_jobs_count += 1
+    else:
+        print("Error executing the command. Exit code:", result.returncode)
+        print("Error output:", result.stderr)
+        raise ValueError()
+
+    print(f"Identified {running_jobs_count} running jobs.")
+
+    return running_jobs_count
+
+
 def filter_completed(df, resultsDir, filename, reset_running=False):

     start_time = time.time()
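For reference, here is a minimal sketch (not part of the commit) of the parsing that filter_running performs, run against mocked squeue output in its default column layout (JOBID, PARTITION, NAME, USER, ST, TIME, NODES, NODELIST(REASON)). The job IDs, partition, and times below are hypothetical:

    # Mocked `squeue -u <user>` output; all values are hypothetical.
    mock_stdout = (
        " JOBID PARTITION            NAME    USER ST    TIME NODES NODELIST(REASON)\n"
        "123456    shared        ztf_fg_7 bfhealy  R   12:34     1 node001\n"
        "123457    shared        ztf_fg_8 bfhealy PD    0:00     1 (Priority)\n"
        "123458    shared submit_jobs.job bfhealy  R 1:02:03     1 node002\n"
    )

    # Skip the one-line header, split each row into columns, and count rows
    # whose NAME column (index 2) contains the "ztf_fg" substring.
    rows = [line.strip().split() for line in mock_stdout.splitlines()[1:]]
    count = sum(1 for row in rows if "ztf_fg" in row[2])
    print(count)  # 2 -- the renamed submission job is not counted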
@@ -193,9 +218,14 @@ def run_job(


 if __name__ == '__main__':
+    # Start with 60s delay to allow previous submission job to conclude (esp. if running as cron job)
+    time.sleep(60)
+
     # Parse command line
     args = parse_commandline()

+    running_jobs_count = filter_running(args.user)
+
     dir_path = os.path.dirname(os.path.realpath(__file__))

     filename = args.filename
@@ -237,12 +267,13 @@ def run_job(
     print('%d jobs remaining to queue...' % nchoice)

     if args.doSubmit:
-        counter = 0
-        status_njobs = njobs
+        failure_count = 0
+        counter = running_jobs_count
+        status_njobs = len(df_to_complete)
         diff_njobs = 0
         # Redefine max instances if fewer jobs remain
         new_max_instances = np.min([args.max_instances, nchoice])
-        size = new_max_instances
+        size = new_max_instances - counter
         final_round = False
         if size == nchoice:
             final_round = True
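The effect of seeding counter with the running-job count is easiest to see with numbers. A worked example (values hypothetical, mirroring the logic above):

    import numpy as np

    # Hypothetical: a 20-instance cap, 37 quadrant jobs left to queue,
    # and 4 matching "ztf_fg" jobs already in the Slurm queue.
    max_instances, nchoice, running_jobs_count = 20, 37, 4

    new_max_instances = np.min([max_instances, nchoice])  # 20
    size = new_max_instances - running_jobs_count         # 16

    # Only 16 new jobs are submitted, so running + newly submitted
    # jobs never exceed the 20-instance cap.
    print(size)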
@@ -271,10 +302,10 @@ def run_job(
                 print(
                     'Run "squeue -u <username>" to check status of remaining jobs.'
                 )
+                print(f"{failure_count} jobs failed during full run.")
                 break
             else:
                 # Wait between status checks
-                os.system(f"squeue -u {args.user}")
                 print(f"Waiting {args.wait_time_minutes} minutes until next check...")
                 time.sleep(args.wait_time_minutes * 60)

@@ -287,19 +318,30 @@ def run_job(
             print('%d jobs remaining to complete...' % njobs)
             print('%d jobs remaining to queue...' % nchoice)

-            # Compute difference in njobs to count available instances
-            diff_njobs = status_njobs - njobs
-            status_njobs = njobs
+            running_jobs_count = filter_running(args.user)
+
+            failed_this_round = 0
+            n_jobs_diff = counter - running_jobs_count
+            n_jobs_finished = status_njobs - njobs
+            if n_jobs_finished != n_jobs_diff:
+                failed_this_round = np.abs(n_jobs_finished - n_jobs_diff)
+                failure_count += failed_this_round
+
-            # Decrease counter if jobs have finished
-            counter -= diff_njobs
+            status_njobs = njobs
+            counter = running_jobs_count
             # Note that if a job has failed, it will not be re-queued until
             # its quadrant's .running file is removed (or set --reset_running)

             # Define size of the next quadrant_indices array
             size = np.min([new_max_instances - counter, nchoice])
             # Signal to stop looping when the last set of jobs is queued
             if size == nchoice:
                 final_round = True
+
+            print(
+                f"Detected {failed_this_round} failed jobs this round ({failure_count} total failures)."
+            )
+
     elif args.doSubmitLoop:
         confirm = input(
             "Warning: setting --doSubmitLoop ignores limits on number of jobs to submit. Continue? (yes/no): "
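To make the new failure bookkeeping concrete, a worked example with hypothetical counts; the idea is that jobs which leave the queue without producing results are presumed to have failed:

    import numpy as np

    # Hypothetical: 10 matching jobs were queued at the last check, 7 remain,
    # but the results on disk show only 2 newly completed jobs.
    counter, running_jobs_count = 10, 7
    status_njobs, njobs = 50, 48

    n_jobs_diff = counter - running_jobs_count  # 3 jobs left the queue
    n_jobs_finished = status_njobs - njobs      # 2 jobs wrote results

    failed_this_round = 0
    if n_jobs_finished != n_jobs_diff:
        failed_this_round = np.abs(n_jobs_finished - n_jobs_diff)

    print(failed_this_round)  # 1 job presumed failed this round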
4 changes: 2 additions & 2 deletions tools/generate_features_slurm.py
@@ -315,7 +315,7 @@ def check_quads_for_sources(
 parser.add_argument(
     "--job_name",
     type=str,
-    default='generate_features',
+    default='ztf_fg',
     help="job name",
 )
 parser.add_argument(
@@ -641,7 +641,7 @@ def check_quads_for_sources(
 # (Python code can also be run interactively)
 fid = open(os.path.join(slurmDir, 'slurm_submission.sub'), 'w')
 fid.write('#!/bin/bash\n')
-fid.write(f'#SBATCH --job-name={args.job_name}_submit.job\n')
+fid.write('#SBATCH --job-name=submit_jobs.job\n')
 fid.write(f'#SBATCH --output=../logs/{args.job_name}_submit_%A_%a.out\n')
 fid.write(f'#SBATCH --error=../logs/{args.job_name}_submit_%A_%a.err\n')
 fid.write(f'#SBATCH -p {args.submit_partition_type}\n')
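This rename pairs with the new filter_running check: filter_running counts every queued job whose name contains "ztf_fg", and under the old f-string the submission job itself (named ztf_fg_submit.job with the new default --job_name) would have been counted as a running feature-generation job. A sketch of the header the updated code writes, assuming the default --job_name of 'ztf_fg':

    # Illustrative only: the #SBATCH header written to slurm_submission.sub.
    job_name = "ztf_fg"  # default value of --job_name after this commit
    header = "\n".join(
        [
            "#!/bin/bash",
            "#SBATCH --job-name=submit_jobs.job",  # no 'ztf_fg' -> not counted
            f"#SBATCH --output=../logs/{job_name}_submit_%A_%a.out",
            f"#SBATCH --error=../logs/{job_name}_submit_%A_%a.err",
        ]
    )
    print(header)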
