Skip to content

Commit

Permalink
Merge pull request #81 from punch-mission/mhughes-nov13
Browse files Browse the repository at this point in the history
Prepare for End2End Test
  • Loading branch information
jmbhughes authored Nov 18, 2024
2 parents 422f9ed + 7613d1c commit fa7d854
Show file tree
Hide file tree
Showing 30 changed files with 420 additions and 172 deletions.
15 changes: 8 additions & 7 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
root: "/d0/punchsoc/gamera_data/"
root: "/Users/jhughes/Desktop/d0/"
input_drop: "dropzone/"
tlm_directory: "dropzone/"
file_version: "1"
Expand All @@ -17,15 +17,16 @@ scheduler:
max_start: 10

launcher:
max_flows_running: 5
schedule: "* * * * *"
max_flows_running: 25

levels:
level0_process_flow:
priority:
initial: 5
seconds: [ 30, 120, 600 ]
escalation: [ 10, 20, 30 ]
schedule:
schedule: "* * * * *"
options:
num_quads: 100

Expand All @@ -34,29 +35,29 @@ levels:
initial: 6
seconds: [ 30, 120, 600 ]
escalation: [ 11, 21, 31 ]
schedule:
schedule: "* * * * *"
options:

level2_process_flow:
priority:
initial: 1000
seconds: [ 3000, 12000, 60000 ]
escalation: [ 1000, 1000, 1000 ]
schedule:
schedule: "* * * * *"
options:

levelq_process_flow:
priority:
initial: 1000
seconds: [ 3000, 12000, 60000 ]
escalation: [ 1000, 1000, 1000 ]
schedule:
schedule: "* * * * *"
options:

level3_PTM_process_flow:
priority:
initial: 10000
seconds: [1]
escalation: [10000]
schedule:
schedule: "* * * * *"
options:
49 changes: 0 additions & 49 deletions deploy.py

This file was deleted.

143 changes: 133 additions & 10 deletions punchpipe/cli.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,145 @@
import os
import time
import subprocess
import multiprocessing as mp
from pathlib import Path
from datetime import datetime

import click
from prefect import flow, serve
from prefect.variables import Variable

from .monitor.app import create_app
from punchpipe.control.health import update_machine_health_stats
from punchpipe.control.launcher import launcher_flow
from punchpipe.control.util import load_pipeline_configuration
from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow
from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow
from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow
from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow
from punchpipe.monitor.app import create_app

THIS_DIR = os.path.dirname(__file__)
app = create_app()
server = app.server

# Root click command group for the punchpipe CLI; subcommands attach to it
# with @main.command. The docstring below is left as-is because click shows
# it as the group's help text.
@click.group
def main():
    """Run the PUNCH automated pipeline"""

def launch_monitor():
    """Create the monitor app and run its server on port 8051 with debug disabled."""
    create_app().run_server(debug=False, port=8051)

# NOTE(review): this flow only prints a greeting and is not among the
# deployments passed to serve() in serve_flows — looks like a leftover
# smoke-test flow; confirm before removing.
@flow
def my_flow():
    print("Hello, Prefect!")


