Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MNT/ENH: Add a raw CCSDS packet generator #107

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/source/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ Release notes for the `space_packet_parser` library
### v5.1.0 (unreleased)
- BUGFIX: Fix kbps calculation in packet generator for showing progress.
- Add support for string and float encoded enumerated lookup parameters.
- Add properties to extract the CCSDS Header items from the ``RawPacketData`` object directly.
e.g. ``RawPacketData.apid``
- Add a ``create_ccsds_packet`` function that can create a CCSDS Packet
with the given header items and data. This is useful for creating
mock packets in testing and experimentation for creating debugging
streams as needed.
- Add a ``ccsds_packet_generator()`` function that iterates through raw
bytes and yields individual CCSDS packets.

### v5.0.1 (released)
- BUGFIX: Allow raw_value representation for enums with falsy raw values. Previously these defaulted to the enum label.
Expand Down
233 changes: 20 additions & 213 deletions space_packet_parser/definitions.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,17 @@
"""Module for parsing XTCE xml files to specify packet format"""
# Standard
from collections import namedtuple
import datetime as dt
import io
import logging
from pathlib import Path
import socket
import time
from typing import Tuple, Optional, List, TextIO, Dict, Union, BinaryIO, Iterator
# Installed
import lxml.etree as ElementTree
# Local
from space_packet_parser.exceptions import ElementNotFoundError, InvalidParameterTypeError
from space_packet_parser.exceptions import ElementNotFoundError, InvalidParameterTypeError, UnrecognizedPacketTypeError
from space_packet_parser import comparisons, parameters, packets

logger = logging.getLogger(__name__)

CcsdsPacketHeaderElement = namedtuple('CcsdsPacketHeaderElement', ['name', 'nbits'])

CCSDS_HEADER_DEFINITION = [
CcsdsPacketHeaderElement('VERSION', 3),
CcsdsPacketHeaderElement('TYPE', 1),
CcsdsPacketHeaderElement('SEC_HDR_FLG', 1),
CcsdsPacketHeaderElement('PKT_APID', 11),
CcsdsPacketHeaderElement('SEQ_FLGS', 2),
CcsdsPacketHeaderElement('SRC_SEQ_CTR', 14),
CcsdsPacketHeaderElement('PKT_LEN', 16)
]

CCSDS_HEADER_LENGTH_BYTES = 6


class UnrecognizedPacketTypeError(Exception):
"""Error raised when we can't figure out which kind of packet we are dealing with based on the header"""

def __init__(self, *args, partial_data: dict = None):
"""
Parameters
----------
partial_data : dict, Optional
Data parsed so far (for debugging at higher levels)
"""
super().__init__(*args)
self.partial_data = partial_data


class XtcePacketDefinition:
"""Object representation of the XTCE definition of a CCSDS packet object"""
Expand Down Expand Up @@ -353,33 +321,10 @@ def _get_container_base_container(
restrictions = []
return self._find_container(base_container_element.attrib['containerRef']), restrictions

@staticmethod
def _parse_header(packet_data: bytes) -> dict:
"""Parses the CCSDS standard header.

Parameters
----------
packet_data : bytes
6 bytes of binary data.

Returns
-------
header : dict
Dictionary of header items.
"""
header = {}
current_bit = 0
for item in CCSDS_HEADER_DEFINITION:
# pylint: disable=protected-access
header[item.name] = packets._extract_bits(packet_data, current_bit, item.nbits)
current_bit += item.nbits
return header

def parse_ccsds_packet(self,
packet: packets.CCSDSPacket,
*,
root_container_name: str = "CCSDSPacket",
**parse_value_kwargs) -> packets.CCSDSPacket:
root_container_name: str = "CCSDSPacket") -> packets.CCSDSPacket:
"""Parse binary packet data according to the self.packet_definition object

Parameters
Expand All @@ -397,7 +342,7 @@ def parse_ccsds_packet(self,
"""
current_container: packets.SequenceContainer = self._sequence_container_cache[root_container_name]
while True:
current_container.parse(packet, **parse_value_kwargs)
current_container.parse(packet)

