Skip to content

Commit

Permalink
fully tested level 0
Browse files Browse the repository at this point in the history
  • Loading branch information
jmbhughes committed Nov 27, 2024
1 parent 267f61c commit 30834e9
Show file tree
Hide file tree
Showing 27 changed files with 194 additions and 167 deletions.
96 changes: 48 additions & 48 deletions punchpipe/control/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from sqlalchemy import TEXT, Boolean, Column, DateTime, Float, Integer, String
from sqlalchemy.orm import declarative_base
from sqlalchemy.dialects.mysql import INTEGER

from punchpipe.error import MissingCCSDSDataError

Expand Down Expand Up @@ -134,54 +135,53 @@ class ENGPFWPacket(Base):
flash_block = Column(Integer, nullable=False)
timestamp = Column(DateTime, nullable=False, index=True)
packet_num = Column(Integer, nullable=False)
source_tlm_file = Column(Integer, nullable=False)

PFW_STATUS =Column(Integer, nullable=False) # Current PFW Status (0 - no error, else error)
STEP_CALC = Column(Integer, nullable=False) # Calculated step (0-1199)
LAST_CMD_N_STEPS = Column(Integer, nullable=False) # Commanded number of steps (1-1199)
HOME_POSITION_OVRD = Column(Integer, nullable=False) # HOME Position OVERRIDE
POSITION_CURR = Column(Integer, nullable=False) # Current position (1-5, 0 - manual stepping)
POSITION_CMD = Column(Integer, nullable=False) # Commanded position (1-5, 0 - manual stepping)
RESOLVER_POS_RAW = Column(Integer, nullable=False) # Resolver position - raw resolver counts (0-65000)
RESOLVER_POS_CORR = Column(Integer, nullable=False) # Resolver position - error correction applied (0-65000)
RESOLVER_READ_CNT = Column(Integer, nullable=False) # Accumulative # of resolver reads (resets on boot)
LAST_MOVE_N_STEPS = Column(Integer, nullable=False)# Number of steps on last move (1-1199)
source_tlm_file = Column(INTEGER(unsigned=True), nullable=False)

