Skip to content

Commit

Permalink
convert level to a string
Browse files Browse the repository at this point in the history
  • Loading branch information
jmbhughes committed Oct 23, 2024
1 parent 6294300 commit 9151707
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 18 deletions.
50 changes: 46 additions & 4 deletions punchpipe/controlsegment/db.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os

from sqlalchemy import TEXT, Column, DateTime, Integer, String
from sqlalchemy import TEXT, Column, DateTime, Integer, String, Boolean, JSON
from sqlalchemy.orm import declarative_base

Base = declarative_base()
Expand All @@ -9,7 +9,7 @@
class File(Base):
__tablename__ = "files"
file_id = Column(Integer, primary_key=True)
level = Column(Integer, nullable=False)
level = Column(String(1), nullable=False)
file_type = Column(String(2), nullable=False)
observatory = Column(String(1), nullable=False)
file_version = Column(String(16), nullable=False)
Expand Down Expand Up @@ -48,13 +48,13 @@ def directory(self, root: str):
str
the place to write the file
"""
return os.path.join(root, str(self.level), self.file_type + self.observatory, self.date_obs.strftime("%Y/%m/%d"))
return os.path.join(root, self.level, self.file_type + self.observatory, self.date_obs.strftime("%Y/%m/%d"))


class Flow(Base):
__tablename__ = "flows"
flow_id = Column(Integer, primary_key=True)
flow_level = Column(Integer, nullable=False)
flow_level = Column(String(1), nullable=False)
flow_type = Column(String(64), nullable=False)
flow_run_name = Column(String(64), nullable=True)
flow_run_id = Column(String(36), nullable=True)
Expand All @@ -71,3 +71,45 @@ class FileRelationship(Base):
relationship_id = Column(Integer, primary_key=True)
parent = Column(Integer, nullable=False)
child = Column(Integer, nullable=False)


class SciPacket(Base):
__tablename__ = "sci_packets"
packet_id = Column(Integer, primary_key=True)
apid = Column(Integer, nullable=False, index=True)
sequence_count = Column(Integer, nullable=False)
length = Column(Integer, nullable=False)
spacecraft_id = Column(Integer, nullable=False, index=True)
flash_block = Column(Integer, nullable=False)
timestamp = Column(DateTime, nullable=False, index=True)
packet_num = Column(Integer, nullable=False)
source_tlm_file = Column(String(128), nullable=False) # TODO: make realistic size
is_used = Column(Boolean)
l0_version = Column(Integer)
compression_settings = Column(Integer)

class EngPacket(Base):
__tablename__ = "eng_packets"
packet_id = Column(Integer, primary_key=True)
apid = Column(Integer, nullable=False, index=True)
sequence_count = Column(Integer, nullable=False)
length = Column(Integer, nullable=False)
spacecraft_id = Column(Integer, nullable=False, index=True)
flash_block = Column(Integer, nullable=False)
timestamp = Column(DateTime, nullable=False, index=True)
packet_num = Column(Integer, nullable=False)
source_tlm_file = Column(String(128), nullable=False) # TODO: make realistic size
is_used = Column(Boolean)
l0_version = Column(Integer)


# def json_numpy_obj_hook(dct):
# """Decodes a previously encoded numpy ndarray with proper shape and dtype.
#
# :param dct: (dict) json encoded ndarray
# :return: (ndarray) if input was an encoded ndarray
# """
# if isinstance(dct, dict) and '__ndarray__' in dct:
# data = base64.b64decode(dct['__ndarray__'])
# return np.frombuffer(data, dct['dtype']).reshape(dct['shape'])
# return dct
4 changes: 2 additions & 2 deletions punchpipe/flows/level1.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

@task
def level1_query_ready_files(session, pipeline_config: dict):
return [[f.file_id] for f in session.query(File).where(and_(File.state == "created", File.level == 0)).all()]
return [[f.file_id] for f in session.query(File).where(and_(File.state == "created", File.level == "0")).all()]