def serve_flows(configuration_path):
    """Build one deployment per punchpipe flow and serve them all together.

    The launcher deployment takes its cron schedule from the ``schedule`` key
    of the ``launcher`` section of the pipeline configuration, falling back to
    every minute. Each scheduler deployment and the health deployment run on a
    fixed every-minute cron. The process deployments are created without a
    cron schedule.

    Parameters
    ----------
    configuration_path
        Path to the pipeline configuration file. It is loaded here and also
        handed to every deployment (except the health one) as a parameter.
    """
    # NOTE(review): `.fn` presumably calls the undecorated function behind the
    # Prefect wrapper so this works outside a flow run — confirm.
    config = load_pipeline_configuration.fn(configuration_path)
    launcher_cron = config['launcher'].get("schedule", "* * * * *")
    every_minute = "* * * * *"

    def level_parameters():
        # A fresh dict per deployment so no two deployments share one object.
        return {"pipeline_config_path": configuration_path}

    # (flow, to_deployment keyword arguments), in the order they are served.
    deployment_specs = [
        (launcher_flow, {
            "name": "launcher-deployment",
            "description": "Launch a pipeline segment.",
            "cron": launcher_cron,
            "parameters": {"pipeline_configuration_path": configuration_path},
        }),
        (level1_scheduler_flow, {
            "name": "level1-scheduler-deployment",
            "description": "Schedule a Level 1 flow.",
            "cron": every_minute,
            "parameters": level_parameters(),
        }),
        (level1_process_flow, {
            "name": "level1_process_flow",
            "description": "Process a file from Level 0 to Level 1.",
            "parameters": level_parameters(),
        }),
        (level2_scheduler_flow, {
            "name": "level2-scheduler-deployment",
            "description": "Schedule a Level 2 flow.",
            "cron": every_minute,
            "parameters": level_parameters(),
        }),
        (level2_process_flow, {
            "name": "level2_process_flow",
            "description": "Process files from Level 1 to Level 2.",
            "parameters": level_parameters(),
        }),
        (levelq_scheduler_flow, {
            "name": "levelq-scheduler-deployment",
            "description": "Schedule a Level Q flow.",
            "cron": every_minute,
            "parameters": level_parameters(),
        }),
        (levelq_process_flow, {
            "name": "levelq_process_flow",
            "description": "Process files from Level 1 to Level Q.",
            "parameters": level_parameters(),
        }),
        (level3_PTM_scheduler_flow, {
            "name": "level3-PTM-scheduler-deployment",
            "description": "Schedule a Level 3 flow to make PTM.",
            "cron": every_minute,
            "parameters": level_parameters(),
        }),
        (level3_PTM_process_flow, {
            "name": "level3_PTM_process_flow",
            "description": "Process PTM files from Level 2 to Level 3.",
            "parameters": level_parameters(),
        }),
        (update_machine_health_stats, {
            "name": "update-health-stats-deployment",
            "description": "Update the health stats table data.",
            "cron": every_minute,
        }),
    ]

    deployments = [flow_obj.to_deployment(**options)
                   for flow_obj, options in deployment_specs]

    serve(*deployments, limit=1000)


@main.command
def run():
print("Launching punchpipe monitor on http://localhost:8051/.")
subprocess.Popen(["prefect", "server", "start"])
print("\npunchpipe Prefect flows must be stopped manually in Prefect.")
mp.Process(target=launch_monitor, args=()).start()
@click.argument("configuration_path", type=click.Path(exists=True))
def run(configuration_path):
    """Start the Prefect server and the punchpipe monitor, then serve all pipeline flows."""
    now = datetime.now()

    # Resolve to an absolute path before it is stored and passed to the flows.
    configuration_path = str(Path(configuration_path).resolve())
    output_path = f"punchpipe_{now.strftime('%Y%m%d_%H%M%S')}.txt"

    print()
    print(f"Launching punchpipe at {now} with configuration: {configuration_path}")
    print(f"Terminal logs from punchpipe are in {output_path}")

    # Child processes that were actually started. The handlers below only
    # terminate these, so an interrupt or failure that happens before a child
    # exists (e.g. ctrl-c during the startup sleep) cannot hit an unbound name.
    children = []

    def _terminate_children():
        for child in children:
            child.terminate()

    with open(output_path, "w") as f:
        try:
            # Both children write stdout and stderr into the shared log file.
            prefect_process = subprocess.Popen(["prefect", "server", "start"],
                                               stdout=f, stderr=subprocess.STDOUT)
            children.append(prefect_process)
            # NOTE(review): presumably gives the Prefect server time to come up
            # before it is used below — confirm 10 seconds is sufficient.
            time.sleep(10)
            monitor_process = subprocess.Popen(["gunicorn",
                                                "-b", "0.0.0.0:8050",
                                                "--chdir", THIS_DIR,
                                                "cli:server"],
                                               stdout=f, stderr=subprocess.STDOUT)
            children.append(monitor_process)
            Variable.set("punchpipe_config", configuration_path, overwrite=True)
            print("Launched Prefect dashboard on http://localhost:4200/")
            print("Launched punchpipe monitor on http://localhost:8050/")
            print("Use ctrl-c to exit.")

            serve_flows(configuration_path)
            prefect_process.wait()
            monitor_process.wait()
        except KeyboardInterrupt:
            print("Shutting down.")
            _terminate_children()
            print()
            print("punchpipe safely shut down.")
        except Exception as e:
            # Top-level boundary: report the error and stop whatever was started.
            print(f"Received error: {e}")
            _terminate_children()
            print()
            print("punchpipe abruptly shut down.")