PFW_STATUS =Column(INTEGER(unsigned=True), nullable=False) # Current PFW Status (0 - no error, else error)
STEP_CALC = Column(INTEGER(unsigned=True), nullable=False) # Calculated step (0-1199)
LAST_CMD_N_STEPS = Column(INTEGER(unsigned=True), nullable=False) # Commanded number of steps (1-1199)
POSITION_CURR = Column(INTEGER(unsigned=True), nullable=False) # Current position (1-5, 0 - manual stepping)
POSITION_CMD = Column(INTEGER(unsigned=True), nullable=False) # Commanded position (1-5, 0 - manual stepping)
RESOLVER_POS_RAW = Column(INTEGER(unsigned=True), nullable=False) # Resolver position - raw resolver counts (0-65000)
RESOLVER_POS_CORR = Column(INTEGER(unsigned=True), nullable=False) # Resolver position - error correction applied (0-65000)
RESOLVER_READ_CNT = Column(INTEGER(unsigned=True), nullable=False) # Accumulative # of resolver reads (resets on boot)
LAST_MOVE_N_STEPS = Column(INTEGER(unsigned=True), nullable=False)# Number of steps on last move (1-1199)
LAST_MOVE_EXECUTION_TIME = Column(Float, nullable=False) # Current move execution time
LIFETIME_STEPS_TAKEN = Column(Integer, nullable=False) # Lifetime accumulative number of steps taken
LIFETIME_STEPS_TAKEN = Column(INTEGER(unsigned=True), nullable=False) # Lifetime accumulative number of steps taken
LIFETIME_EXECUTION_TIME = Column(Float, nullable=False) # Lifetime accumulative execution time
FSM_CTRL_STATE = Column(Integer, nullable=False) # Controller FSM State
READ_SUB_STATE = Column(Integer, nullable=False) # READ Sub-FSM State
MOVE_SUB_STATE = Column(Integer, nullable=False) # MOVE Sub-FSM State
HOME_SUB_STATE = Column(Integer, nullable=False) # HOME Sub-FSM State
HOME_POSITION = Column(Integer, nullable=False) # Home Position (1-5)
RESOLVER_SELECT = Column(Integer, nullable=False) # Resolver Select
RESOLVER_TOLERANCE_HOME = Column(Integer, nullable=False) # Resolver Tolerance
RESOLVER_TOLERANCE_CURR = Column(Integer, nullable=False) # Resolver Tolerance
STEPPER_SELECT= Column(Integer, nullable=False) # Stepper Motor Select
STEPPER_RATE_DELAY = Column(Integer, nullable=False) # Stepper Motor Rate Delay
FSM_CTRL_STATE = Column(INTEGER(unsigned=True), nullable=False) # Controller FSM State
READ_SUB_STATE = Column(INTEGER(unsigned=True), nullable=False) # READ Sub-FSM State
MOVE_SUB_STATE = Column(INTEGER(unsigned=True), nullable=False) # MOVE Sub-FSM State
HOME_SUB_STATE = Column(INTEGER(unsigned=True), nullable=False) # HOME Sub-FSM State
HOME_POSITION = Column(INTEGER(unsigned=True), nullable=False) # Home Position (1-5)
RESOLVER_SELECT = Column(INTEGER(unsigned=True), nullable=False) # Resolver Select
RESOLVER_TOLERANCE_HOME = Column(INTEGER(unsigned=True), nullable=False) # Resolver Tolerance
RESOLVER_TOLERANCE_CURR = Column(INTEGER(unsigned=True), nullable=False) # Resolver Tolerance
STEPPER_SELECT= Column(INTEGER(unsigned=True), nullable=False) # Stepper Motor Select
STEPPER_RATE_DELAY = Column(INTEGER(unsigned=True), nullable=False) # Stepper Motor Rate Delay
STEPPER_RATE = Column(Float, nullable=False) # Stepper Motor Rate
SHORT_MOVE_SETTLING_TIME_MS = Column(Integer, nullable=False) # Short Move(1-4 steps) Settling time before reading resolver
LONG_MOVE_SETTLING_TIME_MS = Column(Integer, nullable=False) # Long Move(5-1199 steps) Setting time before reading resolver
PRIMARY_STEP_OFFSET_1 = Column(Integer, nullable=False) # Primary Step Offset 1
PRIMARY_STEP_OFFSET_2 = Column(Integer, nullable=False) # Short Move(1-4 steps) Delay before reading resolver
PRIMARY_STEP_OFFSET_3 = Column(Integer, nullable=False) # Primary Step Offset 3
PRIMARY_STEP_OFFSET_4 = Column(Integer, nullable=False) # Primary Step Offset 4
PRIMARY_STEP_OFFSET_5 = Column(Integer, nullable=False) # Primary Step Offset 5
REDUNDANT_STEP_OFFSET_1 = Column(Integer, nullable=False) # Redundant Step Offset 1
REDUNDANT_STEP_OFFSET_2 = Column(Integer, nullable=False) # Redundant Step Offset 2
REDUNDANT_STEP_OFFSET_3 = Column(Integer, nullable=False) # Redundant Step Offset 3
REDUNDANT_STEP_OFFSET_4 = Column(Integer, nullable=False) # Redundant Step Offset 4
REDUNDANT_STEP_OFFSET_5 = Column(Integer, nullable=False) # Redundant Step Offset 5
PRIMARY_RESOLVER_POSITION_1 = Column(Integer, nullable=False) # Primary Resolver Position 1
PRIMARY_RESOLVER_POSITION_2 = Column(Integer, nullable=False) # Primary Resolver Position 2
PRIMARY_RESOLVER_POSITION_3 = Column(Integer, nullable=False) # Primary Resolver Position 3
PRIMARY_RESOLVER_POSITION_4 = Column(Integer, nullable=False) # Primary Resolver Position 4
PRIMARY_RESOLVER_POSITION_5 = Column(Integer, nullable=False) # Primary Resolver Position 5
REDUNDANT_RESOLVER_POSITION_1 = Column(Integer, nullable=False) # Redundant Resolver Position 1
REDUNDANT_RESOLVER_POSITION_2 = Column(Integer, nullable=False) # Redundant Resolver Position 2
REDUNDANT_RESOLVER_POSITION_3 = Column(Integer, nullable=False) # Redundant Resolver Position 3
REDUNDANT_RESOLVER_POSITION_4 = Column(Integer, nullable=False) # Redundant Resolver Position 4
REDUNDANT_RESOLVER_POSITION_5 = Column(Integer, nullable=False) # Redundant Resolver Position 5
SHORT_MOVE_SETTLING_TIME_MS = Column(INTEGER(unsigned=True), nullable=False) # Short Move(1-4 steps) Settling time before reading resolver
LONG_MOVE_SETTLING_TIME_MS = Column(INTEGER(unsigned=True), nullable=False) # Long Move(5-1199 steps) Setting time before reading resolver
PRIMARY_STEP_OFFSET_1 = Column(INTEGER(unsigned=True), nullable=False) # Primary Step Offset 1
PRIMARY_STEP_OFFSET_2 = Column(INTEGER(unsigned=True), nullable=False) # Short Move(1-4 steps) Delay before reading resolver
PRIMARY_STEP_OFFSET_3 = Column(INTEGER(unsigned=True), nullable=False) # Primary Step Offset 3
PRIMARY_STEP_OFFSET_4 = Column(INTEGER(unsigned=True), nullable=False) # Primary Step Offset 4
PRIMARY_STEP_OFFSET_5 = Column(INTEGER(unsigned=True), nullable=False) # Primary Step Offset 5
REDUNDANT_STEP_OFFSET_1 = Column(INTEGER(unsigned=True), nullable=False) # Redundant Step Offset 1
REDUNDANT_STEP_OFFSET_2 = Column(INTEGER(unsigned=True), nullable=False) # Redundant Step Offset 2
REDUNDANT_STEP_OFFSET_3 = Column(INTEGER(unsigned=True), nullable=False) # Redundant Step Offset 3
REDUNDANT_STEP_OFFSET_4 = Column(INTEGER(unsigned=True), nullable=False) # Redundant Step Offset 4
REDUNDANT_STEP_OFFSET_5 = Column(INTEGER(unsigned=True), nullable=False) # Redundant Step Offset 5
PRIMARY_RESOLVER_POSITION_1 = Column(INTEGER(unsigned=True), nullable=False) # Primary Resolver Position 1
PRIMARY_RESOLVER_POSITION_2 = Column(INTEGER(unsigned=True), nullable=False) # Primary Resolver Position 2
PRIMARY_RESOLVER_POSITION_3 = Column(INTEGER(unsigned=True), nullable=False) # Primary Resolver Position 3
PRIMARY_RESOLVER_POSITION_4 = Column(INTEGER(unsigned=True), nullable=False) # Primary Resolver Position 4
PRIMARY_RESOLVER_POSITION_5 = Column(INTEGER(unsigned=True), nullable=False) # Primary Resolver Position 5
REDUNDANT_RESOLVER_POSITION_1 = Column(INTEGER(unsigned=True), nullable=False) # Redundant Resolver Position 1
REDUNDANT_RESOLVER_POSITION_2 = Column(INTEGER(unsigned=True), nullable=False) # Redundant Resolver Position 2
REDUNDANT_RESOLVER_POSITION_3 = Column(INTEGER(unsigned=True), nullable=False) # Redundant Resolver Position 3
REDUNDANT_RESOLVER_POSITION_4 = Column(INTEGER(unsigned=True), nullable=False) # Redundant Resolver Position 4
REDUNDANT_RESOLVER_POSITION_5 = Column(INTEGER(unsigned=True), nullable=False) # Redundant Resolver Position 5


