Skip to content

Commit

Permalink
create noaa delivery flow
Browse files Browse the repository at this point in the history
  • Loading branch information
jmbhughes committed Oct 23, 2024
1 parent 9151707 commit e67f481
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 0 deletions.
4 changes: 4 additions & 0 deletions deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow
from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow
from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow
from punchpipe.deliver import create_noaa_delivery

launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment",
description="Launch a pipeline segment.",
Expand Down Expand Up @@ -30,10 +31,13 @@
# Deployment wrapping the Level 2 -> Level 3 PTM processing flow.
level3_PTM_process_deployment = level3_PTM_process_flow.to_deployment(name="level3_PTM_process_flow",
description="Process PTM files from Level 2 to Level 3.")

# Deployment wrapping the create_noaa_delivery flow imported from punchpipe.deliver.
noaa_deployment = create_noaa_delivery.to_deployment(name="noaa-deployment",
description="Create a Noaa delivery.")

# Hand every deployment to serve() in one call.
# NOTE(review): serve is imported outside this excerpt (presumably prefect.serve) -- confirm.
serve(launcher_deployment,
level1_scheduler_deployment, level1_process_deployment,
level2_scheduler_deployment, level2_process_deployment,
level3_PTM_scheduler_deployment, level3_PTM_process_deployment,
noaa_deployment,
# NOTE(review): limit presumably caps concurrent runs; the value is a placeholder per the TODO.
limit=100 # TODO: remove arbitrary limit
)
19 changes: 19 additions & 0 deletions punchpipe/controlsegment/util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from datetime import datetime

import yaml
from prefect import task
Expand All @@ -7,6 +8,7 @@
from prefect_sqlalchemy import SqlAlchemyConnector
from ndcube import NDCube
from punchbowl.data import write_ndcube_to_fits, get_base_file_name
from sqlalchemy import and_, or_

from punchpipe.controlsegment.db import File

Expand Down Expand Up @@ -57,3 +59,20 @@ def match_data_with_file_db_entry(data: NDCube, file_db_entry_list):
raise RuntimeError("There were many database entries matching this result. There should only be one.")
else:
return matching_entries[0]


def get_files_in_time_window(level: str,
                             file_type: str,
                             obs_code: str,
                             start_time: datetime,
                             end_time: datetime,
                             session: Session | None = None) -> list[File]:
    """Query the database for files observed inside a time window.

    Only files whose state is "created" or "progressed" are returned. The
    window is half-open: ``start_time < date_obs <= end_time``.

    Parameters
    ----------
    level
        Value matched against ``File.level``.
    file_type
        Value matched against ``File.file_type``.
    obs_code
        Value matched against ``File.observatory``.
    start_time
        Exclusive lower bound on ``File.date_obs``.
    end_time
        Inclusive upper bound on ``File.date_obs``.
    session
        Database session to query with. When ``None``, a new session is
        obtained from ``get_database_session()``.

    Returns
    -------
    list[File]
        All ``File`` rows matching every filter.
    """
    if session is None:
        # The returned session must be kept: previously it was discarded, so
        # the query below was attempted on None and failed.
        session = get_database_session()

    return (session.query(File)
            .filter(or_(File.state == "created", File.state == "progressed"))
            .filter(File.level == level)
            .filter(File.file_type == file_type)
            .filter(File.observatory == obs_code)
            .filter(File.date_obs > start_time)
            .filter(File.date_obs <= end_time)
            .all())
56 changes: 56 additions & 0 deletions punchpipe/deliver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import hashlib
import os
from zipfile import ZipFile
import time
from datetime import datetime, timedelta

import pandas as pd
from prefect import task, flow

from punchpipe.controlsegment.util import get_database_session, get_files_in_time_window


def hash_one_file(path):
    """Return the SHA-256 hex digest of the file at *path*.

    The file is read in fixed-size chunks so that a large file is never held
    in memory all at once; the digest is identical to hashing the whole
    content in a single call.

    Parameters
    ----------
    path
        Path of the file to hash; it is opened in binary mode.

    Returns
    -------
    str
        Hexadecimal SHA-256 digest of the file's bytes.
    """
    chunk_size = 1 << 20  # 1 MiB per read
    file_hash = hashlib.sha256()
    with open(path, 'rb') as f:
        while chunk := f.read(chunk_size):
            file_hash.update(chunk)

    return file_hash.hexdigest()

@task
def build_noaa_manifest(file_paths):
    """Build a manifest table for a delivery.

    Each entry of *file_paths* is hashed with ``hash_one_file``; the result
    is a DataFrame with a ``file`` column (the inputs, in order) and a
    ``hash`` column (the corresponding digests).
    """
    digests = list(map(hash_one_file, file_paths))
    manifest = pd.DataFrame({'file': file_paths, 'hash': digests})
    return manifest


@task
def get_noaa_files_in_time_range(start_time, end_time, session=None):
    """Collect level "Q", type "CR" files for observatories "1" through "4".

    Calls ``get_files_in_time_window`` once per observatory code with the
    same time bounds and concatenates the results in observatory order. A
    database session is created with ``get_database_session()`` when none
    is supplied.

    NOTE(review): the collected items are whatever
    ``get_files_in_time_window`` returns, which appears to be ``File``
    database rows rather than path strings -- confirm that downstream
    hashing and zipping receive actual filesystem paths.
    """
    db_session = get_database_session() if session is None else session

    collected = []
    for observatory in ("1", "2", "3", "4"):
        collected.extend(
            get_files_in_time_window("Q", "CR", observatory, start_time, end_time, session=db_session)
        )
    return collected


@flow
def create_noaa_delivery(time_window=timedelta(hours=1)):
    """Bundle recent files and their manifest into a zip archive.

    The window ends at the current local time (``datetime.now()``) and
    reaches back by *time_window*. Files found by
    ``get_noaa_files_in_time_range`` are hashed into a manifest, the manifest
    is written as ``manifest_<timestamp>.csv``, and the files plus the
    manifest are written into ``noaa_<timestamp>.zip``. Both output names are
    relative, so they land in the current working directory.

    Returns the path of the zip archive.

    NOTE(review): ``datetime.now()`` is timezone-naive -- confirm it matches
    the convention used for ``date_obs`` in the database.
    """
    window_end = datetime.now()
    window_start = window_end - time_window
    stamp = window_end.strftime('%Y%m%d%H%M%S')

    selected = get_noaa_files_in_time_range(window_start, window_end)

    # Write the manifest to disk first so it can be added to the archive.
    manifest_path = f'manifest_{stamp}.csv'
    build_noaa_manifest(selected).to_csv(manifest_path)

    zip_path = f'noaa_{stamp}.zip'
    with ZipFile(zip_path, 'w') as archive:
        # Data files go in first, the manifest last.
        for member in [*selected, manifest_path]:
            archive.write(member)

    return zip_path

0 comments on commit e67f481

Please sign in to comment.