From f4f992a18cbfdef85d355a880c509d81c292bb7b Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Fri, 22 Nov 2024 19:30:54 -0700 Subject: [PATCH 01/22] remove example flow --- punchpipe/cli.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/punchpipe/cli.py b/punchpipe/cli.py index 5cdfeb7..8fcc4ec 100644 --- a/punchpipe/cli.py +++ b/punchpipe/cli.py @@ -38,11 +38,6 @@ def main(): if args.command == 'run': run(args.config) -@flow -def my_flow(): - print("Hello, Prefect!") - - def serve_flows(configuration_path): config = load_pipeline_configuration.fn(configuration_path) launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment", From fda68ab90740eb2c4dd2e840103e0e4a71a82092 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Fri, 22 Nov 2024 19:31:15 -0700 Subject: [PATCH 02/22] bump ruff in pre-commit --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f7bd8aa..c68c76b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,7 +2,7 @@ exclude: "\\.asdf$" repos: # This should be before any formatting hooks like isort - repo: https://github.com/astral-sh/ruff-pre-commit - rev: "v0.7.4" + rev: "v0.8.0" hooks: - id: ruff args: ["--fix"] From 8292f02ad45e95d0498262173b29313e64b853fe Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 23 Nov 2024 02:41:51 +0000 Subject: [PATCH 03/22] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- punchpipe/cli.py | 22 ++++++++++++++++------ punchpipe/flows/fcorona.py | 8 +++----- punchpipe/flows/level3.py | 3 +-- punchpipe/flows/starfield.py | 10 +++------- 4 files changed, 23 insertions(+), 20 deletions(-) diff --git a/punchpipe/cli.py b/punchpipe/cli.py index 8fcc4ec..f498137 100644 --- a/punchpipe/cli.py +++ b/punchpipe/cli.py @@ -1,23 +1,33 @@ -import argparse import os import time +import argparse import subprocess from pathlib import Path from datetime import datetime -import multiprocessing as mp -from prefect import flow, serve +from prefect import serve from prefect.variables import Variable from punchpipe.control.health import update_machine_health_stats from punchpipe.control.launcher import launcher_flow from punchpipe.control.util import load_pipeline_configuration +from punchpipe.flows.fcorona import ( + construct_f_corona_background_process_flow, + construct_f_corona_background_scheduler_flow, +) from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow -from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow, level3_PIM_process_flow, level3_PIM_scheduler_flow +from punchpipe.flows.level3 import ( + level3_PIM_process_flow, + level3_PIM_scheduler_flow, + level3_PTM_process_flow, + level3_PTM_scheduler_flow, +) from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow -from punchpipe.flows.starfield import construct_starfield_background_process_flow, construct_starfield_background_scheduler_flow -from punchpipe.flows.fcorona import construct_f_corona_background_scheduler_flow, construct_f_corona_background_process_flow +from punchpipe.flows.starfield import ( + construct_starfield_background_process_flow, + construct_starfield_background_scheduler_flow, +) from punchpipe.monitor.app import create_app THIS_DIR = os.path.dirname(__file__) diff 
--git a/punchpipe/flows/fcorona.py b/punchpipe/flows/fcorona.py index 71e16ba..596e933 100644 --- a/punchpipe/flows/fcorona.py +++ b/punchpipe/flows/fcorona.py @@ -1,18 +1,16 @@ -import json -import typing as t import os -from datetime import datetime, timedelta +import json import random +import typing as t +from datetime import datetime from prefect import flow, get_run_logger, task from punchbowl.level3.f_corona_model import construct_full_f_corona_model -from sqlalchemy import and_ from punchpipe import __version__ from punchpipe.control.db import File, Flow from punchpipe.control.processor import generic_process_flow_logic from punchpipe.control.scheduler import generic_scheduler_flow_logic -from punchpipe.control.util import get_database_session @task diff --git a/punchpipe/flows/level3.py b/punchpipe/flows/level3.py index b591c49..344e69d 100644 --- a/punchpipe/flows/level3.py +++ b/punchpipe/flows/level3.py @@ -8,7 +8,7 @@ from sqlalchemy import and_ from punchpipe import __version__ -from punchpipe.control.db import File, Flow, get_closest_file, get_closest_before_file, get_closest_after_file +from punchpipe.control.db import File, Flow, get_closest_after_file, get_closest_before_file, get_closest_file from punchpipe.control.processor import generic_process_flow_logic from punchpipe.control.scheduler import generic_scheduler_flow_logic from punchpipe.control.util import get_database_session @@ -225,4 +225,3 @@ def level3_PIM_scheduler_flow(pipeline_config_path=None, session=None): @flow def level3_PIM_process_flow(flow_id: int, pipeline_config_path=None, session=None): generic_process_flow_logic(flow_id, level3_PIM_flow, pipeline_config_path, session=session) - diff --git a/punchpipe/flows/starfield.py b/punchpipe/flows/starfield.py index 5e7d85f..b3721b8 100644 --- a/punchpipe/flows/starfield.py +++ b/punchpipe/flows/starfield.py @@ -1,19 +1,15 @@ -import typing as t - -import json import os -from datetime import datetime, timedelta +import json +import typing as t +from datetime import datetime from prefect import flow, get_run_logger, task from punchbowl.level3.stellar import generate_starfield_background -from sqlalchemy import and_ - from punchpipe import __version__ from punchpipe.control.db import File, Flow from punchpipe.control.processor import generic_process_flow_logic from punchpipe.control.scheduler import generic_scheduler_flow_logic -from punchpipe.control.util import get_database_session @task From 0ff521bcf6f8ebe401a1a0a7e85537706ed4040f Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Sat, 23 Nov 2024 16:07:47 -0700 Subject: [PATCH 04/22] add reference time --- punchpipe/control/scheduler.py | 14 ++-- punchpipe/flows/fcorona.py | 23 ++---- punchpipe/flows/level0.py | 140 +++++++++++++++++++++++++++++---- punchpipe/flows/level1.py | 14 ++-- punchpipe/flows/level2.py | 9 ++- punchpipe/flows/level3.py | 20 +++-- punchpipe/flows/levelq.py | 9 ++- 7 files changed, 168 insertions(+), 61 deletions(-) diff --git a/punchpipe/control/scheduler.py b/punchpipe/control/scheduler.py index 1d93eb6..d5e3ce9 100644 --- a/punchpipe/control/scheduler.py +++ b/punchpipe/control/scheduler.py @@ -5,8 +5,8 @@ def generic_scheduler_flow_logic( query_ready_files_func, construct_child_file_info, construct_child_flow_info, pipeline_config_path, - new_file_state="progressed", - session=None + update_input_file_state=True, new_input_file_state="progressed", + session=None, reference_time=None, ): pipeline_config = load_pipeline_configuration(pipeline_config_path) @@ -17,21 +17,23 @@ def 
generic_scheduler_flow_logic( session = get_database_session() # find all files that are ready to run - ready_file_ids = query_ready_files_func(session, pipeline_config)[:max_start] + ready_file_ids = query_ready_files_func(session, pipeline_config, reference_time=reference_time)[:max_start] if ready_file_ids: for group in ready_file_ids: parent_files = [] for file_id in group: # mark the file as progressed so that there aren't duplicate processing flows - update_file_state(session, file_id, new_file_state) + if update_input_file_state: + update_file_state(session, file_id, new_input_file_state) # get the prior level file's information parent_files += session.query(File).where(File.file_id == file_id).all() # prepare the new level flow and file - children_files = construct_child_file_info(parent_files, pipeline_config) + children_files = construct_child_file_info(parent_files, pipeline_config, reference_time=reference_time) database_flow_info = construct_child_flow_info(parent_files, children_files, - pipeline_config, session=session) + pipeline_config, session=session, + reference_time=reference_time) for child_file in children_files: session.add(child_file) session.add(database_flow_info) diff --git a/punchpipe/flows/fcorona.py b/punchpipe/flows/fcorona.py index 596e933..cc75c12 100644 --- a/punchpipe/flows/fcorona.py +++ b/punchpipe/flows/fcorona.py @@ -14,7 +14,7 @@ @task -def f_corona_background_query_ready_files(session, pipeline_config: dict, use_n: int = 250): +def f_corona_background_query_ready_files(session, pipeline_config: dict, use_n: int = 250, reference_time=None): logger = get_run_logger() all_ready_files = (session.query(File) .filter(File.state.in_(["created", "progressed"])) @@ -32,7 +32,7 @@ def f_corona_background_query_ready_files(session, pipeline_config: dict, use_n: def construct_f_corona_background_flow_info(level3_files: list[File], level3_f_model_file: File, pipeline_config: dict, - session=None): + session=None, reference_time=None): flow_type = "construct_f_corona_background_process_flow" state = "planned" creation_time = datetime.now() @@ -56,7 +56,7 @@ def construct_f_corona_background_flow_info(level3_files: list[File], @task -def construct_f_corona_background_file_info(level2_files: t.List[File], pipeline_config: dict) -> t.List[File]: +def construct_f_corona_background_file_info(level2_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]: return [File( level="3", file_type="PF", @@ -65,26 +65,17 @@ def construct_f_corona_background_file_info(level2_files: t.List[File], pipeline software_version=__version__, date_obs= datetime(2024, 8, 1, 12, 0, 0), #datetime.now()-timedelta(days=60), state="planned", - ), - File( - level="3", - file_type="PF", - observatory="M", - file_version=pipeline_config["file_version"], - software_version=__version__, - date_obs=datetime(2024, 12, 1, 12, 0, 0), # datetime.now()+timedelta(days=60), - state="planned", - ) - ] + ),] @flow -def construct_f_corona_background_scheduler_flow(pipeline_config_path=None, session=None): +def construct_f_corona_background_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None): generic_scheduler_flow_logic( f_corona_background_query_ready_files, construct_f_corona_background_file_info, construct_f_corona_background_flow_info, pipeline_config_path, - new_file_state="created", + update_input_file_state=False, + reference_time=reference_time, session=session, ) diff --git a/punchpipe/flows/level0.py b/punchpipe/flows/level0.py index 
74eded6..585bb75 100644 --- a/punchpipe/flows/level0.py +++ b/punchpipe/flows/level0.py @@ -1,29 +1,135 @@ import os -from glob import glob +from datetime import datetime +import numpy as np +import pandas as pd +from ndcube import NDCube from prefect import flow +from prefect.blocks.system import Secret +from punchbowl.data import get_base_file_name +from punchbowl.data.io import write_ndcube_to_fits +from punchbowl.data.meta import NormalizedMetadata +from sqlalchemy import and_ -from punchpipe.control.scheduler import generic_scheduler_flow_logic +from punchpipe import __version__ as software_version +from punchpipe.level0.core import detect_new_tlm_files, update_tlm_database, parse_new_tlm_files, process_telemetry_file, form_from_jpeg_compressed, image_is_okay, get_fits_metadata, form_preliminary_wcs,POSITIONS_TO_CODES, convert_pfw_position_to_polarizer +from punchpipe.level0.ccsds import unpack_compression_settings +from punchpipe.control.util import get_database_session, load_pipeline_configuration +from punchpipe.control.db import TLMFiles, SciPacket, File +@flow +def level0_ingest_raw_packets(session=None): + if session is None: + session = get_database_session() -def level0_query_ready_files(session, pipeline_config: dict): - dropzone = os.path.join(pipeline_config["root"], pipeline_config["input_drop"]) - return glob(os.path.join(dropzone, "*.tlm")) + paths = detect_new_tlm_files(session=session) + for path in paths: + packets = parse_new_tlm_files(path) + update_tlm_database(packets, path) + # update the database with this tlm file + new_tlm_file = TLMFiles(path=path, is_processed=True) + session.add(new_tlm_file) + session.commit() -def level0_construct_file_info(): - pass +@flow +def level0_form_images(session=None, pipeline_config_path="config.yaml"): + if session is None: + session = get_database_session() + config = load_pipeline_configuration(pipeline_config_path) -def level0_construct_flow_info(): - pass + distinct_times = session.query(SciPacket.timestamp).filter(~SciPacket.is_used).distinct().all() + distinct_spacecraft = session.query(SciPacket.spacecraft_id).filter(~SciPacket.is_used).distinct().all() -@flow -def level0_scheduler_flow(session=None): - generic_scheduler_flow_logic( - level0_query_ready_files, - level0_construct_file_info, - level0_construct_flow_info, - session=session, - ) + for spacecraft in distinct_spacecraft: + errors = [] + + for t in distinct_times: + image_packets_entries = session.query(SciPacket).where(and_(SciPacket.timestamp == t[0], + SciPacket.spacecraft_id == spacecraft[0])).all() + image_compression = [unpack_compression_settings(packet.compression_settings) + for packet in image_packets_entries] + + # Read all the relevant TLM files + needed_tlm_ids = set([image_packet.source_tlm_file for image_packet in image_packets_entries]) + tlm_id_to_tlm_path = {tlm_id: session.query(TLMFiles.path).where(TLMFiles.tlm_id == tlm_id) + for tlm_id in needed_tlm_ids} + needed_tlm_paths = list(session.query(TLMFiles.path).where(TLMFiles.tlm_id.in_(needed_tlm_ids)).all()) + tlm_contents = [process_telemetry_file(tlm_path) for tlm_path in needed_tlm_paths] + + # Form the image packet stream for decompression + ordered_image_content = [] + for packet_entry in image_packets_entries: + tlm_content_index = needed_tlm_paths.index(tlm_id_to_tlm_path[packet_entry.source_tlm_file]) + selected_tlm_contents = tlm_contents[tlm_content_index] + ordered_image_content.append(selected_tlm_contents[0x20]['SCI_XFI_IMG_DATA'][packet_entry.packet_num]) + 
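+            # each entry in ordered_image_content is the payload of one science packet,
+            # kept in packet order; joining them restores the compressed stream for this exposure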
+            ordered_image_content = np.concatenate(ordered_image_content)
+
+            # Get the proper image
+            skip_image = False
+            if image_compression[0]['JPEG'] == 1:  # this assumes the image compression is static for an image
+                try:
+                    image = form_from_jpeg_compressed(ordered_image_content)
+                except ValueError:
+                    # decompression failed, so log a replay request and skip this image
+                    error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %H:%M:%S"),
+                             'start_block': image_packets_entries[0].flash_block,
+                             'replay_length': image_packets_entries[-1].flash_block
+                                              - image_packets_entries[0].flash_block}
+                    errors.append(error)
+                    skip_image = True
+            else:
+                skip_image = True
+                error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %H:%M:%S"),
+                         'start_block': image_packets_entries[0].flash_block,
+                         'replay_length': image_packets_entries[-1].flash_block
+                                          - image_packets_entries[0].flash_block}
+                errors.append(error)
+
+            # check the quality of the image
+            if not skip_image and not image_is_okay(image, config):
+                skip_image = True
+                error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %H:%M:%S"),
+                         'start_block': image_packets_entries[0].flash_block,
+                         'replay_length': image_packets_entries[-1].flash_block
+                                          - image_packets_entries[0].flash_block}
+                errors.append(error)
+
+            if not skip_image:
+                spacecraft_secrets = Secret.load("spacecraft-ids")
+                spacecraft_id_mapper = spacecraft_secrets.get()
+                spacecraft_id = spacecraft_id_mapper[image_packets_entries[0].spacecraft_id]
+
+                metadata_contents = get_fits_metadata(image_packets_entries[0].timestamp, spacecraft_id)
+                file_type = POSITIONS_TO_CODES[convert_pfw_position_to_polarizer(metadata_contents['POSITION_CURR'])]
+                preliminary_wcs = form_preliminary_wcs(metadata_contents, config['plate_scale'][spacecraft_id])
+                meta = NormalizedMetadata.load_template(file_type + spacecraft_id, "0")
+                for meta_key, meta_value in metadata_contents.items():
+                    meta[meta_key] = meta_value
+                cube = NDCube(data=image, meta=meta, wcs=preliminary_wcs)
+
+                l0_db_entry = File(level="0",
+                                   file_type=file_type,
+                                   observatory=str(spacecraft_id),
+                                   file_version="1",  # TODO: increment the file version
+                                   software_version=software_version,
+                                   date_created=datetime.now(),
+                                   date_obs=t,
+                                   date_beg=t,
+                                   date_end=t,
+                                   state="created")
+
+                write_ndcube_to_fits(cube, os.path.join(l0_db_entry.directory(config['data_path']),
+                                                        get_base_file_name(cube)))
+                # TODO: write a jp2
+                for image_packets_entry in image_packets_entries:
+                    image_packets_entry.is_used = True
+                session.add(l0_db_entry)
+                session.commit()
+        df_errors = pd.DataFrame(errors)
+        date_str = datetime.now().strftime("%Y_%j")
+        df_path = os.path.join(config['root'], 'REPLAY', f'PUNCH_{spacecraft}_REPLAY_{date_str}.csv')
+        os.makedirs(os.path.dirname(df_path), exist_ok=True)
+        df_errors.to_csv(df_path, index=False)
diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py
index 1dee922..b4d9e44 100644
--- a/punchpipe/flows/level1.py
+++ b/punchpipe/flows/level1.py
@@ -15,13 +15,13 @@
 SCIENCE_LEVEL0_TYPE_CODES = ["PM", "PZ", "PP", "CR"]
 
 @task
-def level1_query_ready_files(session, pipeline_config: dict):
+def level1_query_ready_files(session, pipeline_config: dict, reference_time=None):
     return [[f.file_id] for f in session.query(File).filter(File.file_type.in_(SCIENCE_LEVEL0_TYPE_CODES))
             .where(and_(File.state == "created", File.level == "0")).all()]
 
 
 @task
-def get_psf_model_path(level0_file, pipeline_config: dict, session=None):
+def get_psf_model_path(level0_file, pipeline_config: dict, session=None, reference_time=None):
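+    # reference_time is accepted so all scheduler tasks share one call signature;
+    # the query below selects the PSF model without filtering on it yet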
corresponding_psf_model_type = {"PM": "RM", "PZ": "RZ", "PP": "RP", @@ -35,7 +35,7 @@ def get_psf_model_path(level0_file, pipeline_config: dict, session=None): return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename()) @task -def get_quartic_model_path(level0_file, pipeline_config: dict, session=None): +def get_quartic_model_path(level0_file, pipeline_config: dict, session=None, reference_time=None): best_model = (session.query(File) .filter(File.file_type == 'FQ') .filter(File.observatory == level0_file.observatory) @@ -44,7 +44,8 @@ def get_quartic_model_path(level0_file, pipeline_config: dict, session=None): return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename()) @task -def level1_construct_flow_info(level0_files: list[File], level1_files: File, pipeline_config: dict, session=None): +def level1_construct_flow_info(level0_files: list[File], level1_files: File, + pipeline_config: dict, session=None, reference_time=None): flow_type = "level1_process_flow" state = "planned" creation_time = datetime.now() @@ -70,7 +71,7 @@ def level1_construct_flow_info(level0_files: list[File], level1_files: File, pip @task -def level1_construct_file_info(level0_files: t.List[File], pipeline_config: dict) -> t.List[File]: +def level1_construct_file_info(level0_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]: return [ File( level="1", @@ -86,12 +87,13 @@ def level1_construct_file_info(level0_files: t.List[File], pipeline_config: dict @flow -def level1_scheduler_flow(pipeline_config_path=None, session=None): +def level1_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None): generic_scheduler_flow_logic( level1_query_ready_files, level1_construct_file_info, level1_construct_flow_info, pipeline_config_path, + reference_time=reference_time, session=session, ) diff --git a/punchpipe/flows/level2.py b/punchpipe/flows/level2.py index d80240a..b1fe3de 100644 --- a/punchpipe/flows/level2.py +++ b/punchpipe/flows/level2.py @@ -15,7 +15,7 @@ @task -def level2_query_ready_files(session, pipeline_config: dict): +def level2_query_ready_files(session, pipeline_config: dict, reference_time=None): logger = get_run_logger() all_ready_files = (session.query(File).filter(File.state == "created") .filter(File.level == "1") @@ -31,7 +31,7 @@ def level2_query_ready_files(session, pipeline_config: dict): @task -def level2_construct_flow_info(level1_files: list[File], level2_file: File, pipeline_config: dict, session=None): +def level2_construct_flow_info(level1_files: list[File], level2_file: File, pipeline_config: dict, session=None, reference_time=None): flow_type = "level2_process_flow" state = "planned" creation_time = datetime.now() @@ -56,7 +56,7 @@ def level2_construct_flow_info(level1_files: list[File], level2_file: File, pipe @task -def level2_construct_file_info(level1_files: t.List[File], pipeline_config: dict) -> t.List[File]: +def level2_construct_file_info(level1_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]: return [File( level=2, file_type="PT", @@ -69,12 +69,13 @@ def level2_construct_file_info(level1_files: t.List[File], pipeline_config: dict @flow -def level2_scheduler_flow(pipeline_config_path=None, session=None): +def level2_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None): generic_scheduler_flow_logic( level2_query_ready_files, level2_construct_file_info, level2_construct_flow_info, pipeline_config_path, + 
reference_time=reference_time, session=session, ) diff --git a/punchpipe/flows/level3.py b/punchpipe/flows/level3.py index 344e69d..867fa1a 100644 --- a/punchpipe/flows/level3.py +++ b/punchpipe/flows/level3.py @@ -31,7 +31,7 @@ def get_valid_fcorona_models(session, f: File, before_timedelta: timedelta, afte @task -def level3_PTM_query_ready_files(session, pipeline_config: dict): +def level3_PTM_query_ready_files(session, pipeline_config: dict, reference_time=None): logger = get_run_logger() all_ready_files = session.query(File).where(and_(and_(File.state.in_(["progressed", "created"]), File.level == "2"), @@ -61,7 +61,8 @@ def level3_PTM_query_ready_files(session, pipeline_config: dict): @task -def level3_PTM_construct_flow_info(level2_files: list[File], level3_file: File, pipeline_config: dict, session=None): +def level3_PTM_construct_flow_info(level2_files: list[File], level3_file: File, + pipeline_config: dict, session=None, reference_time=None): session = get_database_session() # TODO: replace so this works in the tests by passing in a test flow_type = "level3_PTM_process_flow" @@ -107,7 +108,7 @@ def level3_PTM_construct_flow_info(level2_files: list[File], level3_file: File, @task -def level3_PTM_construct_file_info(level2_files: t.List[File], pipeline_config: dict) -> t.List[File]: +def level3_PTM_construct_file_info(level2_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]: return [File( level="3", file_type="PT", @@ -120,12 +121,13 @@ def level3_PTM_construct_file_info(level2_files: t.List[File], pipeline_config: @flow -def level3_PTM_scheduler_flow(pipeline_config_path=None, session=None): +def level3_PTM_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None): generic_scheduler_flow_logic( level3_PTM_query_ready_files, level3_PTM_construct_file_info, level3_PTM_construct_flow_info, pipeline_config_path, + reference_time=reference_time, session=session, ) @@ -136,7 +138,7 @@ def level3_PTM_process_flow(flow_id: int, pipeline_config_path=None, session=Non @task -def level3_PIM_query_ready_files(session, pipeline_config: dict): +def level3_PIM_query_ready_files(session, pipeline_config: dict, reference_time=None): logger = get_run_logger() all_ready_files = session.query(File).where(and_(and_(File.state == "created", File.level == "2"), @@ -160,7 +162,8 @@ def level3_PIM_query_ready_files(session, pipeline_config: dict): @task -def level3_PIM_construct_flow_info(level2_files: list[File], level3_file: File, pipeline_config: dict, session=None): +def level3_PIM_construct_flow_info(level2_files: list[File], level3_file: File, pipeline_config: dict, + session=None, reference_time=None): session = get_database_session() # TODO: replace so this works in the tests by passing in a test flow_type = "level3_PIM_process_flow" @@ -199,7 +202,7 @@ def level3_PIM_construct_flow_info(level2_files: list[File], level3_file: File, @task -def level3_PIM_construct_file_info(level2_files: t.List[File], pipeline_config: dict) -> t.List[File]: +def level3_PIM_construct_file_info(level2_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]: return [File( level="3", file_type="PI", @@ -212,12 +215,13 @@ def level3_PIM_construct_file_info(level2_files: t.List[File], pipeline_config: @flow -def level3_PIM_scheduler_flow(pipeline_config_path=None, session=None): +def level3_PIM_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None): generic_scheduler_flow_logic( level3_PIM_query_ready_files, 
level3_PIM_construct_file_info, level3_PIM_construct_flow_info, pipeline_config_path, + reference_time=reference_time, session=session, ) diff --git a/punchpipe/flows/levelq.py b/punchpipe/flows/levelq.py index 521b023..453f3c3 100644 --- a/punchpipe/flows/levelq.py +++ b/punchpipe/flows/levelq.py @@ -13,7 +13,7 @@ @task -def levelq_query_ready_files(session, pipeline_config: dict): +def levelq_query_ready_files(session, pipeline_config: dict, reference_time=None): logger = get_run_logger() all_ready_files = (session.query(File).filter(File.state == "created") .filter(File.level == "1") @@ -29,7 +29,7 @@ def levelq_query_ready_files(session, pipeline_config: dict): @task -def levelq_construct_flow_info(level1_files: list[File], levelq_file: File, pipeline_config: dict, session=None): +def levelq_construct_flow_info(level1_files: list[File], levelq_file: File, pipeline_config: dict, session=None, reference_time=None): flow_type = "levelq_process_flow" state = "planned" creation_time = datetime.now() @@ -53,7 +53,7 @@ def levelq_construct_flow_info(level1_files: list[File], levelq_file: File, pipe @task -def levelq_construct_file_info(level1_files: t.List[File], pipeline_config: dict) -> t.List[File]: +def levelq_construct_file_info(level1_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]: return [File( level="Q", file_type="CT", @@ -76,12 +76,13 @@ def levelq_construct_file_info(level1_files: t.List[File], pipeline_config: dict @flow -def levelq_scheduler_flow(pipeline_config_path=None, session=None): +def levelq_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None): generic_scheduler_flow_logic( levelq_query_ready_files, levelq_construct_file_info, levelq_construct_flow_info, pipeline_config_path, + reference_time=reference_time, session=session, ) From 99d46736f551bed4fcc3feccb5c3999122eaa18d Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Sat, 23 Nov 2024 16:08:01 -0700 Subject: [PATCH 05/22] add tags --- punchpipe/cli.py | 39 ++++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/punchpipe/cli.py b/punchpipe/cli.py index f498137..730bbdc 100644 --- a/punchpipe/cli.py +++ b/punchpipe/cli.py @@ -15,6 +15,7 @@ construct_f_corona_background_process_flow, construct_f_corona_background_scheduler_flow, ) +from punchpipe.flows.level0 import level0_form_images, level0_ingest_raw_packets from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow from punchpipe.flows.level3 import ( @@ -38,15 +39,12 @@ def main(): """Run the PUNCH automated pipeline""" # mp.set_start_method('spawn') parser = argparse.ArgumentParser(prog='punchpipe') - subparsers = parser.add_subparsers(dest='command') - run_parser = subparsers.add_parser('run', help='Run the pipline') - run_parser.add_argument('config', type=str, help='Path to config for running') - - args = parser.parse_args() if args.command == 'run': run(args.config) + else: + parser.print_help() def serve_flows(configuration_path): config = load_pipeline_configuration.fn(configuration_path) @@ -56,18 +54,31 @@ def serve_flows(configuration_path): parameters={"pipeline_configuration_path": configuration_path} ) + level0_ingest_raw_packets_deployment = level0_ingest_raw_packets.to_deployment(name="level0_ingest_raw_packets", + description="Ingest raw packets.", + cron="* * * * *", tags=['L0']) + + level0_form_images_deployment = 
level0_form_images.to_deployment(name="level0_form_images", + description="Form images from packets.", + cron="* * * * *", + tags=['L0'], + parameters={"pipeline_config_path": configuration_path}) + level1_scheduler_deployment = level1_scheduler_flow.to_deployment(name="level1-scheduler-deployment", description="Schedule a Level 1 flow.", + tags=["L1", "scheduler"], cron=config['levels']['level1_process_flow'].get("schedule", "* * * * *"), parameters={"pipeline_config_path": configuration_path} ) level1_process_deployment = level1_process_flow.to_deployment(name="level1_process_flow", + tags=["L1", "process"], description="Process a file from Level 0 to Level 1.", parameters={"pipeline_config_path": configuration_path} ) level2_scheduler_deployment = level2_scheduler_flow.to_deployment(name="level2-scheduler-deployment", description="Schedule a Level 2 flow.", + tags=["L2", "scheduler"], cron=config['levels']['level2_process_flow'].get("schedule", "* * * * *"), parameters={ "pipeline_config_path": configuration_path} @@ -75,11 +86,13 @@ def serve_flows(configuration_path): ) level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow", description="Process files from Level 1 to Level 2.", + tags=["L2", "process"], parameters={"pipeline_config_path": configuration_path} ) levelq_scheduler_deployment = levelq_scheduler_flow.to_deployment(name="levelq-scheduler-deployment", description="Schedule a Level Q flow.", + tags=["LQ", "scheduler"], cron=config['levels']['levelq_process_flow'].get("schedule", "* * * * *"), parameters={ "pipeline_config_path": configuration_path} @@ -87,11 +100,13 @@ def serve_flows(configuration_path): ) levelq_process_deployment = levelq_process_flow.to_deployment(name="levelq_process_flow", description="Process files from Level 1 to Level Q.", + tags=["LQ", "process"], parameters={"pipeline_config_path": configuration_path} ) level3_PIM_scheduler_deployment = level3_PIM_scheduler_flow.to_deployment(name="level3-PIM-scheduler-deployment", description="Schedule a Level 3 flow to make PIM.", + tags=["L3", "scheduler"], cron=config['levels']['level3_PIM_process_flow'].get("schedule", "* * * * *"), parameters={ "pipeline_config_path": configuration_path} @@ -99,6 +114,7 @@ def serve_flows(configuration_path): ) level3_PIM_process_deployment = level3_PIM_process_flow.to_deployment(name="level3_PIM_process_flow", description="Process to PIM files.", + tags=["L3", "process"], parameters={ "pipeline_config_path": configuration_path} ) @@ -106,12 +122,14 @@ def serve_flows(configuration_path): construct_f_corona_background_scheduler_deployment = construct_f_corona_background_scheduler_flow.to_deployment(name="construct_f_corona_background-scheduler-deployment", description="Schedule an F corona background.", cron=config['levels']['construct_f_corona_background_process_flow'].get("schedule", "* * * * *"), + tags=["L3", "scheduler"], parameters={ "pipeline_config_path": configuration_path} ) construct_f_corona_background_process_deployment = construct_f_corona_background_process_flow.to_deployment(name="construct_f_corona_background_process_flow", description="Process F corona background.", + tags=["L3", "process"], parameters={ "pipeline_config_path": configuration_path} ) @@ -119,12 +137,14 @@ def serve_flows(configuration_path): construct_starfield_background_scheduler_deployment = construct_starfield_background_scheduler_flow.to_deployment(name="construct_starfield-scheduler-deployment", description="Schedule a starfield background.", 
cron=config['levels']['construct_starfield_background_process_flow'].get("schedule", "* * * * *"), + tags=["L3", "scheduler"], parameters={ "pipeline_config_path": configuration_path} ) construct_starfield_background_process_deployment = construct_starfield_background_process_flow.to_deployment(name="construct_starfield_background_process_flow", description="Create starfield background.", + tags=["L3", "process"], parameters={ "pipeline_config_path": configuration_path} ) @@ -133,12 +153,14 @@ def serve_flows(configuration_path): level3_PTM_scheduler_deployment = level3_PTM_scheduler_flow.to_deployment(name="level3-PTM-scheduler-deployment", description="Schedule a Level 3 flow to make PTM.", cron=config['levels']['level3_PTM_process_flow'].get("schedule", "* * * * *"), + tags=["L3", "scheduler"], parameters={ "pipeline_config_path": configuration_path} ) level3_PTM_process_deployment = level3_PTM_process_flow.to_deployment(name="level3_PTM_process_flow", description="Process PTM files from Level 2 to Level 3.", + tags=["L3", "process"], parameters={ "pipeline_config_path": configuration_path} ) @@ -148,6 +170,7 @@ def serve_flows(configuration_path): cron="* * * * *") serve(launcher_deployment, + level0_form_images_deployment, level0_ingest_raw_packets_deployment, level1_scheduler_deployment, level1_process_deployment, level2_scheduler_deployment, level2_process_deployment, levelq_scheduler_deployment, levelq_process_deployment, @@ -202,3 +225,9 @@ def run(configuration_path): monitor_process.terminate() print() print("punchpipe abruptly shut down.") + +if __name__ == "__main__": + monitor_process = subprocess.Popen(["gunicorn", + "-b", "0.0.0.0:8050", + "--chdir", THIS_DIR, + "cli:server"]) \ No newline at end of file From 6a1f159607c3dd1e5ac5eb23b69f429c7c32df88 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Sat, 23 Nov 2024 16:11:25 -0700 Subject: [PATCH 06/22] simplified l0 --- punchpipe/level0/ccsds.py | 67 +++++++++++-- punchpipe/level0/{flow.py => core.py} | 133 +------------------------- 2 files changed, 62 insertions(+), 138 deletions(-) rename punchpipe/level0/{flow.py => core.py} (66%) diff --git a/punchpipe/level0/ccsds.py b/punchpipe/level0/ccsds.py index 7079fd8..5038672 100644 --- a/punchpipe/level0/ccsds.py +++ b/punchpipe/level0/ccsds.py @@ -115,16 +115,63 @@ def unpack_acquisition_settings(acq_set_val: "bytes|int"): if __name__ == "__main__": - path = "/Users/jhughes/Desktop/data/PUNCH_CCSDS/RAW_CCSDS_DATA/PUNCH_NFI00_RAW_2024_160_19_37_V01.tlm" + from punchbowl.data.visualize import cmap_punch + + path = "/Users/jhughes/new_results/nov17-0753/PUNCH_EM-01_RAW_2024_320_22_36_V01.tlm" # path = "/Users/jhughes/Desktop/data/PUNCH_CCSDS/RAW_CCSDS_DATA/PUNCH_WFI01_RAW_2024_117_22_00_V01.tlm" parsed = process_telemetry_file(path) - print(parse_compression_settings(parsed[0x20]['SCI_XFI_COM_SET'])[22:44]) - - fig, ax = plt.subplots() - ax.plot(parsed[0x20]['CCSDS_PACKET_LENGTH']) - plt.show() - - print(parsed[0x20]['CCSDS_PACKET_LENGTH'][22:44]) - - img = np.concatenate(parsed[0x20]['SCI_XFI_IMG_DATA'][22:44]) + print(parsed[0x20].keys()) + for i in range(len(parsed[0x20])): + print(i) + print(unpack_compression_settings(parsed[0x20]['SCI_XFI_COM_SET'][i])) + print(unpack_acquisition_settings(parsed[0x20]['SCI_XFI_COM_SET'][i])) + print("-"*80) + # + # fig, ax = plt.subplots() + # ax.plot(parsed[0x20]['SCI_XFI_HDR_SEC']) + # plt.show() + + print({k: len(parsed[k]) for k in parsed}) + + print(parsed[0x20]['CCSDS_PACKET_LENGTH']) + print(parsed[0x20]['SCI_XFI_HDR_SCID']) + + 
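+    # assumed for this ad hoc test: packets 5:24 make up one complete image in this
+    # tlm file; decoded pixels are square-root encoded, hence the sqrt(8 * x) scaling
+    # used in the comparisons below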
img = np.concatenate(parsed[0x20]['SCI_XFI_IMG_DATA'][5:24]) + # img = parsed[0x20]['SCI_XFI_IMG_DATA'][0] img = pylibjpeg.decode(img.tobytes()) + + from punchbowl.data.io import load_ndcube_from_fits + cube = load_ndcube_from_fits("/Users/jhughes/new_results/nov17-0753/PUNCH_L0_PZ2_20241002142916_v1.fits") + + # vmin, vmax = 0, 1_000 + # fig, axs = plt.subplots(ncols=2, sharex=True, sharey=True) + # im0 = axs[0].imshow(img, vmin=np.sqrt(vmin*8), vmax=np.sqrt(8*vmax), interpolation="none") + # im1 = axs[1].imshow(cube.data, vmin=vmin, vmax=vmax, interpolation="none") + # axs[0].set_title("Unpacked image") + # axs[1].set_title("SOC image") + # fig.colorbar(im0, ax=axs[0]) + # fig.colorbar(im1, ax=axs[1]) + # plt.show() + + # fig, ax = plt.subplots() + # ax.plot(cube.data.flatten(), img.flatten(), 'bo', label='data') + # ax.plot(np.arange(65_000), np.sqrt(np.arange(65_000)*8).astype(int), 'k', label='sqrt(8x)') + # ax.set_xlabel("SOC image value") + # ax.set_ylabel("Unpacked image value") + # ax.set_xlim((50_000, 70_000)) + # ax.set_ylim((np.sqrt(8*50_000), np.sqrt(8*70_000))) + # + # ax.legend() + # plt.show() + + vmin, vmax = 0, 1_600 + fig, axs = plt.subplots(ncols=2, sharex=True, sharey=True, figsize=(12, 6)) + im0 = axs[1].imshow(img, vmin=np.sqrt(vmin * 8), vmax=np.sqrt(8 * vmax), interpolation="none", cmap=cmap_punch()) + im1 = axs[0].imshow(np.sqrt(cube.data*8), vmin=np.sqrt(vmin * 8), vmax=np.sqrt(8 * vmax), interpolation="none", cmap=cmap_punch()) + axs[1].set_title("Output test image") + axs[0].set_title("Input test image") + # fig.colorbar(im0, ax=axs[0]) + # fig.colorbar(im1, ax=axs[1]) + fig.tight_layout() + fig.savefig("mmr_image.png", dpi=300) + plt.show() \ No newline at end of file diff --git a/punchpipe/level0/flow.py b/punchpipe/level0/core.py similarity index 66% rename from punchpipe/level0/flow.py rename to punchpipe/level0/core.py index fb6372f..6d69dc0 100644 --- a/punchpipe/level0/flow.py +++ b/punchpipe/level0/core.py @@ -1,4 +1,3 @@ -import os import json import base64 import warnings @@ -7,26 +6,19 @@ from datetime import datetime, timedelta import numpy as np -import pandas as pd import pylibjpeg import pymysql import sqlalchemy.exc from astropy.wcs import WCS -from ndcube import NDCube -from prefect import flow, task -from prefect.blocks.system import Secret -from punchbowl.data import get_base_file_name -from punchbowl.data.io import write_ndcube_to_fits -from punchbowl.data.meta import NormalizedMetadata +from prefect import task from punchbowl.data.wcs import calculate_helio_wcs_from_celestial, calculate_pc_matrix -from sqlalchemy import and_ from sunpy.coordinates import sun -from punchpipe.control.db import ENGPFWPacket, EngXACTPacket, File, SciPacket, TLMFiles, get_closest_eng_packets +from punchpipe.control.db import ENGPFWPacket, EngXACTPacket, SciPacket, TLMFiles, get_closest_eng_packets from punchpipe.control.util import get_database_session, load_pipeline_configuration from punchpipe.error import CCSDSPacketConstructionWarning, CCSDSPacketDatabaseUpdateWarning -from punchpipe.level0.ccsds import PACKET_APID2NAME, process_telemetry_file, unpack_compression_settings -from punchpipe.level0.meta import POSITIONS_TO_CODES, convert_pfw_position_to_polarizer, eci_quaternion_to_ra_dec +from punchpipe.level0.ccsds import PACKET_APID2NAME, process_telemetry_file +from punchpipe.level0.meta import eci_quaternion_to_ra_dec software_version = importlib.metadata.version("punchpipe") @@ -191,6 +183,7 @@ def form_packet_entry(apid, packet, packet_num, 
source_tlm_file_id): ) case _: warnings.warn("Unable to add packet to database.", CCSDSPacketDatabaseUpdateWarning) + @task def update_tlm_database(packets, telemetry_file_path: str, session=None): if session is None: @@ -207,21 +200,6 @@ def update_tlm_database(packets, telemetry_file_path: str, session=None): session.commit() -@flow -def ingest_raw_packets(session=None): - if session is None: - session = get_database_session() - - paths = detect_new_tlm_files(session=session) - for path in paths: - packets = parse_new_tlm_files(path) - update_tlm_database(packets, path) - - # update the database with this tlm file - new_tlm_file = TLMFiles(path=path, is_processed=True) - session.add(new_tlm_file) - session.commit() - def interpolate_value(query_time, before_time, before_value, after_time, after_value): if query_time == before_time: return before_value @@ -285,104 +263,3 @@ def form_from_jpeg_compressed(packets): img = pylibjpeg.decode(packets.tobytes()) return img -@flow -def form_level0_fits(session=None, pipeline_config_path="config.yaml"): - if session is None: - session = get_database_session() - - config = load_pipeline_configuration(pipeline_config_path) - - distinct_times = session.query(SciPacket.timestamp).filter(~SciPacket.is_used).distinct().all() - distinct_spacecraft = session.query(SciPacket.spacecraft_id).filter(~SciPacket.is_used).distinct().all() - - - for spacecraft in distinct_spacecraft: - errors = [] - - for t in distinct_times: - image_packets_entries = session.query(SciPacket).where(and_(SciPacket.timestamp == t[0], - SciPacket.spacecraft_id == spacecraft[0])).all() - image_compression = [unpack_compression_settings(packet.compression_settings) - for packet in image_packets_entries] - - # Read all the relevant TLM files - needed_tlm_ids = set([image_packet.source_tlm_file for image_packet in image_packets_entries]) - tlm_id_to_tlm_path = {tlm_id: session.query(TLMFiles.path).where(TLMFiles.tlm_id == tlm_id) - for tlm_id in needed_tlm_ids} - needed_tlm_paths = list(session.query(TLMFiles.path).where(TLMFiles.tlm_id.in_(needed_tlm_ids)).all()) - tlm_contents = [process_telemetry_file(tlm_path) for tlm_path in needed_tlm_paths] - - # Form the image packet stream for decompression - ordered_image_content = [] - for packet_entry in image_packets_entries: - tlm_content_index = needed_tlm_paths.index(tlm_id_to_tlm_path[packet_entry.source_tlm_file]) - selected_tlm_contents = tlm_contents[tlm_content_index] - ordered_image_content.append(selected_tlm_contents[0x20]['SCI_XFI_IMG_DATA'][packet_entry.packet_num]) - ordered_image_content = np.concatenate(ordered_image_content) - - # Get the proper image - skip_image = False - if image_compression[0]['JPEG'] == 1: # this assumes the image compression is static for an image - try: - image = form_from_jpeg_compressed(ordered_image_content) - except ValueError: - error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"), - 'start_block': image_packets_entries[0].flash_block, - 'replay_length': image_packets_entries[-1].flash_block - - image_packets_entries[0].flash_block} - errors.append(error) - else: - skip_image = True - else: - skip_image = True - error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"), - 'start_block': image_packets_entries[0].flash_block, - 'replay_length': image_packets_entries[-1].flash_block - - image_packets_entries[0].flash_block} - errors.append(error) - - # check the quality of the image - if not skip_image and not image_is_okay(image, 
config): - skip_image = True - error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"), - 'start_block': image_packets_entries[0].flash_block, - 'replay_length': image_packets_entries[-1].flash_block - - image_packets_entries[0].flash_block} - errors.append(error) - - if not skip_image: - spacecraft_secrets = Secret.load("spacecraft-ids") - spacecraft_id_mapper = spacecraft_secrets.get() - spacecraft_id = spacecraft_id_mapper[image_packets_entries[0].spacecraft_id] - - metadata_contents = get_fits_metadata(image_packets_entries[0].timestamp, spacecraft_id) - file_type = POSITIONS_TO_CODES[convert_pfw_position_to_polarizer(metadata_contents['POSITION_CURR'])] - preliminary_wcs = form_preliminary_wcs(metadata_contents, config['plate_scale'][spacecraft_id]) - meta = NormalizedMetadata.load_template(file_type + spacecraft_id, "0") - for meta_key, meta_value in metadata_contents.items(): - meta[meta_key] = meta_value - cube = NDCube(data=image, metadata=meta, wcs=preliminary_wcs) - - l0_db_entry = File(level="0", - file_type=file_type, - observatory=str(spacecraft_id), - file_version="1", # TODO: increment the file version - software_version=software_version, - date_created=datetime.now(), - date_obs=t, - date_beg=t, - date_end=t, - state="created") - - write_ndcube_to_fits(cube, os.path.join(l0_db_entry.directory(config['data_path']), - get_base_file_name(cube))) - # TODO: write a jp2 - for image_packets_entries in image_packets_entries: - image_packets_entries.is_used = True - session.add(l0_db_entry) - session.commit() - df_errors = pd.DataFrame(errors) - date_str = datetime.now().strftime("%Y_%j") - df_path = os.path.join(config['root'], 'REPLAY', f'PUNCH_{spacecraft}_REPLAY_{date_str}.csv') - os.makedirs(df_path, exist_ok=True) - df_errors.to_csv(df_path, index=False) From 84c24bc0b392c5ec69337b66f30e1f4754a4735b Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Sat, 23 Nov 2024 16:11:40 -0700 Subject: [PATCH 07/22] adds pim --- config.yaml | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/config.yaml b/config.yaml index 5c9a339..2c08006 100644 --- a/config.yaml +++ b/config.yaml @@ -61,3 +61,27 @@ levels: escalation: [10000] schedule: "* * * * *" options: + + level3_PIM_process_flow: + priority: + initial: 10000 + seconds: [1] + escalation: [10000] + schedule: "* * * * *" + options: + + construct_f_corona_background_process_flow: + priority: + initial: 10000 + seconds: [1] + escalation: [10000] + schedule: "* * * * *" + options: + + construct_starfield_background_process_flow: + priority: + initial: 10000 + seconds: [1] + escalation: [10000] + schedule: "* * * * *" + options: \ No newline at end of file From c7b7e60bdcbf30043e3060c0a2a38738df1383f3 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Sat, 23 Nov 2024 16:11:48 -0700 Subject: [PATCH 08/22] drops click --- pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 08a982a..db57c03 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,6 @@ build-backend = "setuptools.build_meta" name = "punchpipe" dynamic = ["version"] dependencies = [ - "click", "ccsdspy", "punchbowl", "prefect[sqlalchemy]", From 283f6582685ba2e215b526e408aefd17562618d8 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Sat, 23 Nov 2024 16:11:55 -0700 Subject: [PATCH 09/22] remove extra hdr --- punchpipe/level0/defs/SCI_XFI.csv | 1 - 1 file changed, 1 deletion(-) diff --git a/punchpipe/level0/defs/SCI_XFI.csv b/punchpipe/level0/defs/SCI_XFI.csv index 
96b33ac..b22056a 100644 --- a/punchpipe/level0/defs/SCI_XFI.csv +++ b/punchpipe/level0/defs/SCI_XFI.csv @@ -1,5 +1,4 @@ name,data_type,bit_length -SCI_XFI_HDR_LEN,uint,16 SCI_XFI_HDR_SCID,uint,8 SCI_XFI_HDR_FILL_1,uint,1 SCI_XFI_HDR_FLASH_BLOCK,uint,15 From 56d9b15b9ec64f44d11ce9d052dd0fe493d29cf7 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Sat, 23 Nov 2024 16:12:07 -0700 Subject: [PATCH 10/22] adds reference time --- punchpipe/flows/starfield.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/punchpipe/flows/starfield.py b/punchpipe/flows/starfield.py index b3721b8..5e7e343 100644 --- a/punchpipe/flows/starfield.py +++ b/punchpipe/flows/starfield.py @@ -13,7 +13,7 @@ @task -def starfield_background_query_ready_files(session, pipeline_config: dict): +def starfield_background_query_ready_files(session, pipeline_config: dict, reference_time=None): logger = get_run_logger() all_ready_files = (session.query(File) .filter(File.state == "created") @@ -31,7 +31,7 @@ def starfield_background_query_ready_files(session, pipeline_config: dict): def construct_starfield_background_flow_info(level3_fcorona_subtracted_files: list[File], level3_starfield_model_file: File, pipeline_config: dict, - session=None): + session=None, reference_time=None): flow_type = "construct_starfield_background_process_flow" state = "planned" creation_time = datetime.now() @@ -42,6 +42,7 @@ def construct_starfield_background_flow_info(level3_fcorona_subtracted_files: li os.path.join(level3_file.directory(pipeline_config["root"]), level3_file.filename()) for level3_file in level3_fcorona_subtracted_files ])), + "reference_time": reference_time } ) return Flow( @@ -55,7 +56,7 @@ def construct_starfield_background_flow_info(level3_fcorona_subtracted_files: li @task -def construct_starfield_background_file_info(level3_files: t.List[File], pipeline_config: dict) -> t.List[File]: +def construct_starfield_background_file_info(level3_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]: return [File( level="3", file_type="PS", @@ -78,13 +79,14 @@ def construct_starfield_background_file_info(level3_files: t.List[File], pipelin @flow -def construct_starfield_background_scheduler_flow(pipeline_config_path=None, session=None): +def construct_starfield_background_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None): generic_scheduler_flow_logic( starfield_background_query_ready_files, construct_starfield_background_file_info, construct_starfield_background_flow_info, pipeline_config_path, - new_file_state="created", + update_input_file_state=False, + reference_time=reference_time, session=session, ) From e89a49f2fba77306faa2c76247d3310987921723 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Sun, 24 Nov 2024 11:53:57 -0700 Subject: [PATCH 11/22] refresh --- punchpipe/cli.py | 68 +++++++++++++++++++----------------- punchpipe/flows/fcorona.py | 4 +-- punchpipe/flows/level3.py | 2 +- punchpipe/flows/starfield.py | 4 +-- punchpipe/monitor/app.py | 43 ++++++++--------------- 5 files changed, 55 insertions(+), 66 deletions(-) diff --git a/punchpipe/cli.py b/punchpipe/cli.py index 730bbdc..ee163d8 100644 --- a/punchpipe/cli.py +++ b/punchpipe/cli.py @@ -12,8 +12,8 @@ from punchpipe.control.launcher import launcher_flow from punchpipe.control.util import load_pipeline_configuration from punchpipe.flows.fcorona import ( - construct_f_corona_background_process_flow, - construct_f_corona_background_scheduler_flow, + f_corona_process, + f_corona_scheduler, 
) from punchpipe.flows.level0 import level0_form_images, level0_ingest_raw_packets from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow @@ -26,8 +26,8 @@ ) from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow from punchpipe.flows.starfield import ( - construct_starfield_background_process_flow, - construct_starfield_background_scheduler_flow, + starfield_process_flow, + starfield_scheduler_flow, ) from punchpipe.monitor.app import create_app @@ -39,7 +39,11 @@ def main(): """Run the PUNCH automated pipeline""" # mp.set_start_method('spawn') parser = argparse.ArgumentParser(prog='punchpipe') + subparsers = parser.add_subparsers(dest="command") + run_parser = subparsers.add_parser('run', help="Run the pipeline.") + run_parser.add_argument("config", type=str, help="Path to config.") + args = parser.parse_args() if args.command == 'run': run(args.config) @@ -107,7 +111,7 @@ def serve_flows(configuration_path): level3_PIM_scheduler_deployment = level3_PIM_scheduler_flow.to_deployment(name="level3-PIM-scheduler-deployment", description="Schedule a Level 3 flow to make PIM.", tags=["L3", "scheduler"], - cron=config['levels']['level3_PIM_process_flow'].get("schedule", "* * * * *"), + cron=config['levels']['L3_PIM'].get("schedule", "* * * * *"), parameters={ "pipeline_config_path": configuration_path} @@ -119,40 +123,38 @@ def serve_flows(configuration_path): "pipeline_config_path": configuration_path} ) - construct_f_corona_background_scheduler_deployment = construct_f_corona_background_scheduler_flow.to_deployment(name="construct_f_corona_background-scheduler-deployment", - description="Schedule an F corona background.", - cron=config['levels']['construct_f_corona_background_process_flow'].get("schedule", "* * * * *"), - tags=["L3", "scheduler"], - parameters={ + f_corona_scheduler_dep = f_corona_scheduler.to_deployment(name="construct_f_corona_background-scheduler-deployment", + description="Schedule an F corona background.", + cron=config['levels']['f_corona'].get("schedule", "* * * * *"), + tags=["L3", "scheduler"], + parameters={ "pipeline_config_path": configuration_path} - ) - construct_f_corona_background_process_deployment = construct_f_corona_background_process_flow.to_deployment(name="construct_f_corona_background_process_flow", - description="Process F corona background.", - tags=["L3", "process"], - parameters={ + ) + f_corona_process_dep = f_corona_process.to_deployment(name="construct_f_corona_background_process_flow", + description="Process F corona background.", + tags=["L3", "process"], + parameters={ "pipeline_config_path": configuration_path} - ) + ) - construct_starfield_background_scheduler_deployment = construct_starfield_background_scheduler_flow.to_deployment(name="construct_starfield-scheduler-deployment", - description="Schedule a starfield background.", - cron=config['levels']['construct_starfield_background_process_flow'].get("schedule", "* * * * *"), - tags=["L3", "scheduler"], - parameters={ - "pipeline_config_path": configuration_path} + starfield_scheduler_dep = starfield_scheduler_flow.to_deployment(name="construct_starfield-scheduler-deployment", + description="Schedule a starfield background.", + cron=config['levels']['starfield'].get("schedule", "* * * * *"), + tags=["L3", "scheduler"], + parameters={"pipeline_config_path": configuration_path} - ) - construct_starfield_background_process_deployment = construct_starfield_background_process_flow.to_deployment(name="construct_starfield_background_process_flow", - 
description="Create starfield background.", - tags=["L3", "process"], - parameters={ - "pipeline_config_path": configuration_path} - ) + ) + starfield_process_dep = starfield_process_flow.to_deployment(name="construct_starfield_background_process_flow", + description="Create starfield background.", + tags=["L3", "process"], + parameters={"pipeline_config_path": configuration_path} + ) level3_PTM_scheduler_deployment = level3_PTM_scheduler_flow.to_deployment(name="level3-PTM-scheduler-deployment", description="Schedule a Level 3 flow to make PTM.", - cron=config['levels']['level3_PTM_process_flow'].get("schedule", "* * * * *"), + cron=config['levels']['L3_PTM'].get("schedule", "* * * * *"), tags=["L3", "scheduler"], parameters={ "pipeline_config_path": configuration_path} @@ -175,12 +177,12 @@ def serve_flows(configuration_path): level2_scheduler_deployment, level2_process_deployment, levelq_scheduler_deployment, levelq_process_deployment, level3_PTM_scheduler_deployment, level3_PTM_process_deployment, - construct_f_corona_background_process_deployment, construct_f_corona_background_scheduler_deployment, - construct_starfield_background_process_deployment, construct_starfield_background_scheduler_deployment, + f_corona_process_dep, f_corona_scheduler_dep, + starfield_process_dep, starfield_scheduler_dep, level3_PIM_scheduler_deployment, level3_PIM_process_deployment, health_deployment, limit=1000 - ) + ) def run(configuration_path): now = datetime.now() diff --git a/punchpipe/flows/fcorona.py b/punchpipe/flows/fcorona.py index cc75c12..ac07be0 100644 --- a/punchpipe/flows/fcorona.py +++ b/punchpipe/flows/fcorona.py @@ -68,7 +68,7 @@ def construct_f_corona_background_file_info(level2_files: t.List[File], pipeline ),] @flow -def construct_f_corona_background_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None): +def f_corona_scheduler(pipeline_config_path=None, session=None, reference_time=None): generic_scheduler_flow_logic( f_corona_background_query_ready_files, construct_f_corona_background_file_info, @@ -80,5 +80,5 @@ def construct_f_corona_background_scheduler_flow(pipeline_config_path=None, sess ) @flow -def construct_f_corona_background_process_flow(flow_id: int, pipeline_config_path=None, session=None): +def f_corona_process(flow_id: int, pipeline_config_path=None, session=None): generic_process_flow_logic(flow_id, construct_full_f_corona_model, pipeline_config_path, session=session) diff --git a/punchpipe/flows/level3.py b/punchpipe/flows/level3.py index 867fa1a..8733df1 100644 --- a/punchpipe/flows/level3.py +++ b/punchpipe/flows/level3.py @@ -166,7 +166,7 @@ def level3_PIM_construct_flow_info(level2_files: list[File], level3_file: File, session=None, reference_time=None): session = get_database_session() # TODO: replace so this works in the tests by passing in a test - flow_type = "level3_PIM_process_flow" + flow_type = "L3_PIM" state = "planned" creation_time = datetime.now() priority = pipeline_config["levels"][flow_type]["priority"]["initial"] diff --git a/punchpipe/flows/starfield.py b/punchpipe/flows/starfield.py index 5e7e343..4a54c1e 100644 --- a/punchpipe/flows/starfield.py +++ b/punchpipe/flows/starfield.py @@ -79,7 +79,7 @@ def construct_starfield_background_file_info(level3_files: t.List[File], pipelin @flow -def construct_starfield_background_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None): +def starfield_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None): generic_scheduler_flow_logic( 
starfield_background_query_ready_files, construct_starfield_background_file_info, @@ -92,7 +92,7 @@ def construct_starfield_background_scheduler_flow(pipeline_config_path=None, ses @flow -def construct_starfield_background_process_flow(flow_id: int, pipeline_config_path=None, session=None): +def starfield_process_flow(flow_id: int, pipeline_config_path=None, session=None): generic_process_flow_logic(flow_id, generate_starfield_background, pipeline_config_path, diff --git a/punchpipe/monitor/app.py b/punchpipe/monitor/app.py index d8761fa..d32058e 100644 --- a/punchpipe/monitor/app.py +++ b/punchpipe/monitor/app.py @@ -123,50 +123,37 @@ def update_flows(n, page_current, page_size, sort_by, filter): size = page_size return dff.iloc[page * size: (page + 1) * size].to_dict('records') - @callback( - Output('status-cards', 'children'), - Input('interval-component', 'n_intervals'), - ) - def update_cards(n): - card_content = [ - dbc.CardHeader("Card header"), + + def create_card_content(level: int, status: str): + return [ + # dbc.CardHeader(f"Level {level} Flow Pressure"), dbc.CardBody( [ - html.H5("Card title", className="card-title"), + html.H5(f"Level {level} Flow Pressure", className="card-title"), html.P( - "This is some card content that we'll reuse", + status, className="card-text", ), ] ), ] + @callback( + Output('status-cards', 'children'), + Input('interval-component', 'n_intervals'), + ) + def update_cards(n): cards = html.Div( [ dbc.Row( [ - dbc.Col(dbc.Card(card_content, color="primary", inverse=True)), - dbc.Col( - dbc.Card(card_content, color="secondary", inverse=True) - ), - dbc.Col(dbc.Card(card_content, color="info", inverse=True)), + dbc.Col(dbc.Card(create_card_content(0, "Good"), color="success", inverse=True)), + dbc.Col(dbc.Card(create_card_content(1, "Good"), color="success", inverse=True)), + dbc.Col(dbc.Card(create_card_content(2, "Warning"), color="warning", inverse=True)), + dbc.Col(dbc.Card(create_card_content(3, "Bad"), color="danger", inverse=True)), ], className="mb-4", ), - dbc.Row( - [ - dbc.Col(dbc.Card(card_content, color="success", inverse=True)), - dbc.Col(dbc.Card(card_content, color="warning", inverse=True)), - dbc.Col(dbc.Card(card_content, color="danger", inverse=True)), - ], - className="mb-4", - ), - dbc.Row( - [ - dbc.Col(dbc.Card(card_content, color="light")), - dbc.Col(dbc.Card(card_content, color="dark", inverse=True)), - ] - ), ] ) return cards From d4dcd6f6a70999c3e0f0f0f27304565f638e3c92 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Sun, 24 Nov 2024 11:56:00 -0700 Subject: [PATCH 12/22] fix import --- punchpipe/flows/fcorona.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/punchpipe/flows/fcorona.py b/punchpipe/flows/fcorona.py index ac07be0..9527986 100644 --- a/punchpipe/flows/fcorona.py +++ b/punchpipe/flows/fcorona.py @@ -5,7 +5,7 @@ from datetime import datetime from prefect import flow, get_run_logger, task -from punchbowl.level3.f_corona_model import construct_full_f_corona_model +from punchbowl.level3.f_corona_model import construct_polarized_f_corona_model from punchpipe import __version__ from punchpipe.control.db import File, Flow @@ -81,4 +81,4 @@ def f_corona_scheduler(pipeline_config_path=None, session=None, reference_time=N @flow def f_corona_process(flow_id: int, pipeline_config_path=None, session=None): - generic_process_flow_logic(flow_id, construct_full_f_corona_model, pipeline_config_path, session=session) + generic_process_flow_logic(flow_id, construct_polarized_f_corona_model, 
From 6fc9c4432d92c3d0d840f4dd07130f436611db6d Mon Sep 17 00:00:00 2001
From: Marcus Hughes
Date: Sun, 24 Nov 2024 11:58:21 -0700
Subject: [PATCH 13/22] fix import

---
 punchpipe/flows/level0.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/punchpipe/flows/level0.py b/punchpipe/flows/level0.py
index 585bb75..d9b4c08 100644
--- a/punchpipe/flows/level0.py
+++ b/punchpipe/flows/level0.py
@@ -12,7 +12,8 @@
 from sqlalchemy import and_
 
 from punchpipe import __version__ as software_version
-from punchpipe.level0.core import detect_new_tlm_files, update_tlm_database, parse_new_tlm_files, process_telemetry_file, form_from_jpeg_compressed, image_is_okay, get_fits_metadata, form_preliminary_wcs,POSITIONS_TO_CODES, convert_pfw_position_to_polarizer
+from punchpipe.level0.core import detect_new_tlm_files, update_tlm_database, parse_new_tlm_files, process_telemetry_file, form_from_jpeg_compressed, image_is_okay, get_fits_metadata, form_preliminary_wcs
+from punchpipe.level0.meta import POSITIONS_TO_CODES, convert_pfw_position_to_polarizer
 from punchpipe.level0.ccsds import unpack_compression_settings
 from punchpipe.control.util import get_database_session, load_pipeline_configuration
 from punchpipe.control.db import TLMFiles, SciPacket, File

From 0b870ed5159761e9eed140bac95505150a09e442 Mon Sep 17 00:00:00 2001
From: Marcus Hughes
Date: Mon, 25 Nov 2024 12:47:03 -0700
Subject: [PATCH 14/22] use reference time

---
 punchpipe/flows/fcorona.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/punchpipe/flows/fcorona.py b/punchpipe/flows/fcorona.py
index 9527986..a3e7360 100644
--- a/punchpipe/flows/fcorona.py
+++ b/punchpipe/flows/fcorona.py
@@ -32,7 +32,8 @@ def f_corona_background_query_ready_files(session, pipeline_config: dict, use_n:
 def construct_f_corona_background_flow_info(level3_files: list[File],
                                             level3_f_model_file: File,
                                             pipeline_config: dict,
-                                            session=None, reference_time=None):
+                                            session=None,
+                                            reference_time=None):
     flow_type = "construct_f_corona_background_process_flow"
     state = "planned"
     creation_time = datetime.now()
@@ -43,6 +44,7 @@ def construct_f_corona_background_flow_info(level3_files: list[File],
                 os.path.join(level3_file.directory(pipeline_config["root"]), level3_file.filename())
                 for level3_file in level3_files
             ],
+            "reference_time": reference_time
         }
     )
     return Flow(
@@ -63,7 +63,7 @@ def construct_f_corona_background_file_info(level2_files: t.List[File], pipeline
         observatory="M",
         file_version=pipeline_config["file_version"],
         software_version=__version__,
-        date_obs= datetime(2024, 8, 1, 12, 0, 0), #datetime.now()-timedelta(days=60),
+        date_obs= reference_time,
         state="planned",
     ),]
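The patch above threads reference_time into the scheduler's call data. Because that dictionary goes through json.dumps before being stored on the Flow row, a bare datetime would raise a TypeError at serialization time, which is what a later patch in this series (patch 19) addresses by stringifying it. A minimal sketch of the round trip — the file path here is illustrative, not a real product:

    import json
    from datetime import datetime
    from dateutil.parser import parse as parse_datetime_str

    reference_time = datetime(2024, 8, 1, 12, 0, 0)
    call_data = json.dumps({
        "data_list": ["/data/level2/PTM_20240801.fits"],  # illustrative path
        "reference_time": str(reference_time),            # datetime is not JSON-serializable
    })

    # The process flow can recover the datetime when it unpacks the call data.
    recovered = parse_datetime_str(json.loads(call_data)["reference_time"])
    assert recovered == reference_time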
From ff05f3a571284de408179988774b5edd03a221b8 Mon Sep 17 00:00:00 2001
From: Marcus Hughes
Date: Mon, 25 Nov 2024 12:52:37 -0700
Subject: [PATCH 15/22] use reference time properly

---
 punchpipe/flows/fcorona.py   |  8 +++++++-
 punchpipe/flows/starfield.py | 13 ++-----------
 2 files changed, 9 insertions(+), 12 deletions(-)

diff --git a/punchpipe/flows/fcorona.py b/punchpipe/flows/fcorona.py
index a3e7360..0f2457d 100644
--- a/punchpipe/flows/fcorona.py
+++ b/punchpipe/flows/fcorona.py
@@ -2,7 +2,7 @@
 import json
 import random
 import typing as t
-from datetime import datetime
+from datetime import datetime, timedelta
 
 from prefect import flow, get_run_logger, task
 from punchbowl.level3.f_corona_model import construct_polarized_f_corona_model
@@ -15,9 +15,15 @@
 
 @task
 def f_corona_background_query_ready_files(session, pipeline_config: dict, use_n: int = 250, reference_time=None):
+    reference_time = reference_time or datetime.now()
+    before = reference_time - timedelta(weeks=2)
+    after = reference_time + timedelta(weeks=2)
+
     logger = get_run_logger()
     all_ready_files = (session.query(File)
                        .filter(File.state.in_(["created", "progressed"]))
+                       .filter(File.date_obs >= before)
+                       .filter(File.date_obs <= after)
                        .filter(File.level == "2")
                        .filter(File.file_type == "PT")
                        .filter(File.observatory == "M").all())
diff --git a/punchpipe/flows/starfield.py b/punchpipe/flows/starfield.py
index 4a54c1e..39c93ba 100644
--- a/punchpipe/flows/starfield.py
+++ b/punchpipe/flows/starfield.py
@@ -63,18 +63,9 @@ def construct_starfield_background_file_info(level3_files: t.List[File], pipelin
             observatory="M",
             file_version=pipeline_config["file_version"],
             software_version=__version__,
-            date_obs= datetime(2024, 8, 1, 12, 0, 0), # datetime.now()-timedelta(days=60),
+            date_obs= reference_time,
             state="planned",
-        ),
-        File(
-            level="3",
-            file_type="PS",
-            observatory="M",
-            file_version=pipeline_config["file_version"],
-            software_version=__version__,
-            date_obs=datetime(2024, 12, 1, 12, 0, 0), # datetime.now()+timedelta(days=60),
-            state="planned",
-        )
+        )
     ]

From 5a9835966b745f31c46086c64bdc2bdca2084f3 Mon Sep 17 00:00:00 2001
From: Marcus Hughes
Date: Tue, 26 Nov 2024 04:58:15 -0700
Subject: [PATCH 16/22] use_n for f corona construction

---
 punchpipe/flows/fcorona.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/punchpipe/flows/fcorona.py b/punchpipe/flows/fcorona.py
index 0f2457d..b15f492 100644
--- a/punchpipe/flows/fcorona.py
+++ b/punchpipe/flows/fcorona.py
@@ -30,7 +30,7 @@ def f_corona_background_query_ready_files(session, pipeline_config: dict, use_n:
     logger.info(f"{len(all_ready_files)} Level 2 PTM files will be used for F corona background modeling.")
     if len(all_ready_files) > 30: # need at least 30 images
         random.shuffle(all_ready_files)
-        return [[f.file_id for f in all_ready_files[:250]]]
+        return [[f.file_id for f in all_ready_files[:use_n]]]
     else:
        return []
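The fix above makes the subsample size honor the use_n parameter instead of a hard-coded 250. The same selection can be expressed with random.sample rather than shuffle-and-slice; a minimal sketch, assuming the list of ready file IDs has already been queried (the helper name is ours, not the pipeline's):

    import random

    def pick_model_inputs(ready_file_ids: list[int], use_n: int = 50, minimum: int = 30) -> list[list[int]]:
        # Mirror the scheduler: refuse to build a background model from too
        # few images; otherwise return one random batch of at most use_n.
        if len(ready_file_ids) <= minimum:
            return []
        return [random.sample(ready_file_ids, min(use_n, len(ready_file_ids)))]

    print(pick_model_inputs(list(range(100))))  # one batch of 50 random IDs
    print(pick_model_inputs(list(range(10))))   # [] -- below the minimum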
From e3c1316123cd4883b41956f6cf20870ce9ab7910 Mon Sep 17 00:00:00 2001
From: Marcus Hughes
Date: Tue, 26 Nov 2024 05:14:37 -0700
Subject: [PATCH 17/22] always parse datetime

---
 punchpipe/flows/fcorona.py | 4 ++++
 punchpipe/flows/level3.py  | 5 ++++-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/punchpipe/flows/fcorona.py b/punchpipe/flows/fcorona.py
index b15f492..ede4282 100644
--- a/punchpipe/flows/fcorona.py
+++ b/punchpipe/flows/fcorona.py
@@ -3,6 +3,7 @@
 import random
 import typing as t
 from datetime import datetime, timedelta
+from dateutil.parser import parse as parse_datetime_str
 
 from prefect import flow, get_run_logger, task
 from punchbowl.level3.f_corona_model import construct_polarized_f_corona_model
@@ -77,6 +78,9 @@ def construct_f_corona_background_file_info(level2_files: t.List[File], pipeline
 
 @flow
 def f_corona_scheduler(pipeline_config_path=None, session=None, reference_time=None):
+    if not isinstance(reference_time, datetime):
+        reference_time = parse_datetime_str(reference_time)
+
     generic_scheduler_flow_logic(
         f_corona_background_query_ready_files,
         construct_f_corona_background_file_info,
diff --git a/punchpipe/flows/level3.py b/punchpipe/flows/level3.py
index 8733df1..c16c2d7 100644
--- a/punchpipe/flows/level3.py
+++ b/punchpipe/flows/level3.py
@@ -2,6 +2,7 @@
 import json
 import typing as t
 from datetime import datetime, timedelta
+from dateutil.parser import parse as parse_datetime_str
 
 from prefect import flow, get_run_logger, task
 from punchbowl.level3.flow import level3_core_flow, level3_PIM_flow
@@ -215,7 +216,9 @@ def level3_PIM_construct_file_info(level2_files: t.List[File], pipeline_config:
 
 
 @flow
-def level3_PIM_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None):
+def level3_PIM_scheduler_flow(pipeline_config_path=None, session=None, reference_time:datetime=None):
+    if not isinstance(reference_time, datetime):
+        reference_time = parse_datetime_str(reference_time)
     generic_scheduler_flow_logic(
         level3_PIM_query_ready_files,
         level3_PIM_construct_file_info,

From b469cac79c5f826c0dbb93fd54ce2882844c7339 Mon Sep 17 00:00:00 2001
From: Marcus Hughes
Date: Tue, 26 Nov 2024 05:17:37 -0700
Subject: [PATCH 18/22] indicate datetime type

---
 punchpipe/flows/fcorona.py   | 10 ++++++----
 punchpipe/flows/starfield.py |  4 ++--
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/punchpipe/flows/fcorona.py b/punchpipe/flows/fcorona.py
index ede4282..718a2e5 100644
--- a/punchpipe/flows/fcorona.py
+++ b/punchpipe/flows/fcorona.py
@@ -15,7 +15,8 @@
 
 
 @task
-def f_corona_background_query_ready_files(session, pipeline_config: dict, use_n: int = 250, reference_time=None):
+def f_corona_background_query_ready_files(session, pipeline_config: dict, use_n: int = 250,
+                                          reference_time: datetime =None):
     reference_time = reference_time or datetime.now()
     before = reference_time - timedelta(weeks=2)
     after = reference_time + timedelta(weeks=2)
@@ -40,7 +41,7 @@ def construct_f_corona_background_flow_info(level3_files: list[File],
                                             level3_f_model_file: File,
                                             pipeline_config: dict,
                                             session=None,
-                                            reference_time=None):
+                                            reference_time: datetime = None):
     flow_type = "construct_f_corona_background_process_flow"
     state = "planned"
     creation_time = datetime.now()
@@ -65,7 +66,8 @@ def construct_f_corona_background_flow_info(level3_files: list[File],
 
 
 @task
-def construct_f_corona_background_file_info(level2_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]:
+def construct_f_corona_background_file_info(level2_files: t.List[File], pipeline_config: dict,
+                                            reference_time:datetime = None) -> t.List[File]:
     return [File(
         level="3",
         file_type="PF",
@@ -77,7 +79,7 @@ def construct_f_corona_background_file_info(level2_files: t.List[File], pipeline
     ),]
 
 @flow
-def f_corona_scheduler(pipeline_config_path=None, session=None, reference_time=None):
+def f_corona_scheduler(pipeline_config_path=None, session=None, reference_time: datetime = None):
     if not isinstance(reference_time, datetime):
         reference_time = parse_datetime_str(reference_time)
 
diff --git a/punchpipe/flows/starfield.py b/punchpipe/flows/starfield.py
index 39c93ba..5a34676 100644
--- a/punchpipe/flows/starfield.py
+++ b/punchpipe/flows/starfield.py
@@ -13,7 +13,7 @@
 
 
 @task
-def starfield_background_query_ready_files(session, pipeline_config: dict, reference_time=None):
+def starfield_background_query_ready_files(session, pipeline_config: dict, reference_time: datetime =None):
     logger = get_run_logger()
     all_ready_files = (session.query(File)
                        .filter(File.state == "created")
@@ -31,7 +31,7 @@ def starfield_background_query_ready_files(session, pipeline_config: dict, refer
 def construct_starfield_background_flow_info(level3_fcorona_subtracted_files: list[File],
                                              level3_starfield_model_file: File,
                                              pipeline_config: dict,
-                                             session=None, reference_time=None):
+                                             session=None, reference_time: datetime =None):
     flow_type = "construct_starfield_background_process_flow"
     state = "planned"
     creation_time = datetime.now()
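Patches 17 and 18 normalize reference_time to a datetime at the flow boundary. One caveat: the guard as committed raises a TypeError when reference_time is left at its default of None, because dateutil cannot parse None, even though the downstream query task handles None itself. A None-safe variant, as a minimal sketch (the helper name is ours, not the pipeline's):

    from datetime import datetime
    from dateutil.parser import parse as parse_datetime_str

    def normalize_reference_time(reference_time) -> datetime:
        # None falls back to "now", matching the query task's behavior;
        # strings are parsed; datetimes pass through untouched.
        if reference_time is None:
            return datetime.now()
        if isinstance(reference_time, datetime):
            return reference_time
        return parse_datetime_str(reference_time)

    print(normalize_reference_time("2024-08-01T12:00:00"))
    print(normalize_reference_time(None))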
From 158ccd0204bfa8b4b654bff5486d59edf3df3e1f Mon Sep 17 00:00:00 2001
From: Marcus Hughes
Date: Tue, 26 Nov 2024 05:27:47 -0700
Subject: [PATCH 19/22] convert datetime to str for serialization

---
 punchpipe/flows/fcorona.py   | 2 +-
 punchpipe/flows/starfield.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/punchpipe/flows/fcorona.py b/punchpipe/flows/fcorona.py
index 718a2e5..80b76d8 100644
--- a/punchpipe/flows/fcorona.py
+++ b/punchpipe/flows/fcorona.py
@@ -52,7 +52,7 @@ def construct_f_corona_background_flow_info(level3_files: list[File],
                 os.path.join(level3_file.directory(pipeline_config["root"]), level3_file.filename())
                 for level3_file in level3_files
             ],
-            "reference_time": reference_time
+            "reference_time": str(reference_time)
         }
     )
     return Flow(
diff --git a/punchpipe/flows/starfield.py b/punchpipe/flows/starfield.py
index 5a34676..fdb2a0f 100644
--- a/punchpipe/flows/starfield.py
+++ b/punchpipe/flows/starfield.py
@@ -42,7 +42,7 @@ def construct_starfield_background_flow_info(level3_fcorona_subtracted_files: li
                 os.path.join(level3_file.directory(pipeline_config["root"]), level3_file.filename())
                 for level3_file in level3_fcorona_subtracted_files
             ])),
-            "reference_time": reference_time
+            "reference_time": str(reference_time)
         }
     )
     return Flow(

From cdc844f61a44c29e9f1b5c86883c2bf1132d8c76 Mon Sep 17 00:00:00 2001
From: Marcus Hughes
Date: Tue, 26 Nov 2024 07:18:06 -0700
Subject: [PATCH 20/22] limit inputs for ndcube

---
 punchpipe/flows/fcorona.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/punchpipe/flows/fcorona.py b/punchpipe/flows/fcorona.py
index 80b76d8..9fec344 100644
--- a/punchpipe/flows/fcorona.py
+++ b/punchpipe/flows/fcorona.py
@@ -15,7 +15,7 @@
 
 
 @task
-def f_corona_background_query_ready_files(session, pipeline_config: dict, use_n: int = 250,
+def f_corona_background_query_ready_files(session, pipeline_config: dict, use_n: int = 50,
                                           reference_time: datetime =None):
     reference_time = reference_time or datetime.now()
     before = reference_time - timedelta(weeks=2)

From cce6faaddfb3988680c2fd434258993d2044e0de Mon Sep 17 00:00:00 2001
From: Marcus Hughes
Date: Wed, 27 Nov 2024 01:25:57 -0700
Subject: [PATCH 21/22] rename flows in config

---
 config.yaml | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/config.yaml b/config.yaml
index 2c08006..3b892cc 100644
--- a/config.yaml
+++ b/config.yaml
@@ -54,7 +54,7 @@ levels:
     schedule: "* * * * *"
     options:
 
-  level3_PTM_process_flow:
+  L3_PTM:
     priority:
       initial: 10000
       seconds: [1]
@@ -62,7 +62,7 @@ levels:
     schedule: "* * * * *"
     options:
 
-  level3_PIM_process_flow:
+  L3_PIM:
     priority:
       initial: 10000
       seconds: [1]
@@ -70,7 +70,7 @@ levels:
     schedule: "* * * * *"
     options:
 
-  construct_f_corona_background_process_flow:
+  f_corona:
     priority:
       initial: 10000
       seconds: [1]
@@ -78,7 +78,7 @@ levels:
     schedule: "* * * * *"
     options:
 
-  construct_starfield_background_process_flow:
+  starfield:
     priority:
       initial: 10000
       seconds: [1]
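With the shorter flow-type keys in place, a scheduler's cron string and priority come straight out of the YAML, keyed by the same string stored in flow_type. A minimal sketch against an inline snippet of the config above (PyYAML assumed as the loader):

    import yaml

    config = yaml.safe_load("""
    levels:
      L3_PTM:
        priority:
          initial: 10000
        schedule: "* * * * *"
    """)

    # After this rename, the flow_type string doubles as the config key.
    flow_type = "L3_PTM"
    cron = config["levels"][flow_type].get("schedule", "* * * * *")
    priority = config["levels"][flow_type]["priority"]["initial"]
    print(cron, priority)  # -> * * * * * 10000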
From 485b9e452cdd0ad6a9ff328b7f77ec9080d05479 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Wed, 27 Nov 2024 08:30:44 +0000
Subject: [PATCH 22/22] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 config.yaml                |  2 +-
 punchpipe/cli.py           | 12 +++---------
 punchpipe/flows/fcorona.py |  2 +-
 punchpipe/flows/level0.py  | 18 ++++++++++++++----
 punchpipe/flows/level3.py  |  2 +-
 punchpipe/level0/ccsds.py  |  2 +-
 punchpipe/level0/core.py   |  1 -
 7 files changed, 21 insertions(+), 18 deletions(-)

diff --git a/config.yaml b/config.yaml
index 3b892cc..1f9c6fc 100644
--- a/config.yaml
+++ b/config.yaml
@@ -84,4 +84,4 @@ levels:
       seconds: [1]
       escalation: [10000]
     schedule: "* * * * *"
-    options:
\ No newline at end of file
+    options:
diff --git a/punchpipe/cli.py b/punchpipe/cli.py
index ee163d8..e7da21e 100644
--- a/punchpipe/cli.py
+++ b/punchpipe/cli.py
@@ -11,10 +11,7 @@
 from punchpipe.control.health import update_machine_health_stats
 from punchpipe.control.launcher import launcher_flow
 from punchpipe.control.util import load_pipeline_configuration
-from punchpipe.flows.fcorona import (
-    f_corona_process,
-    f_corona_scheduler,
-)
+from punchpipe.flows.fcorona import f_corona_process, f_corona_scheduler
 from punchpipe.flows.level0 import level0_form_images, level0_ingest_raw_packets
 from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow
 from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow
@@ -25,10 +22,7 @@
     level3_PTM_scheduler_flow,
 )
 from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow
-from punchpipe.flows.starfield import (
-    starfield_process_flow,
-    starfield_scheduler_flow,
-)
+from punchpipe.flows.starfield import starfield_process_flow, starfield_scheduler_flow
 from punchpipe.monitor.app import create_app
 
 THIS_DIR = os.path.dirname(__file__)
@@ -232,4 +226,4 @@ def run(configuration_path):
     monitor_process = subprocess.Popen(["gunicorn",
                                         "-b", "0.0.0.0:8050",
                                         "--chdir", THIS_DIR,
-                                        "cli:server"])
\ No newline at end of file
+                                        "cli:server"])
diff --git a/punchpipe/flows/fcorona.py b/punchpipe/flows/fcorona.py
index 9fec344..1ec7a72 100644
--- a/punchpipe/flows/fcorona.py
+++ b/punchpipe/flows/fcorona.py
@@ -3,8 +3,8 @@
 import random
 import typing as t
 from datetime import datetime, timedelta
-from dateutil.parser import parse as parse_datetime_str
 
+from dateutil.parser import parse as parse_datetime_str
 from prefect import flow, get_run_logger, task
 from punchbowl.level3.f_corona_model import construct_polarized_f_corona_model
 
diff --git a/punchpipe/flows/level0.py b/punchpipe/flows/level0.py
index d9b4c08..e635888 100644
--- a/punchpipe/flows/level0.py
+++ b/punchpipe/flows/level0.py
@@ -12,11 +12,21 @@
 from sqlalchemy import and_
 
 from punchpipe import __version__ as software_version
-from punchpipe.level0.core import detect_new_tlm_files, update_tlm_database, parse_new_tlm_files, process_telemetry_file, form_from_jpeg_compressed, image_is_okay, get_fits_metadata, form_preliminary_wcs
-from punchpipe.level0.meta import POSITIONS_TO_CODES, convert_pfw_position_to_polarizer
-from punchpipe.level0.ccsds import unpack_compression_settings
+from punchpipe.control.db import File, SciPacket, TLMFiles
 from punchpipe.control.util import get_database_session, load_pipeline_configuration
-from punchpipe.control.db import TLMFiles, SciPacket, File
+from punchpipe.level0.ccsds import unpack_compression_settings
+from punchpipe.level0.core import (
+    detect_new_tlm_files,
+    form_from_jpeg_compressed,
+    form_preliminary_wcs,
+    get_fits_metadata,
+    image_is_okay,
+    parse_new_tlm_files,
+    process_telemetry_file,
+    update_tlm_database,
+)
+from punchpipe.level0.meta import POSITIONS_TO_CODES, convert_pfw_position_to_polarizer
+
 
 @flow
 def level0_ingest_raw_packets(session=None):
diff --git a/punchpipe/flows/level3.py b/punchpipe/flows/level3.py
index c16c2d7..bb4e83d 100644
--- a/punchpipe/flows/level3.py
+++ b/punchpipe/flows/level3.py
@@ -2,8 +2,8 @@
 import json
 import typing as t
 from datetime import datetime, timedelta
-from dateutil.parser import parse as parse_datetime_str
 
+from dateutil.parser import parse as parse_datetime_str
 from prefect import flow, get_run_logger, task
 from punchbowl.level3.flow import level3_core_flow, level3_PIM_flow
 from sqlalchemy import and_
diff --git a/punchpipe/level0/ccsds.py b/punchpipe/level0/ccsds.py
index 5038672..0bab2bf 100644
--- a/punchpipe/level0/ccsds.py
+++ b/punchpipe/level0/ccsds.py
@@ -174,4 +174,4 @@ def unpack_acquisition_settings(acq_set_val: "bytes|int"):
     # fig.colorbar(im1, ax=axs[1])
     fig.tight_layout()
     fig.savefig("mmr_image.png", dpi=300)
-    plt.show()
\ No newline at end of file
+    plt.show()
diff --git a/punchpipe/level0/core.py b/punchpipe/level0/core.py
index 6d69dc0..1e1bb02 100644
--- a/punchpipe/level0/core.py
+++ b/punchpipe/level0/core.py
@@ -262,4 +262,3 @@ def form_from_jpeg_compressed(packets):
     """Form a JPEG-LS image from packets"""
     img = pylibjpeg.decode(packets.tobytes())
     return img
-