class TLMFiles(Base):
Expand All @@ -202,10 +202,10 @@ class Health(Base):
num_pids = Column(Integer, nullable=False)


def get_closest_eng_packets(table, timestamp, spacecraft_id):
def get_closest_eng_packets(table, timestamp, spacecraft_id, session):
# find the closest events which are greater/less than the timestamp
gt_event = table.query.filter(table.spacecraft_id == spacecraft_id).filter(table.timestamp > timestamp).order_by(table.timestamp.asc()).first()
lt_event = table.query.filter(table.spacecraft_id == spacecraft_id).filter(table.timestamp < timestamp).order_by(table.timestamp.desc()).first()
gt_event = session.query(table).filter(table.spacecraft_id == spacecraft_id).filter(table.timestamp > timestamp).order_by(table.timestamp.asc()).first()
lt_event = session.query(table).filter(table.spacecraft_id == spacecraft_id).filter(table.timestamp < timestamp).order_by(table.timestamp.desc()).first()

Check warning on line 208 in punchpipe/control/db.py

View check run for this annotation

Codecov / codecov/patch

punchpipe/control/db.py#L207-L208

Added lines #L207 - L208 were not covered by tests

if gt_event is None and lt_event is None:
msg = "Could not find packet near that time."
Expand Down
79 changes: 52 additions & 27 deletions punchpipe/flows/level0.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,28 @@
)
from punchpipe.level0.meta import POSITIONS_TO_CODES, convert_pfw_position_to_polarizer

