
Commit

make level q
jmbhughes committed Oct 23, 2024
1 parent e67f481 commit 5346e4d
Showing 4 changed files with 112 additions and 1 deletion.
8 changes: 8 additions & 0 deletions config.yaml
@@ -34,6 +34,14 @@ levels:
    schedule:
    options:

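  # assumed semantics: priority starts at `initial` and is bumped by each `escalation` amount once the flow has waited the matching number of `seconds`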
  levelq_process_flow:
    priority:
      initial: 1000
      seconds: [ 3000, 12000, 60000 ]
      escalation: [ 1000, 1000, 1000 ]
    schedule:
    options:

  level3_PTM_process_flow:
    priority:
      initial: 10000
9 changes: 9 additions & 0 deletions deploy.py
@@ -3,6 +3,7 @@
from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow
from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow
from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow
from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow
from punchpipe.deliver import create_noaa_delivery

launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment",
@@ -24,6 +25,13 @@
level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow",
                                                              description="Process files from Level 1 to Level 2.")

levelq_scheduler_deployment = levelq_scheduler_flow.to_deployment(name="levelq-scheduler-deployment",
                                                                   description="Schedule a Level Q flow.",
                                                                   cron="* * * * *",
                                                                   )
levelq_process_deployment = levelq_process_flow.to_deployment(name="levelq_process_flow",
                                                               description="Process files from Level 1 to Level Q.")

level3_PTM_scheduler_deployment = level3_PTM_scheduler_flow.to_deployment(name="level3-PTM-scheduler-deployment",
                                                                           description="Schedule a Level 3 flow to make PTM.",
                                                                           cron="* * * * *",
@@ -37,6 +45,7 @@
serve(launcher_deployment,
      level1_scheduler_deployment, level1_process_deployment,
      level2_scheduler_deployment, level2_process_deployment,
      levelq_scheduler_deployment, levelq_process_deployment,
      level3_PTM_scheduler_deployment, level3_PTM_process_deployment,
      noaa_deployment,
      limit=100 # TODO: remove arbitrary limit
4 changes: 3 additions & 1 deletion punchpipe/flows/level2.py
@@ -16,7 +16,9 @@
@task
def level2_query_ready_files(session, pipeline_config: dict):
    logger = get_run_logger()
    all_ready_files = session.query(File).where(and_(File.state == "created", File.level == "1")).all()
    # only polarized Level 1 products (PP, PZ, PM); clear (CR) files now go to the Level Q flow
    all_ready_files = (session.query(File).filter(File.state == "created")
                       .filter(File.level == "1")
                       .filter(File.file_type.in_(['PP', 'PZ', 'PM'])).all())
    logger.info(f"{len(all_ready_files)} ready files")
    unique_times = set(f.date_obs for f in all_ready_files)
    logger.info(f"{len(unique_times)} unique times: {unique_times}")
92 changes: 92 additions & 0 deletions punchpipe/flows/levelq.py
@@ -0,0 +1,92 @@
import json
import os
import typing as t
from datetime import datetime

from prefect import flow, task, get_run_logger
from punchbowl.level2.flow import levelq_core_flow

from punchpipe import __version__
from punchpipe.controlsegment.db import File, Flow
from punchpipe.controlsegment.processor import generic_process_flow_logic
from punchpipe.controlsegment.scheduler import generic_scheduler_flow_logic


@task
def levelq_query_ready_files(session, pipeline_config: dict):
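    # inputs are Level 1 files in the "created" state with file type "CR"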
    logger = get_run_logger()
    all_ready_files = (session.query(File).filter(File.state == "created")
                       .filter(File.level == "1")
                       .filter(File.file_type == "CR").all())
    logger.info(f"{len(all_ready_files)} ready files")
    unique_times = set(f.date_obs for f in all_ready_files)
    logger.info(f"{len(unique_times)} unique times: {unique_times}")
    grouped_ready_files = [[f.file_id for f in all_ready_files if f.date_obs == time] for time in unique_times]
    logger.info(f"{len(grouped_ready_files)} grouped ready files")
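    # keep only complete groups of four files per observation time (assumed: one from each observatory)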
    out = [g for g in grouped_ready_files if len(g) == 4]
    logger.info(f"{len(out)} groups heading out")
    return out


@task
def levelq_construct_flow_info(level1_files: list[File], levelq_file: File, pipeline_config: dict):
    flow_type = "levelq_process_flow"
    state = "planned"
    creation_time = datetime.now()
    priority = pipeline_config["levels"][flow_type]["priority"]["initial"]
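    # call_data carries the full paths of the input Level 1 files as JSON for the process flow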
    call_data = json.dumps(
        {
            "data_list": [
                os.path.join(level1_file.directory(pipeline_config["root"]), level1_file.filename())
                for level1_file in level1_files
            ]
        }
    )
    return Flow(
        flow_type=flow_type,
        state=state,
        flow_level="Q",
        creation_time=creation_time,
        priority=priority,
        call_data=call_data,
    )


@task
def levelq_construct_file_info(level1_files: t.List[File], pipeline_config: dict) -> t.List[File]:
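    # plan two Level Q outputs for this time step: a CT file (observatory "M") and a CN file (observatory "N")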
    return [File(
                level="Q",
                file_type="CT",
                observatory="M",
                file_version=pipeline_config["file_version"],
                software_version=__version__,
                date_obs=level1_files[0].date_obs,
                state="planned",
            ),
            File(
                level="Q",
                file_type="CN",
                observatory="N",
                file_version=pipeline_config["file_version"],
                software_version=__version__,
                date_obs=level1_files[0].date_obs,
                state="planned",
            )
    ]


@flow
def levelq_scheduler_flow(pipeline_config_path="config.yaml", session=None):
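    # the shared scheduler logic presumably queries ready files, plans the output File records, and schedules a new Flow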
    generic_scheduler_flow_logic(
        levelq_query_ready_files,
        levelq_construct_file_info,
        levelq_construct_flow_info,
        pipeline_config_path,
        session=session,
    )


@flow
def levelq_process_flow(flow_id: int, pipeline_config_path="config.yaml", session=None):
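    # hand off to the shared process-flow logic, which runs levelq_core_flow on the planned inputs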
    generic_process_flow_logic(flow_id, levelq_core_flow, pipeline_config_path, session=session)
