Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce explicit object model for device streams #342

Merged
merged 14 commits into from
Mar 27, 2024
Merged
3 changes: 3 additions & 0 deletions aeon/io/device.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import inspect
from typing_extensions import deprecated


@deprecated("Please use the streams module instead.")
glopesdev marked this conversation as resolved.
Show resolved Hide resolved
def compositeStream(pattern, *args):
"""Merges multiple data streams into a single composite stream."""
composite = {}
Expand All @@ -15,6 +17,7 @@ def compositeStream(pattern, *args):
return composite


@deprecated("Please use the Device class in the streams module instead.")
glopesdev marked this conversation as resolved.
Show resolved Hide resolved
class Device:
"""Groups multiple data streams into a logical device.

Expand Down
81 changes: 81 additions & 0 deletions aeon/io/streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import inspect
from warnings import warn


class Stream:
"""Represents a single data stream.

Attributes:
reader (Reader): The reader used to retrieve the stream data.
"""

def __init__(self, reader):
self.reader = reader

def __iter__(self):
yield (self.__class__.__name__, self.reader)


class StreamGroup:
"""Represents a logical group of multiple data streams.

Attributes:
path (str): Path to the folder where stream chunks are located.
args (Any): Data streams or data stream groups to be included in this stream group.
"""

def __init__(self, path, *args):
self.path = path
self._args = args

def __iter__(self):
for callable in self._args:
for stream in iter(callable(self.path)):
yield stream

glopesdev marked this conversation as resolved.
Show resolved Hide resolved

class Device:
"""Groups multiple data streams into a logical device.

If a device contains a single stream with the same pattern as the device
`name`, it will be considered a singleton, and the stream reader will be
paired directly with the device without nesting.

Attributes:
name (str): Name of the device.
args (Any): Data streams collected from the device.
path (str, optional): Path to the folder where stream chunks are located.
"""

def __init__(self, name, *args, path=None):
if name is None:
raise ValueError("name cannot be None.")

glopesdev marked this conversation as resolved.
Show resolved Hide resolved
self.name = name
self._streams = Device._createStreams(name if path is None else path, args)

@staticmethod
def _createStreams(path, args):
streams = {}
for callable in args:
try:
streams.update(callable(path))
except TypeError:
if inspect.isclass(callable):
warn(
f"Stream group classes with no constructors are deprecated. {callable}",
category=DeprecationWarning,
)
for method in vars(callable).values():
if isinstance(method, staticmethod):
streams.update(method.__func__(path))
else:
raise
return streams

def __iter__(self):
if len(self._streams) == 1:
singleton = self._streams.get(self.name, None)
if singleton:
return iter((self.name, singleton))
return iter((self.name, self._streams))
glopesdev marked this conversation as resolved.
Show resolved Hide resolved
Loading