valid_inheritors = []
for inheritor_name in current_container.inheritors:
Expand All @@ -424,56 +369,6 @@ def parse_ccsds_packet(self,
partial_data=packet)
return packet

@staticmethod
def _print_progress(
*,
current_bytes: int,
total_bytes: Optional[int],
start_time_ns: int,
current_packets: int,
end: str = '\r',
log: bool = False
):
"""Prints a progress bar, including statistics on parsing rate.

Parameters
----------
current_bytes : int
Number of bytes parsed so far.
total_bytes : Optional[int]
Number of total bytes to parse, if known. None otherwise.
current_packets : int
Number of packets parsed so far.
start_time_ns : int
Start time on system clock, in nanoseconds.
end : str
Print function end string. Default is `\\r` to create a dynamically updating loading bar.
log : bool
If True, log the progress bar at INFO level.
"""
progress_char = "="
bar_length = 20

if total_bytes is not None: # If we actually have an endpoint (i.e. not using a socket)
percentage = int((current_bytes / total_bytes) * 100) # Percent Completed Calculation
progress = int((bar_length * current_bytes) / total_bytes) # Progress Done Calculation
else:
percentage = "???"
progress = 0

# Fast calls initially on Windows can result in a zero elapsed time
elapsed_ns = max(time.time_ns() - start_time_ns, 1)
delta = dt.timedelta(microseconds=elapsed_ns / 1E3)
kbps = int(current_bytes * 8E6 / elapsed_ns) # 8 bits per byte, 1E9 s per ns, 1E3 bits per kb
pps = int(current_packets * 1E9 / elapsed_ns)
info_str = f"[Elapsed: {delta}, " \
f"Parsed {current_bytes} bytes ({current_packets} packets) " \
f"at {kbps}kb/s ({pps}pkts/s)]"
loadbar = f"Progress: [{progress * progress_char:{bar_length}}]{percentage}% {info_str}"
print(loadbar, end=end)
if log:
logger.info(loadbar)