# TODO handle more robustly
Expand Down Expand Up @@ -65,7 +65,7 @@ def level1_construct_flow_info(level0_files: list[File], level1_files: File, pip
def level1_construct_file_info(level0_files: t.List[File], pipeline_config: dict) -> t.List[File]:
return [
File(
level=1,
level="1",
file_type=level0_files[0].file_type,
observatory=level0_files[0].observatory,
file_version=pipeline_config["file_version"],
Expand Down
5 changes: 2 additions & 3 deletions punchpipe/flows/level2.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
@task
def level2_query_ready_files(session, pipeline_config: dict):
logger = get_run_logger()
all_ready_files = session.query(File).where(and_(File.state == "created", File.level == 1)).all()
all_ready_files = session.query(File).where(and_(File.state == "created", File.level == "1")).all()
logger.info(f"{len(all_ready_files)} ready files")
unique_times = set(f.date_obs for f in all_ready_files)
logger.info(f"{len(unique_times)} unique times: {unique_times}")
Expand Down Expand Up @@ -44,7 +44,7 @@ def level2_construct_flow_info(level1_files: list[File], level2_file: File, pipe
return Flow(
flow_type=flow_type,
state=state,
flow_level=2,
flow_level="2",
creation_time=creation_time,
priority=priority,
call_data=call_data,
Expand All @@ -53,7 +53,6 @@ def level2_construct_flow_info(level1_files: list[File], level2_file: File, pipe

@task
def level2_construct_file_info(level1_files: t.List[File], pipeline_config: dict) -> t.List[File]:
# TODO: make realistic to level 2 products
return [File(
level=2,
file_type="PT",
Expand Down
8 changes: 4 additions & 4 deletions punchpipe/flows/level3.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

def get_valid_starfields(session, f: File, timedelta_window: timedelta):
valid_star_start, valid_star_end = f.date_obs - timedelta_window, f.date_obs + timedelta_window
return (session.query(File).filter(File.state == "created").filter(File.level == 2)
return (session.query(File).filter(File.state == "created").filter(File.level == "2")
.filter(File.file_type == 'PS').filter(File.observatory == 'M')
.filter(and_(f.date_obs >= valid_star_start,
f.date_obs <= valid_star_end)).all())
Expand All @@ -38,7 +38,7 @@ def get_closest_file(f_target: File, f_others: list[File]) -> File:
def level3_PTM_query_ready_files(session, pipeline_config: dict):
logger = get_run_logger()
all_ready_files = session.query(File).where(and_(and_(File.state == "created",
File.level == 2),
File.level == "2"),
File.file_type == "PT")).all()
logger.info(f"{len(all_ready_files)} Level 3 PTM files need to be processed.")

Expand Down Expand Up @@ -86,7 +86,7 @@ def level3_PTM_construct_flow_info(level2_files: File, level3_file: File, pipeli
return Flow(
flow_type=flow_type,
state=state,
flow_level=3,
flow_level="3",
creation_time=creation_time,
priority=priority,
call_data=call_data,
Expand All @@ -96,7 +96,7 @@ def level3_PTM_construct_flow_info(level2_files: File, level3_file: File, pipeli
@task
def level3_PTM_construct_file_info(level2_files: t.List[File], pipeline_config: dict) -> t.List[File]:
return [File(
level=3,
level="3",
file_type="PT",
observatory="M",
file_version=pipeline_config["file_version"],
Expand Down
6 changes: 3 additions & 3 deletions punchpipe/flows/tests/test_level1.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ def test_query_ready_files(db):
def test_level1_construct_file_info():
pipeline_config_path = os.path.join(TEST_DIR, "config.yaml")
pipeline_config = load_pipeline_configuration.fn(pipeline_config_path)
level0_file = [File(level=0,
level0_file = [File(level="0",
file_type='XX',
observatory='0',
state='created',
file_version='none',
software_version='none',
date_obs=datetime.now())]
constructed_file_info = level1_construct_file_info.fn(level0_file, pipeline_config)[0]
assert constructed_file_info.level == 1
assert constructed_file_info.level == "1"
assert constructed_file_info.file_type == level0_file[0].file_type
assert constructed_file_info.observatory == level0_file[0].observatory
assert constructed_file_info.file_version == "0.0.1"
Expand All @@ -73,7 +73,7 @@ def test_level1_construct_file_info():
def test_level1_construct_flow_info():
pipeline_config_path = os.path.join(TEST_DIR, "config.yaml")
pipeline_config = load_pipeline_configuration.fn(pipeline_config_path)
level0_file = [File(level=0,
level0_file = [File(level="0",
file_type='XX',
observatory='0',
state='created',
Expand Down
4 changes: 2 additions & 2 deletions punchpipe/flows/tests/test_level2.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_level2_construct_file_info():
def test_level2_construct_flow_info():
pipeline_config_path = os.path.join(TEST_DIR, "config.yaml")
pipeline_config = load_pipeline_configuration.fn(pipeline_config_path)
level1_file = [File(level=1,
level1_file = [File(level="1",
file_type='XX',
observatory='0',
state='created',
Expand All @@ -87,7 +87,7 @@ def test_level2_construct_flow_info():

assert flow_info.flow_type == 'level2_process_flow'
assert flow_info.state == "planned"
assert flow_info.flow_level == 2
assert flow_info.flow_level == "2"
assert flow_info.priority == 7


Expand Down

0 comments on commit 9151707

Please sign in to comment.