Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check running jobs before feature generation job submission #528

Merged
merged 9 commits into from
Jan 10, 2024
60 changes: 51 additions & 9 deletions tools/generate_features_job_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pandas as pd
import numpy as np
import yaml
import subprocess


BASE_DIR = pathlib.Path(__file__).parent.parent.absolute()
Expand Down Expand Up @@ -96,6 +97,30 @@ def parse_commandline():
return args


def filter_running(user):
    """Count the user's currently queued/running feature-generation jobs.

    Runs ``squeue -u <user>`` and counts jobs whose NAME column contains
    ``"ztf_fg"`` (the job name used by the feature-generation slurm scripts).

    Parameters
    ----------
    user : str
        Username passed to ``squeue -u``.

    Returns
    -------
    int
        Number of queued/running jobs whose name contains ``"ztf_fg"``.

    Raises
    ------
    ValueError
        If ``squeue`` cannot be executed or exits with a nonzero code.
    """
    # Use an argument list with shell=False so a crafted username cannot
    # inject shell commands (user ultimately comes from the command line).
    try:
        result = subprocess.run(
            ["squeue", "-u", user], capture_output=True, text=True
        )
    except FileNotFoundError as err:
        raise ValueError("squeue is not available on this system") from err

    if result.returncode != 0:
        print("Error executing the command. Exit code:", result.returncode)
        print("Error output:", result.stderr)
        raise ValueError(
            f"squeue exited with code {result.returncode}: {result.stderr}"
        )

    running_jobs_count = 0
    # squeue output always begins with one header line; skip it.
    for line in result.stdout.splitlines()[1:]:
        columns = line.strip().split()
        # Default squeue columns: JOBID PARTITION NAME USER ...
        # Guard the length so a blank/truncated line cannot raise IndexError.
        if len(columns) > 2 and "ztf_fg" in columns[2]:
            running_jobs_count += 1

    print(f"Identified {running_jobs_count} running jobs.")

    return running_jobs_count


def filter_completed(df, resultsDir, filename, reset_running=False):

start_time = time.time()
Expand Down Expand Up @@ -193,9 +218,14 @@ def run_job(


if __name__ == '__main__':
# Start with 60s delay to allow previous submission job to conclude (esp. if running as cron job)
time.sleep(60)

# Parse command line
args = parse_commandline()

running_jobs_count = filter_running(args.user)

dir_path = os.path.dirname(os.path.realpath(__file__))

filename = args.filename
Expand Down Expand Up @@ -237,12 +267,13 @@ def run_job(
print('%d jobs remaining to queue...' % nchoice)

if args.doSubmit:
counter = 0
status_njobs = njobs
failure_count = 0
counter = running_jobs_count
status_njobs = len(df_to_complete)
diff_njobs = 0
# Redefine max instances if fewer jobs remain
new_max_instances = np.min([args.max_instances, nchoice])
size = new_max_instances
size = new_max_instances - counter
final_round = False
if size == nchoice:
final_round = True
Expand Down Expand Up @@ -271,10 +302,10 @@ def run_job(
print(
'Run "squeue -u <username>" to check status of remaining jobs.'
)
print(f"{failure_count} jobs failed during full run.")
break
else:
# Wait between status checks
os.system(f"squeue -u {args.user}")
print(f"Waiting {args.wait_time_minutes} minutes until next check...")
time.sleep(args.wait_time_minutes * 60)

Expand All @@ -287,19 +318,30 @@ def run_job(
print('%d jobs remaining to complete...' % njobs)
print('%d jobs remaining to queue...' % nchoice)

# Compute difference in njobs to count available instances
diff_njobs = status_njobs - njobs
status_njobs = njobs
running_jobs_count = filter_running(args.user)

failed_this_round = 0
n_jobs_diff = counter - running_jobs_count
n_jobs_finished = status_njobs - njobs
if n_jobs_finished != n_jobs_diff:
failed_this_round = np.abs(n_jobs_finished - n_jobs_diff)
failure_count += failed_this_round

# Decrease counter if jobs have finished
counter -= diff_njobs
status_njobs = njobs
counter = running_jobs_count
# Note that if a job has failed, it will not be re-queued until
# its quadrant's .running file is removed (or set --reset_running)

# Define size of the next quadrant_indices array
size = np.min([new_max_instances - counter, nchoice])
# Signal to stop looping when the last set of jobs is queued
if size == nchoice:
final_round = True

print(
f"Detected {failed_this_round} failed jobs this round ({failure_count} total failures)."
)

elif args.doSubmitLoop:
confirm = input(
"Warning: setting --doSubmitLoop ignores limits on number of jobs to submit. Continue? (yes/no): "
Expand Down
4 changes: 2 additions & 2 deletions tools/generate_features_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def check_quads_for_sources(
parser.add_argument(
"--job_name",
type=str,
default='generate_features',
default='ztf_fg',
help="job name",
)
parser.add_argument(
Expand Down Expand Up @@ -641,7 +641,7 @@ def check_quads_for_sources(
# (Python code can also be run interactively)
fid = open(os.path.join(slurmDir, 'slurm_submission.sub'), 'w')
fid.write('#!/bin/bash\n')
fid.write(f'#SBATCH --job-name={args.job_name}_submit.job\n')
fid.write('#SBATCH --job-name=submit_jobs.job\n')
fid.write(f'#SBATCH --output=../logs/{args.job_name}_submit_%A_%a.out\n')
fid.write(f'#SBATCH --error=../logs/{args.job_name}_submit_%A_%a.err\n')
fid.write(f'#SBATCH -p {args.submit_partition_type}\n')
Expand Down