Skip to content

Commit

Permalink
add: New unittests for igslog.py functions
Browse files Browse the repository at this point in the history
  • Loading branch information
ronaldmaj committed Dec 23, 2024
1 parent 12eae0b commit 3ee4917
Show file tree
Hide file tree
Showing 3 changed files with 481 additions and 26 deletions.
105 changes: 79 additions & 26 deletions gnssanalysis/gn_io/igslog.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@

class LogVersionError(Exception):
"""
Log file read does not conform to known IGS version standard
Log file does not conform to known IGS version standard
"""

pass
Expand Down Expand Up @@ -148,7 +148,6 @@ def determine_log_version(data: bytes) -> str:
:param bytes data: IGS log file bytes object to determine the version of
:return str: Return the version number: "v1.0" or "v2.0" (or "Unknown" if file does not conform to standard)
"""
# check for version:

result_v1 = _REGEX_VERSION_1.search(data)
if result_v1:
Expand All @@ -161,54 +160,108 @@ def determine_log_version(data: bytes) -> str:
raise LogVersionError("Log file does not conform to any known IGS version")


def parse_igs_log(filename_array: _np.ndarray) -> _np.ndarray:
"""Parses igs log and outputs ndarray with parsed data
def extract_id_block(data: bytes, file_path: str, file_code: str, version: str = None) -> bytes:
"""Extract the site ID block given the bytes object read from an IGS site log file
:param _np.ndarray filename_array: Metadata on input log file. Expects ndarray of the form [CODE DATE PATH]
:return _np.ndarray: Returns array with data from the IGS log file parsed
:param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
:param str file_path: The path to the file from which the "data" bytes object was obtained
:param str file_code: Code from the filename_array passed to the parse_igs_log() function
:param str version: Version number of log file (e.g. "v2.0") - determined if version=None, defaults to None
:raises LogVersionError: Raises an error if an unknown version string is passed in
:return bytes: The site ID block of the IGS site log
"""
file_code, _, file_path = filename_array

with open(file_path, "rb") as file:
data = file.read()

try:
if version == None:
version = determine_log_version(data)
except LogVersionError as e:
logger.warning(f"Error: {e}, skipping parsing the log file")

if version == "v1.0":
_REGEX_ID = _REGEX_ID_V1
_REGEX_LOC = _REGEX_LOC_V1
elif version == "v2.0":
_REGEX_ID = _REGEX_ID_V2
_REGEX_LOC = _REGEX_LOC_V2
else:
raise LogVersionError("Incorrect version string passed to the extract_id_block() function")

blk_id = _REGEX_ID.search(data)
if blk_id is None:
id_block = _REGEX_ID.search(data)
if id_block is None:
logger.warning(f"ID rejected from {file_path}")
return _np.array([]).reshape(0, 12)

blk_id = [blk_id[1].decode().upper(), blk_id[2].decode().upper()] # no .groups() thus 1 and 2
code = blk_id[0]
id_block = [id_block[1].decode().upper(), id_block[2].decode().upper()] # no .groups() thus 1 and 2
code = id_block[0]
if code != file_code:
logger.warning(f"{code}!={file_code} at {file_path}")
return _np.array([]).reshape(0, 12)
return id_block


def extract_location_block(data: bytes, file_path: str, version: str = None) -> bytes:
"""Extract the location block given the bytes object read from an IGS site log file
:param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
:param str file_path: The path to the file from which the "data" bytes object was obtained
:param str version: Version number of log file (e.g. "v2.0") - determined if version=None, defaults to None
:raises LogVersionError: Raises an error if an unknown version string is passed in
:return bytes: The location block of the IGS site log
"""
if version == None:
version = determine_log_version(data)

if version == "v1.0":
_REGEX_LOC = _REGEX_LOC_V1
elif version == "v2.0":
_REGEX_LOC = _REGEX_LOC_V2
else:
raise LogVersionError("Incorrect version string passed to extract_location_block() function")

blk_loc = _REGEX_LOC.search(data)
if blk_loc is None:
location_block = _REGEX_LOC.search(data)
if location_block is None:
logger.warning(f"LOC rejected from {file_path}")
return _np.array([]).reshape(0, 12)
return location_block


blk_rec = _REGEX_REC.findall(data)
if blk_rec == []:
def extract_receiver_block(data: bytes, file_path: str) -> bytes:
"""Extract the location block given the bytes object read from an IGS site log file
:param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
:param str file_path: The path to the file from which the "data" bytes object was obtained
:return bytes: The receiver block of the IGS site log
"""
receiver_block = _REGEX_REC.findall(data)
if receiver_block == []:
logger.warning(f"REC rejected from {file_path}")
return _np.array([]).reshape(0, 12)
return receiver_block


blk_ant = _REGEX_ANT.findall(data)
if blk_ant == []:
def extract_antenna_block(data: bytes, file_path: str) -> bytes:
antenna_block = _REGEX_ANT.findall(data)
if antenna_block == []:
logger.warning(f"ANT rejected from {file_path}")
return _np.array([]).reshape(0, 12)
return antenna_block


def parse_igs_log(filename_array: _np.ndarray) -> _np.ndarray:
"""Parses igs log and outputs ndarray with parsed data
:param _np.ndarray filename_array: Metadata on input log file. Expects ndarray of the form [CODE DATE PATH]
:return _np.ndarray: Returns array with data from the IGS log file parsed
"""
file_code, _, file_path = filename_array

with open(file_path, "rb") as file:
data = file.read()

try:
version = determine_log_version(data)
except LogVersionError as e:
logger.warning(f"Error: {e}, skipping parsing the log file")
return

blk_id = extract_id_block(data, version, file_path, file_code)
blk_loc = extract_location_block(data, version, file_path)
blk_rec = extract_receiver_block(data, file_path)
blk_ant = extract_antenna_block(data, file_path)

blk_loc = [group.decode(encoding="utf8", errors="ignore") for group in blk_loc.groups()]
blk_rec = _np.asarray(blk_rec, dtype=str)
Expand Down
Loading

0 comments on commit 3ee4917

Please sign in to comment.