File renamed without changes.
2 changes: 1 addition & 1 deletion punchpipe/controlsegment/db.py → punchpipe/control/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class File(Base):
file_type = Column(String(2), nullable=False)
observatory = Column(String(1), nullable=False)
file_version = Column(String(16), nullable=False)
software_version = Column(String(20), nullable=False)
software_version = Column(String(35), nullable=False)
date_created = Column(DateTime, nullable=True)
date_obs = Column(DateTime, nullable=False)
date_beg = Column(DateTime, nullable=True)
Expand Down
31 changes: 31 additions & 0 deletions punchpipe/control/health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from datetime import datetime

import psutil
from prefect import flow

from punchpipe.control.db import Health
from punchpipe.control.util import get_database_session, load_pipeline_configuration


@flow
def update_machine_health_stats():
    """Record one row of machine health statistics in the database.

    Collects CPU usage, memory usage, disk usage for the configured ``root``
    path (``/`` when the configuration has no ``root``), and the number of
    process ids, then stores them as a new ``Health`` entry.
    """
    config = load_pipeline_configuration()

    # The timestamp is taken before sampling starts.
    now = datetime.now()
    cpu_usage = psutil.cpu_percent(interval=5)

    # Query memory and disk once each so the absolute and percentage values
    # stored together describe the same snapshot.
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage(config.get("root", "/"))

    memory_usage = memory.used / 1E9  # store in GB
    memory_percentage = memory.percent
    disk_usage = disk.used / 1E9  # store in GB
    disk_percentage = disk.percent
    num_pids = len(psutil.pids())

    with get_database_session() as session:
        new_health_entry = Health(datetime=now,
                                  cpu_usage=cpu_usage,
                                  memory_usage=memory_usage,
                                  memory_percentage=memory_percentage,
                                  disk_usage=disk_usage,
                                  disk_percentage=disk_percentage,
                                  num_pids=num_pids)
        session.add(new_health_entry)
        session.commit()
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

from prefect import flow, get_run_logger, task
from prefect.client import get_client
from prefect.variables import Variable
from sqlalchemy import and_
from sqlalchemy.orm import Session

from punchpipe.controlsegment.db import Flow
from punchpipe.controlsegment.util import get_database_session, load_pipeline_configuration
from punchpipe.control.db import Flow
from punchpipe.control.util import get_database_session, load_pipeline_configuration


@task
Expand Down Expand Up @@ -87,7 +88,7 @@ async def launch_ready_flows(session: Session, flow_ids: List[int]) -> List:


@flow
async def launcher_flow(pipeline_configuration_path="config.yaml"):
async def launcher_flow(pipeline_configuration_path=None):
"""The main launcher flow for Prefect, responsible for identifying flows, based on priority,
that are ready to run and creating flow runs for them. It also escalates long-waiting flows' priorities.
Expand All @@ -99,6 +100,8 @@ async def launcher_flow(pipeline_configuration_path="config.yaml"):
"""
logger = get_run_logger()

if pipeline_configuration_path is None:
pipeline_configuration_path = await Variable.get("punchpipe_config", "punchpipe_config.yaml")
pipeline_config = load_pipeline_configuration(pipeline_configuration_path)

logger.info("Establishing database connection")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from prefect import get_run_logger
from prefect.context import get_run_context

from punchpipe.controlsegment.db import File, Flow
from punchpipe.controlsegment.util import (
from punchpipe.control.db import File, Flow
from punchpipe.control.util import (
get_database_session,
load_pipeline_configuration,
match_data_with_file_db_entry,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from punchpipe.controlsegment.db import File, FileRelationship
from punchpipe.controlsegment.util import get_database_session, load_pipeline_configuration, update_file_state

from punchpipe.control.db import File, FileRelationship
from punchpipe.control.util import get_database_session, load_pipeline_configuration, update_file_state


def generic_scheduler_flow_logic(
query_ready_files_func, construct_child_file_info, construct_child_flow_info, pipeline_config_path, session=None
):
# load pipeline configuration
pipeline_config = load_pipeline_configuration(pipeline_config_path)

max_start = pipeline_config['scheduler']['max_start']

# get database connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from ndcube import NDCube
from punchbowl.data import NormalizedMetadata

from punchpipe.controlsegment.db import File
from punchpipe.controlsegment.util import match_data_with_file_db_entry
from punchpipe.control.db import File
from punchpipe.control.util import match_data_with_file_db_entry


@pytest.fixture()
Expand Down
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit fa7d854

Please sign in to comment.