def packet_generator( # pylint: disable=too-many-branches,too-many-statements
self,
binary_data: Union[BinaryIO, socket.socket],
Expand Down Expand Up @@ -506,6 +401,8 @@ def packet_generator( # pylint: disable=too-many-branches,too-many-statements
point for parsing. Default is 'CCSDSPacket'.
ccsds_headers_only : bool
Default False. If True, only parses the packet headers (does not use the provided packet definition).
``space_packet_parser.packets.ccsds_packet_generator`` can be used directly to parse only the CCSDS headers
without needing a packet definition.
yield_unrecognized_packet_errors : bool
Default False.
If False, UnrecognizedPacketTypeErrors are caught silently and parsing continues to the next packet.
Expand All @@ -531,123 +428,33 @@ def packet_generator( # pylint: disable=too-many-branches,too-many-statements
If yield_unrecognized_packet_errors is True, it will yield an unraised exception object,
which can be raised or used for debugging purposes.
"""
# ========
# Set up the reader based on the type of binary_data
# ========
if isinstance(binary_data, io.BufferedIOBase):
if buffer_read_size_bytes is None:
# Default to a full read of the file
buffer_read_size_bytes = -1
total_length_bytes = binary_data.seek(0, io.SEEK_END) # This is probably preferable to len
binary_data.seek(0, 0)
logger.info(f"Creating packet generator from a filelike object, {binary_data}. "
f"Total length is {total_length_bytes} bytes")
read_bytes_from_source = binary_data.read
elif isinstance(binary_data, socket.socket): # It's a socket and we don't know how much data we will get
logger.info("Creating packet generator to read from a socket. Total length to parse is unknown.")
total_length_bytes = None # We don't know how long it is
if buffer_read_size_bytes is None:
# Default to 4096 bytes from a socket
buffer_read_size_bytes = 4096
read_bytes_from_source = binary_data.recv
elif isinstance(binary_data, io.TextIOWrapper):
raise IOError("Packet data file opened in TextIO mode. You must open packet data in binary mode.")
else:
raise IOError(f"Unrecognized data source: {binary_data}")

# ========
# Packet loop. Each iteration of this loop yields a CCSDSPacket object
# ========
start_time = time.time_ns()
n_bytes_parsed = 0 # Keep track of how many bytes we have parsed
n_packets_parsed = 0 # Keep track of how many packets we have parsed
read_buffer = b"" # Empty bytes object to start
current_pos = 0 # Keep track of where we are in the buffer
while True:
if total_length_bytes and n_bytes_parsed == total_length_bytes:
break # Exit if we know the length and we've reached it

if show_progress:
self._print_progress(current_bytes=n_bytes_parsed, total_bytes=total_length_bytes,
start_time_ns=start_time, current_packets=n_packets_parsed)

if current_pos > 20_000_000:
# Only trim the buffer after 20 MB read to prevent modifying
# the bitstream and trimming after every packet
read_buffer = read_buffer[current_pos:]
current_pos = 0

# Fill buffer enough to parse a header
while len(read_buffer) - current_pos < skip_header_bytes + CCSDS_HEADER_LENGTH_BYTES:
result = read_bytes_from_source(buffer_read_size_bytes)
if not result: # If there is verifiably no more data to add, break
break
read_buffer += result
# Skip the header bytes
current_pos += skip_header_bytes
header_bytes = read_buffer[current_pos:current_pos + CCSDS_HEADER_LENGTH_BYTES]
header = self._parse_header(header_bytes)

# per the CCSDS spec
# 4.1.3.5.3 The length count C shall be expressed as:
# C = (Total Number of Octets in the Packet Data Field) – 1
n_bytes_data = header['PKT_LEN'] + 1
n_bytes_packet = CCSDS_HEADER_LENGTH_BYTES + n_bytes_data

# Based on PKT_LEN fill buffer enough to read a full packet
while len(read_buffer) - current_pos < n_bytes_packet:
result = read_bytes_from_source(buffer_read_size_bytes)
if not result: # If there is verifiably no more data to add, break
break
read_buffer += result

# Consider it a counted packet once we've verified that we have read the full packet and parsed the header
# Update the number of packets and bytes parsed
n_packets_parsed += 1
n_bytes_parsed += skip_header_bytes + n_bytes_packet

# Iterate over individual packets in the binary data
for raw_packet_data in packets.ccsds_generator(binary_data,
buffer_read_size_bytes=buffer_read_size_bytes,
show_progress=show_progress,
skip_header_bytes=skip_header_bytes):
if ccsds_headers_only:
# update the current position to the end of the packet data
current_pos += n_bytes_packet
p = packets.CCSDSPacket(raw_data=read_buffer[current_pos - n_bytes_packet:current_pos], **header)
yield p
yield raw_packet_data
continue

# current_pos is still before the header, so we are reading the entire packet here
packet_bytes = read_buffer[current_pos:current_pos + n_bytes_packet]
current_pos += n_bytes_packet
# Wrap the bytes in a class that can keep track of position as we read from it
packet = packets.CCSDSPacket(raw_data=packet_bytes)
packet = packets.CCSDSPacket(raw_data=raw_packet_data)
# Now do the actual parsing of the packet data
try:
packet = self.parse_ccsds_packet(packet,
root_container_name=root_container_name)
packet = self.parse_ccsds_packet(packet, root_container_name=root_container_name)
except UnrecognizedPacketTypeError as e:
logger.debug(f"Unrecognized error on packet with APID {header['PKT_APID']}'")
logger.debug(f"Unrecognized error on packet with APID {packet.raw_data.apid}")
if yield_unrecognized_packet_errors:
# Yield the caught exception without raising it (raising ends generator)
yield e
# Continue to next packet
continue

if packet.header['PKT_LEN'] != header['PKT_LEN']:
raise ValueError(f"Hardcoded header parsing found a different packet length "
f"{header['PKT_LEN']} than the definition-based parsing found "
f"{packet.header['PKT_LEN']}. This might be because the CCSDS header is "
f"incorrectly represented in your packet definition document.")

actual_length_parsed = packet.raw_data.pos // 8
if actual_length_parsed != n_bytes_packet:
logger.warning(f"Parsed packet length "
f"({actual_length_parsed}B) did not match "
f"length specified in header ({n_bytes_packet}B). "
f"Updating the position to the correct position "
"indicated by CCSDS header.")
if packet.raw_data.pos != len(packet.raw_data) * 8:
logger.warning(f"Number of bits parsed ({packet.raw_data.pos}b) did not match "
f"the length of data available ({len(packet.raw_data) * 8}b).")
if not parse_bad_pkts:
logger.warning(f"Skipping (not yielding) bad packet with apid {header['PKT_APID']}.")
logger.warning(f"Skipping (not yielding) bad packet with apid {raw_packet_data.apid}.")
continue

yield packet

if show_progress:
self._print_progress(current_bytes=n_bytes_parsed, total_bytes=total_length_bytes,
start_time_ns=start_time, current_packets=n_packets_parsed,
end="\n", log=True)
8 changes: 4 additions & 4 deletions space_packet_parser/encodings.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def _calculate_size(self, packet: packets.CCSDSPacket) -> int:
"""
raise NotImplementedError()

def parse_value(self, packet: packets.CCSDSPacket, **kwargs) -> packets.ParameterDataTypes:
def parse_value(self, packet: packets.CCSDSPacket) -> packets.ParameterDataTypes:
"""Parse a value from packet data, possibly using previously parsed data items to inform parsing.

Parameters
Expand Down Expand Up @@ -334,7 +334,7 @@ def _get_raw_buffer(self, packet: packets.CCSDSPacket) -> bytes:
).to_bytes(buflen_bytes, "big")
return raw_string_buffer

def parse_value(self, packet: packets.CCSDSPacket, **kwargs) -> packets.StrParameter:
def parse_value(self, packet: packets.CCSDSPacket) -> packets.StrParameter:
"""Parse a string value from packet data, possibly using previously parsed data items to inform parsing.

Parameters
Expand Down Expand Up @@ -529,7 +529,7 @@ def _twos_complement(val: int, bit_width: int) -> int:

def parse_value(self,
packet: packets.CCSDSPacket,
**kwargs) -> Union[packets.FloatParameter, packets.IntParameter]:
) -> Union[packets.FloatParameter, packets.IntParameter]:
"""Parse a value from packet data, possibly using previously parsed data items to inform parsing.

Parameters
Expand Down Expand Up @@ -793,7 +793,7 @@ def _calculate_size(self, packet: packets.CCSDSPacket) -> int:
len_bits = self.linear_adjuster(len_bits)
return len_bits

def parse_value(self, packet: packets.CCSDSPacket, **kwargs) -> packets.BinaryParameter:
def parse_value(self, packet: packets.CCSDSPacket) -> packets.BinaryParameter:
"""Parse a value from packet data, possibly using previously parsed data items to inform parsing.

Parameters
Expand Down
14 changes: 14 additions & 0 deletions space_packet_parser/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,17 @@ class CalibrationError(Exception):
class InvalidParameterTypeError(Exception):
"""Error raised when someone is using an invalid ParameterType element"""
pass


class UnrecognizedPacketTypeError(Exception):
"""Error raised when we can't figure out which kind of packet we are dealing with based on the header"""

def __init__(self, *args, partial_data: dict = None):
"""
Parameters
----------
partial_data : dict, Optional
Data parsed so far (for debugging at higher levels)
"""
super().__init__(*args)
self.partial_data = partial_data
Loading
Loading