Merge pull request #55 from punch-mission/test-level0
Create level 0 pipeline, improves CI
jmbhughes authored May 6, 2024
2 parents 08fd20d + 270c4b2 commit 2d68acd
Showing 43 changed files with 4,040 additions and 445 deletions.
7 changes: 2 additions & 5 deletions .github/workflows/ci.yaml
@@ -19,7 +19,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        python-version: ["3.10"]
+        python-version: ["3.10", "3.11", "3.12"]
 
     steps:
       - uses: actions/checkout@v3
@@ -35,10 +35,7 @@ jobs:
           pip install ".[dev]"
       - name: Lint with flake8
         run: |
-          # stop the build if there are Python syntax errors or undefined names
-          flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
-          # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
-          flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
+          ruff check .
       - name: Test with pytest
         run: |
           eval `ssh-agent -s`
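Note on the lint change: the old flake8 step failed the build only on syntax errors and undefined names (the E9, F63, F7, F82 selections) and downgraded everything else to warnings; a single ruff check . now plays that gating role. As a hedged illustration, this hypothetical snippet (not from the repository) shows the class of bug that gate exists to catch:

# Hypothetical example: both the old `flake8 --select=F82` gate and
# `ruff check` report `totl` as an undefined name (rule F821).
def total_price(prices):
    total = 0
    for price in prices:
        total += price
    return totl  # typo: should be `total`; a NameError at runtime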
1 change: 1 addition & 0 deletions .gitignore
@@ -5,3 +5,4 @@ scripts/credentials.py
 build/
 punchpipe.egg-info
 .idea
+/punchpipe/level0/decoding_tables/
55 changes: 30 additions & 25 deletions punchpipe/controlsegment/__init__.py
@@ -33,31 +33,36 @@ def test_match_data_with_file_db_entry_fails_on_empty_list(sample_punchdata):
 
 
 def test_match_data_with_file_db_entry(sample_punchdata):
-    file_db_entry_list = [File(level=1,
-                               file_type='XX',
-                               observatory='Y',
-                               file_version='0',
-                               software_version='0',
-                               date_created=datetime.now(),
-                               date_obs=datetime.now(),
-                               date_beg=datetime.now(),
-                               date_end=datetime.now(),
-                               polarization='ZZ',
-                               state='created',
-                               processing_flow=0),
-                          File(level=100,
-                               file_type='XX',
-                               observatory='Y',
-                               file_version='0',
-                               software_version='0',
-                               date_created=datetime.now(),
-                               date_obs=datetime.now(),
-                               date_beg=datetime.now(),
-                               date_end=datetime.now(),
-                               polarization='ZZ',
-                               state='created',
-                               processing_flow=0)
-                          ]
+    file_db_entry_list = [
+        File(
+            level=1,
+            file_type="XX",
+            observatory="Y",
+            file_version="0",
+            software_version="0",
+            date_created=datetime.now(),
+            date_obs=datetime.now(),
+            date_beg=datetime.now(),
+            date_end=datetime.now(),
+            polarization="ZZ",
+            state="created",
+            processing_flow=0,
+        ),
+        File(
+            level=100,
+            file_type="XX",
+            observatory="Y",
+            file_version="0",
+            software_version="0",
+            date_created=datetime.now(),
+            date_obs=datetime.now(),
+            date_beg=datetime.now(),
+            date_end=datetime.now(),
+            polarization="ZZ",
+            state="created",
+            processing_flow=0,
+        ),
+    ]
     output = match_data_with_file_db_entry(sample_punchdata, file_db_entry_list)
     assert len(output) == 1
     assert output == file_db_entry_list[0]
5 changes: 1 addition & 4 deletions punchpipe/controlsegment/db.py
@@ -58,10 +58,7 @@ def directory(self, root: str):
         str
             the place to write the file
         """
-        return os.path.join(root,
-                            str(self.level),
-                            self.file_type,
-                            self.date_obs.strftime("%Y/%m/%d"))
+        return os.path.join(root, str(self.level), self.file_type, self.date_obs.strftime("%Y/%m/%d"))
 
 
 class Flow(Base):
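The collapsed one-liner makes the on-disk layout easier to see: files land under root/<level>/<file_type>/<YYYY/MM/DD>. A minimal sketch of the same join, with a made-up root and field values (real ones come from the File database entry):

import os
from datetime import datetime

# Illustrative values only.
root = "/data/punchpipe"
level, file_type = 1, "XX"
date_obs = datetime(2024, 5, 6)

# Mirrors File.directory(): root/<level>/<file_type>/<YYYY/MM/DD>
print(os.path.join(root, str(level), file_type, date_obs.strftime("%Y/%m/%d")))
# -> /data/punchpipe/1/XX/2024/05/06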
27 changes: 15 additions & 12 deletions punchpipe/controlsegment/launcher.py
@@ -7,8 +7,7 @@
 from sqlalchemy.orm import Session
 
 from punchpipe.controlsegment.db import Flow
-from punchpipe.controlsegment.util import (get_database_session,
-                                           load_pipeline_configuration)
+from punchpipe.controlsegment.util import get_database_session, load_pipeline_configuration
 
 
 @task
@@ -23,13 +22,15 @@ def count_running_flows(session):
 
 @task
 def escalate_long_waiting_flows(session, pipeline_config):
-    for flow_type in pipeline_config['levels']:
-        for max_seconds_waiting, escalated_priority in zip(pipeline_config['levels'][flow_type]['priority']['seconds'],
-                                                           pipeline_config['levels'][flow_type]['priority']['escalation']):
+    for flow_type in pipeline_config["levels"]:
+        for max_seconds_waiting, escalated_priority in zip(
+            pipeline_config["levels"][flow_type]["priority"]["seconds"],
+            pipeline_config["levels"][flow_type]["priority"]["escalation"],
+        ):
             since = datetime.now() - timedelta(seconds=max_seconds_waiting)
-            session.query(Flow).where(and_(Flow.state == "planned",
-                                           Flow.creation_time < since,
-                                           Flow.flow_type == flow_type)).update({'priority': escalated_priority})
+            session.query(Flow).where(
+                and_(Flow.state == "planned", Flow.creation_time < since, Flow.flow_type == flow_type)
+            ).update({"priority": escalated_priority})
     session.commit()
 
 
@@ -78,8 +79,9 @@ async def launch_ready_flows(session: Session, flow_ids: List[int]) -> List:
         # for every flow launch it and store the response
         for this_flow in flow_info:
             this_deployment_id = deployment_ids[this_flow.flow_type]
-            response = await client.create_flow_run_from_deployment(this_deployment_id,
-                                                                    parameters={'flow_id': this_flow.flow_id})
+            response = await client.create_flow_run_from_deployment(
+                this_deployment_id, parameters={"flow_id": this_flow.flow_id}
+            )
             responses.append(response)
     return responses
 
@@ -108,8 +110,9 @@ def launcher_flow(pipeline_configuration_path="config.yaml"):
     escalate_long_waiting_flows(session, pipeline_config)
     queued_flows = gather_planned_flows(session)
     logger.info(f"There are {len(queued_flows)} planned flows right now.")
-    flows_to_launch = filter_for_launchable_flows(queued_flows, num_running_flows,
-                                                  pipeline_config['launcher']['max_flows_running'])
+    flows_to_launch = filter_for_launchable_flows(
+        queued_flows, num_running_flows, pipeline_config["launcher"]["max_flows_running"]
+    )
    logger.info(f"Flows with IDs of {flows_to_launch} will be launched.")
    launch_ready_flows(session, flows_to_launch)
    logger.info("Launcher flow exit.")
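The escalation loop pairs each waiting threshold with a new priority via zip, so the "seconds" and "escalation" lists in the configuration must be parallel. A sketch of the expected config shape; the flow name and the numeric values are invented, only the key nesting follows the code above:

# Hypothetical pipeline_config fragment; the key layout mirrors
# escalate_long_waiting_flows and launcher_flow.
pipeline_config = {
    "levels": {
        "level1_process_flow": {
            "priority": {
                "initial": 5,             # priority a new flow starts with
                "seconds": [600, 3600],   # waiting thresholds...
                "escalation": [10, 100],  # ...and the priorities they trigger
            }
        }
    },
    "launcher": {"max_flows_running": 10},
}

priority_cfg = pipeline_config["levels"]["level1_process_flow"]["priority"]
for max_seconds_waiting, escalated_priority in zip(priority_cfg["seconds"], priority_cfg["escalation"]):
    print(f"flows planned for more than {max_seconds_waiting}s get priority {escalated_priority}")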
14 changes: 7 additions & 7 deletions punchpipe/controlsegment/processor.py
@@ -4,15 +4,15 @@
 from prefect.context import get_run_context
 
 from punchpipe.controlsegment.db import File, Flow
-from punchpipe.controlsegment.util import (get_database_session,
-                                           load_pipeline_configuration,
-                                           match_data_with_file_db_entry,
-                                           write_file)
+from punchpipe.controlsegment.util import (
+    get_database_session,
+    load_pipeline_configuration,
+    match_data_with_file_db_entry,
+    write_file,
+)
 
 
-def generic_process_flow_logic(flow_id: int, core_flow_to_launch,
-                               pipeline_config_path: str,
-                               session=None):
+def generic_process_flow_logic(flow_id: int, core_flow_to_launch, pipeline_config_path: str, session=None):
     if session is None:
         session = get_database_session()
 
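The reflowed signature reads most naturally at a call site. A hypothetical wrapper flow, showing only the intended call pattern (the flow and core-flow names here are invented, and the core flow's parameters are a guess based on the call_data keys used elsewhere in this PR):

from prefect import flow

from punchpipe.controlsegment.processor import generic_process_flow_logic


@flow
def my_core_flow(input_data, output_filename):
    ...  # stand-in for a level's science processing


@flow
def my_process_flow(flow_id: int, pipeline_config_path: str = "config.yaml"):
    # Hand the core flow to the shared logic, which presumably handles the
    # surrounding database bookkeeping for the Flow identified by flow_id.
    generic_process_flow_logic(flow_id, my_core_flow, pipeline_config_path)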
13 changes: 4 additions & 9 deletions punchpipe/controlsegment/scheduler.py
@@ -1,15 +1,10 @@
-
 from punchpipe.controlsegment.db import File, FileRelationship
-from punchpipe.controlsegment.util import (get_database_session,
-                                           load_pipeline_configuration,
-                                           update_file_state)
+from punchpipe.controlsegment.util import get_database_session, load_pipeline_configuration, update_file_state
 
 
-def generic_scheduler_flow_logic(query_ready_files_func,
-                                 construct_child_file_info,
-                                 construct_child_flow_info,
-                                 pipeline_config_path,
-                                 session=None):
+def generic_scheduler_flow_logic(
+    query_ready_files_func, construct_child_file_info, construct_child_flow_info, pipeline_config_path, session=None
+):
     # load pipeline configuration
     pipeline_config = load_pipeline_configuration(pipeline_config_path)
 
15 changes: 10 additions & 5 deletions punchpipe/controlsegment/util.py
@@ -14,7 +14,8 @@ def get_database_session():
     """Sets up a session to connect to the MariaDB punchpipe database"""
     credentials = MySQLCredentials.load("mysql-cred")
     engine = create_engine(
-        f'mysql+pymysql://{credentials.user}:{credentials.password.get_secret_value()}@localhost/punchpipe')
+        f"mysql+pymysql://{credentials.user}:{credentials.password.get_secret_value()}@localhost/punchpipe"
+    )
     session = Session(engine)
     return session
 
@@ -34,8 +35,9 @@ def load_pipeline_configuration(path: str) -> dict:
 
 
 def write_file(data: PUNCHData, corresponding_file_db_entry, pipeline_config) -> None:
-    output_filename = os.path.join(corresponding_file_db_entry.directory(pipeline_config['root']),
-                                   corresponding_file_db_entry.filename())
+    output_filename = os.path.join(
+        corresponding_file_db_entry.directory(pipeline_config["root"]), corresponding_file_db_entry.filename()
+    )
     output_dir = os.path.dirname(output_filename)
     if not os.path.isdir(output_dir):
         os.makedirs(output_dir)
@@ -45,8 +47,11 @@ def write_file(data: PUNCHData, corresponding_file_db_entry, pipeline_config) ->
 
 def match_data_with_file_db_entry(data: PUNCHData, file_db_entry_list):
     # figure out which file_db_entry this corresponds to
-    matching_entries = [file_db_entry for file_db_entry in file_db_entry_list
-                        if file_db_entry.filename() == data.filename_base + ".fits"]
+    matching_entries = [
+        file_db_entry
+        for file_db_entry in file_db_entry_list
+        if file_db_entry.filename() == data.filename_base + ".fits"
+    ]
     if len(matching_entries) == 0:
         raise RuntimeError(f"There did not exist a file_db_entry for this result: result={data.filename_base}.")
     elif len(matching_entries) > 1:
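The reflowed comprehension makes the matching rule explicit: an entry matches when its filename() equals the data's filename_base plus ".fits". A self-contained sketch with stand-in classes (everything below is invented except that comparison rule):

from dataclasses import dataclass


@dataclass
class FakeEntry:  # stand-in for the File DB entry
    name: str

    def filename(self) -> str:
        return self.name


@dataclass
class FakeData:  # stand-in for PUNCHData
    filename_base: str


data = FakeData(filename_base="sample_result")
entries = [FakeEntry("sample_result.fits"), FakeEntry("other_result.fits")]

# Same rule as match_data_with_file_db_entry.
matches = [entry for entry in entries if entry.filename() == data.filename_base + ".fits"]
assert len(matches) == 1 and matches[0].name == "sample_result.fits"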
14 changes: 8 additions & 6 deletions punchpipe/flows/level0.py
@@ -7,7 +7,7 @@
 
 
 def level0_query_ready_files(session, pipeline_config: dict):
-    dropzone = os.path.join(pipeline_config['root'], pipeline_config['input_drop'])
+    dropzone = os.path.join(pipeline_config["root"], pipeline_config["input_drop"])
     return glob(os.path.join(dropzone, "*.tlm"))
 
 
@@ -21,8 +21,10 @@ def level0_construct_flow_info():
 
 @flow
 def level0_scheduler_flow(pipeline_config_path="config.yaml", session=None):
-    generic_scheduler_flow_logic(level0_query_ready_files,
-                                 level0_construct_file_info,
-                                 level0_construct_flow_info,
-                                 pipeline_config_path,
-                                 session=session)
+    generic_scheduler_flow_logic(
+        level0_query_ready_files,
+        level0_construct_file_info,
+        level0_construct_flow_info,
+        pipeline_config_path,
+        session=session,
+    )
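Unlike the higher levels, level 0 discovers work by globbing a drop directory for raw telemetry rather than querying the database. The same lookup in isolation (the directory names are invented; the root/input_drop keys and the *.tlm pattern come from the diff above):

import os
from glob import glob

# Hypothetical config; keys mirror level0_query_ready_files.
pipeline_config = {"root": "/data/punchpipe", "input_drop": "dropzone"}

dropzone = os.path.join(pipeline_config["root"], pipeline_config["input_drop"])
ready_files = glob(os.path.join(dropzone, "*.tlm"))  # raw telemetry files
print(ready_files)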
67 changes: 41 additions & 26 deletions punchpipe/flows/level1.py
@@ -23,39 +23,54 @@ def level1_construct_flow_info(level0_files: File, level1_files: File, pipeline_
     flow_type = "level1_process_flow"
     state = "planned"
     creation_time = datetime.now()
-    priority = pipeline_config['levels'][flow_type]['priority']['initial']
-    call_data = json.dumps({"input_data": [os.path.join(level0_file.directory(pipeline_config['root']),
-                                                        level0_file.filename()) for level0_file in level0_files],
-                            "output_filename": [os.path.join(level1_file.directory(pipeline_config['root']),
-                                                             level1_file.filename()) for level1_file in level1_files]})
-    return Flow(flow_type=flow_type,
-                flow_level=1,
-                state=state,
-                creation_time=creation_time,
-                priority=priority,
-                call_data=call_data)
+    priority = pipeline_config["levels"][flow_type]["priority"]["initial"]
+    call_data = json.dumps(
+        {
+            "input_data": [
+                os.path.join(level0_file.directory(pipeline_config["root"]), level0_file.filename())
+                for level0_file in level0_files
+            ],
+            "output_filename": [
+                os.path.join(level1_file.directory(pipeline_config["root"]), level1_file.filename())
+                for level1_file in level1_files
+            ],
+        }
+    )
+    return Flow(
+        flow_type=flow_type,
+        flow_level=1,
+        state=state,
+        creation_time=creation_time,
+        priority=priority,
+        call_data=call_data,
+    )
 
 
 @task
-def level1_construct_file_info(level0_files: t.List[File],
-                               pipeline_config: dict) -> t.List[File]:
-    return [File(level=1,
-                 file_type=level0_files[0].file_type,
-                 observatory=level0_files[0].observatory,
-                 file_version=pipeline_config['file_version'],
-                 software_version=__version__,
-                 date_obs=level0_files[0].date_obs,
-                 polarization=level0_files[0].polarization,
-                 state="planned")]
+def level1_construct_file_info(level0_files: t.List[File], pipeline_config: dict) -> t.List[File]:
+    return [
+        File(
+            level=1,
+            file_type=level0_files[0].file_type,
+            observatory=level0_files[0].observatory,
+            file_version=pipeline_config["file_version"],
+            software_version=__version__,
+            date_obs=level0_files[0].date_obs,
+            polarization=level0_files[0].polarization,
+            state="planned",
+        )
+    ]
 
 
 @flow
 def level1_scheduler_flow(pipeline_config_path="config.yaml", session=None):
-    generic_scheduler_flow_logic(level1_query_ready_files,
-                                 level1_construct_file_info,
-                                 level1_construct_flow_info,
-                                 pipeline_config_path,
-                                 session=session)
+    generic_scheduler_flow_logic(
+        level1_query_ready_files,
+        level1_construct_file_info,
+        level1_construct_flow_info,
+        pipeline_config_path,
+        session=session,
+    )
 
 
 @flow
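call_data is stored as a JSON string holding two parallel lists, mapping each level-0 input to its level-1 output. A sketch of the payload's shape (the concrete paths are invented; real ones come from File.directory and File.filename):

import json

# Hypothetical example of the call_data payload built above.
call_data = json.dumps(
    {
        "input_data": ["/data/punchpipe/0/XX/2024/05/06/input0.fits"],
        "output_filename": ["/data/punchpipe/1/XX/2024/05/06/output1.fits"],
    }
)
# A consumer recovers both lists by parsing the JSON:
print(json.loads(call_data)["output_filename"])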
(Diffs for the remaining changed files did not load.)
