Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process text sketch #180

Open
wants to merge 11 commits into
base: process-text
Choose a base branch
from
107 changes: 85 additions & 22 deletions audinterface/core/process.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections.abc import Iterable
import errno
import inspect
import itertools
Expand All @@ -17,7 +18,7 @@
from audinterface.core.typing import Timestamps


def identity(signal, sampling_rate) -> np.ndarray:
def identity(signal, sampling_rate=None) -> np.ndarray:
r"""Default processing function.

This function is used,
Expand Down Expand Up @@ -104,6 +105,13 @@ class Process:
multiprocessing
multiprocessing: use multiprocessing instead of multithreading
verbose: show debug messages
read_func: function to read in signals/data. When specified,
it needs to be able to read signal data as well
as text data.
Per default, :func:`audinterface.utils.read_audio`
will be used for signal file(s), and
:func:`audinterface.utils.read_text` for files
with ``.json`` or ``.txt`` extensions.

Raises:
ValueError: if ``resample = True``, but ``sampling_rate = None``
Expand Down Expand Up @@ -171,6 +179,7 @@ def __init__(
num_workers: typing.Optional[int] = 1,
multiprocessing: bool = False,
verbose: bool = False,
read_func: typing.Callable[..., typing.Any] = None,
):
if channels is not None:
channels = audeer.to_list(channels)
Expand Down Expand Up @@ -236,6 +245,14 @@ def __init__(
self.win_dur = win_dur
r"""Window duration."""

# set read_audio and read_text methods
if read_func is None:
setattr(self.__class__, "read_audio", staticmethod(utils.read_audio))
setattr(self.__class__, "read_text", staticmethod(utils.read_text))
else:
setattr(self.__class__, "read_audio", staticmethod(read_func))
setattr(self.__class__, "read_text", staticmethod(read_func))

def _process_file(
self,
file: str,
Expand Down Expand Up @@ -274,7 +291,7 @@ def _process_file(

# Text files
if ext in ["json", "txt"]:
data = utils.read_text(file, root=root)
data = self.read_text(file, root=root)
y, file = self._process_data(
data,
idx=idx,
Expand All @@ -288,7 +305,7 @@ def _process_file(

# Audio/video files
else:
signal, sampling_rate = utils.read_audio(
signal, sampling_rate = self.read_audio(
file,
start=start,
end=end,
Expand Down Expand Up @@ -489,22 +506,7 @@ def process_files(
)
self.verbose = verbose

y = list(itertools.chain.from_iterable([x[0] for x in xs]))
files = list(itertools.chain.from_iterable([x[1] for x in xs]))
starts = list(itertools.chain.from_iterable([x[2] for x in xs]))
ends = list(itertools.chain.from_iterable([x[3] for x in xs]))

if (
len(audeer.unique(starts)) == 1
and audeer.unique(starts)[0] is None
and len(audeer.unique(ends)) == 1
and audeer.unique(ends)[0] is None
):
index = audformat.filewise_index(files)
else:
index = audformat.segmented_index(files, starts, ends)
y = pd.Series(y, index)

y = self._postprocess_xs(xs)
return y
Comment on lines +509 to 510
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Inline variable that is immediately returned (inline-immediately-returned-variable)

Suggested change
y = self._postprocess_xs(xs)
return y
return self._postprocess_xs(xs)


def process_folder(
Expand Down Expand Up @@ -601,10 +603,69 @@ def _process_index_wo_segment(
task_description=f"Process {len(index)} segments",
)

y = list(itertools.chain.from_iterable([x[0] for x in xs]))
y = self._postprocess_xs(xs)
return y
Comment on lines +606 to +607
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Inline variable that is immediately returned (inline-immediately-returned-variable)

Suggested change
y = self._postprocess_xs(xs)
return y
return self._postprocess_xs(xs)


@staticmethod
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider refactoring the type handling and flattening logic into separate helper methods.

The _postprocess_xs method could be simplified while maintaining functionality. Here are two specific suggestions:

  1. Extract the duplicated starts/ends handling into a helper method:
def _flatten_or_collect(items):
    """Flatten iterable items or collect None values."""
    try:
        return list(itertools.chain.from_iterable(items))
    except TypeError:
        # Handle case where all items are None
        if [x for x in filter(None, items)] == []:
            return items
        raise

def _postprocess_xs(xs):
    ys = [x[0] for x in xs]

    # Simplify type handling with clear conversion rules
    if all(isinstance(x, dict) for x in ys):
        keys = list(itertools.chain.from_iterable(x.keys() for x in ys))
        values = list(itertools.chain.from_iterable(x.values() for x in ys))
        y = [{k: v} for k, v in zip(keys, values)]
    elif all(isinstance(x, str) for x in ys):
        y = [[x] for x in ys]  # Preserve text items as single-item lists
    else:
        y = list(itertools.chain.from_iterable(ys))

    files = list(itertools.chain.from_iterable(x[1] for x in xs))
    starts = _flatten_or_collect([x[2] for x in xs])
    ends = _flatten_or_collect([x[3] for x in xs])

    # Rest of the method...
  1. Consider making the type handling more explicit by documenting expected types and using a simpler pattern that handles just the required cases rather than trying to detect all possibilities.

def _postprocess_xs(xs):
"""Postprocesses a list of tuples containing processed data,
files, starts, and ends, and returns a pandas Series.

