Social schema #310

Closed · wants to merge 16 commits
2 changes: 1 addition & 1 deletion aeon/dj_pipeline/acquisition.py
@@ -5,7 +5,7 @@
import pandas as pd

from aeon.io import api as io_api
from aeon.schema import dataset as aeon_schema
from aeon.io import schemas as aeon_schema
@glopesdev (Contributor) commented on Feb 12, 2024:

I don't think we should lump together experiment-specific schemas with the much more general io module.

Currently somebody can use io as a PyPI package for their own experiments without having to know anything at all about the details of the foraging experiments, which I think is something we should keep. We can discuss whether aeon.schema is the best name for the module, but I do feel these should live in a separate module.

from aeon.io import reader as io_reader
from aeon.analysis import utils as analysis_utils

3 changes: 2 additions & 1 deletion aeon/io/api.py
@@ -115,7 +115,8 @@ def load(root, reader, start=None, end=None, time=None, tolerance=None, epoch=No
# to fill missing values
previous = reader.read(files[i - 1])
data = pd.concat([previous, frame])
data = data.reindex(values, method="pad", tolerance=tolerance)
data = data.reindex(values, tolerance=tolerance)
data.dropna(inplace=True)
@glopesdev (Contributor) commented on Feb 12, 2024:

Why are we dropping NaN values from the output data? This feels dangerous, and regardless of the reason it seems out of scope for this PR, which is about refactoring schemas. If we want to discuss this we should do so in a separate PR.

else:
data.drop(columns="time", inplace=True)
dataframes.append(data)
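For context on the comment above, here is a minimal sketch (hypothetical values, not pipeline data) of the behavioural difference being questioned: with method="pad" a missing target timestamp is forward-filled from the previous sample, while the exact reindex (no fill method, as happens when tolerance is None) leaves it as NaN and the added dropna then silently discards that row.

```python
import pandas as pd

# Hypothetical chunk: samples at t=0 and t=2; the target index also asks for t=1.
frame = pd.Series([10.0, 20.0], index=[0.0, 2.0])
values = [0.0, 1.0, 2.0]

# Previous behaviour: forward-fill missing timestamps within tolerance.
padded = frame.reindex(values, method="pad", tolerance=1.0)
# 0.0 -> 10.0, 1.0 -> 10.0 (padded), 2.0 -> 20.0

# New behaviour: exact reindex leaves t=1 as NaN, and dropna removes that row.
exact = frame.reindex(values).dropna()
# 0.0 -> 10.0, 2.0 -> 20.0
```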
35 changes: 17 additions & 18 deletions aeon/io/device.py
@@ -1,41 +1,40 @@
import inspect


def compositeStream(pattern, *args):
"""Merges multiple data streams into a single composite stream."""
composite = {}
def register(pattern, *args):
@glopesdev (Contributor) commented on Feb 12, 2024:

I don't think register is a good term to use here. In computer science this term refers either to a CPU register or to something like the Windows registry, which is more like a global-level dictionary of settings.

If the issue is the term composite, then my proposed alternative would be to simply use the plural streams, since conceptually a Device is simply meant to contain a collection of data streams.

I think we discussed this in a previous DA meeting, but there is a confusion here about the intended target audience. At a basic data analysis level I think the goal is to simply provide access to the collection of streams in a device. That collection is a dictionary simply because there is a unique key associated with each stream, but what we have at its heart is still simply a collection of streams.

I do agree with you that the "binder function" (provisional name) has a different role from a Reader, and likely merits its own separate name when we are explaining how the API works.

However, I was realizing just now that we actually don't seem to use the word "stream" for anything else in the API so it still feels to me like a strong contender to capture this concept.

"""Merges multiple Readers into a single registry."""
Contributor:

We are not merging Readers but actually binder_fn objects, at least given the code below.

registry = {}
if args:
for stream in args:
if inspect.isclass(stream):
for method in vars(stream).values():
for binder_fn in args:
if inspect.isclass(binder_fn):
for method in vars(binder_fn).values():
if isinstance(method, staticmethod):
composite.update(method.__func__(pattern))
registry.update(method.__func__(pattern))
else:
composite.update(stream(pattern))
return composite
registry.update(binder_fn(pattern))
return registry


class Device:
"""Groups multiple data streams into a logical device.
"""Groups multiple Readers into a logical device.
Contributor:

Same as above, we are not merging Readers but binder_fn objects.


If a device contains a single stream with the same pattern as the device
`name`, it will be considered a singleton, and the stream reader will be
paired directly with the device without nesting.
If a device contains a single stream reader with the same pattern as the device `name`, it will be
considered a singleton, and the stream reader will be paired directly with the device without nesting.
Comment on lines +21 to +22
Contributor:

What exactly changed here? If we are reviewing core terminology we should make sure documentation stays consistent.

Contributor:

looks like auto-formatting


Attributes:
name (str): Name of the device.
args (Any): Data streams collected from the device.
args (any): A binder function or class that returns a dictionary of Readers.
@glopesdev (Contributor) commented on Feb 12, 2024:

Any here meant the Any type, so it should either be left with its original casing or replaced with something else from the typing module.

A possible alternative to "binder function" could be "stream selector" or "stream accessor", since basically the only functionality this function brings in addition to the reader is the pattern to search and find stream data files.

Another option is simply to call this concept a "stream", since we don't use that name anywhere else in the API. I think this is actually closer to what I originally had in mind when I defined these "binder functions":

  • a Reader is a reusable module for reading specific data file formats
  • a stream is a combination of "data" + "reader", i.e. where is the data (the "pattern") and how to read it (the Reader).

We can discuss this further in a future meeting.

pattern (str, optional): Pattern used to find raw chunk files,
usually in the format `<Device>_<DataStream>`.
"""

def __init__(self, name, *args, pattern=None):
self.name = name
self.stream = compositeStream(name if pattern is None else pattern, *args)
self.registry = register(name if pattern is None else pattern, *args)

def __iter__(self):
if len(self.stream) == 1:
singleton = self.stream.get(self.name, None)
if len(self.registry) == 1:
singleton = self.registry.get(self.name, None)
if singleton:
return iter((self.name, singleton))
return iter((self.name, self.stream))
return iter((self.name, self.registry))
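To ground the terminology discussion in the comments above, here is a short sketch of how the pieces fit together. The subject_state binder function is copied from aeon/schema/core.py (shown later in this diff), and the CameraTop device mirrors the exp02 definition in the new schemas module; registry keys other than "Region" are not spelled out here because they depend on each binder function.

```python
from aeon.io import reader as _reader
from aeon.io.device import Device
from aeon.schema import core, foraging

# A Reader is a reusable parser for one file format. A "binder function"
# (or "stream") merely binds a search pattern to a Reader, e.g.:
def subject_state(pattern):
    """Subject state data."""
    return {"SubjectState": _reader.Subject(f"{pattern}_SubjectState_*")}

# A Device merges the dictionaries returned by several binder functions
# into one registry keyed by stream name.
camera_top = Device("CameraTop", core.video, core.position, foraging.region)

# Iterating a Device yields a (name, registry) pair, which is how the DotMap
# schemas later in this diff consume it; e.g. registry["Region"] is the
# region Reader bound by foraging.region.
name, registry = camera_top

# Singleton case from the docstring: if the registry holds a single stream
# keyed by the device name itself, iteration pairs that Reader directly with
# the device, without nesting.
```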
2 changes: 1 addition & 1 deletion aeon/io/reader.py
@@ -212,7 +212,7 @@ def read(self, file):
specified unique identifier.
"""
data = super().read(file)
data = data[data.event & self.value > 0]
data = data[(data.event & self.value) == self.value]
Contributor:

I don't remember anymore what this change was about, but I feel we should leave any functional changes outside of this PR, which will already be confusing enough with all the major breaking refactoring and reorganisation.

data["event"] = self.tag
return data

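For reference, a small sketch with hypothetical bit values of what the change above does. Since & binds tighter than comparison operators in Python, the old expression kept rows where any bit of self.value was set in event; the new one requires all of those bits to be set.

```python
# Hypothetical mask and event codes, not actual Harp register values.
value = 0b0011
events = [0b0001, 0b0011, 0b0111]

any_bit = [e for e in events if (e & value) > 0]        # old filter: any masked bit set
all_bits = [e for e in events if (e & value) == value]  # new filter: all masked bits set
# any_bit  == [0b0001, 0b0011, 0b0111]
# all_bits == [0b0011, 0b0111]
```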
4 changes: 2 additions & 2 deletions aeon/schema/core.py
@@ -24,7 +24,7 @@ def encoder(pattern):

def environment(pattern):
"""Metadata for environment mode and subjects."""
return _device.compositeStream(pattern, environment_state, subject_state)
return _device.register(pattern, environment_state, subject_state)


def environment_state(pattern):
@@ -37,7 +37,7 @@ def subject_state(pattern):
return {"SubjectState": _reader.Subject(f"{pattern}_SubjectState_*")}


def messageLog(pattern):
def message_log(pattern):
Contributor:

Can we focus this PR only on the major naming changes and terminology? I know we disagree on best practices for casing in Python, but if we could leave this out of this PR so we can focus on the core discussion of naming, that would help a lot.

"""Message log data."""
return {"MessageLog": _reader.Log(f"{pattern}_MessageLog_*")}

59 changes: 0 additions & 59 deletions aeon/schema/dataset.py

This file was deleted.

8 changes: 4 additions & 4 deletions aeon/schema/foraging.py
@@ -58,14 +58,14 @@ def region(pattern):
return {"Region": _RegionReader(f"{pattern}_201_*")}


def depletionFunction(pattern):
def depletion_function(pattern):
"""State of the linear depletion function for foraging patches."""
return {"DepletionState": _PatchState(f"{pattern}_State_*")}


def feeder(pattern):
"""Feeder commands and events."""
return _device.compositeStream(pattern, beam_break, deliver_pellet)
return _device.register(pattern, beam_break, deliver_pellet)


def beam_break(pattern):
Expand All @@ -80,12 +80,12 @@ def deliver_pellet(pattern):

def patch(pattern):
"""Data streams for a patch."""
return _device.compositeStream(pattern, depletionFunction, _stream.encoder, feeder)
return _device.register(pattern, depletion_function, _stream.encoder, feeder)


def weight(pattern):
"""Weight measurement data streams for a specific nest."""
return _device.compositeStream(pattern, weight_raw, weight_filtered, weight_subject)
return _device.register(pattern, weight_raw, weight_filtered, weight_subject)


def weight_raw(pattern):
112 changes: 112 additions & 0 deletions aeon/schema/schemas.py
@@ -0,0 +1,112 @@
from dotmap import DotMap
from aeon.io.device import Device
from aeon.schema import core, foraging, octagon

exp02 = DotMap(
[
Device("Metadata", core.metadata),
Device("ExperimentalMetadata", core.environment, core.message_log),
Device("CameraTop", core.video, core.position, foraging.region),
Device("CameraEast", core.video),
Device("CameraNest", core.video),
Device("CameraNorth", core.video),
Device("CameraPatch1", core.video),
Device("CameraPatch2", core.video),
Device("CameraSouth", core.video),
Device("CameraWest", core.video),
Device("Nest", foraging.weight),
Device("Patch1", foraging.patch),
Device("Patch2", foraging.patch),
]
)

exp01 = DotMap(
[
Device("SessionData", foraging.session),
Device("FrameTop", core.video, core.position),
Device("FrameEast", core.video),
Device("FrameGate", core.video),
Device("FrameNorth", core.video),
Device("FramePatch1", core.video),
Device("FramePatch2", core.video),
Device("FrameSouth", core.video),
Device("FrameWest", core.video),
Device("Patch1", foraging.depletion_function, core.encoder, foraging.feeder),
Device("Patch2", foraging.depletion_function, core.encoder, foraging.feeder),
]
)

octagon01 = DotMap(
[
Device("Metadata", core.metadata),
Device("CameraTop", core.video, core.position),
Device("CameraColorTop", core.video),
Device("ExperimentalMetadata", core.subject_state),
Device("Photodiode", octagon.photodiode),
Device("OSC", octagon.OSC),
Device("TaskLogic", octagon.TaskLogic),
Device("Wall1", octagon.Wall),
Device("Wall2", octagon.Wall),
Device("Wall3", octagon.Wall),
Device("Wall4", octagon.Wall),
Device("Wall5", octagon.Wall),
Device("Wall6", octagon.Wall),
Device("Wall7", octagon.Wall),
Device("Wall8", octagon.Wall),
]
)

# All recorded social01 streams:

# *Note*: register 8 is always the Harp heartbeat for any device that has this stream.

# - Metadata.yml
# - Environment_BlockState
# - Environment_EnvironmentState
# - Environment_LightEvents
# - Environment_MessageLog
# - Environment_SubjectState
# - Environment_SubjectVisits
# - Environment_SubjectWeight
# - CameraTop (200, 201, avi, csv, <model_path>,)
# - 200: position
# - 201: region
# - CameraNorth (avi, csv)
# - CameraEast (avi, csv)
# - CameraSouth (avi, csv)
# - CameraWest (avi, csv)
# - CameraPatch1 (avi, csv)
# - CameraPatch2 (avi, csv)
# - CameraPatch3 (avi, csv)
# - CameraNest (avi, csv)
# - ClockSynchronizer (8, 36)
# - 36:
# - Nest (200, 201, 202, 203)
# - 200: weight_raw
# - 201: weight_tare
# - 202: weight_filtered
# - 203: weight_baseline
# - 204: weight_subject
# - Patch1 (8, 32, 35, 36, 87, 90, 91, 200, 201, 202, 203, State)
# - 32: beam_break
# - 35: delivery_set
# - 36: delivery_clear
# - 87: expansion_board
# - 90: encoder_read
# - 91: encoder_mode
# - 200: dispenser_state
# - 201: delivery_manual
# - 202: missed_pellet
# - 203: delivery_retry
# - Patch2 (8, 32, 35, 36, 87, 90, 91, State)
# - Patch3 (8, 32, 35, 36, 87, 90, 91, 200, 203, State)
# - RfidEventsGate (8, 32, 35)
# - 32: entry_id
# - 35: hardware_notifications
# - RfidEventsNest1 (8, 32, 35)
# - RfidEventsNest2 (8, 32, 35)
# - RfidEventsPatch1 (8, 32, 35)
# - RfidEventsPatch2 (8, 32, 35)
# - RfidEventsPatch3 (8, 32, 35)
# - VideoController (8, 32, 33, 34, 35, 36, 45, 52)
# - 32: frame_number
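As a usage sketch for the new schemas module (import path as added in this PR; the root path and timestamps are placeholders), loading one stream through the io API looks roughly like this:

```python
import pandas as pd

import aeon.io.api as api
from aeon.schema.schemas import exp02

# Placeholder acquisition root and time window.
root = "/path/to/experiment0.2/raw"
region = api.load(
    root,
    exp02.CameraTop.Region,  # region Reader bound by foraging.region above
    start=pd.Timestamp("2022-06-20"),
    end=pd.Timestamp("2022-06-21"),
)
```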
2 changes: 1 addition & 1 deletion aeon/schema/social.py
@@ -32,7 +32,7 @@ def read(
) -> pd.DataFrame:
"""Reads data from the Harp-binarized tracking file."""
# Get config file from `file`, then bodyparts from config file.
model_dir = Path(file.stem.replace("_", "/")).parent
model_dir = Path(*Path(file.stem.replace("_", "/")).parent.parts[1:])
Contributor:

As above, I would remove all functional changes from this PR.

config_file_dir = ceph_proc_dir / model_dir
if not config_file_dir.exists():
raise FileNotFoundError(f"Cannot find model dir {config_file_dir}")
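To illustrate the functional change flagged above, a sketch with a made-up file stem: the new expression drops the leading component of the model directory recovered from the file name before it is joined with ceph_proc_dir.

```python
from pathlib import Path

# Made-up tracking file name, purely for illustration.
file = Path("CameraTop_model1_config_2024-02-01T00-00-00.bin")

stem_as_path = Path(file.stem.replace("_", "/"))      # CameraTop/model1/config/2024-02-01T00-00-00
old_model_dir = stem_as_path.parent                   # CameraTop/model1/config
new_model_dir = Path(*stem_as_path.parent.parts[1:])  # model1/config (first component dropped)
```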