
Improve l0 #88

Merged
merged 10 commits on Dec 11, 2024
12 changes: 6 additions & 6 deletions punchpipe/cli.py
@@ -54,14 +54,14 @@ def serve_flows(configuration_path):

level0_ingest_raw_packets_deployment = level0_ingest_raw_packets.to_deployment(name="level0_ingest_raw_packets",
description="Ingest raw packets.",
- #cron="* * * * *",
+ cron="*/5 * * * *",
parameters={
"pipeline_config_path": configuration_path},
tags=['L0'])

level0_form_images_deployment = level0_form_images.to_deployment(name="level0_form_images",
description="Form images from packets.",
- #cron="* * * * *",
+ cron="*/5 * * * *",
tags=['L0'],
parameters={"pipeline_config_path": configuration_path})

@@ -140,8 +140,8 @@ def serve_flows(configuration_path):
cron=config['levels']['construct_starfield_background_process_flow'].get("schedule", "* * * * *"),
tags=["L3", "scheduler"],
parameters={"pipeline_config_path": configuration_path}

)

starfield_process_dep = starfield_process_flow.to_deployment(name="construct_starfield_background_process_flow",
description="Create starfield background.",
tags=["L3", "process"],
@@ -196,7 +196,7 @@ def run(configuration_path):
try:
prefect_process = subprocess.Popen(["prefect", "server", "start"],
stdout=f, stderr=subprocess.STDOUT)
- time.sleep(10)
+ time.sleep(5)
monitor_process = subprocess.Popen(["gunicorn",
"-b", "0.0.0.0:8050",
"--chdir", THIS_DIR,
@@ -213,14 +213,14 @@
except KeyboardInterrupt:
print("Shutting down.")
prefect_process.terminate()
- time.sleep(10)
+ time.sleep(5)
monitor_process.terminate()
print()
print("punchpipe safely shut down.")
except Exception as e:
print(f"Received error: {e}")
prefect_process.terminate()
- time.sleep(10)
+ time.sleep(5)
monitor_process.terminate()
print()
print("punchpipe abruptly shut down.")
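The changes above swap a commented-out every-minute cron for an active five-minute schedule and shorten the shutdown waits from ten seconds to five. As a rough sketch of how such a deployment is wired up (assuming the Prefect 2-style to_deployment/serve API used in this file; demo_flow is a placeholder, not a punchpipe flow):

    from prefect import flow, serve

    @flow
    def demo_flow(pipeline_config_path: str = "config.yaml"):
        print(f"running with {pipeline_config_path}")

    if __name__ == "__main__":
        # "*/5 * * * *" fires at minutes 0, 5, 10, ... of each hour,
        # versus "* * * * *", which fires every minute.
        serve(demo_flow.to_deployment(name="demo",
                                      cron="*/5 * * * *",
                                      parameters={"pipeline_config_path": "config.yaml"},
                                      tags=["L0"]))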
8 changes: 8 additions & 0 deletions punchpipe/control/db.py
@@ -204,6 +204,14 @@ class Health(Base):
num_pids = Column(Integer, nullable=False)


+ class PacketHistory(Base):
+ __tablename__ = "packet_history"
+ id = Column(Integer, primary_key=True)
+ datetime = Column(DATETIME(fsp=6), nullable=False)
+ num_images_succeeded = Column(Integer, nullable=False)
+ num_images_failed = Column(Integer, nullable=False)


def get_closest_eng_packets(table, timestamp, spacecraft_id, session):
# find the closest events which are greater/less than the timestamp
gt_event = session.query(table).filter(table.spacecraft_id == spacecraft_id).filter(table.timestamp > timestamp).order_by(table.timestamp.asc()).first()
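PacketHistory gives Level 0 a place to record per-pass image outcomes. A minimal sketch of writing and reading rows, assuming a SQLAlchemy session bound to the punchpipe database (record_packet_history and recent_history are illustrative helpers, not part of this PR):

    from datetime import datetime, timedelta

    from punchpipe.control.db import PacketHistory

    def record_packet_history(session, succeeded: int, failed: int) -> None:
        # One row per Level 0 pass: a timestamp plus success/failure tallies.
        session.add(PacketHistory(datetime=datetime.now(),
                                  num_images_succeeded=succeeded,
                                  num_images_failed=failed))
        session.commit()

    def recent_history(session, hours: int = 24):
        # Rows from the trailing window, e.g. for the monitor dashboard.
        cutoff = datetime.now() - timedelta(hours=hours)
        return (session.query(PacketHistory)
                .filter(PacketHistory.datetime > cutoff)
                .all())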
5 changes: 4 additions & 1 deletion punchpipe/control/util.py
@@ -6,7 +6,7 @@
from prefect import task
from prefect.variables import Variable
from prefect_sqlalchemy import SqlAlchemyConnector
- from punchbowl.data import get_base_file_name, write_ndcube_to_fits
+ from punchbowl.data import get_base_file_name, write_ndcube_to_fits, write_ndcube_to_jp2
from sqlalchemy import or_
from sqlalchemy.orm import Session
from yaml.loader import FullLoader
@@ -48,6 +48,9 @@ def write_file(data: NDCube, corresponding_file_db_entry, pipeline_config) -> No
write_ndcube_to_fits(data, output_filename)
corresponding_file_db_entry.state = "created"

+ layer = 0 if len(data.data.shape) > 2 else None
+ write_ndcube_to_jp2(data, output_filename.replace(".fits", ".jp2"), layer=layer)


def match_data_with_file_db_entry(data: NDCube, file_db_entry_list):
# figure out which file_db_entry this corresponds to
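write_file now emits a JP2 preview next to each FITS file, taking layer 0 when the cube is 3-D and the whole array otherwise. The layer rule in isolation (a hypothetical standalone helper mirroring the expression above):

    import numpy as np

    def jp2_layer_for(array: np.ndarray):
        # 3-D cubes (e.g. stacked polarization states) preview a single layer;
        # plain 2-D images are written whole (layer=None).
        return 0 if len(array.shape) > 2 else None

    assert jp2_layer_for(np.zeros((3, 128, 128))) == 0
    assert jp2_layer_for(np.zeros((128, 128))) is None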
24 changes: 13 additions & 11 deletions punchpipe/flows/level0.py
@@ -13,7 +13,7 @@
from sqlalchemy import and_

from punchpipe import __version__ as software_version
- from punchpipe.control.db import File, SciPacket, TLMFiles
+ from punchpipe.control.db import File, PacketHistory, SciPacket, TLMFiles
from punchpipe.control.util import get_database_session, load_pipeline_configuration
from punchpipe.level0.ccsds import unpack_compression_settings
from punchpipe.level0.core import (
@@ -76,6 +76,7 @@ def level0_form_images(session=None, pipeline_config_path=None):

already_parsed_tlms = {} # tlm_path maps to the parsed contents

+ skip_count, success_count = 0, 0
for spacecraft in distinct_spacecraft:
errors = []

@@ -107,12 +108,12 @@
tlm_content_index = needed_tlm_paths.index(tlm_id_to_tlm_path[packet_entry.source_tlm_file])
selected_tlm_contents = tlm_contents[tlm_content_index]
ordered_image_content.append(selected_tlm_contents[0x20]['SCI_XFI_IMG_DATA'][packet_entry.packet_num])
- ordered_image_content = np.concatenate(ordered_image_content)

# Get the proper image
skip_image = False
if image_compression[0]['CMP_BYP'] == 0 and image_compression[0]['JPEG'] == 1: # this assumes the image compression is static for an image
try:
+ ordered_image_content = np.concatenate(ordered_image_content)
image = form_from_jpeg_compressed(ordered_image_content)
except (RuntimeError, ValueError):
skip_image = True
@@ -123,6 +124,7 @@
errors.append(error)
elif image_compression[0]['CMP_BYP'] == 1:
try:
+ ordered_image_content = np.concatenate(ordered_image_content)
logger.info(f"Packet shape {ordered_image_content.shape[0]}", )
image = form_from_raw(ordered_image_content)
except (RuntimeError, ValueError):
@@ -136,15 +138,6 @@
skip_image = True
print("Not implemented")

- # check the quality of the image
- # if not skip_image and not image_is_okay(image, config):
- # skip_image = True
- # error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"),
- # 'start_block': image_packets_entries[0].flash_block,
- # 'replay_length': image_packets_entries[-1].flash_block
- # - image_packets_entries[0].flash_block}
- # errors.append(error)

if not skip_image:
spacecraft_secrets = SpacecraftMapping.load("spacecraft-ids").mapping.get_secret_value()
moc_index = spacecraft_secrets["moc"].index(image_packets_entries[0].spacecraft_id)
@@ -181,6 +174,15 @@
image_packets_entries.is_used = True
session.add(l0_db_entry)
session.commit()
+ success_count += 1
+ else:
+ skip_count += 1
+ history = PacketHistory(datetime=datetime.now(),
+ num_images_succeeded=success_count,
+ num_images_failed=skip_count)
+ session.add(history)
+ session.commit()

df_errors = pd.DataFrame(errors)
date_str = datetime.now().strftime("%Y_%j")
df_path = os.path.join(config['root'], 'REPLAY', f'PUNCH_{str(spacecraft[0])}_REPLAY_{date_str}.csv')
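Two things change in the image-forming loop: np.concatenate moves inside each decompression branch, so a malformed packet sequence is caught by the same RuntimeError/ValueError handling that guards decoding, and the new counters are committed as a PacketHistory row. A condensed sketch of that control flow (images_to_form and form_image are stand-ins for the packet grouping and the JPEG/raw paths, not the real helpers):

    from datetime import datetime

    import numpy as np

    from punchpipe.control.db import PacketHistory

    def form_images_with_history(images_to_form, form_image, session):
        skip_count, success_count = 0, 0
        for packet_chunks in images_to_form:
            try:
                content = np.concatenate(packet_chunks)  # now inside the guarded block
                form_image(content)
                success_count += 1
            except (RuntimeError, ValueError):
                skip_count += 1  # tallied and logged instead of crashing the flow
        session.add(PacketHistory(datetime=datetime.now(),
                                  num_images_succeeded=success_count,
                                  num_images_failed=skip_count))
        session.commit()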
27 changes: 19 additions & 8 deletions punchpipe/flows/level1.py
@@ -5,7 +5,6 @@

from prefect import flow, task
from punchbowl.level1.flow import level1_core_flow
- from sqlalchemy import and_

from punchpipe import __version__
from punchpipe.control.db import File, Flow
@@ -16,9 +15,15 @@

@task
def level1_query_ready_files(session, pipeline_config: dict, reference_time=None):
- return [[f.file_id] for f in session.query(File).filter(File.file_type.in_(SCIENCE_LEVEL0_TYPE_CODES))
- .where(and_(File.state == "created", File.level == "0")).all()]
+ max_start = pipeline_config['scheduler']['max_start']
+ ready = [f for f in session.query(File).filter(File.file_type.in_(SCIENCE_LEVEL0_TYPE_CODES))
+ .filter(File.state == "created").filter(File.level == "0").all()][:max_start*3]
+ actually_ready = []
+ for f in ready:
+ if (get_psf_model_path(f, pipeline_config, session=session) is not None
+ and get_quartic_model_path(f, pipeline_config, session=session) is not None):
+ actually_ready.append([f.file_id])
+ return actually_ready

@task
def get_psf_model_path(level0_file, pipeline_config: dict, session=None, reference_time=None):
@@ -32,7 +37,7 @@ def get_psf_model_path(level0_file, pipeline_config: dict, session=None, referen
.filter(File.observatory == level0_file.observatory)
.where(File.date_obs <= level0_file.date_obs)
.order_by(File.date_obs.desc()).first())
- return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename())
+ return best_model

@task
def get_quartic_model_path(level0_file, pipeline_config: dict, session=None, reference_time=None):
@@ -41,7 +46,7 @@ def get_quartic_model_path(level0_file, pipeline_config: dict, session=None, ref
.filter(File.observatory == level0_file.observatory)
.where(File.date_obs <= level0_file.date_obs)
.order_by(File.date_obs.desc()).first())
- return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename())
+ return best_model

@task
def level1_construct_flow_info(level0_files: list[File], level1_files: File,
@@ -50,14 +55,20 @@ def level1_construct_flow_info(level0_files: list[File], level1_files: File,
state = "planned"
creation_time = datetime.now()
priority = pipeline_config["levels"][flow_type]["priority"]["initial"]

+ best_psf_model = get_psf_model_path(level0_files[0], pipeline_config, session=session)
+ best_quartic_model = get_quartic_model_path(level0_files[0], pipeline_config, session=session)

call_data = json.dumps(
{
"input_data": [
os.path.join(level0_file.directory(pipeline_config["root"]), level0_file.filename())
for level0_file in level0_files
],
"psf_model_path": get_psf_model_path(level0_files[0], pipeline_config, session=session),
"quartic_coefficient_path": get_quartic_model_path(level0_files[0], pipeline_config, session=session),
"psf_model_path": os.path.join(best_psf_model.directory(pipeline_config['root']),
best_psf_model.filename()),
"quartic_coefficient_path": os.path.join(best_quartic_model.directory(pipeline_config['root']),
best_quartic_model.filename()),
}
)
return Flow(
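Scheduling now bounds its own work: each pass considers at most three times the configured max_start Level 0 files and only schedules those whose PSF and quartic calibration files already exist (the model-path helpers now return the File row, leaving path assembly to the caller). A sketch of that gating, assuming the second existence check is intended to be the quartic model:

    def query_ready_files_sketch(session, pipeline_config):
        max_start = pipeline_config['scheduler']['max_start']
        candidates = (session.query(File)
                      .filter(File.file_type.in_(SCIENCE_LEVEL0_TYPE_CODES))
                      .filter(File.state == "created")
                      .filter(File.level == "0")
                      .limit(max_start * 3)   # bound the work per scheduler pass
                      .all())
        ready = []
        for f in candidates:
            psf = get_psf_model_path(f, pipeline_config, session=session)
            quartic = get_quartic_model_path(f, pipeline_config, session=session)
            if psf is not None and quartic is not None:  # both calibrations exist
                ready.append([f.file_id])
        return ready

Using .limit() pushes the cap into SQL rather than fetching every row and slicing in Python, which may matter as the file table grows.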
25 changes: 20 additions & 5 deletions punchpipe/monitor/app.py
@@ -126,10 +126,9 @@ def update_flows(n, page_current, page_size, sort_by, filter):

def create_card_content(level: int, status: str):
return [
- # dbc.CardHeader(f"Level {level} Flow Pressure"),
dbc.CardBody(
[
- html.H5(f"Level {level} Flow Pressure", className="card-title"),
+ html.H5(f"Level {level} Status", className="card-title"),
html.P(
status,
className="card-text",
@@ -143,14 +142,30 @@ def create_card_content(level: int, status: str):
Input('interval-component', 'n_intervals'),
)
def update_cards(n):
+ now = datetime.now()
+ with get_database_session() as session:
+ reference_time = now - timedelta(hours=24)
+ query = (f"SELECT SUM(num_images_succeeded), SUM(num_images_failed) "
+ f"FROM packet_history WHERE datetime > '{reference_time}';")
+ df = pd.read_sql_query(query, session.connection())
+ num_l0_success = df['SUM(num_images_succeeded)'].sum()
+ num_l0_fails = df['SUM(num_images_failed)'].sum()
+ l0_fraction = num_l0_success / (1 + num_l0_success + num_l0_fails)  # add one to avoid div by 0 errors
+ if l0_fraction > 0.95:
+ l0_status = f"Good ({num_l0_success} : {num_l0_fails})"
+ l0_color = "success"
+ else:
+ l0_status = f"Bad ({num_l0_success} : {num_l0_fails})"
+ l0_color = "danger"

cards = html.Div(
[
dbc.Row(
[
- dbc.Col(dbc.Card(create_card_content(0, "Good"), color="success", inverse=True)),
+ dbc.Col(dbc.Card(create_card_content(0, l0_status), color=l0_color, inverse=True)),
dbc.Col(dbc.Card(create_card_content(1, "Good"), color="success", inverse=True)),
- dbc.Col(dbc.Card(create_card_content(2, "Warning"), color="warning", inverse=True)),
- dbc.Col(dbc.Card(create_card_content(3, "Bad"), color="danger", inverse=True)),
+ dbc.Col(dbc.Card(create_card_content(2, "Good"), color="success", inverse=True)),
+ dbc.Col(dbc.Card(create_card_content(3, "Good"), color="success", inverse=True)),
],
className="mb-4",
),
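The Level 0 card is now data-driven: it sums the trailing 24 hours of PacketHistory counts and colors the card by the success fraction, with a +1 in the denominator so an empty window cannot divide by zero. A condensed sketch of the calculation, shown with a parameterized query as a defensive alternative to interpolating the timestamp into the SQL string (the %(ref)s placeholder assumes a pyformat-style driver such as the MySQL one implied by db.py):

    from datetime import datetime, timedelta

    import pandas as pd

    def l0_card_status(session):
        reference_time = datetime.now() - timedelta(hours=24)
        df = pd.read_sql_query(
            "SELECT SUM(num_images_succeeded) AS ok, SUM(num_images_failed) AS bad "
            "FROM packet_history WHERE datetime > %(ref)s",
            session.connection(),
            params={"ref": reference_time},
        )
        ok = int(df["ok"].iloc[0] or 0)    # SUM over zero rows is NULL
        bad = int(df["bad"].iloc[0] or 0)
        fraction = ok / (1 + ok + bad)     # +1 avoids division by zero
        if fraction > 0.95:
            return f"Good ({ok} : {bad})", "success"
        return f"Bad ({ok} : {bad})", "danger"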