Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce explicit object model for device streams #342

Merged
merged 14 commits into from
Mar 27, 2024
Merged
3 changes: 3 additions & 0 deletions aeon/io/device.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import inspect
from typing_extensions import deprecated


@deprecated("Please use the StreamGroup class from the streams module instead.")
def compositeStream(pattern, *args):
"""Merges multiple data streams into a single composite stream."""
composite = {}
Expand All @@ -15,6 +17,7 @@ def compositeStream(pattern, *args):
return composite


@deprecated("The Device class has been moved to the streams module.")
class Device:
"""Groups multiple data streams into a logical device.

Expand Down
56 changes: 37 additions & 19 deletions aeon/schema/core.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,65 @@
import aeon.io.device as _device
from aeon.schema.streams import Stream, StreamGroup
import aeon.io.reader as _reader


def heartbeat(pattern):
class Heartbeat(Stream):
"""Heartbeat event for Harp devices."""
return {"Heartbeat": _reader.Heartbeat(f"{pattern}_8_*")}

def __init__(self, pattern):
super().__init__(_reader.Heartbeat(f"{pattern}_8_*"))

def video(pattern):

class Video(Stream):
"""Video frame metadata."""
return {"Video": _reader.Video(f"{pattern}_*")}

def __init__(self, pattern):
super().__init__(_reader.Video(f"{pattern}_*"))


def position(pattern):
class Position(Stream):
"""Position tracking data for the specified camera."""
return {"Position": _reader.Position(f"{pattern}_200_*")}

def __init__(self, pattern):
super().__init__(_reader.Position(f"{pattern}_200_*"))


def encoder(pattern):
class Encoder(Stream):
"""Wheel magnetic encoder data."""
return {"Encoder": _reader.Encoder(f"{pattern}_90_*")}

def __init__(self, pattern):
super().__init__(_reader.Encoder(f"{pattern}_90_*"))

def environment(pattern):

class Environment(StreamGroup):
"""Metadata for environment mode and subjects."""
return _device.compositeStream(pattern, environment_state, subject_state)

def __init__(self, pattern):
super().__init__(pattern, EnvironmentState, SubjectState)


def environment_state(pattern):
class EnvironmentState(Stream):
"""Environment state log."""
return {"EnvironmentState": _reader.Csv(f"{pattern}_EnvironmentState_*", ["state"])}

def __init__(self, pattern):
super().__init__(_reader.Csv(f"{pattern}_EnvironmentState_*", ["state"]))

def subject_state(pattern):

class SubjectState(Stream):
"""Subject state log."""
return {"SubjectState": _reader.Subject(f"{pattern}_SubjectState_*")}

def __init__(self, pattern):
super().__init__(_reader.Subject(f"{pattern}_SubjectState_*"))


def messageLog(pattern):
class MessageLog(Stream):
"""Message log data."""
return {"MessageLog": _reader.Log(f"{pattern}_MessageLog_*")}

def __init__(self, pattern):
super().__init__(_reader.Log(f"{pattern}_MessageLog_*"))

def metadata(pattern):

class Metadata(Stream):
"""Metadata for acquisition epochs."""
return {pattern: _reader.Metadata(pattern)}

def __init__(self, pattern):
super().__init__(_reader.Metadata(pattern))
60 changes: 30 additions & 30 deletions aeon/schema/dataset.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,50 @@
from dotmap import DotMap

import aeon.schema.core as stream
from aeon.io.device import Device
from aeon.schema.streams import Device
from aeon.schema import foraging, octagon

exp02 = DotMap(
[
Device("Metadata", stream.metadata),
Device("ExperimentalMetadata", stream.environment, stream.messageLog),
Device("CameraTop", stream.video, stream.position, foraging.region),
Device("CameraEast", stream.video),
Device("CameraNest", stream.video),
Device("CameraNorth", stream.video),
Device("CameraPatch1", stream.video),
Device("CameraPatch2", stream.video),
Device("CameraSouth", stream.video),
Device("CameraWest", stream.video),
Device("Nest", foraging.weight),
Device("Patch1", foraging.patch),
Device("Patch2", foraging.patch),
Device("Metadata", stream.Metadata),
Device("ExperimentalMetadata", stream.Environment, stream.MessageLog),
Device("CameraTop", stream.Video, stream.Position, foraging.Region),
Device("CameraEast", stream.Video),
Device("CameraNest", stream.Video),
Device("CameraNorth", stream.Video),
Device("CameraPatch1", stream.Video),
Device("CameraPatch2", stream.Video),
Device("CameraSouth", stream.Video),
Device("CameraWest", stream.Video),
Device("Nest", foraging.Weight),
Device("Patch1", foraging.Patch),
Device("Patch2", foraging.Patch),
]
)

exp01 = DotMap(
[
Device("SessionData", foraging.session),
Device("FrameTop", stream.video, stream.position),
Device("FrameEast", stream.video),
Device("FrameGate", stream.video),
Device("FrameNorth", stream.video),
Device("FramePatch1", stream.video),
Device("FramePatch2", stream.video),
Device("FrameSouth", stream.video),
Device("FrameWest", stream.video),
Device("Patch1", foraging.depletionFunction, stream.encoder, foraging.feeder),
Device("Patch2", foraging.depletionFunction, stream.encoder, foraging.feeder),
Device("SessionData", foraging.SessionData),
Device("FrameTop", stream.Video, stream.Position),
Device("FrameEast", stream.Video),
Device("FrameGate", stream.Video),
Device("FrameNorth", stream.Video),
Device("FramePatch1", stream.Video),
Device("FramePatch2", stream.Video),
Device("FrameSouth", stream.Video),
Device("FrameWest", stream.Video),
Device("Patch1", foraging.DepletionFunction, stream.Encoder, foraging.Feeder),
Device("Patch2", foraging.DepletionFunction, stream.Encoder, foraging.Feeder),
]
)

octagon01 = DotMap(
[
Device("Metadata", stream.metadata),
Device("CameraTop", stream.video, stream.position),
Device("CameraColorTop", stream.video),
Device("ExperimentalMetadata", stream.subject_state),
Device("Photodiode", octagon.photodiode),
Device("Metadata", stream.Metadata),
Device("CameraTop", stream.Video, stream.Position),
Device("CameraColorTop", stream.Video),
Device("ExperimentalMetadata", stream.SubjectState),
Device("Photodiode", octagon.Photodiode),
Device("OSC", octagon.OSC),
Device("TaskLogic", octagon.TaskLogic),
Device("Wall1", octagon.Wall),
Expand Down
74 changes: 47 additions & 27 deletions aeon/schema/foraging.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from enum import Enum as _Enum

from enum import Enum
import pandas as pd

import aeon.io.device as _device
import aeon.io.reader as _reader
import aeon.schema.core as _stream
from aeon.schema.streams import Stream, StreamGroup


class Area(_Enum):
class Area(Enum):
Null = 0
Nest = 1
Corridor = 2
Expand Down Expand Up @@ -53,56 +51,78 @@ def __init__(self, pattern):
super().__init__(pattern, columns=["value", "stable"])


def region(pattern):
class Region(Stream):
"""Region tracking data for the specified camera."""
return {"Region": _RegionReader(f"{pattern}_201_*")}

def __init__(self, pattern):
super().__init__(_RegionReader(f"{pattern}_201_*"))


def depletionFunction(pattern):
class DepletionFunction(Stream):
"""State of the linear depletion function for foraging patches."""
return {"DepletionState": _PatchState(f"{pattern}_State_*")}

def __init__(self, pattern):
super().__init__(_PatchState(f"{pattern}_State_*"))


def feeder(pattern):
class Feeder(StreamGroup):
"""Feeder commands and events."""
return _device.compositeStream(pattern, beam_break, deliver_pellet)

def __init__(self, pattern):
super().__init__(pattern, BeamBreak, DeliverPellet)


def beam_break(pattern):
class BeamBreak(Stream):
"""Beam break events for pellet detection."""
return {"BeamBreak": _reader.BitmaskEvent(f"{pattern}_32_*", 0x22, "PelletDetected")}

def __init__(self, pattern):
super().__init__(_reader.BitmaskEvent(f"{pattern}_32_*", 0x22, "PelletDetected"))


def deliver_pellet(pattern):
class DeliverPellet(Stream):
"""Pellet delivery commands."""
return {"DeliverPellet": _reader.BitmaskEvent(f"{pattern}_35_*", 0x80, "TriggerPellet")}

def __init__(self, pattern):
super().__init__(_reader.BitmaskEvent(f"{pattern}_35_*", 0x80, "TriggerPellet"))


def patch(pattern):
class Patch(StreamGroup):
"""Data streams for a patch."""
return _device.compositeStream(pattern, depletionFunction, _stream.encoder, feeder)

def __init__(self, pattern):
super().__init__(pattern, DepletionFunction, _stream.Encoder, Feeder)


def weight(pattern):
class Weight(StreamGroup):
"""Weight measurement data streams for a specific nest."""
return _device.compositeStream(pattern, weight_raw, weight_filtered, weight_subject)

def __init__(self, pattern):
super().__init__(pattern, WeightRaw, WeightFiltered, WeightSubject)


def weight_raw(pattern):
class WeightRaw(Stream):
"""Raw weight measurement for a specific nest."""
return {"WeightRaw": _Weight(f"{pattern}_200_*")}

def __init__(self, pattern):
super().__init__(_Weight(f"{pattern}_200_*"))


def weight_filtered(pattern):
class WeightFiltered(Stream):
"""Filtered weight measurement for a specific nest."""
return {"WeightFiltered": _Weight(f"{pattern}_202_*")}

def __init__(self, pattern):
super().__init__(_Weight(f"{pattern}_202_*"))


def weight_subject(pattern):
class WeightSubject(Stream):
"""Subject weight measurement for a specific nest."""
return {"WeightSubject": _Weight(f"{pattern}_204_*")}

def __init__(self, pattern):
super().__init__(_Weight(f"{pattern}_204_*"))


def session(pattern):
class SessionData(Stream):
"""Session metadata for Experiment 0.1."""
return {pattern: _reader.Csv(f"{pattern}_2*", columns=["id", "weight", "event"])}

def __init__(self, pattern):
super().__init__(_reader.Csv(f"{pattern}_2*", columns=["id", "weight", "event"]))
Loading
Loading