from prefect.blocks.fields import SecretDict
from prefect.blocks.core import Block

class SpacecraftMapping(Block):
mapping: SecretDict

@flow
def level0_ingest_raw_packets(session=None):
def level0_ingest_raw_packets(pipeline_config_path: str | None = None, session=None):
if session is None:
session = get_database_session()
config = load_pipeline_configuration(pipeline_config_path)

paths = detect_new_tlm_files(session=session)
paths = detect_new_tlm_files(config, session=session)
for path in paths:
packets = parse_new_tlm_files(path)
update_tlm_database(packets, path)

# update the database with this tlm file
new_tlm_file = TLMFiles(path=path, is_processed=True)
session.add(new_tlm_file)
session.commit()
update_tlm_database(packets, new_tlm_file.tlm_id)

@flow
def level0_form_images(session=None, pipeline_config_path="config.yaml"):
def level0_form_images(session=None, pipeline_config_path=None):
if session is None:
session = get_database_session()

Expand All @@ -54,21 +58,32 @@ def level0_form_images(session=None, pipeline_config_path="config.yaml"):
distinct_spacecraft = session.query(SciPacket.spacecraft_id).filter(~SciPacket.is_used).distinct().all()


already_parsed_tlms = {} # tlm_path maps to the parsed contents

for spacecraft in distinct_spacecraft:
errors = []

for t in distinct_times:
print(t)
image_packets_entries = session.query(SciPacket).where(and_(SciPacket.timestamp == t[0],
SciPacket.spacecraft_id == spacecraft[0])).all()
image_compression = [unpack_compression_settings(packet.compression_settings)
for packet in image_packets_entries]

# Read all the relevant TLM files
needed_tlm_ids = set([image_packet.source_tlm_file for image_packet in image_packets_entries])
tlm_id_to_tlm_path = {tlm_id: session.query(TLMFiles.path).where(TLMFiles.tlm_id == tlm_id)
tlm_id_to_tlm_path = {tlm_id: session.query(TLMFiles.path).where(TLMFiles.tlm_id == tlm_id).one().path
for tlm_id in needed_tlm_ids}
needed_tlm_paths = list(session.query(TLMFiles.path).where(TLMFiles.tlm_id.in_(needed_tlm_ids)).all())
tlm_contents = [process_telemetry_file(tlm_path) for tlm_path in needed_tlm_paths]
needed_tlm_paths = [p.path for p in needed_tlm_paths]

# parse any new TLM files needed
for tlm_path in needed_tlm_paths:
if tlm_path not in already_parsed_tlms:
already_parsed_tlms[tlm_path] = process_telemetry_file(tlm_path)

# make it easy to grab the right TLM files
tlm_contents = [already_parsed_tlms[tlm_path] for tlm_path in needed_tlm_paths]

# Form the image packet stream for decompression
ordered_image_content = []
Expand All @@ -84,14 +99,15 @@ def level0_form_images(session=None, pipeline_config_path="config.yaml"):
try:
image = form_from_jpeg_compressed(ordered_image_content)
except ValueError:
print("jpeg failed")
skip_image = True
error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"),
'start_block': image_packets_entries[0].flash_block,
'replay_length': image_packets_entries[-1].flash_block
- image_packets_entries[0].flash_block}
errors.append(error)
else:
skip_image = True
else:
print("not jpeg compressed")
skip_image = True
error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"),
'start_block': image_packets_entries[0].flash_block,
Expand All @@ -101,6 +117,7 @@ def level0_form_images(session=None, pipeline_config_path="config.yaml"):

