From 5346e4d828befe6c778f91a1d896e17c9c553d98 Mon Sep 17 00:00:00 2001
From: Marcus Hughes
Date: Tue, 22 Oct 2024 22:58:43 -0600
Subject: [PATCH] make level q

---
 config.yaml               |  8 ++++
 deploy.py                 |  9 ++++
 punchpipe/flows/level2.py |  4 +-
 punchpipe/flows/levelq.py | 91 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 111 insertions(+), 1 deletion(-)
 create mode 100644 punchpipe/flows/levelq.py

diff --git a/config.yaml b/config.yaml
index a0b48d5..afd9a65 100644
--- a/config.yaml
+++ b/config.yaml
@@ -34,6 +34,14 @@ levels:
     schedule:
     options:
 
+  levelq_process_flow:
+    priority:
+      initial: 1000
+      seconds: [ 3000, 12000, 60000 ]
+      escalation: [ 1000, 1000, 1000 ]
+    schedule:
+    options:
+
   level3_PTM_process_flow:
     priority:
       initial: 10000
diff --git a/deploy.py b/deploy.py
index cbf00fb..ecebeda 100644
--- a/deploy.py
+++ b/deploy.py
@@ -3,6 +3,7 @@
 from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow
 from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow
 from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow
+from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow
 from punchpipe.deliver import create_noaa_delivery
 
 launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment",
@@ -24,6 +25,13 @@
 level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow",
                                                               description="Process files from Level 1 to Level 2.")
 
+levelq_scheduler_deployment = levelq_scheduler_flow.to_deployment(name="levelq-scheduler-deployment",
+                                                                  description="Schedule a Level Q flow.",
+                                                                  cron="* * * * *",
+                                                                  )
+levelq_process_deployment = levelq_process_flow.to_deployment(name="levelq_process_flow",
+                                                              description="Process files from Level 1 to Level Q.")
+
 level3_PTM_scheduler_deployment = level3_PTM_scheduler_flow.to_deployment(name="level3-PTM-scheduler-deployment",
                                                                           description="Schedule a Level 3 flow to make PTM.",
                                                                           cron="* * * * *",
@@ -37,6 +45,7 @@
 serve(launcher_deployment,
       level1_scheduler_deployment, level1_process_deployment,
       level2_scheduler_deployment, level2_process_deployment,
+      levelq_scheduler_deployment, levelq_process_deployment,
       level3_PTM_scheduler_deployment, level3_PTM_process_deployment,
       noaa_deployment,
       limit=100  # TODO: remove arbitrary limit
diff --git a/punchpipe/flows/level2.py b/punchpipe/flows/level2.py
index 7013b4e..5621c79 100644
--- a/punchpipe/flows/level2.py
+++ b/punchpipe/flows/level2.py
@@ -16,7 +16,9 @@
 @task
 def level2_query_ready_files(session, pipeline_config: dict):
     logger = get_run_logger()
-    all_ready_files = session.query(File).where(and_(File.state == "created", File.level == "1")).all()
+    all_ready_files = (session.query(File).filter(File.state == "created")
+                       .filter(File.level == "1")
+                       .filter(File.file_type.in_(['PP', 'PZ', 'PM'])).all())
     logger.info(f"{len(all_ready_files)} ready files")
     unique_times = set(f.date_obs for f in all_ready_files)
     logger.info(f"{len(unique_times)} unique times: {unique_times}")
diff --git a/punchpipe/flows/levelq.py b/punchpipe/flows/levelq.py
new file mode 100644
index 0000000..c3c2029
--- /dev/null
+++ b/punchpipe/flows/levelq.py
@@ -0,0 +1,91 @@
+import json
+import os
+import typing as t
+from datetime import datetime
+
+from prefect import flow, task, get_run_logger
+from punchbowl.level2.flow import levelq_core_flow
+
+from punchpipe import __version__
+from punchpipe.controlsegment.db import File, Flow
+from punchpipe.controlsegment.processor import generic_process_flow_logic
+from punchpipe.controlsegment.scheduler import generic_scheduler_flow_logic
+
+
+@task
+def levelq_query_ready_files(session, pipeline_config: dict):
+    logger = get_run_logger()
+    all_ready_files = (session.query(File).filter(File.state == "created")
+                       .filter(File.level == "1")
+                       .filter(File.file_type == "CR").all())
+    logger.info(f"{len(all_ready_files)} ready files")
+    unique_times = set(f.date_obs for f in all_ready_files)
+    logger.info(f"{len(unique_times)} unique times: {unique_times}")
+    grouped_ready_files = [[f.file_id for f in all_ready_files if f.date_obs == time] for time in unique_times]
+    logger.info(f"{len(grouped_ready_files)} grouped ready files")
+    out = [g for g in grouped_ready_files if len(g) == 4]  # only complete groups of four files move on
+    logger.info(f"{len(out)} groups heading out")
+    return out
+
+
+@task
+def levelq_construct_flow_info(level1_files: list[File], levelq_file: File, pipeline_config: dict):
+    flow_type = "levelq_process_flow"
+    state = "planned"
+    creation_time = datetime.now()
+    priority = pipeline_config["levels"][flow_type]["priority"]["initial"]
+    call_data = json.dumps(
+        {
+            "data_list": [
+                os.path.join(level1_file.directory(pipeline_config["root"]), level1_file.filename())
+                for level1_file in level1_files
+            ]
+        }
+    )
+    return Flow(
+        flow_type=flow_type,
+        state=state,
+        flow_level="Q",
+        creation_time=creation_time,
+        priority=priority,
+        call_data=call_data,
+    )
+
+
+@task
+def levelq_construct_file_info(level1_files: t.List[File], pipeline_config: dict) -> t.List[File]:
+    return [File(
+        level="Q",
+        file_type="CT",
+        observatory="M",
+        file_version=pipeline_config["file_version"],
+        software_version=__version__,
+        date_obs=level1_files[0].date_obs,
+        state="planned",
+    ),
+        File(
+            level="Q",
+            file_type="CN",
+            observatory="N",
+            file_version=pipeline_config["file_version"],
+            software_version=__version__,
+            date_obs=level1_files[0].date_obs,
+            state="planned",
+        )
+    ]
+
+
+@flow
+def levelq_scheduler_flow(pipeline_config_path="config.yaml", session=None):
+    generic_scheduler_flow_logic(
+        levelq_query_ready_files,
+        levelq_construct_file_info,
+        levelq_construct_flow_info,
+        pipeline_config_path,
+        session=session,
+    )
+
+
+@flow
+def levelq_process_flow(flow_id: int, pipeline_config_path="config.yaml", session=None):
+    generic_process_flow_logic(flow_id, levelq_core_flow, pipeline_config_path, session=session)
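
For reference, the level2_query_ready_files hunk filters file types with Column.in_() rather than Python's `in` operator: `File.file_type in ['PP', 'PZ', 'PM']` is evaluated by Python itself, cannot produce a SQL IN clause, and typically raises a TypeError when SQLAlchemy coerces the column comparison to bool. Below is a minimal, self-contained sketch of the idiom; the ToyFile table is a hypothetical stand-in, not the real punchpipe schema.

# Sketch of the Column.in_() idiom used in the level2 hunk above.
# ToyFile is a hypothetical stand-in for punchpipe's File model.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class ToyFile(Base):
    __tablename__ = "files"
    file_id = Column(Integer, primary_key=True)
    file_type = Column(String)


engine = create_engine("sqlite://")  # in-memory database for the demo
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([ToyFile(file_type=ft) for ft in ["PP", "PZ", "PM", "CR"]])
    session.commit()
    # .in_() renders a SQL IN clause; the Python `in` operator cannot.
    polarized = session.query(ToyFile).filter(ToyFile.file_type.in_(["PP", "PZ", "PM"])).all()
    print([f.file_type for f in polarized])  # ['PP', 'PZ', 'PM'] -- 'CR' is excluded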
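
The grouping rule in levelq_query_ready_files can also be read in isolation: ready Level 1 CR files are bucketed by date_obs, and only complete buckets of four file IDs are handed to a Level Q flow. Here is a standalone sketch of that rule on made-up data; FakeFile and the timestamps are illustrative only, not punchpipe code.

# Sketch of the levelq grouping rule: bucket by observation time,
# keep only complete groups of four. FakeFile is hypothetical.
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime


@dataclass
class FakeFile:
    file_id: int
    date_obs: datetime


t0 = datetime(2024, 10, 22, 22, 0)
t1 = datetime(2024, 10, 22, 22, 4)
files = [FakeFile(i, t0) for i in range(4)] + [FakeFile(9, t1)]  # t1 group is incomplete

buckets = defaultdict(list)
for f in files:
    buckets[f.date_obs].append(f.file_id)

# Mirrors `[g for g in grouped_ready_files if len(g) == 4]` in the patch.
ready = [ids for ids in buckets.values() if len(ids) == 4]
print(ready)  # [[0, 1, 2, 3]] -- the lone t1 file keeps waiting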