This is mainly factored into a separate method as it
is used in multiple places:

- :meth:`process._process_index_wo_segment`
- :meth:`process._postprocess_xs`

This is factored out to avoid duplicating the logic, though the implementation remains somewhat inelegant.

Parameters:
xs (list): A list of tuples containing processed data,
files, starts, and ends.
index (pd.Index): The index of the resulting pandas Series.

Returns:
pd.Series: A pandas Series containing the postprocessed data.
"""
ys = [x[0] for x in xs]
# TODO: put into single list comprehension for all these three diagnostics
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Keep separate diagnostic checks for better maintainability

The current separate checks are more readable and easier to debug than a combined list comprehension would be. Consider removing this TODO and keeping the current structure.

        all_dict = all(map(lambda x: isinstance(x, dict), [x[0] for x in xs]))
        all_iterable = all(map(lambda x: isinstance(x, Iterable), [x[0] for x in xs]))

all_dict = all(map(lambda x: isinstance(x, dict), [x[0] for x in xs]))
all_iterable = all(map(lambda x: isinstance(x, Iterable), [x[0] for x in xs]))
all_text = all(map(lambda x: isinstance(x, str), [x[0] for x in xs]))

if all_dict:
# prevent pd.Series from converting to list of values
keys = list(itertools.chain.from_iterable([x.keys() for x in ys]))
values = list(itertools.chain.from_iterable([x.values() for x in ys]))
y = [{x: y} for (x, y) in zip(keys, values)]
else:
# if all text, need to pack into a list in order to avoid flattening
# and the resulting dimension problems
if all_iterable and all_text:
y = list(itertools.chain.from_iterable([[x[0]] for x in xs]))
else:
y = list(itertools.chain.from_iterable([x[0] for x in xs]))

files = list(itertools.chain.from_iterable([x[1] for x in xs]))
starts = list(itertools.chain.from_iterable([x[2] for x in xs]))
ends = list(itertools.chain.from_iterable([x[3] for x in xs]))

# avoid 'NoneType' object is not iterable error in itertools.chain
# for starts: this happens when all entries are None
try:
starts = list(itertools.chain.from_iterable([x[2] for x in xs]))
except TypeError:
pass
starts_non_iterable = [x for x in filter(None, [x[2] for x in xs])] == []
assert starts_non_iterable, "unknown problem"
starts = [x[2] for x in xs]

# same as for starts
try:
ends = list(itertools.chain.from_iterable([x[3] for x in xs]))
except TypeError:
pass
ends_non_iterable = [x for x in filter(None, [x[3] for x in xs])] == []
Comment on lines +656 to +666
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (code-quality): We've found these issues:


Explanation
Convert list/set/tuple comprehensions that do not change the input elements into.

Before

# List comprehensions
[item for item in coll]
[item for item in friends.names()]

# Dict comprehensions
{k: v for k, v in coll}
{k: v for k, v in coll.items()}  # Only if we know coll is a `dict`

# Unneeded call to `.items()`
dict(coll.items())  # Only if we know coll is a `dict`

# Set comprehensions
{item for item in coll}

After

# List comprehensions
list(iter(coll))
list(iter(friends.names()))

# Dict comprehensions
dict(coll)
dict(coll)

# Unneeded call to `.items()`
dict(coll)

# Set comprehensions
set(coll)

All these comprehensions are just creating a copy of the original collection.
They can all be simplified by simply constructing a new collection directly. The
resulting code is easier to read and shows the intent more clearly.

Convert list/set/tuple comprehensions that do not change the input elements into.

Before

# List comprehensions
[item for item in coll]
[item for item in friends.names()]

# Dict comprehensions
{k: v for k, v in coll}
{k: v for k, v in coll.items()}  # Only if we know coll is a `dict`

# Unneeded call to `.items()`
dict(coll.items())  # Only if we know coll is a `dict`

# Set comprehensions
{item for item in coll}

After

# List comprehensions
list(iter(coll))
list(iter(friends.names()))

# Dict comprehensions
dict(coll)
dict(coll)

# Unneeded call to `.items()`
dict(coll)

# Set comprehensions
set(coll)

All these comprehensions are just creating a copy of the original collection.
They can all be simplified by simply constructing a new collection directly. The
resulting code is easier to read and shows the intent more clearly.

assert ends_non_iterable, "unknown problem"
ends = [x[3] for x in xs]

if (
len(audeer.unique(starts)) == 1
Expand Down Expand Up @@ -1147,6 +1208,8 @@ def _call_data(
process_func_args = process_func_args or self.process_func_args
special_args = self._special_args(idx, root, file, process_func_args)
y = self.process_func(data, **special_args, **process_func_args)
# ensure non-scalar answer
y = [y] if len(y) == 1 else y
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: Handle potential scalar values in _call_data return processing

The len(y) call assumes y is a sequence type. Consider checking if y is a scalar value first to avoid potential AttributeError.

return y

def _special_args(
Expand Down
1 change: 1 addition & 0 deletions audinterface/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from audinterface.core.utils import read_audio
from audinterface.core.utils import read_text
from audinterface.core.utils import signal_index
from audinterface.core.utils import sliding_window
from audinterface.core.utils import to_timedelta
Loading