# check the quality of the image
if not skip_image and not image_is_okay(image, config):
print("image isn't okay")
skip_image = True
error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"),
'start_block': image_packets_entries[0].flash_block,
Expand All @@ -109,38 +126,46 @@ def level0_form_images(session=None, pipeline_config_path="config.yaml"):
errors.append(error)

if not skip_image:
spacecraft_secrets = Secret.load("spacecraft-ids")
spacecraft_id_mapper = spacecraft_secrets.get()
spacecraft_id = spacecraft_id_mapper[image_packets_entries[0].spacecraft_id]

metadata_contents = get_fits_metadata(image_packets_entries[0].timestamp, spacecraft_id)
print("NOT SKIPPING")
spacecraft_secrets = SpacecraftMapping.load("spacecraft-ids").mapping.get_secret_value()
moc_index = spacecraft_secrets["moc"].index(image_packets_entries[0].spacecraft_id)
spacecraft_id = spacecraft_secrets["soc"][moc_index]
# spacecraft_id = spacecraft_id_mapper[image_packets_entries[0].spacecraft_id]
print("TO HERE")

metadata_contents = get_fits_metadata(image_packets_entries[0].timestamp,
image_packets_entries[0].spacecraft_id,
session)
file_type = POSITIONS_TO_CODES[convert_pfw_position_to_polarizer(metadata_contents['POSITION_CURR'])]
preliminary_wcs = form_preliminary_wcs(metadata_contents, config['plate_scale'][spacecraft_id])
meta = NormalizedMetadata.load_template(file_type + spacecraft_id, "0")
for meta_key, meta_value in metadata_contents.items():
meta[meta_key] = meta_value
cube = NDCube(data=image, metadata=meta, wcs=preliminary_wcs)
preliminary_wcs = form_preliminary_wcs(metadata_contents, float(config['plate_scale'][spacecraft_id]))
meta = NormalizedMetadata.load_template(file_type + str(spacecraft_id), "0")
# TODO : activate later
# for meta_key, meta_value in metadata_contents.items():
# meta[meta_key] = meta_value
meta['DATE-OBS'] = str(t[0])
cube = NDCube(data=image, meta=meta, wcs=preliminary_wcs)

l0_db_entry = File(level="0",
file_type=file_type,
observatory=str(spacecraft_id),
file_version="1", # TODO: increment the file version
software_version=software_version,
date_created=datetime.now(),
date_obs=t,
date_beg=t,
date_end=t,
date_obs=t[0],
date_beg=t[0],
date_end=t[0],
state="created")

write_ndcube_to_fits(cube, os.path.join(l0_db_entry.directory(config['data_path']),
get_base_file_name(cube)))
out_path = os.path.join(l0_db_entry.directory(config['root']), get_base_file_name(cube)) + ".fits"
os.makedirs(os.path.dirname(out_path), exist_ok=True)
write_ndcube_to_fits(cube,out_path)
# TODO: write a jp2
for image_packets_entries in image_packets_entries:
image_packets_entries.is_used = True
session.add(l0_db_entry)
session.commit()
df_errors = pd.DataFrame(errors)
date_str = datetime.now().strftime("%Y_%j")
df_path = os.path.join(config['root'], 'REPLAY', f'PUNCH_{spacecraft}_REPLAY_{date_str}.csv')
os.makedirs(df_path, exist_ok=True)
df_path = os.path.join(config['root'], 'REPLAY', f'PUNCH_{str(spacecraft[0])}_REPLAY_{date_str}.csv')
os.makedirs(os.path.dirname(df_path), exist_ok=True)
df_errors.to_csv(df_path, index=False)
Loading

0 comments on commit 30834e9

Please sign in to comment.