Fix reference times for f corona models #83

Merged · 22 commits · Nov 27, 2024
Changes from all commits
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -2,7 +2,7 @@ exclude: "\\.asdf$"
repos:
# This should be before any formatting hooks like isort
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: "v0.7.4"
rev: "v0.8.0"
hooks:
- id: ruff
args: ["--fix"]
26 changes: 25 additions & 1 deletion config.yaml
@@ -54,7 +54,31 @@ levels:
schedule: "* * * * *"
options:

level3_PTM_process_flow:
L3_PTM:
priority:
initial: 10000
seconds: [1]
escalation: [10000]
schedule: "* * * * *"
options:

L3_PIM:
priority:
initial: 10000
seconds: [1]
escalation: [10000]
schedule: "* * * * *"
options:

f_corona:
priority:
initial: 10000
seconds: [1]
escalation: [10000]
schedule: "* * * * *"
options:

starfield:
priority:
initial: 10000
seconds: [1]
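
The new L3_PTM, L3_PIM, and f_corona entries sit under the existing levels: section and reuse the same priority/schedule shape as the other flows. As a rough illustration only (assuming the file is loaded with PyYAML and that the access path matches the hunk context above), the new f_corona scheduler settings could be read like this:

```python
# Illustrative sketch, not code from this PR: load config.yaml with PyYAML
# and pull out the new f_corona scheduler settings shown in the diff above.
import yaml

with open("config.yaml") as fh:              # path assumed; adjust to your checkout
    config = yaml.safe_load(fh)

f_corona_cfg = config["levels"]["f_corona"]
print(f_corona_cfg["priority"]["initial"])   # 10000
print(f_corona_cfg["schedule"])              # "* * * * *" (cron: every minute)
```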
110 changes: 70 additions & 40 deletions punchpipe/cli.py

Large diffs are not rendered by default.

14 changes: 8 additions & 6 deletions punchpipe/control/scheduler.py
@@ -5,8 +5,8 @@

def generic_scheduler_flow_logic(
query_ready_files_func, construct_child_file_info, construct_child_flow_info, pipeline_config_path,
new_file_state="progressed",
session=None
update_input_file_state=True, new_input_file_state="progressed",
session=None, reference_time=None,
):
pipeline_config = load_pipeline_configuration(pipeline_config_path)

@@ -17,21 +17,23 @@ def generic_scheduler_flow_logic(
session = get_database_session()

# find all files that are ready to run
ready_file_ids = query_ready_files_func(session, pipeline_config)[:max_start]
ready_file_ids = query_ready_files_func(session, pipeline_config, reference_time=reference_time)[:max_start]
if ready_file_ids:
for group in ready_file_ids:
parent_files = []
for file_id in group:
# mark the file as progressed so that there aren't duplicate processing flows
update_file_state(session, file_id, new_file_state)
if update_input_file_state:
update_file_state(session, file_id, new_input_file_state)

# get the prior level file's information
parent_files += session.query(File).where(File.file_id == file_id).all()

# prepare the new level flow and file
children_files = construct_child_file_info(parent_files, pipeline_config)
children_files = construct_child_file_info(parent_files, pipeline_config, reference_time=reference_time)
database_flow_info = construct_child_flow_info(parent_files, children_files,
pipeline_config, session=session)
pipeline_config, session=session,
reference_time=reference_time)
for child_file in children_files:
session.add(child_file)
session.add(database_flow_info)
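
With this change, generic_scheduler_flow_logic passes reference_time through to all three callables and only advances the state of the input files when update_input_file_state is left at True. A minimal sketch (hypothetical function names; bodies elided) of callables matching the new keyword contract:

```python
# Hypothetical callables showing the keyword arguments the updated
# generic_scheduler_flow_logic now supplies; names and bodies are illustrative.
from datetime import datetime


def example_query_ready_files(session, pipeline_config, reference_time: datetime = None):
    # Return groups of input file_ids that are ready near reference_time.
    return []


def example_construct_child_file_info(parent_files, pipeline_config, reference_time: datetime = None):
    # Return the File records the planned flow will produce.
    return []


def example_construct_child_flow_info(parent_files, children_files, pipeline_config,
                                      session=None, reference_time: datetime = None):
    # Return the Flow record describing the planned processing run.
    return None
```

Schedulers that should not mark their inputs as progressed, such as the f-corona scheduler below, can pass update_input_file_state=False.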
53 changes: 28 additions & 25 deletions punchpipe/flows/fcorona.py
@@ -1,40 +1,47 @@
import os
import json
import random
import typing as t
import os
from datetime import datetime, timedelta
import random

from dateutil.parser import parse as parse_datetime_str
from prefect import flow, get_run_logger, task
from punchbowl.level3.f_corona_model import construct_full_f_corona_model
from sqlalchemy import and_
from punchbowl.level3.f_corona_model import construct_polarized_f_corona_model

from punchpipe import __version__
from punchpipe.control.db import File, Flow
from punchpipe.control.processor import generic_process_flow_logic
from punchpipe.control.scheduler import generic_scheduler_flow_logic
from punchpipe.control.util import get_database_session


@task
def f_corona_background_query_ready_files(session, pipeline_config: dict, use_n: int = 250):
def f_corona_background_query_ready_files(session, pipeline_config: dict, use_n: int = 50,
reference_time: datetime =None):
reference_time = reference_time or datetime.now()
before = reference_time - timedelta(weeks=2)
after = reference_time + timedelta(weeks=2)

logger = get_run_logger()
all_ready_files = (session.query(File)
.filter(File.state.in_(["created", "progressed"]))
.filter(File.date_obs >= before)
.filter(File.date_obs <= after)
.filter(File.level == "2")
.filter(File.file_type == "PT")
.filter(File.observatory == "M").all())
logger.info(f"{len(all_ready_files)} Level 2 PTM files will be used for F corona background modeling.")
if len(all_ready_files) > 30: # need at least 30 images
random.shuffle(all_ready_files)
return [[f.file_id for f in all_ready_files[:250]]]
return [[f.file_id for f in all_ready_files[:use_n]]]
else:
return []

@task
def construct_f_corona_background_flow_info(level3_files: list[File],
level3_f_model_file: File,
pipeline_config: dict,
session=None):
session=None,
reference_time: datetime = None):
flow_type = "construct_f_corona_background_process_flow"
state = "planned"
creation_time = datetime.now()
@@ -45,6 +52,7 @@ def construct_f_corona_background_flow_info(level3_files: list[File],
os.path.join(level3_file.directory(pipeline_config["root"]), level3_file.filename())
for level3_file in level3_files
],
"reference_time": str(reference_time)
}
)
return Flow(
@@ -58,38 +66,33 @@


@task
def construct_f_corona_background_file_info(level2_files: t.List[File], pipeline_config: dict) -> t.List[File]:
def construct_f_corona_background_file_info(level2_files: t.List[File], pipeline_config: dict,
reference_time:datetime = None) -> t.List[File]:
return [File(
level="3",
file_type="PF",
observatory="M",
file_version=pipeline_config["file_version"],
software_version=__version__,
date_obs= datetime(2024, 8, 1, 12, 0, 0), #datetime.now()-timedelta(days=60),
date_obs= reference_time,
state="planned",
),
File(
level="3",
file_type="PF",
observatory="M",
file_version=pipeline_config["file_version"],
software_version=__version__,
date_obs=datetime(2024, 12, 1, 12, 0, 0), # datetime.now()+timedelta(days=60),
state="planned",
)
]
),]

@flow
def construct_f_corona_background_scheduler_flow(pipeline_config_path=None, session=None):
def f_corona_scheduler(pipeline_config_path=None, session=None, reference_time: datetime = None):
if not isinstance(reference_time, datetime):
reference_time = parse_datetime_str(reference_time)

generic_scheduler_flow_logic(
f_corona_background_query_ready_files,
construct_f_corona_background_file_info,
construct_f_corona_background_flow_info,
pipeline_config_path,
new_file_state="created",
update_input_file_state=False,
reference_time=reference_time,
session=session,
)

@flow
def construct_f_corona_background_process_flow(flow_id: int, pipeline_config_path=None, session=None):
generic_process_flow_logic(flow_id, construct_full_f_corona_model, pipeline_config_path, session=session)
def f_corona_process(flow_id: int, pipeline_config_path=None, session=None):
generic_process_flow_logic(flow_id, construct_polarized_f_corona_model, pipeline_config_path, session=session)
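
f_corona_scheduler parses a string reference_time with dateutil, and the query task keeps level 2 PTM files observed within two weeks on either side of that time, requiring more than 30 candidates before a model flow is planned. A small sketch of that window (the reference time and the commented call are placeholders, not from the PR):

```python
# Sketch of the +/- 2 week selection window used by
# f_corona_background_query_ready_files; the reference time is a made-up example.
from datetime import datetime, timedelta

reference_time = datetime(2024, 11, 1, 12, 0, 0)
before = reference_time - timedelta(weeks=2)   # 2024-10-18 12:00
after = reference_time + timedelta(weeks=2)    # 2024-11-15 12:00

# Level 2 files with file_type "PT", observatory "M", and
# before <= date_obs <= after are candidates; up to use_n (default 50)
# of them are sampled at random for the background model.

# f_corona_scheduler(pipeline_config_path="config.yaml",     # path is a placeholder
#                    reference_time="2024-11-01 12:00:00")   # strings are parsed with dateutil
```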
151 changes: 134 additions & 17 deletions punchpipe/flows/level0.py
@@ -1,29 +1,146 @@
import os
from glob import glob
from datetime import datetime

import numpy as np
import pandas as pd
from ndcube import NDCube
from prefect import flow
from prefect.blocks.system import Secret
from punchbowl.data import get_base_file_name
from punchbowl.data.io import write_ndcube_to_fits
from punchbowl.data.meta import NormalizedMetadata
from sqlalchemy import and_

from punchpipe.control.scheduler import generic_scheduler_flow_logic
from punchpipe import __version__ as software_version
from punchpipe.control.db import File, SciPacket, TLMFiles
from punchpipe.control.util import get_database_session, load_pipeline_configuration
from punchpipe.level0.ccsds import unpack_compression_settings
from punchpipe.level0.core import (
detect_new_tlm_files,
form_from_jpeg_compressed,
form_preliminary_wcs,
get_fits_metadata,
image_is_okay,
parse_new_tlm_files,
process_telemetry_file,
update_tlm_database,
)
from punchpipe.level0.meta import POSITIONS_TO_CODES, convert_pfw_position_to_polarizer


def level0_query_ready_files(session, pipeline_config: dict):
dropzone = os.path.join(pipeline_config["root"], pipeline_config["input_drop"])
return glob(os.path.join(dropzone, "*.tlm"))
@flow
def level0_ingest_raw_packets(session=None):
if session is None:
session = get_database_session()

paths = detect_new_tlm_files(session=session)
for path in paths:
packets = parse_new_tlm_files(path)
update_tlm_database(packets, path)

# update the database with this tlm file
new_tlm_file = TLMFiles(path=path, is_processed=True)
session.add(new_tlm_file)
session.commit()

def level0_construct_file_info():
pass
@flow
def level0_form_images(session=None, pipeline_config_path="config.yaml"):
if session is None:
session = get_database_session()

config = load_pipeline_configuration(pipeline_config_path)

def level0_construct_flow_info():
pass
distinct_times = session.query(SciPacket.timestamp).filter(~SciPacket.is_used).distinct().all()
distinct_spacecraft = session.query(SciPacket.spacecraft_id).filter(~SciPacket.is_used).distinct().all()


@flow
def level0_scheduler_flow(session=None):
generic_scheduler_flow_logic(
level0_query_ready_files,
level0_construct_file_info,
level0_construct_flow_info,
session=session,
)
for spacecraft in distinct_spacecraft:
errors = []

for t in distinct_times:
image_packets_entries = session.query(SciPacket).where(and_(SciPacket.timestamp == t[0],
SciPacket.spacecraft_id == spacecraft[0])).all()
image_compression = [unpack_compression_settings(packet.compression_settings)
for packet in image_packets_entries]

# Read all the relevant TLM files
needed_tlm_ids = set([image_packet.source_tlm_file for image_packet in image_packets_entries])
tlm_id_to_tlm_path = {tlm_id: session.query(TLMFiles.path).where(TLMFiles.tlm_id == tlm_id)
for tlm_id in needed_tlm_ids}
needed_tlm_paths = list(session.query(TLMFiles.path).where(TLMFiles.tlm_id.in_(needed_tlm_ids)).all())
tlm_contents = [process_telemetry_file(tlm_path) for tlm_path in needed_tlm_paths]

# Form the image packet stream for decompression
ordered_image_content = []
for packet_entry in image_packets_entries:
tlm_content_index = needed_tlm_paths.index(tlm_id_to_tlm_path[packet_entry.source_tlm_file])
selected_tlm_contents = tlm_contents[tlm_content_index]
ordered_image_content.append(selected_tlm_contents[0x20]['SCI_XFI_IMG_DATA'][packet_entry.packet_num])
ordered_image_content = np.concatenate(ordered_image_content)

# Get the proper image
skip_image = False
if image_compression[0]['JPEG'] == 1: # this assumes the image compression is static for an image
try:
image = form_from_jpeg_compressed(ordered_image_content)
except ValueError:
error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"),
'start_block': image_packets_entries[0].flash_block,
'replay_length': image_packets_entries[-1].flash_block
- image_packets_entries[0].flash_block}
errors.append(error)
else:
skip_image = True
else:
skip_image = True
error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"),
'start_block': image_packets_entries[0].flash_block,
'replay_length': image_packets_entries[-1].flash_block
- image_packets_entries[0].flash_block}
errors.append(error)

# check the quality of the image
if not skip_image and not image_is_okay(image, config):
skip_image = True
error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"),
'start_block': image_packets_entries[0].flash_block,
'replay_length': image_packets_entries[-1].flash_block
- image_packets_entries[0].flash_block}
errors.append(error)

if not skip_image:
spacecraft_secrets = Secret.load("spacecraft-ids")
spacecraft_id_mapper = spacecraft_secrets.get()
spacecraft_id = spacecraft_id_mapper[image_packets_entries[0].spacecraft_id]

metadata_contents = get_fits_metadata(image_packets_entries[0].timestamp, spacecraft_id)
file_type = POSITIONS_TO_CODES[convert_pfw_position_to_polarizer(metadata_contents['POSITION_CURR'])]
preliminary_wcs = form_preliminary_wcs(metadata_contents, config['plate_scale'][spacecraft_id])
meta = NormalizedMetadata.load_template(file_type + spacecraft_id, "0")
for meta_key, meta_value in metadata_contents.items():
meta[meta_key] = meta_value
cube = NDCube(data=image, metadata=meta, wcs=preliminary_wcs)

l0_db_entry = File(level="0",
file_type=file_type,
observatory=str(spacecraft_id),
file_version="1", # TODO: increment the file version
software_version=software_version,
date_created=datetime.now(),
date_obs=t,
date_beg=t,
date_end=t,
state="created")

write_ndcube_to_fits(cube, os.path.join(l0_db_entry.directory(config['data_path']),
get_base_file_name(cube)))
# TODO: write a jp2
for image_packets_entries in image_packets_entries:
image_packets_entries.is_used = True
session.add(l0_db_entry)
session.commit()
df_errors = pd.DataFrame(errors)
date_str = datetime.now().strftime("%Y_%j")
df_path = os.path.join(config['root'], 'REPLAY', f'PUNCH_{spacecraft}_REPLAY_{date_str}.csv')
os.makedirs(df_path, exist_ok=True)
df_errors.to_csv(df_path, index=False)
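
When an image cannot be formed (decompression failure, unsupported compression, or a failed quality check), the flow records one replay-request row per skipped image and writes the accumulated rows to a per-spacecraft REPLAY CSV. An illustration of the record shape only (field names follow the diff; the values and output filename are invented for the example):

```python
# Illustrative only: the shape of a replay-request record and how a batch of
# them becomes a CSV with pandas. Values and the output name are placeholders.
import pandas as pd

errors = [
    {"start_time": "2024-11-27 12:00:00",   # timestamp of the image's first packet
     "start_block": 1024,                   # flash block where the image starts
     "replay_length": 16},                  # last flash block minus the first
]

pd.DataFrame(errors).to_csv("PUNCH_X_REPLAY_2024_332.csv", index=False)
```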