Skip to content

Commit

Permalink
add module to extract message values for use in scripting (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
bhannawa authored Nov 19, 2024
1 parent d23cb8f commit 6a18061
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
53 changes: 53 additions & 0 deletions pyulog/extract_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
Extract values from a ULog file message to use in scripting
"""

import numpy as np
from .core import ULog

def extract_message(ulog_file_name: str, message: str,
time_s: "int | None" = None, time_e: "int | None" = None,
disable_str_exceptions: bool = False) -> list[dict]:
"""
Extract values from a ULog file
:param ulog_file_name: (str) The ULog filename to open and read
:param message: (str) A ULog message to return values from
:param time_s: (int) Offset time for conversion in seconds
:param time_e: (int) Limit until time for conversion in seconds
:return: (list[dict]) A list of each record from the ULog as key-value pairs
"""

if not isinstance(message, str):
raise AttributeError("Must provide a message to pull from ULog file")

ulog = ULog(ulog_file_name, message, disable_str_exceptions)

try:
data = ulog.get_dataset(message)
except Exception as exc:
raise AttributeError("Provided message is not in the ULog file") from exc

values = []

# use same field order as in the log, except for the timestamp
data_keys = [f.field_name for f in data.field_data]
data_keys.remove('timestamp')
data_keys.insert(0, 'timestamp') # we want timestamp at first position

#get the index for row where timestamp exceeds or equals the required value
time_s_i = np.where(data.data['timestamp'] >= time_s * 1e6)[0][0] \
if time_s else 0
#get the index for row upto the timestamp of the required value
time_e_i = np.where(data.data['timestamp'] >= time_e * 1e6)[0][0] \
if time_e else len(data.data['timestamp'])

# write the data
for i in range(time_s_i, time_e_i):
row = {}
for key in data_keys:
row[key] = data.data[key][i]
values.append(row)

return values
37 changes: 37 additions & 0 deletions test/test_extract_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
'''
Test extract_message module
'''

import os
import inspect
import unittest

from ddt import ddt, data

from pyulog.extract_message import extract_message

TEST_PATH = os.path.dirname(os.path.abspath(
inspect.getfile(inspect.currentframe())))

@ddt
class TestExtractMessage(unittest.TestCase):
"""
Test extract_message module.
"""

@data('sample')
def test_extract_message(self, test_case):
"""
Test that extract_message module runs without error.
"""

ulog_file_name = os.path.join(TEST_PATH, test_case+'.ulg')
message = "actuator_controls_0"
time_s = None
time_e = None
extract_message(ulog_file_name,
message,
time_s,
time_e)

# vim: set et fenc=utf-8 ft=python ff=unix sts=4 sw=4 ts=4

0 comments on commit 6a18061

Please sign in to comment.