move config to variable, use gunicorn
jmbhughes committed Nov 16, 2024
1 parent 36c996b commit 33dc721
Showing 13 changed files with 148 additions and 33 deletions.
108 changes: 99 additions & 9 deletions punchpipe/cli.py
@@ -1,22 +1,112 @@
 import subprocess
-import multiprocessing as mp
+import os
+import time
 from datetime import datetime
+from pathlib import Path
 
 import click
+from prefect import serve, flow, deploy
+from prefect.variables import Variable
 
-from .monitor.app import create_app
+from punchpipe.controlsegment.launcher import launcher_flow
+from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow
+from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow
+from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow
+from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow
+from punchpipe.monitor.app import create_app
+
+THIS_DIR = os.path.dirname(__file__)
+app = create_app()
+server = app.server
 
 @click.group
 def main():
     """Run the PUNCH automated pipeline"""
 
-def launch_monitor():
-    app = create_app()
-    app.run_server(debug=False, port=8051)
+@flow
+def my_flow():
+    print("Hello, Prefect!")
+
+
+def serve_flows():
+    launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment",
+                                                      description="Launch a pipeline segment.",
+                                                      cron="* * * * *",
+                                                      )
+
+    level1_scheduler_deployment = level1_scheduler_flow.to_deployment(name="level1-scheduler-deployment",
+                                                                      description="Schedule a Level 1 flow.",
+                                                                      cron="* * * * *",
+                                                                      )
+    level1_process_deployment = level1_process_flow.to_deployment(name="level1_process_flow",
+                                                                  description="Process a file from Level 0 to Level 1.")
+
+    level2_scheduler_deployment = level2_scheduler_flow.to_deployment(name="level2-scheduler-deployment",
+                                                                      description="Schedule a Level 2 flow.",
+                                                                      cron="* * * * *",
+                                                                      )
+    level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow",
+                                                                  description="Process files from Level 1 to Level 2.")
+
+    levelq_scheduler_deployment = levelq_scheduler_flow.to_deployment(name="levelq-scheduler-deployment",
+                                                                      description="Schedule a Level Q flow.",
+                                                                      cron="* * * * *",
+                                                                      )
+    levelq_process_deployment = levelq_process_flow.to_deployment(name="levelq_process_flow",
+                                                                  description="Process files from Level 1 to Level Q.")
+
+    level3_PTM_scheduler_deployment = level3_PTM_scheduler_flow.to_deployment(name="level3-PTM-scheduler-deployment",
+                                                                              description="Schedule a Level 3 flow to make PTM.",
+                                                                              cron="* * * * *",
+                                                                              )
+    level3_PTM_process_deployment = level3_PTM_process_flow.to_deployment(name="level3_PTM_process_flow",
+                                                                          description="Process PTM files from Level 2 to Level 3.")
+
+    serve(launcher_deployment,
+          level1_scheduler_deployment, level1_process_deployment,
+          level2_scheduler_deployment, level2_process_deployment,
+          levelq_scheduler_deployment, levelq_process_deployment,
+          level3_PTM_scheduler_deployment, level3_PTM_process_deployment,
+          limit=1000
+          )
+
 
 @main.command
-def run():
+@click.argument("configuration_path", type=click.Path(exists=True))
+def run(configuration_path):
     now = datetime.now()
+
+    configuration_path = str(Path(configuration_path).resolve())
+    output_path = f"punchpipe_{now.strftime('%Y%m%d_%H%M%S')}.txt"
+
+    print()
+    print(f"Launching punchpipe at {now} with configuration: {configuration_path}")
+    print(f"Terminal logs from punchpipe are in {output_path}.")
+    print("punchpipe Prefect flows must be stopped manually in Prefect.")
+    print("Launching Prefect dashboard on http://localhost:4200/.")
+    print("Launching punchpipe monitor on http://localhost:8051/.")
-    subprocess.Popen(["prefect", "server", "start"])
-    print("\npunchpipe Prefect flows must be stopped manually in Prefect.")
-    mp.Process(target=launch_monitor, args=()).start()
+    print("Use ctrl-c to exit.")
+
+    with open(output_path, "w") as f:
+        prefect_process = subprocess.Popen(["prefect", "server", "start"],
+                                           stdout=f, stderr=subprocess.STDOUT)
+        monitor_process = subprocess.Popen(["gunicorn",
+                                            "-b", "0.0.0.0:8051",
+                                            "--chdir", THIS_DIR,
+                                            "cli:server"],
+                                           stdout=f, stderr=subprocess.STDOUT)
+    time.sleep(3)
+    Variable.set("punchpipe_config", configuration_path, overwrite=True)
+    serve_flows()
+
+    try:
+        prefect_process.wait()
+        monitor_process.wait()
+    except Exception:
+        prefect_process.terminate()
+        monitor_process.terminate()
+
+    print()
+    print("punchpipe shut down.")
4 changes: 3 additions & 1 deletion punchpipe/controlsegment/launcher.py
@@ -2,6 +2,7 @@
 from datetime import datetime, timedelta
 
 from prefect import flow, get_run_logger, task
+from prefect.variables import Variable
 from prefect.client import get_client
 from sqlalchemy import and_
 from sqlalchemy.orm import Session
@@ -99,7 +100,8 @@ async def launcher_flow(pipeline_configuration_path="config.yaml"):
     """
     logger = get_run_logger()
 
-    pipeline_config = load_pipeline_configuration(pipeline_configuration_path)
+    config_path = await Variable.get("punchpipe_config", "punchpipe_config.yaml")
+    pipeline_config = load_pipeline_configuration(config_path)
 
     logger.info("Establishing database connection")
     session = get_database_session()
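
This is the heart of the "move config to variable" change: `punchpipe run` writes the resolved config path once as a Prefect Variable, and each flow reads it back, with "punchpipe_config.yaml" as the fallback when the Variable is unset. A small round-trip sketch using the same calls as the diff (the example path is invented):

from prefect import flow
from prefect.variables import Variable

# at startup, as run() does after the servers come up
Variable.set("punchpipe_config", "/opt/punch/punchpipe_config.yaml", overwrite=True)

@flow
async def report_config():
    # the second positional argument is the default returned when the Variable is unset
    config_path = await Variable.get("punchpipe_config", "punchpipe_config.yaml")
    print(f"using configuration at {config_path}")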
4 changes: 3 additions & 1 deletion punchpipe/controlsegment/scheduler.py
@@ -1,12 +1,14 @@
+from prefect.variables import Variable
+
 from punchpipe.controlsegment.db import File, FileRelationship
 from punchpipe.controlsegment.util import get_database_session, load_pipeline_configuration, update_file_state
 
 
 def generic_scheduler_flow_logic(
     query_ready_files_func, construct_child_file_info, construct_child_flow_info, pipeline_config_path, session=None
 ):
     # load pipeline configuration
     pipeline_config = load_pipeline_configuration(pipeline_config_path)
 
+    max_start = pipeline_config['scheduler']['max_start']
 
     # get database connection
File renamed without changes.
2 changes: 1 addition & 1 deletion punchpipe/controlsegment/tests/test_launcher.py
@@ -78,7 +78,7 @@ def test_count_running_flows(db):
 
 
 def test_escalate_long_waiting_flows(db):
-    pipeline_config_path = os.path.join(TEST_DIR, "config.yaml")
+    pipeline_config_path = os.path.join(TEST_DIR, "punchpipe_config.yaml")
     pipeline_config = load_pipeline_configuration.fn(pipeline_config_path)
 
     with freeze_time(datetime(2023, 2, 2, 0, 0, 0)) as frozen_datetime:
4 changes: 2 additions & 2 deletions punchpipe/controlsegment/tests/test_processor.py
@@ -88,7 +88,7 @@ def empty_core_flow():
     return []
 
 @flow
-def empty_flow(flow_id: int, pipeline_config_path=TESTDATA_DIR+"/config.yaml", session=None):
+def empty_flow(flow_id: int, pipeline_config_path=TESTDATA_DIR+"/punchpipe_config.yaml", session=None):
     generic_process_flow_logic(flow_id, empty_core_flow, pipeline_config_path, session=session)
 
 
@@ -131,7 +131,7 @@ def normal_core_flow():
 
 
 @flow
-def normal_flow(flow_id: int, pipeline_config_path=TESTDATA_DIR+"/config.yaml", session=None):
+def normal_flow(flow_id: int, pipeline_config_path=TESTDATA_DIR+"/punchpipe_config.yaml", session=None):
     generic_process_flow_logic(flow_id, normal_core_flow, pipeline_config_path, session=session)
5 changes: 4 additions & 1 deletion punchpipe/controlsegment/util.py
@@ -4,6 +4,7 @@
 import yaml
 from ndcube import NDCube
 from prefect import task
+from prefect.variables import Variable
 from prefect_sqlalchemy import SqlAlchemyConnector
 from punchbowl.data import get_base_file_name, write_ndcube_to_fits
 from sqlalchemy import or_
@@ -28,7 +29,9 @@ def update_file_state(session, file_id, new_state):
 
 
 @task
-def load_pipeline_configuration(path: str) -> dict:
+def load_pipeline_configuration(path: str = None) -> dict:
+    if path is None:
+        path = Variable.get("punchpipe_config", "punchpipe_config.yaml")
     with open(path) as f:
         config = yaml.load(f, Loader=FullLoader)
     # TODO: add validation
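
With the new `path=None` default, callers may omit the argument entirely and the task resolves the path from the `punchpipe_config` Variable itself. A usage sketch (assuming the config files exist; `.fn` calls the undecorated function, as the tests in this commit do):

# explicit path, bypassing the task runner
config = load_pipeline_configuration.fn("punchpipe_config.yaml")

# no path: falls back to the "punchpipe_config" Variable,
# then to "punchpipe_config.yaml" if the Variable is unset
config = load_pipeline_configuration.fn()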
3 changes: 1 addition & 2 deletions punchpipe/flows/level0.py
@@ -20,11 +20,10 @@ def level0_construct_flow_info():
 
 
 @flow
-def level0_scheduler_flow(pipeline_config_path="config.yaml", session=None):
+def level0_scheduler_flow(session=None):
     generic_scheduler_flow_logic(
         level0_query_ready_files,
         level0_construct_file_info,
         level0_construct_flow_info,
-        pipeline_config_path,
         session=session,
     )
36 changes: 27 additions & 9 deletions punchpipe/flows/level3.py
@@ -22,17 +22,22 @@ def get_valid_starfields(session, f: File, timedelta_window: timedelta):
                          f.date_obs <= valid_star_end)).all())
 
 
-def get_valid_fcorona_models(session, f: File, timedelta_window: timedelta):
-    valid_fcorona_start, valid_fcorona_end = f.date_obs - timedelta_window, f.date_obs +timedelta_window
+def get_valid_fcorona_models(session, f: File, before_timedelta: timedelta, after_timedelta: timedelta):
+    valid_fcorona_start, valid_fcorona_end = f.date_obs - before_timedelta, f.date_obs + after_timedelta
     return (session.query(File).filter(File.state == "created").filter(File.level == 2)
             .filter(File.file_type == 'PF').filter(File.observatory == 'M')
             .filter(and_(f.date_obs >= valid_fcorona_start,
                          f.date_obs <= valid_fcorona_end)).all())
 
 
 def get_closest_file(f_target: File, f_others: list[File]) -> File:
-    return f_others[0]
+    return min(f_others, key=lambda o: abs((f_target.date_obs - o.date_obs).total_seconds()))
 
+def get_closest_before_file(f_target: File, f_others: list[File]) -> File:
+    return get_closest_file(f_target, [o for o in f_others if f_target.date_obs >= o.date_obs])
+
+def get_closest_after_file(f_target: File, f_others: list[File]) -> File:
+    return get_closest_file(f_target, [o for o in f_others if f_target.date_obs <= o.date_obs])
 
 @task
 def level3_PTM_query_ready_files(session, pipeline_config: dict):
@@ -48,17 +53,24 @@ def level3_PTM_query_ready_files(session, pipeline_config: dict):
         valid_starfields = get_valid_starfields(session, f, timedelta_window=timedelta(days=14))
 
         # TODO put magic numbers in config
-        valid_fcorona_models = get_valid_fcorona_models(session, f, timedelta_window=timedelta(days=3))
-
-        if len(valid_fcorona_models) >= 1 and len(valid_starfields) >= 1:
+        valid_before_fcorona_models = get_valid_fcorona_models(session, f,
+                                                               before_timedelta=timedelta(days=3),
+                                                               after_timedelta=timedelta(days=0))
+        valid_after_fcorona_models = get_valid_fcorona_models(session, f,
+                                                              before_timedelta=timedelta(days=0),
+                                                              after_timedelta=timedelta(days=3))
+
+        if (len(valid_before_fcorona_models) >= 1
+                and len(valid_after_fcorona_models) >= 1
+                and len(valid_starfields) >= 1):
             actually_ready_files.append(f)
     logger.info(f"{len(actually_ready_files)} Level 3 PTM files have necessary calibration data.")
 
     return [[f.file_id] for f in actually_ready_files]
 
 
 @task
-def level3_PTM_construct_flow_info(level2_files: File, level3_file: File, pipeline_config: dict, session=None):
+def level3_PTM_construct_flow_info(level2_files: list[File], level3_file: File, pipeline_config: dict, session=None):
     session = get_database_session()  # TODO: replace so this works in the tests by passing in a test
 
     flow_type = "level3_PTM_process_flow"
@@ -72,10 +84,16 @@ def level3_PTM_construct_flow_info(level2_files: File, level3_file: File, pipeline_config: dict, session=None):
             for level2_file in level2_files
         ],
         # TODO put magic numbers in config
-        "f_corona_model_path": get_closest_file(level2_files[0],
+        "before_f_corona_model_path": get_closest_before_file(level2_files[0],
+                                                              get_valid_fcorona_models(session,
+                                                                                       level2_files[0],
+                                                                                       before_timedelta=timedelta(days=3),
+                                                                                       after_timedelta=timedelta(days=3))),
+        "after_f_corona_model_path": get_closest_after_file(level2_files[0],
                                                             get_valid_fcorona_models(session,
                                                                                      level2_files[0],
-                                                                                     timedelta_window=timedelta(days=3))),
+                                                                                     before_timedelta=timedelta(days=3),
+                                                                                     after_timedelta=timedelta(days=3))),
         # TODO put magic numbers in config
         "starfield_background_path": get_closest_file(level2_files[0],
                                                       get_valid_starfields(session,
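
The rewritten `get_closest_file` replaces a stub that always returned the first candidate with a real nearest-in-time search, and the before/after variants restrict the candidates to one side of the target's `date_obs`. A self-contained sketch of that selection logic, with a dataclass standing in for the SQLAlchemy `File` model:

from dataclasses import dataclass
from datetime import datetime

@dataclass
class File:  # stub for punchpipe's File model
    name: str
    date_obs: datetime

def get_closest_file(f_target: File, f_others: list[File]) -> File:
    # nearest in time in either direction; raises ValueError if f_others is empty
    return min(f_others, key=lambda o: abs((f_target.date_obs - o.date_obs).total_seconds()))

def get_closest_before_file(f_target: File, f_others: list[File]) -> File:
    return get_closest_file(f_target, [o for o in f_others if f_target.date_obs >= o.date_obs])

def get_closest_after_file(f_target: File, f_others: list[File]) -> File:
    return get_closest_file(f_target, [o for o in f_others if f_target.date_obs <= o.date_obs])

target = File("PTM", datetime(2024, 11, 16, 12, 0))
models = [File("PF-early", datetime(2024, 11, 15)), File("PF-late", datetime(2024, 11, 17))]
assert get_closest_before_file(target, models).name == "PF-early"
assert get_closest_after_file(target, models).name == "PF-late"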
File renamed without changes.
6 changes: 3 additions & 3 deletions punchpipe/flows/tests/test_level1.py
@@ -70,7 +70,7 @@ def test_query_ready_files(db):
 
 
 def test_level1_construct_file_info():
-    pipeline_config_path = os.path.join(TEST_DIR, "config.yaml")
+    pipeline_config_path = os.path.join(TEST_DIR, "punchpipe_config.yaml")
     pipeline_config = load_pipeline_configuration.fn(pipeline_config_path)
     level0_file = [File(level="0",
                         file_type='PM',
@@ -91,7 +91,7 @@ def test_level1_construct_file_info():
 
 
 def test_level1_construct_flow_info(db):
-    pipeline_config_path = os.path.join(TEST_DIR, "config.yaml")
+    pipeline_config_path = os.path.join(TEST_DIR, "punchpipe_config.yaml")
     pipeline_config = load_pipeline_configuration.fn(pipeline_config_path)
     level0_file = [File(level="0",
                         file_type='PM',
@@ -110,7 +110,7 @@ def test_level1_construct_flow_info(db):
 
 
 def test_level1_scheduler_flow(db):
-    pipeline_config_path = os.path.join(TEST_DIR, "config.yaml")
+    pipeline_config_path = os.path.join(TEST_DIR, "punchpipe_config.yaml")
     with prefect_test_harness():
         level1_scheduler_flow(pipeline_config_path, db)
     results = db.query(Flow).where(Flow.state == 'planned').all()
6 changes: 3 additions & 3 deletions punchpipe/flows/tests/test_level2.py
@@ -53,7 +53,7 @@ def test_level2_query_ready_files(db):
 
 
 def test_level2_construct_file_info():
-    pipeline_config_path = os.path.join(TEST_DIR, "config.yaml")
+    pipeline_config_path = os.path.join(TEST_DIR, "punchpipe_config.yaml")
     pipeline_config = load_pipeline_configuration.fn(pipeline_config_path)
 
     level1_file = [File(level=0,
@@ -75,7 +75,7 @@ def test_level2_construct_file_info():
 
 
 def test_level2_construct_flow_info():
-    pipeline_config_path = os.path.join(TEST_DIR, "config.yaml")
+    pipeline_config_path = os.path.join(TEST_DIR, "punchpipe_config.yaml")
     pipeline_config = load_pipeline_configuration.fn(pipeline_config_path)
     level1_file = [File(level="1",
                         file_type='XX',
@@ -94,7 +94,7 @@ def test_level2_construct_flow_info():
 
 
 def test_level2_scheduler_flow(db):
-    pipeline_config_path = os.path.join(TEST_DIR, "config.yaml")
+    pipeline_config_path = os.path.join(TEST_DIR, "punchpipe_config.yaml")
     with prefect_test_harness():
         level2_scheduler_flow(pipeline_config_path, db)
     results = db.query(Flow).where(Flow.state == 'planned').all()
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -24,7 +24,8 @@ dependencies = [
     "pyyaml",
     "click",
     "pylibjpeg[libjpeg]",
-    "psutil"
+    "psutil",
+    "gunicorn"
 ]
 requires-python = ">=3.11"
 authors = [
