From c1ec8843b02549e8b646e4d8580a7d7ceacfa0a0 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Sat, 7 Dec 2024 11:39:41 -0700 Subject: [PATCH 01/10] avoid concatenate empty error --- punchpipe/flows/level0.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/punchpipe/flows/level0.py b/punchpipe/flows/level0.py index 60b4f91..13a7661 100644 --- a/punchpipe/flows/level0.py +++ b/punchpipe/flows/level0.py @@ -107,12 +107,12 @@ def level0_form_images(session=None, pipeline_config_path=None): tlm_content_index = needed_tlm_paths.index(tlm_id_to_tlm_path[packet_entry.source_tlm_file]) selected_tlm_contents = tlm_contents[tlm_content_index] ordered_image_content.append(selected_tlm_contents[0x20]['SCI_XFI_IMG_DATA'][packet_entry.packet_num]) - ordered_image_content = np.concatenate(ordered_image_content) # Get the proper image skip_image = False if image_compression[0]['CMP_BYP'] == 0 and image_compression[0]['JPEG'] == 1: # this assumes the image compression is static for an image try: + ordered_image_content = np.concatenate(ordered_image_content) image = form_from_jpeg_compressed(ordered_image_content) except (RuntimeError, ValueError): skip_image = True @@ -123,6 +123,7 @@ def level0_form_images(session=None, pipeline_config_path=None): errors.append(error) elif image_compression[0]['CMP_BYP'] == 1: try: + ordered_image_content = np.concatenate(ordered_image_content) logger.info(f"Packet shape {ordered_image_content.shape[0]}", ) image = form_from_raw(ordered_image_content) except (RuntimeError, ValueError): @@ -136,15 +137,6 @@ def level0_form_images(session=None, pipeline_config_path=None): skip_image = True print("Not implemented") - # check the quality of the image - # if not skip_image and not image_is_okay(image, config): - # skip_image = True - # error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"), - # 'start_block': image_packets_entries[0].flash_block, - # 'replay_length': image_packets_entries[-1].flash_block - # - image_packets_entries[0].flash_block} - # errors.append(error) - if not skip_image: spacecraft_secrets = SpacecraftMapping.load("spacecraft-ids").mapping.get_secret_value() moc_index = spacecraft_secrets["moc"].index(image_packets_entries[0].spacecraft_id) From 5890b683824a3310b079b6679781c7aedcd4aeb5 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Mon, 9 Dec 2024 11:04:57 -0700 Subject: [PATCH 02/10] add packet history --- punchpipe/control/db.py | 8 ++++++++ punchpipe/flows/level0.py | 12 +++++++++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/punchpipe/control/db.py b/punchpipe/control/db.py index 3dcb7a6..fdf5b57 100644 --- a/punchpipe/control/db.py +++ b/punchpipe/control/db.py @@ -204,6 +204,14 @@ class Health(Base): num_pids = Column(Integer, nullable=False) +class PacketHistory(Base): + __tablename__ = "packet_history" + id = Column(Integer, primary_key=True) + datetime = Column(DATETIME(fsp=6), nullable=False) + num_images_succeeded = Column(Integer, nullable=False) + num_images_failed = Column(Integer, nullable=False) + + def get_closest_eng_packets(table, timestamp, spacecraft_id, session): # find the closest events which are greater/less than the timestamp gt_event = session.query(table).filter(table.spacecraft_id == spacecraft_id).filter(table.timestamp > timestamp).order_by(table.timestamp.asc()).first() diff --git a/punchpipe/flows/level0.py b/punchpipe/flows/level0.py index 13a7661..4ddd848 100644 --- a/punchpipe/flows/level0.py +++ b/punchpipe/flows/level0.py @@ -13,7 +13,7 @@ from sqlalchemy import and_ from punchpipe import __version__ as software_version -from punchpipe.control.db import File, SciPacket, TLMFiles +from punchpipe.control.db import File, SciPacket, TLMFiles, PacketHistory from punchpipe.control.util import get_database_session, load_pipeline_configuration from punchpipe.level0.ccsds import unpack_compression_settings from punchpipe.level0.core import ( @@ -76,6 +76,7 @@ def level0_form_images(session=None, pipeline_config_path=None): already_parsed_tlms = {} # tlm_path maps to the parsed contents + skip_count, success_count = 0, 0 for spacecraft in distinct_spacecraft: errors = [] @@ -173,6 +174,15 @@ def level0_form_images(session=None, pipeline_config_path=None): image_packets_entries.is_used = True session.add(l0_db_entry) session.commit() + success_count += 1 + else: + skip_count += 1 + history = PacketHistory(datetime=datetime.now(), + num_images_succeeded=success_count, + num_images_failed=skip_count) + session.add(history) + session.commit() + df_errors = pd.DataFrame(errors) date_str = datetime.now().strftime("%Y_%j") df_path = os.path.join(config['root'], 'REPLAY', f'PUNCH_{str(spacecraft[0])}_REPLAY_{date_str}.csv') From 45901a095a73af8e84185d780cbea4381b7680c7 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Mon, 9 Dec 2024 11:07:42 -0700 Subject: [PATCH 03/10] output quicklook always --- punchpipe/control/util.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/punchpipe/control/util.py b/punchpipe/control/util.py index eaeaf67..eac61c2 100644 --- a/punchpipe/control/util.py +++ b/punchpipe/control/util.py @@ -6,7 +6,7 @@ from prefect import task from prefect.variables import Variable from prefect_sqlalchemy import SqlAlchemyConnector -from punchbowl.data import get_base_file_name, write_ndcube_to_fits +from punchbowl.data import get_base_file_name, write_ndcube_to_fits, write_ndcube_to_jp2 from sqlalchemy import or_ from sqlalchemy.orm import Session from yaml.loader import FullLoader @@ -48,6 +48,9 @@ def write_file(data: NDCube, corresponding_file_db_entry, pipeline_config) -> No write_ndcube_to_fits(data, output_filename) corresponding_file_db_entry.state = "created" + layer = 0 if len(data.data.shape) > 2 else None + write_ndcube_to_jp2(data, output_filename.replace(".fits", ".jp2"), layer=layer) + def match_data_with_file_db_entry(data: NDCube, file_db_entry_list): # figure out which file_db_entry this corresponds to From ad1c9413b22a248f41cbc5d34e43d107dbf04ecb Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Mon, 9 Dec 2024 20:18:35 -0700 Subject: [PATCH 04/10] make sure l1 scheduler doesn't fail on no model --- punchpipe/flows/level1.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index b4d9e44..945e5b2 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -16,9 +16,13 @@ @task def level1_query_ready_files(session, pipeline_config: dict, reference_time=None): - return [[f.file_id] for f in session.query(File).filter(File.file_type.in_(SCIENCE_LEVEL0_TYPE_CODES)) - .where(and_(File.state == "created", File.level == "0")).all()] - + ready = [f for f in session.query(File).filter(File.file_type.in_(SCIENCE_LEVEL0_TYPE_CODES)) + .filter(File.state == "created").filter(File.level == "0").all()] + actually_ready = [] + for f in ready: + if get_psf_model_path(f, pipeline_config) is not None and get_psf_model_path(f, pipeline_config) is not None: + actually_ready.append([f]) + return actually_ready @task def get_psf_model_path(level0_file, pipeline_config: dict, session=None, reference_time=None): @@ -32,7 +36,7 @@ def get_psf_model_path(level0_file, pipeline_config: dict, session=None, referen .filter(File.observatory == level0_file.observatory) .where(File.date_obs <= level0_file.date_obs) .order_by(File.date_obs.desc()).first()) - return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename()) + return best_model @task def get_quartic_model_path(level0_file, pipeline_config: dict, session=None, reference_time=None): @@ -41,7 +45,7 @@ def get_quartic_model_path(level0_file, pipeline_config: dict, session=None, ref .filter(File.observatory == level0_file.observatory) .where(File.date_obs <= level0_file.date_obs) .order_by(File.date_obs.desc()).first()) - return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename()) + return best_model @task def level1_construct_flow_info(level0_files: list[File], level1_files: File, @@ -50,14 +54,20 @@ def level1_construct_flow_info(level0_files: list[File], level1_files: File, state = "planned" creation_time = datetime.now() priority = pipeline_config["levels"][flow_type]["priority"]["initial"] + + best_psf_model = get_psf_model_path(level0_files[0], pipeline_config, session=session) + best_quartic_model = get_quartic_model_path(level0_files[0], pipeline_config, session=session) + call_data = json.dumps( { "input_data": [ os.path.join(level0_file.directory(pipeline_config["root"]), level0_file.filename()) for level0_file in level0_files ], - "psf_model_path": get_psf_model_path(level0_files[0], pipeline_config, session=session), - "quartic_coefficient_path": get_quartic_model_path(level0_files[0], pipeline_config, session=session), + "psf_model_path": os.path.join(best_psf_model.directory(pipeline_config['root']), + best_psf_model.filename()), + "quartic_coefficient_path": os.path.join(best_quartic_model.directory(pipeline_config['root']), + best_quartic_model.filename()), } ) return Flow( From 47d9ff46026b6318a87c3fb684a4519393b7135b Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Mon, 9 Dec 2024 20:25:43 -0700 Subject: [PATCH 05/10] provide a session to l1 --- punchpipe/flows/level1.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index 945e5b2..732734e 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -20,7 +20,8 @@ def level1_query_ready_files(session, pipeline_config: dict, reference_time=None .filter(File.state == "created").filter(File.level == "0").all()] actually_ready = [] for f in ready: - if get_psf_model_path(f, pipeline_config) is not None and get_psf_model_path(f, pipeline_config) is not None: + if (get_psf_model_path(f, pipeline_config, session=session) is not None + and get_psf_model_path(f, pipeline_config, session=session) is not None): actually_ready.append([f]) return actually_ready From 548fcf9f67aef77f4e3eb258c178fc5141d0fe1b Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Mon, 9 Dec 2024 20:29:26 -0700 Subject: [PATCH 06/10] return the file_id --- punchpipe/flows/level1.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index 732734e..4acca1a 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -22,7 +22,7 @@ def level1_query_ready_files(session, pipeline_config: dict, reference_time=None for f in ready: if (get_psf_model_path(f, pipeline_config, session=session) is not None and get_psf_model_path(f, pipeline_config, session=session) is not None): - actually_ready.append([f]) + actually_ready.append([f.file_id]) return actually_ready @task From 48f6b31feeac624da73dc432221d03793a073ace Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Tue, 10 Dec 2024 16:02:47 -0700 Subject: [PATCH 07/10] add l0 schedules --- punchpipe/cli.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/punchpipe/cli.py b/punchpipe/cli.py index 84546e1..a8f9357 100644 --- a/punchpipe/cli.py +++ b/punchpipe/cli.py @@ -54,14 +54,14 @@ def serve_flows(configuration_path): level0_ingest_raw_packets_deployment = level0_ingest_raw_packets.to_deployment(name="level0_ingest_raw_packets", description="Ingest raw packets.", - #cron="* * * * *", + cron="*/5 * * * *", parameters={ "pipeline_config_path": configuration_path}, tags=['L0']) level0_form_images_deployment = level0_form_images.to_deployment(name="level0_form_images", description="Form images from packets.", - #cron="* * * * *", + cron="*/5 * * * *", tags=['L0'], parameters={"pipeline_config_path": configuration_path}) @@ -140,8 +140,8 @@ def serve_flows(configuration_path): cron=config['levels']['construct_starfield_background_process_flow'].get("schedule", "* * * * *"), tags=["L3", "scheduler"], parameters={"pipeline_config_path": configuration_path} - ) + starfield_process_dep = starfield_process_flow.to_deployment(name="construct_starfield_background_process_flow", description="Create starfield background.", tags=["L3", "process"], @@ -196,7 +196,7 @@ def run(configuration_path): try: prefect_process = subprocess.Popen(["prefect", "server", "start"], stdout=f, stderr=subprocess.STDOUT) - time.sleep(10) + time.sleep(5) monitor_process = subprocess.Popen(["gunicorn", "-b", "0.0.0.0:8050", "--chdir", THIS_DIR, @@ -213,14 +213,14 @@ def run(configuration_path): except KeyboardInterrupt: print("Shutting down.") prefect_process.terminate() - time.sleep(10) + time.sleep(5) monitor_process.terminate() print() print("punchpipe safely shut down.") except Exception as e: print(f"Received error: {e}") prefect_process.terminate() - time.sleep(10) + time.sleep(5) monitor_process.terminate() print() print("punchpipe abruptly shut down.") From 58a6351437d3994a9a6bf797dc80e8e774e36e23 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Tue, 10 Dec 2024 16:02:58 -0700 Subject: [PATCH 08/10] dynamically update cards --- punchpipe/monitor/app.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/punchpipe/monitor/app.py b/punchpipe/monitor/app.py index d32058e..7c079f2 100644 --- a/punchpipe/monitor/app.py +++ b/punchpipe/monitor/app.py @@ -126,10 +126,9 @@ def update_flows(n, page_current, page_size, sort_by, filter): def create_card_content(level: int, status: str): return [ - # dbc.CardHeader(f"Level {level} Flow Pressure"), dbc.CardBody( [ - html.H5(f"Level {level} Flow Pressure", className="card-title"), + html.H5(f"Level {level} Status", className="card-title"), html.P( status, className="card-text", @@ -143,14 +142,30 @@ def create_card_content(level: int, status: str): Input('interval-component', 'n_intervals'), ) def update_cards(n): + now = datetime.now() + with get_database_session() as session: + reference_time = now - timedelta(hours=24) + query = (f"SELECT SUM(num_images_succeeded), SUM(num_images_failed) " + f"FROM packet_history WHERE datetime > '{reference_time}';") + df = pd.read_sql_query(query, session.connection()) + num_l0_success = df['SUM(num_images_succeeded)'].sum() + num_l0_fails = df['SUM(num_images_failed)'].sum() + l0_fraction = num_l0_success / (1 + num_l0_success + num_l0_fails) # add one to avoid div by 0 errors + if l0_fraction > 0.95: + l0_status = f"Good ({num_l0_success} : {num_l0_fails})" + l0_color = "success" + else: + l0_status = f"Bad ({num_l0_success} : {num_l0_fails})" + l0_color = "danger" + cards = html.Div( [ dbc.Row( [ - dbc.Col(dbc.Card(create_card_content(0, "Good"), color="success", inverse=True)), + dbc.Col(dbc.Card(create_card_content(0, l0_status), color=l0_color, inverse=True)), dbc.Col(dbc.Card(create_card_content(1, "Good"), color="success", inverse=True)), - dbc.Col(dbc.Card(create_card_content(2, "Warning"), color="warning", inverse=True)), - dbc.Col(dbc.Card(create_card_content(3, "Bad"), color="danger", inverse=True)), + dbc.Col(dbc.Card(create_card_content(2, "Good"), color="success", inverse=True)), + dbc.Col(dbc.Card(create_card_content(3, "Good"), color="success", inverse=True)), ], className="mb-4", ), From d015f1f13cb4f054a2f4fc6f31f33eaca91a6c33 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Tue, 10 Dec 2024 16:10:06 -0700 Subject: [PATCH 09/10] limit the number of files launched at one time --- punchpipe/flows/level1.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index 4acca1a..e223306 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -16,8 +16,9 @@ @task def level1_query_ready_files(session, pipeline_config: dict, reference_time=None): + max_start = pipeline_config['scheduler']['max_start'] ready = [f for f in session.query(File).filter(File.file_type.in_(SCIENCE_LEVEL0_TYPE_CODES)) - .filter(File.state == "created").filter(File.level == "0").all()] + .filter(File.state == "created").filter(File.level == "0").all()][:max_start*3] actually_ready = [] for f in ready: if (get_psf_model_path(f, pipeline_config, session=session) is not None From e483397090c104e2e9887857b3604a618b6de5b2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 11 Dec 2024 09:18:59 +0000 Subject: [PATCH 10/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- punchpipe/flows/level0.py | 2 +- punchpipe/flows/level1.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/punchpipe/flows/level0.py b/punchpipe/flows/level0.py index 4ddd848..526f5ab 100644 --- a/punchpipe/flows/level0.py +++ b/punchpipe/flows/level0.py @@ -13,7 +13,7 @@ from sqlalchemy import and_ from punchpipe import __version__ as software_version -from punchpipe.control.db import File, SciPacket, TLMFiles, PacketHistory +from punchpipe.control.db import File, PacketHistory, SciPacket, TLMFiles from punchpipe.control.util import get_database_session, load_pipeline_configuration from punchpipe.level0.ccsds import unpack_compression_settings from punchpipe.level0.core import ( diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index e223306..4399c08 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -5,7 +5,6 @@ from prefect import flow, task from punchbowl.level1.flow import level1_core_flow -from sqlalchemy import and_ from punchpipe import __version__ from punchpipe.control.db import File, Flow