From b54e1c3440b58d0bd8ca3fa37e49de74b0de86da Mon Sep 17 00:00:00 2001 From: Jai Date: Thu, 10 Oct 2024 22:07:14 +0100 Subject: [PATCH 1/4] new readers and schemas for reduced data storage in db --- aeon/io/reader.py | 21 +--- aeon/schema/ingestion_schemas.py | 204 +++++++++++++++++++++++++++++++ 2 files changed, 205 insertions(+), 20 deletions(-) create mode 100644 aeon/schema/ingestion_schemas.py diff --git a/aeon/io/reader.py b/aeon/io/reader.py index d44c2995..32215122 100644 --- a/aeon/io/reader.py +++ b/aeon/io/reader.py @@ -11,8 +11,7 @@ from dotmap import DotMap from aeon import util -from aeon.io.api import aeon as aeon_time -from aeon.io.api import chunk, chunk_key +from aeon.io.api import chunk_key _SECONDS_PER_TICK = 32e-6 _payloadtypes = { @@ -187,24 +186,6 @@ class Encoder(Harp): def __init__(self, pattern): super().__init__(pattern, columns=["angle", "intensity"]) - def read(self, file, downsample=True): - """Reads encoder data from the specified Harp binary file. - - By default the encoder data is downsampled to 50Hz. Setting downsample to - False or None can be used to force the raw data to be returned. - """ - data = super().read(file) - if downsample is True: - # resample requires a DatetimeIndex so we convert early - data.index = aeon_time(data.index) - - first_index = data.first_valid_index() - if first_index is not None: - # since data is absolute angular position we decimate by taking first of each bin - chunk_origin = chunk(first_index) - data = data.resample("20ms", origin=chunk_origin).first() - return data - class Position(Harp): """Extract 2D position tracking data for a specific camera. diff --git a/aeon/schema/ingestion_schemas.py b/aeon/schema/ingestion_schemas.py new file mode 100644 index 00000000..a1a51cf5 --- /dev/null +++ b/aeon/schema/ingestion_schemas.py @@ -0,0 +1,204 @@ +"""Aeon experiment schemas for DataJoint database ingestion.""" + +from os import PathLike + +import pandas as pd +from dotmap import DotMap + +import aeon.schema.core as stream +from aeon.io import reader +from aeon.io.api import aeon as aeon_time +from aeon.schema import foraging, octagon, social_01, social_02, social_03 +from aeon.schema.foraging import DepletionFunction, Feeder +from aeon.schema.streams import Device, Stream, StreamGroup + + +# Define new readers +class _Encoder(reader.Encoder): + """A version of the encoder reader that can downsample the data.""" + + def read(self, file: PathLike[str], sr_hz: int = 50) -> pd.DataFrame: + """Reads encoder data from the specified Harp binary file.""" + data = super().read(file) + data.index = aeon_time(data.index) + first_index = data.first_valid_index() + if first_index is not None: + data = data.resample(f"{1/sr_hz}S").first() # take first sample in each resampled bin + return data + + +class _Video(reader.Video): + """A version of the video reader that drops columns that can be recreated from data and metadata.""" + + def read(self, file: PathLike[str], drop_cols=None) -> pd.DataFrame: + """Reads video metadata from the specified file.""" + drop_cols = ["hw_counter", "_frame", "_path", "_epoch"] if drop_cols is None else drop_cols + data = pd.read_csv(file, header=0, names=self._rawcolumns) + data.drop(columns=drop_cols, errors="ignore", inplace=True) + data.set_index("time", inplace=True) + return data + + +# Define new streams and stream groups +class Video(Stream): + """Video frame metadata.""" + + def __init__(self, pattern): + super().__init__(_Video(f"{pattern}_*")) + + +class Encoder(Stream): + """Wheel magnetic encoder data.""" + + def __init__(self, pattern): + super().__init__(_Encoder(f"{pattern}_90_*")) + + +class Patch(StreamGroup): + """Data streams for a patch.""" + + def __init__(self, pattern): + super().__init__(pattern, DepletionFunction, Encoder, Feeder) + + +# Define schemas +octagon01 = DotMap( + [ + Device("Metadata", stream.Metadata), + Device("CameraTop", Video, stream.Position), + Device("CameraColorTop", Video), + Device("ExperimentalMetadata", stream.SubjectState), + Device("Photodiode", octagon.Photodiode), + Device("OSC", octagon.OSC), + Device("TaskLogic", octagon.TaskLogic), + Device("Wall1", octagon.Wall), + Device("Wall2", octagon.Wall), + Device("Wall3", octagon.Wall), + Device("Wall4", octagon.Wall), + Device("Wall5", octagon.Wall), + Device("Wall6", octagon.Wall), + Device("Wall7", octagon.Wall), + Device("Wall8", octagon.Wall), + ] +) + +exp01 = DotMap( + [ + Device("SessionData", foraging.SessionData), + Device("FrameTop", Video, stream.Position), + Device("FrameEast", Video), + Device("FrameGate", Video), + Device("FrameNorth", Video), + Device("FramePatch1", Video), + Device("FramePatch2", Video), + Device("FrameSouth", Video), + Device("FrameWest", Video), + Device("Patch1", foraging.DepletionFunction, stream.Encoder, foraging.Feeder), + Device("Patch2", foraging.DepletionFunction, stream.Encoder, foraging.Feeder), + ] +) + +exp02 = DotMap( + [ + Device("Metadata", stream.Metadata), + Device("ExperimentalMetadata", stream.Environment, stream.MessageLog), + Device("CameraTop", Video, stream.Position, foraging.Region), + Device("CameraEast", Video), + Device("CameraNest", Video), + Device("CameraNorth", Video), + Device("CameraPatch1", Video), + Device("CameraPatch2", Video), + Device("CameraSouth", Video), + Device("CameraWest", Video), + Device("Nest", foraging.Weight), + Device("Patch1", Patch), + Device("Patch2", Patch), + ] +) + +social01 = DotMap( + [ + Device("Metadata", stream.Metadata), + Device("Environment", social_02.Environment, social_02.SubjectData), + Device("CameraTop", Video, social_01.Pose), + Device("CameraNorth", Video), + Device("CameraSouth", Video), + Device("CameraEast", Video), + Device("CameraWest", Video), + Device("CameraPatch1", Video), + Device("CameraPatch2", Video), + Device("CameraPatch3", Video), + Device("CameraNest", Video), + Device("Nest", social_02.WeightRaw, social_02.WeightFiltered), + Device("Patch1", Patch), + Device("Patch2", Patch), + Device("Patch3", Patch), + Device("RfidGate", social_01.RfidEvents), + Device("RfidNest1", social_01.RfidEvents), + Device("RfidNest2", social_01.RfidEvents), + Device("RfidPatch1", social_01.RfidEvents), + Device("RfidPatch2", social_01.RfidEvents), + Device("RfidPatch3", social_01.RfidEvents), + ] +) + + +social02 = DotMap( + [ + Device("Metadata", stream.Metadata), + Device("Environment", social_02.Environment, social_02.SubjectData), + Device("CameraTop", Video, social_02.Pose), + Device("CameraNorth", Video), + Device("CameraSouth", Video), + Device("CameraEast", Video), + Device("CameraWest", Video), + Device("CameraPatch1", Video), + Device("CameraPatch2", Video), + Device("CameraPatch3", Video), + Device("CameraNest", Video), + Device("Nest", social_02.WeightRaw, social_02.WeightFiltered), + Device("Patch1", Patch), + Device("Patch2", Patch), + Device("Patch3", Patch), + Device("GateRfid", social_02.RfidEvents), + Device("NestRfid1", social_02.RfidEvents), + Device("NestRfid2", social_02.RfidEvents), + Device("Patch1Rfid", social_02.RfidEvents), + Device("Patch2Rfid", social_02.RfidEvents), + Device("Patch3Rfid", social_02.RfidEvents), + ] +) + + +social03 = DotMap( + [ + Device("Metadata", stream.Metadata), + Device("Environment", social_02.Environment, social_02.SubjectData), + Device("CameraTop", Video, social_03.Pose), + Device("CameraNorth", Video), + Device("CameraSouth", Video), + Device("CameraEast", Video), + Device("CameraWest", Video), + Device("CameraNest", Video), + Device("CameraPatch1", Video), + Device("CameraPatch2", Video), + Device("CameraPatch3", Video), + Device("Nest", social_02.WeightRaw, social_02.WeightFiltered), + Device("Patch1", Patch), + Device("Patch2", Patch), + Device("Patch3", Patch), + Device("PatchDummy1", Patch), + Device("NestRfid1", social_02.RfidEvents), + Device("NestRfid2", social_02.RfidEvents), + Device("GateRfid", social_02.RfidEvents), + Device("GateEastRfid", social_02.RfidEvents), + Device("GateWestRfid", social_02.RfidEvents), + Device("Patch1Rfid", social_02.RfidEvents), + Device("Patch2Rfid", social_02.RfidEvents), + Device("Patch3Rfid", social_02.RfidEvents), + Device("PatchDummy1Rfid", social_02.RfidEvents), + ] +) + + +__all__ = ["octagon01", "exp01", "exp02", "social01", "social02", "social03"] From d6cf52ffc1c305a2202c8cf28ab0b47d59f1c2f9 Mon Sep 17 00:00:00 2001 From: Jai Date: Thu, 10 Oct 2024 22:52:43 +0100 Subject: [PATCH 2/4] updated tests --- tests/io/test_api.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/io/test_api.py b/tests/io/test_api.py index 095439de..2456db51 100644 --- a/tests/io/test_api.py +++ b/tests/io/test_api.py @@ -6,6 +6,7 @@ import aeon from aeon.schema.schemas import exp02 +from aeon.schema.ingestion_schemas import social03 nonmonotonic_path = Path(__file__).parent.parent / "data" / "nonmonotonic" monotonic_path = Path(__file__).parent.parent / "data" / "monotonic" @@ -14,7 +15,7 @@ @mark.api def test_load_start_only(): data = aeon.load( - nonmonotonic_path, exp02.Patch2.Encoder, start=pd.Timestamp("2022-06-06T13:00:49"), downsample=None + nonmonotonic_path, exp02.Patch2.Encoder, start=pd.Timestamp("2022-06-06T13:00:49") ) assert len(data) > 0 @@ -22,7 +23,7 @@ def test_load_start_only(): @mark.api def test_load_end_only(): data = aeon.load( - nonmonotonic_path, exp02.Patch2.Encoder, end=pd.Timestamp("2022-06-06T13:00:49"), downsample=None + nonmonotonic_path, exp02.Patch2.Encoder, end=pd.Timestamp("2022-06-06T13:00:49") ) assert len(data) > 0 @@ -35,22 +36,22 @@ def test_load_filter_nonchunked(): @mark.api def test_load_monotonic(): - data = aeon.load(monotonic_path, exp02.Patch2.Encoder, downsample=None) + data = aeon.load(monotonic_path, exp02.Patch2.Encoder) assert len(data) > 0 assert data.index.is_monotonic_increasing @mark.api def test_load_nonmonotonic(): - data = aeon.load(nonmonotonic_path, exp02.Patch2.Encoder, downsample=None) + data = aeon.load(nonmonotonic_path, exp02.Patch2.Encoder) assert not data.index.is_monotonic_increasing @mark.api def test_load_encoder_with_downsampling(): DOWNSAMPLE_PERIOD = 0.02 - data = aeon.load(monotonic_path, exp02.Patch2.Encoder, downsample=True) - raw_data = aeon.load(monotonic_path, exp02.Patch2.Encoder, downsample=None) + data = aeon.load(monotonic_path, social03.Patch2.Encoder) + raw_data = aeon.load(monotonic_path, exp02.Patch2.Encoder) # Check that the length of the downsampled data is less than the raw data assert len(data) < len(raw_data) From f12e359d1be23783adaf6ae63553923ef49527bc Mon Sep 17 00:00:00 2001 From: Jai Date: Fri, 11 Oct 2024 00:12:45 +0100 Subject: [PATCH 3/4] cleaned up linting for ruff --- tests/io/test_api.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/io/test_api.py b/tests/io/test_api.py index 2456db51..2a491c55 100644 --- a/tests/io/test_api.py +++ b/tests/io/test_api.py @@ -5,11 +5,11 @@ from pytest import mark import aeon -from aeon.schema.schemas import exp02 from aeon.schema.ingestion_schemas import social03 +from aeon.schema.schemas import exp02 -nonmonotonic_path = Path(__file__).parent.parent / "data" / "nonmonotonic" monotonic_path = Path(__file__).parent.parent / "data" / "monotonic" +nonmonotonic_path = Path(__file__).parent.parent / "data" / "nonmonotonic" @mark.api From daf622481a8f7f69cf32ec56a139e4aca242b740 Mon Sep 17 00:00:00 2001 From: Jai Date: Fri, 11 Oct 2024 00:34:19 +0100 Subject: [PATCH 4/4] updated pandas and changed S to s lmao --- aeon/schema/ingestion_schemas.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aeon/schema/ingestion_schemas.py b/aeon/schema/ingestion_schemas.py index a1a51cf5..439c05fb 100644 --- a/aeon/schema/ingestion_schemas.py +++ b/aeon/schema/ingestion_schemas.py @@ -23,7 +23,7 @@ def read(self, file: PathLike[str], sr_hz: int = 50) -> pd.DataFrame: data.index = aeon_time(data.index) first_index = data.first_valid_index() if first_index is not None: - data = data.resample(f"{1/sr_hz}S").first() # take first sample in each resampled bin + data = data.resample(f"{1/sr_hz}s").first() # take first sample in each resampled bin return data