From c70d9e08550f66b156d77897b61b6525c8d0775f Mon Sep 17 00:00:00 2001 From: Greg Lucas Date: Wed, 16 Oct 2024 13:44:00 -0600 Subject: [PATCH] MNT: Move CCSDS Header information into the CCSDSPacket class --- docs/source/changelog.md | 3 +++ space_packet_parser/packets.py | 43 +++++++++++++++++----------------- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/docs/source/changelog.md b/docs/source/changelog.md index 4c90fdc..df37063 100644 --- a/docs/source/changelog.md +++ b/docs/source/changelog.md @@ -8,6 +8,9 @@ Release notes for the `space_packet_parser` library ### v5.1.0 (unreleased) - BUGFIX: Fix kbps calculation in packet generator for showing progress. - Add support for string and float encoded enumerated lookup parameters. +- The global CCSDS_HEADER constants in packets.py are moved onto the CCSDSPacket + class itself accessible as `CCSDSPacket.HEADER_LENGTH` and `CCSDSPacket.HEADER_DEFINITION` + to more closely align with where these constants are useful. ### v5.0.1 (released) - BUGFIX: Allow raw_value representation for enums with falsy raw values. Previously these defaulted to the enum label. diff --git a/space_packet_parser/packets.py b/space_packet_parser/packets.py index 4471328..4ec33f8 100644 --- a/space_packet_parser/packets.py +++ b/space_packet_parser/packets.py @@ -5,7 +5,6 @@ import datetime as dt import io import logging -from collections import namedtuple import socket import time from typing import BinaryIO, Iterator, List, Optional, Protocol, Union @@ -130,21 +129,6 @@ def read_as_int(self, nbits: int) -> int: return int_data -CcsdsPacketHeaderElement = namedtuple('CcsdsPacketHeaderElement', ['name', 'nbits']) - -CCSDS_HEADER_DEFINITION = [ - CcsdsPacketHeaderElement('VERSION', 3), - CcsdsPacketHeaderElement('TYPE', 1), - CcsdsPacketHeaderElement('SEC_HDR_FLG', 1), - CcsdsPacketHeaderElement('PKT_APID', 11), - CcsdsPacketHeaderElement('SEQ_FLGS', 2), - CcsdsPacketHeaderElement('SRC_SEQ_CTR', 14), - CcsdsPacketHeaderElement('PKT_LEN', 16) -] - -CCSDS_HEADER_LENGTH_BYTES = 6 - - class CCSDSPacket(dict): """CCSDS Packet @@ -164,6 +148,21 @@ class CCSDSPacket(dict): **kwargs : dict Additional packet items to store, passed to the dict() constructor. """ + HEADER_LENGTH = 6 + """CCSDS Packet Header Length in bytes.""" + + HEADER_DEFINITION = {"VERSION": 3, + "TYPE": 1, + "SEC_HDR_FLG": 1, + "PKT_APID": 11, + "SEQ_FLGS": 2, + "SRC_SEQ_CTR": 14, + "PKT_LEN": 16} + """CCSDS Packet Header Definition + + Mapping of the field names to the number of bits they occupy in the header. + """ + def __init__(self, *args, raw_data: bytes = b"", **kwargs): self.raw_data = RawPacketData(raw_data) super().__init__(*args, **kwargs) @@ -194,9 +193,9 @@ def _parse_header(packet_data: bytes) -> dict: """ header = {} current_bit = 0 - for item in CCSDS_HEADER_DEFINITION: - header[item.name] = _extract_bits(packet_data, current_bit, item.nbits) - current_bit += item.nbits + for name, nbits in CCSDSPacket.HEADER_DEFINITION.items(): + header[name] = _extract_bits(packet_data, current_bit, nbits) + current_bit += nbits return header @@ -332,21 +331,21 @@ def packet_generator( # pylint: disable=too-many-branches,too-many-statements current_pos = 0 # Fill buffer enough to parse a header - while len(read_buffer) - current_pos < skip_header_bytes + CCSDS_HEADER_LENGTH_BYTES: + while len(read_buffer) - current_pos < skip_header_bytes + CCSDSPacket.HEADER_LENGTH: result = read_bytes_from_source(buffer_read_size_bytes) if not result: # If there is verifiably no more data to add, break break read_buffer += result # Skip the header bytes current_pos += skip_header_bytes - header_bytes = read_buffer[current_pos:current_pos + CCSDS_HEADER_LENGTH_BYTES] + header_bytes = read_buffer[current_pos:current_pos + CCSDSPacket.HEADER_LENGTH] header = CCSDSPacket._parse_header(header_bytes) # pylint: disable=protected-access # per the CCSDS spec # 4.1.3.5.3 The length count C shall be expressed as: # C = (Total Number of Octets in the Packet Data Field) – 1 n_bytes_data = header['PKT_LEN'] + 1 - n_bytes_packet = CCSDS_HEADER_LENGTH_BYTES + n_bytes_data + n_bytes_packet = CCSDSPacket.HEADER_LENGTH + n_bytes_data # Based on PKT_LEN fill buffer enough to read a full packet while len(read_buffer) - current_pos < n_bytes_packet: