Skip to content

Commit

Permalink
Updates for V4 RFR2 (#89)
Browse files Browse the repository at this point in the history
* check that the reference_time is always set

* add config to test

* force full traceback printing

* make sure both PSF and quartic models exist

* don't warn if no L0 flows have run

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix reference time for PIM

* change sleep time

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
jmbhughes and pre-commit-ci[bot] authored Dec 19, 2024
1 parent 1b3be82 commit 402221b
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 21 deletions.
4 changes: 3 additions & 1 deletion punchpipe/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import time
import argparse
import traceback
import subprocess
from pathlib import Path
from datetime import datetime
Expand Down Expand Up @@ -196,7 +197,7 @@ def run(configuration_path):
try:
prefect_process = subprocess.Popen(["prefect", "server", "start"],
stdout=f, stderr=subprocess.STDOUT)
time.sleep(5)
time.sleep(10)
monitor_process = subprocess.Popen(["gunicorn",
"-b", "0.0.0.0:8050",
"--chdir", THIS_DIR,
Expand All @@ -219,6 +220,7 @@ def run(configuration_path):
print("punchpipe safely shut down.")
except Exception as e:
print(f"Received error: {e}")
print(traceback.format_exc())
prefect_process.terminate()
time.sleep(5)
monitor_process.terminate()
Expand Down
4 changes: 2 additions & 2 deletions punchpipe/control/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime

from punchpipe.control.db import File, FileRelationship
from punchpipe.control.util import get_database_session, load_pipeline_configuration, update_file_state
Expand All @@ -6,13 +7,12 @@
def generic_scheduler_flow_logic(
query_ready_files_func, construct_child_file_info, construct_child_flow_info, pipeline_config_path,
update_input_file_state=True, new_input_file_state="progressed",
session=None, reference_time=None,
session=None, reference_time: datetime | None = None,
):
pipeline_config = load_pipeline_configuration(pipeline_config_path)

max_start = pipeline_config['scheduler']['max_start']

# get database connection
if session is None:
session = get_database_session()

Expand Down
17 changes: 7 additions & 10 deletions punchpipe/flows/fcorona.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import typing as t
from datetime import datetime, timedelta

from dateutil.parser import parse as parse_datetime_str
from prefect import flow, get_run_logger, task
from punchbowl.level3.f_corona_model import construct_polarized_f_corona_model

Expand All @@ -15,9 +14,7 @@


@task
def f_corona_background_query_ready_files(session, pipeline_config: dict, use_n: int = 50,
reference_time: datetime =None):
reference_time = reference_time or datetime.now()
def f_corona_background_query_ready_files(session, pipeline_config: dict, reference_time: datetime, use_n: int = 50):
before = reference_time - timedelta(weeks=2)
after = reference_time + timedelta(weeks=2)

Expand All @@ -40,8 +37,9 @@ def f_corona_background_query_ready_files(session, pipeline_config: dict, use_n:
def construct_f_corona_background_flow_info(level3_files: list[File],
level3_f_model_file: File,
pipeline_config: dict,
session=None,
reference_time: datetime = None):
reference_time: datetime,
session=None
):
flow_type = "construct_f_corona_background_process_flow"
state = "planned"
creation_time = datetime.now()
Expand All @@ -67,7 +65,7 @@ def construct_f_corona_background_flow_info(level3_files: list[File],

@task
def construct_f_corona_background_file_info(level2_files: t.List[File], pipeline_config: dict,
reference_time:datetime = None) -> t.List[File]:
reference_time: datetime) -> t.List[File]:
return [File(
level="3",
file_type="PF",
Expand All @@ -79,9 +77,8 @@ def construct_f_corona_background_file_info(level2_files: t.List[File], pipeline
),]

@flow
def f_corona_scheduler(pipeline_config_path=None, session=None, reference_time: datetime = None):
if not isinstance(reference_time, datetime):
reference_time = parse_datetime_str(reference_time)
def f_corona_scheduler(pipeline_config_path=None, session=None, reference_time: datetime | None = None):
reference_time = reference_time or datetime.now()

generic_scheduler_flow_logic(
f_corona_background_query_ready_files,
Expand Down
2 changes: 1 addition & 1 deletion punchpipe/flows/level1.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def level1_query_ready_files(session, pipeline_config: dict, reference_time=None
actually_ready = []
for f in ready:
if (get_psf_model_path(f, pipeline_config, session=session) is not None
and get_psf_model_path(f, pipeline_config, session=session) is not None):
and get_quartic_model_path(f, pipeline_config, session=session) is not None):
actually_ready.append([f.file_id])
return actually_ready

Expand Down
4 changes: 3 additions & 1 deletion punchpipe/flows/level3.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ def level3_PIM_construct_file_info(level2_files: t.List[File], pipeline_config:


@flow
def level3_PIM_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None):
def level3_PIM_scheduler_flow(pipeline_config_path: str | None = None,
session=None,
reference_time: datetime | None = None):
generic_scheduler_flow_logic(
level3_PIM_query_ready_files,
level3_PIM_construct_file_info,
Expand Down
12 changes: 8 additions & 4 deletions punchpipe/flows/starfield.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@


@task
def starfield_background_query_ready_files(session, pipeline_config: dict, reference_time: datetime =None):
def starfield_background_query_ready_files(session, pipeline_config: dict, reference_time: datetime):
logger = get_run_logger()
all_ready_files = (session.query(File)
.filter(File.state == "created")
Expand All @@ -31,7 +31,8 @@ def starfield_background_query_ready_files(session, pipeline_config: dict, refer
def construct_starfield_background_flow_info(level3_fcorona_subtracted_files: list[File],
level3_starfield_model_file: File,
pipeline_config: dict,
session=None, reference_time: datetime =None):
reference_time: datetime,
session=None ):
flow_type = "construct_starfield_background_process_flow"
state = "planned"
creation_time = datetime.now()
Expand All @@ -56,7 +57,8 @@ def construct_starfield_background_flow_info(level3_fcorona_subtracted_files: li


@task
def construct_starfield_background_file_info(level3_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]:
def construct_starfield_background_file_info(level3_files: t.List[File], pipeline_config: dict,
reference_time: datetime) -> t.List[File]:
return [File(
level="3",
file_type="PS",
Expand All @@ -70,7 +72,9 @@ def construct_starfield_background_file_info(level3_files: t.List[File], pipelin


@flow
def starfield_scheduler_flow(pipeline_config_path=None, session=None, reference_time: datetime=None):
def starfield_scheduler_flow(pipeline_config_path=None, session=None, reference_time: datetime | None = None):
reference_time = reference_time or datetime.now()

generic_scheduler_flow_logic(
starfield_background_query_ready_files,
construct_starfield_background_file_info,
Expand Down
3 changes: 2 additions & 1 deletion punchpipe/flows/tests/test_level1.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ def session_fn(session):


def test_query_ready_files(db):
pipeline_config = {}
pipeline_config_path = os.path.join(TEST_DIR, "punchpipe_config.yaml")
pipeline_config = load_pipeline_configuration.fn(pipeline_config_path)
ready_file_ids = level1_query_ready_files.fn(db, pipeline_config)
assert len(ready_file_ids) == 1

Expand Down
2 changes: 1 addition & 1 deletion punchpipe/monitor/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def update_cards(n):
num_l0_success = df['SUM(num_images_succeeded)'].sum()
num_l0_fails = df['SUM(num_images_failed)'].sum()
l0_fraction = num_l0_success / (1 + num_l0_success + num_l0_fails) # add one to avoid div by 0 errors
if l0_fraction > 0.95:
if l0_fraction > 0.95 or (num_l0_success + num_l0_fails) == 0:
l0_status = f"Good ({num_l0_success} : {num_l0_fails})"
l0_color = "success"
else:
Expand Down

0 comments on commit 402221b

Please sign in to comment.