diff --git a/docs/source/changelog.md b/docs/source/changelog.md index 4c90fdc..b4a69b8 100644 --- a/docs/source/changelog.md +++ b/docs/source/changelog.md @@ -8,6 +8,21 @@ Release notes for the `space_packet_parser` library ### v5.1.0 (unreleased) - BUGFIX: Fix kbps calculation in packet generator for showing progress. - Add support for string and float encoded enumerated lookup parameters. +- Add properties to extract the CCSDS Header items from the ``RawPacketData`` object directly. + e.g. ``RawPacketData.apid`` +- Add a ``create_ccsds_packet`` function that can create a CCSDS Packet + with the given header items and data. This is useful for creating + mock packets in testing and experimentation for creating debugging + streams as needed. +- Add a ``ccsds_packet_generator()`` function that iterates through raw + bytes and yields individual CCSDS packets. +- Add continuation packet support to the XTCE parsing and packet generation. + This adds logic to concatenate packet data fields together across successive + packets (if there was too much data to fit in a single CCSDS packet or it + was logically better to split by other teams). + - Add warnings if packets are out of sequence within a given apid. + - Add ability to remove secondary header bytes from subsequent packets. + ``definition.packet_generator(data, combine_segmented_packets=True, secondary_header_bytes=4)`` ### v5.0.1 (released) - BUGFIX: Allow raw_value representation for enums with falsy raw values. Previously these defaulted to the enum label. diff --git a/space_packet_parser/definitions.py b/space_packet_parser/definitions.py index b2aa568..5d2bcd9 100644 --- a/space_packet_parser/definitions.py +++ b/space_packet_parser/definitions.py @@ -1,49 +1,18 @@ """Module for parsing XTCE xml files to specify packet format""" # Standard -from collections import namedtuple -import datetime as dt -import io import logging from pathlib import Path import socket -import time from typing import Tuple, Optional, List, TextIO, Dict, Union, BinaryIO, Iterator +import warnings # Installed import lxml.etree as ElementTree # Local -from space_packet_parser.exceptions import ElementNotFoundError, InvalidParameterTypeError +from space_packet_parser.exceptions import ElementNotFoundError, InvalidParameterTypeError, UnrecognizedPacketTypeError from space_packet_parser import comparisons, parameters, packets logger = logging.getLogger(__name__) -CcsdsPacketHeaderElement = namedtuple('CcsdsPacketHeaderElement', ['name', 'nbits']) - -CCSDS_HEADER_DEFINITION = [ - CcsdsPacketHeaderElement('VERSION', 3), - CcsdsPacketHeaderElement('TYPE', 1), - CcsdsPacketHeaderElement('SEC_HDR_FLG', 1), - CcsdsPacketHeaderElement('PKT_APID', 11), - CcsdsPacketHeaderElement('SEQ_FLGS', 2), - CcsdsPacketHeaderElement('SRC_SEQ_CTR', 14), - CcsdsPacketHeaderElement('PKT_LEN', 16) -] - -CCSDS_HEADER_LENGTH_BYTES = 6 - - -class UnrecognizedPacketTypeError(Exception): - """Error raised when we can't figure out which kind of packet we are dealing with based on the header""" - - def __init__(self, *args, partial_data: dict = None): - """ - Parameters - ---------- - partial_data : dict, Optional - Data parsed so far (for debugging at higher levels) - """ - super().__init__(*args) - self.partial_data = partial_data - class XtcePacketDefinition: """Object representation of the XTCE definition of a CCSDS packet object""" @@ -353,33 +322,10 @@ def _get_container_base_container( restrictions = [] return self._find_container(base_container_element.attrib['containerRef']), restrictions - @staticmethod - def _parse_header(packet_data: bytes) -> dict: - """Parses the CCSDS standard header. - - Parameters - ---------- - packet_data : bytes - 6 bytes of binary data. - - Returns - ------- - header : dict - Dictionary of header items. - """ - header = {} - current_bit = 0 - for item in CCSDS_HEADER_DEFINITION: - # pylint: disable=protected-access - header[item.name] = packets._extract_bits(packet_data, current_bit, item.nbits) - current_bit += item.nbits - return header - def parse_ccsds_packet(self, packet: packets.CCSDSPacket, *, - root_container_name: str = "CCSDSPacket", - **parse_value_kwargs) -> packets.CCSDSPacket: + root_container_name: str = "CCSDSPacket") -> packets.CCSDSPacket: """Parse binary packet data according to the self.packet_definition object Parameters @@ -397,7 +343,7 @@ def parse_ccsds_packet(self, """ current_container: packets.SequenceContainer = self._sequence_container_cache[root_container_name] while True: - current_container.parse(packet, **parse_value_kwargs) + current_container.parse(packet) valid_inheritors = [] for inheritor_name in current_container.inheritors: @@ -424,56 +370,6 @@ def parse_ccsds_packet(self, partial_data=packet) return packet - @staticmethod - def _print_progress( - *, - current_bytes: int, - total_bytes: Optional[int], - start_time_ns: int, - current_packets: int, - end: str = '\r', - log: bool = False - ): - """Prints a progress bar, including statistics on parsing rate. - - Parameters - ---------- - current_bytes : int - Number of bytes parsed so far. - total_bytes : Optional[int] - Number of total bytes to parse, if known. None otherwise. - current_packets : int - Number of packets parsed so far. - start_time_ns : int - Start time on system clock, in nanoseconds. - end : str - Print function end string. Default is `\\r` to create a dynamically updating loading bar. - log : bool - If True, log the progress bar at INFO level. - """ - progress_char = "=" - bar_length = 20 - - if total_bytes is not None: # If we actually have an endpoint (i.e. not using a socket) - percentage = int((current_bytes / total_bytes) * 100) # Percent Completed Calculation - progress = int((bar_length * current_bytes) / total_bytes) # Progress Done Calculation - else: - percentage = "???" - progress = 0 - - # Fast calls initially on Windows can result in a zero elapsed time - elapsed_ns = max(time.time_ns() - start_time_ns, 1) - delta = dt.timedelta(microseconds=elapsed_ns / 1E3) - kbps = int(current_bytes * 8E6 / elapsed_ns) # 8 bits per byte, 1E9 s per ns, 1E3 bits per kb - pps = int(current_packets * 1E9 / elapsed_ns) - info_str = f"[Elapsed: {delta}, " \ - f"Parsed {current_bytes} bytes ({current_packets} packets) " \ - f"at {kbps}kb/s ({pps}pkts/s)]" - loadbar = f"Progress: [{progress * progress_char:{bar_length}}]{percentage}% {info_str}" - print(loadbar, end=end) - if log: - logger.info(loadbar) - def packet_generator( # pylint: disable=too-many-branches,too-many-statements self, binary_data: Union[BinaryIO, socket.socket], @@ -481,6 +377,8 @@ def packet_generator( # pylint: disable=too-many-branches,too-many-statements parse_bad_pkts: bool = True, root_container_name="CCSDSPacket", ccsds_headers_only: bool = False, + combine_segmented_packets: bool = False, + secondary_header_bytes: int = 0, yield_unrecognized_packet_errors: bool = False, show_progress: bool = False, buffer_read_size_bytes: Optional[int] = None, @@ -506,6 +404,17 @@ def packet_generator( # pylint: disable=too-many-branches,too-many-statements point for parsing. Default is 'CCSDSPacket'. ccsds_headers_only : bool Default False. If True, only parses the packet headers (does not use the provided packet definition). + ``space_packet_parser.packets.ccsds_packet_generator`` can be used directly to parse only the CCSDS headers + without needing a packet definition. + combine_segmented_packets : bool + Default False. If True, combines segmented packets into a single packet for parsing. This is useful for + parsing packets that are split into multiple packets due to size constraints. The packet data is combined + by concatenating the data from each packet together. The combined packet is then parsed as a single packet. + secondary_header_bytes : int + Default 0. The length of the secondary header in bytes. + This is used to skip the secondary header of segmented packets. + The byte layout within the returned packet has all data concatenated together as follows: + [packet0header, packet0secondaryheader, packet0data, packet1data, packet2data, ...]. yield_unrecognized_packet_errors : bool Default False. If False, UnrecognizedPacketTypeErrors are caught silently and parsing continues to the next packet. @@ -531,123 +440,65 @@ def packet_generator( # pylint: disable=too-many-branches,too-many-statements If yield_unrecognized_packet_errors is True, it will yield an unraised exception object, which can be raised or used for debugging purposes. """ - # ======== - # Set up the reader based on the type of binary_data - # ======== - if isinstance(binary_data, io.BufferedIOBase): - if buffer_read_size_bytes is None: - # Default to a full read of the file - buffer_read_size_bytes = -1 - total_length_bytes = binary_data.seek(0, io.SEEK_END) # This is probably preferable to len - binary_data.seek(0, 0) - logger.info(f"Creating packet generator from a filelike object, {binary_data}. " - f"Total length is {total_length_bytes} bytes") - read_bytes_from_source = binary_data.read - elif isinstance(binary_data, socket.socket): # It's a socket and we don't know how much data we will get - logger.info("Creating packet generator to read from a socket. Total length to parse is unknown.") - total_length_bytes = None # We don't know how long it is - if buffer_read_size_bytes is None: - # Default to 4096 bytes from a socket - buffer_read_size_bytes = 4096 - read_bytes_from_source = binary_data.recv - elif isinstance(binary_data, io.TextIOWrapper): - raise IOError("Packet data file opened in TextIO mode. You must open packet data in binary mode.") - else: - raise IOError(f"Unrecognized data source: {binary_data}") - - # ======== - # Packet loop. Each iteration of this loop yields a CCSDSPacket object - # ======== - start_time = time.time_ns() - n_bytes_parsed = 0 # Keep track of how many bytes we have parsed - n_packets_parsed = 0 # Keep track of how many packets we have parsed - read_buffer = b"" # Empty bytes object to start - current_pos = 0 # Keep track of where we are in the buffer - while True: - if total_length_bytes and n_bytes_parsed == total_length_bytes: - break # Exit if we know the length and we've reached it - - if show_progress: - self._print_progress(current_bytes=n_bytes_parsed, total_bytes=total_length_bytes, - start_time_ns=start_time, current_packets=n_packets_parsed) - - if current_pos > 20_000_000: - # Only trim the buffer after 20 MB read to prevent modifying - # the bitstream and trimming after every packet - read_buffer = read_buffer[current_pos:] - current_pos = 0 - - # Fill buffer enough to parse a header - while len(read_buffer) - current_pos < skip_header_bytes + CCSDS_HEADER_LENGTH_BYTES: - result = read_bytes_from_source(buffer_read_size_bytes) - if not result: # If there is verifiably no more data to add, break - break - read_buffer += result - # Skip the header bytes - current_pos += skip_header_bytes - header_bytes = read_buffer[current_pos:current_pos + CCSDS_HEADER_LENGTH_BYTES] - header = self._parse_header(header_bytes) - - # per the CCSDS spec - # 4.1.3.5.3 The length count C shall be expressed as: - # C = (Total Number of Octets in the Packet Data Field) – 1 - n_bytes_data = header['PKT_LEN'] + 1 - n_bytes_packet = CCSDS_HEADER_LENGTH_BYTES + n_bytes_data - - # Based on PKT_LEN fill buffer enough to read a full packet - while len(read_buffer) - current_pos < n_bytes_packet: - result = read_bytes_from_source(buffer_read_size_bytes) - if not result: # If there is verifiably no more data to add, break - break - read_buffer += result - - # Consider it a counted packet once we've verified that we have read the full packet and parsed the header - # Update the number of packets and bytes parsed - n_packets_parsed += 1 - n_bytes_parsed += skip_header_bytes + n_bytes_packet + # Used to keep track of any continuation packets that we encounter + # gathering them all up before combining them into a single packet + # for the XTCE to parse, lookup is by APID. + # _segmented_packets[APID] = [RawPacketData, ...] + _segmented_packets = {} + + # Iterate over individual packets in the binary data + for raw_packet_data in packets.ccsds_generator(binary_data, + buffer_read_size_bytes=buffer_read_size_bytes, + show_progress=show_progress, + skip_header_bytes=skip_header_bytes): if ccsds_headers_only: - # update the current position to the end of the packet data - current_pos += n_bytes_packet - p = packets.CCSDSPacket(raw_data=read_buffer[current_pos - n_bytes_packet:current_pos], **header) - yield p + yield raw_packet_data continue - # current_pos is still before the header, so we are reading the entire packet here - packet_bytes = read_buffer[current_pos:current_pos + n_bytes_packet] - current_pos += n_bytes_packet - # Wrap the bytes in a class that can keep track of position as we read from it - packet = packets.CCSDSPacket(raw_data=packet_bytes) + if not combine_segmented_packets or raw_packet_data.sequence_flags == packets.SequenceFlags.UNSEGMENTED: + packet = packets.CCSDSPacket(raw_data=raw_packet_data) + elif raw_packet_data.sequence_flags == packets.SequenceFlags.FIRST: + _segmented_packets[raw_packet_data.apid] = [raw_packet_data] + continue + elif not _segmented_packets.get(raw_packet_data.apid, []): + warnings.warn("Continuation packet found without declaring the start, skipping this packet.") + continue + elif raw_packet_data.sequence_flags == packets.SequenceFlags.CONTINUATION: + _segmented_packets[raw_packet_data.apid].append(raw_packet_data) + continue + else: # raw_packet_data.sequence_flags == packets.SequenceFlags.LAST: + _segmented_packets[raw_packet_data.apid].append(raw_packet_data) + # We have received the final packet, close it up and combine all of + # the segmented packets into a single "packet" for XTCE parsing + sequence_counts = [p.sequence_count for p in _segmented_packets[raw_packet_data.apid]] + if not all((sequence_counts[i + 1] - sequence_counts[i]) % 16384 == 1 + for i in range(len(sequence_counts) - 1)): + warnings.warn(f"Continuation packets for apid {raw_packet_data.apid} " + f"are not in sequence {sequence_counts}, skipping these packets.") + continue + # Add all content (including header) from the first packet + raw_data = _segmented_packets[raw_packet_data.apid][0] + # Add the continuation packets to the first packet, skipping the headers + for p in _segmented_packets[raw_packet_data.apid][1:]: + raw_data += p[raw_packet_data.HEADER_LENGTH_BYTES + secondary_header_bytes:] + packet = packets.CCSDSPacket(raw_data=raw_data) + + # Now do the actual parsing of the packet data try: - packet = self.parse_ccsds_packet(packet, - root_container_name=root_container_name) + packet = self.parse_ccsds_packet(packet, root_container_name=root_container_name) except UnrecognizedPacketTypeError as e: - logger.debug(f"Unrecognized error on packet with APID {header['PKT_APID']}'") + logger.debug(f"Unrecognized error on packet with APID {packet.raw_data.apid}") if yield_unrecognized_packet_errors: # Yield the caught exception without raising it (raising ends generator) yield e # Continue to next packet continue - if packet.header['PKT_LEN'] != header['PKT_LEN']: - raise ValueError(f"Hardcoded header parsing found a different packet length " - f"{header['PKT_LEN']} than the definition-based parsing found " - f"{packet.header['PKT_LEN']}. This might be because the CCSDS header is " - f"incorrectly represented in your packet definition document.") - - actual_length_parsed = packet.raw_data.pos // 8 - if actual_length_parsed != n_bytes_packet: - logger.warning(f"Parsed packet length " - f"({actual_length_parsed}B) did not match " - f"length specified in header ({n_bytes_packet}B). " - f"Updating the position to the correct position " - "indicated by CCSDS header.") + if packet.raw_data.pos != len(packet.raw_data) * 8: + logger.warning(f"Number of bits parsed ({packet.raw_data.pos}b) did not match " + f"the length of data available ({len(packet.raw_data) * 8}b).") if not parse_bad_pkts: - logger.warning(f"Skipping (not yielding) bad packet with apid {header['PKT_APID']}.") + logger.warning(f"Skipping (not yielding) bad packet with apid {raw_packet_data.apid}.") continue yield packet - - if show_progress: - self._print_progress(current_bytes=n_bytes_parsed, total_bytes=total_length_bytes, - start_time_ns=start_time, current_packets=n_packets_parsed, - end="\n", log=True) diff --git a/space_packet_parser/encodings.py b/space_packet_parser/encodings.py index 9d53662..5c4fd66 100644 --- a/space_packet_parser/encodings.py +++ b/space_packet_parser/encodings.py @@ -145,7 +145,7 @@ def _calculate_size(self, packet: packets.CCSDSPacket) -> int: """ raise NotImplementedError() - def parse_value(self, packet: packets.CCSDSPacket, **kwargs) -> packets.ParameterDataTypes: + def parse_value(self, packet: packets.CCSDSPacket) -> packets.ParameterDataTypes: """Parse a value from packet data, possibly using previously parsed data items to inform parsing. Parameters @@ -334,7 +334,7 @@ def _get_raw_buffer(self, packet: packets.CCSDSPacket) -> bytes: ).to_bytes(buflen_bytes, "big") return raw_string_buffer - def parse_value(self, packet: packets.CCSDSPacket, **kwargs) -> packets.StrParameter: + def parse_value(self, packet: packets.CCSDSPacket) -> packets.StrParameter: """Parse a string value from packet data, possibly using previously parsed data items to inform parsing. Parameters @@ -529,7 +529,7 @@ def _twos_complement(val: int, bit_width: int) -> int: def parse_value(self, packet: packets.CCSDSPacket, - **kwargs) -> Union[packets.FloatParameter, packets.IntParameter]: + ) -> Union[packets.FloatParameter, packets.IntParameter]: """Parse a value from packet data, possibly using previously parsed data items to inform parsing. Parameters @@ -793,7 +793,7 @@ def _calculate_size(self, packet: packets.CCSDSPacket) -> int: len_bits = self.linear_adjuster(len_bits) return len_bits - def parse_value(self, packet: packets.CCSDSPacket, **kwargs) -> packets.BinaryParameter: + def parse_value(self, packet: packets.CCSDSPacket) -> packets.BinaryParameter: """Parse a value from packet data, possibly using previously parsed data items to inform parsing. Parameters diff --git a/space_packet_parser/exceptions.py b/space_packet_parser/exceptions.py index 143d5db..35de8f0 100644 --- a/space_packet_parser/exceptions.py +++ b/space_packet_parser/exceptions.py @@ -30,3 +30,17 @@ class CalibrationError(Exception): class InvalidParameterTypeError(Exception): """Error raised when someone is using an invalid ParameterType element""" pass + + +class UnrecognizedPacketTypeError(Exception): + """Error raised when we can't figure out which kind of packet we are dealing with based on the header""" + + def __init__(self, *args, partial_data: dict = None): + """ + Parameters + ---------- + partial_data : dict, Optional + Data parsed so far (for debugging at higher levels) + """ + super().__init__(*args) + self.partial_data = partial_data diff --git a/space_packet_parser/packets.py b/space_packet_parser/packets.py index 4c806ed..51a63d3 100644 --- a/space_packet_parser/packets.py +++ b/space_packet_parser/packets.py @@ -1,10 +1,27 @@ -"""Packet containers and parsing utilities for space packets.""" +"""Packet containers and parsing utilities for space packets. + +The parsing begins with binary data representing CCSDS Packets. A user can then create a generator +from the binary data reading from a filelike object or a socket. The ``ccsds_generator`` function yields +``RawPacketData`` objects that are the raw bytes of a single CCSDS packet. The ``RawPacketData`` +class can be used to inspect the CCSDS header fields of the packet, but it does not have any +parsed content from the data field. This generator is useful for debugging and passing off +to other parsing functions. +""" from dataclasses import dataclass, field -from typing import List, Optional, Protocol, Union +import datetime as dt +from functools import cached_property +from enum import IntEnum +import io +import logging +import socket +import time +from typing import BinaryIO, Iterator, List, Optional, Protocol, Union + BuiltinDataTypes = Union[bytes, float, int, str] +logger = logging.getLogger(__name__) class _Parameter: @@ -56,28 +73,87 @@ class StrParameter(_Parameter, str): ParameterDataTypes = Union[BinaryParameter, BoolParameter, FloatParameter, IntParameter, StrParameter] +class SequenceFlags(IntEnum): + """Enumeration of the possible sequence flags in a CCSDS packet.""" + CONTINUATION = 0 + FIRST = 1 + LAST = 2 + UNSEGMENTED = 3 + + class RawPacketData(bytes): """A class to represent raw packet data as bytes but whose length is represented by bit length. This class is a subclass of bytes and is used to represent the raw packet data in a more readable way. It is used to store the raw packet data in the Packet - class and used to keep track of the current parsing position. + class and used to keep track of the current parsing position (accessible through the `pos` attribute). - Parameters - ---------- - data : bytes - Raw packet data. Full CCSDS packets are always an integer number of bytes. + There are also a few convenience methods to extract the CCSDS header fields from the packet data. """ - def __init__(self, data: bytes, *, pos: int = 0): - self.pos = pos - self._nbits = len(data) * 8 - super().__init__() + HEADER_LENGTH_BYTES = 6 + pos = 0 # in bits + + def __str__(self) -> str: + return (f"RawPacketData Header: ({self.version_number=}, {self.type=}, " + f"{self.secondary_header_flag=}, {self.apid=}, {self.sequence_flags=}, " + f"{self.sequence_count=}, {self.data_length=})").replace("self.", "") + + @cached_property + def version_number(self) -> int: + """CCSDS Packet Version Number""" + return _extract_bits(self, 0, 3) + + @cached_property + def type(self) -> int: + """CCSDS Packet Type + + 0 = Telemetry Packet + 1 = Telecommand Packet + """ + return _extract_bits(self, 3, 1) + + @cached_property + def secondary_header_flag(self) -> int: + """CCSDS Secondary Header Flag + + 0 = No secondary header + 1 = Secondary header present + """ + + return _extract_bits(self, 4, 1) + + @cached_property + def apid(self) -> int: + """CCSDS Application Process Identifier (APID)""" + return _extract_bits(self, 5, 11) + + @cached_property + def sequence_flags(self) -> int: + """CCSDS Packet Sequence Flags + + 00 = Continuation packet + 01 = First packet + 10 = Last packet + 11 = Unsegmented packet (standalone) + """ + return _extract_bits(self, 16, 2) + + @cached_property + def sequence_count(self) -> int: + """CCSDS Packet Sequence Count""" + return _extract_bits(self, 18, 14) - def __len__(self): - return self._nbits + @cached_property + def data_length(self) -> int: + """CCSDS Packet Data Length - def __repr__(self): - return f"RawPacketData({self}, {len(self)}b, pos={self.pos})" + Section 4.1.3.5.3 The length count C shall be expressed as: + C = (Total Number of Octets in the Packet Data Field) – 1 + """ + # This has already been parsed previously to give us the length of the packet + # so avoid the extract_bits call again and calculate it based on the length of the data + # Subtract 6 bytes for the header and 1 for the length count + return len(self) - RawPacketData.HEADER_LENGTH_BYTES - 1 def read_as_bytes(self, nbits: int) -> bytes: """Read a number of bits from the packet data as bytes. Reads minimum number of complete bytes required to @@ -93,7 +169,7 @@ def read_as_bytes(self, nbits: int) -> bytes: : bytes Raw bytes from the packet data """ - if self.pos + nbits > len(self): + if self.pos + nbits > len(self) * 8: raise ValueError("End of packet reached") if self.pos % 8 == 0 and nbits % 8 == 0: # If the read is byte-aligned, we can just return the bytes directly @@ -123,11 +199,71 @@ def read_as_int(self, nbits: int) -> int: return int_data +def create_ccsds_packet(data=b"\x00", + *, + version_number=0, + type=0, # pylint: disable=redefined-builtin + secondary_header_flag=0, + apid=2047, # 2047 is defined as a fill packet in the CCSDS spec + sequence_flags=SequenceFlags.UNSEGMENTED, + sequence_count=0): + """Create a binary CCSDS packet. + + Pack the header fields into the proper bit locations and append the data bytes. + + Parameters + ---------- + data : bytes + User data bytes (up to 65536 bytes) + version_number : int + CCSDS Packet Version Number (3 bits) + type : int + CCSDS Packet Type (1 bit) + secondary_header_flag : int + CCSDS Secondary Header Flag (1 bit) + apid : int + CCSDS Application Process Identifier (APID) (11 bits) + sequence_flags : int + CCSDS Packet Sequence Flags (2 bits) + sequence_count : int + CCSDS Packet Sequence Count (14 bits) + """ + if version_number < 0 or version_number > 7: # 3 bits + raise ValueError("version_number must be between 0 and 7") + if type < 0 or type > 1: # 1 bit + raise ValueError("type_ must be 0 or 1") + if secondary_header_flag < 0 or secondary_header_flag > 1: # 1 bit + raise ValueError("secondary_header_flag must be 0 or 1") + if apid < 0 or apid > 2047: # 11 bits + raise ValueError("apid must be between 0 and 2047") + if sequence_flags < 0 or sequence_flags > 3: # 2 bits + raise ValueError("sequence_flags must be between 0 and 3") + if sequence_count < 0 or sequence_count > 16383: # 14 bits + raise ValueError("sequence_count must be between 0 and 16383") + if len(data) < 1 or len(data) > 65536: # 16 bits + raise ValueError("length of data (in bytes) must be between 1 and 65536") + + # CCSDS primary header + # bitshift left to the correct position for that field (48 - start_bit - nbits) + try: + header = (version_number << 48 - 3 + | type << 48 - 4 + | secondary_header_flag << 48 - 5 + | apid << 48 - 16 + | sequence_flags << 48 - 18 + | sequence_count << 48 - 32 + | len(data) - 1) + packet = header.to_bytes(RawPacketData.HEADER_LENGTH_BYTES, "big") + data + except TypeError as e: + raise TypeError("CCSDS Header items must be integers and the input data bytes.") from e + return RawPacketData(packet) + + class CCSDSPacket(dict): - """CCSDS Packet + """Packet representing parsed data items from CCSDS packet(s). Container that stores the raw packet data (bytes) as an instance attribute and the parsed - data items in a dict interface. A ``CCSDSPacket`` generally begins as an empty dictionary that gets + data items in a dictionary interface. A ``CCSDSPacket`` generally begins as an empty dictionary that gets filled as the packet is parsed. The first 7 items in the dictionary make up the packet header (accessed with ``CCSDSPacket.header``), and the rest of the items make up the user data (accessed with ``CCSDSPacket.user_data``). To access the @@ -159,7 +295,7 @@ def user_data(self) -> dict: class Parseable(Protocol): """Defines an object that can be parsed from packet data.""" - def parse(self, packet: CCSDSPacket, **parse_value_kwargs) -> None: + def parse(self, packet: CCSDSPacket) -> None: """Parse this entry from the packet data and add the necessary items to the packet.""" @@ -200,13 +336,187 @@ def __post_init__(self): self.restriction_criteria = self.restriction_criteria or [] self.inheritors = self.inheritors or [] - def parse(self, packet: CCSDSPacket, **parse_value_kwargs) -> None: + def parse(self, packet: CCSDSPacket) -> None: """Parse the entry list of parameters/containers in the order they are expected in the packet. This could be recursive if the entry list contains SequenceContainers. """ for entry in self.entry_list: - entry.parse(packet=packet, **parse_value_kwargs) + entry.parse(packet=packet) + + +def ccsds_generator( # pylint: disable=too-many-branches,too-many-statements + binary_data: Union[BinaryIO, socket.socket, bytes], + *, + buffer_read_size_bytes: Optional[int] = None, + show_progress: bool = False, + skip_header_bytes: int = 0, +) -> Iterator[RawPacketData]: + """A generator that reads raw packet data from a filelike object or a socket. + + Each iteration of the generator yields a ``RawPacketData`` object that makes up + a single CCSDS packet. + + Parameters + ---------- + binary_data : Union[BinaryIO, socket.socket, bytes] + Binary data source containing CCSDSPackets. + buffer_read_size_bytes : int, optional + Number of bytes to read from e.g. a BufferedReader or socket binary data source on each read attempt. + If None, defaults to 4096 bytes from a socket, -1 (full read) from a file. + show_progress : bool + Default False. + If True, prints a status bar. Note that for socket sources, the percentage will be zero until the generator + ends. + skip_header_bytes : int + Default 0. The parser skips this many bytes at the beginning of every packet. This allows dynamic stripping + of additional header data that may be prepended to packets in "raw record" file formats. + + Yields + ------- + RawPacketData + Generator yields a RawPacketData object containing the raw packet data. + """ + n_bytes_parsed = 0 # Keep track of how many bytes we have parsed + n_packets_parsed = 0 # Keep track of how many packets we have parsed + read_buffer = b"" # Empty bytes object to start + current_pos = 0 # Keep track of where we are in the buffer + + # ======== + # Set up the reader based on the type of binary_data + # ======== + if isinstance(binary_data, io.BufferedIOBase): + if buffer_read_size_bytes is None: + # Default to a full read of the file + buffer_read_size_bytes = -1 + total_length_bytes = binary_data.seek(0, io.SEEK_END) # This is probably preferable to len + binary_data.seek(0, 0) + logger.info(f"Creating packet generator from a filelike object, {binary_data}. " + f"Total length is {total_length_bytes} bytes") + read_bytes_from_source = binary_data.read + elif isinstance(binary_data, socket.socket): # It's a socket and we don't know how much data we will get + logger.info("Creating packet generator to read from a socket. Total length to parse is unknown.") + total_length_bytes = None # We don't know how long it is + if buffer_read_size_bytes is None: + # Default to 4096 bytes from a socket + buffer_read_size_bytes = 4096 + read_bytes_from_source = binary_data.recv + elif isinstance(binary_data, bytes): + read_buffer = binary_data + total_length_bytes = len(read_buffer) + read_bytes_from_source = None # No data to read, we've filled the read_buffer already + logger.info(f"Creating packet generator from a bytes object. Total length is {total_length_bytes} bytes") + elif isinstance(binary_data, io.TextIOWrapper): + raise IOError("Packet data file opened in TextIO mode. You must open packet data in binary mode.") + else: + raise IOError(f"Unrecognized data source: {binary_data}") + + # ======== + # Packet loop. Each iteration of this loop yields a RawPacketData object + # ======== + start_time = time.time_ns() + while True: + if total_length_bytes and n_bytes_parsed == total_length_bytes: + break # Exit if we know the length and we've reached it + + if show_progress: + _print_progress(current_bytes=n_bytes_parsed, total_bytes=total_length_bytes, + start_time_ns=start_time, current_packets=n_packets_parsed) + + if current_pos > 20_000_000: + # Only trim the buffer after 20 MB read to prevent modifying + # the bitstream and trimming after every packet + read_buffer = read_buffer[current_pos:] + current_pos = 0 + + # Fill buffer enough to parse a header + while len(read_buffer) - current_pos < skip_header_bytes + RawPacketData.HEADER_LENGTH_BYTES: + result = read_bytes_from_source(buffer_read_size_bytes) + if not result: # If there is verifiably no more data to add, break + break + read_buffer += result + # Skip the header bytes + current_pos += skip_header_bytes + header_bytes = read_buffer[current_pos:current_pos + RawPacketData.HEADER_LENGTH_BYTES] + + # per the CCSDS spec + # 4.1.3.5.3 The length count C shall be expressed as: + # C = (Total Number of Octets in the Packet Data Field) – 1 + n_bytes_data = _extract_bits(header_bytes, 32, 16) + 1 + n_bytes_packet = RawPacketData.HEADER_LENGTH_BYTES + n_bytes_data + + # Fill the buffer enough to read a full packet, taking into account the user data length + while len(read_buffer) - current_pos < n_bytes_packet: + result = read_bytes_from_source(buffer_read_size_bytes) + if not result: # If there is verifiably no more data to add, break + break + read_buffer += result + + # Consider it a counted packet once we've verified that we have read the full packet and parsed the header + # Update the number of packets and bytes parsed + n_packets_parsed += 1 + n_bytes_parsed += skip_header_bytes + n_bytes_packet + + # current_pos is still before the header, so we are reading the entire packet here + packet_bytes = read_buffer[current_pos:current_pos + n_bytes_packet] + current_pos += n_bytes_packet + # Wrap the bytes in a RawPacketData object that adds convenience methods for parsing the header + yield RawPacketData(packet_bytes) + + if show_progress: + _print_progress(current_bytes=n_bytes_parsed, total_bytes=total_length_bytes, + start_time_ns=start_time, current_packets=n_packets_parsed, + end="\n", log=True) + + +def _print_progress( + *, + current_bytes: int, + total_bytes: Optional[int], + start_time_ns: int, + current_packets: int, + end: str = '\r', + log: bool = False + ): + """Prints a progress bar, including statistics on parsing rate. + + Parameters + ---------- + current_bytes : int + Number of bytes parsed so far. + total_bytes : Optional[int] + Number of total bytes to parse, if known. None otherwise. + current_packets : int + Number of packets parsed so far. + start_time_ns : int + Start time on system clock, in nanoseconds. + end : str + Print function end string. Default is `\\r` to create a dynamically updating loading bar. + log : bool + If True, log the progress bar at INFO level. + """ + progress_char = "=" + bar_length = 20 + + if total_bytes is not None: # If we actually have an endpoint (i.e. not using a socket) + percentage = int((current_bytes / total_bytes) * 100) # Percent Completed Calculation + progress = int((bar_length * current_bytes) / total_bytes) # Progress Done Calculation + else: + percentage = "???" + progress = 0 + + # Fast calls initially on Windows can result in a zero elapsed time + elapsed_ns = max(time.time_ns() - start_time_ns, 1) + delta = dt.timedelta(microseconds=elapsed_ns / 1E3) + kbps = int(current_bytes * 8E6 / elapsed_ns) # 8 bits per byte, 1E9 s per ns, 1E3 bits per kb + pps = int(current_packets * 1E9 / elapsed_ns) + info_str = f"[Elapsed: {delta}, " \ + f"Parsed {current_bytes} bytes ({current_packets} packets) " \ + f"at {kbps}kb/s ({pps}pkts/s)]" + loadbar = f"Progress: [{progress * progress_char:{bar_length}}]{percentage}% {info_str}" + print(loadbar, end=end) + if log: + logger.info(loadbar) def _extract_bits(data: bytes, start_bit: int, nbits: int): diff --git a/space_packet_parser/parameters.py b/space_packet_parser/parameters.py index 6b752da..3013427 100644 --- a/space_packet_parser/parameters.py +++ b/space_packet_parser/parameters.py @@ -119,7 +119,7 @@ def get_data_encoding(parameter_type_element: ElementTree.Element, ns: dict) -> raise ValueError(f"No Data Encoding element found for Parameter Type " f"{parameter_type_element.tag}: {parameter_type_element.attrib}") - def parse_value(self, packet: packets.CCSDSPacket, **kwargs) -> packets.ParameterDataTypes: + def parse_value(self, packet: packets.CCSDSPacket) -> packets.ParameterDataTypes: """Using the parameter type definition and associated data encoding, parse a value from a bit stream starting at the current cursor position. @@ -134,7 +134,7 @@ def parse_value(self, packet: packets.CCSDSPacket, **kwargs) -> packets.Paramete parsed_value : packets.ParameterDataTypes Resulting parsed parameter value """ - return self.encoding.parse_value(packet, **kwargs) + return self.encoding.parse_value(packet) class StringParameterType(ParameterType): @@ -259,7 +259,7 @@ def get_enumeration_list_contents(element: ElementTree.Element, encoding: encodi "Supported encodings for enums are FloatDataEncoding, IntegerDataEncoding, " "and StringDataEncoding.") - def parse_value(self, packet: packets.CCSDSPacket, **kwargs) -> packets.StrParameter: + def parse_value(self, packet: packets.CCSDSPacket) -> packets.StrParameter: """Using the parameter type definition and associated data encoding, parse a value from a bit stream starting at the current cursor position. @@ -274,7 +274,7 @@ def parse_value(self, packet: packets.CCSDSPacket, **kwargs) -> packets.StrParam derived_value : packets.StrParameter Resulting enum label associated with the (usually integer-)encoded data value. """ - raw_enum_value = super().parse_value(packet, **kwargs).raw_value + raw_enum_value = super().parse_value(packet).raw_value # Note: The enum lookup only operates on raw values. This is specified in Fig 4-43 in # section 4.3.2.4.3.6 of the XTCE spec CCSDS 660.1-G-2 # Note, this doesn't prohibit a user from defining a calibrator on an encoding that is used for an enum lookup. @@ -320,7 +320,7 @@ def __init__(self, name: str, encoding: encodings.DataEncoding, unit: Optional[s f"encoded booleans is not specified in XTCE. e.g. is the string \"0\" truthy?") super().__init__(name, encoding, unit) - def parse_value(self, packet: packets.CCSDSPacket, **kwargs): + def parse_value(self, packet: packets.CCSDSPacket): """Using the parameter type definition and associated data encoding, parse a value from a bit stream starting at the current cursor position. @@ -338,7 +338,7 @@ def parse_value(self, packet: packets.CCSDSPacket, **kwargs): # NOTE: The XTCE spec states that Booleans are "a restricted form of # enumeration." Enumerated parameters are only permitted to perform lookups based on raw encoded values # (not calibrated ones). We force this by taking the bool of the raw form of the parsed parameter. - parsed_value = super().parse_value(packet, **kwargs).raw_value + parsed_value = super().parse_value(packet).raw_value # NOTE: Boolean parameters may behave unexpectedly when encoded as String and Binary values. # This is because it's not obvious nor specified in XTCE which values of # binary encoded or string encoded data should be truthy/falsy. @@ -553,10 +553,9 @@ class Parameter(packets.Parseable): short_description: Optional[str] = None long_description: Optional[str] = None - def parse(self, packet: packets.CCSDSPacket, **parse_value_kwargs) -> None: + def parse(self, packet: packets.CCSDSPacket) -> None: """Parse this parameter from the packet data. Parse the parameter and add it to the packet dictionary. """ - packet[self.name] = self.parameter_type.parse_value( - packet, **parse_value_kwargs) + packet[self.name] = self.parameter_type.parse_value(packet) diff --git a/tests/unit/test_packets.py b/tests/unit/test_packets.py index b418b16..8c7d584 100644 --- a/tests/unit/test_packets.py +++ b/tests/unit/test_packets.py @@ -2,7 +2,59 @@ # Standard import pytest # Local -from space_packet_parser import packets +from space_packet_parser import definitions, packets + + +@pytest.mark.parametrize(("input_var", "input_value"), + [("version_number", 0), ("version_number", 7), + ("type", 0), ("type", 1), + ("secondary_header_flag", 0), ("secondary_header_flag", 1), + ("apid", 0), ("apid", 2**11 - 1), + ("sequence_flags", 0), ("sequence_flags", 3), + ("sequence_count", 0), ("sequence_count", 2**14 - 1), + ("data", bytes(1)), pytest.param("data", bytes(65536), id="max-bytes")]) +def test_create_ccsds_packet_input_range(input_var, input_value): + """Validate the min/max integer inputs""" + p = packets.create_ccsds_packet(**{input_var: input_value}) + if input_var == "data": + assert p[6:] == input_value + else: + assert getattr(p, input_var) == input_value + + +@pytest.mark.parametrize(("input_var", "input_value"), + [("version_number", -1), ("version_number", 8), + ("type", -1), ("type", 2), + ("secondary_header_flag", -1), ("secondary_header_flag", 2), + ("apid", -1), ("apid", 2**11), + ("sequence_flags", -1), ("sequence_flags", 4), + ("sequence_count", -1), ("sequence_count", 2**14), + ("data", bytes(0)), pytest.param("data", bytes(65537), id="max-bytes")]) +def test_create_ccsds_packet_value_range_error(input_var, input_value): + """Validate the min/max integer inputs""" + with pytest.raises(ValueError): + packets.create_ccsds_packet(**{input_var: input_value}) + +@pytest.mark.parametrize("input_var", ["version_number", "type", "secondary_header_flag", "apid", + "sequence_flags", "sequence_count", "data"]) +@pytest.mark.parametrize("input_value", [1.0, "1", 0.5]) +def test_create_ccsds_packet_type_validation(input_var, input_value): + """Only integers are allowed for the header fields and bytes for the data field.""" + with pytest.raises(TypeError): + packets.create_ccsds_packet(**{input_var: input_value}) + + +def test_raw_packet_attributes(): + p = packets.create_ccsds_packet(data=b"123", version_number=3, type=1, secondary_header_flag=1, + apid=1234, sequence_flags=2, sequence_count=5) + assert p.version_number == 3 + assert p.type == 1 + assert p.secondary_header_flag == 1 + assert p.apid == 1234 + assert p.sequence_flags == 2 + assert p.sequence_count == 5 + assert len(p) == 6 + 3 + assert p[6:] == b"123" @pytest.mark.parametrize(("raw_value", "start", "nbits", "expected"), @@ -19,7 +71,7 @@ # Multiple bytes (0b110000111100001100000000, 8, 10, 0b1100001100), (0b110000111100001100000000, 0, 24, 0b110000111100001100000000)]) -def test_raw_packet_data(raw_value, start, nbits, expected): +def test_raw_packet_reads(raw_value, start, nbits, expected): raw_bytes = raw_value.to_bytes((raw_value.bit_length() + 7) // 8, "big") raw_packet = packets.RawPacketData(raw_bytes) raw_packet.pos = start @@ -32,7 +84,7 @@ def test_raw_packet_data(raw_value, start, nbits, expected): assert raw_packet.pos == start + nbits -def test_ccsds_packet(): +def test_ccsds_packet_data_lookups(): packet = packets.CCSDSPacket(raw_data=b"123") assert packet.raw_data == b"123" # There are no items yet, so it should be an empty dictionary @@ -49,3 +101,68 @@ def test_ccsds_packet(): with pytest.raises(KeyError): packet[10] + + +def test_continuation_packets(test_data_dir): + # This definition has 65 bytes worth of data + d = definitions.XtcePacketDefinition(test_data_dir / "test_xtce.xml") + # We can put that all in one unsegmented packet, just to verify this is working as expected + raw_bytes = packets.create_ccsds_packet(data=b"0"*65, apid=11, sequence_flags=packets.SequenceFlags.UNSEGMENTED) + orig_packets = list(d.packet_generator(raw_bytes)) + assert len(orig_packets) == 1 + # Remove the sequence flags, counter, and packet length, as they are expected to vary across tests + def remove_keys(d): + d.pop("SEQ_FLGS") + d.pop("PKT_LEN") + d.pop("SRC_SEQ_CTR") + remove_keys(orig_packets[0]) + + # Now we will split the data across 2 CCSDS packets, but expect them to be combined into one for parsing purposes + p0 = packets.create_ccsds_packet(data=b"0"*64, apid=11, sequence_flags=packets.SequenceFlags.FIRST, sequence_count=0) + p1 = packets.create_ccsds_packet(data=b"0"*1, apid=11, sequence_flags=packets.SequenceFlags.LAST, sequence_count=1) + raw_bytes = p0 + p1 + result_packets = list(d.packet_generator(raw_bytes, combine_segmented_packets=True)) + remove_keys(result_packets[0]) + assert result_packets == orig_packets + + # Now we will split the data across 3 CCSDS packets and test the sequence_count wrap-around + p0 = packets.create_ccsds_packet(data=b"0"*63, apid=11, sequence_flags=packets.SequenceFlags.FIRST, sequence_count=16382) + p1 = packets.create_ccsds_packet(data=b"0"*1, apid=11, sequence_flags=packets.SequenceFlags.CONTINUATION, sequence_count=16383) + p2 = packets.create_ccsds_packet(data=b"0"*1, apid=11, sequence_flags=packets.SequenceFlags.LAST, sequence_count=0) + raw_bytes = p0 + p1 + p2 + result_packets = list(d.packet_generator(raw_bytes, combine_segmented_packets=True)) + remove_keys(result_packets[0]) + assert result_packets == orig_packets + + # Test stripping secondary headers (4 bytes per packet), should keep the first packet's header, but skip the following + # Add in 4 1s to the 2nd and 3rd packet that should be removed + p0 = packets.create_ccsds_packet(data=b"0"*63, apid=11, sequence_flags=packets.SequenceFlags.FIRST, sequence_count=16382) + p1 = packets.create_ccsds_packet(data=b"1"*4 + b"0"*1, apid=11, sequence_flags=packets.SequenceFlags.CONTINUATION, sequence_count=16383) + p2 = packets.create_ccsds_packet(data=b"1"*4 + b"0"*1, apid=11, sequence_flags=packets.SequenceFlags.LAST, sequence_count=0) + raw_bytes = p0 + p1 + p2 + result_packets = list(d.packet_generator(raw_bytes, combine_segmented_packets=True, secondary_header_bytes=4)) + remove_keys(result_packets[0]) + assert result_packets == orig_packets + + +def test_continuation_packet_warnings(test_data_dir): + # This definition has 65 bytes worth of data + d = definitions.XtcePacketDefinition(test_data_dir / "test_xtce.xml") + + # CONTINUATION / LAST without FIRST + p0 = packets.create_ccsds_packet(data=b"0"*65, apid=11, sequence_flags=packets.SequenceFlags.CONTINUATION) + p1 = packets.create_ccsds_packet(data=b"0"*65, apid=11, sequence_flags=packets.SequenceFlags.LAST) + raw_bytes = p0 + p1 + with pytest.warns(match="Continuation packet found without declaring the start"): + list(d.packet_generator(raw_bytes, combine_segmented_packets=True)) + # Nothing expected to be returned + assert len(list(d.packet_generator(raw_bytes, combine_segmented_packets=True))) == 0 + + # Out of sequence packets + p0 = packets.create_ccsds_packet(data=b"0"*65, apid=11, sequence_flags=packets.SequenceFlags.FIRST, sequence_count=1) + p1 = packets.create_ccsds_packet(data=b"0"*65, apid=11, sequence_flags=packets.SequenceFlags.LAST, sequence_count=0) + raw_bytes = p0 + p1 + + with pytest.warns(match="not in sequence"): + # Nothing expected to be returned + assert len(list(d.packet_generator(raw_bytes, combine_segmented_packets=True))) == 0