From fb8cc081f1da52959b0f53f58ed1c04ca9f4d4b6 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Mon, 27 Nov 2023 09:01:32 -0700 Subject: [PATCH] converts to many-to-many instead of one-to-one for file mappings --- config.yaml | 49 ++-- old_config.yaml | 28 +++ punchpipe/controlsegment/flows.py | 238 ------------------ punchpipe/controlsegment/launcher.py | 6 +- punchpipe/controlsegment/scheduler.py | 27 +- punchpipe/controlsegment/tests/config.yaml | 49 ++-- .../controlsegment/tests/test_processor.py | 2 +- punchpipe/flows/level1.py | 30 +-- punchpipe/flows/level2.py | 34 +-- punchpipe/flows/tests/config.yaml | 49 ++-- punchpipe/flows/tests/test_level1.py | 24 +- punchpipe/flows/tests/test_level2.py | 27 +- 12 files changed, 206 insertions(+), 357 deletions(-) create mode 100644 old_config.yaml delete mode 100644 punchpipe/controlsegment/flows.py diff --git a/config.yaml b/config.yaml index a52ecd0..395a2bf 100644 --- a/config.yaml +++ b/config.yaml @@ -1,29 +1,42 @@ root: "/home/marcus.hughes/running_test/" +file_version: "0.0.1" launcher: max_seconds_waiting: 100 escalated_priority: 100 max_flows_running: 30 -priority: +levels: level0_process_flow: - initial: 5 - seconds: [30, 120, 600] - escalation: [10, 20, 30] + priority: + initial: 5 + seconds: [ 30, 120, 600 ] + escalation: [ 10, 20, 30 ] + schedule: + latency: 3 + window_duration_seconds: 3 + cron_rule: "******" + options: + num_quads: 100 + level1_process_flow: - initial: 6 - seconds: [ 30, 120, 600 ] - escalation: [ 11, 21, 31] - level2_process_flow: - initial: 7 - seconds: [30, 120, 600 ] - escalation: [ 12, 22, 32 ] + priority: + initial: 6 + seconds: [ 30, 120, 600 ] + escalation: [ 11, 21, 31 ] + schedule: + latency: 3 + window_duration_seconds: 3 + cron_rule: "******" + options: -scheduler: level2_process_flow: - latency: 3 - window_duration: 3 - -process_options: - pointing: - num_quads: 100 \ No newline at end of file + priority: + initial: 7 + seconds: [ 30, 120, 600 ] + escalation: [ 12, 22, 32 ] + schedule: + latency: 3 + window_duration_seconds: 3 + cron_rule: "******" + options: diff --git a/old_config.yaml b/old_config.yaml new file mode 100644 index 0000000..2dac064 --- /dev/null +++ b/old_config.yaml @@ -0,0 +1,28 @@ +root: "/home/marcus.hughes/running_test/" + +launcher: + max_seconds_waiting: 100 + max_flows_running: 30 + +priority: + level0_process_flow: + initial: 5 + seconds: [30, 120, 600] + escalation: [10, 20, 30] + level1_process_flow: + initial: 6 + seconds: [ 30, 120, 600 ] + escalation: [ 11, 21, 31] + level2_process_flow: + initial: 7 + seconds: [30, 120, 600 ] + escalation: [ 12, 22, 32 ] + +scheduler: + level2_process_flow: + latency: 3 + window_duration: 3 + +process_options: + pointing: + num_quads: 100 \ No newline at end of file diff --git a/punchpipe/controlsegment/flows.py b/punchpipe/controlsegment/flows.py deleted file mode 100644 index 423f69c..0000000 --- a/punchpipe/controlsegment/flows.py +++ /dev/null @@ -1,238 +0,0 @@ -# from __future__ import annotations -# from abc import ABCMeta, abstractmethod -# import prefect -# from prefect import Flow, Parameter -# from prefect.tasks.mysql import MySQLFetch -# from prefect.tasks.prefect.flow_run_rename import RenameFlowRun -# from prefect.schedules import IntervalSchedule -# from prefect.triggers import any_failed -# from typing import Optional, List, Dict, Set, Tuple, NewType -# import graphviz -# from datetime import timedelta, datetime -# from punchpipe.infrastructure.controlsegment import DatabaseCredentials, DatabaseCredentials -# from 
punchpipe.infrastructure.tasks.core import PipelineTask, OutputTask -# from punchpipe.infrastructure.tasks.processor import MarkFlowAsRunning, MarkFlowAsEnded, MarkFlowStartTime, MarkFlowEndTime, \ -# CreateFileDatabaseEntry, MarkFlowAsFailed -# from punchpipe.infrastructure.tasks.launcher import GatherQueuedFlows, CountRunningFlows, \ -# EscalateLongWaitingFlows, FilterForLaunchableFlows, LaunchFlow -# from punchpipe.infrastructure.tasks.scheduler import CheckForInputs, ScheduleFlow, AdvanceFiles - - -# __all__ = ['FlowGraph', -# 'FlowBuilder', -# 'CoreFlowBuilder', -# 'LauncherFlowBuilder', -# 'SchedulerFlowBuilder', -# 'ProcessFlowBuilder', -# 'KeywordDict'] - -# KeywordDict = NewType("KeywordDict", Dict[Tuple[PipelineTask, PipelineTask], Optional[str]]) - - -# class LauncherFlowBuilder(FlowBuilder): -# """A flow builder that makes launcher flows, i.e. the flows that start process flows running. -# """ -# def __init__(self, database_credentials: DatabaseCredentials, frequency_in_minutes: int): -# super().__init__(database_credentials, "launcher") -# self.frequency_in_minutes = frequency_in_minutes - -# def build(self) -> Flow: -# # Prepare the schedule -# schedule = IntervalSchedule(interval=timedelta(minutes=self.frequency_in_minutes)) - -# flow = Flow(self.flow_name, schedule=schedule) - -# # Create all necessary tasks and add them to the flow -# gather_queued_flows = GatherQueuedFlows(user=self.database_credentials.user, -# password=self.database_credentials.password, -# db_name=self.database_credentials.project_name, -# fetch='all') -# count_running_flows = CountRunningFlows(user=self.database_credentials.user, -# password=self.database_credentials.password, -# db_name=self.database_credentials.project_name) -# escalate_long_waiting_flows = EscalateLongWaitingFlows(user=self.database_credentials.user, -# password=self.database_credentials.password, -# db_name=self.database_credentials.project_name) -# filter_for_launchable_flows = FilterForLaunchableFlows() -# launch_flow = LaunchFlow() - -# flow.add_task(gather_queued_flows) -# flow.add_task(count_running_flows) -# flow.add_task(escalate_long_waiting_flows) -# flow.add_task(filter_for_launchable_flows) -# flow.add_task(launch_flow) - -# # Build the dependencies between tasks -# flow.set_dependencies(count_running_flows, -# downstream_tasks=[filter_for_launchable_flows]) -# flow.set_dependencies(escalate_long_waiting_flows, -# downstream_tasks=[gather_queued_flows]) -# flow.set_dependencies(gather_queued_flows, -# downstream_tasks=[filter_for_launchable_flows]) -# flow.set_dependencies(filter_for_launchable_flows, -# downstream_tasks=[launch_flow], -# keyword_tasks=dict(running_flow_count=count_running_flows, -# priority_sorted_queued_flows=gather_queued_flows)) -# flow.set_dependencies(launch_flow, -# downstream_tasks=[], -# keyword_tasks=dict(flow_entry=filter_for_launchable_flows), -# mapped=True) -# return flow - - -# class SchedulerFlowBuilder(FlowBuilder): -# """A flow builder that creates scheduler flows, i.e. the kinds of flows that look for ready files and schedule -# process flows to process them. 
-# """ -# def __init__(self, database_credentials: DatabaseCredentials, -# level: int, frequency_in_minutes: int, inputs_check_task: CheckForInputs, query_task: MySQLFetch): -# super().__init__(database_credentials, f"scheduler level {level}") -# self.level = level -# self.frequency_in_minutes = frequency_in_minutes -# self.query_task = query_task(user=self.database_credentials.user, -# password=self.database_credentials.password, -# db_name=self.database_credentials.project_name, -# fetch='all') -# self.inputs_check_task = inputs_check_task() - -# def build(self) -> Flow: -# logger = prefect.context.get("logger") - -# schedule_flow = ScheduleFlow(user=self.database_credentials.user, -# password=self.database_credentials.password, -# db_name=self.database_credentials.project_name, -# commit=True) -# advance_files = AdvanceFiles(user=self.database_credentials.user, -# password=self.database_credentials.password, -# db_name=self.database_credentials.project_name, -# commit=True) - -# schedule = IntervalSchedule(interval=timedelta(minutes=self.frequency_in_minutes)) -# flow = Flow(self.flow_name, schedule=schedule) -# flow.add_task(self.query_task) -# flow.add_task(self.inputs_check_task) -# flow.add_task(schedule_flow) -# flow.add_task(advance_files) - -# flow.set_dependencies(self.query_task, -# downstream_tasks=[self.inputs_check_task]) -# flow.set_dependencies(self.inputs_check_task, -# downstream_tasks=[schedule_flow], -# keyword_tasks=dict(query_result=self.query_task)) -# flow.set_dependencies(advance_files, -# downstream_tasks=[], -# keyword_tasks=dict(query_result=self.query_task)) -# flow.set_dependencies(schedule_flow, -# keyword_tasks=dict(pair=self.inputs_check_task), -# mapped=True) -# return flow - - -# class ProcessFlowBuilder(FlowBuilder): -# """A flow builder that converts core flows into process flows. -# """ -# def __init__(self, database_credentials: DatabaseCredentials, level: int, core_flow: Flow) -> None: -# super().__init__(database_credentials, f"processor level {level}") -# self.core_flow = core_flow -# self.level = level - -# self.core_flow_parameters = self.core_flow.parameters() - -# # Construct the entry tasks. -# # A task is an entry task if it is a non-Parameter root task, i.e. a task with no upstream tasks that is also -# # not a parameter, or if it is a Parameter's direct downstream task. -# self.entry_tasks = set() -# for task in self.core_flow.root_tasks(): -# if isinstance(task, Parameter): -# self.entry_tasks = self.entry_tasks.union(self.core_flow.downstream_tasks(task)) -# else: -# self.entry_tasks.add(task) - -# # Construct ending tasks. These are simply the terminal tasks of the core flow. 
-# self.ending_tasks = self.core_flow.terminal_tasks() - -# # Tasks that are output tasks -# self.output_tasks = self.core_flow.get_tasks(task_type=OutputTask) - -# def build(self) -> Flow: -# process_flow = self.core_flow.copy() -# process_flow.name = process_flow.name.replace("core", "process") -# flow_id = Parameter("flow_id") - -# # set up the pre-flow tasks -# rename_flow = RenameFlowRun() -# mark_as_running = MarkFlowAsRunning(user=self.database_credentials.user, -# password=self.database_credentials.password, -# db_name=self.database_credentials.project_name, -# commit=True) -# mark_flow_start_time = MarkFlowStartTime(user=self.database_credentials.user, -# password=self.database_credentials.password, -# db_name=self.database_credentials.project_name, -# commit=True) - -# process_flow.set_dependencies( -# rename_flow, -# downstream_tasks=[mark_as_running], -# keyword_tasks=dict(flow_run_name=flow_id) -# ) -# process_flow.set_dependencies( -# mark_as_running, -# downstream_tasks=[mark_flow_start_time], -# keyword_tasks=dict(flow_id=flow_id) -# ) -# process_flow.set_dependencies( -# mark_flow_start_time, -# downstream_tasks=self.entry_tasks, -# keyword_tasks=dict(flow_id=flow_id) -# ) - -# # set up the post-flow tasks -# mark_as_ended = MarkFlowAsEnded(user=self.database_credentials.user, -# password=self.database_credentials.password, -# db_name=self.database_credentials.project_name, -# commit=True) -# mark_flow_end_time = MarkFlowEndTime(user=self.database_credentials.user, -# password=self.database_credentials.password, -# db_name=self.database_credentials.project_name, -# commit=True) -# create_database_entry_tasks = [CreateFileDatabaseEntry(user=self.database_credentials.user, -# password=self.database_credentials.password, -# db_name=self.database_credentials.project_name, -# commit=True) for _ in self.output_tasks] - -# process_flow.set_dependencies( -# mark_flow_end_time, -# upstream_tasks=self.ending_tasks, -# keyword_tasks=dict(flow_id=flow_id) -# ) - -# process_flow.set_dependencies( -# mark_as_ended, -# upstream_tasks=[mark_flow_end_time], -# keyword_tasks=dict(flow_id=flow_id) -# ) - -# for database_task, output_task in zip(create_database_entry_tasks, self.output_tasks): -# process_flow.set_dependencies( -# database_task, -# upstream_tasks=[output_task], -# keyword_tasks=dict(meta_data=output_task, flow_id=flow_id) -# ) - -# # Set up the graceful exit if something went wrong in executing the flow -# # We add a task that follows all core flow and database tasks -# # and marks the flow as failed if any of the prior tasks fail -# # It does require the flow_id to know which flow failed -# existing_tasks = process_flow.get_tasks() -# core_failure_task = MarkFlowAsFailed(trigger=any_failed, user=self.database_credentials.user, -# password=self.database_credentials.password, -# db_name=self.database_credentials.project_name, -# commit=True) -# process_flow.add_task(core_failure_task) -# process_flow.set_dependencies(core_failure_task, upstream_tasks=existing_tasks, -# keyword_tasks=dict(flow_id=flow_id)) - -# # The process flow is successful or not depending on the status of the core tasks -# process_flow.set_reference_tasks(self.core_flow.get_tasks()) - -# return process_flow diff --git a/punchpipe/controlsegment/launcher.py b/punchpipe/controlsegment/launcher.py index f955042..bad791b 100644 --- a/punchpipe/controlsegment/launcher.py +++ b/punchpipe/controlsegment/launcher.py @@ -22,9 +22,9 @@ def count_running_flows(session): @task def escalate_long_waiting_flows(session, 
pipeline_config): - for flow_type in pipeline_config['priority']: - for max_seconds_waiting, escalated_priority in zip(pipeline_config['priority'][flow_type]['seconds'], - pipeline_config['priority'][flow_type]['escalation']): + for flow_type in pipeline_config['levels']: + for max_seconds_waiting, escalated_priority in zip(pipeline_config['levels'][flow_type]['priority']['seconds'], + pipeline_config['levels'][flow_type]['priority']['escalation']): since = datetime.now() - timedelta(seconds=max_seconds_waiting) session.query(Flow).where(and_(Flow.state == "planned", Flow.creation_time < since, diff --git a/punchpipe/controlsegment/scheduler.py b/punchpipe/controlsegment/scheduler.py index 48ca5f1..3f47497 100644 --- a/punchpipe/controlsegment/scheduler.py +++ b/punchpipe/controlsegment/scheduler.py @@ -1,3 +1,5 @@ +import itertools + from punchpipe.controlsegment.db import File, FileRelationship from punchpipe.controlsegment.util import get_database_session, update_file_state, load_pipeline_configuration @@ -15,25 +17,30 @@ def generic_scheduler_flow_logic(query_ready_files_func, session = get_database_session() # find all files that are ready to run + parent_files = [] ready_file_ids = query_ready_files_func(session, pipeline_config) - for file_id in ready_file_ids: - # mark the file as progressed so that there aren't duplicate processing flows - update_file_state(session, file_id, "progressed") + if ready_file_ids: + for file_id in ready_file_ids: + # mark the file as progressed so that there aren't duplicate processing flows + update_file_state(session, file_id, "progressed") - # get the prior level file's information - parent_file = session.query(File).where(File.file_id == file_id).one() + # get the prior level file's information + parent_files += session.query(File).where(File.file_id == file_id).all() # prepare the new level flow and file - child_file = construct_child_file_info(parent_file) - database_flow_info = construct_child_flow_info(parent_file, child_file, pipeline_config) - session.add(child_file) + children_files = construct_child_file_info(parent_files, pipeline_config) + database_flow_info = construct_child_flow_info(parent_files, children_files, pipeline_config) + session.add(*children_files) session.add(database_flow_info) session.commit() # set the processing flow now that we know the flow_id after committing the flow info - child_file.processing_flow = database_flow_info.flow_id + for child_file in children_files: + child_file.processing_flow = database_flow_info.flow_id session.commit() # create a file relationship between the prior and next levels - session.add(FileRelationship(parent=parent_file.file_id, child=child_file.file_id)) + for parent_file in parent_files: + for child_file in children_files: + session.add(FileRelationship(parent=parent_file.file_id, child=child_file.file_id)) session.commit() diff --git a/punchpipe/controlsegment/tests/config.yaml b/punchpipe/controlsegment/tests/config.yaml index 9c27aab..c0eed6e 100644 --- a/punchpipe/controlsegment/tests/config.yaml +++ b/punchpipe/controlsegment/tests/config.yaml @@ -1,29 +1,42 @@ root: "./test_results/" +file_version: "0.0.1" launcher: max_seconds_waiting: 100 escalated_priority: 100 max_flows_running: 30 -priority: +levels: level0_process_flow: - initial: 5 - seconds: [30, 120, 600] - escalation: [10, 20, 30] + priority: + initial: 5 + seconds: [ 30, 120, 600 ] + escalation: [ 10, 20, 30 ] + schedule: + latency: 3 + window_duration_seconds: 3 + cron_rule: "******" + options: + num_quads: 100 + 
level1_process_flow: - initial: 6 - seconds: [ 30, 120, 600 ] - escalation: [ 11, 21, 31] - level2_process_flow: - initial: 7 - seconds: [30, 120, 600 ] - escalation: [ 12, 22, 32 ] + priority: + initial: 6 + seconds: [ 30, 120, 600 ] + escalation: [ 11, 21, 31 ] + schedule: + latency: 3 + window_duration_seconds: 3 + cron_rule: "******" + options: -scheduler: level2_process_flow: - latency: 3 - window_duration: 3 - -process_options: - pointing: - num_quads: 100 \ No newline at end of file + priority: + initial: 7 + seconds: [ 30, 120, 600 ] + escalation: [ 12, 22, 32 ] + schedule: + latency: 3 + window_duration_seconds: 3 + cron_rule: "******" + options: diff --git a/punchpipe/controlsegment/tests/test_processor.py b/punchpipe/controlsegment/tests/test_processor.py index 8523f1a..6fccc21 100644 --- a/punchpipe/controlsegment/tests/test_processor.py +++ b/punchpipe/controlsegment/tests/test_processor.py @@ -136,7 +136,7 @@ def normal_flow(flow_id: int, pipeline_config_path=TESTDATA_DIR+"/config.yaml", def test_simple_generic_process_flow_normal_return(db): - os.makedirs("./test_results/") + os.makedirs("./test_results/", exist_ok=True) level1_file = db.query(File).where(File.file_id == 2).one() assert level1_file.state == "planned" diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index 92f406c..5755479 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -1,6 +1,7 @@ from datetime import datetime import json import os +import typing as t from sqlalchemy import and_ from prefect import flow, task @@ -19,15 +20,15 @@ def level1_query_ready_files(session, pipeline_config: dict): @task -def level1_construct_flow_info(level0_file: File, level1_file: File, pipeline_config: dict): +def level1_construct_flow_info(level0_files: File, level1_files: File, pipeline_config: dict): flow_type = "level1_process_flow" state = "planned" creation_time = datetime.now() - priority = pipeline_config['priority']['level1_process_flow']['initial'] - call_data = json.dumps({"input_data": os.path.join(level0_file.directory(pipeline_config['root']), - level0_file.filename()), - "output_filename": os.path.join(level1_file.directory(pipeline_config['root']), - level1_file.filename())}) + priority = pipeline_config['levels'][flow_type]['priority']['initial'] + call_data = json.dumps({"input_data": [os.path.join(level0_file.directory(pipeline_config['root']), + level0_file.filename()) for level0_file in level0_files], + "output_filename": [os.path.join(level1_file.directory(pipeline_config['root']), + level1_file.filename()) for level1_file in level1_files]}) return Flow(flow_type=flow_type, flow_level=1, state=state, @@ -37,15 +38,16 @@ def level1_construct_flow_info(level0_file: File, level1_file: File, pipeline_co @task -def level1_construct_file_info(level0_file: File): - return File(level=1, - file_type=level0_file.file_type, - observatory=level0_file.observatory, - file_version="0", # TODO: decide how to implement this +def level1_construct_file_info(level0_files: t.List[File], + pipeline_config: dict) -> t.List[File]: + return [File(level=1, + file_type=level0_files[0].file_type, + observatory=level0_files[0].observatory, + file_version=pipeline_config['file_version'], software_version=__version__, - date_obs=level0_file.date_obs, - polarization=level0_file.polarization, - state="planned") + date_obs=level0_files[0].date_obs, + polarization=level0_files[0].polarization, + state="planned")] @flow diff --git a/punchpipe/flows/level2.py b/punchpipe/flows/level2.py index 
4e62949..a326100 100644 --- a/punchpipe/flows/level2.py +++ b/punchpipe/flows/level2.py @@ -1,6 +1,7 @@ from datetime import datetime, timedelta import json import os +import typing as t from sqlalchemy import and_ from prefect import flow, task @@ -14,8 +15,8 @@ @task def level2_query_ready_files(session, pipeline_config: dict): - latency = pipeline_config['scheduler']['level2_process_flow']['latency'] - window_duration = pipeline_config['scheduler']['level2_process_flow']['window_duration'] + latency = pipeline_config['levels']['level2_process_flow']['schedule']['latency'] + window_duration = pipeline_config['levels']['level2_process_flow']['schedule']['window_duration_seconds'] start_time = datetime.now() - timedelta(minutes=latency+window_duration) end_time = datetime.now() - timedelta(minutes=latency) return [f.file_id for f in session.query(File).where(and_(File.state == "created", @@ -25,13 +26,13 @@ def level2_query_ready_files(session, pipeline_config: dict): @task -def level2_construct_flow_info(level1_file: File, level2_file: File, pipeline_config: dict): +def level2_construct_flow_info(level1_files: File, level2_file: File, pipeline_config: dict): flow_type = "level2_process_flow" state = "planned" creation_time = datetime.now() - priority = pipeline_config['priority']['level2_process_flow']['initial'] + priority = pipeline_config['levels'][flow_type]['priority']['initial'] call_data = json.dumps({"data_list": [os.path.join(level1_file.directory(pipeline_config['root']), - level1_file.filename())]}) + level1_file.filename()) for level1_file in level1_files]}) return Flow(flow_type=flow_type, state=state, flow_level=2, @@ -41,15 +42,20 @@ def level2_construct_flow_info(level1_file: File, level2_file: File, pipeline_co @task -def level2_construct_file_info(level1_file: File): - return File(level=2, - file_type=level1_file.file_type, - observatory=level1_file.observatory, - file_version="0", # TODO: decide how to implement this - software_version=__version__, - date_obs=level1_file.date_obs, - polarization=level1_file.polarization, - state="planned") +def level2_construct_file_info(level1_files: t.List[File], + pipeline_config: dict) -> t.List[File]: + # TODO: make realistic to level 2 products + out_files = [] + for level1_file in level1_files: + out_files.append(File(level=2, + file_type=level1_file.file_type, + observatory=level1_file.observatory, + file_version=pipeline_config['file_version'], + software_version=__version__, + date_obs=level1_file.date_obs, + polarization=level1_file.polarization, + state="planned")) + return out_files @flow diff --git a/punchpipe/flows/tests/config.yaml b/punchpipe/flows/tests/config.yaml index 9c27aab..c0eed6e 100644 --- a/punchpipe/flows/tests/config.yaml +++ b/punchpipe/flows/tests/config.yaml @@ -1,29 +1,42 @@ root: "./test_results/" +file_version: "0.0.1" launcher: max_seconds_waiting: 100 escalated_priority: 100 max_flows_running: 30 -priority: +levels: level0_process_flow: - initial: 5 - seconds: [30, 120, 600] - escalation: [10, 20, 30] + priority: + initial: 5 + seconds: [ 30, 120, 600 ] + escalation: [ 10, 20, 30 ] + schedule: + latency: 3 + window_duration_seconds: 3 + cron_rule: "******" + options: + num_quads: 100 + level1_process_flow: - initial: 6 - seconds: [ 30, 120, 600 ] - escalation: [ 11, 21, 31] - level2_process_flow: - initial: 7 - seconds: [30, 120, 600 ] - escalation: [ 12, 22, 32 ] + priority: + initial: 6 + seconds: [ 30, 120, 600 ] + escalation: [ 11, 21, 31 ] + schedule: + latency: 3 + window_duration_seconds: 3 + 
cron_rule: "******" + options: -scheduler: level2_process_flow: - latency: 3 - window_duration: 3 - -process_options: - pointing: - num_quads: 100 \ No newline at end of file + priority: + initial: 7 + seconds: [ 30, 120, 600 ] + escalation: [ 12, 22, 32 ] + schedule: + latency: 3 + window_duration_seconds: 3 + cron_rule: "******" + options: diff --git a/punchpipe/flows/tests/test_level1.py b/punchpipe/flows/tests/test_level1.py index 07efeee..09ddb2b 100644 --- a/punchpipe/flows/tests/test_level1.py +++ b/punchpipe/flows/tests/test_level1.py @@ -42,35 +42,37 @@ def test_query_ready_files(db): def test_level1_construct_file_info(): - level0_file = File(level=0, + pipeline_config_path = os.path.join(TEST_DIR, "config.yaml") + pipeline_config = load_pipeline_configuration.fn(pipeline_config_path) + level0_file = [File(level=0, file_type='XX', observatory='0', state='created', file_version='none', software_version='none', - date_obs=datetime.now()) - constructed_file_info = level1_construct_file_info.fn(level0_file) + date_obs=datetime.now())] + constructed_file_info = level1_construct_file_info.fn(level0_file, pipeline_config)[0] assert constructed_file_info.level == 1 - assert constructed_file_info.file_type == level0_file.file_type - assert constructed_file_info.observatory == level0_file.observatory - assert constructed_file_info.file_version == "0" + assert constructed_file_info.file_type == level0_file[0].file_type + assert constructed_file_info.observatory == level0_file[0].observatory + assert constructed_file_info.file_version == "0.0.1" assert constructed_file_info.software_version == __version__ - assert constructed_file_info.date_obs == level0_file.date_obs - assert constructed_file_info.polarization == level0_file.polarization + assert constructed_file_info.date_obs == level0_file[0].date_obs + assert constructed_file_info.polarization == level0_file[0].polarization assert constructed_file_info.state == "planned" def test_level1_construct_flow_info(): pipeline_config_path = os.path.join(TEST_DIR, "config.yaml") pipeline_config = load_pipeline_configuration.fn(pipeline_config_path) - level0_file = File(level=0, + level0_file = [File(level=0, file_type='XX', observatory='0', state='created', file_version='none', software_version='none', - date_obs=datetime.now()) - level1_file = level1_construct_file_info.fn(level0_file) + date_obs=datetime.now())] + level1_file = level1_construct_file_info.fn(level0_file, pipeline_config) flow_info = level1_construct_flow_info.fn(level0_file, level1_file, pipeline_config) assert flow_info.flow_type == 'level1_process_flow' diff --git a/punchpipe/flows/tests/test_level2.py b/punchpipe/flows/tests/test_level2.py index 1ade49d..8dabf6c 100644 --- a/punchpipe/flows/tests/test_level2.py +++ b/punchpipe/flows/tests/test_level2.py @@ -39,41 +39,44 @@ def session_fn(session): def test_level2_query_ready_files(db): with freeze_time(datetime(2023, 1, 1, 0, 5, 0)) as frozen_datatime: - pipeline_config = {'scheduler': {'level2_process_flow': {'latency': 3, 'window_duration': 3}}} + pipeline_config = {'levels': {'level2_process_flow': {'schedule': {'latency': 3, 'window_duration_seconds': 3}}}} ready_file_ids = level2_query_ready_files.fn(db, pipeline_config) assert len(ready_file_ids) == 1 def test_level2_construct_file_info(): - level1_file = File(level=0, + pipeline_config_path = os.path.join(TEST_DIR, "config.yaml") + pipeline_config = load_pipeline_configuration.fn(pipeline_config_path) + + level1_file = [File(level=0, file_type='XX', observatory='0', 
state='created', file_version='none', software_version='none', - date_obs=datetime.now()) - constructed_file_info = level2_construct_file_info.fn(level1_file) + date_obs=datetime.now())] + constructed_file_info = level2_construct_file_info.fn(level1_file, pipeline_config)[0] assert constructed_file_info.level == 2 - assert constructed_file_info.file_type == level1_file.file_type - assert constructed_file_info.observatory == level1_file.observatory - assert constructed_file_info.file_version == "0" + assert constructed_file_info.file_type == level1_file[0].file_type + assert constructed_file_info.observatory == level1_file[0].observatory + assert constructed_file_info.file_version == "0.0.1" assert constructed_file_info.software_version == __version__ - assert constructed_file_info.date_obs == level1_file.date_obs - assert constructed_file_info.polarization == level1_file.polarization + assert constructed_file_info.date_obs == level1_file[0].date_obs + assert constructed_file_info.polarization == level1_file[0].polarization assert constructed_file_info.state == "planned" def test_level2_construct_flow_info(): pipeline_config_path = os.path.join(TEST_DIR, "config.yaml") pipeline_config = load_pipeline_configuration.fn(pipeline_config_path) - level1_file = File(level=1, + level1_file = [File(level=1, file_type='XX', observatory='0', state='created', file_version='none', software_version='none', - date_obs=datetime.now()) - level2_file = level2_construct_file_info.fn(level1_file) + date_obs=datetime.now())] + level2_file = level2_construct_file_info.fn(level1_file, pipeline_config) flow_info = level2_construct_flow_info.fn(level1_file, level2_file, pipeline_config) assert flow_info.flow_type == 'level2_process_flow'
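
A note on the configuration change above: the old top-level priority:, scheduler:, and process_options: sections are folded into a single levels: block, so each *_process_flow entry now carries its own priority, schedule, and options (cron_rule is introduced here but not yet consumed by any code in this patch). Below is a minimal sketch of reading the new layout, using plain yaml.safe_load in place of the project's load_pipeline_configuration helper; read_flow_settings is an illustrative name, not part of the patch.

import yaml

def read_flow_settings(pipeline_config: dict, flow_type: str) -> dict:
    # Each flow's settings now live under levels/<flow_type> instead of
    # separate top-level priority / scheduler / process_options sections.
    entry = pipeline_config["levels"][flow_type]
    return {
        "initial_priority": entry["priority"]["initial"],
        "escalation_steps": list(zip(entry["priority"]["seconds"],
                                     entry["priority"]["escalation"])),
        "latency": entry["schedule"]["latency"],
        "window_duration_seconds": entry["schedule"]["window_duration_seconds"],
        "options": entry.get("options") or {},   # options: may be empty in YAML
    }

with open("config.yaml") as f:
    pipeline_config = yaml.safe_load(f)
print(read_flow_settings(pipeline_config, "level2_process_flow"))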
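
On the scheduler change: generic_scheduler_flow_logic now accumulates parent_files, builds a list of children, and cross-links every parent to every child via FileRelationship rows. Two details in the hunk are worth flagging: session.add(*children_files) unpacks the list into positional arguments and only works when exactly one child is produced (SQLAlchemy's Session.add_all() is the call that accepts a list), and itertools is imported but unused in the shown hunks, although itertools.product is the natural fit for the parent/child cross product. The sketch below captures that portion of the logic under those assumptions; link_children is an illustrative helper name, and flow_id would be the committed Flow row's id.

import itertools

from punchpipe.controlsegment.db import FileRelationship

def link_children(session, parent_files, children_files, flow_id):
    # Persist all child files in one call; add_all() accepts a list,
    # whereas add() takes a single instance.
    session.add_all(children_files)
    session.commit()

    # Point every child at the flow that will produce it (known after commit).
    for child_file in children_files:
        child_file.processing_flow = flow_id

    # Record the many-to-many mapping: every parent is linked to every child.
    session.add_all([FileRelationship(parent=parent.file_id, child=child.file_id)
                     for parent, child in itertools.product(parent_files, children_files)])
    session.commit()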
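
Finally, level2_query_ready_files now reads the renamed schedule keys, but the rename exposes a unit mismatch: the key is window_duration_seconds while the window is still built with timedelta(minutes=latency + window_duration), and the updated test_level2_query_ready_files also treats both values as minutes. The sketch below mirrors the patched computation as written (minutes throughout) so the mismatch is visible; whether the intended unit is minutes or seconds is left open, and level2_window is an illustrative name only.

from datetime import datetime, timedelta

def level2_window(pipeline_config: dict):
    # Reads the keys introduced under levels/level2_process_flow/schedule.
    schedule = pipeline_config["levels"]["level2_process_flow"]["schedule"]
    latency = schedule["latency"]
    window_duration = schedule["window_duration_seconds"]
    # Mirrors the patched query: both values still go through timedelta(minutes=...),
    # even though the config key is now suffixed _seconds.
    start_time = datetime.now() - timedelta(minutes=latency + window_duration)
    end_time = datetime.now() - timedelta(minutes=latency)
    return start_time, end_time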